diff --git a/packages/@n8n/expression-runtime/src/__tests__/typed-rpc.test.ts b/packages/@n8n/expression-runtime/src/__tests__/typed-rpc.test.ts index 09eb310fb03..02b400b0a08 100644 --- a/packages/@n8n/expression-runtime/src/__tests__/typed-rpc.test.ts +++ b/packages/@n8n/expression-runtime/src/__tests__/typed-rpc.test.ts @@ -771,3 +771,68 @@ describe('Typed RPC: $evaluateExpression() routes via evaluateExpression', () => expect(result).toBeUndefined(); }); }); + +describe('Typed RPC: $getPairedItem() routes via getPairedItem', () => { + let evaluator: ExpressionEvaluator; + const caller = {}; + + beforeAll(async () => { + evaluator = new ExpressionEvaluator({ + createBridge: () => new IsolatedVmBridge({ timeout: 5000 }), + maxCodeCacheSize: 64, + }); + await evaluator.initialize(); + await evaluator.acquire(caller); + }); + + afterAll(async () => { + await evaluator.release(caller); + await evaluator.dispose(); + }); + + it('returns the value of data.$getPairedItem(...)', () => { + const data: Record = { + $getPairedItem: () => ({ json: { city: 'Prague' } }), + }; + + const result = evaluator.evaluate( + "{{ JSON.stringify($getPairedItem('source', { previousNode: 'source' }, { item: 0 })) }}", + data, + caller, + ); + expect(result).toBe(JSON.stringify({ json: { city: 'Prague' } })); + }); + + it('forwards destinationNodeName, incomingSourceData, initialPairedItem verbatim', () => { + const calls: Array = []; + const data: Record = { + $getPairedItem: (...args: unknown[]) => { + calls.push(args); + return 'ok'; + }, + }; + + evaluator.evaluate( + "{{ $getPairedItem('dest', { previousNode: 'src', previousNodeRun: 1 }, { item: 2, input: 0 }) }}", + data, + caller, + ); + evaluator.evaluate("{{ $getPairedItem('dest', null, { item: 0 }) }}", data, caller); + + expect(calls).toEqual([ + ['dest', { previousNode: 'src', previousNodeRun: 1 }, { item: 2, input: 0 }], + ['dest', null, { item: 0 }], + ]); + }); + + it('handles missing data.$getPairedItem gracefully (returns undefined)', () => { + const data: Record = {}; + + const result = evaluator.evaluate( + "{{ $getPairedItem('dest', null, { item: 0 }) }}", + data, + caller, + ); + expect(result).toBeUndefined(); + }); +}); diff --git a/packages/@n8n/expression-runtime/src/bridge/__tests__/bridge-messages.test.ts b/packages/@n8n/expression-runtime/src/bridge/__tests__/bridge-messages.test.ts index e7c9be68052..efa5c78bced 100644 --- a/packages/@n8n/expression-runtime/src/bridge/__tests__/bridge-messages.test.ts +++ b/packages/@n8n/expression-runtime/src/bridge/__tests__/bridge-messages.test.ts @@ -201,6 +201,92 @@ describe('bridgeMessageSchema', () => { }); }); + describe('getPairedItem', () => { + it('parses a minimal valid envelope (null source)', () => { + const parsed = bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: null, + initialPairedItem: { item: 0 }, + }); + expect(parsed.type).toBe('getPairedItem'); + }); + + it('parses a fully populated envelope (nested sourceOverwrite)', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: { + previousNode: 'Src', + previousNodeOutput: 0, + previousNodeRun: 1, + }, + initialPairedItem: { + item: 2, + input: 0, + sourceOverwrite: { previousNode: 'Other' }, + }, + }), + ).not.toThrow(); + }); + + it('rejects missing destinationNodeName', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + incomingSourceData: null, + initialPairedItem: { item: 0 }, + }), + ).toThrow(); + }); + + it('rejects negative item index', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: null, + initialPairedItem: { item: -1 }, + }), + ).toThrow(); + }); + + it('rejects extra fields on the envelope (.strict)', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: null, + initialPairedItem: { item: 0 }, + usedMethodName: '$getPairedItem', + }), + ).toThrow(); + }); + + it('rejects extra fields on nested sourceData (.strict)', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: { previousNode: 'Src', hijack: 'x' }, + initialPairedItem: { item: 0 }, + }), + ).toThrow(); + }); + + it('rejects extra fields on nested pairedItemData (.strict)', () => { + expect(() => + bridgeMessageSchema.parse({ + type: 'getPairedItem', + destinationNodeName: 'Foo', + incomingSourceData: null, + initialPairedItem: { item: 0, hijack: 'x' }, + }), + ).toThrow(); + }); + }); + describe('fromAi', () => { it('accepts a minimal envelope (type only)', () => { // `name` is optional in the schema so empty calls reach the host's diff --git a/packages/@n8n/expression-runtime/src/bridge/bridge-messages.ts b/packages/@n8n/expression-runtime/src/bridge/bridge-messages.ts index 9f264d2ce50..6b999a240ad 100644 --- a/packages/@n8n/expression-runtime/src/bridge/bridge-messages.ts +++ b/packages/@n8n/expression-runtime/src/bridge/bridge-messages.ts @@ -193,6 +193,55 @@ export const evaluateExpressionMessage = z }) .strict(); +/** + * `ISourceData` — the source record that accompanies a paired-item + * traversal step. Mirrors the host interface used by + * `WorkflowDataProxy.getPairedItem`. + */ +const sourceDataSchema = z + .object({ + previousNode: z.string(), + previousNodeOutput: z.number().int().nonnegative().optional(), + previousNodeRun: z.number().int().nonnegative().optional(), + }) + .strict(); + +/** + * `IPairedItemData` — one paired-item record. `sourceOverwrite` lets a + * node override the upstream source while the helper walks the ancestry + * chain; the field is optional and recurses through the same schema. + */ +const pairedItemDataSchema = z + .object({ + item: z.number().int().nonnegative(), + input: z.number().int().nonnegative().optional(), + sourceOverwrite: sourceDataSchema.optional(), + }) + .strict(); + +/** + * `$getPairedItem(destinationNodeName, incomingSourceData, initialPairedItem)` — + * traverse the paired-item ancestry chain back to the named upstream node + * and return the matching execution item. + * + * Two host-side fields are deliberately omitted from the schema: + * - `usedMethodName` defaults to `$getPairedItem` on the host; the isolate + * has no reason to spoof a different method name in the error path. + * - `nodeBeforeLast` is an internal recursion argument; only the host + * itself sets it during the recursive walk. + * + * `incomingSourceData` is nullable because the host's signature accepts + * `ISourceData | null` (and throws a paired-item-not-found error when null). + */ +export const getPairedItemMessage = z + .object({ + type: z.literal('getPairedItem'), + destinationNodeName: z.string(), + incomingSourceData: sourceDataSchema.nullable(), + initialPairedItem: pairedItemDataSchema, + }) + .strict(); + /** * The full set of messages the bridge will accept. Discriminator is `type`. * @@ -213,6 +262,7 @@ export const bridgeMessageSchema = z.discriminatedUnion('type', [ getNodeItemMatchingMessage, getNodeItemMessage, evaluateExpressionMessage, + getPairedItemMessage, ]); export type BridgeMessage = z.infer; diff --git a/packages/@n8n/expression-runtime/src/bridge/isolated-vm-bridge.ts b/packages/@n8n/expression-runtime/src/bridge/isolated-vm-bridge.ts index 1657154a56d..57a202a357c 100644 --- a/packages/@n8n/expression-runtime/src/bridge/isolated-vm-bridge.ts +++ b/packages/@n8n/expression-runtime/src/bridge/isolated-vm-bridge.ts @@ -506,6 +506,8 @@ export class IsolatedVmBridge implements RuntimeBridge { return this.handleGetNodeItem(msg, data); case 'evaluateExpression': return this.handleEvaluateExpression(msg, data); + case 'getPairedItem': + return this.handleGetPairedItem(msg, data); default: { // Unreachable at runtime — zod rejects unknown `type` values // before the switch. The `never` assignment is the compile-time @@ -676,6 +678,31 @@ export class IsolatedVmBridge implements RuntimeBridge { return data.$evaluateExpression?.(msg.expression, msg.itemIndex); } + /** + * Handler for `$getPairedItem(destinationNodeName, incomingSourceData, + * initialPairedItem)`. Forwards directly to the host binding, which + * walks the paired-item ancestry chain back to the named upstream node + * and returns the matching execution item. + * + * The two trailing host parameters — `usedMethodName` and + * `nodeBeforeLast` — are deliberately not part of the wire protocol: + * the host's default for `usedMethodName` is already `$getPairedItem`, + * and `nodeBeforeLast` is an internal recursion argument the host sets + * during traversal. + * + * @private + */ + private handleGetPairedItem( + msg: Extract, + data: WorkflowData, + ): unknown { + return data.$getPairedItem?.( + msg.destinationNodeName, + msg.incomingSourceData, + msg.initialPairedItem, + ); + } + /** * Execute JavaScript code in the isolated context. * diff --git a/packages/@n8n/expression-runtime/src/runtime/context.ts b/packages/@n8n/expression-runtime/src/runtime/context.ts index bf5affdd57f..abc15b0530d 100644 --- a/packages/@n8n/expression-runtime/src/runtime/context.ts +++ b/packages/@n8n/expression-runtime/src/runtime/context.ts @@ -347,6 +347,32 @@ export function buildContext( return result; }; + // $getPairedItem — walks the paired-item ancestry chain back to the + // named upstream node. The host validates the structural shape of + // `incomingSourceData` and `initialPairedItem` via the typed-RPC + // schema; bad input surfaces as a schema-parse error sentinel rather + // than a host throw, keeping the protocol surface tight. + target.$getPairedItem = ( + destinationNodeName: string, + incomingSourceData: unknown, + initialPairedItem: unknown, + ) => { + const result = callbacks.callHost.applySync( + null, + [ + { + type: 'getPairedItem', + destinationNodeName, + incomingSourceData, + initialPairedItem, + }, + ], + { arguments: { copy: true }, result: { copy: true } }, + ); + throwIfErrorSentinel(result); + return result; + }; + // ------------------------------------------------------------------------- // Resolve an unknown key from the host. Called by the proxy's has/get traps // for keys not already on the target. The resolved value is cached on target diff --git a/packages/@n8n/expression-runtime/src/types/evaluator.ts b/packages/@n8n/expression-runtime/src/types/evaluator.ts index 31f10c19391..33a7f3dab13 100644 --- a/packages/@n8n/expression-runtime/src/types/evaluator.ts +++ b/packages/@n8n/expression-runtime/src/types/evaluator.ts @@ -167,6 +167,27 @@ export type FromAi = ( defaultValue?: unknown, ) => unknown; +/** + * Source data describing where an item came from upstream. Mirrors the + * `ISourceData` interface from `n8n-workflow` without taking a runtime + * dependency on it. + */ +export interface SourceData { + previousNode: string; + previousNodeOutput?: number; + previousNodeRun?: number; +} + +/** + * Paired-item descriptor. Mirrors the `IPairedItemData` interface from + * `n8n-workflow` without taking a runtime dependency on it. + */ +export interface PairedItemData { + item: number; + input?: number; + sourceOverwrite?: SourceData; +} + export interface WorkflowData { $?: (nodeName: string) => NodeProxy | null | undefined; $input?: InputProxy; @@ -175,6 +196,11 @@ export interface WorkflowData { $fromAi?: FromAi; $fromai?: FromAi; $evaluateExpression?: (expression: string, itemIndex?: number) => unknown; + $getPairedItem?: ( + destinationNodeName: string, + incomingSourceData: SourceData | null, + initialPairedItem: PairedItemData, + ) => unknown; [key: string]: unknown; } diff --git a/packages/workflow/test/expression.test.ts b/packages/workflow/test/expression.test.ts index daba861f29c..debd62369fa 100644 --- a/packages/workflow/test/expression.test.ts +++ b/packages/workflow/test/expression.test.ts @@ -993,6 +993,103 @@ describe('Expression', () => { }); }); + describe('$getPairedItem through expression engine (engine parity)', () => { + const nodeTypes = Helpers.NodeTypes(); + + const workflow = new Workflow({ + id: 'test-get-paired-item', + name: 'Test', + nodes: [ + { + id: 'source-id', + name: 'source', + type: 'n8n-nodes-base.set', + typeVersion: 1, + position: [0, 0], + parameters: {}, + }, + { + id: 'consumer-id', + name: 'consumer', + type: 'n8n-nodes-base.set', + typeVersion: 1, + position: [200, 0], + parameters: {}, + }, + ], + connections: { source: { main: [[{ node: 'consumer', type: 'main', index: 0 }]] } }, + active: false, + nodeTypes, + }); + + const runExecutionData = createRunExecutionData({ + resultData: { + runData: { + source: [ + { + startTime: 1, + executionTime: 1, + executionIndex: 0, + source: [], + data: { + main: [[{ json: { city: 'Prague' }, pairedItem: { item: 0 } }]], + }, + }, + ], + }, + }, + }); + + beforeAll(async () => { + await workflow.expression.acquireIsolate(); + }); + afterAll(async () => { + await workflow.expression.releaseIsolate(); + }); + + const evaluate = (expr: string) => + workflow.expression.getParameterValue( + expr, + runExecutionData, + 0, + 0, + 'consumer', + [{ json: { city: 'Prague' }, pairedItem: { item: 0 } }], + 'manual', + {}, + { + node: workflow.getNode('consumer')!, + data: {}, + source: { + main: [{ previousNode: 'source', previousNodeOutput: 0, previousNodeRun: 0 }], + }, + }, + ); + + it('resolves the upstream item via the ancestry chain (parity)', () => { + // Build the `incomingSourceData` literal inside the expression so the + // argument is constructed in-isolate under the VM engine. Both + // engines walk back from `consumer` to `source` and return the + // matching item. + const expr = + "={{ JSON.stringify($getPairedItem('source', { previousNode: 'source', previousNodeOutput: 0, previousNodeRun: 0 }, { item: 0 })) }}"; + expect(evaluate(expr)).toBe( + JSON.stringify({ json: { city: 'Prague' }, pairedItem: { item: 0 } }), + ); + }); + + it('throws when `incomingSourceData` is null (parity)', () => { + // Both engines surface the host's "paired item not found" + // ExpressionError. The legacy engine throws directly from + // `getPairedItem`. The VM engine sends the typed-RPC envelope with + // `incomingSourceData: null`; the host throws and the sentinel + // round-trips back into the isolate, where tournament's `E()` + // re-throws it. + const expr = "={{ $getPairedItem('source', null, { item: 0 }) }}"; + expect(() => evaluate(expr)).toThrow(ExpressionError); + }); + }); + describe('$evaluateExpression through expression engine (engine parity)', () => { const nodeTypes = Helpers.NodeTypes(); const workflow = new Workflow({