From 6f368c326d219f23cd508c2cf295a804988d15ec Mon Sep 17 00:00:00 2001 From: Benjamin Schroth <68321970+schrothbn@users.noreply.github.com> Date: Fri, 10 Oct 2025 15:15:51 +0200 Subject: [PATCH] fix(core): Retain source overwrite in paired items in tool executions (#20629) Co-authored-by: Danny Martini --- .../__tests__/mock-node-types.ts | 2 +- ...process-process-run-execution-data.test.ts | 281 +++++++++++++++++- .../src/execution-engine/requests-response.ts | 17 +- .../src/execution-engine/workflow-execute.ts | 23 ++ packages/workflow/src/interfaces.ts | 1 + 5 files changed, 319 insertions(+), 5 deletions(-) diff --git a/packages/core/src/execution-engine/__tests__/mock-node-types.ts b/packages/core/src/execution-engine/__tests__/mock-node-types.ts index bd9647c862e..f340ec4ddae 100644 --- a/packages/core/src/execution-engine/__tests__/mock-node-types.ts +++ b/packages/core/src/execution-engine/__tests__/mock-node-types.ts @@ -140,7 +140,7 @@ export function modifyNode(originalNode: INodeType): NodeModifier { // Handle function responses (for Response parameter injection) if (typeof predefinedResponse === 'function') { - return predefinedResponse(response); + return predefinedResponse.call(this, response); } return predefinedResponse; diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts index 4081c6ba1e4..bfd48bc7130 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts @@ -5,6 +5,9 @@ import type { IWorkflowExecuteAdditionalData, EngineResponse, WorkflowExecuteMode, + IExecuteFunctions, + IPairedItemData, + INodeExecutionData, } from 'n8n-workflow'; import { ApplicationError } from 'n8n-workflow'; @@ -352,13 +355,44 @@ describe('processRunExecutionData', () => { // Tool nodes should have been added to runData with inputOverride expect(runData[tool1Node.name]).toHaveLength(1); + expect(runData[tool1Node.name][0].inputOverride).toEqual({ - ai_tool: [[{ json: { query: 'test input', toolCallId: 'action_1' } }]], + ai_tool: [ + [ + { + json: { query: 'test input', toolCallId: 'action_1' }, + pairedItem: { + input: 0, + item: 0, + sourceOverwrite: { + previousNode: 'Start', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + }, + }, + ], + ], }); expect(runData[tool2Node.name]).toHaveLength(1); expect(runData[tool2Node.name][0].inputOverride).toEqual({ - ai_tool: [[{ json: { data: 'another input', toolCallId: 'action_2' } }]], + ai_tool: [ + [ + { + json: { data: 'another input', toolCallId: 'action_2' }, + pairedItem: { + input: 0, + item: 0, + sourceOverwrite: { + previousNode: 'Start', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + }, + }, + ], + ], }); // Tools should have executed successfully @@ -455,7 +489,22 @@ describe('processRunExecutionData', () => { // 2. Tool nodes get added to runData with inputOverride but are never actually executed expect(runData[tool1Node.name]).toHaveLength(1); expect(runData[tool1Node.name][0].inputOverride).toEqual({ - ai_tool: [[{ json: { query: 'test input', toolCallId: 'action_1' } }]], + ai_tool: [ + [ + { + json: { query: 'test input', toolCallId: 'action_1' }, + pairedItem: { + input: 0, + item: 0, + sourceOverwrite: { + previousNode: 'nodeWithRequests', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + }, + }, + ], + ], }); // The tool node should not have execution data since it was never run expect(runData[tool1Node.name][0].data).toBeUndefined(); @@ -630,4 +679,230 @@ describe('processRunExecutionData', () => { expect(result.data.resultData.lastNodeExecuted).toBeUndefined(); }); }); + + describe('pairedItem sourceOverwrite handling', () => { + test('preserves sourceOverwrite for tools to enable expression resolution', async () => { + // Test: DataNode → AgentNode → ToolNode where ToolNode accesses DataNode via expressions + const dataNodeOutput = { field: 'testValue', nested: { value: 42 } }; + const dataNode = createNodeData({ name: 'DataNode', type: types.passThrough }); + + const toolNodeType = modifyNode(passThroughNode) + .return(function (this: IExecuteFunctions, response?: EngineResponse) { + try { + const proxy = this.getWorkflowDataProxy(0); + const connectionInputData = + (this as IExecuteFunctions & { connectionInputData: INodeExecutionData[] }) + .connectionInputData || []; + const firstItem = connectionInputData[0]; + const pairedItem = (firstItem?.pairedItem as IPairedItemData) ?? { item: 0 }; + const sourceData = this.getExecuteData().source?.main?.[0] ?? null; + + const dataNodeItem = proxy.$getPairedItem('DataNode', sourceData, pairedItem); + const fieldValue = dataNodeItem?.json?.field; + const nestedValue = (dataNodeItem?.json?.nested as IDataObject)?.value; + + return [ + [ + { + json: { + toolResult: 'Tool executed successfully', + dataNodeField: fieldValue, + dataNodeNested: nestedValue, + response, + }, + }, + ], + ]; + } catch (error) { + return [ + [ + { + json: { + toolResult: 'Failed to access DataNode', + error: (error as Error).message, + response, + }, + }, + ], + ]; + } + }) + .done(); + const toolNode = createNodeData({ name: 'ToolNode', type: 'toolNodeType' }); + + const agentNodeType = modifyNode(passThroughNode) + .return({ + actions: [ + { + actionType: 'ExecutionNodeAction', + nodeName: toolNode.name, + input: { query: 'test query' }, + type: 'ai_tool', + id: 'tool_action_1', + metadata: {}, + }, + ], + metadata: { requestId: 'test_agent_request' }, + }) + .return((response?: EngineResponse) => { + return [[{ json: { agentResult: 'Agent completed', response } }]]; + }) + .done(); + const agentNode = createNodeData({ name: 'AgentNode', type: 'agentNodeType' }); + + const customNodeTypes = NodeTypes({ + ...nodeTypeArguments, + agentNodeType: { type: agentNodeType, sourcePath: '' }, + toolNodeType: { type: toolNodeType, sourcePath: '' }, + }); + + const workflow = new DirectedGraph() + .addNodes(dataNode, agentNode, toolNode) + .addConnections({ from: dataNode, to: agentNode }) + .addConnections({ from: toolNode, to: agentNode, type: 'ai_tool' }) + .toWorkflow({ + name: '', + active: false, + nodeTypes: customNodeTypes, + settings: { executionOrder: 'v1' }, + }); + + const taskDataConnection = { main: [[{ json: dataNodeOutput }]] }; + const executionData: IRunExecutionData = { + startData: { startNodes: [{ name: dataNode.name, sourceData: null }] }, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack: [{ data: taskDataConnection, node: dataNode, source: null }], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData); + + const result = await workflowExecute.processRunExecutionData(workflow); + const runData = result.data.resultData.runData; + + // Verify preserveSourceOverwrite metadata is set + expect(runData[toolNode.name][0].metadata?.preserveSourceOverwrite).toBeDefined(); + + // Verify sourceOverwrite points to DataNode + const toolInput = runData[toolNode.name][0].inputOverride?.ai_tool?.[0]?.[0]; + expect(toolInput?.pairedItem).toBeDefined(); + if (typeof toolInput?.pairedItem === 'object' && !Array.isArray(toolInput.pairedItem)) { + expect(toolInput.pairedItem.sourceOverwrite?.previousNode).toBe(dataNode.name); + } + + // Verify tool successfully accessed DataNode data via sourceOverwrite + const toolOutput = runData[toolNode.name][0].data?.ai_tool?.[0]?.[0]?.json; + expect(toolOutput?.toolResult).toBe('Tool executed successfully'); + expect(toolOutput?.dataNodeField).toBe('testValue'); + expect(toolOutput?.dataNodeNested).toBe(42); + expect(toolOutput).not.toHaveProperty('error'); + }); + + test('sourceOverwrite works correctly in loop scenarios', async () => { + // Test: TriggerNode → LoopNode → DataNode → IFNode + // IFNode evaluates $('DataNode').item.json.email + const triggerData = { email: 'test@example.com', name: 'Test User' }; + const triggerNode = createNodeData({ name: 'TriggerNode', type: types.passThrough }); + + let loopIteration = 0; + const loopNodeType = modifyNode(passThroughNode) + .return(function (this: IExecuteFunctions) { + const items = this.getInputData(); + loopIteration++; + + return [ + items.map((item, index) => ({ + json: item.json, + pairedItem: { + item: index, + input: 0, + sourceOverwrite: { + previousNode: triggerNode.name, + previousNodeOutput: 0, + previousNodeRun: 0, + }, + }, + })), + ]; + }) + .done(); + const loopNode = createNodeData({ name: 'LoopNode', type: 'loopNodeType' }); + const dataNode = createNodeData({ name: 'DataNode', type: types.passThrough }); + + let expressionError: Error | undefined; + const ifNodeType = modifyNode(passThroughNode) + .return(function (this: IExecuteFunctions) { + try { + const proxy = this.getWorkflowDataProxy(0); + const connectionInputData = + (this as IExecuteFunctions & { connectionInputData: INodeExecutionData[] }) + .connectionInputData ?? []; + const firstItem = connectionInputData[0]; + const pairedItem = (firstItem?.pairedItem as IPairedItemData) ?? { item: 0 }; + const sourceData = this.getExecuteData().source?.main?.[0] ?? null; + + const dataNodeItem = proxy.$getPairedItem('DataNode', sourceData, pairedItem); + const email = dataNodeItem?.json?.email; + + return [ + [ + { + json: { + result: 'Expression resolved', + email, + iteration: loopIteration, + }, + }, + ], + ]; + } catch (error) { + expressionError = error; + throw error; + } + }) + .done(); + const ifNode = createNodeData({ name: 'IFNode', type: 'ifNodeType' }); + + const customNodeTypes = NodeTypes({ + ...nodeTypeArguments, + loopNodeType: { type: loopNodeType, sourcePath: '' }, + ifNodeType: { type: ifNodeType, sourcePath: '' }, + }); + + const workflow = new DirectedGraph() + .addNodes(triggerNode, loopNode, dataNode, ifNode) + .addConnections({ from: triggerNode, to: loopNode }) + .addConnections({ from: loopNode, to: dataNode }) + .addConnections({ from: dataNode, to: ifNode }) + .toWorkflow({ + name: '', + active: false, + nodeTypes: customNodeTypes, + settings: { executionOrder: 'v1' }, + }); + + const taskDataConnection = { main: [[{ json: triggerData }]] }; + const executionData: IRunExecutionData = { + startData: { startNodes: [{ name: triggerNode.name, sourceData: null }] }, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack: [{ data: taskDataConnection, node: triggerNode, source: null }], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData); + + await expect(workflowExecute.processRunExecutionData(workflow)).resolves.toBeTruthy(); + expect(expressionError).toBeUndefined(); + }); + }); }); diff --git a/packages/core/src/execution-engine/requests-response.ts b/packages/core/src/execution-engine/requests-response.ts index b32bf41765f..d93effe2ea4 100644 --- a/packages/core/src/execution-engine/requests-response.ts +++ b/packages/core/src/execution-engine/requests-response.ts @@ -31,6 +31,7 @@ function prepareRequestedNodesForExecution( request: EngineRequest, runIndex: number, runData: IRunData, + executionData: IExecuteData, ) { // 1. collect nodes to be put on the stack const nodesToBeExecuted: NodeToBeExecuted[] = []; @@ -56,6 +57,10 @@ function prepareRequestedNodesForExecution( index: 0, }; const parentNode = currentNode.name; + const parentSourceData = executionData.source?.main?.[runIndex]; + const parentOutputIndex = parentSourceData?.previousNodeOutput ?? 0; + const parentRunIndex = parentSourceData?.previousNodeRun ?? 0; + const parentSourceNode = parentSourceData?.previousNode ?? currentNode.name; const parentOutputData: INodeExecutionData[][] = [ [ { @@ -63,10 +68,18 @@ function prepareRequestedNodesForExecution( ...action.input, toolCallId: action.id, }, + pairedItem: { + item: parentRunIndex, + input: parentOutputIndex, + sourceOverwrite: { + previousNode: parentSourceNode, + previousNodeOutput: parentOutputIndex, + previousNodeRun: parentRunIndex, + }, + }, }, ], ]; - const parentOutputIndex = 0; runData[node.name] ||= []; const nodeRunData = runData[node.name]; @@ -88,6 +101,7 @@ function prepareRequestedNodesForExecution( parentOutputData, runIndex, nodeRunIndex, + metadata: { preserveSourceOverwrite: true }, }); subNodeExecutionData.actions.push({ action, @@ -166,6 +180,7 @@ export function handleRequest({ request, runIndex, runData, + executionData, ); // 2. create metadata for current node diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index b2597ccf900..9529785c98e 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1565,6 +1565,29 @@ export class WorkflowExecute { } return input.map((item, itemIndex) => { + // Preserve any existing sourceOverwrite from the pairedItem + // for tool executions. Tool calls don't have a main + // connection to the agent's input, so the data proxy needs + // the sourceOverwrite information to know where to look up + // paired items. This is necessary because the workflow data + // proxy works on input data which normally scrubs paired + // item information before executing the node. + const isToolExecution = !!executionData.metadata?.preserveSourceOverwrite; + if ( + isToolExecution && + typeof item.pairedItem === 'object' && + 'sourceOverwrite' in item.pairedItem + ) { + return { + ...item, + pairedItem: { + item: itemIndex, + input: inputIndex || undefined, + sourceOverwrite: item.pairedItem.sourceOverwrite, + }, + }; + } + return { ...item, pairedItem: { diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 6861cc5c2a2..d7432c36ec3 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -2472,6 +2472,7 @@ export interface ITaskMetadata { actions: SubNodeExecutionDataAction[]; metadata: object; }; + preserveSourceOverwrite?: boolean; } /** The data that gets returned when a node execution starts */