diff --git a/packages/cli/src/webhooks/__tests__/webhook-helpers.test.ts b/packages/cli/src/webhooks/__tests__/webhook-helpers.test.ts index e383f1a7a01..0298a3d4c7b 100644 --- a/packages/cli/src/webhooks/__tests__/webhook-helpers.test.ts +++ b/packages/cli/src/webhooks/__tests__/webhook-helpers.test.ts @@ -1,10 +1,36 @@ +import type express from 'express'; import { mock, type MockProxy } from 'jest-mock-extended'; -import type { Workflow, INode, IDataObject } from 'n8n-workflow'; -import { FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow'; +import { BinaryDataService, ErrorReporter, Logger } from 'n8n-core'; +import type { + Workflow, + INode, + IDataObject, + IWebhookResponseData, + IDeferredPromise, + IN8nHttpFullResponse, + IWorkflowBase, + IRunExecutionData, + IExecuteData, +} from 'n8n-workflow'; +import { createDeferredPromise, FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow'; +import type { Readable } from 'stream'; +import { finished } from 'stream/promises'; -import { autoDetectResponseMode, handleFormRedirectionCase } from '../webhook-helpers'; +import { mockInstance } from '@test/mocking'; + +import { + autoDetectResponseMode, + handleFormRedirectionCase, + getResponseOnReceived, + setupResponseNodePromise, + prepareExecutionData, +} from '../webhook-helpers'; import type { IWebhookResponseCallbackData } from '../webhook.types'; +jest.mock('stream/promises', () => ({ + finished: jest.fn(), +})); + describe('autoDetectResponseMode', () => { let workflow: MockProxy; @@ -95,3 +121,287 @@ describe('handleFormRedirectionCase', () => { expect(result).toEqual(data); }); }); + +describe('getResponseOnReceived', () => { + const responseCode = 200; + const webhookResultData = mock(); + + beforeEach(() => { + jest.resetAllMocks(); + }); + + test('should return response with no data when responseData is "noData"', () => { + const callbackData = getResponseOnReceived('noData', webhookResultData, responseCode); + + expect(callbackData).toEqual({ responseCode }); + }); + + test('should return response with responseData when it is defined', () => { + const responseData = JSON.stringify({ foo: 'bar' }); + + const callbackData = getResponseOnReceived(responseData, webhookResultData, responseCode); + + expect(callbackData).toEqual({ data: responseData, responseCode }); + }); + + test('should return response with webhookResponse when responseData is falsy but webhookResponse exists', () => { + const webhookResponse = { success: true }; + webhookResultData.webhookResponse = webhookResponse; + + const callbackData = getResponseOnReceived(undefined, webhookResultData, responseCode); + + expect(callbackData).toEqual({ data: webhookResponse, responseCode }); + }); + + test('should return default response message when responseData and webhookResponse are falsy', () => { + webhookResultData.webhookResponse = undefined; + + const callbackData = getResponseOnReceived(undefined, webhookResultData, responseCode); + + expect(callbackData).toEqual({ + data: { message: 'Workflow was started' }, + responseCode, + }); + }); +}); + +describe('setupResponseNodePromise', () => { + const workflowId = 'test-workflow-id'; + const executionId = 'test-execution-id'; + const res = mock(); + const responseCallback = jest.fn(); + const workflowStartNode = mock(); + const workflow = mock({ id: workflowId }); + const binaryDataService = mockInstance(BinaryDataService); + const errorReporter = mockInstance(ErrorReporter); + const logger = mockInstance(Logger); + + let responsePromise: IDeferredPromise; + + beforeEach(() => { + jest.resetAllMocks(); + + responsePromise = createDeferredPromise(); + + res.header.mockReturnValue(res); + res.end.mockReturnValue(res); + }); + + test('should handle regular response object', async () => { + setupResponseNodePromise( + responsePromise, + res, + responseCallback, + workflowStartNode, + executionId, + workflow, + ); + + responsePromise.resolve({ + body: { data: 'test data' }, + headers: { 'content-type': 'application/json' }, + statusCode: 200, + }); + await new Promise(process.nextTick); + + expect(responseCallback).toHaveBeenCalledWith(null, { + data: { data: 'test data' }, + headers: { 'content-type': 'application/json' }, + responseCode: 200, + }); + expect(res.end).toHaveBeenCalled(); + }); + + test('should handle binary data with ID', async () => { + const mockStream = mock(); + binaryDataService.getAsStream.mockResolvedValue(mockStream); + + setupResponseNodePromise( + responsePromise, + res, + responseCallback, + workflowStartNode, + executionId, + workflow, + ); + + responsePromise.resolve({ + body: { binaryData: { id: 'binary-123' } }, + headers: { 'content-type': 'image/jpeg' }, + statusCode: 200, + }); + await new Promise(process.nextTick); + + expect(binaryDataService.getAsStream).toHaveBeenCalledWith('binary-123'); + expect(res.header).toHaveBeenCalledWith({ 'content-type': 'image/jpeg' }); + expect(mockStream.pipe).toHaveBeenCalledWith(res, { end: false }); + expect(finished).toHaveBeenCalledWith(mockStream); + expect(responseCallback).toHaveBeenCalledWith(null, { noWebhookResponse: true }); + }); + + test('should handle buffer response', async () => { + setupResponseNodePromise( + responsePromise, + res, + responseCallback, + workflowStartNode, + executionId, + workflow, + ); + + const buffer = Buffer.from('test buffer'); + responsePromise.resolve({ + body: buffer, + headers: { 'content-type': 'text/plain' }, + statusCode: 200, + }); + await new Promise(process.nextTick); + + expect(res.header).toHaveBeenCalledWith({ 'content-type': 'text/plain' }); + expect(res.end).toHaveBeenCalledWith(buffer); + expect(responseCallback).toHaveBeenCalledWith(null, { noWebhookResponse: true }); + }); + + test('should handle errors properly', async () => { + setupResponseNodePromise( + responsePromise, + res, + responseCallback, + workflowStartNode, + executionId, + workflow, + ); + + const error = new Error('Test error'); + responsePromise.reject(error); + await new Promise(process.nextTick); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + expect(logger.error).toHaveBeenCalledWith( + `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, + { executionId, workflowId }, + ); + expect(responseCallback).toHaveBeenCalledWith(error, {}); + }); +}); + +describe('prepareExecutionData', () => { + const workflowStartNode = mock({ name: 'Start' }); + const webhookResultData: IWebhookResponseData = { + workflowData: [[{ json: { data: 'test' } }]], + }; + const workflowData = mock({ + id: 'workflow1', + pinData: { nodeA: [{ json: { pinned: true } }] }, + }); + + test('should create new execution data when not provided', () => { + const { runExecutionData, pinData } = prepareExecutionData( + 'manual', + workflowStartNode, + webhookResultData, + undefined, + ); + + const nodeExecuteData = runExecutionData.executionData?.nodeExecutionStack?.[0]; + expect(nodeExecuteData).toBeDefined(); + expect(nodeExecuteData?.node).toBe(workflowStartNode); + expect(nodeExecuteData?.data.main).toBe(webhookResultData.workflowData); + expect(pinData).toBeUndefined(); + }); + + test('should update existing runExecutionData when executionId is defined', () => { + const executionId = 'test-execution-id'; + const nodeExecutionStack: IExecuteData[] = [ + { + node: workflowStartNode, + data: { main: [[{ json: { oldData: true } }]] }, + source: null, + }, + ]; + const existingRunExecutionData = { + startData: {}, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack, + waitingExecution: {}, + }, + } as IRunExecutionData; + + prepareExecutionData( + 'manual', + workflowStartNode, + webhookResultData, + existingRunExecutionData, + undefined, + undefined, + executionId, + ); + + expect(nodeExecutionStack[0]?.data.main).toBe(webhookResultData.workflowData); + }); + + test('should set destination node when provided', () => { + const { runExecutionData } = prepareExecutionData( + 'manual', + workflowStartNode, + webhookResultData, + undefined, + {}, + 'targetNode', + ); + + expect(runExecutionData.startData?.destinationNode).toBe('targetNode'); + }); + + test('should update execution data with execution data merge', () => { + const runExecutionDataMerge = { + resultData: { + error: { message: 'Test error' }, + }, + }; + + const { runExecutionData } = prepareExecutionData( + 'manual', + workflowStartNode, + webhookResultData, + undefined, + runExecutionDataMerge, + ); + + expect(runExecutionData.resultData.error).toEqual({ message: 'Test error' }); + }); + + test('should set pinData when execution mode is manual', () => { + const { runExecutionData, pinData } = prepareExecutionData( + 'manual', + workflowStartNode, + webhookResultData, + undefined, + {}, + undefined, + undefined, + workflowData, + ); + + expect(pinData).toBe(workflowData.pinData); + expect(runExecutionData.resultData.pinData).toBe(workflowData.pinData); + }); + + test('should not set pinData when execution mode is not manual or evaluation', () => { + const { runExecutionData, pinData } = prepareExecutionData( + 'webhook', + workflowStartNode, + webhookResultData, + undefined, + {}, + undefined, + undefined, + workflowData, + ); + + expect(pinData).toBeUndefined(); + expect(runExecutionData.resultData.pinData).toBeUndefined(); + }); +}); diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index a41a778e46c..4b60d31a76d 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -30,15 +30,17 @@ import type { WorkflowExecuteMode, IWorkflowExecutionDataProcess, IWorkflowBase, + WebhookResponseData, } from 'n8n-workflow'; import { - ApplicationError, BINARY_ENCODING, createDeferredPromise, ExecutionCancelledError, FORM_NODE_TYPE, FORM_TRIGGER_NODE_TYPE, NodeOperationError, + OperationalError, + UnexpectedError, WAIT_NODE_TYPE, } from 'n8n-workflow'; import assert from 'node:assert'; @@ -107,7 +109,7 @@ export function autoDetectResponseMode( workflowStartNode: INode, workflow: Workflow, method: string, -) { +): WebhookResponseMode | undefined { if (workflowStartNode.type === FORM_TRIGGER_NODE_TYPE && method === 'POST') { const connectedNodes = workflow.getChildNodes(workflowStartNode.name); @@ -182,6 +184,135 @@ export const handleFormRedirectionCase = ( const { formDataFileSizeMax } = Container.get(GlobalConfig).endpoints; const parseFormData = createMultiFormDataParser(formDataFileSizeMax); +/** Return webhook response when responseMode is set to "onReceived" */ +export function getResponseOnReceived( + responseData: WebhookResponseData | string | undefined, + webhookResultData: IWebhookResponseData, + responseCode: number, +): IWebhookResponseCallbackData { + const callbackData: IWebhookResponseCallbackData = { responseCode }; + // Return response directly and do not wait for the workflow to finish + if (responseData === 'noData') { + // Return without data + } else if (responseData) { + // Return the data specified in the response data option + callbackData.data = responseData as unknown as IDataObject; + } else if (webhookResultData.webhookResponse !== undefined) { + // Data to respond with is given + callbackData.data = webhookResultData.webhookResponse; + } else { + callbackData.data = { message: 'Workflow was started' }; + } + return callbackData; +} + +export function setupResponseNodePromise( + responsePromise: IDeferredPromise, + res: express.Response, + responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void, + workflowStartNode: INode, + executionId: string | undefined, + workflow: Workflow, +): void { + void responsePromise.promise + .then(async (response: IN8nHttpFullResponse) => { + const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData; + if (binaryData?.id) { + res.header(response.headers); + const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id); + stream.pipe(res, { end: false }); + await finished(stream); + responseCallback(null, { noWebhookResponse: true }); + } else if (Buffer.isBuffer(response.body)) { + res.header(response.headers); + res.end(response.body); + responseCallback(null, { noWebhookResponse: true }); + } else { + // TODO: This probably needs some more changes depending on the options on the + // Webhook Response node + + let data: IWebhookResponseCallbackData = { + data: response.body as IDataObject, + headers: response.headers, + responseCode: response.statusCode, + }; + + data = handleFormRedirectionCase(data, workflowStartNode); + + responseCallback(null, data); + } + + process.nextTick(() => res.end()); + }) + .catch(async (error) => { + Container.get(ErrorReporter).error(error); + Container.get(Logger).error( + `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, + { executionId, workflowId: workflow.id }, + ); + responseCallback(error, {}); + }); +} + +export function prepareExecutionData( + executionMode: WorkflowExecuteMode, + workflowStartNode: INode, + webhookResultData: IWebhookResponseData, + runExecutionData: IRunExecutionData | undefined, + runExecutionDataMerge: object = {}, + destinationNode?: string, + executionId?: string, + workflowData?: IWorkflowBase, +): { runExecutionData: IRunExecutionData; pinData: IPinData | undefined } { + // Initialize the data of the webhook node + const nodeExecutionStack: IExecuteData[] = [ + { + node: workflowStartNode, + data: { + main: webhookResultData.workflowData ?? [], + }, + source: null, + }, + ]; + + runExecutionData ??= { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + waitingExecution: {}, + }, + } as IRunExecutionData; + + if (destinationNode && runExecutionData.startData) { + runExecutionData.startData.destinationNode = destinationNode; + } + + if (executionId !== undefined) { + // Set the data the webhook node did return on the waiting node if executionId + // already exists as it means that we are restarting an existing execution. + runExecutionData.executionData!.nodeExecutionStack[0].data.main = + webhookResultData.workflowData ?? []; + } + + if (Object.keys(runExecutionDataMerge).length !== 0) { + // If data to merge got defined add it to the execution data + Object.assign(runExecutionData, runExecutionDataMerge); + } + + let pinData: IPinData | undefined; + const usePinData = ['manual', 'evaluation'].includes(executionMode); + if (usePinData) { + pinData = workflowData?.pinData; + runExecutionData.resultData.pinData = pinData; + } + + return { runExecutionData, pinData }; +} + /** * Executes a webhook */ @@ -205,11 +336,6 @@ export async function executeWebhook( workflowStartNode.type, workflowStartNode.typeVersion, ); - if (nodeType === undefined) { - const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known`; - responseCallback(new ApplicationError(errorMessage), {}); - throw new InternalServerError(errorMessage); - } const additionalKeys: IWorkflowDataProxyAdditionalKeys = { $executionId: executionId, @@ -229,22 +355,17 @@ export async function executeWebhook( additionalData.executionId = executionId; } - // Get the responseMode - let responseMode; - //check if response mode should be set automatically, e.g. multipage form - responseMode = autoDetectResponseMode(workflowStartNode, workflow, req.method); - - if (!responseMode) { - responseMode = workflow.expression.getSimpleParameterValue( + const responseMode = + autoDetectResponseMode(workflowStartNode, workflow, req.method) ?? + (workflow.expression.getSimpleParameterValue( workflowStartNode, webhookData.webhookDescription.responseMode, executionMode, additionalKeys, undefined, 'onReceived', - ) as WebhookResponseMode; - } + ) as WebhookResponseMode); const responseCode = workflow.expression.getSimpleParameterValue( workflowStartNode, @@ -255,6 +376,9 @@ export async function executeWebhook( 200, ) as number; + // This parameter is used for two different purposes: + // 1. as arbitrary string input defined in the workflow in the "respond immediately" mode, + // 2. as well as WebhookResponseData config in all the other modes const responseData = workflow.expression.getComplexParameterValue( workflowStartNode, webhookData.webhookDescription.responseData, @@ -262,14 +386,14 @@ export async function executeWebhook( additionalKeys, undefined, 'firstEntryJson', - ); + ) as WebhookResponseData | string | undefined; if (!['onReceived', 'lastNode', 'responseNode', 'formPage'].includes(responseMode)) { // If the mode is not known we error. Is probably best like that instead of using // the default that people know as early as possible (probably already testing phase) // that something does not resolve properly. const errorMessage = `The response mode '${responseMode}' is not valid!`; - responseCallback(new ApplicationError(errorMessage), {}); + responseCallback(new UnexpectedError(errorMessage), {}); throw new InternalServerError(errorMessage); } @@ -356,7 +480,7 @@ export async function executeWebhook( }, }); - responseCallback(new ApplicationError(errorMessage), {}); + responseCallback(new UnexpectedError(errorMessage), {}); didSendResponse = true; // Add error to execution data that it can be logged and send to Editor-UI @@ -380,10 +504,6 @@ export async function executeWebhook( }; } - const additionalKeys: IWorkflowDataProxyAdditionalKeys = { - $executionId: executionId, - }; - if (webhookData.webhookDescription.responseHeaders !== undefined) { const responseHeaders = workflow.expression.getComplexParameterValue( workflowStartNode, @@ -446,82 +566,23 @@ export async function executeWebhook( // Now that we know that the workflow should run we can return the default response // directly if responseMode it set to "onReceived" and a response should be sent if (responseMode === 'onReceived' && !didSendResponse) { - // Return response directly and do not wait for the workflow to finish - if (responseData === 'noData') { - // Return without data - responseCallback(null, { - responseCode, - }); - } else if (responseData) { - // Return the data specified in the response data option - responseCallback(null, { - data: responseData as IDataObject, - responseCode, - }); - } else if (webhookResultData.webhookResponse !== undefined) { - // Data to respond with is given - responseCallback(null, { - data: webhookResultData.webhookResponse, - responseCode, - }); - } else { - responseCallback(null, { - data: { - message: 'Workflow was started', - }, - responseCode, - }); - } - + const callbackData = getResponseOnReceived(responseData, webhookResultData, responseCode); + responseCallback(null, callbackData); didSendResponse = true; } - // Initialize the data of the webhook node - const nodeExecutionStack: IExecuteData[] = []; - nodeExecutionStack.push({ - node: workflowStartNode, - data: { - main: webhookResultData.workflowData, - }, - source: null, - }); - - runExecutionData = - runExecutionData || - ({ - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack, - waitingExecution: {}, - }, - } as IRunExecutionData); - - if (destinationNode && runExecutionData.startData) { - runExecutionData.startData.destinationNode = destinationNode; - } - - if (executionId !== undefined) { - // Set the data the webhook node did return on the waiting node if executionId - // already exists as it means that we are restarting an existing execution. - runExecutionData.executionData!.nodeExecutionStack[0].data.main = - webhookResultData.workflowData; - } - - if (Object.keys(runExecutionDataMerge).length !== 0) { - // If data to merge got defined add it to the execution data - Object.assign(runExecutionData, runExecutionDataMerge); - } - - let pinData: IPinData | undefined; - const usePinData = ['manual', 'evaluation'].includes(executionMode); - if (usePinData) { - pinData = workflowData.pinData; - runExecutionData.resultData.pinData = pinData; - } + // Prepare execution data + const { runExecutionData: preparedRunExecutionData, pinData } = prepareExecutionData( + executionMode, + workflowStartNode, + webhookResultData, + runExecutionData, + runExecutionDataMerge, + destinationNode, + executionId, + workflowData, + ); + runExecutionData = preparedRunExecutionData; const runData: IWorkflowExecutionDataProcess = { executionMode, @@ -540,49 +601,14 @@ export async function executeWebhook( let responsePromise: IDeferredPromise | undefined; if (responseMode === 'responseNode') { responsePromise = createDeferredPromise(); - responsePromise.promise - .then(async (response: IN8nHttpFullResponse) => { - if (didSendResponse) { - return; - } - - const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData; - if (binaryData?.id) { - res.header(response.headers); - const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id); - stream.pipe(res, { end: false }); - await finished(stream); - responseCallback(null, { noWebhookResponse: true }); - } else if (Buffer.isBuffer(response.body)) { - res.header(response.headers); - res.end(response.body); - responseCallback(null, { noWebhookResponse: true }); - } else { - // TODO: This probably needs some more changes depending on the options on the - // Webhook Response node - - let data: IWebhookResponseCallbackData = { - data: response.body as IDataObject, - headers: response.headers, - responseCode: response.statusCode, - }; - - data = handleFormRedirectionCase(data, workflowStartNode); - - responseCallback(null, data); - } - - process.nextTick(() => res.end()); - didSendResponse = true; - }) - .catch(async (error) => { - Container.get(ErrorReporter).error(error); - Container.get(Logger).error( - `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, - { executionId, workflowId: workflow.id }, - ); - responseCallback(error, {}); - }); + setupResponseNodePromise( + responsePromise, + res, + responseCallback, + workflowStartNode, + executionId, + workflow, + ); } if ( @@ -645,7 +671,7 @@ export async function executeWebhook( return undefined; } - if (usePinData) { + if (pinData) { data.data.resultData.pinData = pinData; } @@ -683,10 +709,6 @@ export async function executeWebhook( return data; } - const additionalKeys: IWorkflowDataProxyAdditionalKeys = { - $executionId: executionId, - }; - if (!didSendResponse) { let data: IDataObject | IDataObject[] | undefined; @@ -694,7 +716,7 @@ export async function executeWebhook( // Return the JSON data of the first entry if (returnData.data!.main[0]![0] === undefined) { - responseCallback(new ApplicationError('No item to return got found'), {}); + responseCallback(new OperationalError('No item to return got found'), {}); didSendResponse = true; return undefined; } @@ -748,13 +770,13 @@ export async function executeWebhook( data = returnData.data!.main[0]![0]; if (data === undefined) { - responseCallback(new ApplicationError('No item was found to return'), {}); + responseCallback(new OperationalError('No item was found to return'), {}); didSendResponse = true; return undefined; } if (data.binary === undefined) { - responseCallback(new ApplicationError('No binary data was found to return'), {}); + responseCallback(new OperationalError('No binary data was found to return'), {}); didSendResponse = true; return undefined; } @@ -770,7 +792,7 @@ export async function executeWebhook( if (responseBinaryPropertyName === undefined && !didSendResponse) { responseCallback( - new ApplicationError("No 'responseBinaryPropertyName' is set"), + new OperationalError("No 'responseBinaryPropertyName' is set"), {}, ); didSendResponse = true; @@ -781,7 +803,7 @@ export async function executeWebhook( ]; if (binaryData === undefined && !didSendResponse) { responseCallback( - new ApplicationError( + new OperationalError( `The binary property '${responseBinaryPropertyName}' which should be returned does not exist`, ), {}, @@ -830,8 +852,7 @@ export async function executeWebhook( .catch((e) => { if (!didSendResponse) { responseCallback( - new ApplicationError('There was a problem executing the workflow', { - level: 'warning', + new OperationalError('There was a problem executing the workflow', { cause: e, }), {}, @@ -848,8 +869,7 @@ export async function executeWebhook( const error = e instanceof UnprocessableRequestError ? e - : new ApplicationError('There was a problem executing the workflow', { - level: 'warning', + : new OperationalError('There was a problem executing the workflow', { cause: e, }); if (didSendResponse) throw error;