diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 4ac31197ad6..0a497110415 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -1,6 +1,6 @@ import type { User } from '@n8n/db'; import type { ExecutionEntity } from '@n8n/db'; -import { Container } from '@n8n/di'; +import { Container, Service } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { DirectedGraph, WorkflowExecute } from 'n8n-core'; import * as core from 'n8n-core'; @@ -258,3 +258,39 @@ describe('run', () => { ); }); }); + +describe('enqueueExecution', () => { + const setupQueue = jest.fn(); + + @Service() + class MockScalingService { + setupQueue = setupQueue; + + addJob = jest.fn(); + } + + beforeAll(() => { + jest.mock('@/scaling/scaling.service', () => ({ + ScalingService: MockScalingService, + })); + }); + + afterAll(() => { + jest.unmock('@/scaling/scaling.service'); + }); + + it('should setup queue when scalingService is not initialized', async () => { + const activeExecutions = Container.get(ActiveExecutions); + jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue(); + jest.spyOn(runner, 'processError').mockResolvedValue(); + const data = mock({ + workflowData: { nodes: [] }, + executionData: undefined, + }); + + // @ts-expect-error Private method + await runner.enqueueExecution('1', data); + + expect(setupQueue).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 65202886c5a..8b7efe2b55f 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -57,6 +57,9 @@ export class ScalingService { async setupQueue() { const { default: BullQueue } = await import('bull'); const { RedisClientService } = await import('@/services/redis-client.service'); + + if (this.queue) return; + const service = Container.get(RedisClientService); const bullPrefix = this.globalConfig.queue.bull.prefix; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index ef14b497467..f82e7867f16 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -348,6 +348,7 @@ export class WorkflowRunner { if (!this.scalingService) { const { ScalingService } = await import('@/scaling/scaling.service'); this.scalingService = Container.get(ScalingService); + await this.scalingService.setupQueue(); } // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.