From fe297d09aba112779087b6f344747c78fe42e626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ir=C3=A9n=C3=A9e?= Date: Wed, 19 Nov 2025 09:48:03 +0100 Subject: [PATCH] fix: In queue mode "Respond to Webhook" node returns 500 when there is an error in the execution (#21981) --- .../__tests__/job-processor.service.test.ts | 60 ++++++++++++++++- .../scaling/__tests__/scaling.service.test.ts | 65 +++++++++++++++++++ packages/cli/src/scaling/job-processor.ts | 12 ++++ packages/cli/src/scaling/scaling.service.ts | 15 ++++- packages/cli/src/scaling/scaling.types.ts | 1 + 5 files changed, 150 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index 08408c58437..73f82960587 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -12,6 +12,7 @@ import { Workflow, type IRunExecutionData, type WorkflowExecuteMode, + type ExecutionError, } from 'n8n-workflow'; import { JobProcessor } from '../job-processor'; @@ -110,12 +111,69 @@ describe('JobProcessor', () => { mock(), ); - await jobProcessor.processJob(mock()); + const job = mock(); + + await jobProcessor.processJob(job); expect(manualExecutionService.runManually).toHaveBeenCalledTimes(1); + + expect(job.progress).toHaveBeenCalledWith( + expect.objectContaining({ + kind: 'job-finished', + success: true, + }), + ); }, ); + it('should send job-finished with success=false when execution has errors', async () => { + const executionRepository = mock(); + // First call: initial execution fetch (no error yet) + executionRepository.findSingleExecution.mockResolvedValueOnce( + mock({ + mode: 'manual', + workflowData: { nodes: [] }, + data: mock({ + executionData: undefined, + }), + }), + ); + // Second call: after execution completes, fetch again to check for errors + executionRepository.findSingleExecution.mockResolvedValueOnce( + mock({ + status: 'error', + data: { + resultData: { + error: mock(), + }, + }, + }), + ); + + const manualExecutionService = mock(); + const jobProcessor = new JobProcessor( + logger, + executionRepository, + mock(), + mock(), + mock(), + manualExecutionService, + executionsConfig, + mock(), + ); + + const job = mock(); + + await jobProcessor.processJob(job); + + expect(job.progress).toHaveBeenCalledWith( + expect.objectContaining({ + kind: 'job-finished', + success: false, + }), + ); + }); + it('should pass additional data for partial executions to run', async () => { const executionRepository = mock(); const pinData: IPinData = { pinned: [] }; diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 3040d331e5d..248c1e27c50 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -360,6 +360,71 @@ describe('ScalingService', () => { content: 'test', }); }); + + it('should resolve responsePromise with empty response when job-finished has success=true', async () => { + const activeExecutions = mock(); + scalingService = new ScalingService( + mockLogger(), + mock(), + activeExecutions, + jobProcessor, + globalConfig, + mock(), + instanceSettings, + mock(), + ); + + await scalingService.setupQueue(); + + const messageHandler = queue.on.mock.calls.find( + ([event]) => (event as string) === 'global:progress', + )?.[1] as (jobId: JobId, msg: unknown) => void; + + const jobFinishedMessage = { + kind: 'job-finished', + executionId: 'exec-123', + workerId: 'worker-456', + success: true, + }; + + messageHandler('job-789', jobFinishedMessage); + + expect(activeExecutions.resolveResponsePromise).toHaveBeenCalledWith('exec-123', {}); + }); + + it('should resolve responsePromise with error response when job-finished has success=false', async () => { + const activeExecutions = mock(); + scalingService = new ScalingService( + mockLogger(), + mock(), + activeExecutions, + jobProcessor, + globalConfig, + mock(), + instanceSettings, + mock(), + ); + + await scalingService.setupQueue(); + + const messageHandler = queue.on.mock.calls.find( + ([event]) => (event as string) === 'global:progress', + )?.[1] as (jobId: JobId, msg: unknown) => void; + + const jobFinishedMessage = { + kind: 'job-finished', + executionId: 'exec-123', + workerId: 'worker-456', + success: false, + }; + + messageHandler('job-789', jobFinishedMessage); + + expect(activeExecutions.resolveResponsePromise).toHaveBeenCalledWith('exec-123', { + body: { message: 'Workflow execution failed' }, + statusCode: 500, + }); + }); }); describe('recoverFromQueue', () => { diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index e0023f6671d..e54dc498d95 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -247,16 +247,19 @@ export class JobProcessor { delete this.runningJobs[job.id]; + const hasErrors = await this.executionHasErrors(executionId); this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, { executionId, workflowId, jobId: job.id, + success: !hasErrors, }); const msg: JobFinishedMessage = { kind: 'job-finished', executionId, workerId: this.instanceSettings.hostId, + success: !hasErrors, }; await job.progress(msg); @@ -269,6 +272,15 @@ export class JobProcessor { return { success: true }; } + private async executionHasErrors(executionId: string): Promise { + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + return execution?.status === 'error' || execution?.data?.resultData?.error !== undefined; + } + stopJob(jobId: JobId) { const runningJob = this.runningJobs[jobId]; if (!runningJob) return; diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index ddd2c8f1a38..9142a6c8d2a 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -324,11 +324,22 @@ export class ScalingService { this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); break; case 'job-finished': - this.activeExecutions.resolveResponsePromise(msg.executionId, {}); - this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, { + if (msg.success) { + this.activeExecutions.resolveResponsePromise(msg.executionId, {}); + } else { + this.activeExecutions.resolveResponsePromise(msg.executionId, { + body: { + message: 'Workflow execution failed', + }, + statusCode: 500, + }); + } + + this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished`, { workerId: msg.workerId, executionId: msg.executionId, jobId, + success: msg.success, }); break; case 'job-failed': diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts index 33576a6eff2..afbaa5c66c6 100644 --- a/packages/cli/src/scaling/scaling.types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -57,6 +57,7 @@ export type JobFinishedMessage = { kind: 'job-finished'; executionId: string; workerId: string; + success: boolean; }; export type SendChunkMessage = {