refactor(core): Remove legacy leader election implementation (no-changelog) (#30640)

This commit is contained in:
Tomi Turtiainen 2026-05-19 14:41:44 +03:00 committed by GitHub
parent d74dc0f4e5
commit 4ee0f670d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 336 additions and 664 deletions

View File

@ -13,8 +13,4 @@ export class MultiMainSetupConfig {
/** Interval in seconds between leader eligibility checks in multi-main setup. */
@Env('N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL')
interval: number = 3;
/** Whether to use the new leader election implementation (Lua-script based). */
@Env('N8N_NEW_LEADER_ELECTION_IMPLEMENTATION')
newLeaderElection: boolean = false;
}

View File

@ -374,7 +374,6 @@ describe('GlobalConfig', () => {
enabled: false,
ttl: 10,
interval: 3,
newLeaderElection: false,
},
generic: {
timezone: 'America/New_York',

View File

@ -1,6 +1,3 @@
import type { LeaderElectionClient } from '@/scaling/leader-election-client';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { RedisClientService } from '@/services/redis-client.service';
import { mockLogger } from '@n8n/backend-test-utils';
import type { GlobalConfig } from '@n8n/config';
import { MultiMainMetadata } from '@n8n/decorators';
@ -8,6 +5,8 @@ import { mock } from 'jest-mock-extended';
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
import { createResultOk, createResultError } from 'n8n-workflow';
import type { LeaderElectionClient } from '@/scaling/leader-election-client';
import { MultiMainSetup } from '../multi-main-setup.ee';
function createInstanceSettings(hostId: string) {
@ -36,347 +35,217 @@ function createInstanceSettings(hostId: string) {
describe('MultiMainSetup', () => {
const hostId = 'main-n8n-main-0';
const logger = mockLogger();
const metadata = new MultiMainMetadata();
const errorReporter = mock<ErrorReporter>();
const client = mock<LeaderElectionClient>();
describe('with legacy implementation (flag off)', () => {
const logger = mockLogger();
const publisher = mock<Publisher>();
const redisClientService = mock<RedisClientService>();
const errorReporter = mock<ErrorReporter>();
const globalConfig = mock<GlobalConfig>({
redis: { prefix: 'n8n' },
multiMainSetup: { ttl: 10, interval: 3, enabled: true },
});
const globalConfig = mock<GlobalConfig>({
redis: { prefix: 'n8n' },
multiMainSetup: { ttl: 10, interval: 3, enabled: true, newLeaderElection: false },
let instanceSettings: InstanceSettings;
let multiMainSetup: MultiMainSetup;
beforeEach(() => {
jest.clearAllMocks();
instanceSettings = createInstanceSettings(hostId);
multiMainSetup = new MultiMainSetup(
logger,
instanceSettings,
globalConfig,
metadata,
errorReporter,
client,
);
});
describe('init', () => {
it('should become leader if setLeaderIfNotExists succeeds', async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
let instanceSettings: InstanceSettings;
let multiMainSetup: MultiMainSetup;
it('should remain follower if setLeaderIfNotExists returns false', async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
beforeEach(() => {
jest.clearAllMocks();
instanceSettings = createInstanceSettings(hostId);
redisClientService.toValidPrefix.mockReturnValue('n8n');
multiMainSetup = new MultiMainSetup(
logger,
instanceSettings,
globalConfig,
metadata,
errorReporter,
publisher,
redisClientService,
await multiMainSetup.init();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
it('should remain follower if setLeaderIfNotExists fails', async () => {
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
});
const emit = jest.spyOn(multiMainSetup, 'emit');
describe('init', () => {
it('should become leader if setIfNotExists succeeds', async () => {
publisher.setIfNotExists.mockResolvedValue(true);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
await multiMainSetup.init();
expect(publisher.setIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
it('should remain follower if setIfNotExists returns false', async () => {
publisher.setIfNotExists.mockResolvedValue(false);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
});
describe('checkLeader', () => {
it('should renew TTL when this instance is the leader', async () => {
publisher.setIfNotExists.mockResolvedValue(true);
await multiMainSetup.init();
jest.clearAllMocks();
publisher.get.mockResolvedValue(hostId);
await multiMainSetup['strategy'].checkLeader();
expect(publisher.setExpiration).toHaveBeenCalled();
});
it('should emit leader-takeover on mismatch recovery', async () => {
publisher.setIfNotExists.mockResolvedValue(false);
await multiMainSetup.init();
jest.clearAllMocks();
publisher.get.mockResolvedValue(hostId);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
it('should step down when another instance is leader', async () => {
publisher.setIfNotExists.mockResolvedValue(true);
await multiMainSetup.init();
jest.clearAllMocks();
publisher.get.mockResolvedValue('main-n8n-main-1');
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should attempt to become leader when leadership is vacant', async () => {
publisher.setIfNotExists.mockResolvedValue(true);
await multiMainSetup.init();
jest.clearAllMocks();
publisher.get.mockResolvedValue(null);
publisher.setIfNotExists.mockResolvedValue(true);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
expect(publisher.setIfNotExists).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
});
describe('with new implementation (flag on)', () => {
const logger = mockLogger();
const publisher = mock<Publisher>();
const redisClientService = mock<RedisClientService>();
const errorReporter = mock<ErrorReporter>();
const client = mock<LeaderElectionClient>();
const globalConfig = mock<GlobalConfig>({
redis: { prefix: 'n8n' },
multiMainSetup: { ttl: 10, interval: 3, enabled: true, newLeaderElection: true },
});
let instanceSettings: InstanceSettings;
let multiMainSetup: MultiMainSetup;
beforeEach(() => {
describe('checkLeader (leader path)', () => {
beforeEach(async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
await multiMainSetup.init();
jest.clearAllMocks();
instanceSettings = createInstanceSettings(hostId);
});
jest.mock('@/scaling/leader-election-client', () => ({
LeaderElectionClient: jest.fn(),
}));
it('should stay leader when TTL renewal succeeds', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'success' }));
const emit = jest.spyOn(multiMainSetup, 'emit');
const { Container } = jest.requireActual('@n8n/di');
Container.set(
jest.requireMock('@/scaling/leader-election-client').LeaderElectionClient,
client,
await multiMainSetup['checkLeader']();
expect(client.tryRenewLeaderTtl).toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
it('should step down when another host is leader', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(
createResultOk({ id: 'other-host-is-leader', currentLeaderId: 'main-n8n-main-1' }),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
multiMainSetup = new MultiMainSetup(
logger,
instanceSettings,
globalConfig,
metadata,
errorReporter,
publisher,
redisClientService,
await multiMainSetup['checkLeader']();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should try to re-acquire leader key when key is missing', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
await multiMainSetup['checkLeader']();
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
});
it('should step down when key is missing and re-acquire fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should step down when key is missing and Redis command fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
describe('init', () => {
it('should become leader if setLeaderIfNotExists succeeds', async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should stay leader when TTL renewal Redis command fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultError(new Error('Command timed out')));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
await multiMainSetup['checkLeader']();
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
});
it('should remain follower if setLeaderIfNotExists returns false', async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
it('should remain follower if setLeaderIfNotExists fails', async () => {
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup.init();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
describe('checkLeader (follower path)', () => {
beforeEach(async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
await multiMainSetup.init();
jest.clearAllMocks();
});
describe('checkLeader (leader path)', () => {
beforeEach(async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
await multiMainSetup.init();
jest.clearAllMocks();
});
it('should become leader when Redis shows own hostId as leader (mismatch recovery)', async () => {
client.getLeader.mockResolvedValue(createResultOk(hostId));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should stay leader when TTL renewal succeeds', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'success' }));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(client.tryRenewLeaderTtl).toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
it('should step down when another host is leader', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(
createResultOk({ id: 'other-host-is-leader', currentLeaderId: 'main-n8n-main-1' }),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should try to re-acquire leader key when key is missing', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
await multiMainSetup['strategy'].checkLeader();
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
});
it('should step down when key is missing and re-acquire fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should step down when key is missing and Redis command fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(createResultOk({ id: 'key-missing' }));
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsFollower).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-stepdown');
});
it('should stay leader when TTL renewal Redis command fails', async () => {
client.tryRenewLeaderTtl.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
expect(errorReporter.info).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
describe('checkLeader (follower path)', () => {
beforeEach(async () => {
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
await multiMainSetup.init();
jest.clearAllMocks();
});
it('should stay follower when another instance is leader', async () => {
client.getLeader.mockResolvedValue(createResultOk('main-n8n-main-1'));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should become leader when Redis shows own hostId as leader (mismatch recovery)', async () => {
client.getLeader.mockResolvedValue(createResultOk(hostId));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
expect(errorReporter.info).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
it('should attempt to become leader when leadership is vacant', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should stay follower when another instance is leader', async () => {
client.getLeader.mockResolvedValue(createResultOk('main-n8n-main-1'));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
it('should stay follower when leadership is vacant but setLeaderIfNotExists fails', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should attempt to become leader when leadership is vacant', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(true));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
expect(client.setLeaderIfNotExists).toHaveBeenCalled();
expect(instanceSettings.markAsLeader).toHaveBeenCalled();
expect(emit).toHaveBeenCalledWith('leader-takeover');
});
it('should stay follower when Redis is unreachable', async () => {
client.getLeader.mockResolvedValue(createResultError(new Error('Command timed out')));
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should stay follower when leadership is vacant but setLeaderIfNotExists fails', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(createResultOk(false));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
it('should stay follower when leadership is vacant and Redis command fails', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
it('should stay follower when Redis is unreachable', async () => {
client.getLeader.mockResolvedValue(createResultError(new Error('Command timed out')));
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['checkLeader']();
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(instanceSettings.markAsFollower).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
expect(emit).not.toHaveBeenCalledWith('leader-stepdown');
});
it('should stay follower when leadership is vacant and Redis command fails', async () => {
client.getLeader.mockResolvedValue(createResultOk(null));
client.setLeaderIfNotExists.mockResolvedValue(
createResultError(new Error('Command timed out')),
);
const emit = jest.spyOn(multiMainSetup, 'emit');
await multiMainSetup['strategy'].checkLeader();
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
expect(instanceSettings.markAsLeader).not.toHaveBeenCalled();
expect(emit).not.toHaveBeenCalledWith('leader-takeover');
});
});
});

View File

@ -1,120 +0,0 @@
import type { Logger } from '@n8n/backend-common';
import type { GlobalConfig } from '@n8n/config';
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { RedisClientService } from '@/services/redis-client.service';
import type { MultiMainStrategy } from './multi-main-setup.types';
type EmitFn = (event: 'leader-takeover' | 'leader-stepdown') => void;
export class MultiMainSetupLegacy implements MultiMainStrategy {
private leaderKey: string;
private readonly leaderKeyTtl: number;
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly publisher: Publisher,
private readonly redisClientService: RedisClientService,
private readonly globalConfig: GlobalConfig,
private readonly errorReporter: ErrorReporter,
private readonly emit: EmitFn,
) {
this.leaderKeyTtl = this.globalConfig.multiMainSetup.ttl;
}
async init() {
const prefix = this.globalConfig.redis.prefix;
const validPrefix = this.redisClientService.toValidPrefix(prefix);
this.leaderKey = validPrefix + ':main_instance_leader';
await this.tryBecomeLeader();
}
async shutdown() {
const { isLeader } = this.instanceSettings;
if (isLeader) await this.publisher.clear(this.leaderKey);
}
async checkLeader() {
const leaderId = await this.publisher.get(this.leaderKey);
const { hostId } = this.instanceSettings;
if (leaderId === hostId) {
if (!this.instanceSettings.isLeader) {
this.errorReporter.info(
`[Instance ID ${hostId}] Remote/Local leadership mismatch, marking self as leader`,
{
shouldBeLogged: true,
shouldReport: true,
},
);
this.instanceSettings.markAsLeader();
this.emit('leader-takeover');
}
this.logger.debug(`[Instance ID ${hostId}] Leader is this instance`);
await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
return;
}
if (leaderId && leaderId !== hostId) {
this.logger.debug(`[Instance ID ${hostId}] Leader is other instance "${leaderId}"`);
if (this.instanceSettings.isLeader) {
this.instanceSettings.markAsFollower();
this.emit('leader-stepdown');
this.logger.warn('[Multi-main setup] Leader failed to renew leader key');
}
return;
}
if (!leaderId) {
this.logger.debug(
`[Instance ID ${hostId}] Leadership vacant, attempting to become leader...`,
);
this.instanceSettings.markAsFollower();
this.emit('leader-stepdown');
await this.tryBecomeLeader();
}
}
private async tryBecomeLeader() {
const { hostId } = this.instanceSettings;
const keySetSuccessfully = await this.publisher.setIfNotExists(
this.leaderKey,
hostId,
this.leaderKeyTtl,
);
if (keySetSuccessfully) {
this.logger.info(`[Instance ID ${hostId}] Leader is now this instance`);
this.instanceSettings.markAsLeader();
this.emit('leader-takeover');
} else {
this.instanceSettings.markAsFollower();
}
}
async fetchLeaderKey() {
return await this.publisher.get(this.leaderKey);
}
}

View File

@ -1,191 +0,0 @@
import type { LeaderElectionClient } from '@/scaling/leader-election-client';
import type { Logger } from '@n8n/backend-common';
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
import assert from 'node:assert';
import type { MultiMainStrategy } from './multi-main-setup.types';
type EmitFn = (event: 'leader-takeover' | 'leader-stepdown') => void;
export class MultiMainSetupV2 implements MultiMainStrategy {
private leaderCheckInProgress = false;
private get hostId() {
return this.instanceSettings.hostId;
}
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly errorReporter: ErrorReporter,
private readonly client: LeaderElectionClient,
private readonly emit: EmitFn,
) {}
async init() {
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logRedisCommandFailure('Failed to set leader key in Redis during init', result.error);
this.instanceSettings.markAsFollower();
} else if (result.result) {
this.takeOverAsLeader();
} else {
this.instanceSettings.markAsFollower();
}
}
async shutdown() {
const { isLeader } = this.instanceSettings;
if (isLeader) {
// TODO: We should guard here that we only remove the key the key in Redis matches
// our host ID.
const result = await this.client.clearLeader();
if (!result.ok) {
this.logger.warn('Failed to clear leader key from Redis', { error: result.error });
}
}
this.client.destroy();
}
async checkLeader() {
if (this.leaderCheckInProgress) {
this.logger.warn('Previous leader check is still in progress, skipping this check');
return;
}
this.leaderCheckInProgress = true;
try {
if (this.instanceSettings.isLeader) {
await this.checkAreWeStillLeader();
} else {
await this.checkCanBecomeLeader();
}
} finally {
this.leaderCheckInProgress = false;
}
}
/** Renew our leadership lease. If we've lost the lease, step down to follower. */
private async checkAreWeStillLeader() {
assert(this.instanceSettings.isLeader);
const renewTtlResult = await this.client.tryRenewLeaderTtl();
if (!renewTtlResult.ok) {
this.logRedisCommandFailure('Failed to renew leader TTL', renewTtlResult.error);
// There's a decision to be made here: Do we step down or not? Redis might
// be unavailable for all clients or only for us. We could also track the TTL
// locally, but this would make the implementation more complex and error-prone.
// For now we accept that this might cause some inconsistencies in a network
// partition scenario, but eventually the system will recover once Redis is available again.
return;
}
const renewalResult = renewTtlResult.result;
if (renewalResult.id === 'success') {
this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`);
return;
}
this.logger.warn('[Multi-main setup] Leader failed to renew leader key');
if (renewalResult.id === 'other-host-is-leader') {
this.logger.debug(
`[Instance ID ${this.hostId}] Leader is other instance "${renewalResult.currentLeaderId}"`,
);
this.stepDownToFollower();
return;
}
// The only remaining case is 'key-missing', which means we lost leadership
// (e.g. due to Redis unavailability or a network partition). In this case
// we try to become leader and step down if that fails.
assert(renewalResult.id === 'key-missing');
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logRedisCommandFailure('Failed to set leader key in Redis', result.error);
this.stepDownToFollower();
return;
}
const wasSet = result.result;
if (!wasSet) {
this.stepDownToFollower();
}
}
private async checkCanBecomeLeader() {
assert(!this.instanceSettings.isLeader);
const getResult = await this.client.getLeader();
if (!getResult.ok) {
this.logRedisCommandFailure('Failed to get leader key from Redis', getResult.error);
return;
}
const leaderId = getResult.result;
if (leaderId && leaderId === this.hostId) {
this.errorReporter.info(
`[Instance ID ${this.hostId}] Remote/Local leadership mismatch, marking self as leader`,
{
shouldBeLogged: true,
shouldReport: true,
},
);
this.takeOverAsLeader();
return;
}
if (leaderId) {
this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`);
return;
}
this.logger.debug(
`[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`,
);
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logger.warn('Failed to try leader key set in Redis', { error: result.error });
return;
}
const wasSet = result.result;
if (wasSet) {
this.takeOverAsLeader();
}
}
private takeOverAsLeader() {
assert(!this.instanceSettings.isLeader);
this.logger.info(`[Instance ID ${this.hostId}] Leader is now this instance`);
this.instanceSettings.markAsLeader();
this.emit('leader-takeover');
}
private stepDownToFollower() {
assert(this.instanceSettings.isLeader);
this.logger.info(`[Instance ID ${this.hostId}] This is now a follower instance`);
this.instanceSettings.markAsFollower();
this.emit('leader-stepdown');
}
async fetchLeaderKey() {
const result = await this.client.getLeader();
return result.ok ? result.result : null;
}
private logRedisCommandFailure(message: string, error: Error) {
this.logger.warn(`${message}: ${error.message}`, { error });
}
}

View File

@ -1,18 +1,13 @@
import { TypedEmitter } from '@/typed-emitter';
import { Logger } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import { MultiMainMetadata } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { ErrorReporter, InstanceSettings } from 'n8n-core';
import assert from 'node:assert';
import type * as LeaderElectionClientModule from '@/scaling/leader-election-client';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { RedisClientService } from '@/services/redis-client.service';
import { MultiMainSetupLegacy } from './multi-main-setup-legacy';
import type { MultiMainStrategy } from './multi-main-setup.types';
import { MultiMainSetupV2 } from './multi-main-setup-v2';
import { LeaderElectionClient } from '@/scaling/leader-election-client';
import { TypedEmitter } from '@/typed-emitter';
type MultiMainEvents = {
/**
@ -33,53 +28,40 @@ type MultiMainEvents = {
/** Designates leader and followers when running multiple main processes. */
@Service()
export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
private readonly strategy: MultiMainStrategy;
private leaderCheckInterval: NodeJS.Timeout | undefined;
private leaderCheckInProgress = false;
private get hostId() {
return this.instanceSettings.hostId;
}
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly globalConfig: GlobalConfig,
private readonly metadata: MultiMainMetadata,
private readonly errorReporter: ErrorReporter,
private readonly publisher: Publisher,
private readonly redisClientService: RedisClientService,
private readonly client: LeaderElectionClient,
) {
super();
this.logger = this.logger.scoped(['scaling', 'multi-main-setup']);
const emitFn = (event: 'leader-takeover' | 'leader-stepdown') => this.emit(event);
if (this.globalConfig.multiMainSetup.newLeaderElection) {
const { LeaderElectionClient } =
require('@/scaling/leader-election-client') as typeof LeaderElectionClientModule;
const client = Container.get(LeaderElectionClient);
this.strategy = new MultiMainSetupV2(
this.logger,
this.instanceSettings,
this.errorReporter,
client,
emitFn,
);
} else {
this.strategy = new MultiMainSetupLegacy(
this.logger,
this.instanceSettings,
this.publisher,
this.redisClientService,
this.globalConfig,
this.errorReporter,
emitFn,
);
}
}
async init() {
await this.strategy.init();
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logRedisCommandFailure('Failed to set leader key in Redis during init', result.error);
this.instanceSettings.markAsFollower();
} else if (result.result) {
// we became leader
this.takeOverAsLeader();
} else {
this.instanceSettings.markAsFollower();
}
this.leaderCheckInterval = setInterval(async () => {
await this.strategy.checkLeader();
await this.checkLeader();
}, this.globalConfig.multiMainSetup.interval * Time.seconds.toMilliseconds);
}
@ -87,11 +69,21 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
async shutdown() {
clearInterval(this.leaderCheckInterval);
await this.strategy.shutdown();
if (this.instanceSettings.isLeader) {
// TODO: We should guard here that we only remove the key the key in Redis matches
// our host ID.
const result = await this.client.clearLeader();
if (!result.ok) {
this.logger.warn('Failed to clear leader key from Redis', { error: result.error });
}
}
this.client.destroy();
}
async fetchLeaderKey(): Promise<string | null> {
return await this.strategy.fetchLeaderKey();
const result = await this.client.getLeader();
return result.ok ? result.result : null;
}
registerEventHandlers() {
@ -104,4 +96,137 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
});
}
}
private async checkLeader() {
if (this.leaderCheckInProgress) {
this.logger.warn('Previous leader check is still in progress, skipping this check');
return;
}
this.leaderCheckInProgress = true;
try {
if (this.instanceSettings.isLeader) {
await this.checkAreWeStillLeader();
} else {
await this.checkCanBecomeLeader();
}
} finally {
this.leaderCheckInProgress = false;
}
}
/** Renew our leadership lease. If we've lost the lease, step down to follower. */
private async checkAreWeStillLeader() {
assert(this.instanceSettings.isLeader);
const renewTtlResult = await this.client.tryRenewLeaderTtl();
if (!renewTtlResult.ok) {
this.logRedisCommandFailure('Failed to renew leader TTL', renewTtlResult.error);
// There's a decision to be made here: Do we step down or not? Redis might
// be unavailable for all clients or only for us. We could also track the TTL
// locally, but this would make the implementation more complex and error-prone.
// For now we accept that this might cause some inconsistencies in a network
// partition scenario, but eventually the system will recover once Redis is available again.
return;
}
const renewalResult = renewTtlResult.result;
if (renewalResult.id === 'success') {
this.logger.debug(`[Instance ID ${this.hostId}] Leader is this instance`);
return;
}
this.logger.warn('[Multi-main setup] Leader failed to renew leader key');
if (renewalResult.id === 'other-host-is-leader') {
this.logger.debug(
`[Instance ID ${this.hostId}] Leader is other instance "${renewalResult.currentLeaderId}"`,
);
this.stepDownToFollower();
return;
}
// The only remaining case is 'key-missing', which means we lost leadership
// (e.g. due to Redis unavailability or a network partition). In this case
// we try to become leader and step down if that fails.
assert(renewalResult.id === 'key-missing');
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logRedisCommandFailure('Failed to set leader key in Redis', result.error);
this.stepDownToFollower();
return;
}
if (!result.result) {
this.stepDownToFollower();
}
}
private async checkCanBecomeLeader() {
assert(!this.instanceSettings.isLeader);
const getResult = await this.client.getLeader();
if (!getResult.ok) {
this.logRedisCommandFailure('Failed to get leader key from Redis', getResult.error);
return;
}
const leaderId = getResult.result;
if (leaderId && leaderId === this.hostId) {
this.errorReporter.info(
`[Instance ID ${this.hostId}] Remote/Local leadership mismatch, marking self as leader`,
{
shouldBeLogged: true,
shouldReport: true,
},
);
this.takeOverAsLeader();
return;
}
if (leaderId) {
this.logger.debug(`[Instance ID ${this.hostId}] Leader is other instance "${leaderId}"`);
return;
}
this.logger.debug(
`[Instance ID ${this.hostId}] Leadership vacant, attempting to become leader...`,
);
const result = await this.client.setLeaderIfNotExists();
if (!result.ok) {
this.logger.warn('Failed to try leader key set in Redis', { error: result.error });
return;
}
if (result.result) {
this.takeOverAsLeader();
}
}
private takeOverAsLeader() {
assert(!this.instanceSettings.isLeader);
this.logger.info(`[Instance ID ${this.hostId}] Leader is now this instance`);
this.instanceSettings.markAsLeader();
this.emit('leader-takeover');
}
private stepDownToFollower() {
assert(this.instanceSettings.isLeader);
this.logger.info(`[Instance ID ${this.hostId}] This is now a follower instance`);
this.instanceSettings.markAsFollower();
this.emit('leader-stepdown');
}
private logRedisCommandFailure(message: string, error: Error) {
this.logger.warn(`${message}: ${error.message}`, { error });
}
}

View File

@ -1,6 +0,0 @@
export interface MultiMainStrategy {
init(): Promise<void>;
shutdown(): Promise<void>;
checkLeader(): Promise<void>;
fetchLeaderKey(): Promise<string | null>;
}