/* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { Logger } from '@n8n/backend-common'; import { ExecutionsConfig } from '@n8n/config'; import { ExecutionRepository } from '@n8n/db'; import { Container, Service } from '@n8n/di'; import type { ExecutionLifecycleHooks } from 'n8n-core'; import { ErrorReporter, establishExecutionContext, InstanceSettings, StorageConfig, WorkflowExecute, } from 'n8n-core'; import type { ExecutionError, IDeferredPromise, IExecuteResponsePromiseData, INode, IPinData, IRun, WorkflowExecuteMode, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; import { createRunExecutionData, ExecutionCancelledError, ManualExecutionCancelledError, TimeoutExecutionCancelledError, Workflow, } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import { ActiveExecutions } from '@/active-executions'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; // eslint-disable-next-line import-x/no-cycle import { getLifecycleHooksForRegularMain, getLifecycleHooksForScalingWorker, getLifecycleHooksForScalingMain, } from '@/execution-lifecycle/execution-lifecycle-hooks'; import { FailedRunFactory } from '@/executions/failed-run-factory'; import { CredentialsPermissionChecker } from '@/executions/pre-execution-checks'; import { ExternalHooks } from '@/external-hooks'; import { ManualExecutionService } from '@/manual-execution.service'; import { NodeTypes } from '@/node-types'; import type { ScalingService } from '@/scaling/scaling.service'; import type { Job, JobData } from '@/scaling/scaling.types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { EventService } from './events/event.service'; 
/** Interval between keepalive writes on streaming responses to prevent proxy timeouts */
const STREAMING_HEARTBEAT_INTERVAL_MS = 30_000;

/** JSON chunk written periodically to keep the streaming connection alive through reverse proxies */
const STREAMING_KEEPALIVE_CHUNK = '{"type":"keepalive"}\n';

/**
 * Flush the response through the compression middleware.
 * The `flush` method is added at runtime by the Express `compression` middleware
 * and is not part of the standard Response type.
 *
 * @param res Response-like object; nothing happens when it has no callable `flush`.
 */
function flushResponse(res: { flush?: () => void }) {
	if (typeof res.flush === 'function') {
		res.flush();
	}
}

/**
 * Starts workflow executions, either in the current process
 * (`runMainProcess`) or by adding a job through the scaling service
 * (`enqueueExecution`), and turns failures into failed-run records
 * (`processError`, `failExecution`).
 */
@Service()
export class WorkflowRunner {
	/** Resolved lazily on the first `enqueueExecution()` call. */
	private scalingService: ScalingService;

	constructor(
		private readonly logger: Logger,
		private readonly errorReporter: ErrorReporter,
		private readonly activeExecutions: ActiveExecutions,
		private readonly executionRepository: ExecutionRepository,
		private readonly workflowStaticDataService: WorkflowStaticDataService,
		private readonly nodeTypes: NodeTypes,
		private readonly credentialsPermissionChecker: CredentialsPermissionChecker,
		private readonly instanceSettings: InstanceSettings,
		private readonly manualExecutionService: ManualExecutionService,
		private readonly failedRunFactory: FailedRunFactory,
		private readonly eventService: EventService,
		private readonly executionsConfig: ExecutionsConfig,
		private readonly storageConfig: StorageConfig,
		private readonly externalHooks: ExternalHooks,
	) {}

	/**
	 * Handle an error raised while running or enqueuing an execution.
	 *
	 * Cancellation-like errors are ignored. Any other error is logged and
	 * reported, after which an `error`-status run is built from it, the
	 * execution is finalized with that run and the `workflowExecuteAfter`
	 * hook (if hooks were supplied) is invoked.
	 *
	 * @param error The error that interrupted the execution.
	 * @param startedAt Start time recorded on the generated failed run.
	 * @param executionMode Mode recorded on the generated failed run.
	 * @param executionId ID of the affected execution.
	 * @param hooks Lifecycle hooks whose `workflowExecuteAfter` is run with the failed run.
	 */
	async processError(
		error: ExecutionError | ExecutionNotFoundError,
		startedAt: Date,
		executionMode: WorkflowExecuteMode,
		executionId: string,
		hooks?: ExecutionLifecycleHooks,
	) {
		// This means the execution was probably cancelled and has already
		// been cleaned up.
		//
		// FIXME: This is a quick fix. The proper fix would be to not remove
		// the execution from the active executions while it's still running.
		if (
			error instanceof ExecutionNotFoundError ||
			error instanceof ExecutionCancelledError ||
			(typeof error.message === 'string' && error.message.includes('cancelled'))
		) {
			return;
		}

		this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
		this.errorReporter.error(error, { executionId });

		const isQueueMode = this.executionsConfig.mode === 'queue';

		// in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled
		// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415
		if (isQueueMode) {
			const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, {
				includeData: false,
			});
			if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') {
				// false positive, execution was successful
				return;
			}
		}

		const fullRunData: IRun = {
			data: createRunExecutionData({
				resultData: {
					error: {
						...error,
						// `message` and `stack` are copied explicitly in addition to the spread
						message: error.message,
						stack: error.stack,
					},
					runData: {},
				},
			}),
			finished: false,
			mode: executionMode,
			startedAt,
			stoppedAt: new Date(),
			status: 'error',
			storedAt: this.storageConfig.modeTag,
		};

		// Remove from active execution with empty data. That will
		// set the execution to failed.
		this.activeExecutions.finalizeExecution(executionId, fullRunData);

		await hooks?.runHook('workflowExecuteAfter', [fullRunData]);
	}

	/**
	 * Persist a failed execution record for an already-registered execution and
	 * finalize it, so a pre-flight failure surfaces as a normal failed run.
	 *
	 * Runs the regular-main `workflowExecuteBefore` and `workflowExecuteAfter`
	 * hooks with a run generated from `error`, rejects `responsePromise` (if
	 * any) with the same error and finalizes the execution.
	 *
	 * @param data Execution request the failure belongs to.
	 * @param executionId ID under which the execution was registered.
	 * @param error Failure to record; `error.node` is passed to the failed-run factory.
	 * @param responsePromise Deferred response of the caller, rejected with `error`.
	 */
	private async failExecution(
		data: IWorkflowExecutionDataProcess,
		executionId: string,
		error: ExecutionError & { node?: INode },
		responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
	): Promise<void> {
		const runData = this.failedRunFactory.generateFailedExecutionFromError(
			data.executionMode,
			error,
			error.node,
		);

		const lifecycleHooks = getLifecycleHooksForRegularMain(data, executionId);
		await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]);
		await lifecycleHooks.runHook('workflowExecuteAfter', [runData]);

		responsePromise?.reject(error);
		this.activeExecutions.finalizeExecution(executionId);
	}

	/**
	 * Run the workflow
	 *
	 * Registers the execution, performs the pre-flight steps (execution
	 * context, credential permission check) and then either enqueues the
	 * execution or runs it in the current process.
	 *
	 * @param data Execution request.
	 * @param loadStaticData Forwarded to the job in queue mode; in the main
	 *   process `true` makes the static data get loaded by workflow ID.
	 * @param realtime This is used in queue mode to change the priority of an execution, making sure they are picked up quicker.
	 * @param restartExecutionId Passed on when registering and starting the execution.
	 * @param responsePromise Deferred response; attached to the active
	 *   execution, or rejected when a pre-flight step fails.
	 * @returns The ID of the registered execution.
	 */
	async run(
		data: IWorkflowExecutionDataProcess,
		loadStaticData?: boolean,
		realtime?: boolean,
		restartExecutionId?: string,
		responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
	): Promise<string> {
		// Establish the execution context before persisting to the DB.
		// activeExecutions.add() -> executionPersistence.create() writes
		// data.executionData to the DB; any header masking or runtimeData
		// population must happen before that write so the persisted record
		// does not contain raw trigger-item data (e.g. Authorization headers).
		// The runtimeData early-exit guard in establishExecutionContext keeps
		// the subsequent worker-side call at workflow-execute.ts idempotent.
		// Guard on the inner executionData: in queue mode with manual offload
		// the outer IRunExecutionData is created with `executionData: null`
		// so the trigger-item stack is undefined here; nothing to mask yet,
		// the worker will establish context once it populates the stack.
		let establishContextError: (ExecutionError & { node?: INode }) | undefined;
		if (data.executionData?.executionData) {
			// Deliberately lightweight: no pinData, no staticData loading,
			// no additionalData. establishExecutionContext only needs the
			// workflow's settings (for redactionPolicy) and node lookups.
			// runMainProcess() builds its own fully-configured Workflow for
			// actual execution.
			const contextWorkflow = new Workflow({
				id: data.workflowData.id,
				name: data.workflowData.name,
				nodes: data.workflowData.nodes,
				connections: data.workflowData.connections,
				active: data.workflowData.activeVersionId !== null,
				nodeTypes: this.nodeTypes,
				staticData: data.workflowData.staticData,
				settings: data.workflowData.settings ?? {},
			});

			try {
				await establishExecutionContext(
					contextWorkflow,
					data.executionData,
					undefined,
					data.executionMode,
				);
			} catch (error) {
				// Masking may have failed partway through, so the trigger-item
				// stack can still contain raw header data. Drop it before
				// activeExecutions.add() persists the execution row.
				data.executionData.executionData.nodeExecutionStack = [];
				establishContextError = error as ExecutionError & { node?: INode };
			}
		}

		// Register a new execution
		const executionId = await this.activeExecutions.add(data, restartExecutionId);

		if (establishContextError) {
			await this.failExecution(data, executionId, establishContextError, responsePromise);
			return executionId;
		}

		const { id: workflowId, nodes } = data.workflowData;
		try {
			await this.credentialsPermissionChecker.check(workflowId, nodes);
		} catch (error) {
			await this.failExecution(data, executionId, error, responsePromise);
			return executionId;
		}

		if (responsePromise) {
			this.activeExecutions.attachResponsePromise(executionId, responsePromise);
		}

		// Set up streaming heartbeat on the main process that holds the HTTP response.
		// This must happen BEFORE the queue/local decision because in queue mode the
		// execution runs on a worker process that has no access to the HTTP response.
		let heartbeatInterval: NodeJS.Timeout | undefined;
		if (data.streamingEnabled === true && data.httpResponse) {
			const res = data.httpResponse;
			heartbeatInterval = setInterval(() => {
				if (!res.writableEnded) {
					res.write(STREAMING_KEEPALIVE_CHUNK);
					flushResponse(res);
				}
			}, STREAMING_HEARTBEAT_INTERVAL_MS);
		}

		try {
			// @TODO: Reduce to true branch once feature is stable
			const shouldEnqueue =
				process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
					? this.executionsConfig.mode === 'queue'
					: this.executionsConfig.mode === 'queue' && data.executionMode !== 'manual';

			if (shouldEnqueue) {
				await this.enqueueExecution(
					executionId,
					workflowId,
					data,
					loadStaticData,
					realtime,
					restartExecutionId,
				);
			} else {
				await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId);
			}

			// only run these when not in queue mode or when the execution is manual,
			// since these calls are now done by the worker directly
			if (
				this.executionsConfig.mode !== 'queue' ||
				this.instanceSettings.instanceType === 'worker' ||
				data.executionMode === 'manual' ||
				data.executionMode === 'chat'
			) {
				const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
				postExecutePromise.catch((error) => {
					if (error instanceof ExecutionCancelledError) return;
					this.errorReporter.error(error, {
						extra: { executionId, workflowId },
					});
					this.logger.error('There was an error in the post-execution promise', {
						error,
						executionId,
						workflowId,
						workflowName: data.workflowData.name,
						...(data.projectId && { projectId: data.projectId }),
						...(data.projectName && { projectName: data.projectName }),
					});
				});
			}

			// Clean up the streaming heartbeat when the execution finishes
			if (heartbeatInterval) {
				const stopHeartbeat = () => clearInterval(heartbeatInterval);
				// `.then(onFulfilled, onRejected)` rather than `.finally()`: the promise
				// returned by `.finally()` rejects again with the original reason, which
				// would surface as an unhandled rejection whenever the post-execute
				// promise rejects (e.g. on cancellation).
				void this.activeExecutions
					.getPostExecutePromise(executionId)
					.then(stopHeartbeat, stopHeartbeat);
			}
		} catch (error) {
			// Starting the execution failed and the error is re-thrown to the caller,
			// so the cleanup above was never registered: stop the heartbeat here
			// instead of leaving the interval running.
			clearInterval(heartbeatInterval);
			throw error;
		}

		return executionId;
	}

	/**
	 * Run the workflow in current process
	 *
	 * Resolves once the execution has been started and attached to the active
	 * executions; completion and failure are handled by the handlers attached
	 * to the workflow execution promise.
	 *
	 * @param executionId ID of the registered execution.
	 * @param data Execution request.
	 * @param loadStaticData When `true` (and the workflow has an ID) static data is loaded by workflow ID first.
	 * @param restartExecutionId Stored on the additional data of the execution.
	 * @throws Re-throws any error raised while starting the execution, after passing it to `processError()`.
	 */
	private async runMainProcess(
		executionId: string,
		data: IWorkflowExecutionDataProcess,
		loadStaticData?: boolean,
		restartExecutionId?: string,
	): Promise<void> {
		const workflowId = data.workflowData.id;
		if (loadStaticData === true && workflowId) {
			data.workflowData.staticData =
				await this.workflowStaticDataService.getStaticDataById(workflowId);
		}

		// Soft timeout to stop workflow execution after current running node
		// Changes were made by adding the `workflowTimeout` to the `additionalData`
		// So that the timeout will also work for executions with nested workflows.
		// Stays `undefined` when no timer gets scheduled; `clearTimeout(undefined)` is a no-op.
		let executionTimeout: NodeJS.Timeout | undefined;

		const workflowSettings = data.workflowData.settings ?? {};
		let workflowTimeout = workflowSettings.executionTimeout ?? this.executionsConfig.timeout; // initialize with default
		if (workflowTimeout > 0) {
			workflowTimeout = Math.min(workflowTimeout, this.executionsConfig.maxTimeout);
		}

		let pinData: IPinData | undefined;
		if (['manual', 'evaluation'].includes(data.executionMode)) {
			pinData = data.pinData ?? data.workflowData.pinData;
		}

		const workflow = new Workflow({
			id: workflowId,
			name: data.workflowData.name,
			nodes: data.workflowData.nodes,
			connections: data.workflowData.connections,
			active: data.workflowData.activeVersionId !== null,
			nodeTypes: this.nodeTypes,
			staticData: data.workflowData.staticData,
			settings: workflowSettings,
			pinData,
		});
		const additionalData = await WorkflowExecuteAdditionalData.getBase({
			userId: data.userId,
			workflowId: workflow.id,
			executionTimeoutTimestamp:
				workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
			workflowSettings,
		});
		additionalData.restartExecutionId = restartExecutionId;
		additionalData.streamingEnabled = data.streamingEnabled;

		additionalData.executionId = executionId;

		this.logger.debug(
			`Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`,
			{ executionId },
		);
		let workflowExecution: PCancelable<IRun>;
		await this.executionRepository.setRunning(executionId); // write

		try {
			const lifecycleHooks = getLifecycleHooksForRegularMain(data, executionId);
			additionalData.hooks = lifecycleHooks;

			lifecycleHooks.addHandler('sendResponse', (response) => {
				this.activeExecutions.resolveResponsePromise(executionId, response);
			});

			if (data.streamingEnabled) {
				// Each chunk is written as one JSON document per line and flushed immediately
				lifecycleHooks.addHandler('sendChunk', (chunk) => {
					data.httpResponse?.write(JSON.stringify(chunk) + '\n');
					if (data.httpResponse) flushResponse(data.httpResponse);
				});
			}

			additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
				executionId,
			});

			additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({
				pushRef: data.pushRef,
			});

			if (data.executionData !== undefined) {
				this.logger.debug(
					`Execution ID ${executionId} had Execution data. Running with payload.`,
					{
						executionId,
					},
				);
				const workflowExecute = new WorkflowExecute(
					additionalData,
					data.executionMode,
					data.executionData,
				);
				workflowExecution = workflowExecute.processRunExecutionData(workflow);
			} else {
				workflowExecution = this.manualExecutionService.runManually(
					data,
					workflow,
					additionalData,
					executionId,
					pinData,
				);
			}

			this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

			if (workflowTimeout > 0) {
				// `workflowTimeout` was already clamped to `maxTimeout` above
				let timeout = workflowTimeout * 1000; // as milliseconds
				if (data.startedAt && data.startedAt instanceof Date) {
					// If startedAt is set, we calculate the timeout based on the startedAt time
					// This is useful for executions that were waiting in a waiting state
					// and we want to ensure the timeout is relative to when the execution started.
					const now = Date.now();
					timeout = Math.max(timeout - (now - data.startedAt.getTime()), 0);
				}
				if (timeout === 0) {
					this.activeExecutions.stopExecution(
						executionId,
						new TimeoutExecutionCancelledError(executionId),
					);
				} else {
					executionTimeout = setTimeout(() => {
						void this.activeExecutions.stopExecution(
							executionId,
							new TimeoutExecutionCancelledError(executionId),
						);
					}, timeout);
				}
			}

			workflowExecution
				.then((fullRunData) => {
					clearTimeout(executionTimeout);
					if (workflowExecution.isCanceled) {
						fullRunData.finished = false;
					}

					this.activeExecutions.resolveExecutionResponsePromise(executionId);
					this.activeExecutions.finalizeExecution(executionId, fullRunData);
				})
				.catch(async (error) => {
					// The execution is over on this path as well, so the soft-timeout
					// timer must not be left pending.
					clearTimeout(executionTimeout);
					await this.processError(
						error,
						new Date(),
						data.executionMode,
						executionId,
						additionalData.hooks,
					);
				});
		} catch (error) {
			await this.processError(
				error,
				new Date(),
				data.executionMode,
				executionId,
				additionalData.hooks,
			);

			throw error;
		}
	}

	/**
	 * Add the execution as a job through the scaling service and attach a
	 * cancelable promise to the active execution. That promise waits for the
	 * job to finish, builds the run data (from the DB or from the job result),
	 * finalizes the execution and runs `workflowExecuteAfter`.
	 *
	 * @param executionId ID of the registered execution.
	 * @param workflowId ID of the workflow, stored in the job data.
	 * @param data Execution request.
	 * @param loadStaticData Stored in the job data as a boolean.
	 * @param realtime Selects job priority 50 instead of 100.
	 * @param restartExecutionId Stored in the job data.
	 * @throws Re-throws any error raised while adding the job or running
	 *   `workflowExecuteBefore`, after passing it to `processError()`.
	 */
	private async enqueueExecution(
		executionId: string,
		workflowId: string,
		data: IWorkflowExecutionDataProcess,
		loadStaticData?: boolean,
		realtime?: boolean,
		restartExecutionId?: string,
	): Promise<void> {
		const jobData: JobData = {
			workflowId,
			executionId,
			loadStaticData: !!loadStaticData,
			pushRef: data.pushRef,
			streamingEnabled: data.streamingEnabled,
			restartExecutionId,
			projectId: data.projectId,
			projectName: data.projectName,
			// MCP-specific fields for queue mode support
			isMcpExecution: data.isMcpExecution,
			mcpType: data.mcpType,
			mcpSessionId: data.mcpSessionId,
			mcpMessageId: data.mcpMessageId,
			mcpToolCall: data.mcpToolCall,
		};

		// The scaling service is imported and its queue set up on first use only
		if (!this.scalingService) {
			const { ScalingService } = await import('@/scaling/scaling.service');
			this.scalingService = Container.get(ScalingService);
			await this.scalingService.setupQueue();
		}

		// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
		//       Check if they get retried by default and how often.
		let job: Job;
		let lifecycleHooks: ExecutionLifecycleHooks;
		try {
			job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });

			lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId);

			// Normally also workflow should be supplied here but as it only used for sending
			// data to editor-UI is not needed.
			await lifecycleHooks.runHook('workflowExecuteBefore', [undefined, data.executionData]);
		} catch (error) {
			// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
			// "workflowExecuteAfter" which we require.
			const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
			await this.processError(error, new Date(), data.executionMode, executionId, lifecycleHooks);
			throw error;
		}

		const workflowExecution: PCancelable<IRun> = new PCancelable(
			async (resolve, reject, onCancel) => {
				onCancel.shouldReject = false;
				onCancel(async () => {
					await this.scalingService.stopJob(job);

					// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
					// "workflowExecuteAfter" which we require.
					const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);
					const error = new ManualExecutionCancelledError(executionId);
					await this.processError(
						error,
						new Date(),
						data.executionMode,
						executionId,
						lifecycleHooks,
					);

					reject(error);
				});

				try {
					await job.finished();
				} catch (error) {
					if (
						error instanceof Error &&
						typeof error.message === 'string' &&
						error.message.includes('job stalled more than maxStalledCount')
					) {
						// Wrap the queue's stalled-job failure and emit a dedicated event for it
						error = new MaxStalledCountError(error);
						this.eventService.emit('job-stalled', {
							executionId: job.data.executionId,
							workflowId: job.data.workflowId,
							hostId: this.instanceSettings.hostId,
							jobId: job.id.toString(),
						});
					}

					// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
					// "workflowExecuteAfter" which we require.
					const lifecycleHooks = getLifecycleHooksForScalingWorker(data, executionId);

					await this.processError(
						error,
						new Date(),
						data.executionMode,
						executionId,
						lifecycleHooks,
					);

					// Pop the job result on the failure path too (the returned value is not used here)
					this.scalingService.popJobResult(executionId);

					return reject(error);
				}

				const jobResult = this.scalingService.popJobResult(executionId);

				let runData: IRun;

				if (
					!jobResult ||
					this.needsFullExecutionData(data.executionMode, executionId, data.forceFullExecutionData)
				) {
					// No job result available, or full data is required: read the execution from the DB
					const fullExecutionData = await this.executionRepository.findSingleExecution(
						executionId,
						{
							includeData: true,
							unflattenData: true,
						},
					);
					if (!fullExecutionData) {
						return reject(new Error(`Could not find execution with id "${executionId}"`));
					}

					runData = {
						finished: fullExecutionData.finished,
						mode: fullExecutionData.mode,
						startedAt: fullExecutionData.startedAt,
						stoppedAt: fullExecutionData.stoppedAt,
						status: fullExecutionData.status,
						waitTill: fullExecutionData.waitTill,
						data: fullExecutionData.data,
						jobId: job.id.toString(),
						storedAt: fullExecutionData.storedAt,
					};
				} else {
					// Build run data from the job result alone, with empty per-node `runData`
					runData = {
						finished: jobResult.success,
						mode: data.executionMode,
						startedAt: jobResult.startedAt,
						stoppedAt: jobResult.stoppedAt,
						status: jobResult.status,
						waitTill: jobResult.waitTill,
						data: createRunExecutionData({
							resultData: {
								runData: {},
								lastNodeExecuted: jobResult.lastNodeExecuted,
								error: jobResult.error,
								metadata: jobResult.metadata,
							},
						}),
						jobId: job.id.toString(),
						storedAt: this.storageConfig.modeTag,
					};
				}

				this.activeExecutions.finalizeExecution(executionId, runData);

				// Normally also static data should be supplied here but as it only used for sending
				// data to editor-UI is not needed.
				await lifecycleHooks.runHook('workflowExecuteAfter', [runData]);

				resolve(runData);
			},
		);

		workflowExecution.catch(() => {
			// We `reject` this promise if the execution fails
			// but the error is handled already by processError
			// So we're just preventing crashes here.
		});

		this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
	}

	/**
	 * Whether main must retrieve full execution data from the DB on job completion.
	 *
	 * Full data is needed when:
	 * - `integrated` mode: parent workflow needs child execution output data
	 * - `lastNode` response mode: webhook response is built from the last node's output
	 * - `workflow.postExecute` hook: external hooks receive full execution data
	 *
	 * In all other cases we can skip the DB fetch and use the lightweight
	 * result summary sent by the worker via the job progress message.
	 *
	 * The optimisation is opt-in: unless the `N8N_MINIMIZE_EXECUTION_DATA_FETCHING`
	 * environment variable is set to a non-empty value, full data is always fetched.
	 *
	 * @param executionMode Mode of the finished execution.
	 * @param executionId ID used to look up the response mode of the active execution.
	 * @param forceFullExecutionData When truthy, full data is always fetched.
	 */
	private needsFullExecutionData(
		executionMode: WorkflowExecuteMode,
		executionId: string,
		forceFullExecutionData?: boolean,
	): boolean {
		if (forceFullExecutionData) return true;

		if (!process.env.N8N_MINIMIZE_EXECUTION_DATA_FETCHING) return true;

		return (
			executionMode === 'integrated' ||
			this.activeExecutions.getResponseMode(executionId) === 'lastNode' ||
			this.externalHooks.hasHook('workflow.postExecute')
		);
	}
}