mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-31 00:37:10 +02:00
fix(core): Retain source overwrite in paired items in tool executions (#20629)
Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
parent
fb94b779c8
commit
6f368c326d
|
|
@ -140,7 +140,7 @@ export function modifyNode(originalNode: INodeType): NodeModifier {
|
|||
|
||||
// Handle function responses (for Response parameter injection)
|
||||
if (typeof predefinedResponse === 'function') {
|
||||
return predefinedResponse(response);
|
||||
return predefinedResponse.call(this, response);
|
||||
}
|
||||
|
||||
return predefinedResponse;
|
||||
|
|
|
|||
|
|
@ -5,6 +5,9 @@ import type {
|
|||
IWorkflowExecuteAdditionalData,
|
||||
EngineResponse,
|
||||
WorkflowExecuteMode,
|
||||
IExecuteFunctions,
|
||||
IPairedItemData,
|
||||
INodeExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
|
|
@ -352,13 +355,44 @@ describe('processRunExecutionData', () => {
|
|||
|
||||
// Tool nodes should have been added to runData with inputOverride
|
||||
expect(runData[tool1Node.name]).toHaveLength(1);
|
||||
|
||||
expect(runData[tool1Node.name][0].inputOverride).toEqual({
|
||||
ai_tool: [[{ json: { query: 'test input', toolCallId: 'action_1' } }]],
|
||||
ai_tool: [
|
||||
[
|
||||
{
|
||||
json: { query: 'test input', toolCallId: 'action_1' },
|
||||
pairedItem: {
|
||||
input: 0,
|
||||
item: 0,
|
||||
sourceOverwrite: {
|
||||
previousNode: 'Start',
|
||||
previousNodeOutput: 0,
|
||||
previousNodeRun: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
});
|
||||
|
||||
expect(runData[tool2Node.name]).toHaveLength(1);
|
||||
expect(runData[tool2Node.name][0].inputOverride).toEqual({
|
||||
ai_tool: [[{ json: { data: 'another input', toolCallId: 'action_2' } }]],
|
||||
ai_tool: [
|
||||
[
|
||||
{
|
||||
json: { data: 'another input', toolCallId: 'action_2' },
|
||||
pairedItem: {
|
||||
input: 0,
|
||||
item: 0,
|
||||
sourceOverwrite: {
|
||||
previousNode: 'Start',
|
||||
previousNodeOutput: 0,
|
||||
previousNodeRun: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
});
|
||||
|
||||
// Tools should have executed successfully
|
||||
|
|
@ -455,7 +489,22 @@ describe('processRunExecutionData', () => {
|
|||
// 2. Tool nodes get added to runData with inputOverride but are never actually executed
|
||||
expect(runData[tool1Node.name]).toHaveLength(1);
|
||||
expect(runData[tool1Node.name][0].inputOverride).toEqual({
|
||||
ai_tool: [[{ json: { query: 'test input', toolCallId: 'action_1' } }]],
|
||||
ai_tool: [
|
||||
[
|
||||
{
|
||||
json: { query: 'test input', toolCallId: 'action_1' },
|
||||
pairedItem: {
|
||||
input: 0,
|
||||
item: 0,
|
||||
sourceOverwrite: {
|
||||
previousNode: 'nodeWithRequests',
|
||||
previousNodeOutput: 0,
|
||||
previousNodeRun: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
});
|
||||
// The tool node should not have execution data since it was never run
|
||||
expect(runData[tool1Node.name][0].data).toBeUndefined();
|
||||
|
|
@ -630,4 +679,230 @@ describe('processRunExecutionData', () => {
|
|||
expect(result.data.resultData.lastNodeExecuted).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('pairedItem sourceOverwrite handling', () => {
|
||||
test('preserves sourceOverwrite for tools to enable expression resolution', async () => {
|
||||
// Test: DataNode → AgentNode → ToolNode where ToolNode accesses DataNode via expressions
|
||||
const dataNodeOutput = { field: 'testValue', nested: { value: 42 } };
|
||||
const dataNode = createNodeData({ name: 'DataNode', type: types.passThrough });
|
||||
|
||||
const toolNodeType = modifyNode(passThroughNode)
|
||||
.return(function (this: IExecuteFunctions, response?: EngineResponse) {
|
||||
try {
|
||||
const proxy = this.getWorkflowDataProxy(0);
|
||||
const connectionInputData =
|
||||
(this as IExecuteFunctions & { connectionInputData: INodeExecutionData[] })
|
||||
.connectionInputData || [];
|
||||
const firstItem = connectionInputData[0];
|
||||
const pairedItem = (firstItem?.pairedItem as IPairedItemData) ?? { item: 0 };
|
||||
const sourceData = this.getExecuteData().source?.main?.[0] ?? null;
|
||||
|
||||
const dataNodeItem = proxy.$getPairedItem('DataNode', sourceData, pairedItem);
|
||||
const fieldValue = dataNodeItem?.json?.field;
|
||||
const nestedValue = (dataNodeItem?.json?.nested as IDataObject)?.value;
|
||||
|
||||
return [
|
||||
[
|
||||
{
|
||||
json: {
|
||||
toolResult: 'Tool executed successfully',
|
||||
dataNodeField: fieldValue,
|
||||
dataNodeNested: nestedValue,
|
||||
response,
|
||||
},
|
||||
},
|
||||
],
|
||||
];
|
||||
} catch (error) {
|
||||
return [
|
||||
[
|
||||
{
|
||||
json: {
|
||||
toolResult: 'Failed to access DataNode',
|
||||
error: (error as Error).message,
|
||||
response,
|
||||
},
|
||||
},
|
||||
],
|
||||
];
|
||||
}
|
||||
})
|
||||
.done();
|
||||
const toolNode = createNodeData({ name: 'ToolNode', type: 'toolNodeType' });
|
||||
|
||||
const agentNodeType = modifyNode(passThroughNode)
|
||||
.return({
|
||||
actions: [
|
||||
{
|
||||
actionType: 'ExecutionNodeAction',
|
||||
nodeName: toolNode.name,
|
||||
input: { query: 'test query' },
|
||||
type: 'ai_tool',
|
||||
id: 'tool_action_1',
|
||||
metadata: {},
|
||||
},
|
||||
],
|
||||
metadata: { requestId: 'test_agent_request' },
|
||||
})
|
||||
.return((response?: EngineResponse) => {
|
||||
return [[{ json: { agentResult: 'Agent completed', response } }]];
|
||||
})
|
||||
.done();
|
||||
const agentNode = createNodeData({ name: 'AgentNode', type: 'agentNodeType' });
|
||||
|
||||
const customNodeTypes = NodeTypes({
|
||||
...nodeTypeArguments,
|
||||
agentNodeType: { type: agentNodeType, sourcePath: '' },
|
||||
toolNodeType: { type: toolNodeType, sourcePath: '' },
|
||||
});
|
||||
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(dataNode, agentNode, toolNode)
|
||||
.addConnections({ from: dataNode, to: agentNode })
|
||||
.addConnections({ from: toolNode, to: agentNode, type: 'ai_tool' })
|
||||
.toWorkflow({
|
||||
name: '',
|
||||
active: false,
|
||||
nodeTypes: customNodeTypes,
|
||||
settings: { executionOrder: 'v1' },
|
||||
});
|
||||
|
||||
const taskDataConnection = { main: [[{ json: dataNodeOutput }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: { startNodes: [{ name: dataNode.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: dataNode, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
const result = await workflowExecute.processRunExecutionData(workflow);
|
||||
const runData = result.data.resultData.runData;
|
||||
|
||||
// Verify preserveSourceOverwrite metadata is set
|
||||
expect(runData[toolNode.name][0].metadata?.preserveSourceOverwrite).toBeDefined();
|
||||
|
||||
// Verify sourceOverwrite points to DataNode
|
||||
const toolInput = runData[toolNode.name][0].inputOverride?.ai_tool?.[0]?.[0];
|
||||
expect(toolInput?.pairedItem).toBeDefined();
|
||||
if (typeof toolInput?.pairedItem === 'object' && !Array.isArray(toolInput.pairedItem)) {
|
||||
expect(toolInput.pairedItem.sourceOverwrite?.previousNode).toBe(dataNode.name);
|
||||
}
|
||||
|
||||
// Verify tool successfully accessed DataNode data via sourceOverwrite
|
||||
const toolOutput = runData[toolNode.name][0].data?.ai_tool?.[0]?.[0]?.json;
|
||||
expect(toolOutput?.toolResult).toBe('Tool executed successfully');
|
||||
expect(toolOutput?.dataNodeField).toBe('testValue');
|
||||
expect(toolOutput?.dataNodeNested).toBe(42);
|
||||
expect(toolOutput).not.toHaveProperty('error');
|
||||
});
|
||||
|
||||
test('sourceOverwrite works correctly in loop scenarios', async () => {
|
||||
// Test: TriggerNode → LoopNode → DataNode → IFNode
|
||||
// IFNode evaluates $('DataNode').item.json.email
|
||||
const triggerData = { email: 'test@example.com', name: 'Test User' };
|
||||
const triggerNode = createNodeData({ name: 'TriggerNode', type: types.passThrough });
|
||||
|
||||
let loopIteration = 0;
|
||||
const loopNodeType = modifyNode(passThroughNode)
|
||||
.return(function (this: IExecuteFunctions) {
|
||||
const items = this.getInputData();
|
||||
loopIteration++;
|
||||
|
||||
return [
|
||||
items.map((item, index) => ({
|
||||
json: item.json,
|
||||
pairedItem: {
|
||||
item: index,
|
||||
input: 0,
|
||||
sourceOverwrite: {
|
||||
previousNode: triggerNode.name,
|
||||
previousNodeOutput: 0,
|
||||
previousNodeRun: 0,
|
||||
},
|
||||
},
|
||||
})),
|
||||
];
|
||||
})
|
||||
.done();
|
||||
const loopNode = createNodeData({ name: 'LoopNode', type: 'loopNodeType' });
|
||||
const dataNode = createNodeData({ name: 'DataNode', type: types.passThrough });
|
||||
|
||||
let expressionError: Error | undefined;
|
||||
const ifNodeType = modifyNode(passThroughNode)
|
||||
.return(function (this: IExecuteFunctions) {
|
||||
try {
|
||||
const proxy = this.getWorkflowDataProxy(0);
|
||||
const connectionInputData =
|
||||
(this as IExecuteFunctions & { connectionInputData: INodeExecutionData[] })
|
||||
.connectionInputData ?? [];
|
||||
const firstItem = connectionInputData[0];
|
||||
const pairedItem = (firstItem?.pairedItem as IPairedItemData) ?? { item: 0 };
|
||||
const sourceData = this.getExecuteData().source?.main?.[0] ?? null;
|
||||
|
||||
const dataNodeItem = proxy.$getPairedItem('DataNode', sourceData, pairedItem);
|
||||
const email = dataNodeItem?.json?.email;
|
||||
|
||||
return [
|
||||
[
|
||||
{
|
||||
json: {
|
||||
result: 'Expression resolved',
|
||||
email,
|
||||
iteration: loopIteration,
|
||||
},
|
||||
},
|
||||
],
|
||||
];
|
||||
} catch (error) {
|
||||
expressionError = error;
|
||||
throw error;
|
||||
}
|
||||
})
|
||||
.done();
|
||||
const ifNode = createNodeData({ name: 'IFNode', type: 'ifNodeType' });
|
||||
|
||||
const customNodeTypes = NodeTypes({
|
||||
...nodeTypeArguments,
|
||||
loopNodeType: { type: loopNodeType, sourcePath: '' },
|
||||
ifNodeType: { type: ifNodeType, sourcePath: '' },
|
||||
});
|
||||
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(triggerNode, loopNode, dataNode, ifNode)
|
||||
.addConnections({ from: triggerNode, to: loopNode })
|
||||
.addConnections({ from: loopNode, to: dataNode })
|
||||
.addConnections({ from: dataNode, to: ifNode })
|
||||
.toWorkflow({
|
||||
name: '',
|
||||
active: false,
|
||||
nodeTypes: customNodeTypes,
|
||||
settings: { executionOrder: 'v1' },
|
||||
});
|
||||
|
||||
const taskDataConnection = { main: [[{ json: triggerData }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: { startNodes: [{ name: triggerNode.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: triggerNode, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
await expect(workflowExecute.processRunExecutionData(workflow)).resolves.toBeTruthy();
|
||||
expect(expressionError).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ function prepareRequestedNodesForExecution(
|
|||
request: EngineRequest,
|
||||
runIndex: number,
|
||||
runData: IRunData,
|
||||
executionData: IExecuteData,
|
||||
) {
|
||||
// 1. collect nodes to be put on the stack
|
||||
const nodesToBeExecuted: NodeToBeExecuted[] = [];
|
||||
|
|
@ -56,6 +57,10 @@ function prepareRequestedNodesForExecution(
|
|||
index: 0,
|
||||
};
|
||||
const parentNode = currentNode.name;
|
||||
const parentSourceData = executionData.source?.main?.[runIndex];
|
||||
const parentOutputIndex = parentSourceData?.previousNodeOutput ?? 0;
|
||||
const parentRunIndex = parentSourceData?.previousNodeRun ?? 0;
|
||||
const parentSourceNode = parentSourceData?.previousNode ?? currentNode.name;
|
||||
const parentOutputData: INodeExecutionData[][] = [
|
||||
[
|
||||
{
|
||||
|
|
@ -63,10 +68,18 @@ function prepareRequestedNodesForExecution(
|
|||
...action.input,
|
||||
toolCallId: action.id,
|
||||
},
|
||||
pairedItem: {
|
||||
item: parentRunIndex,
|
||||
input: parentOutputIndex,
|
||||
sourceOverwrite: {
|
||||
previousNode: parentSourceNode,
|
||||
previousNodeOutput: parentOutputIndex,
|
||||
previousNodeRun: parentRunIndex,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
];
|
||||
const parentOutputIndex = 0;
|
||||
|
||||
runData[node.name] ||= [];
|
||||
const nodeRunData = runData[node.name];
|
||||
|
|
@ -88,6 +101,7 @@ function prepareRequestedNodesForExecution(
|
|||
parentOutputData,
|
||||
runIndex,
|
||||
nodeRunIndex,
|
||||
metadata: { preserveSourceOverwrite: true },
|
||||
});
|
||||
subNodeExecutionData.actions.push({
|
||||
action,
|
||||
|
|
@ -166,6 +180,7 @@ export function handleRequest({
|
|||
request,
|
||||
runIndex,
|
||||
runData,
|
||||
executionData,
|
||||
);
|
||||
|
||||
// 2. create metadata for current node
|
||||
|
|
|
|||
|
|
@ -1565,6 +1565,29 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
return input.map((item, itemIndex) => {
|
||||
// Preserve any existing sourceOverwrite from the pairedItem
|
||||
// for tool executions. Tool calls don't have a main
|
||||
// connection to the agent's input, so the data proxy needs
|
||||
// the sourceOverwrite information to know where to look up
|
||||
// paired items. This is necessary because the workflow data
|
||||
// proxy works on input data which normally scrubs paired
|
||||
// item information before executing the node.
|
||||
const isToolExecution = !!executionData.metadata?.preserveSourceOverwrite;
|
||||
if (
|
||||
isToolExecution &&
|
||||
typeof item.pairedItem === 'object' &&
|
||||
'sourceOverwrite' in item.pairedItem
|
||||
) {
|
||||
return {
|
||||
...item,
|
||||
pairedItem: {
|
||||
item: itemIndex,
|
||||
input: inputIndex || undefined,
|
||||
sourceOverwrite: item.pairedItem.sourceOverwrite,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
...item,
|
||||
pairedItem: {
|
||||
|
|
|
|||
|
|
@ -2472,6 +2472,7 @@ export interface ITaskMetadata {
|
|||
actions: SubNodeExecutionDataAction[];
|
||||
metadata: object;
|
||||
};
|
||||
preserveSourceOverwrite?: boolean;
|
||||
}
|
||||
|
||||
/** The data that gets returned when a node execution starts */
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user