diff --git a/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts b/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts new file mode 100644 index 00000000000..96a36c53e7e --- /dev/null +++ b/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts @@ -0,0 +1,193 @@ +import { Container } from '@n8n/di'; +import { Service } from '@n8n/di'; +import { EventEmitter } from 'node:events'; + +import { MultiMainMetadata } from '../multi-main-metadata'; +import { LEADER_TAKEOVER_EVENT_NAME, LEADER_STEPDOWN_EVENT_NAME } from '../multi-main-metadata'; +import { NonMethodError, OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event'; + +class MockMultiMainSetup extends EventEmitter { + registerEventHandlers() { + const handlers = Container.get(MultiMainMetadata).getHandlers(); + + for (const { eventHandlerClass, methodName, eventName } of handlers) { + const instance = Container.get(eventHandlerClass); + this.on(eventName, async () => { + return await instance[methodName].call(instance); + }); + } + } +} + +let multiMainSetup: MockMultiMainSetup; +let metadata: MultiMainMetadata; + +beforeEach(() => { + Container.reset(); + + metadata = new MultiMainMetadata(); + Container.set(MultiMainMetadata, metadata); + + multiMainSetup = new MockMultiMainSetup(); +}); + +it('should register methods decorated with @OnLeaderTakeover', () => { + jest.spyOn(metadata, 'register'); + + @Service() + class TestService { + @OnLeaderTakeover() + async handleLeaderTakeover() {} + } + + expect(metadata.register).toHaveBeenCalledWith({ + eventName: LEADER_TAKEOVER_EVENT_NAME, + methodName: 'handleLeaderTakeover', + eventHandlerClass: TestService, + }); +}); + +it('should register methods decorated with @OnLeaderStepdown', () => { + jest.spyOn(metadata, 'register'); + + @Service() + class TestService { + @OnLeaderStepdown() + async handleLeaderStepdown() {} + } + + expect(metadata.register).toHaveBeenCalledTimes(1); + expect(metadata.register).toHaveBeenCalledWith({ + eventName: LEADER_STEPDOWN_EVENT_NAME, + methodName: 'handleLeaderStepdown', + eventHandlerClass: TestService, + }); +}); + +it('should throw an error if the decorated target is not a method', () => { + expect(() => { + @Service() + class TestService { + // @ts-expect-error Testing invalid code + @OnLeaderTakeover() + notAFunction = 'string'; + } + + new TestService(); + }).toThrowError(NonMethodError); +}); + +it('should call decorated methods when events are emitted', async () => { + @Service() + class TestService { + takeoverCalled = false; + + stepdownCalled = false; + + @OnLeaderTakeover() + async handleLeaderTakeover() { + this.takeoverCalled = true; + } + + @OnLeaderStepdown() + async handleLeaderStepdown() { + this.stepdownCalled = true; + } + } + + const testService = Container.get(TestService); + jest.spyOn(testService, 'handleLeaderTakeover'); + jest.spyOn(testService, 'handleLeaderStepdown'); + + multiMainSetup.registerEventHandlers(); + + multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME); + multiMainSetup.emit(LEADER_STEPDOWN_EVENT_NAME); + + expect(testService.handleLeaderTakeover).toHaveBeenCalledTimes(1); + expect(testService.handleLeaderStepdown).toHaveBeenCalledTimes(1); + expect(testService.takeoverCalled).toBe(true); + expect(testService.stepdownCalled).toBe(true); +}); + +it('should register multiple handlers for the same event', async () => { + @Service() + class TestService { + firstHandlerCalled = false; + + secondHandlerCalled = false; + + @OnLeaderTakeover() + async firstHandler() { + this.firstHandlerCalled = true; + } + + @OnLeaderTakeover() + async secondHandler() { + this.secondHandlerCalled = true; + } + } + + const testService = Container.get(TestService); + + multiMainSetup.registerEventHandlers(); + + multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME); + + expect(testService.firstHandlerCalled).toBe(true); + expect(testService.secondHandlerCalled).toBe(true); +}); + +it('should register handlers from multiple service classes', async () => { + @Service() + class FirstService { + handlerCalled = false; + + @OnLeaderTakeover() + async handleTakeover() { + this.handlerCalled = true; + } + } + + @Service() + class SecondService { + handlerCalled = false; + + @OnLeaderTakeover() + async handleTakeover() { + this.handlerCalled = true; + } + } + + const firstService = Container.get(FirstService); + const secondService = Container.get(SecondService); + + multiMainSetup.registerEventHandlers(); + + multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME); + + expect(firstService.handlerCalled).toBe(true); + expect(secondService.handlerCalled).toBe(true); +}); + +it('should handle async methods correctly', async () => { + @Service() + class TestService { + result = ''; + + @OnLeaderTakeover() + async handleLeaderTakeover() { + await new Promise((resolve) => setTimeout(resolve, 10)); + this.result = 'completed'; + } + } + + const testService = Container.get(TestService); + + multiMainSetup.registerEventHandlers(); + multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME); + + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(testService.result).toBe('completed'); +}); diff --git a/packages/@n8n/decorators/src/index.ts b/packages/@n8n/decorators/src/index.ts index 32461f07dd6..76008b795fc 100644 --- a/packages/@n8n/decorators/src/index.ts +++ b/packages/@n8n/decorators/src/index.ts @@ -18,3 +18,5 @@ export { BaseN8nModule, N8nModule } from './module'; export { Debounce } from './debounce'; export type { AccessScope, Controller, RateLimit } from './types'; export type { ShutdownHandler } from './types'; +export { MultiMainMetadata } from './multi-main-metadata'; +export { OnLeaderTakeover, OnLeaderStepdown } from './on-multi-main-event'; diff --git a/packages/@n8n/decorators/src/module.ts b/packages/@n8n/decorators/src/module.ts index c93bff7bfad..0969114e7f4 100644 --- a/packages/@n8n/decorators/src/module.ts +++ b/packages/@n8n/decorators/src/module.ts @@ -1,10 +1,4 @@ import { Container, Service, type Constructable } from '@n8n/di'; -import type EventEmitter from 'node:events'; - -/** - * @TODO Temporary dummy type until `MultiMainSetup` registers listeners via decorators. - */ -type MultiMainSetup = EventEmitter; /** * @TODO Temporary dummy type until `ExecutionLifecycleHooks` registers hooks via decorators. @@ -14,7 +8,6 @@ export type ExecutionLifecycleHooks = object; export interface BaseN8nModule { initialize?(): void; registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void; - registerMultiMainListeners?(multiMainSetup: MultiMainSetup): void; } type Module = Constructable; @@ -47,10 +40,4 @@ export class ModuleRegistry { } } } - - registerMultiMainListeners(multiMainSetup: MultiMainSetup) { - for (const ModuleClass of registry.keys()) { - Container.get(ModuleClass).registerMultiMainListeners?.(multiMainSetup); - } - } } diff --git a/packages/@n8n/decorators/src/multi-main-metadata.ts b/packages/@n8n/decorators/src/multi-main-metadata.ts new file mode 100644 index 00000000000..ce4bde8544e --- /dev/null +++ b/packages/@n8n/decorators/src/multi-main-metadata.ts @@ -0,0 +1,36 @@ +import { Service } from '@n8n/di'; + +import type { Class } from './types'; + +export const LEADER_TAKEOVER_EVENT_NAME = 'leader-takeover'; +export const LEADER_STEPDOWN_EVENT_NAME = 'leader-stepdown'; + +export type MultiMainEvent = typeof LEADER_TAKEOVER_EVENT_NAME | typeof LEADER_STEPDOWN_EVENT_NAME; + +type EventHandlerFn = () => Promise | void; + +export type EventHandlerClass = Class>; + +type EventHandler = { + /** Class holding the method to call on a multi-main event. */ + eventHandlerClass: EventHandlerClass; + + /** Name of the method to call on a multi-main event. */ + methodName: string; + + /** Name of the multi-main event to listen to. */ + eventName: MultiMainEvent; +}; + +@Service() +export class MultiMainMetadata { + private readonly handlers: EventHandler[] = []; + + register(handler: EventHandler) { + this.handlers.push(handler); + } + + getHandlers(): EventHandler[] { + return this.handlers; + } +} diff --git a/packages/@n8n/decorators/src/on-multi-main-event.ts b/packages/@n8n/decorators/src/on-multi-main-event.ts new file mode 100644 index 00000000000..d8e4118d53a --- /dev/null +++ b/packages/@n8n/decorators/src/on-multi-main-event.ts @@ -0,0 +1,66 @@ +import { Container } from '@n8n/di'; +import { UnexpectedError } from 'n8n-workflow'; + +import type { EventHandlerClass, MultiMainEvent } from './multi-main-metadata'; +import { + LEADER_TAKEOVER_EVENT_NAME, + LEADER_STEPDOWN_EVENT_NAME, + MultiMainMetadata, +} from './multi-main-metadata'; + +export class NonMethodError extends UnexpectedError { + constructor(name: string) { + super(`${name} must be a method on a class to use this decorator`); + } +} + +const OnMultiMainEvent = + (eventName: MultiMainEvent): MethodDecorator => + (prototype, propertyKey, descriptor) => { + const eventHandlerClass = prototype.constructor as EventHandlerClass; + const methodName = String(propertyKey); + + if (typeof descriptor?.value !== 'function') { + throw new NonMethodError(`${eventHandlerClass.name}.${methodName}()`); + } + + Container.get(MultiMainMetadata).register({ + eventHandlerClass, + methodName, + eventName, + }); + }; + +/** + * Decorator that registers a method to be called when this main instance becomes the leader. + * + * @example + * + * ```ts + * @Service() + * class MyService { + * @OnLeaderTakeover() + * async startDoingThings() { + * // ... + * } + * } + * ``` + */ +export const OnLeaderTakeover = () => OnMultiMainEvent(LEADER_TAKEOVER_EVENT_NAME); + +/** + * Decorator that registers a method to be called when this main instance stops being the leader. + * + * @example + * + * ```ts + * @Service() + * class MyService { + * @OnLeaderStepdown() + * async stopDoingThings() { + * // ... + * } + * } + * ``` + */ +export const OnLeaderStepdown = () => OnMultiMainEvent(LEADER_STEPDOWN_EVENT_NAME); diff --git a/packages/@n8n/decorators/src/types.ts b/packages/@n8n/decorators/src/types.ts index 11263ab1c3c..dcab87ecd6c 100644 --- a/packages/@n8n/decorators/src/types.ts +++ b/packages/@n8n/decorators/src/types.ts @@ -50,7 +50,7 @@ export type Controller = Constructable & type RouteHandlerFn = () => Promise | void; -type Class = new (...args: A) => T; +export type Class = new (...args: A) => T; export type ServiceClass = Class>; diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 98a500a69c5..14f722793a1 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -92,12 +92,10 @@ export abstract class BaseCommand extends Command { } } - const moduleRegistry = Container.get(ModuleRegistry); - - moduleRegistry.initializeModules(); + Container.get(ModuleRegistry).initializeModules(); if (this.instanceSettings.isMultiMain) { - moduleRegistry.registerMultiMainListeners(Container.get(MultiMainSetup)); + Container.get(MultiMainSetup).registerEventHandlers(); } } diff --git a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts b/packages/cli/src/modules/insights/__tests__/insights.module.test.ts index 53592cfded8..7d376bf5613 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.module.test.ts @@ -1,9 +1,7 @@ -import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; import type { Logger } from 'n8n-core'; -import { OrchestrationService } from '@/services/orchestration.service'; import { mockInstance } from '@test/mocking'; import { InsightsModule } from '../insights.module'; @@ -13,7 +11,6 @@ describe('InsightsModule', () => { let logger: Logger; let insightsService: InsightsService; let instanceSettings: InstanceSettings; - let orchestrationService: OrchestrationService; beforeEach(() => { logger = mock({ @@ -24,7 +21,6 @@ describe('InsightsModule', () => { ), }); insightsService = mockInstance(InsightsService); - orchestrationService = Container.get(OrchestrationService); }); describe('backgroundProcess', () => { @@ -41,25 +37,5 @@ describe('InsightsModule', () => { insightsModule.initialize(); expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled(); }); - - it('should start background process on leader takeover', () => { - instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false }); - const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); - insightsModule.initialize(); - expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled(); - insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup); - orchestrationService.multiMainSetup.emit('leader-takeover'); - expect(insightsService.startBackgroundProcess).toHaveBeenCalled(); - }); - - it('should stop background process on leader stepdown', () => { - instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true }); - const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); - insightsModule.initialize(); - expect(insightsService.stopBackgroundProcess).not.toHaveBeenCalled(); - insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup); - orchestrationService.multiMainSetup.emit('leader-stepdown'); - expect(insightsService.stopBackgroundProcess).toHaveBeenCalled(); - }); }); }); diff --git a/packages/cli/src/modules/insights/insights.module.ts b/packages/cli/src/modules/insights/insights.module.ts index e2fb716596c..b6604e77395 100644 --- a/packages/cli/src/modules/insights/insights.module.ts +++ b/packages/cli/src/modules/insights/insights.module.ts @@ -1,10 +1,8 @@ import type { BaseN8nModule } from '@n8n/decorators'; -import { N8nModule } from '@n8n/decorators'; +import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators'; import type { ExecutionLifecycleHooks } from 'n8n-core'; import { InstanceSettings, Logger } from 'n8n-core'; -import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; - import { InsightsService } from './insights.service'; import './insights.controller'; @@ -35,8 +33,13 @@ export class InsightsModule implements BaseN8nModule { }); } - registerMultiMainListeners(multiMainSetup: MultiMainSetup) { - multiMainSetup.on('leader-takeover', () => this.insightsService.startBackgroundProcess()); - multiMainSetup.on('leader-stepdown', () => this.insightsService.stopBackgroundProcess()); + @OnLeaderTakeover() + startBackgroundProcess() { + this.insightsService.startBackgroundProcess(); + } + + @OnLeaderStepdown() + stopBackgroundProcess() { + this.insightsService.stopBackgroundProcess(); } } diff --git a/packages/cli/src/scaling/multi-main-setup.ee.ts b/packages/cli/src/scaling/multi-main-setup.ee.ts index aa977ddc6a0..3f74784ef11 100644 --- a/packages/cli/src/scaling/multi-main-setup.ee.ts +++ b/packages/cli/src/scaling/multi-main-setup.ee.ts @@ -1,5 +1,6 @@ import { GlobalConfig } from '@n8n/config'; -import { Service } from '@n8n/di'; +import { MultiMainMetadata } from '@n8n/decorators'; +import { Container, Service } from '@n8n/di'; import { InstanceSettings, Logger } from 'n8n-core'; import config from '@/config'; @@ -12,14 +13,14 @@ type MultiMainEvents = { /** * Emitted when this instance loses leadership. In response, its various * services will stop triggers, pollers, pruning, wait-tracking, license - * renewal, queue recovery, etc. + * renewal, queue recovery, insights, etc. */ 'leader-stepdown': never; /** * Emitted when this instance gains leadership. In response, its various * services will start triggers, pollers, pruning, wait-tracking, license - * renewal, queue recovery, etc. + * renewal, queue recovery, insights, etc. */ 'leader-takeover': never; }; @@ -33,6 +34,7 @@ export class MultiMainSetup extends TypedEmitter { private readonly publisher: Publisher, private readonly redisClientService: RedisClientService, private readonly globalConfig: GlobalConfig, + private readonly metadata: MultiMainMetadata, ) { super(); this.logger = this.logger.scoped(['scaling', 'multi-main-setup']); @@ -128,4 +130,16 @@ export class MultiMainSetup extends TypedEmitter { async fetchLeaderKey() { return await this.publisher.get(this.leaderKey); } + + registerEventHandlers() { + const handlers = this.metadata.getHandlers(); + + for (const { eventHandlerClass, methodName, eventName } of handlers) { + const instance = Container.get(eventHandlerClass); + this.on(eventName, async () => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return instance[methodName].call(instance); + }); + } + } }