diff --git a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts index 6cf045a50b4..7d83c7207a2 100644 --- a/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/retrievers/RetrieverWorkflow/RetrieverWorkflow.node.ts @@ -400,6 +400,7 @@ export class RetrieverWorkflow implements INodeType { executionId: workflowProxy.$execution.id, workflowId: workflowProxy.$workflow.id, }, + returnLastRunOnly: true, // Retrieved documents are the sub-workflow's final-run output, not its intermediate pipeline steps. }, ); } catch (error) { diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v1/ToolWorkflowV1.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v1/ToolWorkflowV1.node.ts index ecf2c69f917..4cbdec52721 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v1/ToolWorkflowV1.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v1/ToolWorkflowV1.node.ts @@ -123,6 +123,7 @@ export class ToolWorkflowV1 implements INodeType { executionId: workflowProxy.$execution.id, workflowId: workflowProxy.$workflow.id, }, + returnLastRunOnly: true, // The tool's answer is the sub-workflow's final-run output, not its internal multi-run computation. }); subExecutionId = receivedData.executionId; } catch (error) { diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts index 8946b7faf63..8f01b8fd5aa 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts @@ -259,6 +259,7 @@ export class WorkflowToolService { executionId: workflowProxy.$execution.id, workflowId: workflowProxy.$workflow.id, }, + returnLastRunOnly: true, // The tool's answer is the sub-workflow's final-run output, not its internal multi-run computation. }); // Set sub-workflow execution id so it can be used in other places this.subExecutionId = receivedData.executionId; diff --git a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts index c8e34acc6e2..fda30b8f2d0 100644 --- a/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts +++ b/packages/cli/src/__tests__/workflow-execute-additional-data.test.ts @@ -13,6 +13,7 @@ import type { IRun, INodeExecutionData, INode, + ITaskData, } from 'n8n-workflow'; import { createRunExecutionData } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; @@ -40,6 +41,8 @@ import { getRunData, getDraftWorkflowData, getPublishedWorkflowData, + buildSubWorkflowOutput, + triggerReturnsLastRunOnly, } from '@/workflow-execute-additional-data'; import * as WorkflowHelpers from '@/workflow-helpers'; @@ -859,4 +862,141 @@ describe('WorkflowExecuteAdditionalData', () => { expect(ownershipService.getWorkflowProjectCached).not.toHaveBeenCalled(); }); }); + + describe('buildSubWorkflowOutput', () => { + const twoRunsOnTerminalNode: Record = { + [LAST_NODE_EXECUTED]: [ + { data: { main: [[{ json: { itemId: 0 } }]] } }, + { data: { main: [[{ json: { itemId: 1 } }, { json: { itemId: 2 } }]] } }, + ] as unknown as ITaskData[], + }; + + function buildRun(overrides: { + mode?: IRun['mode']; + runData?: Record; + pinData?: Record; + lastNodeExecuted?: string; + }): IRun { + return { + mode: overrides.mode ?? 'manual', + data: { + resultData: { + runData: overrides.runData ?? twoRunsOnTerminalNode, + pinData: overrides.pinData, + lastNodeExecuted: + overrides.lastNodeExecuted === undefined + ? LAST_NODE_EXECUTED + : overrides.lastNodeExecuted, + }, + }, + finished: true, + } as unknown as IRun; + } + + function trigger(typeVersion: number, returnOutput?: string): INode { + return mock({ + type: 'n8n-nodes-base.executeWorkflowTrigger', + typeVersion, + parameters: returnOutput === undefined ? {} : { returnOutput }, + }); + } + + const expectedItemsFromBothRunsConcatenated = [ + [{ json: { itemId: 0 } }, { json: { itemId: 1 } }, { json: { itemId: 2 } }], + ]; + const expectedItemsFromTheFinalRunOnly = [[{ json: { itemId: 1 } }, { json: { itemId: 2 } }]]; + + it('merges every run when the trigger is v1.2+', () => { + const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.2)], false); + expect(output).toEqual(expectedItemsFromBothRunsConcatenated); + }); + + it('falls back to `lastRunOnly` for pre-1.2 triggers by default', () => { + const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.1)], false); + expect(output).toEqual(expectedItemsFromTheFinalRunOnly); + }); + + it('honours a pre-1.2 trigger that opted in via `returnOutput`', () => { + const output = buildSubWorkflowOutput( + buildRun({ mode: 'trigger' }), + [trigger(1.1, 'allRuns')], + false, + ); + expect(output).toEqual(expectedItemsFromBothRunsConcatenated); + }); + + it('caller can force `lastRunOnly` even when the trigger declares `allRuns`', () => { + const output = buildSubWorkflowOutput(buildRun({ mode: 'trigger' }), [trigger(1.2)], true); + expect(output).toEqual(expectedItemsFromTheFinalRunOnly); + }); + + describe('pinData substitution', () => { + it('ignores pinData when the sub-workflow is not running in manual mode', () => { + const output = buildSubWorkflowOutput( + buildRun({ + mode: 'trigger', + pinData: { [LAST_NODE_EXECUTED]: [{ pinned: true }] }, + }), + [trigger(1.2)], + false, + ); + expect(output).toEqual(expectedItemsFromBothRunsConcatenated); + }); + + it('substitutes pinData when manual mode, even on the merged-runs path', () => { + const output = buildSubWorkflowOutput( + buildRun({ + mode: 'manual', + pinData: { [LAST_NODE_EXECUTED]: [{ pinned: true }] }, + }), + [trigger(1.2)], + false, + ); + + const expectedPinnedItems = [[{ json: { pinned: true }, pairedItem: { item: 0 } }]]; + expect(output).toEqual(expectedPinnedItems); + }); + }); + + it('returns `[null]` when the sub-workflow recorded no run data', () => { + expect( + buildSubWorkflowOutput( + buildRun({ mode: 'trigger', runData: {}, lastNodeExecuted: undefined }), + [trigger(1.2)], + false, + ), + ).toEqual([null]); + }); + }); + + describe('triggerReturnsLastRunOnly', () => { + function trigger(typeVersion: number, returnOutput?: string): INode { + return mock({ + type: 'n8n-nodes-base.executeWorkflowTrigger', + typeVersion, + parameters: returnOutput === undefined ? {} : { returnOutput }, + }); + } + + it('returns true when there is no Execute Workflow Trigger', () => { + expect(triggerReturnsLastRunOnly([])).toBe(true); + expect(triggerReturnsLastRunOnly([mock({ type: 'n8n-nodes-base.set' })])).toBe(true); + }); + + it('defaults pre-1.2 triggers to `lastRunOnly` (backward compat)', () => { + expect(triggerReturnsLastRunOnly([trigger(1)])).toBe(true); + expect(triggerReturnsLastRunOnly([trigger(1.1)])).toBe(true); + }); + + it('honours a pre-1.2 trigger that opted in via `returnOutput`', () => { + expect(triggerReturnsLastRunOnly([trigger(1.1, 'allRuns')])).toBe(false); + expect(triggerReturnsLastRunOnly([trigger(1.1, 'lastRunOnly')])).toBe(true); + }); + + it('returns false on v1.2+ triggers (option deprecated, allRuns is the default)', () => { + expect(triggerReturnsLastRunOnly([trigger(1.2)])).toBe(false); + // Parameters on v1.2+ are ignored: the merged-runs behavior is the default. + expect(triggerReturnsLastRunOnly([trigger(1.2, 'lastRunOnly')])).toBe(false); + }); + }); }); diff --git a/packages/cli/src/__tests__/workflow-helpers.test.ts b/packages/cli/src/__tests__/workflow-helpers.test.ts index cde05542439..25670ee97a6 100644 --- a/packages/cli/src/__tests__/workflow-helpers.test.ts +++ b/packages/cli/src/__tests__/workflow-helpers.test.ts @@ -2,14 +2,13 @@ import { MAX_PINNED_DATA_SIZE, MAX_WORKFLOW_SIZE, MAX_EXPECTED_REQUEST_SIZE } fr import { mockInstance } from '@n8n/backend-test-utils'; import type { CredentialsEntity, Project, Variables } from '@n8n/db'; import { CredentialsRepository } from '@n8n/db'; -import type { ITaskData, IWorkflowBase, IWorkflowSettings } from 'n8n-workflow'; - -import type { IRun } from 'n8n-workflow'; +import type { IRun, ITaskData, IWorkflowBase, IWorkflowSettings } from 'n8n-workflow'; import { VariablesService } from '@/environments.ee/variables/variables.service.ee'; import { OwnershipService } from '@/services/ownership.service'; import { - getDataLastExecutedNodeData, + getLastExecutedNodeData, + getLastExecutedNodeRuns, getVariables, preserveInputOverride, removeDefaultValues, @@ -18,6 +17,7 @@ import { validatePinDataSize, validateWorkflowNodeGroups, } from '@/workflow-helpers'; +import { mock } from 'jest-mock-extended'; describe('workflow-helpers', () => { beforeAll(() => { @@ -513,7 +513,7 @@ describe('validatePinDataSize', () => { }); }); -describe('getDataLastExecutedNodeData', () => { +describe('getLastExecutedNodeData', () => { const lastNodeTaskData: ITaskData = { startTime: 0, executionIndex: 0, @@ -544,11 +544,71 @@ describe('getDataLastExecutedNodeData', () => { it('returns last node run data when pinData is null', () => { // Regression: destructure default `pinData = {}` only applies to undefined, // so a null value used to throw `Cannot read properties of null`. - expect(() => getDataLastExecutedNodeData(buildRun(null))).not.toThrow(); - expect(getDataLastExecutedNodeData(buildRun(null))).toBe(lastNodeTaskData); + expect(() => getLastExecutedNodeData(buildRun(null))).not.toThrow(); + expect(getLastExecutedNodeData(buildRun(null))).toBe(lastNodeTaskData); }); it('returns last node run data when pinData is undefined', () => { - expect(getDataLastExecutedNodeData(buildRun(undefined))).toBe(lastNodeTaskData); + expect(getLastExecutedNodeData(buildRun(undefined))).toBe(lastNodeTaskData); + }); +}); + +describe('getLastExecutedNodeRuns', () => { + function buildRun( + lastNodeExecuted: string | undefined, + runData: Record, + ): IRun { + return { + data: { + resultData: { + lastNodeExecuted, + runData, + }, + }, + } as unknown as IRun; + } + + it('returns an empty array when no last node executed is recorded', () => { + expect(getLastExecutedNodeRuns(buildRun(undefined, {}))).toEqual([]); + }); + + it('returns an empty array when the recorded last node has no run data', () => { + expect(getLastExecutedNodeRuns(buildRun('Last executed node', {}))).toEqual([]); + expect( + getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': [] })), + ).toEqual([]); + }); + + it('returns every recorded run of the last executed node, in order', () => { + const runs = [ + mock({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } }), + mock({ executionIndex: 1, data: { main: [[{ json: { value: 1 } }]] } }), + mock({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } }), + ]; + expect( + getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': runs })), + ).toEqual(runs); + }); + + it('sorts runs by executionIndex when recorded out of order', () => { + const run0 = mock({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } }); + const run1 = mock({ executionIndex: 1, data: { main: [[{ json: { value: 1 } }]] } }); + const run2 = mock({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } }); + const outOfOrderRuns = [run2, run0, run1]; + expect( + getLastExecutedNodeRuns( + buildRun('Last executed node', { 'Last executed node': outOfOrderRuns }), + ), + ).toEqual([run0, run1, run2]); + }); + + it('does not mutate the original runData array', () => { + const runs = [ + mock({ executionIndex: 2, data: { main: [[{ json: { value: 2 } }]] } }), + mock({ executionIndex: 0, data: { main: [[{ json: { value: 0 } }]] } }), + ]; + const snapshot = [...runs]; + getLastExecutedNodeRuns(buildRun('Last executed node', { 'Last executed node': runs })); + expect(runs).toEqual(snapshot); }); }); diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index 478c5c517ab..c7944c89de7 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -1,15 +1,15 @@ import { Logger } from '@n8n/backend-common'; -import { ExecutionRepository, UserRepository } from '@n8n/db'; import type { User } from '@n8n/db'; +import { ExecutionRepository, UserRepository } from '@n8n/db'; import { LifecycleMetadata } from '@n8n/decorators'; import { Container, Service } from '@n8n/di'; import { stringify } from 'flatted'; import { BinaryDataService, ErrorReporter, + ExecutionLifecycleHooks, FileLocation, InstanceSettings, - ExecutionLifecycleHooks, } from 'n8n-core'; import type { ExecutionStatus, @@ -17,9 +17,9 @@ import type { IRunData, IRunExecutionData, IWorkflowBase, + IWorkflowExecutionDataProcess, RelatedExecution, WorkflowExecuteMode, - IWorkflowExecutionDataProcess, } from 'n8n-workflow'; import { EventService } from '@/events/event.service'; @@ -31,7 +31,7 @@ import { Push } from '@/push'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { isWorkflowIdValid } from '@/utils'; import { getItemCountByConnectionType } from '@/utils/get-item-count-by-connection-type'; -import { getDataLastExecutedNodeData } from '@/workflow-helpers'; +import { getLastExecutedNodeData } from '@/workflow-helpers'; import { WorkflowHookContextService } from '@/workflow-hook-context.service'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; @@ -495,7 +495,7 @@ async function duplicateBinaryDataToParent( parentExecution: RelatedExecution, binaryDataService: BinaryDataService, ) { - const outputData = getDataLastExecutedNodeData(fullRunData); + const outputData = getLastExecutedNodeData(fullRunData); if (outputData?.data?.main) { const duplicatedData = await binaryDataService.duplicateBinaryData( FileLocation.ofExecution(parentExecution.workflowId, parentExecution.executionId), diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index 03b2e60309c..c4aca1723e0 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -871,7 +871,7 @@ export async function executeWebhook( runData.data.resultData.pinData = pinData; } - const lastNodeTaskData = WorkflowHelpers.getDataLastExecutedNodeData(runData); + const lastNodeTaskData = WorkflowHelpers.getLastExecutedNodeData(runData); if (runData.data.resultData.error || lastNodeTaskData?.error !== undefined) { if (!didSendResponse) { responseCallback(null, { diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 2374dae7b66..36380a60364 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -9,7 +9,12 @@ import { ExecutionRepository, WorkflowRepository } from '@n8n/db'; import { Container } from '@n8n/di'; import type { ServiceIdentifier } from '@n8n/di'; import { ExternalSecretsProxy, WorkflowExecute } from 'n8n-core'; -import { UnexpectedError, Workflow, createRunExecutionData } from 'n8n-workflow'; +import { + UnexpectedError, + Workflow, + createRunExecutionData, + mergeRunsPerBranch, +} from 'n8n-workflow'; import type { AiEvent, IDataObject, @@ -32,6 +37,7 @@ import type { ExecuteWorkflowData, ExecuteAgentData, RelatedExecution, + IRun, IRunExecutionData, } from 'n8n-workflow'; @@ -314,6 +320,48 @@ async function listAgents(userId: string): Promise ({ id: agent.id, name: agent.name })); } +/** + * Whether the sub-workflow's `Execute Workflow Trigger` wants the legacy single-run output. + * v1.2+ triggers always opt into the new merged-runs default; + * pre-1.2 triggers can opt in via the `returnOutput` parameter + * and otherwise stay on `lastRunOnly` for backward compatibility. + * Sub-workflows without an `Execute Workflow Trigger` keep the legacy output too. + * See n8n-io/n8n#9989 + */ +export function triggerReturnsLastRunOnly(nodes: INode[]): boolean { + const trigger = nodes.find((node) => node.type === 'n8n-nodes-base.executeWorkflowTrigger'); + const triggerVersion = trigger?.typeVersion ?? 1; + return triggerVersion < 1.2 && trigger?.parameters?.returnOutput !== 'allRuns'; +} + +/** + * Returns the items the parent workflow gets back from the sub-workflow's last node. + * By default, items from every run of the terminal node are concatenated per output branch. + * The trigger declares its preference. + * The caller can additionally force the legacy single-run output via `returnLastRunOnly` + * (used by LangChain tool/retriever callers that need a single-answer output). + * Pinned data on the last node always wins in manual mode. + * See n8n-io/n8n#9989. + */ +export function buildSubWorkflowOutput( + data: IRun, + workflowNodes: INode[], + callerReturnsLastRunOnly: boolean, +): Array { + const lastRunOnly = callerReturnsLastRunOnly || triggerReturnsLastRunOnly(workflowNodes); + const runs = WorkflowHelpers.getLastExecutedNodeRuns(data); + const { lastNodeExecuted, pinData = {} } = data.data.resultData; + const manualPinDataOverride = + data.mode === 'manual' && + lastNodeExecuted !== undefined && + pinData[lastNodeExecuted] !== undefined; + + if (!lastRunOnly && runs.length > 0 && !manualPinDataOverride) { + return mergeRunsPerBranch(runs); + } + return WorkflowHelpers.getLastExecutedNodeData(data)?.data?.main ?? [null]; +} + async function startExecution( additionalData: IWorkflowExecuteAdditionalData, options: ExecuteWorkflowOptions, @@ -460,10 +508,10 @@ async function startExecution( // Workflow did finish successfully activeExecutions.finalizeExecution(executionId, data); - const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); + return { executionId, - data: returnData!.data!.main, + data: buildSubWorkflowOutput(data, workflowData.nodes, options.returnLastRunOnly ?? false), waitTill: data.waitTill, }; } diff --git a/packages/cli/src/workflow-helpers.ts b/packages/cli/src/workflow-helpers.ts index 0c9a7e11f4a..8399e6e0f6f 100644 --- a/packages/cli/src/workflow-helpers.ts +++ b/packages/cli/src/workflow-helpers.ts @@ -55,9 +55,21 @@ export function validatePinDataSize(workflow: IWorkflowBase): void { } /** - * Returns the data of the last executed node + * All runs of the last executed node, ordered by `executionIndex` (raw, no pinData substitution). */ -export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined { +export function getLastExecutedNodeRuns(inputData: IRun): ITaskData[] { + const { runData, lastNodeExecuted } = inputData.data.resultData; + if (lastNodeExecuted === undefined) { + return []; + } + const runs = runData[lastNodeExecuted]; + return runs?.toSorted((a, b) => (a.executionIndex ?? 0) - (b.executionIndex ?? 0)) ?? []; +} + +/** + * Final-run output of the last executed node, with pinData substituted in manual mode. + */ +export function getLastExecutedNodeData(inputData: IRun): ITaskData | undefined { const { runData, lastNodeExecuted } = inputData.data.resultData; const pinData = inputData.data.resultData.pinData ?? {}; @@ -411,7 +423,7 @@ export async function updateParentExecutionWithChildResults( parentExecutionId: string, subworkflowResults: IRun, ): Promise { - const lastExecutedNodeData = getDataLastExecutedNodeData(subworkflowResults); + const lastExecutedNodeData = getLastExecutedNodeData(subworkflowResults); if (!lastExecutedNodeData?.data) return; const parent = await executionRepository.findSingleExecution(parentExecutionId, { includeData: true, diff --git a/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts b/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts index 795f88fe4de..13ca1ddbd36 100644 --- a/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts @@ -122,6 +122,7 @@ export class BaseExecuteContext extends NodeExecutionContext { doNotWaitToFinish?: boolean; parentExecution?: RelatedExecution; executionMode?: WorkflowExecuteMode; + returnLastRunOnly?: boolean; }, ): Promise { if (options?.parentExecution) { diff --git a/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflowTrigger/ExecuteWorkflowTrigger.node.ts b/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflowTrigger/ExecuteWorkflowTrigger.node.ts index dd74a7ac513..42531e3a369 100644 --- a/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflowTrigger/ExecuteWorkflowTrigger.node.ts +++ b/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflowTrigger/ExecuteWorkflowTrigger.node.ts @@ -27,7 +27,7 @@ export class ExecuteWorkflowTrigger implements INodeType { icon: 'node:sub-workflow-trigger', iconColor: 'black', group: ['trigger'], - version: [1, 1.1], + version: [1, 1.1, 1.2], description: 'Helpers for calling other n8n workflows. Used for designing modular, microservice-like workflows.', eventTriggerDescription: '', @@ -191,6 +191,32 @@ export class ExecuteWorkflowTrigger implements INodeType { }, ], }, + { + displayName: 'Items to Return', + name: 'returnOutput', + type: 'options', + noDataExpression: true, + default: 'lastRunOnly', + description: + 'Choose what to send back when the last node ran multiple times (for example, after a Loop Over Items)', + options: [ + { + name: 'All Items From Every Run', + value: 'allRuns', + description: 'Send every item the last node produced, across all its runs', + }, + { + name: 'Items From the Last Run Only', + value: 'lastRunOnly', + description: "Send only the items from the last node's final run", + }, + ], + displayOptions: { + show: { + '@version': [{ _cnd: { lt: 1.2 } }], + }, + }, + }, ], }; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 5e05c6035dc..17fe08d7101 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -13,6 +13,7 @@ export * from './execution-context'; export * from './execution-context-establishment-hooks'; export * from './global-state'; export * from './interfaces'; +export * from './sub-workflow-output'; export * from './run-execution-data-factory'; export * from './message-event-bus'; export * from './execution-status'; diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index ef6c72b869a..6aa3c51fb05 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -1094,6 +1094,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & doNotWaitToFinish?: boolean; parentExecution?: RelatedExecution; executionMode?: WorkflowExecuteMode; + returnLastRunOnly?: boolean; // Forces the caller to receive only the items from the terminal node's final run. }, ): Promise; executeAgent( @@ -1963,6 +1964,7 @@ export interface ITriggerResponse { export interface ExecuteWorkflowData { executionId: string; + /** Terminal node output: items from every run concatenated per output branch, unless the caller sets `returnLastRunOnly`. */ data: Array; waitTill?: Date | null; } @@ -3156,6 +3158,7 @@ export interface ExecuteWorkflowOptions { doNotWaitToFinish?: boolean; parentExecution?: RelatedExecution; executionMode?: WorkflowExecuteMode; + returnLastRunOnly?: boolean; // Forces the caller to receive only the items from the terminal node's final run. } export type AiEvent = diff --git a/packages/workflow/src/sub-workflow-output.ts b/packages/workflow/src/sub-workflow-output.ts new file mode 100644 index 00000000000..d4c8651e06d --- /dev/null +++ b/packages/workflow/src/sub-workflow-output.ts @@ -0,0 +1,11 @@ +import type { INodeExecutionData, ITaskData } from './interfaces'; + +/** + * For each output branch, concatenate items from every run in the order they were produced. + */ +export function mergeRunsPerBranch(runs: ITaskData[]): Array { + const branchCount = runs.reduce((max, run) => Math.max(max, run.data?.main?.length ?? 0), 0); + return Array.from({ length: branchCount }, (_, branch) => + runs.flatMap((run) => run.data?.main?.[branch] ?? []), + ); +} diff --git a/packages/workflow/test/sub-workflow-output.test.ts b/packages/workflow/test/sub-workflow-output.test.ts new file mode 100644 index 00000000000..ade43644a1f --- /dev/null +++ b/packages/workflow/test/sub-workflow-output.test.ts @@ -0,0 +1,65 @@ +import type { ITaskData } from '../src/interfaces'; +import { mergeRunsPerBranch } from '../src/sub-workflow-output'; + +function buildRun(outputBranches: { json: object }[][]): ITaskData { + return { + data: { + main: outputBranches, + }, + } as unknown as ITaskData; +} + +describe('mergeRunsPerBranch', () => { + it('returns an empty array for no runs', () => { + expect(mergeRunsPerBranch([])).toEqual([]); + }); + + it('returns a single run unchanged', () => { + const singleRun = buildRun([[{ json: { id: 1 } }, { json: { id: 2 } }]]); + const singleRunUnchanged = [[{ json: { id: 1 } }, { json: { id: 2 } }]]; + + expect(mergeRunsPerBranch([singleRun])).toEqual(singleRunUnchanged); + }); + + it('concatenates items across runs on the single main branch', () => { + const firstRun = buildRun([[{ json: { id: 0 } }]]); + const secondRun = buildRun([[{ json: { id: 1 } }, { json: { id: 2 } }]]); + const thirdRun = buildRun([[{ json: { id: 3 } }]]); + + const allItemsConcatenatedOnOneBranch = [ + [{ json: { id: 0 } }, { json: { id: 1 } }, { json: { id: 2 } }, { json: { id: 3 } }], + ]; + + expect(mergeRunsPerBranch([firstRun, secondRun, thirdRun])).toEqual( + allItemsConcatenatedOnOneBranch, + ); + }); + + it('preserves multi-output shape and concatenates per branch', () => { + const firstRun = buildRun([[{ json: { primary: 0 } }], [{ json: { secondary: 0 } }]]); + const secondRun = buildRun([[{ json: { primary: 1 } }], [{ json: { secondary: 1 } }]]); + + const mergedPrimaryBranch = [{ json: { primary: 0 } }, { json: { primary: 1 } }]; + const mergedSecondaryBranch = [{ json: { secondary: 0 } }, { json: { secondary: 1 } }]; + + expect(mergeRunsPerBranch([firstRun, secondRun])).toEqual([ + mergedPrimaryBranch, + mergedSecondaryBranch, + ]); + }); + + it('tolerates missing branches across runs', () => { + const runWithPrimaryBranchOnly = buildRun([[{ json: { primary: 0 } }]]); + const runWithBothBranches = buildRun([ + [{ json: { primary: 1 } }], + [{ json: { secondary: 1 } }], + ]); + const mergedPrimaryBranch = [{ json: { primary: 0 } }, { json: { primary: 1 } }]; + const secondaryBranchFromTheOnlyRunThatProducedIt = [{ json: { secondary: 1 } }]; + + expect(mergeRunsPerBranch([runWithPrimaryBranchOnly, runWithBothBranches])).toEqual([ + mergedPrimaryBranch, + secondaryBranchFromTheOnlyRunThatProducedIt, + ]); + }); +});