mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-31 16:57:08 +02:00
fix(core): Preserve sourceOverwrite in pairedItem data during workflow execution (#20064)
This commit is contained in:
parent
29aec202df
commit
7de2eddc8a
|
|
@ -630,4 +630,143 @@ describe('processRunExecutionData', () => {
|
|||
expect(result.data.resultData.lastNodeExecuted).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('pairedItem sourceOverwrite handling', () => {
|
||||
test('preserves sourceOverwrite from existing pairedItem object', async () => {
|
||||
// ARRANGE
|
||||
const node = createNodeData({ name: 'testNode', type: types.passThrough });
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(node)
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
// Create execution data with items that have pairedItem.sourceOverwrite
|
||||
const sourceOverwriteData = {
|
||||
previousNode: 'CustomPreviousNode',
|
||||
previousNodeOutput: 2,
|
||||
previousNodeRun: 1,
|
||||
};
|
||||
|
||||
const taskDataConnection = {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: { data: 'test1' },
|
||||
pairedItem: {
|
||||
item: 0,
|
||||
input: 0,
|
||||
sourceOverwrite: sourceOverwriteData,
|
||||
},
|
||||
},
|
||||
{
|
||||
json: { data: 'test2' },
|
||||
pairedItem: {
|
||||
item: 1,
|
||||
input: 0,
|
||||
// No sourceOverwrite - should be undefined
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
};
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
// ACT
|
||||
const result = await workflowExecute.processRunExecutionData(workflow);
|
||||
|
||||
// ASSERT
|
||||
const runData = result.data.resultData.runData;
|
||||
expect(runData[node.name]).toHaveLength(1);
|
||||
|
||||
const nodeExecutionData = runData[node.name][0].data?.main?.[0];
|
||||
expect(nodeExecutionData).toHaveLength(2);
|
||||
|
||||
// First item should preserve sourceOverwrite
|
||||
expect(nodeExecutionData?.[0].pairedItem).toEqual({
|
||||
item: 0,
|
||||
input: undefined, // input index 0 becomes undefined
|
||||
sourceOverwrite: sourceOverwriteData,
|
||||
});
|
||||
|
||||
// Second item should have undefined sourceOverwrite
|
||||
expect(nodeExecutionData?.[1].pairedItem).toEqual({
|
||||
item: 1,
|
||||
input: undefined,
|
||||
sourceOverwrite: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
test('handles non-object pairedItem gracefully', async () => {
|
||||
// ARRANGE
|
||||
const node = createNodeData({ name: 'testNode', type: types.passThrough });
|
||||
const workflow = new DirectedGraph()
|
||||
.addNodes(node)
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
// Create execution data with items that have non-object pairedItem
|
||||
const taskDataConnection = {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: { data: 'test1' },
|
||||
pairedItem: [], // This should result in undefined sourceOverwrite
|
||||
},
|
||||
{
|
||||
json: { data: 'test2' },
|
||||
// No pairedItem at all
|
||||
},
|
||||
],
|
||||
],
|
||||
};
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
// ACT
|
||||
const result = await workflowExecute.processRunExecutionData(workflow);
|
||||
|
||||
// ASSERT
|
||||
const runData = result.data.resultData.runData;
|
||||
expect(runData[node.name]).toHaveLength(1);
|
||||
|
||||
const nodeExecutionData = runData[node.name][0].data?.main?.[0];
|
||||
expect(nodeExecutionData).toHaveLength(2);
|
||||
|
||||
// Both items should have undefined sourceOverwrite since pairedItem wasn't an object
|
||||
expect(nodeExecutionData?.[0].pairedItem).toEqual({
|
||||
item: 0,
|
||||
input: undefined,
|
||||
sourceOverwrite: undefined,
|
||||
});
|
||||
|
||||
expect(nodeExecutionData?.[1].pairedItem).toEqual({
|
||||
item: 1,
|
||||
input: undefined,
|
||||
sourceOverwrite: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1562,6 +1562,23 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
return input.map((item, itemIndex) => {
|
||||
// Preserve any existing sourceOverwrite from the pairedItem.
|
||||
// This allows nodes like SplitInBatches to override the
|
||||
// source information that tracks where an item originated
|
||||
// from, which is critical for maintaining correct data
|
||||
// lineage when nodes need to manipulate the item tracking
|
||||
// chain.
|
||||
if (typeof item.pairedItem === 'object' && 'sourceOverwrite' in item.pairedItem) {
|
||||
return {
|
||||
...item,
|
||||
pairedItem: {
|
||||
item: itemIndex,
|
||||
input: inputIndex || undefined,
|
||||
sourceOverwrite: item.pairedItem.sourceOverwrite,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
...item,
|
||||
pairedItem: {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user