diff --git a/packages/@n8n/config/src/configs/multi-main-setup.config.ts b/packages/@n8n/config/src/configs/multi-main-setup.config.ts index 42574265f0f..0ec1274bba9 100644 --- a/packages/@n8n/config/src/configs/multi-main-setup.config.ts +++ b/packages/@n8n/config/src/configs/multi-main-setup.config.ts @@ -13,8 +13,4 @@ export class MultiMainSetupConfig { /** Interval in seconds between leader eligibility checks in multi-main setup. */ @Env('N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL') interval: number = 3; - - /** Whether to use the new leader election implementation (Lua-script based). */ - @Env('N8N_NEW_LEADER_ELECTION_IMPLEMENTATION') - newLeaderElection: boolean = false; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 1266e55ead4..b64fc9ca6f8 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -374,7 +374,6 @@ describe('GlobalConfig', () => { enabled: false, ttl: 10, interval: 3, - newLeaderElection: false, }, generic: { timezone: 'America/New_York', diff --git a/packages/cli/src/scaling/__tests__/multi-main-setup.ee.test.ts b/packages/cli/src/scaling/__tests__/multi-main-setup.ee.test.ts index 8086fb4159b..e03960fb672 100644 --- a/packages/cli/src/scaling/__tests__/multi-main-setup.ee.test.ts +++ b/packages/cli/src/scaling/__tests__/multi-main-setup.ee.test.ts @@ -1,6 +1,3 @@ -import type { LeaderElectionClient } from '@/scaling/leader-election-client'; -import type { Publisher } from '@/scaling/pubsub/publisher.service'; -import type { RedisClientService } from '@/services/redis-client.service'; import { mockLogger } from '@n8n/backend-test-utils'; import type { GlobalConfig } from '@n8n/config'; import { MultiMainMetadata } from '@n8n/decorators'; @@ -8,6 +5,8 @@ import { mock } from 'jest-mock-extended'; import type { ErrorReporter, InstanceSettings } from 'n8n-core'; import { createResultOk, createResultError } from 'n8n-workflow'; +import type { 
LeaderElectionClient } from '@/scaling/leader-election-client'; + import { MultiMainSetup } from '../multi-main-setup.ee'; function createInstanceSettings(hostId: string) { @@ -36,347 +35,217 @@ function createInstanceSettings(hostId: string) { describe('MultiMainSetup', () => { const hostId = 'main-n8n-main-0'; + const logger = mockLogger(); const metadata = new MultiMainMetadata(); + const errorReporter = mock(); + const client = mock(); - describe('with legacy implementation (flag off)', () => { - const logger = mockLogger(); - const publisher = mock(); - const redisClientService = mock(); - const errorReporter = mock(); + const globalConfig = mock({ + redis: { prefix: 'n8n' }, + multiMainSetup: { ttl: 10, interval: 3, enabled: true }, + }); - const globalConfig = mock({ - redis: { prefix: 'n8n' }, - multiMainSetup: { ttl: 10, interval: 3, enabled: true, newLeaderElection: false }, + let instanceSettings: InstanceSettings; + let multiMainSetup: MultiMainSetup; + + beforeEach(() => { + jest.clearAllMocks(); + instanceSettings = createInstanceSettings(hostId); + multiMainSetup = new MultiMainSetup( + logger, + instanceSettings, + globalConfig, + metadata, + errorReporter, + client, + ); + }); + + describe('init', () => { + it('should become leader if setLeaderIfNotExists succeeds', async () => { + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); + const emit = jest.spyOn(multiMainSetup, 'emit'); + + await multiMainSetup.init(); + + expect(client.setLeaderIfNotExists).toHaveBeenCalled(); + expect(instanceSettings.markAsLeader).toHaveBeenCalled(); + expect(emit).toHaveBeenCalledWith('leader-takeover'); }); - let instanceSettings: InstanceSettings; - let multiMainSetup: MultiMainSetup; + it('should remain follower if setLeaderIfNotExists returns false', async () => { + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); + const emit = jest.spyOn(multiMainSetup, 'emit'); - beforeEach(() => { - jest.clearAllMocks(); - 
instanceSettings = createInstanceSettings(hostId); - redisClientService.toValidPrefix.mockReturnValue('n8n'); - multiMainSetup = new MultiMainSetup( - logger, - instanceSettings, - globalConfig, - metadata, - errorReporter, - publisher, - redisClientService, + await multiMainSetup.init(); + + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); + }); + + it('should remain follower if setLeaderIfNotExists fails', async () => { + client.setLeaderIfNotExists.mockResolvedValue( + createResultError(new Error('Command timed out')), ); - }); + const emit = jest.spyOn(multiMainSetup, 'emit'); - describe('init', () => { - it('should become leader if setIfNotExists succeeds', async () => { - publisher.setIfNotExists.mockResolvedValue(true); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup.init(); - await multiMainSetup.init(); - - expect(publisher.setIfNotExists).toHaveBeenCalled(); - expect(instanceSettings.markAsLeader).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); - - it('should remain follower if setIfNotExists returns false', async () => { - publisher.setIfNotExists.mockResolvedValue(false); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup.init(); - - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - }); - }); - - describe('checkLeader', () => { - it('should renew TTL when this instance is the leader', async () => { - publisher.setIfNotExists.mockResolvedValue(true); - await multiMainSetup.init(); - jest.clearAllMocks(); - - publisher.get.mockResolvedValue(hostId); - - await multiMainSetup['strategy'].checkLeader(); - - expect(publisher.setExpiration).toHaveBeenCalled(); - }); - - it('should emit leader-takeover on mismatch recovery', async () => { - publisher.setIfNotExists.mockResolvedValue(false); - await multiMainSetup.init(); - 
jest.clearAllMocks(); - - publisher.get.mockResolvedValue(hostId); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsLeader).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); - - it('should step down when another instance is leader', async () => { - publisher.setIfNotExists.mockResolvedValue(true); - await multiMainSetup.init(); - jest.clearAllMocks(); - - publisher.get.mockResolvedValue('main-n8n-main-1'); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsFollower).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should attempt to become leader when leadership is vacant', async () => { - publisher.setIfNotExists.mockResolvedValue(true); - await multiMainSetup.init(); - jest.clearAllMocks(); - - publisher.get.mockResolvedValue(null); - publisher.setIfNotExists.mockResolvedValue(true); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(emit).toHaveBeenCalledWith('leader-stepdown'); - expect(publisher.setIfNotExists).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); }); }); - describe('with new implementation (flag on)', () => { - const logger = mockLogger(); - const publisher = mock(); - const redisClientService = mock(); - const errorReporter = mock(); - const client = mock(); - - const globalConfig = mock({ - redis: { prefix: 'n8n' }, - multiMainSetup: { ttl: 10, interval: 3, enabled: true, newLeaderElection: true }, - }); - - let instanceSettings: InstanceSettings; - let multiMainSetup: MultiMainSetup; - - beforeEach(() => { + describe('checkLeader (leader path)', () => { + beforeEach(async () => { + 
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); + await multiMainSetup.init(); jest.clearAllMocks(); - instanceSettings = createInstanceSettings(hostId); + }); - jest.mock('@/scaling/leader-election-client', () => ({ - LeaderElectionClient: jest.fn(), - })); + it('should stay leader when TTL renewal succeeds', async () => { + client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'success' })); + const emit = jest.spyOn(multiMainSetup, 'emit'); - const { Container } = jest.requireActual('@n8n/di'); - Container.set( - jest.requireMock('@/scaling/leader-election-client').LeaderElectionClient, - client, + await multiMainSetup['checkLeader'](); + + expect(client.tryRenewLeaderTtl).toHaveBeenCalled(); + expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); + }); + + it('should step down when another host is leader', async () => { + client.tryRenewLeaderTtl.mockResolvedValue( + createResultOk({ id: 'other-host-is-leader', currentLeaderId: 'main-n8n-main-1' }), ); + const emit = jest.spyOn(multiMainSetup, 'emit'); - multiMainSetup = new MultiMainSetup( - logger, - instanceSettings, - globalConfig, - metadata, - errorReporter, - publisher, - redisClientService, + await multiMainSetup['checkLeader'](); + + expect(instanceSettings.markAsFollower).toHaveBeenCalled(); + expect(emit).toHaveBeenCalledWith('leader-stepdown'); + }); + + it('should try to re-acquire leader key when key is missing', async () => { + client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' })); + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); + + await multiMainSetup['checkLeader'](); + + expect(client.setLeaderIfNotExists).toHaveBeenCalled(); + expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); + }); + + it('should step down when key is missing and re-acquire fails', async () => { + client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 
'key-missing' })); + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); + const emit = jest.spyOn(multiMainSetup, 'emit'); + + await multiMainSetup['checkLeader'](); + + expect(instanceSettings.markAsFollower).toHaveBeenCalled(); + expect(emit).toHaveBeenCalledWith('leader-stepdown'); + }); + + it('should step down when key is missing and Redis command fails', async () => { + client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' })); + client.setLeaderIfNotExists.mockResolvedValue( + createResultError(new Error('Command timed out')), ); + const emit = jest.spyOn(multiMainSetup, 'emit'); + + await multiMainSetup['checkLeader'](); + + expect(instanceSettings.markAsFollower).toHaveBeenCalled(); + expect(emit).toHaveBeenCalledWith('leader-stepdown'); }); - describe('init', () => { - it('should become leader if setLeaderIfNotExists succeeds', async () => { - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); - const emit = jest.spyOn(multiMainSetup, 'emit'); + it('should stay leader when TTL renewal Redis command fails', async () => { + client.tryRenewLeaderTtl.mockResolvedValue(createResultError(new Error('Command timed out'))); + const emit = jest.spyOn(multiMainSetup, 'emit'); - await multiMainSetup.init(); + await multiMainSetup['checkLeader'](); - expect(client.setLeaderIfNotExists).toHaveBeenCalled(); - expect(instanceSettings.markAsLeader).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); + expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); + }); + }); - it('should remain follower if setLeaderIfNotExists returns false', async () => { - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup.init(); - - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - 
expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - }); - - it('should remain follower if setLeaderIfNotExists fails', async () => { - client.setLeaderIfNotExists.mockResolvedValue( - createResultError(new Error('Command timed out')), - ); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup.init(); - - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - }); + describe('checkLeader (follower path)', () => { + beforeEach(async () => { + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); + await multiMainSetup.init(); + jest.clearAllMocks(); }); - describe('checkLeader (leader path)', () => { - beforeEach(async () => { - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); - await multiMainSetup.init(); - jest.clearAllMocks(); - }); + it('should become leader when Redis shows own hostId as leader (mismatch recovery)', async () => { + client.getLeader.mockResolvedValue(createResultOk(hostId)); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should stay leader when TTL renewal succeeds', async () => { - client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'success' })); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); - - expect(client.tryRenewLeaderTtl).toHaveBeenCalled(); - expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should step down when another host is leader', async () => { - client.tryRenewLeaderTtl.mockResolvedValue( - createResultOk({ id: 'other-host-is-leader', currentLeaderId: 'main-n8n-main-1' }), - ); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsFollower).toHaveBeenCalled(); - 
expect(emit).toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should try to re-acquire leader key when key is missing', async () => { - client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' })); - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); - - await multiMainSetup['strategy'].checkLeader(); - - expect(client.setLeaderIfNotExists).toHaveBeenCalled(); - expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); - }); - - it('should step down when key is missing and re-acquire fails', async () => { - client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' })); - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsFollower).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should step down when key is missing and Redis command fails', async () => { - client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' })); - client.setLeaderIfNotExists.mockResolvedValue( - createResultError(new Error('Command timed out')), - ); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsFollower).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should stay leader when TTL renewal Redis command fails', async () => { - client.tryRenewLeaderTtl.mockResolvedValue( - createResultError(new Error('Command timed out')), - ); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); - }); + expect(errorReporter.info).toHaveBeenCalled(); + expect(instanceSettings.markAsLeader).toHaveBeenCalled(); + 
expect(emit).toHaveBeenCalledWith('leader-takeover'); }); - describe('checkLeader (follower path)', () => { - beforeEach(async () => { - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); - await multiMainSetup.init(); - jest.clearAllMocks(); - }); + it('should stay follower when another instance is leader', async () => { + client.getLeader.mockResolvedValue(createResultOk('main-n8n-main-1')); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should become leader when Redis shows own hostId as leader (mismatch recovery)', async () => { - client.getLeader.mockResolvedValue(createResultOk(hostId)); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); + expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); + }); - expect(errorReporter.info).toHaveBeenCalled(); - expect(instanceSettings.markAsLeader).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); + it('should attempt to become leader when leadership is vacant', async () => { + client.getLeader.mockResolvedValue(createResultOk(null)); + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should stay follower when another instance is leader', async () => { - client.getLeader.mockResolvedValue(createResultOk('main-n8n-main-1')); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); + expect(client.setLeaderIfNotExists).toHaveBeenCalled(); + expect(instanceSettings.markAsLeader).toHaveBeenCalled(); + expect(emit).toHaveBeenCalledWith('leader-takeover'); + }); - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - 
expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); - }); + it('should stay follower when leadership is vacant but setLeaderIfNotExists fails', async () => { + client.getLeader.mockResolvedValue(createResultOk(null)); + client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should attempt to become leader when leadership is vacant', async () => { - client.getLeader.mockResolvedValue(createResultOk(null)); - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true)); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); + }); - expect(client.setLeaderIfNotExists).toHaveBeenCalled(); - expect(instanceSettings.markAsLeader).toHaveBeenCalled(); - expect(emit).toHaveBeenCalledWith('leader-takeover'); - }); + it('should stay follower when Redis is unreachable', async () => { + client.getLeader.mockResolvedValue(createResultError(new Error('Command timed out'))); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should stay follower when leadership is vacant but setLeaderIfNotExists fails', async () => { - client.getLeader.mockResolvedValue(createResultOk(null)); - client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false)); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); + expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); + }); - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - }); + it('should stay 
follower when leadership is vacant and Redis command fails', async () => { + client.getLeader.mockResolvedValue(createResultOk(null)); + client.setLeaderIfNotExists.mockResolvedValue( + createResultError(new Error('Command timed out')), + ); + const emit = jest.spyOn(multiMainSetup, 'emit'); - it('should stay follower when Redis is unreachable', async () => { - client.getLeader.mockResolvedValue(createResultError(new Error('Command timed out'))); - const emit = jest.spyOn(multiMainSetup, 'emit'); + await multiMainSetup['checkLeader'](); - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(instanceSettings.markAsFollower).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - expect(emit).not.toHaveBeenCalledWith('leader-stepdown'); - }); - - it('should stay follower when leadership is vacant and Redis command fails', async () => { - client.getLeader.mockResolvedValue(createResultOk(null)); - client.setLeaderIfNotExists.mockResolvedValue( - createResultError(new Error('Command timed out')), - ); - const emit = jest.spyOn(multiMainSetup, 'emit'); - - await multiMainSetup['strategy'].checkLeader(); - - expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); - expect(emit).not.toHaveBeenCalledWith('leader-takeover'); - }); + expect(instanceSettings.markAsLeader).not.toHaveBeenCalled(); + expect(emit).not.toHaveBeenCalledWith('leader-takeover'); }); }); }); diff --git a/packages/cli/src/scaling/multi-main-setup-legacy.ts b/packages/cli/src/scaling/multi-main-setup-legacy.ts deleted file mode 100644 index db412540502..00000000000 --- a/packages/cli/src/scaling/multi-main-setup-legacy.ts +++ /dev/null @@ -1,120 +0,0 @@ -import type { Logger } from '@n8n/backend-common'; -import type { GlobalConfig } from '@n8n/config'; -import type { ErrorReporter, InstanceSettings } from 'n8n-core'; - -import type { Publisher } from '@/scaling/pubsub/publisher.service'; -import 
type { RedisClientService } from '@/services/redis-client.service'; - -import type { MultiMainStrategy } from './multi-main-setup.types'; - -type EmitFn = (event: 'leader-takeover' | 'leader-stepdown') => void; - -export class MultiMainSetupLegacy implements MultiMainStrategy { - private leaderKey: string; - - private readonly leaderKeyTtl: number; - - constructor( - private readonly logger: Logger, - private readonly instanceSettings: InstanceSettings, - private readonly publisher: Publisher, - private readonly redisClientService: RedisClientService, - private readonly globalConfig: GlobalConfig, - private readonly errorReporter: ErrorReporter, - private readonly emit: EmitFn, - ) { - this.leaderKeyTtl = this.globalConfig.multiMainSetup.ttl; - } - - async init() { - const prefix = this.globalConfig.redis.prefix; - const validPrefix = this.redisClientService.toValidPrefix(prefix); - this.leaderKey = validPrefix + ':main_instance_leader'; - - await this.tryBecomeLeader(); - } - - async shutdown() { - const { isLeader } = this.instanceSettings; - - if (isLeader) await this.publisher.clear(this.leaderKey); - } - - async checkLeader() { - const leaderId = await this.publisher.get(this.leaderKey); - - const { hostId } = this.instanceSettings; - - if (leaderId === hostId) { - if (!this.instanceSettings.isLeader) { - this.errorReporter.info( - `[Instance ID ${hostId}] Remote/Local leadership mismatch, marking self as leader`, - { - shouldBeLogged: true, - shouldReport: true, - }, - ); - - this.instanceSettings.markAsLeader(); - - this.emit('leader-takeover'); - } - - this.logger.debug(`[Instance ID ${hostId}] Leader is this instance`); - - await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); - - return; - } - - if (leaderId && leaderId !== hostId) { - this.logger.debug(`[Instance ID ${hostId}] Leader is other instance "${leaderId}"`); - - if (this.instanceSettings.isLeader) { - this.instanceSettings.markAsFollower(); - - this.emit('leader-stepdown'); - - 
this.logger.warn('[Multi-main setup] Leader failed to renew leader key'); - } - - return; - } - - if (!leaderId) { - this.logger.debug( - `[Instance ID ${hostId}] Leadership vacant, attempting to become leader...`, - ); - - this.instanceSettings.markAsFollower(); - - this.emit('leader-stepdown'); - - await this.tryBecomeLeader(); - } - } - - private async tryBecomeLeader() { - const { hostId } = this.instanceSettings; - - const keySetSuccessfully = await this.publisher.setIfNotExists( - this.leaderKey, - hostId, - this.leaderKeyTtl, - ); - - if (keySetSuccessfully) { - this.logger.info(`[Instance ID ${hostId}] Leader is now this instance`); - - this.instanceSettings.markAsLeader(); - - this.emit('leader-takeover'); - } else { - this.instanceSettings.markAsFollower(); - } - } - - async fetchLeaderKey() { - return await this.publisher.get(this.leaderKey); - } -} diff --git a/packages/cli/src/scaling/multi-main-setup-v2.ts b/packages/cli/src/scaling/multi-main-setup-v2.ts deleted file mode 100644 index 54e95e742e2..00000000000 --- a/packages/cli/src/scaling/multi-main-setup-v2.ts +++ /dev/null @@ -1,191 +0,0 @@ -import type { LeaderElectionClient } from '@/scaling/leader-election-client'; -import type { Logger } from '@n8n/backend-common'; -import type { ErrorReporter, InstanceSettings } from 'n8n-core'; -import assert from 'node:assert'; - -import type { MultiMainStrategy } from './multi-main-setup.types'; - -type EmitFn = (event: 'leader-takeover' | 'leader-stepdown') => void; - -export class MultiMainSetupV2 implements MultiMainStrategy { - private leaderCheckInProgress = false; - - private get hostId() { - return this.instanceSettings.hostId; - } - - constructor( - private readonly logger: Logger, - private readonly instanceSettings: InstanceSettings, - private readonly errorReporter: ErrorReporter, - private readonly client: LeaderElectionClient, - private readonly emit: EmitFn, - ) {} - - async init() { - const result = await this.client.setLeaderIfNotExists(); 
- if (!result.ok) { - this.logRedisCommandFailure('Failed to set leader key in Redis during init', result.error); - this.instanceSettings.markAsFollower(); - } else if (result.result) { - this.takeOverAsLeader(); - } else { - this.instanceSettings.markAsFollower(); - } - } - - async shutdown() { - const { isLeader } = this.instanceSettings; - - if (isLeader) { - // TODO: We should guard here that we only remove the key the key in Redis matches - // our host ID. - const result = await this.client.clearLeader(); - if (!result.ok) { - this.logger.warn('Failed to clear leader key from Redis', { error: result.error }); - } - } - - this.client.destroy(); - } - - async checkLeader() { - if (this.leaderCheckInProgress) { - this.logger.warn('Previous leader check is still in progress, skipping this check'); - return; - } - - this.leaderCheckInProgress = true; - try { - if (this.instanceSettings.isLeader) { - await this.checkAreWeStillLeader(); - } else { - await this.checkCanBecomeLeader(); - } - } finally { - this.leaderCheckInProgress = false; - } - } - - /** Renew our leadership lease. If we've lost the lease, step down to follower. */ - private async checkAreWeStillLeader() { - assert(this.instanceSettings.isLeader); - - const renewTtlResult = await this.client.tryRenewLeaderTtl(); - if (!renewTtlResult.ok) { - this.logRedisCommandFailure('Failed to renew leader TTL', renewTtlResult.error); - // There's a decision to be made here: Do we step down or not? Redis might - // be unavailable for all clients or only for us. We could also track the TTL - // locally, but this would make the implementation more complex and error-prone. - // For now we accept that this might cause some inconsistencies in a network - // partition scenario, but eventually the system will recover once Redis is available again. 
- return; - } - - const renewalResult = renewTtlResult.result; - if (renewalResult.id === 'success') { - this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`); - return; - } - - this.logger.warn('[Multi-main setup] Leader failed to renew leader key'); - - if (renewalResult.id === 'other-host-is-leader') { - this.logger.debug( - `[Instance ID ${this.hostId}] Leader is other instance "${renewalResult.currentLeaderId}"`, - ); - this.stepDownToFollower(); - return; - } - - // The only remaining case is 'key-missing', which means we lost leadership - // (e.g. due to Redis unavailability or a network partition). In this case - // we try to become leader and step down if that fails. - assert(renewalResult.id === 'key-missing'); - - const result = await this.client.setLeaderIfNotExists(); - if (!result.ok) { - this.logRedisCommandFailure('Failed to set leader key in Redis', result.error); - this.stepDownToFollower(); - return; - } - - const wasSet = result.result; - if (!wasSet) { - this.stepDownToFollower(); - } - } - - private async checkCanBecomeLeader() { - assert(!this.instanceSettings.isLeader); - - const getResult = await this.client.getLeader(); - if (!getResult.ok) { - this.logRedisCommandFailure('Failed to get leader key from Redis', getResult.error); - return; - } - - const leaderId = getResult.result; - if (leaderId && leaderId === this.hostId) { - this.errorReporter.info( - `[Instance ID ${this.hostId}] Remote/Local leadership mismatch, marking self as leader`, - { - shouldBeLogged: true, - shouldReport: true, - }, - ); - - this.takeOverAsLeader(); - return; - } - - if (leaderId) { - this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`); - return; - } - - this.logger.debug( - `[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`, - ); - - const result = await this.client.setLeaderIfNotExists(); - if (!result.ok) { - this.logger.warn('Failed to try leader key set in Redis', { 
error: result.error }); - return; - } - - const wasSet = result.result; - if (wasSet) { - this.takeOverAsLeader(); - } - } - - private takeOverAsLeader() { - assert(!this.instanceSettings.isLeader); - - this.logger.info(`[Instance ID ${this.hostId}] Leader is now this instance`); - - this.instanceSettings.markAsLeader(); - - this.emit('leader-takeover'); - } - - private stepDownToFollower() { - assert(this.instanceSettings.isLeader); - - this.logger.info(`[Instance ID ${this.hostId}] This is now a follower instance`); - - this.instanceSettings.markAsFollower(); - - this.emit('leader-stepdown'); - } - - async fetchLeaderKey() { - const result = await this.client.getLeader(); - return result.ok ? result.result : null; - } - - private logRedisCommandFailure(message: string, error: Error) { - this.logger.warn(`${message}: ${error.message}`, { error }); - } -} diff --git a/packages/cli/src/scaling/multi-main-setup.ee.ts b/packages/cli/src/scaling/multi-main-setup.ee.ts index c412836c44a..275e18cdf2f 100644 --- a/packages/cli/src/scaling/multi-main-setup.ee.ts +++ b/packages/cli/src/scaling/multi-main-setup.ee.ts @@ -1,18 +1,13 @@ -import { TypedEmitter } from '@/typed-emitter'; import { Logger } from '@n8n/backend-common'; import { GlobalConfig } from '@n8n/config'; import { Time } from '@n8n/constants'; import { MultiMainMetadata } from '@n8n/decorators'; import { Container, Service } from '@n8n/di'; import { ErrorReporter, InstanceSettings } from 'n8n-core'; +import assert from 'node:assert'; -import type * as LeaderElectionClientModule from '@/scaling/leader-election-client'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { RedisClientService } from '@/services/redis-client.service'; - -import { MultiMainSetupLegacy } from './multi-main-setup-legacy'; -import type { MultiMainStrategy } from './multi-main-setup.types'; -import { MultiMainSetupV2 } from './multi-main-setup-v2'; +import { LeaderElectionClient } from 
'@/scaling/leader-election-client'; +import { TypedEmitter } from '@/typed-emitter'; type MultiMainEvents = { /** @@ -33,53 +28,40 @@ type MultiMainEvents = { /** Designates leader and followers when running multiple main processes. */ @Service() export class MultiMainSetup extends TypedEmitter { - private readonly strategy: MultiMainStrategy; - private leaderCheckInterval: NodeJS.Timeout | undefined; + private leaderCheckInProgress = false; + + private get hostId() { + return this.instanceSettings.hostId; + } + constructor( private readonly logger: Logger, private readonly instanceSettings: InstanceSettings, private readonly globalConfig: GlobalConfig, private readonly metadata: MultiMainMetadata, private readonly errorReporter: ErrorReporter, - private readonly publisher: Publisher, - private readonly redisClientService: RedisClientService, + private readonly client: LeaderElectionClient, ) { super(); this.logger = this.logger.scoped(['scaling', 'multi-main-setup']); - - const emitFn = (event: 'leader-takeover' | 'leader-stepdown') => this.emit(event); - - if (this.globalConfig.multiMainSetup.newLeaderElection) { - const { LeaderElectionClient } = - require('@/scaling/leader-election-client') as typeof LeaderElectionClientModule; - const client = Container.get(LeaderElectionClient); - this.strategy = new MultiMainSetupV2( - this.logger, - this.instanceSettings, - this.errorReporter, - client, - emitFn, - ); - } else { - this.strategy = new MultiMainSetupLegacy( - this.logger, - this.instanceSettings, - this.publisher, - this.redisClientService, - this.globalConfig, - this.errorReporter, - emitFn, - ); - } } async init() { - await this.strategy.init(); + const result = await this.client.setLeaderIfNotExists(); + if (!result.ok) { + this.logRedisCommandFailure('Failed to set leader key in Redis during init', result.error); + this.instanceSettings.markAsFollower(); + } else if (result.result) { + // we became leader + this.takeOverAsLeader(); + } else { + 
this.instanceSettings.markAsFollower(); + } this.leaderCheckInterval = setInterval(async () => { - await this.strategy.checkLeader(); + await this.checkLeader(); }, this.globalConfig.multiMainSetup.interval * Time.seconds.toMilliseconds); } @@ -87,11 +69,21 @@ export class MultiMainSetup extends TypedEmitter { async shutdown() { clearInterval(this.leaderCheckInterval); - await this.strategy.shutdown(); + if (this.instanceSettings.isLeader) { + // TODO: We should guard here that we only remove the key if the key in Redis matches + // our host ID. + const result = await this.client.clearLeader(); + if (!result.ok) { + this.logger.warn('Failed to clear leader key from Redis', { error: result.error }); + } + } + + this.client.destroy(); } async fetchLeaderKey(): Promise { - return await this.strategy.fetchLeaderKey(); + const result = await this.client.getLeader(); + return result.ok ? result.result : null; } registerEventHandlers() { @@ -104,4 +96,137 @@ export class MultiMainSetup extends TypedEmitter { }); } } + + private async checkLeader() { + if (this.leaderCheckInProgress) { + this.logger.warn('Previous leader check is still in progress, skipping this check'); + return; + } + + this.leaderCheckInProgress = true; + try { + if (this.instanceSettings.isLeader) { + await this.checkAreWeStillLeader(); + } else { + await this.checkCanBecomeLeader(); + } + } finally { + this.leaderCheckInProgress = false; + } + } + + /** Renew our leadership lease. If we've lost the lease, step down to follower. */ + private async checkAreWeStillLeader() { + assert(this.instanceSettings.isLeader); + + const renewTtlResult = await this.client.tryRenewLeaderTtl(); + if (!renewTtlResult.ok) { + this.logRedisCommandFailure('Failed to renew leader TTL', renewTtlResult.error); + // There's a decision to be made here: Do we step down or not? Redis might + // be unavailable for all clients or only for us. 
We could also track the TTL + // locally, but this would make the implementation more complex and error-prone. + // For now we accept that this might cause some inconsistencies in a network + // partition scenario, but eventually the system will recover once Redis is available again. + return; + } + + const renewalResult = renewTtlResult.result; + if (renewalResult.id === 'success') { + this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`); + return; + } + + this.logger.warn('[Multi-main setup] Leader failed to renew leader key'); + + if (renewalResult.id === 'other-host-is-leader') { + this.logger.debug( + `[Instance ID ${this.hostId}] Leader is other instance "${renewalResult.currentLeaderId}"`, + ); + this.stepDownToFollower(); + return; + } + + // The only remaining case is 'key-missing', which means we lost leadership + // (e.g. due to Redis unavailability or a network partition). In this case + // we try to become leader and step down if that fails. + assert(renewalResult.id === 'key-missing'); + + const result = await this.client.setLeaderIfNotExists(); + if (!result.ok) { + this.logRedisCommandFailure('Failed to set leader key in Redis', result.error); + this.stepDownToFollower(); + return; + } + + if (!result.result) { + this.stepDownToFollower(); + } + } + + private async checkCanBecomeLeader() { + assert(!this.instanceSettings.isLeader); + + const getResult = await this.client.getLeader(); + if (!getResult.ok) { + this.logRedisCommandFailure('Failed to get leader key from Redis', getResult.error); + return; + } + + const leaderId = getResult.result; + if (leaderId && leaderId === this.hostId) { + this.errorReporter.info( + `[Instance ID ${this.hostId}] Remote/Local leadership mismatch, marking self as leader`, + { + shouldBeLogged: true, + shouldReport: true, + }, + ); + + this.takeOverAsLeader(); + return; + } + + if (leaderId) { + this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`); + return; + } 
+ + this.logger.debug( + `[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`, + ); + + const result = await this.client.setLeaderIfNotExists(); + if (!result.ok) { + this.logger.warn('Failed to try leader key set in Redis', { error: result.error }); + return; + } + + if (result.result) { + this.takeOverAsLeader(); + } + } + + private takeOverAsLeader() { + assert(!this.instanceSettings.isLeader); + + this.logger.info(`[Instance ID ${this.hostId}] Leader is now this instance`); + + this.instanceSettings.markAsLeader(); + + this.emit('leader-takeover'); + } + + private stepDownToFollower() { + assert(this.instanceSettings.isLeader); + + this.logger.info(`[Instance ID ${this.hostId}] This is now a follower instance`); + + this.instanceSettings.markAsFollower(); + + this.emit('leader-stepdown'); + } + + private logRedisCommandFailure(message: string, error: Error) { + this.logger.warn(`${message}: ${error.message}`, { error }); + } } diff --git a/packages/cli/src/scaling/multi-main-setup.types.ts b/packages/cli/src/scaling/multi-main-setup.types.ts deleted file mode 100644 index a7c68f2310b..00000000000 --- a/packages/cli/src/scaling/multi-main-setup.types.ts +++ /dev/null @@ -1,6 +0,0 @@ -export interface MultiMainStrategy { - init(): Promise; - shutdown(): Promise; - checkLeader(): Promise; - fetchLeaderKey(): Promise; -}