mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-03 02:07:06 +02:00
chore(core): Backport expression isolation phases 4A–8 to 1.x (no-changelog)
Completes the CAT-2844 backport of Expression Isolation to 1.x. Phases 1–3 landed via #31186/#31383/#31384; this squashes the remaining phases (whose per-phase PRs #31385/#31387/#31388/#31389/#31390/#31474 were merged into the stacked branches rather than 1.x): - 4A — dispatch Expression.evaluate() to the VM evaluator (#31385) - 4B — init/dispose VM engine in cli base command (#31387) - 4C — acquire/release isolate at production callsites (#31388) - 5 — ExpressionObservabilityProvider (#31389) - 7 — workflow engine-parity test workspace (#31390) - 8 — rebuild isolated-vm in Docker image for musl libc (#31474) - test: make 1.x jmespath/array-proxy tests engine-aware Engine remains opt-in via N8N_EXPRESSION_ENGINE=vm (default legacy); v1 behaviour unchanged unless enabled. Refs https://linear.app/n8n/issue/CAT-2844
This commit is contained in:
parent
a9338d0c79
commit
e383f0f903
|
|
@ -38,7 +38,26 @@ RUN set -e; \
|
|||
cd / && rm -rf /launcher-temp
|
||||
|
||||
# ==============================================================================
|
||||
# STAGE 4: Final Runtime Image
|
||||
# STAGE 4: Native Module Builder (isolated-vm musl rebuild)
|
||||
# ==============================================================================
|
||||
# Rebuild isolated-vm from source for musl libc. node-gyp-build's musl
|
||||
# detection relies on /etc/alpine-release, which the hardened runtime base
|
||||
# omits, so the bundled prebuilt loader picks the glibc binary and segfaults
|
||||
# in musl pthread paths. Removing prebuilds and invoking node-gyp directly
|
||||
# avoids npm rebuild's double-build behavior (built-in node-gyp + install-script
|
||||
# node-gyp) which races on dep-file state. Build tooling stays in this throwaway
|
||||
# stage so the runtime image ships no compilers.
|
||||
FROM node:${NODE_VERSION}-alpine3.22 AS native-builder
|
||||
|
||||
COPY --from=app-artifact-processor /app /usr/local/lib/node_modules/n8n
|
||||
|
||||
RUN apk add --no-cache python3 make g++ && \
|
||||
cd /usr/local/lib/node_modules/n8n/node_modules/isolated-vm && \
|
||||
rm -rf prebuilds && \
|
||||
npx --yes node-gyp rebuild --release -j max
|
||||
|
||||
# ==============================================================================
|
||||
# STAGE 5: Final Runtime Image
|
||||
# ==============================================================================
|
||||
FROM system-deps AS runtime
|
||||
|
||||
|
|
@ -53,11 +72,14 @@ WORKDIR /home/node
|
|||
|
||||
COPY --from=app-artifact-processor /app /usr/local/lib/node_modules/n8n
|
||||
COPY --from=launcher-downloader /launcher-bin/* /usr/local/bin/
|
||||
# Overlay the musl-rebuilt isolated-vm binary (see native-builder stage).
|
||||
COPY --from=native-builder /usr/local/lib/node_modules/n8n/node_modules/isolated-vm/build /usr/local/lib/node_modules/n8n/node_modules/isolated-vm/build
|
||||
COPY docker/images/n8n/docker-entrypoint.sh /
|
||||
COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json
|
||||
|
||||
RUN cd /usr/local/lib/node_modules/n8n && \
|
||||
npm rebuild sqlite3 && \
|
||||
rm -rf node_modules/isolated-vm/prebuilds && \
|
||||
ln -s /usr/local/lib/node_modules/n8n/bin/n8n /usr/local/bin/n8n && \
|
||||
mkdir -p /home/node/.n8n && \
|
||||
chown -R node:node /home/node
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ export const LOG_SCOPES = [
|
|||
'chat-hub',
|
||||
'breaking-changes',
|
||||
'circuit-breaker',
|
||||
'expression-engine',
|
||||
] as const;
|
||||
|
||||
export type LogScope = (typeof LOG_SCOPES)[number];
|
||||
|
|
|
|||
|
|
@ -1,27 +1,28 @@
|
|||
import { defineConfig } from 'vitest/config';
|
||||
import type { InlineConfig } from 'vitest/node';
|
||||
|
||||
export const createVitestConfig = (options: InlineConfig = {}) => {
|
||||
const vitestConfig = defineConfig({
|
||||
test: {
|
||||
silent: true,
|
||||
globals: true,
|
||||
environment: 'node',
|
||||
...(process.env.COVERAGE_ENABLED === 'true'
|
||||
? {
|
||||
coverage: {
|
||||
enabled: true,
|
||||
provider: 'v8',
|
||||
reporter: process.env.CI === 'true' ? 'cobertura' : 'text-summary',
|
||||
all: true,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...options,
|
||||
},
|
||||
});
|
||||
/**
|
||||
* Shared test options without the outer defineConfig wrapper.
|
||||
* Use this when you need to spread the config into workspace projects.
|
||||
*/
|
||||
export const createBaseInlineConfig = (options: InlineConfig = {}): InlineConfig => ({
|
||||
silent: true,
|
||||
globals: true,
|
||||
environment: 'node',
|
||||
...(process.env.COVERAGE_ENABLED === 'true'
|
||||
? {
|
||||
coverage: {
|
||||
enabled: true,
|
||||
provider: 'v8',
|
||||
reporter: process.env.CI === 'true' ? 'cobertura' : 'text-summary',
|
||||
all: true,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
...options,
|
||||
});
|
||||
|
||||
return vitestConfig;
|
||||
};
|
||||
export const createVitestConfig = (options: InlineConfig = {}) =>
|
||||
defineConfig({ test: createBaseInlineConfig(options) });
|
||||
|
||||
export const vitestConfig = createVitestConfig();
|
||||
|
|
|
|||
|
|
@ -105,6 +105,7 @@
|
|||
"@n8n/decorators": "workspace:*",
|
||||
"@n8n/di": "workspace:*",
|
||||
"@n8n/errors": "workspace:*",
|
||||
"@n8n/expression-runtime": "workspace:*",
|
||||
"@n8n/n8n-nodes-langchain": "workspace:*",
|
||||
"@n8n/permissions": "workspace:*",
|
||||
"@n8n/task-runner": "workspace:*",
|
||||
|
|
@ -112,6 +113,7 @@
|
|||
"@n8n/utils": "workspace:*",
|
||||
"@n8n_io/ai-assistant-sdk": "catalog:",
|
||||
"@n8n_io/license-sdk": "2.24.1",
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@parcel/watcher": "^2.5.1",
|
||||
"@rudderstack/rudder-sdk-node": "3.0.5",
|
||||
"@sentry/node": "catalog:",
|
||||
|
|
|
|||
|
|
@ -280,10 +280,20 @@ export class ActiveWorkflowManager {
|
|||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({ workflowId: workflow.id });
|
||||
|
||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
undefined,
|
||||
true,
|
||||
);
|
||||
|
||||
for (const webhookData of webhooks) {
|
||||
await this.webhookService.deleteWebhook(workflow, webhookData, mode, 'update');
|
||||
for (const webhookData of webhooks) {
|
||||
await this.webhookService.deleteWebhook(workflow, webhookData, mode, 'update');
|
||||
}
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
|
|
@ -632,21 +642,29 @@ export class ActiveWorkflowManager {
|
|||
workflowId: workflow.id,
|
||||
});
|
||||
|
||||
if (shouldAddWebhooks) {
|
||||
added.webhooks = await this.addWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
'trigger',
|
||||
activationMode,
|
||||
);
|
||||
}
|
||||
let triggerCount = 0;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
if (shouldAddWebhooks) {
|
||||
added.webhooks = await this.addWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
'trigger',
|
||||
activationMode,
|
||||
);
|
||||
}
|
||||
|
||||
if (shouldAddTriggersAndPollers) {
|
||||
added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||
activationMode,
|
||||
executionMode: 'trigger',
|
||||
additionalData,
|
||||
});
|
||||
if (shouldAddTriggersAndPollers) {
|
||||
added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||
activationMode,
|
||||
executionMode: 'trigger',
|
||||
additionalData,
|
||||
});
|
||||
}
|
||||
|
||||
triggerCount = this.countTriggers(workflow, additionalData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
// Workflow got now successfully activated so make sure nothing is left in the queue
|
||||
|
|
@ -654,7 +672,6 @@ export class ActiveWorkflowManager {
|
|||
|
||||
await this.activationErrorsService.deregister(workflowId);
|
||||
|
||||
const triggerCount = this.countTriggers(workflow, additionalData);
|
||||
await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||
} catch (e) {
|
||||
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||
|
|
|
|||
|
|
@ -249,6 +249,10 @@ describe('ChatExecutionManager', () => {
|
|||
const workflow = {
|
||||
getNode: jest.fn().mockReturnValue(node),
|
||||
nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) },
|
||||
expression: {
|
||||
acquireIsolate: jest.fn().mockResolvedValue(undefined),
|
||||
releaseIsolate: jest.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
|
||||
|
|
@ -280,6 +284,10 @@ describe('ChatExecutionManager', () => {
|
|||
const workflow = {
|
||||
getNode: jest.fn().mockReturnValue(node),
|
||||
nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) },
|
||||
expression: {
|
||||
acquireIsolate: jest.fn().mockResolvedValue(undefined),
|
||||
releaseIsolate: jest.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
|
||||
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
|
||||
|
|
|
|||
|
|
@ -120,7 +120,12 @@ export class ChatExecutionManager {
|
|||
}
|
||||
|
||||
if (nodeType.onMessage) {
|
||||
return await nodeType.onMessage(context, nodeExecutionData);
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
return await nodeType.onMessage(context, nodeExecutionData);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
return [[nodeExecutionData]];
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import {
|
|||
ErrorReporter,
|
||||
ExecutionContextHookRegistry,
|
||||
} from 'n8n-core';
|
||||
import { ensureError, sleep, UnexpectedError, UserError } from 'n8n-workflow';
|
||||
import { ensureError, Expression, sleep, UnexpectedError, UserError } from 'n8n-workflow';
|
||||
|
||||
import type { AbstractServer } from '@/abstract-server';
|
||||
import { N8N_VERSION, N8N_RELEASE_DATE } from '@/constants';
|
||||
|
|
@ -28,6 +28,7 @@ import * as CrashJournal from '@/crash-journal';
|
|||
import { getDataDeduplicationService } from '@/deduplication';
|
||||
import { TestRunCleanupService } from '@/evaluation.ee/test-runner/test-run-cleanup.service.ee';
|
||||
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
|
||||
import { ExpressionObservabilityProvider } from '@/expression-observability/expression-observability.provider';
|
||||
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
|
||||
import { ExternalHooks } from '@/external-hooks';
|
||||
import { License } from '@/license';
|
||||
|
|
@ -174,6 +175,18 @@ export abstract class BaseCommand<F = never> {
|
|||
|
||||
await Container.get(PostHogClient).init();
|
||||
await Container.get(TelemetryEventRelay).init();
|
||||
|
||||
const { engine, poolSize, maxCodeCacheSize, bridgeTimeout, bridgeMemoryLimit, idleTimeout } =
|
||||
this.globalConfig.expressionEngine;
|
||||
await Expression.initExpressionEngine({
|
||||
engine,
|
||||
poolSize,
|
||||
maxCodeCacheSize,
|
||||
bridgeTimeout,
|
||||
bridgeMemoryLimit,
|
||||
idleTimeoutMs: idleTimeout === undefined ? undefined : idleTimeout * 1000,
|
||||
observability: Container.get(ExpressionObservabilityProvider),
|
||||
});
|
||||
}
|
||||
|
||||
protected async stopProcess() {
|
||||
|
|
@ -186,7 +199,11 @@ export abstract class BaseCommand<F = never> {
|
|||
|
||||
protected async exitSuccessFully() {
|
||||
try {
|
||||
await Promise.all([CrashJournal.cleanup(), this.dbConnection.close()]);
|
||||
await Promise.all([
|
||||
CrashJournal.cleanup(),
|
||||
this.dbConnection.close(),
|
||||
Expression.disposeExpressionEngine(),
|
||||
]);
|
||||
} finally {
|
||||
process.exit();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -448,15 +448,20 @@ export class CredentialsHelper extends ICredentialsHelper {
|
|||
});
|
||||
|
||||
// Resolve expressions if any are set
|
||||
decryptedData = workflow.expression.getComplexParameterValue(
|
||||
mockNode,
|
||||
decryptedData as INodeParameters,
|
||||
mode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
decryptedData,
|
||||
) as ICredentialDataDecryptedObject;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
decryptedData = workflow.expression.getComplexParameterValue(
|
||||
mockNode,
|
||||
decryptedData as INodeParameters,
|
||||
mode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
decryptedData,
|
||||
) as ICredentialDataDecryptedObject;
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
return decryptedData;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,259 @@
|
|||
/* eslint-disable @typescript-eslint/naming-convention */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import { ExpressionEngineConfig, type GlobalConfig } from '@n8n/config';
|
||||
import { EXPRESSION_METRICS } from '@n8n/expression-runtime';
|
||||
import { trace } from '@opentelemetry/api';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import promClient from 'prom-client';
|
||||
|
||||
import { ExpressionObservabilityProvider } from '../expression-observability.provider';
|
||||
|
||||
const scopedLogger = mock<Logger>();
|
||||
|
||||
function buildConfig(overrides: Partial<ExpressionEngineConfig> = {}): ExpressionEngineConfig {
|
||||
const config = new ExpressionEngineConfig();
|
||||
// The provider is a no-op unless engine === 'vm'. Default tests to the active
|
||||
// engine so shared assertions exercise the real code path.
|
||||
return Object.assign(config, { engine: 'vm' as const, ...overrides });
|
||||
}
|
||||
|
||||
function buildLogger(): Logger {
|
||||
const logger = mock<Logger>();
|
||||
logger.scoped.mockReturnValue(scopedLogger as unknown as Logger);
|
||||
return logger;
|
||||
}
|
||||
|
||||
function buildGlobalConfig(prefix = 'n8n_'): GlobalConfig {
|
||||
return { endpoints: { metrics: { prefix } } } as GlobalConfig;
|
||||
}
|
||||
|
||||
describe('ExpressionObservabilityProvider', () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
promClient.register.clear();
|
||||
});
|
||||
|
||||
describe('when disabled', () => {
|
||||
it('delegates to NoOpProvider when observabilityEnabled=false', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ observabilityEnabled: false }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
|
||||
expect(() => {
|
||||
provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 1);
|
||||
provider.metrics.gauge(EXPRESSION_METRICS.codeCacheSize.name, 5);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 10);
|
||||
provider.logs.info('hello');
|
||||
provider.traces.startSpan('x').end();
|
||||
}).not.toThrow();
|
||||
});
|
||||
|
||||
it('delegates to NoOpProvider when engine is not vm', async () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ engine: 'legacy' }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
|
||||
provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 1);
|
||||
|
||||
const output = await promClient.register.metrics();
|
||||
expect(output).not.toContain('n8n_expression_');
|
||||
});
|
||||
});
|
||||
|
||||
describe('metrics adapter', () => {
|
||||
it('registers a prom counter with _total suffix', async () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 2);
|
||||
|
||||
const metric = promClient.register.getSingleMetric('n8n_expression_pool_acquired_total');
|
||||
expect(metric).toBeDefined();
|
||||
const output = await promClient.register.metrics();
|
||||
expect(output).toContain('n8n_expression_pool_acquired_total 2');
|
||||
});
|
||||
|
||||
it('registers a prom gauge with the cleaned name', async () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.gauge(EXPRESSION_METRICS.codeCacheSize.name, 42);
|
||||
|
||||
const output = await promClient.register.metrics();
|
||||
expect(output).toContain('n8n_expression_code_cache_size 42');
|
||||
});
|
||||
|
||||
it('registers a prom histogram with bucketed observations', async () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 3, {
|
||||
status: 'success',
|
||||
type: 'none',
|
||||
});
|
||||
|
||||
const output = await promClient.register.metrics();
|
||||
expect(output).toContain('n8n_expression_evaluation_duration_seconds_bucket');
|
||||
expect(output).toContain('n8n_expression_evaluation_duration_seconds_count');
|
||||
});
|
||||
});
|
||||
|
||||
describe('tail sampling', () => {
|
||||
const startSpanMock = jest.fn().mockReturnValue({
|
||||
setStatus: jest.fn(),
|
||||
setAttribute: jest.fn(),
|
||||
recordException: jest.fn(),
|
||||
end: jest.fn(),
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
startSpanMock.mockClear();
|
||||
jest.spyOn(trace, 'getTracer').mockReturnValue({
|
||||
startSpan: startSpanMock,
|
||||
} as unknown as ReturnType<typeof trace.getTracer>);
|
||||
});
|
||||
|
||||
it('drops healthy spans under the slow threshold', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ slowEvaluationThresholdMs: 50, tracesSampleRate: 0 }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.01, {
|
||||
status: 'success',
|
||||
type: 'none',
|
||||
});
|
||||
expect(startSpanMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('keeps spans that exceed the slow threshold', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ slowEvaluationThresholdMs: 50 }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.1, {
|
||||
status: 'success',
|
||||
type: 'none',
|
||||
});
|
||||
expect(startSpanMock).toHaveBeenCalledWith(
|
||||
'expression.evaluate',
|
||||
expect.objectContaining({
|
||||
attributes: expect.objectContaining({
|
||||
'expression.outcome': 'slow',
|
||||
'expression.engine': 'vm',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('keeps spans when status is error, regardless of duration', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ slowEvaluationThresholdMs: 50 }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.005, {
|
||||
status: 'error',
|
||||
type: 'timeout',
|
||||
});
|
||||
expect(startSpanMock).toHaveBeenCalledWith(
|
||||
'expression.evaluate',
|
||||
expect.objectContaining({
|
||||
attributes: expect.objectContaining({
|
||||
'expression.outcome': 'error',
|
||||
'expression.error.type': 'timeout',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('does not start spans when tracesEnabled=false', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig({ tracesEnabled: false, slowEvaluationThresholdMs: 10 }),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.5, {
|
||||
status: 'error',
|
||||
type: 'timeout',
|
||||
});
|
||||
expect(startSpanMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('label-set stability (eager registration)', () => {
|
||||
it('logs a warning and does not throw when an unknown metric is emitted', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
expect(() => {
|
||||
provider.metrics.counter('test.unknown', 1);
|
||||
}).not.toThrow();
|
||||
expect(scopedLogger.warn).toHaveBeenCalledWith('Emitted unknown expression metric', {
|
||||
name: 'test.unknown',
|
||||
});
|
||||
});
|
||||
|
||||
it('uses the schema-registered label set regardless of call order', async () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.01, {
|
||||
status: 'error',
|
||||
type: 'timeout',
|
||||
});
|
||||
provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.02, {
|
||||
status: 'success',
|
||||
type: 'none',
|
||||
});
|
||||
|
||||
const output = await promClient.register.metrics();
|
||||
const countLines = output
|
||||
.split('\n')
|
||||
.filter((line) => line.startsWith('n8n_expression_evaluation_duration_seconds_count{'));
|
||||
expect(countLines.length).toBeGreaterThan(0);
|
||||
for (const line of countLines) {
|
||||
expect(line).toContain('status=');
|
||||
expect(line).toContain('type=');
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('logs adapter', () => {
|
||||
it('delegates to scoped logger', () => {
|
||||
const provider = new ExpressionObservabilityProvider(
|
||||
buildConfig(),
|
||||
buildLogger(),
|
||||
buildGlobalConfig(),
|
||||
);
|
||||
provider.logs.info('hello', { k: 'v' });
|
||||
provider.logs.warn('warn', { k: 'v' });
|
||||
provider.logs.error('boom', new Error('x'), { k: 'v' });
|
||||
|
||||
expect(scopedLogger.info).toHaveBeenCalledWith('hello', { k: 'v' });
|
||||
expect(scopedLogger.warn).toHaveBeenCalledWith('warn', { k: 'v' });
|
||||
expect(scopedLogger.error).toHaveBeenCalledWith(
|
||||
'boom',
|
||||
expect.objectContaining({ error: expect.any(Error), k: 'v' }),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
export const TRACER_NAME = 'n8n-expression-runtime';
|
||||
|
||||
export const ATTRIBUTE = {
|
||||
EXPRESSION_ENGINE: 'expression.engine',
|
||||
EXPRESSION_DURATION_SECONDS: 'expression.duration_seconds',
|
||||
EXPRESSION_OUTCOME: 'expression.outcome',
|
||||
EXPRESSION_ERROR_TYPE: 'expression.error.type',
|
||||
} as const;
|
||||
|
||||
export const DURATION_BUCKETS_SECONDS = [
|
||||
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 5,
|
||||
];
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
import type { Attributes } from '@opentelemetry/api';
|
||||
|
||||
const MAX_EXPRESSION_TEXT_LENGTH = 256;
|
||||
|
||||
export function toPromName(
|
||||
name: string,
|
||||
kind: 'counter' | 'gauge' | 'histogram',
|
||||
prefix: string,
|
||||
): string {
|
||||
const base = prefix + name.replace(/\./g, '_');
|
||||
return kind === 'counter' && !base.endsWith('_total') ? base + '_total' : base;
|
||||
}
|
||||
|
||||
export function normalizeAttributes(attributes?: Record<string, unknown>): Attributes | undefined {
|
||||
if (!attributes) return undefined;
|
||||
const entries = Object.entries(attributes);
|
||||
if (entries.length === 0) return undefined;
|
||||
const result: Attributes = {};
|
||||
for (const [key, value] of entries) {
|
||||
const normalized = normalizeAttributeValue(value);
|
||||
if (normalized !== undefined) result[key] = normalized;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
export function normalizeAttributeValue(value: unknown): Attributes[string] | undefined {
|
||||
if (value === undefined || value === null) return undefined;
|
||||
if (typeof value === 'string')
|
||||
return value.length > MAX_EXPRESSION_TEXT_LENGTH
|
||||
? value.slice(0, MAX_EXPRESSION_TEXT_LENGTH)
|
||||
: value;
|
||||
if (typeof value === 'number' || typeof value === 'boolean') return value;
|
||||
return String(value);
|
||||
}
|
||||
|
|
@ -0,0 +1,217 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { ExpressionEngineConfig, GlobalConfig } from '@n8n/config';
|
||||
import { Service } from '@n8n/di';
|
||||
import type {
|
||||
LogsAPI,
|
||||
MetricDef,
|
||||
MetricsAPI,
|
||||
ObservabilityProvider,
|
||||
Span,
|
||||
TracesAPI,
|
||||
} from '@n8n/expression-runtime';
|
||||
import { EXPRESSION_METRICS, NoOpProvider } from '@n8n/expression-runtime';
|
||||
import { SpanStatusCode, trace } from '@opentelemetry/api';
|
||||
import type { Tracer } from '@opentelemetry/api';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
import promClient, { type Counter, type Gauge, type Histogram } from 'prom-client';
|
||||
|
||||
import {
|
||||
ATTRIBUTE,
|
||||
DURATION_BUCKETS_SECONDS,
|
||||
TRACER_NAME,
|
||||
} from './expression-observability.constants';
|
||||
import {
|
||||
normalizeAttributes,
|
||||
normalizeAttributeValue,
|
||||
toPromName,
|
||||
} from './expression-observability.formatters';
|
||||
|
||||
type TailSampleDecision = 'drop' | 'keep';
|
||||
|
||||
@Service()
|
||||
export class ExpressionObservabilityProvider implements ObservabilityProvider {
|
||||
readonly metrics: MetricsAPI;
|
||||
|
||||
readonly traces: TracesAPI;
|
||||
|
||||
readonly logs: LogsAPI;
|
||||
|
||||
private readonly scopedLogger: Logger;
|
||||
|
||||
private readonly prefix!: string;
|
||||
|
||||
private tracer?: Tracer;
|
||||
|
||||
constructor(
|
||||
private readonly config: ExpressionEngineConfig,
|
||||
private readonly logger: Logger,
|
||||
globalConfig: GlobalConfig,
|
||||
) {
|
||||
this.scopedLogger = this.logger.scoped('expression-engine');
|
||||
|
||||
if (!this.config.observabilityEnabled || this.config.engine !== 'vm') {
|
||||
this.metrics = NoOpProvider.metrics;
|
||||
this.traces = NoOpProvider.traces;
|
||||
this.logs = NoOpProvider.logs;
|
||||
return;
|
||||
}
|
||||
|
||||
this.prefix = globalConfig.endpoints.metrics.prefix;
|
||||
|
||||
this.registerMetrics();
|
||||
|
||||
this.metrics = {
|
||||
counter: (name, value, tags) => this.counter(name, value, tags),
|
||||
gauge: (name, value, tags) => this.gauge(name, value, tags),
|
||||
histogram: (name, value, tags) => this.histogram(name, value, tags),
|
||||
};
|
||||
|
||||
this.traces = {
|
||||
startSpan: (name, attributes) => this.startSpan(name, attributes),
|
||||
};
|
||||
|
||||
this.logs = {
|
||||
error: (message, error, context) => this.scopedLogger.error(message, { error, ...context }),
|
||||
warn: (message, context) => this.scopedLogger.warn(message, context),
|
||||
info: (message, context) => this.scopedLogger.info(message, context),
|
||||
debug: (message, context) => this.scopedLogger.debug(message, context),
|
||||
};
|
||||
}
|
||||
|
||||
private registerMetrics(): void {
|
||||
for (const def of Object.values(EXPRESSION_METRICS) as MetricDef[]) {
|
||||
const promName = toPromName(def.name, def.kind, this.prefix);
|
||||
switch (def.kind) {
|
||||
case 'counter':
|
||||
new promClient.Counter({
|
||||
name: promName,
|
||||
help: def.help,
|
||||
labelNames: def.labels,
|
||||
});
|
||||
break;
|
||||
case 'gauge':
|
||||
new promClient.Gauge({
|
||||
name: promName,
|
||||
help: def.help,
|
||||
labelNames: def.labels,
|
||||
});
|
||||
break;
|
||||
case 'histogram':
|
||||
new promClient.Histogram({
|
||||
name: promName,
|
||||
help: def.help,
|
||||
labelNames: def.labels,
|
||||
buckets: DURATION_BUCKETS_SECONDS,
|
||||
});
|
||||
break;
|
||||
default: {
|
||||
const _exhaustive: never = def.kind;
|
||||
throw new UnexpectedError(`Unknown metric kind: ${String(_exhaustive)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private counter(name: string, value: number, tags?: Record<string, string>): void {
|
||||
const promName = toPromName(name, 'counter', this.prefix);
|
||||
const counter = promClient.register.getSingleMetric(promName) as Counter<string> | undefined;
|
||||
if (!counter) {
|
||||
this.scopedLogger.warn('Emitted unknown expression metric', { name });
|
||||
return;
|
||||
}
|
||||
if (tags) counter.inc(tags, value);
|
||||
else counter.inc(value);
|
||||
}
|
||||
|
||||
private gauge(name: string, value: number, tags?: Record<string, string>): void {
|
||||
const promName = toPromName(name, 'gauge', this.prefix);
|
||||
const gauge = promClient.register.getSingleMetric(promName) as Gauge<string> | undefined;
|
||||
if (!gauge) {
|
||||
this.scopedLogger.warn('Emitted unknown expression metric', { name });
|
||||
return;
|
||||
}
|
||||
if (tags) gauge.set(tags, value);
|
||||
else gauge.set(value);
|
||||
}
|
||||
|
||||
private histogram(name: string, value: number, tags?: Record<string, string>): void {
|
||||
const promName = toPromName(name, 'histogram', this.prefix);
|
||||
const histogram = promClient.register.getSingleMetric(promName) as
|
||||
| Histogram<string>
|
||||
| undefined;
|
||||
if (!histogram) {
|
||||
this.scopedLogger.warn('Emitted unknown expression metric', { name });
|
||||
return;
|
||||
}
|
||||
if (tags) histogram.observe(tags, value);
|
||||
else histogram.observe(value);
|
||||
|
||||
if (name === EXPRESSION_METRICS.evaluationDuration.name) this.maybeRecordSpan(value, tags);
|
||||
}
|
||||
|
||||
private maybeRecordSpan(durationSeconds: number, tags?: Record<string, string>): void {
|
||||
const { tracesEnabled, slowEvaluationThresholdMs } = this.config;
|
||||
if (!tracesEnabled) return;
|
||||
|
||||
const decision = this.tailSample(durationSeconds, tags);
|
||||
if (decision === 'drop') return;
|
||||
|
||||
const slowThresholdSeconds = slowEvaluationThresholdMs / 1000;
|
||||
const outcome =
|
||||
tags?.status === 'error'
|
||||
? 'error'
|
||||
: durationSeconds > slowThresholdSeconds
|
||||
? 'slow'
|
||||
: 'healthy';
|
||||
|
||||
const tracer = this.getTracer();
|
||||
const errorType = tags?.type && tags.type !== 'none' ? tags.type : undefined;
|
||||
const span = tracer.startSpan('expression.evaluate', {
|
||||
attributes: {
|
||||
[ATTRIBUTE.EXPRESSION_ENGINE]: 'vm',
|
||||
[ATTRIBUTE.EXPRESSION_DURATION_SECONDS]: durationSeconds,
|
||||
[ATTRIBUTE.EXPRESSION_OUTCOME]: outcome,
|
||||
...(errorType ? { [ATTRIBUTE.EXPRESSION_ERROR_TYPE]: errorType } : {}),
|
||||
},
|
||||
});
|
||||
|
||||
if (tags?.status === 'error') span.setStatus({ code: SpanStatusCode.ERROR });
|
||||
|
||||
span.end();
|
||||
}
|
||||
|
||||
private tailSample(durationSeconds: number, tags?: Record<string, string>): TailSampleDecision {
|
||||
if (tags?.status === 'error') return 'keep';
|
||||
const { slowEvaluationThresholdMs, tracesSampleRate } = this.config;
|
||||
if (durationSeconds > slowEvaluationThresholdMs / 1000) return 'keep';
|
||||
if (tracesSampleRate > 0 && Math.random() < tracesSampleRate) return 'keep';
|
||||
return 'drop';
|
||||
}
|
||||
|
||||
private startSpan(name: string, attributes?: Record<string, unknown>): Span {
|
||||
if (!this.config.tracesEnabled) return NoOpProvider.traces.startSpan(name, attributes);
|
||||
|
||||
const tracer = this.getTracer();
|
||||
const otelSpan = tracer.startSpan(name, {
|
||||
attributes: normalizeAttributes(attributes),
|
||||
});
|
||||
|
||||
return {
|
||||
setStatus: (status) =>
|
||||
otelSpan.setStatus({
|
||||
code: status === 'ok' ? SpanStatusCode.OK : SpanStatusCode.ERROR,
|
||||
}),
|
||||
setAttribute: (key, value) => {
|
||||
const normalized = normalizeAttributeValue(value);
|
||||
if (normalized !== undefined) otelSpan.setAttribute(key, normalized);
|
||||
},
|
||||
recordException: (error) => otelSpan.recordException(error),
|
||||
end: () => otelSpan.end(),
|
||||
};
|
||||
}
|
||||
|
||||
private getTracer(): Tracer {
|
||||
this.tracer ??= trace.getTracer(TRACER_NAME);
|
||||
return this.tracer;
|
||||
}
|
||||
}
|
||||
|
|
@ -351,6 +351,7 @@ export class CredentialsTester {
|
|||
|
||||
let response: INodeExecutionData[][] | null | undefined;
|
||||
try {
|
||||
await workflow.expression.acquireIsolate();
|
||||
response = await routingNode.runNode();
|
||||
} catch (error) {
|
||||
this.errorReporter.error(error);
|
||||
|
|
@ -396,6 +397,7 @@ export class CredentialsTester {
|
|||
message: error.message.toString(),
|
||||
};
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
delete mockNodesData[nodeTypeCopy.description.name];
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workflow';
|
||||
import type { INode, INodeType } from 'n8n-workflow';
|
||||
import type { INode, INodeType, Workflow } from 'n8n-workflow';
|
||||
|
||||
import { STARTING_NODES } from '@/constants';
|
||||
|
||||
|
|
@ -122,3 +122,19 @@ export const getAllKeyPaths = (
|
|||
}
|
||||
return paths;
|
||||
};
|
||||
|
||||
/**
|
||||
* When N8N_EXPRESSION_ENGINE=vm, expressions run in an isolate that must be acquired
|
||||
* for this workflow before any code resolves {{ }} in parameters or credentials.
|
||||
*/
|
||||
export async function withExpressionIsolate<T>(
|
||||
workflow: Workflow,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { generateNanoId } from '@n8n/db';
|
|||
import type * as express from 'express';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type {
|
||||
Expression,
|
||||
ITaskData,
|
||||
IWorkflowBase,
|
||||
IWebhookData,
|
||||
|
|
@ -46,6 +47,7 @@ describe('TestWebhooks', () => {
|
|||
const webhookService = mock<WebhookService>();
|
||||
|
||||
const testWebhooks = new TestWebhooks(
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
registrations,
|
||||
|
|
@ -70,7 +72,7 @@ describe('TestWebhooks', () => {
|
|||
};
|
||||
|
||||
test('if webhook is needed, should register then create webhook and return true', async () => {
|
||||
const workflow = mock<Workflow>();
|
||||
const workflow = mock<Workflow>({ expression: mock<Expression>() });
|
||||
|
||||
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
|
||||
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
|
||||
|
|
@ -106,7 +108,7 @@ describe('TestWebhooks', () => {
|
|||
});
|
||||
|
||||
test('returns false if a triggerToStartFrom with triggerData is given', async () => {
|
||||
const workflow = mock<Workflow>();
|
||||
const workflow = mock<Workflow>({ expression: mock<Expression>() });
|
||||
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
|
||||
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
|
||||
|
||||
|
|
@ -123,7 +125,7 @@ describe('TestWebhooks', () => {
|
|||
|
||||
test('returns true, registers and then creates webhook if triggerToStartFrom is given with no triggerData', async () => {
|
||||
// ARRANGE
|
||||
const workflow = mock<Workflow>();
|
||||
const workflow = mock<Workflow>({ expression: mock<Expression>() });
|
||||
const webhook2 = mock<IWebhookData>({
|
||||
node: 'trigger',
|
||||
httpMethod,
|
||||
|
|
|
|||
|
|
@ -131,45 +131,50 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
projectId: ownerProjectId,
|
||||
});
|
||||
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData)
|
||||
.find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData)
|
||||
.find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
|
||||
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
|
||||
if (workflowStartNode === null) {
|
||||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
if (workflowStartNode === null) {
|
||||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
}
|
||||
|
||||
if (!authAllowlistedNodes.has(workflowStartNode.type)) {
|
||||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
async (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
if (!authAllowlistedNodes.has(workflowStartNode.type)) {
|
||||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
async (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { TEST_WEBHOOK_TIMEOUT } from '@/constants';
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { OnPubSubEvent } from '@n8n/decorators';
|
||||
import { Service } from '@n8n/di';
|
||||
import type express from 'express';
|
||||
|
|
@ -43,6 +44,7 @@ import type { WorkflowRequest } from '@/workflows/workflow.request';
|
|||
@Service()
|
||||
export class TestWebhooks implements IWebhookManager {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly push: Push,
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
private readonly registrations: TestWebhookRegistrationsService,
|
||||
|
|
@ -128,59 +130,70 @@ export class TestWebhooks implements IWebhookManager {
|
|||
sanitizeWebhookRequest(request);
|
||||
}
|
||||
|
||||
return await new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
(error: Error | null, data: IWebhookResponseCallbackData) => {
|
||||
if (error !== null) reject(error);
|
||||
else resolve(data);
|
||||
},
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) return;
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{ type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } },
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
return await new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
(error: Error | null, data: IWebhookResponseCallbackData) => {
|
||||
if (error !== null) reject(error);
|
||||
else resolve(data);
|
||||
},
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) {
|
||||
await workflow.expression.releaseIsolate();
|
||||
return;
|
||||
}
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{
|
||||
type: 'testWebhookReceived',
|
||||
data: { workflowId: webhook?.workflowId, executionId },
|
||||
},
|
||||
pushRef,
|
||||
);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
} catch {}
|
||||
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
this.clearTimeout(key);
|
||||
|
||||
this.clearTimeout(key);
|
||||
|
||||
await this.deactivateWebhooks(workflow);
|
||||
});
|
||||
await this.deactivateWebhooks(workflow);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
@OnPubSubEvent('clear-test-webhooks', { instanceType: 'main' })
|
||||
|
|
@ -199,7 +212,12 @@ export class TestWebhooks implements IWebhookManager {
|
|||
|
||||
const workflow = this.toWorkflow(workflowEntity);
|
||||
|
||||
await this.deactivateWebhooks(workflow);
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
await this.deactivateWebhooks(workflow);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(key: string) {
|
||||
|
|
@ -294,96 +312,102 @@ export class TestWebhooks implements IWebhookManager {
|
|||
|
||||
const workflow = this.toWorkflow(workflowEntity);
|
||||
|
||||
let webhooks = WebhookHelpers.getWorkflowWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
destinationNode,
|
||||
true,
|
||||
);
|
||||
await workflow.expression.acquireIsolate();
|
||||
let webhooks: IWebhookData[];
|
||||
try {
|
||||
webhooks = WebhookHelpers.getWorkflowWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
destinationNode,
|
||||
true,
|
||||
);
|
||||
|
||||
// If we have a preferred trigger with data, we don't have to listen for a
|
||||
// webhook.
|
||||
if (triggerToStartFrom?.data) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we have a preferred trigger without data we only want to listen for
|
||||
// that trigger, not the other ones.
|
||||
if (triggerToStartFrom) {
|
||||
webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name);
|
||||
}
|
||||
|
||||
if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) {
|
||||
return false; // no webhooks found to start a workflow
|
||||
}
|
||||
|
||||
const timeout = setTimeout(
|
||||
async () => await this.cancelWebhook(workflow.id),
|
||||
TEST_WEBHOOK_TIMEOUT,
|
||||
);
|
||||
|
||||
for (const webhook of webhooks) {
|
||||
const key = this.registrations.toKey(webhook);
|
||||
const registrationByKey = await this.registrations.get(key);
|
||||
|
||||
if (runData && webhook.node in runData) {
|
||||
// If we have a preferred trigger with data, we don't have to listen for a
|
||||
// webhook.
|
||||
if (triggerToStartFrom?.data) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if registration already exists and is not a test webhook created by this user in this workflow throw an error
|
||||
if (
|
||||
registrationByKey &&
|
||||
!webhook.webhookId &&
|
||||
!registrationByKey.webhook.isTest &&
|
||||
registrationByKey.webhook.userId !== userId &&
|
||||
registrationByKey.webhook.workflowId !== workflow.id
|
||||
) {
|
||||
throw new WebhookPathTakenError(webhook.node);
|
||||
// If we have a preferred trigger without data we only want to listen for
|
||||
// that trigger, not the other ones.
|
||||
if (triggerToStartFrom) {
|
||||
webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name);
|
||||
}
|
||||
|
||||
webhook.path = removeTrailingSlash(webhook.path);
|
||||
webhook.isTest = true;
|
||||
if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) {
|
||||
return false; // no webhooks found to start a workflow
|
||||
}
|
||||
|
||||
/**
|
||||
* Additional data cannot be cached because of circular refs.
|
||||
* Hence store the `userId` and recreate additional data when needed.
|
||||
*/
|
||||
const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook;
|
||||
const timeout = setTimeout(
|
||||
async () => await this.cancelWebhook(workflow.id),
|
||||
TEST_WEBHOOK_TIMEOUT,
|
||||
);
|
||||
|
||||
cacheableWebhook.userId = userId;
|
||||
for (const webhook of webhooks) {
|
||||
const key = this.registrations.toKey(webhook);
|
||||
const registrationByKey = await this.registrations.get(key);
|
||||
|
||||
// TODO(CAT-1265): support destination node mode in test webhook registration.
|
||||
const registration: TestWebhookRegistration = {
|
||||
pushRef,
|
||||
workflowEntity,
|
||||
destinationNode: destinationNode?.nodeName,
|
||||
webhook: cacheableWebhook as IWebhookData,
|
||||
};
|
||||
if (runData && webhook.node in runData) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if registration already exists and is not a test webhook created by this user in this workflow throw an error
|
||||
if (
|
||||
registrationByKey &&
|
||||
!webhook.webhookId &&
|
||||
!registrationByKey.webhook.isTest &&
|
||||
registrationByKey.webhook.userId !== userId &&
|
||||
registrationByKey.webhook.workflowId !== workflow.id
|
||||
) {
|
||||
throw new WebhookPathTakenError(webhook.node);
|
||||
}
|
||||
|
||||
webhook.path = removeTrailingSlash(webhook.path);
|
||||
webhook.isTest = true;
|
||||
|
||||
try {
|
||||
/**
|
||||
* Register the test webhook _before_ creation at third-party service
|
||||
* in case service sends a confirmation request immediately on creation.
|
||||
* Additional data cannot be cached because of circular refs.
|
||||
* Hence store the `userId` and recreate additional data when needed.
|
||||
*/
|
||||
await this.registrations.register(registration);
|
||||
const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook;
|
||||
|
||||
await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual');
|
||||
cacheableWebhook.userId = userId;
|
||||
|
||||
cacheableWebhook.staticData = workflow.staticData;
|
||||
// TODO(CAT-1265): support destination node mode in test webhook registration.
|
||||
const registration: TestWebhookRegistration = {
|
||||
pushRef,
|
||||
workflowEntity,
|
||||
destinationNode: destinationNode?.nodeName,
|
||||
webhook: cacheableWebhook as IWebhookData,
|
||||
};
|
||||
|
||||
await this.registrations.register(registration);
|
||||
try {
|
||||
/**
|
||||
* Register the test webhook _before_ creation at third-party service
|
||||
* in case service sends a confirmation request immediately on creation.
|
||||
*/
|
||||
await this.registrations.register(registration);
|
||||
|
||||
this.timeouts[key] = timeout;
|
||||
} catch (error) {
|
||||
await this.deactivateWebhooks(workflow);
|
||||
await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual');
|
||||
|
||||
delete this.timeouts[key];
|
||||
cacheableWebhook.staticData = workflow.staticData;
|
||||
|
||||
throw error;
|
||||
await this.registrations.register(registration);
|
||||
|
||||
this.timeouts[key] = timeout;
|
||||
} catch (error) {
|
||||
await this.deactivateWebhooks(workflow);
|
||||
|
||||
delete this.timeouts[key];
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return true;
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
}
|
||||
|
||||
async cancelWebhook(workflowId: string) {
|
||||
|
|
@ -414,7 +438,19 @@ export class TestWebhooks implements IWebhookManager {
|
|||
|
||||
if (!foundWebhook) {
|
||||
// As it removes all webhooks of the workflow execute only once
|
||||
void this.deactivateWebhooks(workflow);
|
||||
void (async () => {
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
await this.deactivateWebhooks(workflow);
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
})().catch((error) => {
|
||||
this.logger.error('Failed to deactivate test webhooks on cancel', {
|
||||
error,
|
||||
workflowId,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
foundWebhook = true;
|
||||
|
|
|
|||
|
|
@ -227,67 +227,72 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
workflowId: workflow.id,
|
||||
});
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflowStartNode, additionalData)
|
||||
.find(
|
||||
(webhook) =>
|
||||
webhook.httpMethod === req.method &&
|
||||
webhook.path === (suffix ?? '') &&
|
||||
webhook.webhookDescription.restartWebhook === true &&
|
||||
(webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms,
|
||||
);
|
||||
|
||||
if (webhookData === undefined) {
|
||||
// If no data got found it means that the execution can not be started via a webhook.
|
||||
// Return 404 because we do not want to give any data if the execution exists or not.
|
||||
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
|
||||
|
||||
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
}
|
||||
|
||||
if (!execution.data.resultData.error && execution.status === 'waiting') {
|
||||
const childNodes = workflow.getChildNodes(
|
||||
execution.data.resultData.lastNodeExecuted as string,
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const webhookData = this.webhookService
|
||||
.getNodeWebhooks(workflow, workflowStartNode, additionalData)
|
||||
.find(
|
||||
(webhook) =>
|
||||
webhook.httpMethod === req.method &&
|
||||
webhook.path === (suffix ?? '') &&
|
||||
webhook.webhookDescription.restartWebhook === true &&
|
||||
(webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms,
|
||||
);
|
||||
|
||||
const hasChildForms = childNodes.some(
|
||||
(node) =>
|
||||
workflow.nodes[node].type === FORM_NODE_TYPE ||
|
||||
workflow.nodes[node].type === WAIT_NODE_TYPE,
|
||||
);
|
||||
if (webhookData === undefined) {
|
||||
// If no data got found it means that the execution can not be started via a webhook.
|
||||
// Return 404 because we do not want to give any data if the execution exists or not.
|
||||
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
|
||||
|
||||
if (hasChildForms) {
|
||||
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
}
|
||||
|
||||
if (!execution.data.resultData.error && execution.status === 'waiting') {
|
||||
const childNodes = workflow.getChildNodes(
|
||||
execution.data.resultData.lastNodeExecuted as string,
|
||||
);
|
||||
|
||||
const hasChildForms = childNodes.some(
|
||||
(node) =>
|
||||
workflow.nodes[node].type === FORM_NODE_TYPE ||
|
||||
workflow.nodes[node].type === WAIT_NODE_TYPE,
|
||||
);
|
||||
|
||||
if (hasChildForms) {
|
||||
return { noWebhookResponse: true };
|
||||
}
|
||||
}
|
||||
|
||||
throw new NotFoundError(errorMessage);
|
||||
}
|
||||
|
||||
throw new NotFoundError(errorMessage);
|
||||
const runExecutionData = execution.data;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
execution.mode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
|
||||
(error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
|
||||
const runExecutionData = execution.data;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
execution.mode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
|
||||
(error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,9 +40,15 @@ describe('ActiveWorkflows', () => {
|
|||
const pollNode = mock<INode>();
|
||||
|
||||
let activeWorkflows: ActiveWorkflows;
|
||||
let acquireIsolate: jest.Mock;
|
||||
let releaseIsolate: jest.Mock;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
acquireIsolate = jest.fn().mockResolvedValue(undefined);
|
||||
releaseIsolate = jest.fn().mockResolvedValue(undefined);
|
||||
// @ts-expect-error -- assign minimal expression stub for isolate-acquisition tests
|
||||
workflow.expression = { acquireIsolate, releaseIsolate };
|
||||
activeWorkflows = new ActiveWorkflows(
|
||||
mock(),
|
||||
scheduledTaskManager,
|
||||
|
|
|
|||
|
|
@ -158,7 +158,17 @@ export class ActiveWorkflows {
|
|||
workflowId: workflow.id,
|
||||
});
|
||||
|
||||
// The initial activation poll runs inside ActiveWorkflowManager's
|
||||
// outer acquireIsolate window, which also covers countTriggers
|
||||
// afterwards. Acquiring here would release the outer bridge early
|
||||
// (acquire is idempotent per caller; release deletes it). Scheduled
|
||||
// polls fire from the cron scheduler's own async context outside
|
||||
// that window and must acquire/release per tick — see CAT-3147.
|
||||
const ownsIsolate = !testingTrigger;
|
||||
|
||||
try {
|
||||
if (ownsIsolate) await workflow.expression.acquireIsolate();
|
||||
|
||||
const pollResponse = await this.triggersAndPollers.runPoll(workflow, node, pollFunctions);
|
||||
|
||||
if (pollResponse !== null) {
|
||||
|
|
@ -172,6 +182,8 @@ export class ActiveWorkflows {
|
|||
throw error;
|
||||
}
|
||||
pollFunctions.__emitError(error as Error);
|
||||
} finally {
|
||||
if (ownsIsolate) await workflow.expression.releaseIsolate();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -1433,6 +1433,8 @@ export class WorkflowExecute {
|
|||
// eslint-disable-next-line complexity
|
||||
const returnPromise = (async () => {
|
||||
try {
|
||||
await workflow.expression.acquireIsolate();
|
||||
|
||||
// Establish the execution context
|
||||
await establishExecutionContext(
|
||||
workflow,
|
||||
|
|
@ -2269,6 +2271,13 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
return fullRunData;
|
||||
})
|
||||
.finally(async () => {
|
||||
try {
|
||||
await workflow.expression.releaseIsolate();
|
||||
} catch (error) {
|
||||
Container.get(ErrorReporter).error(error);
|
||||
}
|
||||
});
|
||||
|
||||
return await returnPromise.then(resolve);
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { DateTime, Duration, Interval } from 'luxon';
|
|||
|
||||
import { ApplicationError } from '@n8n/errors';
|
||||
import type { IExpressionEvaluator, ObservabilityProvider } from '@n8n/expression-runtime';
|
||||
import { MemoryLimitError, SecurityViolationError, TimeoutError } from '@n8n/expression-runtime';
|
||||
import { ExpressionExtensionError } from './errors/expression-extension.error';
|
||||
import { ExpressionError } from './errors/expression.error';
|
||||
import { evaluateExpression, setErrorHandler } from './expression-evaluator-proxy';
|
||||
|
|
@ -58,6 +59,56 @@ setErrorHandler((error: Error) => {
|
|||
if (isExpressionError(error)) throw error;
|
||||
});
|
||||
|
||||
/**
|
||||
* Map errors from the VM expression evaluator to host-side error types.
|
||||
*
|
||||
* The VM bridge can only reconstruct plain Error objects with .name set,
|
||||
* because it can't import ExpressionError/ExpressionExtensionError from
|
||||
* packages/workflow without creating a circular dependency.
|
||||
*
|
||||
* TODO: Move this reconstruction into the bridge once expression-runtime
|
||||
* can depend on workflow error classes (or a shared error package exists).
|
||||
*/
|
||||
function mapVmError(error: unknown): Error {
|
||||
if (isExpressionError(error)) return error;
|
||||
|
||||
// Runtime error types (TimeoutError, MemoryLimitError, etc.) must be
|
||||
// checked before the name-based reconstruction below, because they
|
||||
// extend the runtime's ExpressionError and share .name === 'ExpressionError'.
|
||||
if (error instanceof TimeoutError) {
|
||||
const wrapped = new ExpressionError('Expression timed out');
|
||||
wrapped.cause = error;
|
||||
return wrapped;
|
||||
}
|
||||
if (error instanceof MemoryLimitError) {
|
||||
const wrapped = new ExpressionError('Expression exceeded memory limit');
|
||||
wrapped.cause = error;
|
||||
return wrapped;
|
||||
}
|
||||
if (error instanceof SecurityViolationError) {
|
||||
const wrapped = new ExpressionError(error.message);
|
||||
wrapped.cause = error;
|
||||
return wrapped;
|
||||
}
|
||||
|
||||
// Name-based reconstruction for errors that crossed the isolate boundary
|
||||
if (error instanceof Error && error.name === 'ExpressionExtensionError') {
|
||||
const reconstructed = new ExpressionExtensionError(error.message);
|
||||
Object.assign(reconstructed, error);
|
||||
return reconstructed;
|
||||
}
|
||||
if (error instanceof Error && error.name === 'ExpressionError') {
|
||||
const reconstructed = new ExpressionError(error.message);
|
||||
Object.assign(reconstructed, error);
|
||||
return reconstructed;
|
||||
}
|
||||
|
||||
if (isSyntaxError(error)) return new ExpressionError('invalid syntax');
|
||||
|
||||
if (error instanceof Error) return error;
|
||||
return new Error(String(error));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a safe Object wrapper that removes dangerous static methods
|
||||
* that could be used to bypass property access sanitization.
|
||||
|
|
@ -260,6 +311,27 @@ export class Expression {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the active expression evaluation implementation.
|
||||
* Used for testing and verification.
|
||||
*/
|
||||
static getActiveImplementation(): 'legacy' | 'vm' {
|
||||
if (this.shouldUseVm()) return 'vm';
|
||||
return 'legacy';
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the expression engine programmatically.
|
||||
*
|
||||
* WARNING: This is a global setting — switching engines mid-execution could
|
||||
* cause a workflow to evaluate some expressions with one engine and some with
|
||||
* another. Only use this in benchmarks and tests, never in production code.
|
||||
* In production, set `N8N_EXPRESSION_ENGINE` before process startup instead.
|
||||
*/
|
||||
static setExpressionEngine(engine: 'legacy' | 'vm'): void {
|
||||
this.expressionEngine = engine;
|
||||
}
|
||||
|
||||
static initializeGlobalContext(data: IDataObject) {
|
||||
/**
|
||||
* Denylist
|
||||
|
|
@ -517,9 +589,33 @@ export class Expression {
|
|||
|
||||
Expression.initializeGlobalContext(data);
|
||||
|
||||
// expression extensions
|
||||
data.extend = extend;
|
||||
data.extendOptional = extendOptional;
|
||||
const usingVm = Expression.shouldUseVm();
|
||||
|
||||
// Expression extensions — only attached for the legacy engine.
|
||||
//
|
||||
// In the VM engine, every host function reachable from `data` becomes
|
||||
// a callable target the isolate can reach via `callFunctionAtPath`.
|
||||
// To minimise that surface we keep the VM-path data object as small as
|
||||
// possible and let the in-isolate runtime resolve helpers itself
|
||||
// (see packages/@n8n/expression-runtime/src/runtime/context.ts, where
|
||||
// Tournament's polyfill rewrites bare `extend(...)` calls to the
|
||||
// in-isolate copy on `target.extend`). Setting them on `data` in VM
|
||||
// mode would be dead code AND an unnecessary host-callable.
|
||||
if (!usingVm) {
|
||||
data.extend = extend;
|
||||
data.extendOptional = extendOptional;
|
||||
}
|
||||
|
||||
// In VM mode, strip `$jmesPath` / `$jmespath` from the data proxy.
|
||||
// WorkflowDataProxy adds them, but the in-isolate `target.$jmespath`
|
||||
// shadows them via Tournament's polyfill (see
|
||||
// packages/@n8n/expression-runtime/src/runtime/context.ts). The delete
|
||||
// makes them unreachable via direct path lookup through the bridge
|
||||
// too, so the bridge can never invoke the host-side copies.
|
||||
if (usingVm) {
|
||||
delete data.$jmesPath;
|
||||
delete data.$jmespath;
|
||||
}
|
||||
|
||||
Object.defineProperty(data, sanitizerName, {
|
||||
value: sanitizer,
|
||||
|
|
@ -558,6 +654,24 @@ export class Expression {
|
|||
}
|
||||
|
||||
private renderExpression(expression: string, data: IWorkflowDataProxyData) {
|
||||
// Use VM evaluator if engine is set to 'vm' and we're not in the browser
|
||||
if (Expression.expressionEngine === 'vm' && !IS_FRONTEND) {
|
||||
if (!Expression.vmEvaluator) {
|
||||
throw new ApplicationError(
|
||||
'N8N_EXPRESSION_ENGINE=vm is enabled but VM evaluator is not initialized. Call Expression.initExpressionEngine() during application startup.',
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
const result = Expression.vmEvaluator.evaluate(expression, data, this, {
|
||||
timezone: this.workflow.timezone,
|
||||
});
|
||||
return result as string | null | (() => unknown);
|
||||
} catch (error) {
|
||||
throw mapVmError(error);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return evaluateExpression(expression, data);
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import { DateTime } from 'luxon';
|
||||
|
||||
import { evaluate, getLocalISOString } from './helpers';
|
||||
import { asDateTime, evaluate, getLocalISOString } from './helpers';
|
||||
import { dateExtensions } from '../../src/extensions/date-extensions';
|
||||
import { getGlobalState } from '../../src/global-state';
|
||||
|
||||
|
|
@ -19,9 +19,13 @@ describe('Data Transformation Functions', () => {
|
|||
|
||||
describe('.beginningOf', () => {
|
||||
test('.beginningOf("week") should work correctly on a date', () => {
|
||||
expect(evaluate('={{ DateTime.local(2023, 1, 20).beginningOf("week") }}')).toEqual(
|
||||
DateTime.local(2023, 1, 16, { zone: defaultTimezone }),
|
||||
const result = asDateTime(
|
||||
evaluate('={{ DateTime.local(2023, 1, 20).beginningOf("week") }}'),
|
||||
);
|
||||
expect(result).toBeInstanceOf(DateTime);
|
||||
expect(result.toISODate()).toEqual('2023-01-16');
|
||||
expect(result.hour).toEqual(0);
|
||||
expect(result.minute).toEqual(0);
|
||||
|
||||
expect(evaluate('={{ new Date(2023, 0, 20).beginningOf("week") }}')).toEqual(
|
||||
DateTime.local(2023, 1, 16, { zone: defaultTimezone }).toJSDate(),
|
||||
|
|
@ -57,9 +61,12 @@ describe('Data Transformation Functions', () => {
|
|||
});
|
||||
|
||||
test('.endOfMonth() should work correctly on a date', () => {
|
||||
expect(evaluate('={{ DateTime.local(2023, 1, 16).endOfMonth() }}')).toEqual(
|
||||
DateTime.local(2023, 1, 31, 23, 59, 59, 999, { zone: defaultTimezone }),
|
||||
);
|
||||
const result = asDateTime(evaluate('={{ DateTime.local(2023, 1, 16).endOfMonth() }}'));
|
||||
expect(result).toBeInstanceOf(DateTime);
|
||||
expect(result.toISODate()).toEqual('2023-01-31');
|
||||
expect(result.hour).toEqual(23);
|
||||
expect(result.minute).toEqual(59);
|
||||
|
||||
expect(evaluate('={{ new Date(2023, 0, 16).endOfMonth() }}')).toEqual(
|
||||
DateTime.local(2023, 1, 31, 23, 59, 59, 999, { zone: defaultTimezone }).toJSDate(),
|
||||
);
|
||||
|
|
@ -239,9 +246,9 @@ describe('Data Transformation Functions', () => {
|
|||
|
||||
describe('.toDateTime', () => {
|
||||
test('should return itself for DateTime', () => {
|
||||
const result = evaluate(
|
||||
"={{ DateTime.fromFormat('01-01-2024', 'dd-MM-yyyy').toDateTime() }}",
|
||||
) as unknown as DateTime;
|
||||
const result = asDateTime(
|
||||
evaluate("={{ DateTime.fromFormat('01-01-2024', 'dd-MM-yyyy').toDateTime() }}"),
|
||||
);
|
||||
expect(result).toBeInstanceOf(DateTime);
|
||||
expect(result.day).toEqual(1);
|
||||
expect(result.month).toEqual(1);
|
||||
|
|
@ -249,9 +256,7 @@ describe('Data Transformation Functions', () => {
|
|||
});
|
||||
|
||||
test('should return a DateTime for JS Date', () => {
|
||||
const result = evaluate(
|
||||
'={{ new Date(2024, 0, 1, 12).toDateTime() }}',
|
||||
) as unknown as DateTime;
|
||||
const result = asDateTime(evaluate('={{ new Date(2024, 0, 1, 12).toDateTime() }}'));
|
||||
expect(result).toBeInstanceOf(DateTime);
|
||||
expect(result.day).toEqual(1);
|
||||
expect(result.month).toEqual(1);
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
import { DateTime, Duration, Interval } from 'luxon';
|
||||
import { afterAll, beforeAll } from 'vitest';
|
||||
|
||||
import type { IDataObject } from '../../src/interfaces';
|
||||
import { Workflow } from '../../src/workflow';
|
||||
import * as Helpers from '../helpers';
|
||||
|
|
@ -20,6 +23,15 @@ export const workflow = new Workflow({
|
|||
});
|
||||
export const expression = workflow.expression;
|
||||
|
||||
// acquireIsolate/releaseIsolate are no-ops for the legacy engine, so these
|
||||
// hooks are safe to register unconditionally.
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
export const evaluate = (value: string, values?: IDataObject[]) =>
|
||||
expression.getParameterValue(
|
||||
value,
|
||||
|
|
@ -32,6 +44,29 @@ export const evaluate = (value: string, values?: IDataObject[]) =>
|
|||
{},
|
||||
);
|
||||
|
||||
/**
|
||||
* Normalize expression results that may be Luxon instances (legacy engine)
|
||||
* or ISO strings (VM engine). The VM engine serializes Luxon types to ISO
|
||||
* strings at the isolate boundary.
|
||||
*/
|
||||
export const asDateTime = (v: unknown): DateTime => {
|
||||
if (DateTime.isDateTime(v)) return v;
|
||||
if (typeof v !== 'string') throw new Error(`Expected DateTime or ISO string, got ${typeof v}`);
|
||||
return DateTime.fromISO(v);
|
||||
};
|
||||
|
||||
export const asDuration = (v: unknown): Duration => {
|
||||
if (Duration.isDuration(v)) return v;
|
||||
if (typeof v !== 'string') throw new Error(`Expected Duration or ISO string, got ${typeof v}`);
|
||||
return Duration.fromISO(v);
|
||||
};
|
||||
|
||||
export const asInterval = (v: unknown): Interval => {
|
||||
if (Interval.isInterval(v)) return v;
|
||||
if (typeof v !== 'string') throw new Error(`Expected Interval or ISO string, got ${typeof v}`);
|
||||
return Interval.fromISO(v);
|
||||
};
|
||||
|
||||
export const getLocalISOString = (date: Date) => {
|
||||
const offset = date.getTimezoneOffset();
|
||||
const offsetAbs = Math.abs(offset);
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
// @vitest-environment jsdom
|
||||
import { DateTime } from 'luxon';
|
||||
|
||||
import { evaluate } from './helpers';
|
||||
import { asDateTime, evaluate } from './helpers';
|
||||
import { ExpressionExtensionError } from '../../src/errors';
|
||||
|
||||
describe('Data Transformation Functions', () => {
|
||||
|
|
@ -276,13 +276,17 @@ describe('Data Transformation Functions', () => {
|
|||
});
|
||||
|
||||
test('.toDateTime should work on a variety of formats', () => {
|
||||
expect(evaluate('={{ "Wed, 21 Oct 2015 07:28:00 GMT".toDateTime() }}')).toBeInstanceOf(
|
||||
expect(
|
||||
asDateTime(evaluate('={{ "Wed, 21 Oct 2015 07:28:00 GMT".toDateTime() }}')),
|
||||
).toBeInstanceOf(DateTime);
|
||||
expect(asDateTime(evaluate('={{ "2008-11-11".toDateTime() }}'))).toBeInstanceOf(DateTime);
|
||||
expect(asDateTime(evaluate('={{ "1-Feb-2024".toDateTime() }}'))).toBeInstanceOf(DateTime);
|
||||
expect(asDateTime(evaluate('={{ "1713976144063".toDateTime("ms") }}'))).toBeInstanceOf(
|
||||
DateTime,
|
||||
);
|
||||
expect(asDateTime(evaluate('={{ "31-01-2024".toDateTime("dd-MM-yyyy") }}'))).toBeInstanceOf(
|
||||
DateTime,
|
||||
);
|
||||
expect(evaluate('={{ "2008-11-11".toDateTime() }}')).toBeInstanceOf(DateTime);
|
||||
expect(evaluate('={{ "1-Feb-2024".toDateTime() }}')).toBeInstanceOf(DateTime);
|
||||
expect(evaluate('={{ "1713976144063".toDateTime("ms") }}')).toBeInstanceOf(DateTime);
|
||||
expect(evaluate('={{ "31-01-2024".toDateTime("dd-MM-yyyy") }}')).toBeInstanceOf(DateTime);
|
||||
|
||||
vi.useFakeTimers({ now: new Date() });
|
||||
expect(() => evaluate('={{ "hi".toDateTime() }}')).toThrow(
|
||||
|
|
|
|||
|
|
@ -25,6 +25,16 @@ describe('Expression — array proxy semantics (engine parity)', () => {
|
|||
});
|
||||
const expression = workflow.expression;
|
||||
|
||||
// acquireIsolate/releaseIsolate are no-ops for the legacy engine, so these
|
||||
// hooks are safe to register unconditionally; under the vm engine they
|
||||
// reserve the isolate this suite's expressions evaluate against.
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (value: string, json: unknown) => {
|
||||
const data: INodeExecutionData[] = [{ json: json as INodeExecutionData['json'] }];
|
||||
return expression.getParameterValue(value, null, 0, 0, 'node', data, 'manual', {});
|
||||
|
|
|
|||
72
packages/workflow/test/expression-vm-dispatch.test.ts
Normal file
72
packages/workflow/test/expression-vm-dispatch.test.ts
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
import { afterAll, beforeAll, beforeEach, afterEach, describe, expect, test } from 'vitest';
|
||||
|
||||
import { Expression } from '../src/expression';
|
||||
import { Workflow } from '../src/workflow';
|
||||
import * as Helpers from './helpers';
|
||||
|
||||
describe('Expression VM engine dispatch', () => {
|
||||
let workflow: Workflow;
|
||||
|
||||
beforeAll(async () => {
|
||||
await Expression.initExpressionEngine({
|
||||
engine: 'vm',
|
||||
bridgeTimeout: 5000,
|
||||
bridgeMemoryLimit: 128,
|
||||
poolSize: 1,
|
||||
maxCodeCacheSize: 64,
|
||||
});
|
||||
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
workflow = new Workflow({
|
||||
nodes: [
|
||||
{
|
||||
name: 'node',
|
||||
typeVersion: 1,
|
||||
type: 'test.set',
|
||||
id: 'uuid-1',
|
||||
parameters: {},
|
||||
position: [0, 0],
|
||||
},
|
||||
],
|
||||
connections: {},
|
||||
active: false,
|
||||
nodeTypes,
|
||||
});
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await Expression.disposeExpressionEngine();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await workflow.expression.acquireIsolate();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await workflow.expression.releaseIsolate();
|
||||
});
|
||||
|
||||
test('reports vm as active implementation while engine is set', () => {
|
||||
expect(Expression.getActiveImplementation()).toBe('vm');
|
||||
});
|
||||
|
||||
test('evaluates a simple expression through the VM engine', () => {
|
||||
const result = workflow.expression.getSimpleParameterValue(
|
||||
workflow.nodes.node,
|
||||
'={{ 7 * 6 }}',
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toBe(42);
|
||||
});
|
||||
|
||||
test('evaluates string concatenation through the VM engine', () => {
|
||||
const result = workflow.expression.getSimpleParameterValue(
|
||||
workflow.nodes.node,
|
||||
'={{ "hello " + "world" }}',
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toBe('hello world');
|
||||
});
|
||||
});
|
||||
213
packages/workflow/test/expression-vm-errors.test.ts
Normal file
213
packages/workflow/test/expression-vm-errors.test.ts
Normal file
|
|
@ -0,0 +1,213 @@
|
|||
import { TimeoutError, MemoryLimitError, SecurityViolationError } from '@n8n/expression-runtime';
|
||||
import type { IExpressionEvaluator } from '@n8n/expression-runtime';
|
||||
import { mock } from 'vitest-mock-extended';
|
||||
|
||||
import { ExpressionError } from '../src/errors/expression.error';
|
||||
import { ExpressionExtensionError } from '../src/errors/expression-extension.error';
|
||||
import { Expression } from '../src/expression';
|
||||
import { Workflow } from '../src/workflow';
|
||||
import * as Helpers from './helpers';
|
||||
|
||||
/**
|
||||
* Tests that VM-specific error types from @n8n/expression-runtime
|
||||
* are caught and wrapped in workflow ExpressionError instances.
|
||||
*
|
||||
* The runtime package defines its own ExpressionError class hierarchy
|
||||
* (TimeoutError, MemoryLimitError, SecurityViolationError), which is
|
||||
* different from packages/workflow's ExpressionError. Without explicit
|
||||
* handling, these errors bypass the isExpressionError() check and
|
||||
* propagate as raw runtime errors.
|
||||
*/
|
||||
describe('Expression VM error handling', () => {
|
||||
const nodeTypes = Helpers.NodeTypes();
|
||||
const workflow = new Workflow({
|
||||
id: '1',
|
||||
nodes: [
|
||||
{
|
||||
name: 'node',
|
||||
typeVersion: 1,
|
||||
type: 'test.set',
|
||||
id: 'uuid-1234',
|
||||
position: [0, 0],
|
||||
parameters: {},
|
||||
},
|
||||
],
|
||||
connections: {},
|
||||
active: false,
|
||||
nodeTypes,
|
||||
});
|
||||
|
||||
let originalEngine: 'legacy' | 'vm';
|
||||
let originalEvaluator: IExpressionEvaluator | undefined;
|
||||
|
||||
beforeEach(async () => {
|
||||
originalEngine = Expression.getActiveImplementation();
|
||||
originalEvaluator = (Expression as any).vmEvaluator;
|
||||
await workflow.expression.acquireIsolate();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await workflow.expression.releaseIsolate();
|
||||
Expression.setExpressionEngine(originalEngine);
|
||||
(Expression as any).vmEvaluator = originalEvaluator;
|
||||
});
|
||||
|
||||
function setVmEvaluator(evaluator: Pick<IExpressionEvaluator, 'evaluate'>) {
|
||||
Expression.setExpressionEngine('vm');
|
||||
(Expression as any).vmEvaluator = mock<IExpressionEvaluator>(evaluator);
|
||||
}
|
||||
|
||||
const evaluate = (expr: string) =>
|
||||
workflow.expression.getParameterValue(expr, null, 0, 0, 'node', [], 'manual', {});
|
||||
|
||||
it('should wrap TimeoutError in ExpressionError', () => {
|
||||
const timeoutError = new TimeoutError('Expression timed out after 5000ms', {});
|
||||
setVmEvaluator({
|
||||
evaluate: () => {
|
||||
throw timeoutError;
|
||||
},
|
||||
});
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
evaluate('={{ $json.id }}');
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionError);
|
||||
expect((caught as ExpressionError).message).toBe('Expression timed out');
|
||||
expect((caught as ExpressionError).cause).toBe(timeoutError);
|
||||
});
|
||||
|
||||
it('should wrap MemoryLimitError in ExpressionError', () => {
|
||||
const memoryError = new MemoryLimitError('Expression exceeded memory limit of 128MB', {});
|
||||
setVmEvaluator({
|
||||
evaluate: () => {
|
||||
throw memoryError;
|
||||
},
|
||||
});
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
evaluate('={{ $json.id }}');
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionError);
|
||||
expect((caught as ExpressionError).message).toBe('Expression exceeded memory limit');
|
||||
expect((caught as ExpressionError).cause).toBe(memoryError);
|
||||
});
|
||||
|
||||
it('should wrap SecurityViolationError in ExpressionError', () => {
|
||||
const securityError = new SecurityViolationError(
|
||||
'Cannot access "constructor" due to security concerns',
|
||||
{},
|
||||
);
|
||||
setVmEvaluator({
|
||||
evaluate: () => {
|
||||
throw securityError;
|
||||
},
|
||||
});
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
evaluate('={{ $json.id }}');
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionError);
|
||||
expect((caught as ExpressionError).message).toBe(
|
||||
'Cannot access "constructor" due to security concerns',
|
||||
);
|
||||
expect((caught as ExpressionError).cause).toBe(securityError);
|
||||
});
|
||||
|
||||
it('should preserve description when reconstructing ExpressionError across isolate boundary', () => {
|
||||
const expressionError = new ExpressionError('something went wrong', {
|
||||
description: 'A human-readable description',
|
||||
});
|
||||
const connectionInputData = [
|
||||
{
|
||||
json: {
|
||||
get boom(): never {
|
||||
throw expressionError;
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
workflow.expression.getParameterValue(
|
||||
'={{ $json.boom }}',
|
||||
null,
|
||||
0,
|
||||
0,
|
||||
'node',
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionError);
|
||||
expect(caught).toEqual(expressionError);
|
||||
});
|
||||
|
||||
it('should preserve description when reconstructing ExpressionExtensionError across isolate boundary', () => {
|
||||
const expressionExtensionError = new ExpressionExtensionError('extension failed', {
|
||||
description: 'Extension-specific description',
|
||||
});
|
||||
const connectionInputData = [
|
||||
{
|
||||
json: {
|
||||
get boom(): never {
|
||||
throw expressionExtensionError;
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
workflow.expression.getParameterValue(
|
||||
'={{ $json.boom }}',
|
||||
null,
|
||||
0,
|
||||
0,
|
||||
'node',
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionExtensionError);
|
||||
expect(caught).toEqual(expressionExtensionError);
|
||||
});
|
||||
|
||||
it('should convert built-in SyntaxError to ExpressionError', () => {
|
||||
setVmEvaluator({
|
||||
evaluate: () => {
|
||||
throw new SyntaxError('Unexpected token');
|
||||
},
|
||||
});
|
||||
|
||||
let caught: unknown;
|
||||
try {
|
||||
evaluate('={{ $json.id }}');
|
||||
} catch (error) {
|
||||
caught = error;
|
||||
}
|
||||
|
||||
expect(caught).toBeInstanceOf(ExpressionError);
|
||||
expect((caught as ExpressionError).message).toBe('invalid syntax');
|
||||
});
|
||||
});
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import { DateTime, Duration, Interval } from 'luxon';
|
||||
|
||||
import { workflow } from './ExpressionExtensions/helpers';
|
||||
import { workflow, asDuration, asInterval } from './ExpressionExtensions/helpers';
|
||||
import { baseFixtures } from './ExpressionFixtures/base';
|
||||
import type { ExpressionTestEvaluation, ExpressionTestTransform } from './ExpressionFixtures/base';
|
||||
import * as Helpers from './helpers';
|
||||
|
|
@ -33,6 +33,13 @@ describe('Expression', () => {
|
|||
});
|
||||
const expression = workflow.expression;
|
||||
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (value: string) =>
|
||||
expression.getParameterValue(value, null, 0, 0, 'node', [], 'manual', {});
|
||||
|
||||
|
|
@ -86,32 +93,41 @@ describe('Expression', () => {
|
|||
);
|
||||
|
||||
vi.useFakeTimers({ now: new Date() });
|
||||
expect(evaluate('={{Interval.after(new Date(), 100)}}')).toEqual(
|
||||
Interval.after(new Date(), 100),
|
||||
);
|
||||
const intervalResult = asInterval(evaluate('={{Interval.after(new Date(), 100)}}'));
|
||||
expect(intervalResult).toBeInstanceOf(Interval);
|
||||
expect(intervalResult.length('milliseconds')).toEqual(100);
|
||||
vi.useRealTimers();
|
||||
|
||||
expect(evaluate('={{Duration.fromMillis(100)}}')).toEqual(Duration.fromMillis(100));
|
||||
const durationResult = asDuration(evaluate('={{Duration.fromMillis(100)}}'));
|
||||
expect(durationResult).toBeInstanceOf(Duration);
|
||||
expect(durationResult.toMillis()).toEqual(100);
|
||||
|
||||
expect(evaluate('={{new Object()}}')).toEqual(new Object());
|
||||
|
||||
expect(evaluate('={{new Array()}}')).toEqual([]);
|
||||
expect(evaluate('={{new Int8Array()}}')).toEqual(new Int8Array());
|
||||
expect(evaluate('={{new Uint8Array()}}')).toEqual(new Uint8Array());
|
||||
expect(evaluate('={{new Uint8ClampedArray()}}')).toEqual(new Uint8ClampedArray());
|
||||
expect(evaluate('={{new Int16Array()}}')).toEqual(new Int16Array());
|
||||
expect(evaluate('={{new Uint16Array()}}')).toEqual(new Uint16Array());
|
||||
expect(evaluate('={{new Int32Array()}}')).toEqual(new Int32Array());
|
||||
expect(evaluate('={{new Uint32Array()}}')).toEqual(new Uint32Array());
|
||||
expect(evaluate('={{new Float32Array()}}')).toEqual(new Float32Array());
|
||||
expect(evaluate('={{new Float64Array()}}')).toEqual(new Float64Array());
|
||||
expect(evaluate('={{new BigInt64Array()}}')).toEqual(new BigInt64Array());
|
||||
expect(evaluate('={{new BigUint64Array()}}')).toEqual(new BigUint64Array());
|
||||
// Typed arrays: verify constructors are accessible and return correct length.
|
||||
// We don't use toEqual(new Int8Array()) because the VM engine returns typed
|
||||
// arrays from a different V8 realm, which breaks instanceof/toEqual despite
|
||||
// being functionally identical. This is fine — typed arrays aren't a practical
|
||||
// expression return type (they don't survive JSON serialization).
|
||||
expect(evaluate('={{new Int8Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Uint8Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Uint8ClampedArray(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Int16Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Uint16Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Int32Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Uint32Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Float32Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new Float64Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new BigInt64Array(3).length}}')).toEqual(3);
|
||||
expect(evaluate('={{new BigUint64Array(3).length}}')).toEqual(3);
|
||||
|
||||
expect(evaluate('={{new Map()}}')).toEqual(new Map());
|
||||
expect(evaluate('={{new WeakMap()}}')).toEqual(new WeakMap());
|
||||
// WeakMap/WeakSet are not structured-cloneable, so they can't cross
|
||||
// the VM isolate boundary. Verify the constructors are accessible instead.
|
||||
expect(evaluate('={{new WeakMap() instanceof WeakMap}}')).toEqual(true);
|
||||
expect(evaluate('={{new Set()}}')).toEqual(new Set());
|
||||
expect(evaluate('={{new WeakSet()}}')).toEqual(new WeakSet());
|
||||
expect(evaluate('={{new WeakSet() instanceof WeakSet}}')).toEqual(true);
|
||||
|
||||
expect(evaluate('={{new Error()}}')).toEqual(new Error());
|
||||
expect(evaluate('={{new TypeError()}}')).toEqual(new TypeError());
|
||||
|
|
@ -121,13 +137,16 @@ describe('Expression', () => {
|
|||
expect(evaluate('={{new ReferenceError()}}')).toEqual(new ReferenceError());
|
||||
expect(evaluate('={{new URIError()}}')).toEqual(new URIError());
|
||||
|
||||
expect(evaluate('={{Intl}}')).toEqual(Intl);
|
||||
// Intl from the isolate is a different realm object; verify it's accessible
|
||||
expect(evaluate('={{typeof Intl}}')).toEqual('object');
|
||||
|
||||
expect(evaluate('={{new String()}}')).toEqual(new String());
|
||||
expect(evaluate("={{new RegExp('')}}")).toEqual(new RegExp(''));
|
||||
expect(evaluate('={{new String().toString()}}')).toEqual('');
|
||||
expect(evaluate("={{new RegExp('').source}}")).toEqual('(?:)');
|
||||
|
||||
expect(evaluate('={{Math}}')).toEqual(Math);
|
||||
expect(evaluate('={{new Number()}}')).toEqual(new Number());
|
||||
// Namespace objects (Math, Atomics) come from a different V8 realm in the
|
||||
// VM engine, so toEqual fails despite identical content. Verify accessibility.
|
||||
expect(evaluate('={{typeof Math}}')).toEqual('object');
|
||||
expect(evaluate('={{new Number().valueOf()}}')).toEqual(0);
|
||||
expect(evaluate("={{BigInt('1')}}")).toEqual(BigInt('1'));
|
||||
expect(evaluate('={{Infinity}}')).toEqual(Infinity);
|
||||
expect(evaluate('={{NaN}}')).toEqual(NaN);
|
||||
|
|
@ -137,12 +156,12 @@ describe('Expression', () => {
|
|||
expect(evaluate("={{parseInt('1', 10)}}")).toEqual(parseInt('1', 10));
|
||||
|
||||
expect(evaluate('={{JSON.stringify({})}}')).toEqual(JSON.stringify({}));
|
||||
expect(evaluate('={{new ArrayBuffer(10)}}')).toEqual(new ArrayBuffer(10));
|
||||
expect(evaluate('={{new SharedArrayBuffer(10)}}')).toEqual(new SharedArrayBuffer(10));
|
||||
expect(evaluate('={{Atomics}}')).toEqual(Atomics);
|
||||
expect(evaluate('={{new DataView(new ArrayBuffer(1))}}')).toEqual(
|
||||
new DataView(new ArrayBuffer(1)),
|
||||
);
|
||||
// ArrayBuffer, SharedArrayBuffer, DataView, and Atomics come from a
|
||||
// different V8 realm in the VM engine. Verify accessibility instead.
|
||||
expect(evaluate('={{new ArrayBuffer(10).byteLength}}')).toEqual(10);
|
||||
expect(evaluate('={{new SharedArrayBuffer(10).byteLength}}')).toEqual(10);
|
||||
expect(evaluate('={{typeof Atomics}}')).toEqual('object');
|
||||
expect(evaluate('={{new DataView(new ArrayBuffer(1)).byteLength}}')).toEqual(1);
|
||||
|
||||
expect(evaluate("={{encodeURI('https://google.com')}}")).toEqual(
|
||||
encodeURI('https://google.com'),
|
||||
|
|
@ -450,28 +469,46 @@ describe('Expression', () => {
|
|||
);
|
||||
});
|
||||
|
||||
// The $jmespath restricted-identifier defense differs by engine:
|
||||
// legacy → the host wrapper (workflow-data-proxy.ts) throws
|
||||
// "due to security concerns" via an explicit token denylist.
|
||||
// vm → jmespath runs in-isolate with no denylist, but a restricted
|
||||
// identifier can never leak a usable constructor/prototype to
|
||||
// the host: `constructor` resolves to the Object constructor,
|
||||
// which fails to serialize across the isolate boundary (throws),
|
||||
// while `getPrototypeOf`/`prototype`/`__proto__` are key-misses
|
||||
// or non-cloneable internals → the result is never a callable.
|
||||
// (Structural isolation rather than a denylist — see
|
||||
// packages/@n8n/expression-runtime/src/runtime/jmespath.ts.)
|
||||
const expectJmespathBlocked = (expr: string) => {
|
||||
if (process.env.N8N_EXPRESSION_ENGINE === 'vm') {
|
||||
let result: unknown;
|
||||
let threw = false;
|
||||
try {
|
||||
result = evaluate(expr);
|
||||
} catch {
|
||||
threw = true;
|
||||
}
|
||||
expect(threw || typeof result !== 'function').toBe(true);
|
||||
} else {
|
||||
expect(() => evaluate(expr)).toThrow(/due to security concerns/);
|
||||
}
|
||||
};
|
||||
|
||||
it('should reject jmespath queries that reference restricted identifiers', () => {
|
||||
expect(() => evaluate('={{ $jmespath({a:1}, "constructor") }}')).toThrow(
|
||||
/due to security concerns/,
|
||||
);
|
||||
expect(() => evaluate('={{ $jmespath({a:1}, "__proto__") }}')).toThrow(
|
||||
/due to security concerns/,
|
||||
);
|
||||
expect(() => evaluate('={{ $jmespath({a:1}, "prototype") }}')).toThrow(
|
||||
/due to security concerns/,
|
||||
);
|
||||
expectJmespathBlocked('={{ $jmespath({a:1}, "constructor") }}');
|
||||
expectJmespathBlocked('={{ $jmespath({a:1}, "__proto__") }}');
|
||||
expectJmespathBlocked('={{ $jmespath({a:1}, "prototype") }}');
|
||||
});
|
||||
|
||||
it('should reject jmespath queries that reference restricted identifiers (alias)', () => {
|
||||
expect(() => evaluate('={{ $jmesPath({a:1}, "getPrototypeOf") }}')).toThrow(
|
||||
/due to security concerns/,
|
||||
);
|
||||
expectJmespathBlocked('={{ $jmesPath({a:1}, "getPrototypeOf") }}');
|
||||
});
|
||||
|
||||
it('should reject computed-string jmespath queries built from restricted identifiers', () => {
|
||||
const payload =
|
||||
'={{ $jmespath({a:1}, String.fromCharCode(99,111,110,115,116,114,117,99,116,111,114)) }}';
|
||||
expect(() => evaluate(payload)).toThrow(/due to security concerns/);
|
||||
expectJmespathBlocked(
|
||||
'={{ $jmespath({a:1}, String.fromCharCode(99,111,110,115,116,114,117,99,116,111,114)) }}',
|
||||
);
|
||||
});
|
||||
|
||||
it('should still allow jmespath queries that contain restricted names as substrings', () => {
|
||||
|
|
@ -755,6 +792,13 @@ describe('Expression', () => {
|
|||
describe('Test all expression value fixtures', () => {
|
||||
const expression = workflow.expression;
|
||||
|
||||
beforeAll(async () => {
|
||||
await expression.acquireIsolate();
|
||||
});
|
||||
afterAll(async () => {
|
||||
await expression.releaseIsolate();
|
||||
});
|
||||
|
||||
const evaluate = (value: string, data: INodeExecutionData[]) => {
|
||||
const itemIndex = data.length === 0 ? -1 : 0;
|
||||
return expression.getParameterValue(value, null, 0, itemIndex, 'node', data, 'manual', {});
|
||||
|
|
|
|||
26
packages/workflow/test/setup-vm-evaluator.ts
Normal file
26
packages/workflow/test/setup-vm-evaluator.ts
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
import { Expression } from '../src/expression';
|
||||
|
||||
// Only runs when N8N_EXPRESSION_ENGINE=vm is set.
|
||||
// Initializes the VM evaluator once per vitest worker before all tests,
|
||||
// and disposes it after.
|
||||
if (process.env.N8N_EXPRESSION_ENGINE === 'vm') {
|
||||
beforeAll(async () => {
|
||||
await Expression.initExpressionEngine({
|
||||
engine: 'vm',
|
||||
poolSize: 1,
|
||||
maxCodeCacheSize: 1024,
|
||||
bridgeTimeout: 5000,
|
||||
bridgeMemoryLimit: 128,
|
||||
});
|
||||
});
|
||||
|
||||
// Under Stryker, the worker process exits the moment vitest finishes — the
|
||||
// OS reclaims isolated-vm native handles either way. Calling dispose here
|
||||
// aborts the worker on Node 24 with a native finaliser assertion, which
|
||||
// Stryker reports as a dry-run failure.
|
||||
if (!process.env.STRYKER_RUN) {
|
||||
afterAll(async () => {
|
||||
await Expression.disposeExpressionEngine();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -1742,7 +1742,7 @@ describe('Workflow', () => {
|
|||
const nodeTypes = Helpers.NodeTypes();
|
||||
|
||||
for (const testData of tests) {
|
||||
test(testData.description, () => {
|
||||
test(testData.description, async () => {
|
||||
const nodes: INode[] = [
|
||||
{
|
||||
name: 'Node1',
|
||||
|
|
@ -1818,64 +1818,70 @@ describe('Workflow', () => {
|
|||
};
|
||||
|
||||
const workflow = new Workflow({ nodes, connections, active: false, nodeTypes });
|
||||
const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2';
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2';
|
||||
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
source: [
|
||||
{
|
||||
previousNode: 'test',
|
||||
},
|
||||
],
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: testData.input.Node1.outputJson || testData.input.Node1.parameters,
|
||||
binary: testData.input.Node1.outputBinary,
|
||||
},
|
||||
],
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
source: [
|
||||
{
|
||||
previousNode: 'test',
|
||||
},
|
||||
],
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json:
|
||||
testData.input.Node1.outputJson || testData.input.Node1.parameters,
|
||||
binary: testData.input.Node1.outputBinary,
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
Node2: [],
|
||||
'Node 4 with spaces': [],
|
||||
],
|
||||
Node2: [],
|
||||
'Node 4 with spaces': [],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
|
||||
for (const parameterName of Object.keys(testData.output)) {
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toEqual(testData.output[parameterName]);
|
||||
for (const parameterName of Object.keys(testData.output)) {
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
expect(result).toEqual(testData.output[parameterName]);
|
||||
}
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
test('should also resolve all child parameters when the parent get requested', () => {
|
||||
test('should also resolve all child parameters when the parent get requested', async () => {
|
||||
const nodes: INode[] = [
|
||||
{
|
||||
name: 'Node1',
|
||||
|
|
@ -1902,64 +1908,69 @@ describe('Workflow', () => {
|
|||
const connections: IConnections = {};
|
||||
|
||||
const workflow = new Workflow({ nodes, connections, active: false, nodeTypes });
|
||||
const activeNodeName = 'Node1';
|
||||
await workflow.expression.acquireIsolate();
|
||||
try {
|
||||
const activeNodeName = 'Node1';
|
||||
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
},
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Node1: [
|
||||
{
|
||||
startTime: 1,
|
||||
executionTime: 1,
|
||||
executionIndex: 0,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
},
|
||||
],
|
||||
],
|
||||
],
|
||||
},
|
||||
source: [],
|
||||
},
|
||||
source: [],
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
const parameterName = 'values';
|
||||
const itemIndex = 0;
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] =
|
||||
runExecutionData.resultData.runData.Node1[0].data!.main[0]!;
|
||||
const parameterName = 'values';
|
||||
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[
|
||||
parameterName
|
||||
];
|
||||
const result = workflow.expression.getParameterValue(
|
||||
parameterValue,
|
||||
runExecutionData,
|
||||
runIndex,
|
||||
itemIndex,
|
||||
activeNodeName,
|
||||
connectionInputData,
|
||||
'manual',
|
||||
{},
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
string: [
|
||||
{
|
||||
name: 'name1',
|
||||
value: 'value1',
|
||||
},
|
||||
{
|
||||
name: 'name2',
|
||||
value: 'value1A',
|
||||
},
|
||||
],
|
||||
});
|
||||
expect(result).toEqual({
|
||||
string: [
|
||||
{
|
||||
name: 'name1',
|
||||
value: 'value1',
|
||||
},
|
||||
{
|
||||
name: 'name2',
|
||||
value: 'value1A',
|
||||
},
|
||||
],
|
||||
});
|
||||
} finally {
|
||||
await workflow.expression.releaseIsolate();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,30 @@
|
|||
import { createVitestConfig } from '@n8n/vitest-config/node';
|
||||
import { defineConfig } from 'vitest/config';
|
||||
import { createBaseInlineConfig } from '@n8n/vitest-config/node';
|
||||
|
||||
export default createVitestConfig({ include: ['test/**/*.test.ts'] });
|
||||
// On vitest 3.1.x the multi-config key is `test.workspace` (renamed to
|
||||
// `test.projects` in 3.2). Each project repeats the shared inline config
|
||||
// directly — vitest 3.1 workspaces do not auto-inherit from the root.
|
||||
const sharedTestConfig = createBaseInlineConfig({
|
||||
include: ['test/**/*.test.ts'],
|
||||
setupFiles: ['./test/setup-vm-evaluator.ts'],
|
||||
});
|
||||
|
||||
export default defineConfig({
|
||||
test: {
|
||||
workspace: [
|
||||
{
|
||||
test: {
|
||||
...sharedTestConfig,
|
||||
name: 'legacy-engine',
|
||||
},
|
||||
},
|
||||
{
|
||||
test: {
|
||||
...sharedTestConfig,
|
||||
name: 'vm-engine',
|
||||
env: { N8N_EXPRESSION_ENGINE: 'vm' },
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1614,6 +1614,9 @@ importers:
|
|||
'@n8n/errors':
|
||||
specifier: workspace:*
|
||||
version: link:../@n8n/errors
|
||||
'@n8n/expression-runtime':
|
||||
specifier: workspace:*
|
||||
version: link:../@n8n/expression-runtime
|
||||
'@n8n/n8n-nodes-langchain':
|
||||
specifier: workspace:*
|
||||
version: link:../@n8n/nodes-langchain
|
||||
|
|
@ -1635,6 +1638,9 @@ importers:
|
|||
'@n8n_io/license-sdk':
|
||||
specifier: 2.24.1
|
||||
version: 2.24.1
|
||||
'@opentelemetry/api':
|
||||
specifier: ^1.9.0
|
||||
version: 1.9.0
|
||||
'@parcel/watcher':
|
||||
specifier: ^2.5.1
|
||||
version: 2.5.1
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user