diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts index 4081c6ba1e4..3d40f3d2e36 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts @@ -630,4 +630,143 @@ describe('processRunExecutionData', () => { expect(result.data.resultData.lastNodeExecuted).toBeUndefined(); }); }); + + describe('pairedItem sourceOverwrite handling', () => { + test('preserves sourceOverwrite from existing pairedItem object', async () => { + // ARRANGE + const node = createNodeData({ name: 'testNode', type: types.passThrough }); + const workflow = new DirectedGraph() + .addNodes(node) + .toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } }); + + // Create execution data with items that have pairedItem.sourceOverwrite + const sourceOverwriteData = { + previousNode: 'CustomPreviousNode', + previousNodeOutput: 2, + previousNodeRun: 1, + }; + + const taskDataConnection = { + main: [ + [ + { + json: { data: 'test1' }, + pairedItem: { + item: 0, + input: 0, + sourceOverwrite: sourceOverwriteData, + }, + }, + { + json: { data: 'test2' }, + pairedItem: { + item: 1, + input: 0, + // No sourceOverwrite - should be undefined + }, + }, + ], + ], + }; + + const executionData: IRunExecutionData = { + startData: { startNodes: [{ name: node.name, sourceData: null }] }, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack: [{ data: taskDataConnection, node, source: null }], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData); + + // ACT + const result = await workflowExecute.processRunExecutionData(workflow); + + // ASSERT + const runData = result.data.resultData.runData; + expect(runData[node.name]).toHaveLength(1); + + const nodeExecutionData = runData[node.name][0].data?.main?.[0]; + expect(nodeExecutionData).toHaveLength(2); + + // First item should preserve sourceOverwrite + expect(nodeExecutionData?.[0].pairedItem).toEqual({ + item: 0, + input: undefined, // input index 0 becomes undefined + sourceOverwrite: sourceOverwriteData, + }); + + // Second item should have undefined sourceOverwrite + expect(nodeExecutionData?.[1].pairedItem).toEqual({ + item: 1, + input: undefined, + sourceOverwrite: undefined, + }); + }); + + test('handles non-object pairedItem gracefully', async () => { + // ARRANGE + const node = createNodeData({ name: 'testNode', type: types.passThrough }); + const workflow = new DirectedGraph() + .addNodes(node) + .toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } }); + + // Create execution data with items that have non-object pairedItem + const taskDataConnection = { + main: [ + [ + { + json: { data: 'test1' }, + pairedItem: [], // This should result in undefined sourceOverwrite + }, + { + json: { data: 'test2' }, + // No pairedItem at all + }, + ], + ], + }; + + const executionData: IRunExecutionData = { + startData: { startNodes: [{ name: node.name, sourceData: null }] }, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack: [{ data: taskDataConnection, node, source: null }], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData); + + // ACT + const result = await workflowExecute.processRunExecutionData(workflow); + + // ASSERT + const runData = result.data.resultData.runData; + expect(runData[node.name]).toHaveLength(1); + + const nodeExecutionData = runData[node.name][0].data?.main?.[0]; + expect(nodeExecutionData).toHaveLength(2); + + // Both items should have undefined sourceOverwrite since pairedItem wasn't an object + expect(nodeExecutionData?.[0].pairedItem).toEqual({ + item: 0, + input: undefined, + sourceOverwrite: undefined, + }); + + expect(nodeExecutionData?.[1].pairedItem).toEqual({ + item: 1, + input: undefined, + sourceOverwrite: undefined, + }); + }); + }); }); diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 5c714693291..31addf68d16 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1562,6 +1562,23 @@ export class WorkflowExecute { } return input.map((item, itemIndex) => { + // Preserve any existing sourceOverwrite from the pairedItem. + // This allows nodes like SplitInBatches to override the + // source information that tracks where an item originated + // from, which is critical for maintaining correct data + // lineage when nodes need to manipulate the item tracking + // chain. + if (typeof item.pairedItem === 'object' && 'sourceOverwrite' in item.pairedItem) { + return { + ...item, + pairedItem: { + item: itemIndex, + input: inputIndex || undefined, + sourceOverwrite: item.pairedItem.sourceOverwrite, + }, + }; + } + return { ...item, pairedItem: {