refactor(core): Route getNodeLast and getNodeAll through typed-RPC dispatcher (#30825)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Danny Martini 2026-05-21 11:21:04 +02:00 committed by GitHub
parent 50dc050dc1
commit 6db810a266
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 431 additions and 52 deletions

View File

@ -143,3 +143,176 @@ describe("Typed RPC: $('Foo').first() routes via getNodeFirst", () => {
expect(result).toBeUndefined();
});
});
describe("Typed RPC: $('Foo').last() routes via getNodeLast", () => {
let evaluator: ExpressionEvaluator;
const caller = {};
beforeAll(async () => {
evaluator = new ExpressionEvaluator({
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
maxCodeCacheSize: 64,
});
await evaluator.initialize();
await evaluator.acquire(caller);
});
afterAll(async () => {
await evaluator.release(caller);
await evaluator.dispose();
});
it('returns the value of data.$(nodeName).last() and forwards args', () => {
const calls: Array<unknown[]> = [];
const data: Record<string, unknown> = {
$: (nodeName: string) => {
if (nodeName !== 'SourceNode') throw new Error(`unexpected node: ${nodeName}`);
return {
last: (...args: unknown[]) => {
calls.push(args);
return { json: { id: 99, name: 'Zelda' } };
},
};
},
};
const result = evaluator.evaluate("{{ $('SourceNode').last(1, 2) }}", data, caller) as Record<
string,
unknown
>;
expect(result).toEqual({ json: { id: 99, name: 'Zelda' } });
expect(calls).toEqual([[1, 2]]);
});
it('only invokes .last on the host proxy — never other methods', () => {
const invoked: string[] = [];
const data: Record<string, unknown> = {
$: (_nodeName: string) =>
new Proxy(
{},
{
get(_t, prop) {
if (typeof prop === 'symbol') return undefined;
invoked.push(prop);
if (prop === 'last') return () => ({ json: { ok: true } });
return () => {
throw new Error(`unexpected method invoked: ${prop}`);
};
},
},
),
};
evaluator.evaluate("{{ $('Foo').last() }}", data, caller);
expect(invoked).toEqual(['last']);
});
});
describe("Typed RPC: $('Foo').all() routes via getNodeAll", () => {
let evaluator: ExpressionEvaluator;
const caller = {};
beforeAll(async () => {
evaluator = new ExpressionEvaluator({
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
maxCodeCacheSize: 64,
});
await evaluator.initialize();
await evaluator.acquire(caller);
});
afterAll(async () => {
await evaluator.release(caller);
await evaluator.dispose();
});
it('returns the full array from data.$(nodeName).all()', () => {
const data: Record<string, unknown> = {
$: (_nodeName: string) => ({
all: () => [{ json: { id: 1 } }, { json: { id: 2 } }, { json: { id: 3 } }],
}),
};
const result = evaluator.evaluate("{{ $('Foo').all() }}", data, caller);
expect(result).toEqual([{ json: { id: 1 } }, { json: { id: 2 } }, { json: { id: 3 } }]);
});
it('only invokes .all on the host proxy — never other methods', () => {
const invoked: string[] = [];
const data: Record<string, unknown> = {
$: (_nodeName: string) =>
new Proxy(
{},
{
get(_t, prop) {
if (typeof prop === 'symbol') return undefined;
invoked.push(prop);
if (prop === 'all') return () => [];
return () => {
throw new Error(`unexpected method invoked: ${prop}`);
};
},
},
),
};
evaluator.evaluate("{{ $('Foo').all() }}", data, caller);
expect(invoked).toEqual(['all']);
});
});
describe("Typed RPC: $('Foo') proxy fallthrough and `in` checks", () => {
let evaluator: ExpressionEvaluator;
const caller = {};
beforeAll(async () => {
evaluator = new ExpressionEvaluator({
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
maxCodeCacheSize: 64,
});
await evaluator.initialize();
await evaluator.acquire(caller);
});
afterAll(async () => {
await evaluator.release(caller);
await evaluator.dispose();
});
it('non-RPC properties (`.params`) delegate to the lazy proxy', () => {
// `.params` on $('Foo') is not a typed RPC. The synthetic proxy's get
// trap should fall through to the underlying lazy proxy, which fetches
// the value from the host via getValueAtPath. Reading `params.value`
// (a primitive) exercises that path end-to-end.
const data: Record<string, unknown> = {
$: (_nodeName: string) => ({
params: { mode: 'manual' },
}),
};
const result = evaluator.evaluate("{{ $('Foo').params.mode }}", data, caller);
expect(result).toBe('manual');
});
it("'first', 'last', 'all' are reported by the synthetic proxy's `has` trap", () => {
// Tournament's variable polyfill compiles to `("x" in obj ? obj : global).x`,
// so the typed-RPC method names must answer `true` for `in` checks even
// though the inner target is `{}`. Without the `has` trap, the lookup
// would fall through to global and miss the typed-RPC routing.
const data: Record<string, unknown> = {
$: (_nodeName: string) => ({
first: () => ({ json: { ok: true } }),
last: () => ({ json: { ok: true } }),
all: () => [],
}),
};
expect(evaluator.evaluate("{{ 'first' in $('Foo') }}", data, caller)).toBe(true);
expect(evaluator.evaluate("{{ 'last' in $('Foo') }}", data, caller)).toBe(true);
expect(evaluator.evaluate("{{ 'all' in $('Foo') }}", data, caller)).toBe(true);
});
});

View File

@ -0,0 +1,132 @@
/**
* Schema-level unit tests for the typed-RPC bridge protocol.
*
* The integration-level tests in `__tests__/typed-rpc.test.ts` exercise
* routing through the isolate. This file tests `bridgeMessageSchema`
* directly: discriminator selection, `.strict()` enforcement, and the
* `int().nonnegative()` constraint on index fields.
*
* Anything that crosses the trust boundary into the host must parse
* successfully here; anything that fails to parse must not reach the
* dispatcher's `switch`.
*/
import { describe, it, expect } from 'vitest';
import { bridgeMessageSchema } from '../bridge-messages';
describe('bridgeMessageSchema', () => {
describe('discriminator', () => {
it('parses a valid getNodeFirst envelope', () => {
const parsed = bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: 0,
runIndex: 0,
});
expect(parsed.type).toBe('getNodeFirst');
});
it('parses a valid getNodeLast envelope', () => {
const parsed = bridgeMessageSchema.parse({ type: 'getNodeLast', nodeName: 'Foo' });
expect(parsed.type).toBe('getNodeLast');
});
it('parses a valid getNodeAll envelope', () => {
const parsed = bridgeMessageSchema.parse({ type: 'getNodeAll', nodeName: 'Foo' });
expect(parsed.type).toBe('getNodeAll');
});
it('rejects an unknown discriminator value', () => {
expect(() => bridgeMessageSchema.parse({ type: 'evalArbitrary', nodeName: 'Foo' })).toThrow();
});
it('rejects a missing discriminator', () => {
expect(() => bridgeMessageSchema.parse({ nodeName: 'Foo' })).toThrow();
});
});
describe('.strict() enforcement', () => {
it('rejects extra fields on a known schema', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
hijack: 'arbitrary',
}),
).toThrow();
});
});
describe('nodeName', () => {
it('rejects non-string nodeName', () => {
expect(() => bridgeMessageSchema.parse({ type: 'getNodeFirst', nodeName: 123 })).toThrow();
});
it('rejects missing nodeName', () => {
expect(() => bridgeMessageSchema.parse({ type: 'getNodeFirst' })).toThrow();
});
});
describe('branchIndex / runIndex', () => {
it('accepts non-negative integers', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: 0,
runIndex: 5,
}),
).not.toThrow();
});
it('rejects negative integers', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: -1,
}),
).toThrow();
});
it('rejects non-integer numbers', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: 1.5,
}),
).toThrow();
});
it('rejects NaN', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: NaN,
}),
).toThrow();
});
it('rejects Infinity', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: Infinity,
}),
).toThrow();
});
it('rejects string-encoded numbers', () => {
expect(() =>
bridgeMessageSchema.parse({
type: 'getNodeFirst',
nodeName: 'Foo',
branchIndex: '0',
}),
).toThrow();
});
});
});

View File

@ -32,6 +32,32 @@ export const getNodeFirstMessage = z
})
.strict();
/**
* `$('NodeName').last(branchIndex?, runIndex?)` fetch the last item of
* a named node's most recent execution data.
*/
export const getNodeLastMessage = z
.object({
type: z.literal('getNodeLast'),
nodeName: z.string(),
branchIndex: z.number().int().nonnegative().optional(),
runIndex: z.number().int().nonnegative().optional(),
})
.strict();
/**
* `$('NodeName').all(branchIndex?, runIndex?)` fetch every item of a
* named node's most recent execution data as an array.
*/
export const getNodeAllMessage = z
.object({
type: z.literal('getNodeAll'),
nodeName: z.string(),
branchIndex: z.number().int().nonnegative().optional(),
runIndex: z.number().int().nonnegative().optional(),
})
.strict();
/**
* The full set of messages the bridge will accept. Discriminator is `type`.
*
@ -39,6 +65,10 @@ export const getNodeFirstMessage = z
* silently ignored this catches typos in the runtime stubs and keeps the
* protocol surface tight.
*/
export const bridgeMessageSchema = z.discriminatedUnion('type', [getNodeFirstMessage]);
export const bridgeMessageSchema = z.discriminatedUnion('type', [
getNodeFirstMessage,
getNodeLastMessage,
getNodeAllMessage,
]);
export type BridgeMessage = z.infer<typeof bridgeMessageSchema>;

View File

@ -1,7 +1,7 @@
import type ivm from 'isolated-vm';
import { readFile } from 'node:fs/promises';
import * as path from 'node:path';
import type { RuntimeBridge, BridgeConfig, ExecuteOptions } from '../types';
import type { RuntimeBridge, BridgeConfig, ExecuteOptions, WorkflowData } from '../types';
import { DEFAULT_BRIDGE_CONFIG, TimeoutError, MemoryLimitError } from '../types';
import type { ErrorSentinel } from '../runtime/lazy-proxy';
import { bridgeMessageSchema, type BridgeMessage } from './bridge-messages';
@ -285,7 +285,7 @@ export class IsolatedVmBridge implements RuntimeBridge {
* @param data - Current workflow data to use for callback responses
* @private
*/
private createGetValueAtPathRef(data: Record<string, unknown>): ivm.Reference {
private createGetValueAtPathRef(data: WorkflowData): ivm.Reference {
return new (getIvm().Reference)((path: string[]) => {
try {
// Navigate to value
@ -357,7 +357,7 @@ export class IsolatedVmBridge implements RuntimeBridge {
* @param data - Current workflow data to use for callback responses
* @private
*/
private createGetArrayElementRef(data: Record<string, unknown>): ivm.Reference {
private createGetArrayElementRef(data: WorkflowData): ivm.Reference {
return new (getIvm().Reference)((path: string[], index: number) => {
try {
// Navigate to array
@ -422,7 +422,7 @@ export class IsolatedVmBridge implements RuntimeBridge {
* @param data - Current workflow data to use for callback responses
* @private
*/
private createCallFunctionAtPathRef(data: Record<string, unknown>): ivm.Reference {
private createCallFunctionAtPathRef(data: WorkflowData): ivm.Reference {
return new (getIvm().Reference)((path: string[], ...args: unknown[]) => {
try {
// Navigate to function, tracking parent to preserve `this` context
@ -477,16 +477,25 @@ export class IsolatedVmBridge implements RuntimeBridge {
* @param data - Current workflow data
* @private
*/
private createCallHostRef(data: Record<string, unknown>): ivm.Reference {
private createCallHostRef(data: WorkflowData): ivm.Reference {
return new (getIvm().Reference)((rawMsg: unknown) => {
try {
const msg = bridgeMessageSchema.parse(rawMsg);
switch (msg.type) {
case 'getNodeFirst':
return this.handleGetNodeFirst(msg, data);
case 'getNodeLast':
return this.handleGetNodeLast(msg, data);
case 'getNodeAll':
return this.handleGetNodeAll(msg, data);
default: {
const exhaustive: never = msg.type;
throw new Error(`Unhandled bridge message type: ${String(exhaustive)}`);
// Unreachable at runtime — zod rejects unknown `type` values
// before the switch. The `never` assignment is the compile-time
// guard: a new schema added to `bridgeMessageSchema` without a
// matching case here becomes a type error.
const exhaustive: never = msg;
void exhaustive;
throw new Error('Unhandled bridge message');
}
}
} catch (err) {
@ -496,38 +505,40 @@ export class IsolatedVmBridge implements RuntimeBridge {
}
/**
* Handler for `getNodeFirst` fetches the first item of a named node's
* most recent execution data.
* Handlers for the `$('Foo').{first,last,all}` typed RPCs.
*
* Note: this still invokes `data.$` host-side. Eliminating `data.$` as a
* host-callable entirely would require reaching the `WorkflowDataProxy`
* internals (e.g. `getNodeExecutionOrPinnedData`) rather than the public
* `$()` API. That's a follow-up; the security win here is "the isolate
* can only invoke `.first` via this validated RPC, not any other method,
* regardless of what `data.$` returns."
* Each handler reads a fixed literal property name off the host-side node
* proxy the isolate cannot influence which property is dereferenced.
* Eliminating `data.$` as a host-callable entirely would require reaching
* the `WorkflowDataProxy` internals (e.g. `getNodeExecutionOrPinnedData`)
* rather than the public `$()` API; that's a follow-up.
*
* `data.$` is a host-wired function (`WorkflowDataProxy`'s `$`). If it
* ever isn't, optional chaining short-circuits to `undefined` the same
* observable result the runtime's `E()` handler produces from any thrown
* error here.
*
* @private
*/
private handleGetNodeFirst(
msg: Extract<BridgeMessage, { type: 'getNodeFirst' }>,
data: Record<string, unknown>,
data: WorkflowData,
): unknown {
const dollarFn = data.$;
if (typeof dollarFn !== 'function') {
throw new Error('getNodeFirst: $ is not available in expression context');
}
return data.$?.(msg.nodeName)?.first?.(msg.branchIndex, msg.runIndex);
}
const nodeProxy = (dollarFn as (n: string) => unknown)(msg.nodeName);
if (!nodeProxy || typeof nodeProxy !== 'object') {
return undefined;
}
private handleGetNodeLast(
msg: Extract<BridgeMessage, { type: 'getNodeLast' }>,
data: WorkflowData,
): unknown {
return data.$?.(msg.nodeName)?.last?.(msg.branchIndex, msg.runIndex);
}
const firstFn = (nodeProxy as Record<string, unknown>).first;
if (typeof firstFn !== 'function') {
return undefined;
}
return (firstFn as (...a: unknown[]) => unknown).call(nodeProxy, msg.branchIndex, msg.runIndex);
private handleGetNodeAll(
msg: Extract<BridgeMessage, { type: 'getNodeAll' }>,
data: WorkflowData,
): unknown {
return data.$?.(msg.nodeName)?.all?.(msg.branchIndex, msg.runIndex);
}
/**
@ -549,7 +560,7 @@ export class IsolatedVmBridge implements RuntimeBridge {
* @returns Result of the expression
* @throws {Error} If bridge not initialized or execution fails
*/
execute(code: string, data: Record<string, unknown>, options?: ExecuteOptions): unknown {
execute(code: string, data: WorkflowData, options?: ExecuteOptions): unknown {
if (!this.initialized || !this.context) {
throw new Error('Bridge not initialized. Call initialize() first.');
}

View File

@ -54,6 +54,10 @@ interface BridgeCallback {
* are new schemas in `bridge/bridge-messages.ts` + new cases in the
* dispatcher switch. The name reflects what this is: a synchronous
* host RPC, not a postMessage-style async send.
*
* The bridge wires all four callbacks unconditionally before invoking
* `buildContext`, so the runtime treats them as present no defensive
* null/undefined checks at each call site.
*/
export interface BridgeCallbacks {
getValueAtPath: BridgeCallback;
@ -153,9 +157,8 @@ export function buildContext(
// The returned object is a Proxy whose `get` trap intercepts properties
// that have a typed RPC (e.g. `.first` → `getNodeFirst`) and routes them
// through the `callHost` envelope. Everything else (properties like
// `.params`, `.json`, and methods that don't yet have a typed RPC like
// `.last`, `.all`) is read from an underlying lazy proxy via explicit
// delegation.
// `.params`, `.json`, and methods that don't yet have a typed RPC) is
// read from an underlying lazy proxy via explicit delegation.
//
// Important: the synthetic Proxy's *target* is a plain `{}` rather than
// the lazy proxy itself. Nesting one Proxy inside another causes V8 to
@ -166,25 +169,35 @@ export function buildContext(
// the lazy proxy lives in closure and is only consulted on demand.
//
// As more typed RPCs are added, more cases land in this trap.
// The `has` trap mirrors the `get` trap for typed-RPC names so that
// tournament's `"first" in this.$('Foo')` check resolves true even though
// the inner target is empty.
target.$ = function (nodeName: string) {
const lazyProxy = createDeepLazyProxy(['$', nodeName], undefined, callbacks);
const sendNodeMethod = (type: 'getNodeFirst' | 'getNodeLast' | 'getNodeAll') => {
return (branchIndex?: number, runIndex?: number) => {
const result = callbacks.callHost.applySync(
null,
[{ type, nodeName, branchIndex, runIndex }],
{ arguments: { copy: true }, result: { copy: true } },
);
throwIfErrorSentinel(result);
return result;
};
};
return new Proxy({} as Record<string, unknown>, {
get(_emptyTarget, prop) {
if (prop === 'first') {
return (branchIndex?: number, runIndex?: number) => {
const result = callbacks.callHost.applySync(
null,
[{ type: 'getNodeFirst', nodeName, branchIndex, runIndex }],
{ arguments: { copy: true }, result: { copy: true } },
);
throwIfErrorSentinel(result);
return result;
};
}
if (prop === 'first') return sendNodeMethod('getNodeFirst');
if (prop === 'last') return sendNodeMethod('getNodeLast');
if (prop === 'all') return sendNodeMethod('getNodeAll');
// Everything else: delegate to the lazy proxy. The lazy proxy's
// own `get` trap handles caching, host fetching, and metadata.
return (lazyProxy as Record<string | symbol, unknown>)[prop];
},
has(_emptyTarget, prop) {
if (prop === 'first' || prop === 'last' || prop === 'all') return true;
return prop in (lazyProxy as Record<string | symbol, unknown>);
},
});
};

View File

@ -5,6 +5,8 @@
// Start here for CLI/backend (IsolatedVmBridge) or frontend (WebWorkerBridge).
// ============================================================================
import type { WorkflowData } from './evaluator';
/**
* Abstract interface for runtime bridges.
*
@ -32,7 +34,7 @@ export interface RuntimeBridge {
* Note: Synchronous for Node.js vm module (Slice 1).
* Will be async for isolated-vm (Slice 2).
*/
execute(code: string, data: Record<string, unknown>, options?: ExecuteOptions): unknown;
execute(code: string, data: WorkflowData, options?: ExecuteOptions): unknown;
/**
* Dispose of the isolated context and free resources.

View File

@ -101,12 +101,30 @@ export interface IExpressionEvaluator {
}
/**
* Workflow data proxy from WorkflowDataProxy.getDataProxy().
*
* For Slice 1: We pass this directly via VM context (simple pass-through).
* Later: Will implement deep lazy proxy for field-level data fetching.
* The methods on the per-node accessor returned by `data.$('NodeName')`.
* Mirrors the host-side `WorkflowDataProxy` `$()` return shape, restricted
* to the operations the typed-RPC handlers dispatch into. All optional
* the underlying proxy is dynamic and the handlers tolerate missing
* methods via optional chaining at the call site.
*/
export type WorkflowData = Record<string, unknown>;
export interface NodeProxy {
first?: (branchIndex?: number, runIndex?: number) => unknown;
last?: (branchIndex?: number, runIndex?: number) => unknown;
all?: (branchIndex?: number, runIndex?: number) => unknown;
}
/**
* Workflow data proxy from `WorkflowDataProxy.getDataProxy()`.
*
* `$` is the named typed-RPC accessor (`$('NodeName').first()` etc.) and is
* called directly from typed-RPC handlers. Everything else flows through
* the generic data-access primitives (`getValueAtPath`, `getArrayElement`),
* which read paths off the index signature without needing per-key types.
*/
export interface WorkflowData {
$?: (nodeName: string) => NodeProxy | null | undefined;
[key: string]: unknown;
}
/**
* Options for evaluate().