refactor(core): Route $input methods through typed-RPC dispatcher (backport to 1.x) (#31573)

Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
n8n-assistant[bot] 2026-06-02 14:22:04 +00:00 committed by GitHub
parent 497bf6b6e1
commit 1ea4a271ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 255 additions and 15 deletions

View File

@ -316,3 +316,118 @@ describe("Typed RPC: $('Foo') proxy fallthrough and `in` checks", () => {
expect(evaluator.evaluate("{{ 'all' in $('Foo') }}", data, caller)).toBe(true);
});
});
describe('Typed RPC: $input.{first,last,all} route via getInput*', () => {
let evaluator: ExpressionEvaluator;
const caller = {};
beforeAll(async () => {
evaluator = new ExpressionEvaluator({
createBridge: () => new IsolatedVmBridge({ timeout: 5000 }),
maxCodeCacheSize: 64,
});
await evaluator.initialize();
await evaluator.acquire(caller);
});
afterAll(async () => {
await evaluator.release(caller);
await evaluator.dispose();
});
it('$input.first() returns the value of data.$input.first()', () => {
const data: Record<string, unknown> = {
$input: {
first: () => ({ json: { id: 1, name: 'first-item' } }),
},
};
const result = evaluator.evaluate('{{ $input.first() }}', data, caller);
expect(result).toEqual({ json: { id: 1, name: 'first-item' } });
});
it('$input.last() returns the value of data.$input.last()', () => {
const data: Record<string, unknown> = {
$input: {
last: () => ({ json: { id: 9, name: 'last-item' } }),
},
};
const result = evaluator.evaluate('{{ $input.last() }}', data, caller);
expect(result).toEqual({ json: { id: 9, name: 'last-item' } });
});
it('$input.all() returns the array from data.$input.all()', () => {
const data: Record<string, unknown> = {
$input: {
all: () => [{ json: { id: 1 } }, { json: { id: 2 } }],
},
};
const result = evaluator.evaluate('{{ $input.all() }}', data, caller);
expect(result).toEqual([{ json: { id: 1 } }, { json: { id: 2 } }]);
});
it('drops any arguments the isolate tries to pass to the host method', () => {
// The host's `WorkflowDataProxy` throws if `$input.first/last/all` is
// called with any arguments. The typed-RPC schemas have no fields
// besides `type`, so the in-isolate stub closes over a zero-arg
// invocation regardless of what the expression passed. Documenting:
// `$input.first('arg')` produces the same result as `$input.first()`
// because the host method is invoked with no arguments either way.
const args: unknown[][] = [];
const data: Record<string, unknown> = {
$input: {
first: (...received: unknown[]) => {
args.push(received);
return { json: { ok: true } };
},
},
};
evaluator.evaluate('{{ $input.first() }}', data, caller);
evaluator.evaluate("{{ $input.first('ignored') }}", data, caller);
evaluator.evaluate('{{ $input.first(1, 2, 3) }}', data, caller);
expect(args).toEqual([[], [], []]);
});
it('non-RPC properties (`.item`) still delegate to the lazy proxy (host getter)', () => {
// `.item` on $input is a host getter, not a typed RPC. The synthetic
// proxy should fall through to the lazy proxy which fetches via
// getValueAtPath — and the host's `.item` getter must be invoked on
// the host side. Defining `.item` as a real getter (instead of a
// plain property) proves the getter ran: the bridge can only reach
// it via host-side property access, which is what `getValueAtPath`
// does. If the routing had wrongly sent a typed RPC, the dispatcher
// would reject the unknown `type` and return undefined.
let getterInvocations = 0;
const data: Record<string, unknown> = {
$input: Object.defineProperty({} as Record<string, unknown>, 'item', {
get() {
getterInvocations += 1;
return { id: 42 };
},
enumerable: true,
}),
};
const result = evaluator.evaluate('{{ $input.item.id }}', data, caller);
expect(result).toBe(42);
expect(getterInvocations).toBeGreaterThan(0);
});
it("'first', 'last', 'all' are reported by $input's `has` trap", () => {
const data: Record<string, unknown> = {
$input: {
first: () => undefined,
last: () => undefined,
all: () => [],
},
};
expect(evaluator.evaluate("{{ 'first' in $input }}", data, caller)).toBe(true);
expect(evaluator.evaluate("{{ 'last' in $input }}", data, caller)).toBe(true);
expect(evaluator.evaluate("{{ 'all' in $input }}", data, caller)).toBe(true);
});
});

View File

@ -36,6 +36,14 @@ describe('bridgeMessageSchema', () => {
expect(parsed.type).toBe('getNodeAll');
});
it.each([['getInputFirst'], ['getInputLast'], ['getInputAll']] as const)(
'parses a valid %s envelope',
(type) => {
const parsed = bridgeMessageSchema.parse({ type });
expect(parsed.type).toBe(type);
},
);
it('rejects an unknown discriminator value', () => {
expect(() => bridgeMessageSchema.parse({ type: 'evalArbitrary', nodeName: 'Foo' })).toThrow();
});
@ -45,6 +53,16 @@ describe('bridgeMessageSchema', () => {
});
});
describe('getInput* — no extra fields allowed', () => {
it.each([['getInputFirst'], ['getInputLast'], ['getInputAll']] as const)(
'rejects %s with extra fields (.strict)',
(type) => {
expect(() => bridgeMessageSchema.parse({ type, nodeName: 'Foo' })).toThrow();
expect(() => bridgeMessageSchema.parse({ type, branchIndex: 0 })).toThrow();
},
);
});
describe('.strict() enforcement', () => {
it('rejects extra fields on a known schema', () => {
expect(() =>

View File

@ -58,6 +58,22 @@ export const getNodeAllMessage = z
})
.strict();
/**
* `$input.first()` fetch the first item of the current node's input.
* Host enforces zero arguments; the schema has no fields besides `type`.
*/
export const getInputFirstMessage = z.object({ type: z.literal('getInputFirst') }).strict();
/**
* `$input.last()` fetch the last item of the current node's input.
*/
export const getInputLastMessage = z.object({ type: z.literal('getInputLast') }).strict();
/**
* `$input.all()` fetch every item of the current node's input.
*/
export const getInputAllMessage = z.object({ type: z.literal('getInputAll') }).strict();
/**
* The full set of messages the bridge will accept. Discriminator is `type`.
*
@ -69,6 +85,9 @@ export const bridgeMessageSchema = z.discriminatedUnion('type', [
getNodeFirstMessage,
getNodeLastMessage,
getNodeAllMessage,
getInputFirstMessage,
getInputLastMessage,
getInputAllMessage,
]);
export type BridgeMessage = z.infer<typeof bridgeMessageSchema>;

View File

@ -488,6 +488,12 @@ export class IsolatedVmBridge implements RuntimeBridge {
return this.handleGetNodeLast(msg, data);
case 'getNodeAll':
return this.handleGetNodeAll(msg, data);
case 'getInputFirst':
return this.handleGetInputFirst(data);
case 'getInputLast':
return this.handleGetInputLast(data);
case 'getInputAll':
return this.handleGetInputAll(data);
default: {
// Unreachable at runtime — zod rejects unknown `type` values
// before the switch. The `never` assignment is the compile-time
@ -541,6 +547,29 @@ export class IsolatedVmBridge implements RuntimeBridge {
return data.$?.(msg.nodeName)?.all?.(msg.branchIndex, msg.runIndex);
}
/**
* Handlers for the `$input.{first,last,all}` typed RPCs.
*
* Each reads a fixed literal property name off `data.$input` (the host's
* `WorkflowDataProxy` input proxy). The host enforces zero arguments on
* these methods the schemas have no fields besides `type`, so the
* isolate cannot pass anything that would trigger the "should have no
* arguments" error path on the host side.
*
* @private
*/
private handleGetInputFirst(data: WorkflowData): unknown {
return data.$input?.first?.();
}
private handleGetInputLast(data: WorkflowData): unknown {
return data.$input?.last?.();
}
private handleGetInputAll(data: WorkflowData): unknown {
return data.$input?.all?.();
}
/**
* Execute JavaScript code in the isolated context.
*

View File

@ -43,13 +43,15 @@ describe('createDeepLazyProxy', () => {
mocks = createMockCallbacks();
});
// Helper to create proxy with current mocks
function proxy(basePath?: string[], knownKeys?: string[]) {
// Helper to create proxy with current mocks. Returns `any` so test
// assertions can freely index into the proxy's nested shape without
// `as unknown as ...` ceremony — the underlying proxy is dynamic data.
function proxy(basePath?: string[], knownKeys?: string[]): any {
const meta = knownKeys ? { kind: 'object' as const, keys: knownKeys } : undefined;
return createDeepLazyProxy(basePath, meta, mocks.callbacks);
}
function arrayProxy(basePath: string[], length: number) {
function arrayProxy(basePath: string[], length: number): any {
return createDeepLazyProxy(basePath, { kind: 'array' as const, length }, mocks.callbacks);
}

View File

@ -21,7 +21,7 @@ describe('isKeyOf', () => {
expect(isKeyOf(registry, 'valueOf')).toBe(false);
});
it('returns false for own keys that are not in the registry', () => {
it('returns false for string keys not in the registry', () => {
expect(isKeyOf(registry, 'unknown')).toBe(false);
});

View File

@ -36,6 +36,17 @@ const NODE_RPC_TYPES = {
} as const satisfies Record<string, BridgeMessage['type']>;
type NodeRpcType = (typeof NODE_RPC_TYPES)[keyof typeof NODE_RPC_TYPES];
/**
* Same shape as `NODE_RPC_TYPES`, for the current node's `$input` proxy.
* Discriminators are `getInput*` and the host enforces zero-arg invocation.
*/
const INPUT_RPC_TYPES = {
first: 'getInputFirst',
last: 'getInputLast',
all: 'getInputAll',
} as const satisfies Record<string, BridgeMessage['type']>;
type InputRpcType = (typeof INPUT_RPC_TYPES)[keyof typeof INPUT_RPC_TYPES];
// ============================================================================
// Build Context Function
// ============================================================================
@ -208,16 +219,40 @@ export function buildContext(
}
// Everything else: delegate to the lazy proxy. The lazy proxy's
// own `get` trap handles caching, host fetching, and metadata.
return (lazyProxy as Record<string | symbol, unknown>)[prop];
return lazyProxy[prop];
},
has(_emptyTarget, prop) {
return (
isKeyOf(NODE_RPC_TYPES, prop) || prop in (lazyProxy as Record<string | symbol, unknown>)
);
return isKeyOf(NODE_RPC_TYPES, prop) || prop in lazyProxy;
},
});
};
// $input — current-node input proxy. Same synthetic-Proxy pattern as
// `target.$()`: intercept the typed-RPC method names (`first`, `last`,
// `all`, all zero-arg per the host's `WorkflowDataProxy`), delegate
// everything else (notably the `.item` getter and `.params` / `.context`
// properties) to a lazy proxy on `$input`.
const lazyInputProxy = createDeepLazyProxy(['$input'], undefined, callbacks);
const sendInputMethod = (type: InputRpcType) => {
return () => {
const result = callbacks.callHost.applySync(null, [{ type }], {
arguments: { copy: true },
result: { copy: true },
});
throwIfErrorSentinel(result);
return result;
};
};
target.$input = new Proxy({} as Record<string, unknown>, {
get(_emptyTarget, prop) {
if (isKeyOf(INPUT_RPC_TYPES, prop)) return sendInputMethod(INPUT_RPC_TYPES[prop]);
return lazyInputProxy[prop];
},
has(_emptyTarget, prop) {
return isKeyOf(INPUT_RPC_TYPES, prop) || prop in lazyInputProxy;
},
});
// -------------------------------------------------------------------------
// Resolve an unknown key from the host. Called by the proxy's has/get traps
// for keys not already on the target. The resolved value is cached on target

View File

@ -11,7 +11,7 @@ import { __prepareForTransfer } from './serialize';
declare global {
namespace globalThis {
// Proxy creator function
var createDeepLazyProxy: (basePath?: string[]) => any;
var createDeepLazyProxy: (basePath?: string[]) => Record<string | symbol, unknown>;
// Context builder (closure-scoped alternative to resetDataProxies).
// Accepts a single callbacks bundle so adding new typed RPCs doesn't

View File

@ -114,7 +114,7 @@ export function createDeepLazyProxy(
getArrayElement: any;
callFunctionAtPath: any;
},
): any {
): Record<string | symbol, unknown> {
if (!callbacks) {
throw new Error('createDeepLazyProxy requires callbacks parameter');
}
@ -326,5 +326,5 @@ export function createDeepLazyProxy(
});
proxyPaths.set(proxy, basePath);
return proxy;
return proxy as Record<string | symbol, unknown>;
}

View File

@ -113,16 +113,38 @@ export interface NodeProxy {
all?: (branchIndex?: number, runIndex?: number) => unknown;
}
/**
* The methods on `data.$input` that typed-RPC handlers dispatch into.
* Mirrors the host-side `ProxyInput` shape (`packages/workflow/src/interfaces.ts`),
* restricted to the no-arg method forms the host enforces (`$input.first()`,
* `.last()`, `.all()` throw on any arguments). Properties like `.item`,
* `.context`, `.params` stay on `getValueAtPath` and aren't part of this
* type.
*
* Return types are `unknown` rather than `INodeExecutionData` / `[]`:
* results cross the isolate boundary via `applySync({ result: { copy: true } })`,
* which structured-clones the value and erases nominal types. The handlers
* pass the clone through verbatim, so a precise return type would be
* misleading. Matches the `NodeProxy` return type for the same reason.
*/
export interface InputProxy {
first?: () => unknown;
last?: () => unknown;
all?: () => unknown;
}
/**
* Workflow data proxy from `WorkflowDataProxy.getDataProxy()`.
*
* `$` is the named typed-RPC accessor (`$('NodeName').first()` etc.) and is
* called directly from typed-RPC handlers. Everything else flows through
* the generic data-access primitives (`getValueAtPath`, `getArrayElement`),
* which read paths off the index signature without needing per-key types.
* `$` and `$input` are the typed-RPC accessors (`$('NodeName').first()`,
* `$input.first()`, etc.) and are called directly from typed-RPC handlers.
* Everything else flows through the generic data-access primitives
* (`getValueAtPath`, `getArrayElement`), which read paths off the index
* signature without needing per-key types.
*/
export interface WorkflowData {
$?: (nodeName: string) => NodeProxy | null | undefined;
$input?: InputProxy;
[key: string]: unknown;
}