mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-02 01:37:07 +02:00
feat(Execute Workflow Node): Return items from every run of the sub-workflow's last node (#30716)
This commit is contained in:
parent
ca56b6b90a
commit
91f07efd00
|
|
@ -400,6 +400,7 @@ export class RetrieverWorkflow implements INodeType {
|
|||
executionId: workflowProxy.$execution.id,
|
||||
workflowId: workflowProxy.$workflow.id,
|
||||
},
|
||||
returnLastRunOnly: true, // Retrieved documents are the sub-workflow's final-run output, not its intermediate pipeline steps.
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -123,6 +123,7 @@ export class ToolWorkflowV1 implements INodeType {
|
|||
executionId: workflowProxy.$execution.id,
|
||||
workflowId: workflowProxy.$workflow.id,
|
||||
},
|
||||
returnLastRunOnly: true, // The tool's answer is the sub-workflow's final-run output, not its internal multi-run computation.
|
||||
});
|
||||
subExecutionId = receivedData.executionId;
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -259,6 +259,7 @@ export class WorkflowToolService {
|
|||
executionId: workflowProxy.$execution.id,
|
||||
workflowId: workflowProxy.$workflow.id,
|
||||
},
|
||||
returnLastRunOnly: true, // The tool's answer is the sub-workflow's final-run output, not its internal multi-run computation.
|
||||
});
|
||||
// Set sub-workflow execution id so it can be used in other places
|
||||
this.subExecutionId = receivedData.executionId;
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import type {
|
|||
IRun,
|
||||
INodeExecutionData,
|
||||
INode,
|
||||
ITaskData,
|
||||
} from 'n8n-workflow';
|
||||
import { createRunExecutionData } from 'n8n-workflow';
|
||||
import type PCancelable from 'p-cancelable';
|
||||
|
|
@ -40,6 +41,8 @@ import {
|
|||
getRunData,
|
||||
getDraftWorkflowData,
|
||||
getPublishedWorkflowData,
|
||||
buildSubWorkflowOutput,
|
||||
triggerReturnsLastRunOnly,
|
||||
} from '@/workflow-execute-additional-data';
|
||||
import * as WorkflowHelpers from '@/workflow-helpers';
|
||||
|
||||
|
|
@ -859,4 +862,141 @@ describe('WorkflowExecuteAdditionalData', () => {
|
|||
expect(ownershipService.getWorkflowProjectCached).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('buildSubWorkflowOutput', () => {
|
||||
const twoRunsOnTerminalNode: Record<string, ITaskData[]> = {
|
||||
[LAST_NODE_EXECUTED]: [
|
||||
{ data: { main: [[{ json: { itemId: 0 } }]] } },
|
||||
{ data: { main: [[{ json: { itemId: 1 } }, { json: { itemId: 2 } }]] } },
|
||||
] as unknown as ITaskData[],
|
||||
};
|
||||
|
||||
function buildRun(overrides: {
|
||||
mode?: IRun['mode'];
|
||||
runData?: Record<string, ITaskData[]>;
|
||||
pinData?: Record<string, unknown>;
|
||||
lastNodeExecuted?: string;
|
||||
}): IRun {
|
||||
return {
|
||||
mode: overrides.mode ?? 'manual',
|
||||
data: {
|
||||
resultData: {
|
||||
runData: overrides.runData ?? twoRunsOnTerminalNode,
|
||||
pinData: overrides.pinData,
|
||||
lastNodeExecuted:
|
||||
overrides.lastNodeExecuted === undefined
|
||||
? LAST_NODE_EXECUTED
|
||||
: overrides.lastNodeExecuted,
|
||||
},
|
||||
},
|
||||
finished: true,
|
||||
} as unknown as IRun;
|
||||
}
|
||||
|
||||
function trigger(typeVersion: number, returnOutput?: string): INode {
|
||||
return mock<INode>({
|
||||
type: 'n8n-nodes-base.executeWorkflowTrigger',
|
||||
typeVersion,
|
||||
parameters: returnOutput === undefined ? {} : { returnOutput },
|
||||
});
|
||||
}
|
||||
|
||||
const expectedItemsFromBothRunsConcatenated = [
|
||||
[{ json: { itemId: 0 } }, { json: { itemId: 1 } }, { json: { itemId: 2 } }],
|
||||
];
|
||||
const expectedItemsFromTheFinalRunOnly = [[{ json: { itemId: 1 } }, { json: { itemId: 2 } }]];
|
||||
|
||||
it('merges every run when the trigger is v1.2+', () => {
|
||||
const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.2)], false);
|
||||
expect(output).toEqual(expectedItemsFromBothRunsConcatenated);
|
||||
});
|
||||
|
||||
it('falls back to `lastRunOnly` for pre-1.2 triggers by default', () => {
|
||||
const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.1)], false);
|
||||
expect(output).toEqual(expectedItemsFromTheFinalRunOnly);
|
||||
});
|
||||
|
||||
it('honours a pre-1.2 trigger that opted in via `returnOutput`', () => {
|
||||
const output = buildSubWorkflowOutput(
|
||||
buildRun({ mode: 'trigger' }),
|
||||
[trigger(1.1, 'allRuns')],
|
||||
false,
|
||||
);
|
||||
expect(output).toEqual(expectedItemsFromBothRunsConcatenated);
|
||||
});
|
||||
|
||||
it('caller can force `lastRunOnly` even when the trigger declares `allRuns`', () => {
|
||||
const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.2)], true);
|
||||
expect(output).toEqual(expectedItemsFromTheFinalRunOnly);
|
||||
});
|
||||
|
||||
describe('pinData substitution', () => {
|
||||
it('ignores pinData when the sub-workflow is not running in manual mode', () => {
|
||||
const output = buildSubWorkflowOutput(
|
||||
buildRun({
|
||||
mode: 'trigger',
|
||||
pinData: { [LAST_NODE_EXECUTED]: [{ pinned: true }] },
|
||||
}),
|
||||
[trigger(1.2)],
|
||||
false,
|
||||
);
|
||||
expect(output).toEqual(expectedItemsFromBothRunsConcatenated);
|
||||
});
|
||||
|
||||
it('substitutes pinData when manual mode, even on the merged-runs path', () => {
|
||||
const output = buildSubWorkflowOutput(
|
||||
buildRun({
|
||||
mode: 'manual',
|
||||
pinData: { [LAST_NODE_EXECUTED]: [{ pinned: true }] },
|
||||
}),
|
||||
[trigger(1.2)],
|
||||
false,
|
||||
);
|
||||
|
||||
const expectedPinnedItems = [[{ json: { pinned: true }, pairedItem: { item: 0 } }]];
|
||||
expect(output).toEqual(expectedPinnedItems);
|
||||
});
|
||||
});
|
||||
|
||||
it('returns `[null]` when the sub-workflow recorded no run data', () => {
|
||||
expect(
|
||||
buildSubWorkflowOutput(
|
||||
buildRun({ mode: 'trigger', runData: {}, lastNodeExecuted: undefined }),
|
||||
[trigger(1.2)],
|
||||
false,
|
||||
),
|
||||
).toEqual([null]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('triggerReturnsLastRunOnly', () => {
|
||||
function trigger(typeVersion: number, returnOutput?: string): INode {
|
||||
return mock<INode>({
|
||||
type: 'n8n-nodes-base.executeWorkflowTrigger',
|
||||
typeVersion,
|
||||
parameters: returnOutput === undefined ? {} : { returnOutput },
|
||||
});
|
||||
}
|
||||
|
||||
it('returns true when there is no Execute Workflow Trigger', () => {
|
||||
expect(triggerReturnsLastRunOnly([])).toBe(true);
|
||||
expect(triggerReturnsLastRunOnly([mock<INode>({ type: 'n8n-nodes-base.set' })])).toBe(true);
|
||||
});
|
||||
|
||||
it('defaults pre-1.2 triggers to `lastRunOnly` (backward compat)', () => {
|
||||
expect(triggerReturnsLastRunOnly([trigger(1)])).toBe(true);
|
||||
expect(triggerReturnsLastRunOnly([trigger(1.1)])).toBe(true);
|
||||
});
|
||||
|
||||
it('honours a pre-1.2 trigger that opted in via `returnOutput`', () => {
|
||||
expect(triggerReturnsLastRunOnly([trigger(1.1, 'allRuns')])).toBe(false);
|
||||
expect(triggerReturnsLastRunOnly([trigger(1.1, 'lastRunOnly')])).toBe(true);
|
||||
});
|
||||
|
||||
it('returns false on v1.2+ triggers (option deprecated, allRuns is the default)', () => {
|
||||
expect(triggerReturnsLastRunOnly([trigger(1.2)])).toBe(false);
|
||||
// Parameters on v1.2+ are ignored: the merged-runs behavior is the default.
|
||||
expect(triggerReturnsLastRunOnly([trigger(1.2, 'lastRunOnly')])).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,14 +2,13 @@ import { MAX_PINNED_DATA_SIZE, MAX_WORKFLOW_SIZE, MAX_EXPECTED_REQUEST_SIZE } fr
|
|||
import { mockInstance } from '@n8n/backend-test-utils';
|
||||
import type { CredentialsEntity, Project, Variables } from '@n8n/db';
|
||||
import { CredentialsRepository } from '@n8n/db';
|
||||
import type { ITaskData, IWorkflowBase, IWorkflowSettings } from 'n8n-workflow';
|
||||
|
||||
import type { IRun } from 'n8n-workflow';
|
||||
import type { IRun, ITaskData, IWorkflowBase, IWorkflowSettings } from 'n8n-workflow';
|
||||
|
||||
import { VariablesService } from '@/environments.ee/variables/variables.service.ee';
|
||||
import { OwnershipService } from '@/services/ownership.service';
|
||||
import {
|
||||
getDataLastExecutedNodeData,
|
||||
getLastExecutedNodeData,
|
||||
getLastExecutedNodeRuns,
|
||||
getVariables,
|
||||
preserveInputOverride,
|
||||
removeDefaultValues,
|
||||
|
|
@ -18,6 +17,7 @@ import {
|
|||
validatePinDataSize,
|
||||
validateWorkflowNodeGroups,
|
||||
} from '@/workflow-helpers';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
||||
describe('workflow-helpers', () => {
|
||||
beforeAll(() => {
|
||||
|
|
@ -513,7 +513,7 @@ describe('validatePinDataSize', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('getDataLastExecutedNodeData', () => {
|
||||
describe('getLastExecutedNodeData', () => {
|
||||
const lastNodeTaskData: ITaskData = {
|
||||
startTime: 0,
|
||||
executionIndex: 0,
|
||||
|
|
@ -544,11 +544,71 @@ describe('getDataLastExecutedNodeData', () => {
|
|||
it('returns last node run data when pinData is null', () => {
|
||||
// Regression: destructure default `pinData = {}` only applies to undefined,
|
||||
// so a null value used to throw `Cannot read properties of null`.
|
||||
expect(() => getDataLastExecutedNodeData(buildRun(null))).not.toThrow();
|
||||
expect(getDataLastExecutedNodeData(buildRun(null))).toBe(lastNodeTaskData);
|
||||
expect(() => getLastExecutedNodeData(buildRun(null))).not.toThrow();
|
||||
expect(getLastExecutedNodeData(buildRun(null))).toBe(lastNodeTaskData);
|
||||
});
|
||||
|
||||
it('returns last node run data when pinData is undefined', () => {
|
||||
expect(getDataLastExecutedNodeData(buildRun(undefined))).toBe(lastNodeTaskData);
|
||||
expect(getLastExecutedNodeData(buildRun(undefined))).toBe(lastNodeTaskData);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getLastExecutedNodeRuns', () => {
|
||||
function buildRun(
|
||||
lastNodeExecuted: string | undefined,
|
||||
runData: Record<string, ITaskData[]>,
|
||||
): IRun {
|
||||
return {
|
||||
data: {
|
||||
resultData: {
|
||||
lastNodeExecuted,
|
||||
runData,
|
||||
},
|
||||
},
|
||||
} as unknown as IRun;
|
||||
}
|
||||
|
||||
it('returns an empty array when no last node executed is recorded', () => {
|
||||
expect(getLastExecutedNodeRuns(buildRun(undefined, {}))).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns an empty array when the recorded last node has no run data', () => {
|
||||
expect(getLastExecutedNodeRuns(buildRun('Last executed node', {}))).toEqual([]);
|
||||
expect(
|
||||
getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': [] })),
|
||||
).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns every recorded run of the last executed node, in order', () => {
|
||||
const runs = [
|
||||
mock<ITaskData>({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } }),
|
||||
mock<ITaskData>({ executionIndex: 1, data: { main: [[{ json: { value: 1 } }]] } }),
|
||||
mock<ITaskData>({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } }),
|
||||
];
|
||||
expect(
|
||||
getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': runs })),
|
||||
).toEqual(runs);
|
||||
});
|
||||
|
||||
it('sorts runs by executionIndex when recorded out of order', () => {
|
||||
const run0 = mock<ITaskData>({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } });
|
||||
const run1 = mock<ITaskData>({ executionIndex: 1, data: { main: [[{ json: { value: 1 } }]] } });
|
||||
const run2 = mock<ITaskData>({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } });
|
||||
const outOfOrderRuns = [run2, run0, run1];
|
||||
expect(
|
||||
getLastExecutedNodeRuns(
|
||||
buildRun('Last executed node', { 'Last executed node': outOfOrderRuns }),
|
||||
),
|
||||
).toEqual([run0, run1, run2]);
|
||||
});
|
||||
|
||||
it('does not mutate the original runData array', () => {
|
||||
const runs = [
|
||||
mock<ITaskData>({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } }),
|
||||
mock<ITaskData>({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } }),
|
||||
];
|
||||
const snapshot = [...runs];
|
||||
getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': runs }));
|
||||
expect(runs).toEqual(snapshot);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,15 +1,15 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { ExecutionRepository, UserRepository } from '@n8n/db';
|
||||
import type { User } from '@n8n/db';
|
||||
import { ExecutionRepository, UserRepository } from '@n8n/db';
|
||||
import { LifecycleMetadata } from '@n8n/decorators';
|
||||
import { Container, Service } from '@n8n/di';
|
||||
import { stringify } from 'flatted';
|
||||
import {
|
||||
BinaryDataService,
|
||||
ErrorReporter,
|
||||
ExecutionLifecycleHooks,
|
||||
FileLocation,
|
||||
InstanceSettings,
|
||||
ExecutionLifecycleHooks,
|
||||
} from 'n8n-core';
|
||||
import type {
|
||||
ExecutionStatus,
|
||||
|
|
@ -17,9 +17,9 @@ import type {
|
|||
IRunData,
|
||||
IRunExecutionData,
|
||||
IWorkflowBase,
|
||||
IWorkflowExecutionDataProcess,
|
||||
RelatedExecution,
|
||||
WorkflowExecuteMode,
|
||||
IWorkflowExecutionDataProcess,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { EventService } from '@/events/event.service';
|
||||
|
|
@ -31,7 +31,7 @@ import { Push } from '@/push';
|
|||
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
|
||||
import { isWorkflowIdValid } from '@/utils';
|
||||
import { getItemCountByConnectionType } from '@/utils/get-item-count-by-connection-type';
|
||||
import { getDataLastExecutedNodeData } from '@/workflow-helpers';
|
||||
import { getLastExecutedNodeData } from '@/workflow-helpers';
|
||||
import { WorkflowHookContextService } from '@/workflow-hook-context.service';
|
||||
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
|
||||
|
|
@ -495,7 +495,7 @@ async function duplicateBinaryDataToParent(
|
|||
parentExecution: RelatedExecution,
|
||||
binaryDataService: BinaryDataService,
|
||||
) {
|
||||
const outputData = getDataLastExecutedNodeData(fullRunData);
|
||||
const outputData = getLastExecutedNodeData(fullRunData);
|
||||
if (outputData?.data?.main) {
|
||||
const duplicatedData = await binaryDataService.duplicateBinaryData(
|
||||
FileLocation.ofExecution(parentExecution.workflowId, parentExecution.executionId),
|
||||
|
|
|
|||
|
|
@ -871,7 +871,7 @@ export async function executeWebhook(
|
|||
runData.data.resultData.pinData = pinData;
|
||||
}
|
||||
|
||||
const lastNodeTaskData = WorkflowHelpers.getDataLastExecutedNodeData(runData);
|
||||
const lastNodeTaskData = WorkflowHelpers.getLastExecutedNodeData(runData);
|
||||
if (runData.data.resultData.error || lastNodeTaskData?.error !== undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,12 @@ import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
|||
import { Container } from '@n8n/di';
|
||||
import type { ServiceIdentifier } from '@n8n/di';
|
||||
import { ExternalSecretsProxy, WorkflowExecute } from 'n8n-core';
|
||||
import { UnexpectedError, Workflow, createRunExecutionData } from 'n8n-workflow';
|
||||
import {
|
||||
UnexpectedError,
|
||||
Workflow,
|
||||
createRunExecutionData,
|
||||
mergeRunsPerBranch,
|
||||
} from 'n8n-workflow';
|
||||
import type {
|
||||
AiEvent,
|
||||
IDataObject,
|
||||
|
|
@ -32,6 +37,7 @@ import type {
|
|||
ExecuteWorkflowData,
|
||||
ExecuteAgentData,
|
||||
RelatedExecution,
|
||||
IRun,
|
||||
IRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
|
|
@ -314,6 +320,48 @@ async function listAgents(userId: string): Promise<Array<{ id: string; name: str
|
|||
return agents.map((agent) => ({ id: agent.id, name: agent.name }));
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the sub-workflow's `Execute Workflow Trigger` wants the legacy single-run output.
|
||||
* v1.2+ triggers always opt into the new merged-runs default;
|
||||
* pre-1.2 triggers can opt in via the `returnOutput` parameter
|
||||
* and otherwise stay on `lastRunOnly` for backward compatibility.
|
||||
* Sub-workflows without an `Execute Workflow Trigger` keep the legacy output too.
|
||||
* See n8n-io/n8n#9989
|
||||
*/
|
||||
export function triggerReturnsLastRunOnly(nodes: INode[]): boolean {
|
||||
const trigger = nodes.find((node) => node.type === 'n8n-nodes-base.executeWorkflowTrigger');
|
||||
const triggerVersion = trigger?.typeVersion ?? 1;
|
||||
return triggerVersion < 1.2 && trigger?.parameters?.returnOutput !== 'allRuns';
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the items the parent workflow gets back from the sub-workflow's last node.
|
||||
* By default, items from every run of the terminal node are concatenated per output branch.
|
||||
* The trigger declares its preference.
|
||||
* The caller can additionally force the legacy single-run output via `returnLastRunOnly`
|
||||
* (used by LangChain tool/retriever callers that need a single-answer output).
|
||||
* Pinned data on the last node always wins in manual mode.
|
||||
* See n8n-io/n8n#9989.
|
||||
*/
|
||||
export function buildSubWorkflowOutput(
|
||||
data: IRun,
|
||||
workflowNodes: INode[],
|
||||
callerReturnsLastRunOnly: boolean,
|
||||
): Array<INodeExecutionData[] | null> {
|
||||
const lastRunOnly = callerReturnsLastRunOnly || triggerReturnsLastRunOnly(workflowNodes);
|
||||
const runs = WorkflowHelpers.getLastExecutedNodeRuns(data);
|
||||
const { lastNodeExecuted, pinData = {} } = data.data.resultData;
|
||||
const manualPinDataOverride =
|
||||
data.mode === 'manual' &&
|
||||
lastNodeExecuted !== undefined &&
|
||||
pinData[lastNodeExecuted] !== undefined;
|
||||
|
||||
if (!lastRunOnly && runs.length > 0 && !manualPinDataOverride) {
|
||||
return mergeRunsPerBranch(runs);
|
||||
}
|
||||
return WorkflowHelpers.getLastExecutedNodeData(data)?.data?.main ?? [null];
|
||||
}
|
||||
|
||||
async function startExecution(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
options: ExecuteWorkflowOptions,
|
||||
|
|
@ -460,10 +508,10 @@ async function startExecution(
|
|||
// Workflow did finish successfully
|
||||
|
||||
activeExecutions.finalizeExecution(executionId, data);
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
|
||||
return {
|
||||
executionId,
|
||||
data: returnData!.data!.main,
|
||||
data: buildSubWorkflowOutput(data, workflowData.nodes, options.returnLastRunOnly ?? false),
|
||||
waitTill: data.waitTill,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,9 +55,21 @@ export function validatePinDataSize(workflow: IWorkflowBase): void {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the data of the last executed node
|
||||
* All runs of the last executed node, ordered by `executionIndex` (raw, no pinData substitution).
|
||||
*/
|
||||
export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined {
|
||||
export function getLastExecutedNodeRuns(inputData: IRun): ITaskData[] {
|
||||
const { runData, lastNodeExecuted } = inputData.data.resultData;
|
||||
if (lastNodeExecuted === undefined) {
|
||||
return [];
|
||||
}
|
||||
const runs = runData[lastNodeExecuted];
|
||||
return runs?.toSorted((a, b) => (a.executionIndex ?? 0) - (b.executionIndex ?? 0)) ?? [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Final-run output of the last executed node, with pinData substituted in manual mode.
|
||||
*/
|
||||
export function getLastExecutedNodeData(inputData: IRun): ITaskData | undefined {
|
||||
const { runData, lastNodeExecuted } = inputData.data.resultData;
|
||||
const pinData = inputData.data.resultData.pinData ?? {};
|
||||
|
||||
|
|
@ -411,7 +423,7 @@ export async function updateParentExecutionWithChildResults(
|
|||
parentExecutionId: string,
|
||||
subworkflowResults: IRun,
|
||||
): Promise<void> {
|
||||
const lastExecutedNodeData = getDataLastExecutedNodeData(subworkflowResults);
|
||||
const lastExecutedNodeData = getLastExecutedNodeData(subworkflowResults);
|
||||
if (!lastExecutedNodeData?.data) return;
|
||||
const parent = await executionRepository.findSingleExecution(parentExecutionId, {
|
||||
includeData: true,
|
||||
|
|
|
|||
|
|
@ -122,6 +122,7 @@ export class BaseExecuteContext extends NodeExecutionContext {
|
|||
doNotWaitToFinish?: boolean;
|
||||
parentExecution?: RelatedExecution;
|
||||
executionMode?: WorkflowExecuteMode;
|
||||
returnLastRunOnly?: boolean;
|
||||
},
|
||||
): Promise<ExecuteWorkflowData> {
|
||||
if (options?.parentExecution) {
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ export class ExecuteWorkflowTrigger implements INodeType {
|
|||
icon: 'node:sub-workflow-trigger',
|
||||
iconColor: 'black',
|
||||
group: ['trigger'],
|
||||
version: [1, 1.1],
|
||||
version: [1, 1.1, 1.2],
|
||||
description:
|
||||
'Helpers for calling other n8n workflows. Used for designing modular, microservice-like workflows.',
|
||||
eventTriggerDescription: '',
|
||||
|
|
@ -191,6 +191,32 @@ export class ExecuteWorkflowTrigger implements INodeType {
|
|||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
displayName: 'Items to Return',
|
||||
name: 'returnOutput',
|
||||
type: 'options',
|
||||
noDataExpression: true,
|
||||
default: 'lastRunOnly',
|
||||
description:
|
||||
'Choose what to send back when the last node ran multiple times (for example, after a Loop Over Items)',
|
||||
options: [
|
||||
{
|
||||
name: 'All Items From Every Run',
|
||||
value: 'allRuns',
|
||||
description: 'Send every item the last node produced, across all its runs',
|
||||
},
|
||||
{
|
||||
name: 'Items From the Last Run Only',
|
||||
value: 'lastRunOnly',
|
||||
description: "Send only the items from the last node's final run",
|
||||
},
|
||||
],
|
||||
displayOptions: {
|
||||
show: {
|
||||
'@version': [{ _cnd: { lt: 1.2 } }],
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ export * from './execution-context';
|
|||
export * from './execution-context-establishment-hooks';
|
||||
export * from './global-state';
|
||||
export * from './interfaces';
|
||||
export * from './sub-workflow-output';
|
||||
export * from './run-execution-data-factory';
|
||||
export * from './message-event-bus';
|
||||
export * from './execution-status';
|
||||
|
|
|
|||
|
|
@ -1094,6 +1094,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
|
|||
doNotWaitToFinish?: boolean;
|
||||
parentExecution?: RelatedExecution;
|
||||
executionMode?: WorkflowExecuteMode;
|
||||
returnLastRunOnly?: boolean; // Forces the caller to receive only the items from the terminal node's final run.
|
||||
},
|
||||
): Promise<ExecuteWorkflowData>;
|
||||
executeAgent(
|
||||
|
|
@ -1963,6 +1964,7 @@ export interface ITriggerResponse {
|
|||
|
||||
export interface ExecuteWorkflowData {
|
||||
executionId: string;
|
||||
/** Terminal node output: items from every run concatenated per output branch, unless the caller sets `returnLastRunOnly`. */
|
||||
data: Array<INodeExecutionData[] | null>;
|
||||
waitTill?: Date | null;
|
||||
}
|
||||
|
|
@ -3156,6 +3158,7 @@ export interface ExecuteWorkflowOptions {
|
|||
doNotWaitToFinish?: boolean;
|
||||
parentExecution?: RelatedExecution;
|
||||
executionMode?: WorkflowExecuteMode;
|
||||
returnLastRunOnly?: boolean; // Forces the caller to receive only the items from the terminal node's final run.
|
||||
}
|
||||
|
||||
export type AiEvent =
|
||||
|
|
|
|||
11
packages/workflow/src/sub-workflow-output.ts
Normal file
11
packages/workflow/src/sub-workflow-output.ts
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
import type { INodeExecutionData, ITaskData } from './interfaces';
|
||||
|
||||
/**
|
||||
* For each output branch, concatenate items from every run in the order they were produced.
|
||||
*/
|
||||
export function mergeRunsPerBranch(runs: ITaskData[]): Array<INodeExecutionData[] | null> {
|
||||
const branchCount = runs.reduce((max, run) => Math.max(max, run.data?.main?.length ?? 0), 0);
|
||||
return Array.from({ length: branchCount }, (_, branch) =>
|
||||
runs.flatMap((run) => run.data?.main?.[branch] ?? []),
|
||||
);
|
||||
}
|
||||
65
packages/workflow/test/sub-workflow-output.test.ts
Normal file
65
packages/workflow/test/sub-workflow-output.test.ts
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
import type { ITaskData } from '../src/interfaces';
|
||||
import { mergeRunsPerBranch } from '../src/sub-workflow-output';
|
||||
|
||||
function buildRun(outputBranches: { json: object }[][]): ITaskData {
|
||||
return {
|
||||
data: {
|
||||
main: outputBranches,
|
||||
},
|
||||
} as unknown as ITaskData;
|
||||
}
|
||||
|
||||
describe('mergeRunsPerBranch', () => {
|
||||
it('returns an empty array for no runs', () => {
|
||||
expect(mergeRunsPerBranch([])).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns a single run unchanged', () => {
|
||||
const singleRun = buildRun([[{ json: { id: 1 } }, { json: { id: 2 } }]]);
|
||||
const singleRunUnchanged = [[{ json: { id: 1 } }, { json: { id: 2 } }]];
|
||||
|
||||
expect(mergeRunsPerBranch([singleRun])).toEqual(singleRunUnchanged);
|
||||
});
|
||||
|
||||
it('concatenates items across runs on the single main branch', () => {
|
||||
const firstRun = buildRun([[{ json: { id: 0 } }]]);
|
||||
const secondRun = buildRun([[{ json: { id: 1 } }, { json: { id: 2 } }]]);
|
||||
const thirdRun = buildRun([[{ json: { id: 3 } }]]);
|
||||
|
||||
const allItemsConcatenatedOnOneBranch = [
|
||||
[{ json: { id: 0 } }, { json: { id: 1 } }, { json: { id: 2 } }, { json: { id: 3 } }],
|
||||
];
|
||||
|
||||
expect(mergeRunsPerBranch([firstRun, secondRun, thirdRun])).toEqual(
|
||||
allItemsConcatenatedOnOneBranch,
|
||||
);
|
||||
});
|
||||
|
||||
it('preserves multi-output shape and concatenates per branch', () => {
|
||||
const firstRun = buildRun([[{ json: { primary: 0 } }], [{ json: { secondary: 0 } }]]);
|
||||
const secondRun = buildRun([[{ json: { primary: 1 } }], [{ json: { secondary: 1 } }]]);
|
||||
|
||||
const mergedPrimaryBranch = [{ json: { primary: 0 } }, { json: { primary: 1 } }];
|
||||
const mergedSecondaryBranch = [{ json: { secondary: 0 } }, { json: { secondary: 1 } }];
|
||||
|
||||
expect(mergeRunsPerBranch([firstRun, secondRun])).toEqual([
|
||||
mergedPrimaryBranch,
|
||||
mergedSecondaryBranch,
|
||||
]);
|
||||
});
|
||||
|
||||
it('tolerates missing branches across runs', () => {
|
||||
const runWithPrimaryBranchOnly = buildRun([[{ json: { primary: 0 } }]]);
|
||||
const runWithBothBranches = buildRun([
|
||||
[{ json: { primary: 1 } }],
|
||||
[{ json: { secondary: 1 } }],
|
||||
]);
|
||||
const mergedPrimaryBranch = [{ json: { primary: 0 } }, { json: { primary: 1 } }];
|
||||
const secondaryBranchFromTheOnlyRunThatProducedIt = [{ json: { secondary: 1 } }];
|
||||
|
||||
expect(mergeRunsPerBranch([runWithPrimaryBranchOnly, runWithBothBranches])).toEqual([
|
||||
mergedPrimaryBranch,
|
||||
secondaryBranchFromTheOnlyRunThatProducedIt,
|
||||
]);
|
||||
});
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user