mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 10:39:23 +02:00
refactor(core): Route $getPairedItem through typed-RPC dispatcher (backport to 1.x) (#31617)
Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
parent
c848191492
commit
91c46b57fb
|
|
@ -771,3 +771,68 @@ describe('Typed RPC: $evaluateExpression() routes via evaluateExpression', () =>
|
|||
expect(result).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Typed RPC: $getPairedItem() routes via getPairedItem', () => {
|
||||
let evaluator: ExpressionEvaluator;
|
||||
const caller = {};
|
||||
|
||||
beforeAll(async () => {
|
||||
evaluator = new ExpressionEvaluator({
|
||||
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
|
||||
maxCodeCacheSize: 64,
|
||||
});
|
||||
await evaluator.initialize();
|
||||
await evaluator.acquire(caller);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await evaluator.release(caller);
|
||||
await evaluator.dispose();
|
||||
});
|
||||
|
||||
it('returns the value of data.$getPairedItem(...)', () => {
|
||||
const data: Record<string, unknown> = {
|
||||
$getPairedItem: () => ({ json: { city: 'Prague' } }),
|
||||
};
|
||||
|
||||
const result = evaluator.evaluate(
|
||||
"{{ JSON.stringify($getPairedItem('source', { previousNode: 'source' }, { item: 0 })) }}",
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
expect(result).toBe(JSON.stringify({ json: { city: 'Prague' } }));
|
||||
});
|
||||
|
||||
it('forwards destinationNodeName, incomingSourceData, initialPairedItem verbatim', () => {
|
||||
const calls: Array<unknown[]> = [];
|
||||
const data: Record<string, unknown> = {
|
||||
$getPairedItem: (...args: unknown[]) => {
|
||||
calls.push(args);
|
||||
return 'ok';
|
||||
},
|
||||
};
|
||||
|
||||
evaluator.evaluate(
|
||||
"{{ $getPairedItem('dest', { previousNode: 'src', previousNodeRun: 1 }, { item: 2, input: 0 }) }}",
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
evaluator.evaluate("{{ $getPairedItem('dest', null, { item: 0 }) }}", data, caller);
|
||||
|
||||
expect(calls).toEqual([
|
||||
['dest', { previousNode: 'src', previousNodeRun: 1 }, { item: 2, input: 0 }],
|
||||
['dest', null, { item: 0 }],
|
||||
]);
|
||||
});
|
||||
|
||||
it('handles missing data.$getPairedItem gracefully (returns undefined)', () => {
|
||||
const data: Record<string, unknown> = {};
|
||||
|
||||
const result = evaluator.evaluate(
|
||||
"{{ $getPairedItem('dest', null, { item: 0 }) }}",
|
||||
data,
|
||||
caller,
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -201,6 +201,92 @@ describe('bridgeMessageSchema', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('getPairedItem', () => {
|
||||
it('parses a minimal valid envelope (null source)', () => {
|
||||
const parsed = bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: null,
|
||||
initialPairedItem: { item: 0 },
|
||||
});
|
||||
expect(parsed.type).toBe('getPairedItem');
|
||||
});
|
||||
|
||||
it('parses a fully populated envelope (nested sourceOverwrite)', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: {
|
||||
previousNode: 'Src',
|
||||
previousNodeOutput: 0,
|
||||
previousNodeRun: 1,
|
||||
},
|
||||
initialPairedItem: {
|
||||
item: 2,
|
||||
input: 0,
|
||||
sourceOverwrite: { previousNode: 'Other' },
|
||||
},
|
||||
}),
|
||||
).not.toThrow();
|
||||
});
|
||||
|
||||
it('rejects missing destinationNodeName', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
incomingSourceData: null,
|
||||
initialPairedItem: { item: 0 },
|
||||
}),
|
||||
).toThrow();
|
||||
});
|
||||
|
||||
it('rejects negative item index', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: null,
|
||||
initialPairedItem: { item: -1 },
|
||||
}),
|
||||
).toThrow();
|
||||
});
|
||||
|
||||
it('rejects extra fields on the envelope (.strict)', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: null,
|
||||
initialPairedItem: { item: 0 },
|
||||
usedMethodName: '$getPairedItem',
|
||||
}),
|
||||
).toThrow();
|
||||
});
|
||||
|
||||
it('rejects extra fields on nested sourceData (.strict)', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: { previousNode: 'Src', hijack: 'x' },
|
||||
initialPairedItem: { item: 0 },
|
||||
}),
|
||||
).toThrow();
|
||||
});
|
||||
|
||||
it('rejects extra fields on nested pairedItemData (.strict)', () => {
|
||||
expect(() =>
|
||||
bridgeMessageSchema.parse({
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName: 'Foo',
|
||||
incomingSourceData: null,
|
||||
initialPairedItem: { item: 0, hijack: 'x' },
|
||||
}),
|
||||
).toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe('fromAi', () => {
|
||||
it('accepts a minimal envelope (type only)', () => {
|
||||
// `name` is optional in the schema so empty calls reach the host's
|
||||
|
|
|
|||
|
|
@ -193,6 +193,55 @@ export const evaluateExpressionMessage = z
|
|||
})
|
||||
.strict();
|
||||
|
||||
/**
|
||||
* `ISourceData` — the source record that accompanies a paired-item
|
||||
* traversal step. Mirrors the host interface used by
|
||||
* `WorkflowDataProxy.getPairedItem`.
|
||||
*/
|
||||
const sourceDataSchema = z
|
||||
.object({
|
||||
previousNode: z.string(),
|
||||
previousNodeOutput: z.number().int().nonnegative().optional(),
|
||||
previousNodeRun: z.number().int().nonnegative().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
/**
|
||||
* `IPairedItemData` — one paired-item record. `sourceOverwrite` lets a
|
||||
* node override the upstream source while the helper walks the ancestry
|
||||
* chain; the field is optional and recurses through the same schema.
|
||||
*/
|
||||
const pairedItemDataSchema = z
|
||||
.object({
|
||||
item: z.number().int().nonnegative(),
|
||||
input: z.number().int().nonnegative().optional(),
|
||||
sourceOverwrite: sourceDataSchema.optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
/**
|
||||
* `$getPairedItem(destinationNodeName, incomingSourceData, initialPairedItem)` —
|
||||
* traverse the paired-item ancestry chain back to the named upstream node
|
||||
* and return the matching execution item.
|
||||
*
|
||||
* Two host-side fields are deliberately omitted from the schema:
|
||||
* - `usedMethodName` defaults to `$getPairedItem` on the host; the isolate
|
||||
* has no reason to spoof a different method name in the error path.
|
||||
* - `nodeBeforeLast` is an internal recursion argument; only the host
|
||||
* itself sets it during the recursive walk.
|
||||
*
|
||||
* `incomingSourceData` is nullable because the host's signature accepts
|
||||
* `ISourceData | null` (and throws a paired-item-not-found error when null).
|
||||
*/
|
||||
export const getPairedItemMessage = z
|
||||
.object({
|
||||
type: z.literal('getPairedItem'),
|
||||
destinationNodeName: z.string(),
|
||||
incomingSourceData: sourceDataSchema.nullable(),
|
||||
initialPairedItem: pairedItemDataSchema,
|
||||
})
|
||||
.strict();
|
||||
|
||||
/**
|
||||
* The full set of messages the bridge will accept. Discriminator is `type`.
|
||||
*
|
||||
|
|
@ -213,6 +262,7 @@ export const bridgeMessageSchema = z.discriminatedUnion('type', [
|
|||
getNodeItemMatchingMessage,
|
||||
getNodeItemMessage,
|
||||
evaluateExpressionMessage,
|
||||
getPairedItemMessage,
|
||||
]);
|
||||
|
||||
export type BridgeMessage = z.infer<typeof bridgeMessageSchema>;
|
||||
|
|
|
|||
|
|
@ -506,6 +506,8 @@ export class IsolatedVmBridge implements RuntimeBridge {
|
|||
return this.handleGetNodeItem(msg, data);
|
||||
case 'evaluateExpression':
|
||||
return this.handleEvaluateExpression(msg, data);
|
||||
case 'getPairedItem':
|
||||
return this.handleGetPairedItem(msg, data);
|
||||
default: {
|
||||
// Unreachable at runtime — zod rejects unknown `type` values
|
||||
// before the switch. The `never` assignment is the compile-time
|
||||
|
|
@ -676,6 +678,31 @@ export class IsolatedVmBridge implements RuntimeBridge {
|
|||
return data.$evaluateExpression?.(msg.expression, msg.itemIndex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for `$getPairedItem(destinationNodeName, incomingSourceData,
|
||||
* initialPairedItem)`. Forwards directly to the host binding, which
|
||||
* walks the paired-item ancestry chain back to the named upstream node
|
||||
* and returns the matching execution item.
|
||||
*
|
||||
* The two trailing host parameters — `usedMethodName` and
|
||||
* `nodeBeforeLast` — are deliberately not part of the wire protocol:
|
||||
* the host's default for `usedMethodName` is already `$getPairedItem`,
|
||||
* and `nodeBeforeLast` is an internal recursion argument the host sets
|
||||
* during traversal.
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
private handleGetPairedItem(
|
||||
msg: Extract<BridgeMessage, { type: 'getPairedItem' }>,
|
||||
data: WorkflowData,
|
||||
): unknown {
|
||||
return data.$getPairedItem?.(
|
||||
msg.destinationNodeName,
|
||||
msg.incomingSourceData,
|
||||
msg.initialPairedItem,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute JavaScript code in the isolated context.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -347,6 +347,32 @@ export function buildContext(
|
|||
return result;
|
||||
};
|
||||
|
||||
// $getPairedItem — walks the paired-item ancestry chain back to the
|
||||
// named upstream node. The host validates the structural shape of
|
||||
// `incomingSourceData` and `initialPairedItem` via the typed-RPC
|
||||
// schema; bad input surfaces as a schema-parse error sentinel rather
|
||||
// than a host throw, keeping the protocol surface tight.
|
||||
target.$getPairedItem = (
|
||||
destinationNodeName: string,
|
||||
incomingSourceData: unknown,
|
||||
initialPairedItem: unknown,
|
||||
) => {
|
||||
const result = callbacks.callHost.applySync(
|
||||
null,
|
||||
[
|
||||
{
|
||||
type: 'getPairedItem',
|
||||
destinationNodeName,
|
||||
incomingSourceData,
|
||||
initialPairedItem,
|
||||
},
|
||||
],
|
||||
{ arguments: { copy: true }, result: { copy: true } },
|
||||
);
|
||||
throwIfErrorSentinel(result);
|
||||
return result;
|
||||
};
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Resolve an unknown key from the host. Called by the proxy's has/get traps
|
||||
// for keys not already on the target. The resolved value is cached on target
|
||||
|
|
|
|||
|
|
@ -167,6 +167,27 @@ export type FromAi = (
|
|||
defaultValue?: unknown,
|
||||
) => unknown;
|
||||
|
||||
/**
|
||||
* Source data describing where an item came from upstream. Mirrors the
|
||||
* `ISourceData` interface from `n8n-workflow` without taking a runtime
|
||||
* dependency on it.
|
||||
*/
|
||||
export interface SourceData {
|
||||
previousNode: string;
|
||||
previousNodeOutput?: number;
|
||||
previousNodeRun?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Paired-item descriptor. Mirrors the `IPairedItemData` interface from
|
||||
* `n8n-workflow` without taking a runtime dependency on it.
|
||||
*/
|
||||
export interface PairedItemData {
|
||||
item: number;
|
||||
input?: number;
|
||||
sourceOverwrite?: SourceData;
|
||||
}
|
||||
|
||||
export interface WorkflowData {
|
||||
$?: (nodeName: string) => NodeProxy | null | undefined;
|
||||
$input?: InputProxy;
|
||||
|
|
@ -175,6 +196,11 @@ export interface WorkflowData {
|
|||
$fromAi?: FromAi;
|
||||
$fromai?: FromAi;
|
||||
$evaluateExpression?: (expression: string, itemIndex?: number) => unknown;
|
||||
$getPairedItem?: (
|
||||
destinationNodeName: string,
|
||||
incomingSourceData: SourceData | null,
|
||||
initialPairedItem: PairedItemData,
|
||||
) => unknown;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -993,6 +993,103 @@ describe('Expression', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('$getPairedItem through expression engine (engine parity)', () => {
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
|
||||
const workflow = new Workflow({
|
||||
id: 'test-get-paired-item',
|
||||
name: 'Test',
|
||||
nodes: [
|
||||
{
|
||||
id: 'source-id',
|
||||
name: 'source',
|
||||
type: 'n8n-nodes-base.set',
|
||||
typeVersion: 1,
|
||||
position: [0, 0],
|
||||
parameters: {},
|
||||
},
|
||||
{
|
||||
id: 'consumer-id',
|
||||
name: 'consumer',
|
||||
type: 'n8n-nodes-base.set',
|
||||
typeVersion: 1,
|
||||
position: [200, 0],
|
||||
parameters: {},
|
||||
},
|
||||
],
|
||||
connections: { source: { main: [[{ node: 'consumer', type: 'main', index: 0 }]] } },
|
||||
active: false,
|
||||
nodeTypes,
|
||||
});
|
||||
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
source: [
|
||||
{
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
source: [],
|
||||
data: {
|
||||
main: [[{ json: { city: 'Prague' }, pairedItem: { item: 0 } }]],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
beforeAll(async () => {
|
||||
await workflow.expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await workflow.expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (expr: string) =>
|
||||
workflow.expression.getParameterValue(
|
||||
expr,
|
||||
runExecutionData,
|
||||
0,
|
||||
0,
|
||||
'consumer',
|
||||
[{ json: { city: 'Prague' }, pairedItem: { item: 0 } }],
|
||||
'manual',
|
||||
{},
|
||||
{
|
||||
node: workflow.getNode('consumer')!,
|
||||
data: {},
|
||||
source: {
|
||||
main: [{ previousNode: 'source', previousNodeOutput: 0, previousNodeRun: 0 }],
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
it('resolves the upstream item via the ancestry chain (parity)', () => {
|
||||
// Build the `incomingSourceData` literal inside the expression so the
|
||||
// argument is constructed in-isolate under the VM engine. Both
|
||||
// engines walk back from `consumer` to `source` and return the
|
||||
// matching item.
|
||||
const expr =
|
||||
"={{ JSON.stringify($getPairedItem('source', { previousNode: 'source', previousNodeOutput: 0, previousNodeRun: 0 }, { item: 0 })) }}";
|
||||
expect(evaluate(expr)).toBe(
|
||||
JSON.stringify({ json: { city: 'Prague' }, pairedItem: { item: 0 } }),
|
||||
);
|
||||
});
|
||||
|
||||
it('throws when `incomingSourceData` is null (parity)', () => {
|
||||
// Both engines surface the host's "paired item not found"
|
||||
// ExpressionError. The legacy engine throws directly from
|
||||
// `getPairedItem`. The VM engine sends the typed-RPC envelope with
|
||||
// `incomingSourceData: null`; the host throws and the sentinel
|
||||
// round-trips back into the isolate, where tournament's `E()`
|
||||
// re-throws it.
|
||||
const expr = "={{ $getPairedItem('source', null, { item: 0 }) }}";
|
||||
expect(() => evaluate(expr)).toThrow(ExpressionError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('$evaluateExpression through expression engine (engine parity)', () => {
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
const workflow = new Workflow({
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user