fix: In queue mode "Respond to Webhook" node returns 500 when there is an error in the execution (#21981)

This commit is contained in:
Irénée 2025-11-19 09:48:03 +01:00 committed by GitHub
parent 55912079d1
commit fe297d09ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 150 additions and 3 deletions

View File

@ -12,6 +12,7 @@ import {
Workflow,
type IRunExecutionData,
type WorkflowExecuteMode,
type ExecutionError,
} from 'n8n-workflow';
import { JobProcessor } from '../job-processor';
@ -110,12 +111,69 @@ describe('JobProcessor', () => {
mock(),
);
await jobProcessor.processJob(mock<Job>());
const job = mock<Job>();
await jobProcessor.processJob(job);
expect(manualExecutionService.runManually).toHaveBeenCalledTimes(1);
expect(job.progress).toHaveBeenCalledWith(
expect.objectContaining({
kind: 'job-finished',
success: true,
}),
);
},
);
it('should send job-finished with success=false when execution has errors', async () => {
const executionRepository = mock<ExecutionRepository>();
// First call: initial execution fetch (no error yet)
executionRepository.findSingleExecution.mockResolvedValueOnce(
mock<IExecutionResponse>({
mode: 'manual',
workflowData: { nodes: [] },
data: mock<IRunExecutionData>({
executionData: undefined,
}),
}),
);
// Second call: after execution completes, fetch again to check for errors
executionRepository.findSingleExecution.mockResolvedValueOnce(
mock<IExecutionResponse>({
status: 'error',
data: {
resultData: {
error: mock<ExecutionError>(),
},
},
}),
);
const manualExecutionService = mock<ManualExecutionService>();
const jobProcessor = new JobProcessor(
logger,
executionRepository,
mock(),
mock(),
mock(),
manualExecutionService,
executionsConfig,
mock(),
);
const job = mock<Job>();
await jobProcessor.processJob(job);
expect(job.progress).toHaveBeenCalledWith(
expect.objectContaining({
kind: 'job-finished',
success: false,
}),
);
});
it('should pass additional data for partial executions to run', async () => {
const executionRepository = mock<ExecutionRepository>();
const pinData: IPinData = { pinned: [] };

View File

@ -360,6 +360,71 @@ describe('ScalingService', () => {
content: 'test',
});
});
it('should resolve responsePromise with empty response when job-finished has success=true', async () => {
const activeExecutions = mock<ActiveExecutions>();
scalingService = new ScalingService(
mockLogger(),
mock(),
activeExecutions,
jobProcessor,
globalConfig,
mock(),
instanceSettings,
mock(),
);
await scalingService.setupQueue();
const messageHandler = queue.on.mock.calls.find(
([event]) => (event as string) === 'global:progress',
)?.[1] as (jobId: JobId, msg: unknown) => void;
const jobFinishedMessage = {
kind: 'job-finished',
executionId: 'exec-123',
workerId: 'worker-456',
success: true,
};
messageHandler('job-789', jobFinishedMessage);
expect(activeExecutions.resolveResponsePromise).toHaveBeenCalledWith('exec-123', {});
});
it('should resolve responsePromise with error response when job-finished has success=false', async () => {
const activeExecutions = mock<ActiveExecutions>();
scalingService = new ScalingService(
mockLogger(),
mock(),
activeExecutions,
jobProcessor,
globalConfig,
mock(),
instanceSettings,
mock(),
);
await scalingService.setupQueue();
const messageHandler = queue.on.mock.calls.find(
([event]) => (event as string) === 'global:progress',
)?.[1] as (jobId: JobId, msg: unknown) => void;
const jobFinishedMessage = {
kind: 'job-finished',
executionId: 'exec-123',
workerId: 'worker-456',
success: false,
};
messageHandler('job-789', jobFinishedMessage);
expect(activeExecutions.resolveResponsePromise).toHaveBeenCalledWith('exec-123', {
body: { message: 'Workflow execution failed' },
statusCode: 500,
});
});
});
describe('recoverFromQueue', () => {

View File

@ -247,16 +247,19 @@ export class JobProcessor {
delete this.runningJobs[job.id];
const hasErrors = await this.executionHasErrors(executionId);
this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, {
executionId,
workflowId,
jobId: job.id,
success: !hasErrors,
});
const msg: JobFinishedMessage = {
kind: 'job-finished',
executionId,
workerId: this.instanceSettings.hostId,
success: !hasErrors,
};
await job.progress(msg);
@ -269,6 +272,15 @@ export class JobProcessor {
return { success: true };
}
private async executionHasErrors(executionId: string): Promise<boolean> {
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
return execution?.status === 'error' || execution?.data?.resultData?.error !== undefined;
}
stopJob(jobId: JobId) {
const runningJob = this.runningJobs[jobId];
if (!runningJob) return;

View File

@ -324,11 +324,22 @@ export class ScalingService {
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
break;
case 'job-finished':
this.activeExecutions.resolveResponsePromise(msg.executionId, {});
this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, {
if (msg.success) {
this.activeExecutions.resolveResponsePromise(msg.executionId, {});
} else {
this.activeExecutions.resolveResponsePromise(msg.executionId, {
body: {
message: 'Workflow execution failed',
},
statusCode: 500,
});
}
this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished`, {
workerId: msg.workerId,
executionId: msg.executionId,
jobId,
success: msg.success,
});
break;
case 'job-failed':

View File

@ -57,6 +57,7 @@ export type JobFinishedMessage = {
kind: 'job-finished';
executionId: string;
workerId: string;
success: boolean;
};
export type SendChunkMessage = {