// Source: n8n/packages/cli/src/workflow-runner.ts
// (690 lines, 23 KiB, TypeScript)

/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Logger } from '@n8n/backend-common';
import { ExecutionsConfig } from '@n8n/config';
import { ExecutionRepository } from '@n8n/db';
import { Container, Service } from '@n8n/di';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import {
ErrorReporter,
establishExecutionContext,
InstanceSettings,
StorageConfig,
WorkflowExecute,
} from 'n8n-core';
import type {
ExecutionError,
IDeferredPromise,
IExecuteResponsePromiseData,
INode,
IPinData,
IRun,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
createRunExecutionData,
ExecutionCancelledError,
ManualExecutionCancelledError,
TimeoutExecutionCancelledError,
Workflow,
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import { ActiveExecutions } from '@/active-executions';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
// eslint-disable-next-line import-x/no-cycle
import {
getLifecycleHooksForRegularMain,
getLifecycleHooksForScalingWorker,
getLifecycleHooksForScalingMain,
} from '@/execution-lifecycle/execution-lifecycle-hooks';
import { FailedRunFactory } from '@/executions/failed-run-factory';
import { CredentialsPermissionChecker } from '@/executions/pre-execution-checks';
import { ExternalHooks } from '@/external-hooks';
import { ManualExecutionService } from '@/manual-execution.service';
import { NodeTypes } from '@/node-types';
import type { ScalingService } from '@/scaling/scaling.service';
import type { Job, JobData } from '@/scaling/scaling.types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { EventService } from './events/event.service';
/**
 * Interval, in milliseconds, between keepalive writes on streaming responses
 * (30 seconds). The periodic write is meant to prevent proxy timeouts.
 */
const STREAMING_HEARTBEAT_INTERVAL_MS = 30_000;
/**
 * Newline-terminated JSON chunk written periodically to keep the streaming
 * connection alive through reverse proxies.
 */
const STREAMING_KEEPALIVE_CHUNK = '{"type":"keepalive"}\n';
/**
 * Push any buffered output of the response out to the client.
 *
 * `flush` is not part of the standard Response type: the Express
 * `compression` middleware adds it at runtime. When it is absent this
 * function does nothing.
 */
function flushResponse(res: { flush?: () => void }): void {
	if (typeof res.flush !== 'function') return;
	// Call through `res` so the method keeps its receiver.
	res.flush();
}
@Service()
export class WorkflowRunner {
/**
 * Scaling service used to enqueue jobs. It is not injected through the
 * constructor: `enqueueExecution()` resolves it from the DI container via a
 * dynamic import the first time it is needed.
 */
private scalingService: ScalingService;
/**
 * All collaborators are received as constructor parameter properties and
 * stored as private readonly fields; the constructor body itself is empty.
 */
constructor(
private readonly logger: Logger,
private readonly errorReporter: ErrorReporter,
private readonly activeExecutions: ActiveExecutions,
private readonly executionRepository: ExecutionRepository,
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes,
private readonly credentialsPermissionChecker: CredentialsPermissionChecker,
private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService,
private readonly failedRunFactory: FailedRunFactory,
private readonly eventService: EventService,
private readonly executionsConfig: ExecutionsConfig,
private readonly storageConfig: StorageConfig,
private readonly externalHooks: ExternalHooks,
) {}
/**
 * Handle an execution that ended with an error: log and report it, then
 * finalize the execution as failed and run the `workflowExecuteAfter` hook.
 *
 * Returns without doing anything for cancellation-like errors, and — in
 * queue mode — when the stored execution is already marked as finished
 * successfully.
 *
 * @param error The error the execution ended with.
 * @param startedAt Start time recorded on the generated failed run.
 * @param executionMode Mode recorded on the generated failed run.
 * @param executionId ID of the affected execution.
 * @param hooks Lifecycle hooks whose `workflowExecuteAfter` hook is run, if given.
 */
async processError(
	error: ExecutionError | ExecutionNotFoundError,
	startedAt: Date,
	executionMode: WorkflowExecuteMode,
	executionId: string,
	hooks?: ExecutionLifecycleHooks,
) {
	// A missing or cancelled execution has most likely been cleaned up
	// already, so there is nothing left to mark as failed.
	//
	// FIXME: This is a quick fix. The proper fix would be to not remove
	// the execution from the active executions while it's still running.
	if (
		error instanceof ExecutionNotFoundError ||
		error instanceof ExecutionCancelledError ||
		(typeof error.message === 'string' && error.message.includes('cancelled'))
	) {
		return;
	}

	this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
	this.errorReporter.error(error, { executionId });

	// Queue mode only: Bull may fail to mark a job correctly even though it
	// executed successfully (https://github.com/OptimalBits/bull/issues/1415),
	// so the stored record is consulted before declaring a failure.
	if (this.executionsConfig.mode === 'queue') {
		const persisted = await this.executionRepository.findSingleExecution(executionId, {
			includeData: false,
		});
		const succeeded = persisted?.finished === true && persisted?.status === 'success';
		// False positive: the execution was successful.
		if (succeeded) return;
	}

	const failedRun: IRun = {
		data: createRunExecutionData({
			resultData: {
				// Spreading drops `message` and `stack`, so they are copied explicitly.
				error: { ...error, message: error.message, stack: error.stack },
				runData: {},
			},
		}),
		finished: false,
		mode: executionMode,
		startedAt,
		stoppedAt: new Date(),
		status: 'error',
		storedAt: this.storageConfig.modeTag,
	};

	// Finalizing with run data that carries no node output sets the
	// execution to failed.
	this.activeExecutions.finalizeExecution(executionId, failedRun);
	await hooks?.runHook('workflowExecuteAfter', [failedRun]);
}
/**
 * Turn a pre-flight failure of an already-registered execution into a
 * regular failed run: build the failed-run data, fire the
 * `workflowExecuteBefore` and `workflowExecuteAfter` hooks so the failure is
 * recorded like any other run, reject the pending response (if any) and
 * finalize the execution.
 *
 * @param data Workflow and execution data of the failed run.
 * @param executionId ID of the already-registered execution.
 * @param error The pre-flight error; `error.node` is passed along when present.
 * @param responsePromise Deferred response that is rejected with `error`, if given.
 */
private async failExecution(
	data: IWorkflowExecutionDataProcess,
	executionId: string,
	error: ExecutionError & { node?: INode },
	responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<void> {
	const failedRun = this.failedRunFactory.generateFailedExecutionFromError(
		data.executionMode,
		error,
		error.node,
	);

	const hooks = getLifecycleHooksForRegularMain(data, executionId);
	await hooks.runHook('workflowExecuteBefore', [undefined, data.executionData]);
	await hooks.runHook('workflowExecuteAfter', [failedRun]);

	if (responsePromise) {
		responsePromise.reject(error);
	}
	this.activeExecutions.finalizeExecution(executionId);
}
/**
 * Register and start a workflow execution, either in this process or — in
 * queue mode — by enqueueing a job for a worker.
 *
 * Pre-flight failures (establishing the execution context, credential
 * permission check) are turned into a failed run through `failExecution()`;
 * the execution ID is still returned in that case.
 *
 * @param data Workflow and execution data of the run.
 * @param loadStaticData Forwarded to `enqueueExecution()` / `runMainProcess()`.
 * @param realtime This is used in queue mode to change the priority of an execution, making sure they are picked up quicker.
 * @param restartExecutionId ID of an execution to restart; forwarded to `activeExecutions.add()`.
 * @param responsePromise Deferred response attached to the active execution, or rejected on pre-flight failure.
 * @returns The ID under which the execution was registered.
 * @throws Re-throws whatever starting the execution throws; the streaming
 *   heartbeat timer is stopped before the error propagates.
 */
async run(
	data: IWorkflowExecutionDataProcess,
	loadStaticData?: boolean,
	realtime?: boolean,
	restartExecutionId?: string,
	responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
	// Establish the execution context before persisting to the DB.
	// activeExecutions.add() -> executionPersistence.create() writes
	// data.executionData to the DB; any header masking or runtimeData
	// population must happen before that write so the persisted record
	// does not contain raw trigger-item data (e.g. Authorization headers).
	// The runtimeData early-exit guard in establishExecutionContext keeps
	// the subsequent worker-side call at workflow-execute.ts idempotent.
	// Guard on the inner executionData: in queue mode with manual offload
	// the outer IRunExecutionData is created with `executionData: null`
	// so the trigger-item stack is undefined here; nothing to mask yet,
	// the worker will establish context once it populates the stack.
	let establishContextError: (ExecutionError & { node?: INode }) | undefined;
	if (data.executionData?.executionData) {
		// Deliberately lightweight: no pinData, no staticData loading,
		// no additionalData. establishExecutionContext only needs the
		// workflow's settings (for redactionPolicy) and node lookups.
		// runMainProcess() builds its own fully-configured Workflow for
		// actual execution.
		const contextWorkflow = new Workflow({
			id: data.workflowData.id,
			name: data.workflowData.name,
			nodes: data.workflowData.nodes,
			connections: data.workflowData.connections,
			active: data.workflowData.activeVersionId !== null,
			nodeTypes: this.nodeTypes,
			staticData: data.workflowData.staticData,
			settings: data.workflowData.settings ?? {},
		});
		try {
			await establishExecutionContext(
				contextWorkflow,
				data.executionData,
				undefined,
				data.executionMode,
			);
		} catch (error) {
			// Masking may have failed partway through, so the trigger-item
			// stack can still contain raw header data. Drop it before
			// activeExecutions.add() persists the execution row.
			data.executionData.executionData.nodeExecutionStack = [];
			establishContextError = error as ExecutionError & { node?: INode };
		}
	}

	// Register a new execution
	const executionId = await this.activeExecutions.add(data, restartExecutionId);

	if (establishContextError) {
		await this.failExecution(data, executionId, establishContextError, responsePromise);
		return executionId;
	}

	const { id: workflowId, nodes } = data.workflowData;
	try {
		await this.credentialsPermissionChecker.check(workflowId, nodes);
	} catch (error) {
		await this.failExecution(data, executionId, error, responsePromise);
		return executionId;
	}

	if (responsePromise) {
		this.activeExecutions.attachResponsePromise(executionId, responsePromise);
	}

	// Set up streaming heartbeat on the main process that holds the HTTP response.
	// This must happen BEFORE the queue/local decision because in queue mode the
	// execution runs on a worker process that has no access to the HTTP response.
	let heartbeatInterval: NodeJS.Timeout | undefined;
	if (data.streamingEnabled === true && data.httpResponse) {
		const res = data.httpResponse;
		heartbeatInterval = setInterval(() => {
			if (!res.writableEnded) {
				res.write(STREAMING_KEEPALIVE_CHUNK);
				flushResponse(res);
			}
		}, STREAMING_HEARTBEAT_INTERVAL_MS);
	}

	try {
		// @TODO: Reduce to true branch once feature is stable
		const shouldEnqueue =
			process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
				? this.executionsConfig.mode === 'queue'
				: this.executionsConfig.mode === 'queue' && data.executionMode !== 'manual';

		if (shouldEnqueue) {
			await this.enqueueExecution(
				executionId,
				workflowId,
				data,
				loadStaticData,
				realtime,
				restartExecutionId,
			);
		} else {
			await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId);
		}

		// only run these when not in queue mode or when the execution is manual,
		// since these calls are now done by the worker directly
		if (
			this.executionsConfig.mode !== 'queue' ||
			this.instanceSettings.instanceType === 'worker' ||
			data.executionMode === 'manual' ||
			data.executionMode === 'chat'
		) {
			const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
			postExecutePromise.catch((error) => {
				if (error instanceof ExecutionCancelledError) return;
				this.errorReporter.error(error, {
					extra: { executionId, workflowId },
				});
				this.logger.error('There was an error in the post-execution promise', {
					error,
					executionId,
					workflowId,
					workflowName: data.workflowData.name,
					...(data.projectId && { projectId: data.projectId }),
					...(data.projectName && { projectName: data.projectName }),
				});
			});
		}

		// Clean up the streaming heartbeat when the execution finishes
		if (heartbeatInterval) {
			const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
			postExecutePromise
				.finally(() => {
					clearInterval(heartbeatInterval);
				})
				.catch(() => {
					// `finally()` returns a new promise that rejects with the same
					// reason as the original. This chain exists only for cleanup,
					// so its rejection is swallowed here to avoid an unhandled
					// rejection; it does not affect handlers attached to the
					// original promise.
				});
		}
	} catch (error) {
		// Starting the execution failed, so the cleanup above was never
		// registered. Stop the heartbeat here, otherwise the timer would
		// keep firing for the lifetime of the process.
		clearInterval(heartbeatInterval);
		throw error;
	}

	return executionId;
}
/**
 * Run the workflow in the current process.
 *
 * Builds the `Workflow` and its additional data, marks the execution as
 * running, starts it (with existing execution data, or manually through
 * `ManualExecutionService`), attaches it to the active executions and arms
 * the soft timeout. Completion and failure of the started execution are
 * handled asynchronously; this method resolves once the execution has been
 * started.
 *
 * @param executionId ID of the already-registered execution.
 * @param data Workflow and execution data of the run.
 * @param loadStaticData When `true`, static data is loaded by workflow ID before running.
 * @param restartExecutionId Passed on to the additional data of the run.
 * @throws Re-throws any error raised while starting the execution, after
 *   handing it to `processError()`.
 */
private async runMainProcess(
	executionId: string,
	data: IWorkflowExecutionDataProcess,
	loadStaticData?: boolean,
	restartExecutionId?: string,
): Promise<void> {
	const workflowId = data.workflowData.id;
	if (loadStaticData === true && workflowId) {
		data.workflowData.staticData =
			await this.workflowStaticDataService.getStaticDataById(workflowId);
	}

	// Soft timeout to stop workflow execution after current running node
	// Changes were made by adding the `workflowTimeout` to the `additionalData`
	// So that the timeout will also work for executions with nested workflows.
	// Stays undefined when no timeout applies or the execution is stopped immediately.
	let executionTimeout: NodeJS.Timeout | undefined;
	const workflowSettings = data.workflowData.settings ?? {};
	let workflowTimeout = workflowSettings.executionTimeout ?? this.executionsConfig.timeout; // initialize with default
	if (workflowTimeout > 0) {
		workflowTimeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout);
	}

	// Pinned data is only honoured for manual and evaluation runs.
	let pinData: IPinData | undefined;
	if (['manual', 'evaluation'].includes(data.executionMode)) {
		pinData = data.pinData ?? data.workflowData.pinData;
	}

	const workflow = new Workflow({
		id: workflowId,
		name: data.workflowData.name,
		nodes: data.workflowData.nodes,
		connections: data.workflowData.connections,
		active: data.workflowData.activeVersionId !== null,
		nodeTypes: this.nodeTypes,
		staticData: data.workflowData.staticData,
		settings: workflowSettings,
		pinData,
	});

	const additionalData = await WorkflowExecuteAdditionalData.getBase({
		userId: data.userId,
		workflowId: workflow.id,
		executionTimeoutTimestamp:
			workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
		workflowSettings,
	});
	additionalData.restartExecutionId = restartExecutionId;
	additionalData.streamingEnabled = data.streamingEnabled;
	additionalData.executionId = executionId;

	this.logger.debug(
		`Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`,
		{ executionId },
	);

	let workflowExecution: PCancelable<IRun>;
	await this.executionRepository.setRunning(executionId); // write

	try {
		const lifecycleHooks = getLifecycleHooksForRegularMain(data, executionId);
		additionalData.hooks = lifecycleHooks;

		lifecycleHooks.addHandler('sendResponse', (response) => {
			this.activeExecutions.resolveResponsePromise(executionId, response);
		});

		if (data.streamingEnabled) {
			// Each chunk goes out as one newline-terminated JSON line.
			lifecycleHooks.addHandler('sendChunk', (chunk) => {
				data.httpResponse?.write(JSON.stringify(chunk) + '\n');
				if (data.httpResponse) flushResponse(data.httpResponse);
			});
		}

		additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
			executionId,
		});
		additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({
			pushRef: data.pushRef,
		});

		if (data.executionData !== undefined) {
			this.logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {
				executionId,
			});
			const workflowExecute = new WorkflowExecute(
				additionalData,
				data.executionMode,
				data.executionData,
			);
			workflowExecution = workflowExecute.processRunExecutionData(workflow);
		} else {
			workflowExecution = this.manualExecutionService.runManually(
				data,
				workflow,
				additionalData,
				executionId,
				pinData,
			);
		}

		this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

		if (workflowTimeout > 0) {
			let timeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout) * 1000; // as milliseconds
			if (data.startedAt && data.startedAt instanceof Date) {
				// If startedAt is set, we calculate the timeout based on the startedAt time
				// This is useful for executions that were waiting in a waiting state
				// and we want to ensure the timeout is relative to when the execution started.
				const now = Date.now();
				timeout = Math.max(timeout - (now - data.startedAt.getTime()), 0);
			}
			if (timeout === 0) {
				// The time budget is already used up: stop right away.
				this.activeExecutions.stopExecution(
					executionId,
					new TimeoutExecutionCancelledError(executionId),
				);
			} else {
				executionTimeout = setTimeout(() => {
					void this.activeExecutions.stopExecution(
						executionId,
						new TimeoutExecutionCancelledError(executionId),
					);
				}, timeout);
			}
		}

		workflowExecution
			.then((fullRunData) => {
				clearTimeout(executionTimeout);
				if (workflowExecution.isCanceled) {
					fullRunData.finished = false;
				}
				this.activeExecutions.resolveExecutionResponsePromise(executionId);
				this.activeExecutions.finalizeExecution(executionId, fullRunData);
			})
			.catch(async (error) => {
				// A failed run no longer needs its soft-timeout timer; leaving it
				// armed would keep it pending until the timeout elapses.
				clearTimeout(executionTimeout);
				await this.processError(
					error,
					new Date(),
					data.executionMode,
					executionId,
					additionalData.hooks,
				);
			});
	} catch (error) {
		// Starting the execution failed: make sure no timer outlives it.
		clearTimeout(executionTimeout);
		await this.processError(
			error,
			new Date(),
			data.executionMode,
			executionId,
			additionalData.hooks,
		);
		throw error;
	}
}
/**
 * Hand the execution over to the job queue and track its outcome from here.
 *
 * Adds a job for the execution through the scaling service, runs the
 * `workflowExecuteBefore` hook, and wraps waiting for the job in a
 * cancelable promise that is attached to the active executions. When the job
 * finishes, the run data is built either from the stored execution or from
 * the job result kept by the scaling service, the execution is finalized and
 * the `workflowExecuteAfter` hook is run.
 *
 * @param executionId ID of the already-registered execution.
 * @param workflowId ID of the workflow, stored on the job.
 * @param data Workflow and execution data of the run.
 * @param loadStaticData Stored on the job (coerced to a boolean).
 * @param realtime When truthy the job is added with priority 50 instead of 100.
 * @param restartExecutionId Stored on the job.
 * @throws Re-throws errors raised while adding the job or running the
 *   `workflowExecuteBefore` hook, after handing them to `processError()`.
 */
private async enqueueExecution(
executionId: string,
workflowId: string,
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
restartExecutionId?: string,
): Promise<void> {
const jobData: JobData = {
workflowId,
executionId,
loadStaticData: !!loadStaticData,
pushRef: data.pushRef,
streamingEnabled: data.streamingEnabled,
restartExecutionId,
projectId: data.projectId,
projectName: data.projectName,
// MCP-specific fields for queue mode support
isMcpExecution: data.isMcpExecution,
mcpType: data.mcpType,
mcpSessionId: data.mcpSessionId,
mcpMessageId: data.mcpMessageId,
mcpToolCall: data.mcpToolCall,
};
// Resolve the scaling service on first use and set up its queue once.
if (!this.scalingService) {
const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService);
await this.scalingService.setupQueue();
}
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
// Check if they get retried by default and how often.
let job: Job;
let lifecycleHooks: ExecutionLifecycleHooks;
try {
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId);
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]);
} catch (error) {
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks);
throw error;
}
// NOTE(review): the executor is `async`. An error thrown after `job.finished()`
// resolves (e.g. from the repository lookup or the `workflowExecuteAfter` hook)
// does not appear to reach `reject`, which would leave this promise unsettled —
// confirm against p-cancelable's handling of the executor's return value.
const workflowExecution: PCancelable<IRun> = new PCancelable(
async (resolve, reject, onCancel) => {
// Cancellation does not reject automatically; the cancel handler below
// rejects with its own error after stopping the job.
onCancel.shouldReject = false;
onCancel(async () => {
await this.scalingService.stopJob(job);
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
const error = new ManualExecutionCancelledError(executionId);
await this.processError(
error,
new Date(),
data.executionMode,
executionId,
lifecycleHooks,
);
reject(error);
});
try {
await job.finished();
} catch (error) {
// A job that stalled too often is wrapped in a dedicated error type
// and announced through a 'job-stalled' event.
if (
error instanceof Error &&
typeof error.message === 'string' &&
error.message.includes('job stalled more than maxStalledCount')
) {
error = new MaxStalledCountError(error);
this.eventService.emit('job-stalled', {
executionId: job.data.executionId,
workflowId: job.data.workflowId,
hostId: this.instanceSettings.hostId,
jobId: job.id.toString(),
});
}
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
// "workflowExecuteAfter" which we require.
const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
await this.processError(
error,
new Date(),
data.executionMode,
executionId,
lifecycleHooks,
);
// The job result is popped here as well; its value is not used on this failure path.
this.scalingService.popJobResult(executionId);
return reject(error);
}
const jobResult = this.scalingService.popJobResult(executionId);
let runData: IRun;
// Read the stored execution when there is no job result, or when the
// full execution data is required; otherwise build the run data from
// the job result alone.
if (
!jobResult ||
this.needsFullExecutionData(data.executionMode, executionId, data.forceFullExecutionData)
) {
const fullExecutionData = await this.executionRepository.findSingleExecution(
executionId,
{
includeData: true,
unflattenData: true,
},
);
if (!fullExecutionData) {
return reject(new Error(`Could not find execution with id "${executionId}"`));
}
runData = {
finished: fullExecutionData.finished,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt,
status: fullExecutionData.status,
waitTill: fullExecutionData.waitTill,
data: fullExecutionData.data,
jobId: job.id.toString(),
storedAt: fullExecutionData.storedAt,
};
} else {
// Run data built from the job result carries an empty `runData` map.
runData = {
finished: jobResult.success,
mode: data.executionMode,
startedAt: jobResult.startedAt,
stoppedAt: jobResult.stoppedAt,
status: jobResult.status,
waitTill: jobResult.waitTill,
data: createRunExecutionData({
resultData: {
runData: {},
lastNodeExecuted: jobResult.lastNodeExecuted,
error: jobResult.error,
metadata: jobResult.metadata,
},
}),
jobId: job.id.toString(),
storedAt: this.storageConfig.modeTag,
};
}
this.activeExecutions.finalizeExecution(executionId, runData);
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.
await lifecycleHooks.runHook('workflowExecuteAfter', [runData]);
resolve(runData);
},
);
workflowExecution.catch(() => {
// We `reject` this promise if the execution fails
// but the error is handled already by processError
// So we're just preventing crashes here.
});
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
}
/**
 * Decide whether main has to load the full execution data from the DB once
 * a job has completed, instead of relying on the lightweight result summary
 * sent by the worker via the job progress message.
 *
 * The full data is fetched when any of the following holds:
 * - the caller forces it through `forceFullExecutionData`
 * - `N8N_MINIMIZE_EXECUTION_DATA_FETCHING` is not set in the environment
 * - `integrated` mode: parent workflow needs child execution output data
 * - `lastNode` response mode: webhook response is built from the last node's output
 * - a `workflow.postExecute` external hook exists: it receives full execution data
 *
 * @param executionMode Mode the execution runs in.
 * @param executionId ID used to look up the execution's response mode.
 * @param forceFullExecutionData When truthy, always answer `true`.
 * @returns `true` when the full execution data must be fetched.
 */
private needsFullExecutionData(
	executionMode: WorkflowExecuteMode,
	executionId: string,
	forceFullExecutionData?: boolean,
): boolean {
	// Without the opt-in flag (or when forced) the full fetch always happens.
	if (forceFullExecutionData || !process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING) {
		return true;
	}

	if (executionMode === 'integrated') return true;
	if (this.activeExecutions.getResponseMode(executionId) === 'lastNode') return true;

	return this.externalHooks.hasHook('workflow.postExecute');
}
}