diff --git a/packages/cli/src/commands/__tests__/execute-batch.test.ts b/packages/cli/src/commands/__tests__/execute-batch.test.ts new file mode 100644 index 00000000000..3251a63b550 --- /dev/null +++ b/packages/cli/src/commands/__tests__/execute-batch.test.ts @@ -0,0 +1,94 @@ +import { GlobalConfig } from '@n8n/config'; +import type { User, WorkflowEntity } from '@n8n/db'; +import { Container } from '@n8n/di'; +import type { SelectQueryBuilder } from '@n8n/typeorm'; +import type { Config } from '@oclif/core'; +import { mock } from 'jest-mock-extended'; +import type { IRun } from 'n8n-workflow'; + +import { ActiveExecutions } from '@/active-executions'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { DeprecationService } from '@/deprecation/deprecation.service'; +import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; +import { ExternalHooks } from '@/external-hooks'; +import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { PostHogClient } from '@/posthog'; +import { OwnershipService } from '@/services/ownership.service'; +import { ShutdownService } from '@/shutdown/shutdown.service'; +import { TaskRunnerModule } from '@/task-runners/task-runner-module'; +import { WorkflowRunner } from '@/workflow-runner'; +import { mockInstance } from '@test/mocking'; + +import { ExecuteBatch } from '../execute-batch'; + +const taskRunnerModule = mockInstance(TaskRunnerModule); +const workflowRepository = mockInstance(WorkflowRepository); +const ownershipService = mockInstance(OwnershipService); +const workflowRunner = mockInstance(WorkflowRunner); +const activeExecutions = mockInstance(ActiveExecutions); +const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials); +const shutdownService = mockInstance(ShutdownService); +const deprecationService = mockInstance(DeprecationService); +mockInstance(MessageEventBus); +const posthogClient = mockInstance(PostHogClient); +const telemetryEventRelay = mockInstance(TelemetryEventRelay); +const externalHooks = mockInstance(ExternalHooks); + +jest.mock('@/db', () => ({ + init: jest.fn().mockResolvedValue(undefined), + migrate: jest.fn().mockResolvedValue(undefined), + connectionState: { connected: false }, + close: jest.fn().mockResolvedValue(undefined), +})); + +test('should start a task runner when task runners are enabled', async () => { + // arrange + + const workflow = mock({ + id: '123', + nodes: [{ type: 'n8n-nodes-base.manualTrigger' }], + }); + + const run = mock({ data: { resultData: { error: undefined } } }); + + const queryBuilder = mock>({ + andWhere: jest.fn().mockReturnThis(), + getMany: jest.fn().mockResolvedValue([workflow]), + }); + + loadNodesAndCredentials.init.mockResolvedValue(undefined); + shutdownService.shutdown.mockReturnValue(); + deprecationService.warn.mockReturnValue(); + posthogClient.init.mockResolvedValue(); + telemetryEventRelay.init.mockResolvedValue(); + externalHooks.init.mockResolvedValue(); + + workflowRepository.createQueryBuilder.mockReturnValue(queryBuilder); + ownershipService.getInstanceOwner.mockResolvedValue(mock({ id: '123' })); + workflowRunner.run.mockResolvedValue('123'); + activeExecutions.getPostExecutePromise.mockResolvedValue(run); + + Container.set( + GlobalConfig, + mock({ + taskRunners: { enabled: true }, + nodes: { communityPackages: { enabled: false } }, + }), + ); + + const cmd = new ExecuteBatch([], {} as Config); + // @ts-expect-error Private property + cmd.parse = jest.fn().mockResolvedValue({ flags: {} }); + // @ts-expect-error Private property + cmd.runTests = jest.fn().mockResolvedValue({ summary: { failedExecutions: [] } }); + + // act + + await cmd.init(); + await cmd.run(); + + // assert + + expect(taskRunnerModule.start).toHaveBeenCalledTimes(1); +}); diff --git a/packages/cli/src/commands/__tests__/execute.test.ts b/packages/cli/src/commands/__tests__/execute.test.ts new file mode 100644 index 00000000000..369d5a8176c --- /dev/null +++ b/packages/cli/src/commands/__tests__/execute.test.ts @@ -0,0 +1,86 @@ +import { GlobalConfig } from '@n8n/config'; +import type { User, WorkflowEntity } from '@n8n/db'; +import { Container } from '@n8n/di'; +import type { Config } from '@oclif/core'; +import { mock } from 'jest-mock-extended'; +import type { IRun } from 'n8n-workflow'; + +import { ActiveExecutions } from '@/active-executions'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { DeprecationService } from '@/deprecation/deprecation.service'; +import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; +import { ExternalHooks } from '@/external-hooks'; +import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { PostHogClient } from '@/posthog'; +import { OwnershipService } from '@/services/ownership.service'; +import { ShutdownService } from '@/shutdown/shutdown.service'; +import { TaskRunnerModule } from '@/task-runners/task-runner-module'; +import { WorkflowRunner } from '@/workflow-runner'; +import { mockInstance } from '@test/mocking'; + +import { Execute } from '../execute'; + +const taskRunnerModule = mockInstance(TaskRunnerModule); +const workflowRepository = mockInstance(WorkflowRepository); +const ownershipService = mockInstance(OwnershipService); +const workflowRunner = mockInstance(WorkflowRunner); +const activeExecutions = mockInstance(ActiveExecutions); +const loadNodesAndCredentials = mockInstance(LoadNodesAndCredentials); +const shutdownService = mockInstance(ShutdownService); +const deprecationService = mockInstance(DeprecationService); +mockInstance(MessageEventBus); +const posthogClient = mockInstance(PostHogClient); +const telemetryEventRelay = mockInstance(TelemetryEventRelay); +const externalHooks = mockInstance(ExternalHooks); + +jest.mock('@/db', () => ({ + init: jest.fn().mockResolvedValue(undefined), + migrate: jest.fn().mockResolvedValue(undefined), + connectionState: { connected: false }, + close: jest.fn().mockResolvedValue(undefined), +})); + +test('should start a task runner when task runners are enabled', async () => { + // arrange + + const workflow = mock({ + id: '123', + nodes: [{ type: 'n8n-nodes-base.manualTrigger' }], + }); + + const run = mock({ data: { resultData: { error: undefined } } }); + + loadNodesAndCredentials.init.mockResolvedValue(undefined); + shutdownService.shutdown.mockReturnValue(); + deprecationService.warn.mockReturnValue(); + posthogClient.init.mockResolvedValue(); + telemetryEventRelay.init.mockResolvedValue(); + externalHooks.init.mockResolvedValue(); + + workflowRepository.findOneBy.mockResolvedValue(workflow); + ownershipService.getInstanceOwner.mockResolvedValue(mock({ id: '123' })); + workflowRunner.run.mockResolvedValue('123'); + activeExecutions.getPostExecutePromise.mockResolvedValue(run); + + Container.set( + GlobalConfig, + mock({ + taskRunners: { enabled: true }, + nodes: { communityPackages: { enabled: false } }, + }), + ); + + const cmd = new Execute([], {} as Config); + // @ts-expect-error Private property + cmd.parse = jest.fn().mockResolvedValue({ flags: { id: '123' } }); + + // act + + await cmd.init(); + await cmd.run(); + + // assert + + expect(taskRunnerModule.start).toHaveBeenCalledTimes(1); +}); diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 4e34d9dd4e0..c107ec9019f 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -69,6 +69,9 @@ export abstract class BaseCommand extends Command { /** Whether to init community packages (if enabled) */ protected needsCommunityPackages = false; + /** Whether to init task runner (if enabled). */ + protected needsTaskRunner = false; + protected async loadModules() { for (const moduleName of this.modulesConfig.modules) { let preInitModule: ModulePreInit | undefined; @@ -156,6 +159,11 @@ export abstract class BaseCommand extends Command { await Container.get(CommunityPackagesService).checkForMissingPackages(); } + if (this.needsTaskRunner && this.globalConfig.taskRunners.enabled) { + const { TaskRunnerModule } = await import('@/task-runners/task-runner-module'); + await Container.get(TaskRunnerModule).start(); + } + // TODO: remove this after the cyclic dependencies around the event-bus are resolved Container.get(MessageEventBus); diff --git a/packages/cli/src/commands/execute-batch.ts b/packages/cli/src/commands/execute-batch.ts index 42ff4e8e129..b4a5c312830 100644 --- a/packages/cli/src/commands/execute-batch.ts +++ b/packages/cli/src/commands/execute-batch.ts @@ -112,6 +112,8 @@ export class ExecuteBatch extends BaseCommand { override needsCommunityPackages = true; + override needsTaskRunner = true; + /** * Gracefully handles exit. * @param {boolean} skipExit Whether to skip exit or number according to received signal @@ -335,7 +337,6 @@ export class ExecuteBatch extends BaseCommand { if (results.summary.failedExecutions > 0) { this.exit(1); } - this.exit(0); } mergeResults(results: IResult, retryResults: IResult) { diff --git a/packages/cli/src/commands/execute.ts b/packages/cli/src/commands/execute.ts index e880c307332..70aa15bf82f 100644 --- a/packages/cli/src/commands/execute.ts +++ b/packages/cli/src/commands/execute.ts @@ -29,6 +29,8 @@ export class Execute extends BaseCommand { override needsCommunityPackages = true; + override needsTaskRunner = true; + async init() { await super.init(); await this.initBinaryDataService(); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index c7d473de28f..071a60a8c6e 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -68,6 +68,8 @@ export class Start extends BaseCommand { override needsCommunityPackages = true; + override needsTaskRunner = true; + private getEditorUrl = () => Container.get(UrlService).getInstanceBaseUrl(); /** @@ -234,13 +236,6 @@ export class Start extends BaseCommand { await this.generateStaticAssets(); } - const { taskRunners: taskRunnerConfig } = this.globalConfig; - if (taskRunnerConfig.enabled) { - const { TaskRunnerModule } = await import('@/task-runners/task-runner-module'); - const taskRunnerModule = Container.get(TaskRunnerModule); - await taskRunnerModule.start(); - } - await this.loadModules(); } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 989a11a21c8..4a407e1f795 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -39,6 +39,8 @@ export class Worker extends BaseCommand { override needsCommunityPackages = true; + override needsTaskRunner = true; + /** * Stop n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -108,13 +110,6 @@ export class Worker extends BaseCommand { }), ); - const { taskRunners: taskRunnerConfig } = this.globalConfig; - if (taskRunnerConfig.enabled) { - const { TaskRunnerModule } = await import('@/task-runners/task-runner-module'); - const taskRunnerModule = Container.get(TaskRunnerModule); - await taskRunnerModule.start(); - } - await this.loadModules(); }