diff --git a/docker/images/n8n/Dockerfile b/docker/images/n8n/Dockerfile index e78f2c3d7fd..c76b64f9a7c 100644 --- a/docker/images/n8n/Dockerfile +++ b/docker/images/n8n/Dockerfile @@ -38,7 +38,26 @@ RUN set -e; \ cd / && rm -rf /launcher-temp # ============================================================================== -# STAGE 4: Final Runtime Image +# STAGE 4: Native Module Builder (isolated-vm musl rebuild) +# ============================================================================== +# Rebuild isolated-vm from source for musl libc. node-gyp-build's musl +# detection relies on /etc/alpine-release, which the hardened runtime base +# omits, so the bundled prebuilt loader picks the glibc binary and segfaults +# in musl pthread paths. Removing prebuilds and invoking node-gyp directly +# avoids npm rebuild's double-build behavior (built-in node-gyp + install-script +# node-gyp) which races on dep-file state. Build tooling stays in this throwaway +# stage so the runtime image ships no compilers. +FROM node:${NODE_VERSION}-alpine3.22 AS native-builder + +COPY --from=app-artifact-processor /app /usr/local/lib/node_modules/n8n + +RUN apk add --no-cache python3 make g++ && \ + cd /usr/local/lib/node_modules/n8n/node_modules/isolated-vm && \ + rm -rf prebuilds && \ + npx --yes node-gyp rebuild --release -j max + +# ============================================================================== +# STAGE 5: Final Runtime Image # ============================================================================== FROM system-deps AS runtime @@ -53,11 +72,14 @@ WORKDIR /home/node COPY --from=app-artifact-processor /app /usr/local/lib/node_modules/n8n COPY --from=launcher-downloader /launcher-bin/* /usr/local/bin/ +# Overlay the musl-rebuilt isolated-vm binary (see native-builder stage). +COPY --from=native-builder /usr/local/lib/node_modules/n8n/node_modules/isolated-vm/build /usr/local/lib/node_modules/n8n/node_modules/isolated-vm/build COPY docker/images/n8n/docker-entrypoint.sh / COPY docker/images/n8n/n8n-task-runners.json /etc/n8n-task-runners.json RUN cd /usr/local/lib/node_modules/n8n && \ npm rebuild sqlite3 && \ + rm -rf node_modules/isolated-vm/prebuilds && \ ln -s /usr/local/lib/node_modules/n8n/bin/n8n /usr/local/bin/n8n && \ mkdir -p /home/node/.n8n && \ chown -R node:node /home/node diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index 8a899329d32..268fdb3003c 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -28,6 +28,7 @@ export const LOG_SCOPES = [ 'chat-hub', 'breaking-changes', 'circuit-breaker', + 'expression-engine', ] as const; export type LogScope = (typeof LOG_SCOPES)[number]; diff --git a/packages/@n8n/vitest-config/node.ts b/packages/@n8n/vitest-config/node.ts index f1e4d90c439..a586bd25415 100644 --- a/packages/@n8n/vitest-config/node.ts +++ b/packages/@n8n/vitest-config/node.ts @@ -1,27 +1,28 @@ import { defineConfig } from 'vitest/config'; import type { InlineConfig } from 'vitest/node'; -export const createVitestConfig = (options: InlineConfig = {}) => { - const vitestConfig = defineConfig({ - test: { - silent: true, - globals: true, - environment: 'node', - ...(process.env.COVERAGE_ENABLED === 'true' - ? { - coverage: { - enabled: true, - provider: 'v8', - reporter: process.env.CI === 'true' ? 'cobertura' : 'text-summary', - all: true, - }, - } - : {}), - ...options, - }, - }); +/** + * Shared test options without the outer defineConfig wrapper. + * Use this when you need to spread the config into workspace projects. + */ +export const createBaseInlineConfig = (options: InlineConfig = {}): InlineConfig => ({ + silent: true, + globals: true, + environment: 'node', + ...(process.env.COVERAGE_ENABLED === 'true' + ? { + coverage: { + enabled: true, + provider: 'v8', + reporter: process.env.CI === 'true' ? 'cobertura' : 'text-summary', + all: true, + }, + } + : {}), + ...options, +}); - return vitestConfig; -}; +export const createVitestConfig = (options: InlineConfig = {}) => + defineConfig({ test: createBaseInlineConfig(options) }); export const vitestConfig = createVitestConfig(); diff --git a/packages/cli/package.json b/packages/cli/package.json index d442f59ddcd..72fe9c0a1bc 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -105,6 +105,7 @@ "@n8n/decorators": "workspace:*", "@n8n/di": "workspace:*", "@n8n/errors": "workspace:*", + "@n8n/expression-runtime": "workspace:*", "@n8n/n8n-nodes-langchain": "workspace:*", "@n8n/permissions": "workspace:*", "@n8n/task-runner": "workspace:*", @@ -112,6 +113,7 @@ "@n8n/utils": "workspace:*", "@n8n_io/ai-assistant-sdk": "catalog:", "@n8n_io/license-sdk": "2.24.1", + "@opentelemetry/api": "^1.9.0", "@parcel/watcher": "^2.5.1", "@rudderstack/rudder-sdk-node": "3.0.5", "@sentry/node": "catalog:", diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 6c8372baa67..d0bbe854203 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -280,10 +280,20 @@ export class ActiveWorkflowManager { const additionalData = await WorkflowExecuteAdditionalData.getBase({ workflowId: workflow.id }); - const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); + await workflow.expression.acquireIsolate(); + try { + const webhooks = WebhookHelpers.getWorkflowWebhooks( + workflow, + additionalData, + undefined, + true, + ); - for (const webhookData of webhooks) { - await this.webhookService.deleteWebhook(workflow, webhookData, mode, 'update'); + for (const webhookData of webhooks) { + await this.webhookService.deleteWebhook(workflow, webhookData, mode, 'update'); + } + } finally { + await workflow.expression.releaseIsolate(); } await this.workflowStaticDataService.saveStaticData(workflow); @@ -632,21 +642,29 @@ export class ActiveWorkflowManager { workflowId: workflow.id, }); - if (shouldAddWebhooks) { - added.webhooks = await this.addWebhooks( - workflow, - additionalData, - 'trigger', - activationMode, - ); - } + let triggerCount = 0; + await workflow.expression.acquireIsolate(); + try { + if (shouldAddWebhooks) { + added.webhooks = await this.addWebhooks( + workflow, + additionalData, + 'trigger', + activationMode, + ); + } - if (shouldAddTriggersAndPollers) { - added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, { - activationMode, - executionMode: 'trigger', - additionalData, - }); + if (shouldAddTriggersAndPollers) { + added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, { + activationMode, + executionMode: 'trigger', + additionalData, + }); + } + + triggerCount = this.countTriggers(workflow, additionalData); + } finally { + await workflow.expression.releaseIsolate(); } // Workflow got now successfully activated so make sure nothing is left in the queue @@ -654,7 +672,6 @@ export class ActiveWorkflowManager { await this.activationErrorsService.deregister(workflowId); - const triggerCount = this.countTriggers(workflow, additionalData); await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount); } catch (e) { const error = e instanceof Error ? e : new Error(`${e}`); diff --git a/packages/cli/src/chat/__tests__/chat-execution-manager.test.ts b/packages/cli/src/chat/__tests__/chat-execution-manager.test.ts index 84848057c2c..592acd17679 100644 --- a/packages/cli/src/chat/__tests__/chat-execution-manager.test.ts +++ b/packages/cli/src/chat/__tests__/chat-execution-manager.test.ts @@ -249,6 +249,10 @@ describe('ChatExecutionManager', () => { const workflow = { getNode: jest.fn().mockReturnValue(node), nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) }, + expression: { + acquireIsolate: jest.fn().mockResolvedValue(undefined), + releaseIsolate: jest.fn().mockResolvedValue(undefined), + }, }; jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow); jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any); @@ -280,6 +284,10 @@ describe('ChatExecutionManager', () => { const workflow = { getNode: jest.fn().mockReturnValue(node), nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) }, + expression: { + acquireIsolate: jest.fn().mockResolvedValue(undefined), + releaseIsolate: jest.fn().mockResolvedValue(undefined), + }, }; jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow); jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any); diff --git a/packages/cli/src/chat/chat-execution-manager.ts b/packages/cli/src/chat/chat-execution-manager.ts index 61d995d30e2..eb88549f89c 100644 --- a/packages/cli/src/chat/chat-execution-manager.ts +++ b/packages/cli/src/chat/chat-execution-manager.ts @@ -120,7 +120,12 @@ export class ChatExecutionManager { } if (nodeType.onMessage) { - return await nodeType.onMessage(context, nodeExecutionData); + await workflow.expression.acquireIsolate(); + try { + return await nodeType.onMessage(context, nodeExecutionData); + } finally { + await workflow.expression.releaseIsolate(); + } } return [[nodeExecutionData]]; diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 58e3affb5a8..33a3a06ce87 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -20,7 +20,7 @@ import { ErrorReporter, ExecutionContextHookRegistry, } from 'n8n-core'; -import { ensureError, sleep, UnexpectedError, UserError } from 'n8n-workflow'; +import { ensureError, Expression, sleep, UnexpectedError, UserError } from 'n8n-workflow'; import type { AbstractServer } from '@/abstract-server'; import { N8N_VERSION, N8N_RELEASE_DATE } from '@/constants'; @@ -28,6 +28,7 @@ import * as CrashJournal from '@/crash-journal'; import { getDataDeduplicationService } from '@/deduplication'; import { TestRunCleanupService } from '@/evaluation.ee/test-runner/test-run-cleanup.service.ee'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; +import { ExpressionObservabilityProvider } from '@/expression-observability/expression-observability.provider'; import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay'; import { ExternalHooks } from '@/external-hooks'; import { License } from '@/license'; @@ -174,6 +175,18 @@ export abstract class BaseCommand { await Container.get(PostHogClient).init(); await Container.get(TelemetryEventRelay).init(); + + const { engine, poolSize, maxCodeCacheSize, bridgeTimeout, bridgeMemoryLimit, idleTimeout } = + this.globalConfig.expressionEngine; + await Expression.initExpressionEngine({ + engine, + poolSize, + maxCodeCacheSize, + bridgeTimeout, + bridgeMemoryLimit, + idleTimeoutMs: idleTimeout === undefined ? undefined : idleTimeout * 1000, + observability: Container.get(ExpressionObservabilityProvider), + }); } protected async stopProcess() { @@ -186,7 +199,11 @@ export abstract class BaseCommand { protected async exitSuccessFully() { try { - await Promise.all([CrashJournal.cleanup(), this.dbConnection.close()]); + await Promise.all([ + CrashJournal.cleanup(), + this.dbConnection.close(), + Expression.disposeExpressionEngine(), + ]); } finally { process.exit(); } diff --git a/packages/cli/src/credentials-helper.ts b/packages/cli/src/credentials-helper.ts index 8fc9ec4024a..a8bc62733fc 100644 --- a/packages/cli/src/credentials-helper.ts +++ b/packages/cli/src/credentials-helper.ts @@ -448,15 +448,20 @@ export class CredentialsHelper extends ICredentialsHelper { }); // Resolve expressions if any are set - decryptedData = workflow.expression.getComplexParameterValue( - mockNode, - decryptedData as INodeParameters, - mode, - additionalKeys, - undefined, - undefined, - decryptedData, - ) as ICredentialDataDecryptedObject; + await workflow.expression.acquireIsolate(); + try { + decryptedData = workflow.expression.getComplexParameterValue( + mockNode, + decryptedData as INodeParameters, + mode, + additionalKeys, + undefined, + undefined, + decryptedData, + ) as ICredentialDataDecryptedObject; + } finally { + await workflow.expression.releaseIsolate(); + } } return decryptedData; diff --git a/packages/cli/src/expression-observability/__tests__/expression-observability.provider.test.ts b/packages/cli/src/expression-observability/__tests__/expression-observability.provider.test.ts new file mode 100644 index 00000000000..9a7075569d3 --- /dev/null +++ b/packages/cli/src/expression-observability/__tests__/expression-observability.provider.test.ts @@ -0,0 +1,259 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +/* eslint-disable @typescript-eslint/unbound-method */ +import type { Logger } from '@n8n/backend-common'; +import { ExpressionEngineConfig, type GlobalConfig } from '@n8n/config'; +import { EXPRESSION_METRICS } from '@n8n/expression-runtime'; +import { trace } from '@opentelemetry/api'; +import { mock } from 'jest-mock-extended'; +import promClient from 'prom-client'; + +import { ExpressionObservabilityProvider } from '../expression-observability.provider'; + +const scopedLogger = mock(); + +function buildConfig(overrides: Partial = {}): ExpressionEngineConfig { + const config = new ExpressionEngineConfig(); + // The provider is a no-op unless engine === 'vm'. Default tests to the active + // engine so shared assertions exercise the real code path. + return Object.assign(config, { engine: 'vm' as const, ...overrides }); +} + +function buildLogger(): Logger { + const logger = mock(); + logger.scoped.mockReturnValue(scopedLogger as unknown as Logger); + return logger; +} + +function buildGlobalConfig(prefix = 'n8n_'): GlobalConfig { + return { endpoints: { metrics: { prefix } } } as GlobalConfig; +} + +describe('ExpressionObservabilityProvider', () => { + beforeEach(() => { + jest.clearAllMocks(); + promClient.register.clear(); + }); + + describe('when disabled', () => { + it('delegates to NoOpProvider when observabilityEnabled=false', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ observabilityEnabled: false }), + buildLogger(), + buildGlobalConfig(), + ); + + expect(() => { + provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 1); + provider.metrics.gauge(EXPRESSION_METRICS.codeCacheSize.name, 5); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 10); + provider.logs.info('hello'); + provider.traces.startSpan('x').end(); + }).not.toThrow(); + }); + + it('delegates to NoOpProvider when engine is not vm', async () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ engine: 'legacy' }), + buildLogger(), + buildGlobalConfig(), + ); + + provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 1); + + const output = await promClient.register.metrics(); + expect(output).not.toContain('n8n_expression_'); + }); + }); + + describe('metrics adapter', () => { + it('registers a prom counter with _total suffix', async () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.counter(EXPRESSION_METRICS.poolAcquired.name, 2); + + const metric = promClient.register.getSingleMetric('n8n_expression_pool_acquired_total'); + expect(metric).toBeDefined(); + const output = await promClient.register.metrics(); + expect(output).toContain('n8n_expression_pool_acquired_total 2'); + }); + + it('registers a prom gauge with the cleaned name', async () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.gauge(EXPRESSION_METRICS.codeCacheSize.name, 42); + + const output = await promClient.register.metrics(); + expect(output).toContain('n8n_expression_code_cache_size 42'); + }); + + it('registers a prom histogram with bucketed observations', async () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 3, { + status: 'success', + type: 'none', + }); + + const output = await promClient.register.metrics(); + expect(output).toContain('n8n_expression_evaluation_duration_seconds_bucket'); + expect(output).toContain('n8n_expression_evaluation_duration_seconds_count'); + }); + }); + + describe('tail sampling', () => { + const startSpanMock = jest.fn().mockReturnValue({ + setStatus: jest.fn(), + setAttribute: jest.fn(), + recordException: jest.fn(), + end: jest.fn(), + }); + + beforeEach(() => { + startSpanMock.mockClear(); + jest.spyOn(trace, 'getTracer').mockReturnValue({ + startSpan: startSpanMock, + } as unknown as ReturnType); + }); + + it('drops healthy spans under the slow threshold', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ slowEvaluationThresholdMs: 50, tracesSampleRate: 0 }), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.01, { + status: 'success', + type: 'none', + }); + expect(startSpanMock).not.toHaveBeenCalled(); + }); + + it('keeps spans that exceed the slow threshold', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ slowEvaluationThresholdMs: 50 }), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.1, { + status: 'success', + type: 'none', + }); + expect(startSpanMock).toHaveBeenCalledWith( + 'expression.evaluate', + expect.objectContaining({ + attributes: expect.objectContaining({ + 'expression.outcome': 'slow', + 'expression.engine': 'vm', + }), + }), + ); + }); + + it('keeps spans when status is error, regardless of duration', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ slowEvaluationThresholdMs: 50 }), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.005, { + status: 'error', + type: 'timeout', + }); + expect(startSpanMock).toHaveBeenCalledWith( + 'expression.evaluate', + expect.objectContaining({ + attributes: expect.objectContaining({ + 'expression.outcome': 'error', + 'expression.error.type': 'timeout', + }), + }), + ); + }); + + it('does not start spans when tracesEnabled=false', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig({ tracesEnabled: false, slowEvaluationThresholdMs: 10 }), + buildLogger(), + buildGlobalConfig(), + ); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.5, { + status: 'error', + type: 'timeout', + }); + expect(startSpanMock).not.toHaveBeenCalled(); + }); + }); + + describe('label-set stability (eager registration)', () => { + it('logs a warning and does not throw when an unknown metric is emitted', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + expect(() => { + provider.metrics.counter('test.unknown', 1); + }).not.toThrow(); + expect(scopedLogger.warn).toHaveBeenCalledWith('Emitted unknown expression metric', { + name: 'test.unknown', + }); + }); + + it('uses the schema-registered label set regardless of call order', async () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.01, { + status: 'error', + type: 'timeout', + }); + provider.metrics.histogram(EXPRESSION_METRICS.evaluationDuration.name, 0.02, { + status: 'success', + type: 'none', + }); + + const output = await promClient.register.metrics(); + const countLines = output + .split('\n') + .filter((line) => line.startsWith('n8n_expression_evaluation_duration_seconds_count{')); + expect(countLines.length).toBeGreaterThan(0); + for (const line of countLines) { + expect(line).toContain('status='); + expect(line).toContain('type='); + } + }); + }); + + describe('logs adapter', () => { + it('delegates to scoped logger', () => { + const provider = new ExpressionObservabilityProvider( + buildConfig(), + buildLogger(), + buildGlobalConfig(), + ); + provider.logs.info('hello', { k: 'v' }); + provider.logs.warn('warn', { k: 'v' }); + provider.logs.error('boom', new Error('x'), { k: 'v' }); + + expect(scopedLogger.info).toHaveBeenCalledWith('hello', { k: 'v' }); + expect(scopedLogger.warn).toHaveBeenCalledWith('warn', { k: 'v' }); + expect(scopedLogger.error).toHaveBeenCalledWith( + 'boom', + expect.objectContaining({ error: expect.any(Error), k: 'v' }), + ); + }); + }); +}); diff --git a/packages/cli/src/expression-observability/expression-observability.constants.ts b/packages/cli/src/expression-observability/expression-observability.constants.ts new file mode 100644 index 00000000000..0c169482b48 --- /dev/null +++ b/packages/cli/src/expression-observability/expression-observability.constants.ts @@ -0,0 +1,12 @@ +export const TRACER_NAME = 'n8n-expression-runtime'; + +export const ATTRIBUTE = { + EXPRESSION_ENGINE: 'expression.engine', + EXPRESSION_DURATION_SECONDS: 'expression.duration_seconds', + EXPRESSION_OUTCOME: 'expression.outcome', + EXPRESSION_ERROR_TYPE: 'expression.error.type', +} as const; + +export const DURATION_BUCKETS_SECONDS = [ + 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 5, +]; diff --git a/packages/cli/src/expression-observability/expression-observability.formatters.ts b/packages/cli/src/expression-observability/expression-observability.formatters.ts new file mode 100644 index 00000000000..98ff5c89ec8 --- /dev/null +++ b/packages/cli/src/expression-observability/expression-observability.formatters.ts @@ -0,0 +1,34 @@ +import type { Attributes } from '@opentelemetry/api'; + +const MAX_EXPRESSION_TEXT_LENGTH = 256; + +export function toPromName( + name: string, + kind: 'counter' | 'gauge' | 'histogram', + prefix: string, +): string { + const base = prefix + name.replace(/\./g, '_'); + return kind === 'counter' && !base.endsWith('_total') ? base + '_total' : base; +} + +export function normalizeAttributes(attributes?: Record): Attributes | undefined { + if (!attributes) return undefined; + const entries = Object.entries(attributes); + if (entries.length === 0) return undefined; + const result: Attributes = {}; + for (const [key, value] of entries) { + const normalized = normalizeAttributeValue(value); + if (normalized !== undefined) result[key] = normalized; + } + return result; +} + +export function normalizeAttributeValue(value: unknown): Attributes[string] | undefined { + if (value === undefined || value === null) return undefined; + if (typeof value === 'string') + return value.length > MAX_EXPRESSION_TEXT_LENGTH + ? value.slice(0, MAX_EXPRESSION_TEXT_LENGTH) + : value; + if (typeof value === 'number' || typeof value === 'boolean') return value; + return String(value); +} diff --git a/packages/cli/src/expression-observability/expression-observability.provider.ts b/packages/cli/src/expression-observability/expression-observability.provider.ts new file mode 100644 index 00000000000..2148d67df5f --- /dev/null +++ b/packages/cli/src/expression-observability/expression-observability.provider.ts @@ -0,0 +1,217 @@ +import { Logger } from '@n8n/backend-common'; +import { ExpressionEngineConfig, GlobalConfig } from '@n8n/config'; +import { Service } from '@n8n/di'; +import type { + LogsAPI, + MetricDef, + MetricsAPI, + ObservabilityProvider, + Span, + TracesAPI, +} from '@n8n/expression-runtime'; +import { EXPRESSION_METRICS, NoOpProvider } from '@n8n/expression-runtime'; +import { SpanStatusCode, trace } from '@opentelemetry/api'; +import type { Tracer } from '@opentelemetry/api'; +import { UnexpectedError } from 'n8n-workflow'; +import promClient, { type Counter, type Gauge, type Histogram } from 'prom-client'; + +import { + ATTRIBUTE, + DURATION_BUCKETS_SECONDS, + TRACER_NAME, +} from './expression-observability.constants'; +import { + normalizeAttributes, + normalizeAttributeValue, + toPromName, +} from './expression-observability.formatters'; + +type TailSampleDecision = 'drop' | 'keep'; + +@Service() +export class ExpressionObservabilityProvider implements ObservabilityProvider { + readonly metrics: MetricsAPI; + + readonly traces: TracesAPI; + + readonly logs: LogsAPI; + + private readonly scopedLogger: Logger; + + private readonly prefix!: string; + + private tracer?: Tracer; + + constructor( + private readonly config: ExpressionEngineConfig, + private readonly logger: Logger, + globalConfig: GlobalConfig, + ) { + this.scopedLogger = this.logger.scoped('expression-engine'); + + if (!this.config.observabilityEnabled || this.config.engine !== 'vm') { + this.metrics = NoOpProvider.metrics; + this.traces = NoOpProvider.traces; + this.logs = NoOpProvider.logs; + return; + } + + this.prefix = globalConfig.endpoints.metrics.prefix; + + this.registerMetrics(); + + this.metrics = { + counter: (name, value, tags) => this.counter(name, value, tags), + gauge: (name, value, tags) => this.gauge(name, value, tags), + histogram: (name, value, tags) => this.histogram(name, value, tags), + }; + + this.traces = { + startSpan: (name, attributes) => this.startSpan(name, attributes), + }; + + this.logs = { + error: (message, error, context) => this.scopedLogger.error(message, { error, ...context }), + warn: (message, context) => this.scopedLogger.warn(message, context), + info: (message, context) => this.scopedLogger.info(message, context), + debug: (message, context) => this.scopedLogger.debug(message, context), + }; + } + + private registerMetrics(): void { + for (const def of Object.values(EXPRESSION_METRICS) as MetricDef[]) { + const promName = toPromName(def.name, def.kind, this.prefix); + switch (def.kind) { + case 'counter': + new promClient.Counter({ + name: promName, + help: def.help, + labelNames: def.labels, + }); + break; + case 'gauge': + new promClient.Gauge({ + name: promName, + help: def.help, + labelNames: def.labels, + }); + break; + case 'histogram': + new promClient.Histogram({ + name: promName, + help: def.help, + labelNames: def.labels, + buckets: DURATION_BUCKETS_SECONDS, + }); + break; + default: { + const _exhaustive: never = def.kind; + throw new UnexpectedError(`Unknown metric kind: ${String(_exhaustive)}`); + } + } + } + } + + private counter(name: string, value: number, tags?: Record): void { + const promName = toPromName(name, 'counter', this.prefix); + const counter = promClient.register.getSingleMetric(promName) as Counter | undefined; + if (!counter) { + this.scopedLogger.warn('Emitted unknown expression metric', { name }); + return; + } + if (tags) counter.inc(tags, value); + else counter.inc(value); + } + + private gauge(name: string, value: number, tags?: Record): void { + const promName = toPromName(name, 'gauge', this.prefix); + const gauge = promClient.register.getSingleMetric(promName) as Gauge | undefined; + if (!gauge) { + this.scopedLogger.warn('Emitted unknown expression metric', { name }); + return; + } + if (tags) gauge.set(tags, value); + else gauge.set(value); + } + + private histogram(name: string, value: number, tags?: Record): void { + const promName = toPromName(name, 'histogram', this.prefix); + const histogram = promClient.register.getSingleMetric(promName) as + | Histogram + | undefined; + if (!histogram) { + this.scopedLogger.warn('Emitted unknown expression metric', { name }); + return; + } + if (tags) histogram.observe(tags, value); + else histogram.observe(value); + + if (name === EXPRESSION_METRICS.evaluationDuration.name) this.maybeRecordSpan(value, tags); + } + + private maybeRecordSpan(durationSeconds: number, tags?: Record): void { + const { tracesEnabled, slowEvaluationThresholdMs } = this.config; + if (!tracesEnabled) return; + + const decision = this.tailSample(durationSeconds, tags); + if (decision === 'drop') return; + + const slowThresholdSeconds = slowEvaluationThresholdMs / 1000; + const outcome = + tags?.status === 'error' + ? 'error' + : durationSeconds > slowThresholdSeconds + ? 'slow' + : 'healthy'; + + const tracer = this.getTracer(); + const errorType = tags?.type && tags.type !== 'none' ? tags.type : undefined; + const span = tracer.startSpan('expression.evaluate', { + attributes: { + [ATTRIBUTE.EXPRESSION_ENGINE]: 'vm', + [ATTRIBUTE.EXPRESSION_DURATION_SECONDS]: durationSeconds, + [ATTRIBUTE.EXPRESSION_OUTCOME]: outcome, + ...(errorType ? { [ATTRIBUTE.EXPRESSION_ERROR_TYPE]: errorType } : {}), + }, + }); + + if (tags?.status === 'error') span.setStatus({ code: SpanStatusCode.ERROR }); + + span.end(); + } + + private tailSample(durationSeconds: number, tags?: Record): TailSampleDecision { + if (tags?.status === 'error') return 'keep'; + const { slowEvaluationThresholdMs, tracesSampleRate } = this.config; + if (durationSeconds > slowEvaluationThresholdMs / 1000) return 'keep'; + if (tracesSampleRate > 0 && Math.random() < tracesSampleRate) return 'keep'; + return 'drop'; + } + + private startSpan(name: string, attributes?: Record): Span { + if (!this.config.tracesEnabled) return NoOpProvider.traces.startSpan(name, attributes); + + const tracer = this.getTracer(); + const otelSpan = tracer.startSpan(name, { + attributes: normalizeAttributes(attributes), + }); + + return { + setStatus: (status) => + otelSpan.setStatus({ + code: status === 'ok' ? SpanStatusCode.OK : SpanStatusCode.ERROR, + }), + setAttribute: (key, value) => { + const normalized = normalizeAttributeValue(value); + if (normalized !== undefined) otelSpan.setAttribute(key, normalized); + }, + recordException: (error) => otelSpan.recordException(error), + end: () => otelSpan.end(), + }; + } + + private getTracer(): Tracer { + this.tracer ??= trace.getTracer(TRACER_NAME); + return this.tracer; + } +} diff --git a/packages/cli/src/services/credentials-tester.service.ts b/packages/cli/src/services/credentials-tester.service.ts index 2835d29103c..77428dd4781 100644 --- a/packages/cli/src/services/credentials-tester.service.ts +++ b/packages/cli/src/services/credentials-tester.service.ts @@ -351,6 +351,7 @@ export class CredentialsTester { let response: INodeExecutionData[][] | null | undefined; try { + await workflow.expression.acquireIsolate(); response = await routingNode.runNode(); } catch (error) { this.errorReporter.error(error); @@ -396,6 +397,7 @@ export class CredentialsTester { message: error.message.toString(), }; } finally { + await workflow.expression.releaseIsolate(); delete mockNodesData[nodeTypeCopy.description.name]; } diff --git a/packages/cli/src/utils.ts b/packages/cli/src/utils.ts index c90049d5ec4..7ab87587abf 100644 --- a/packages/cli/src/utils.ts +++ b/packages/cli/src/utils.ts @@ -1,5 +1,5 @@ import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workflow'; -import type { INode, INodeType } from 'n8n-workflow'; +import type { INode, INodeType, Workflow } from 'n8n-workflow'; import { STARTING_NODES } from '@/constants'; @@ -122,3 +122,19 @@ export const getAllKeyPaths = ( } return paths; }; + +/** + * When N8N_EXPRESSION_ENGINE=vm, expressions run in an isolate that must be acquired + * for this workflow before any code resolves {{ }} in parameters or credentials. + */ +export async function withExpressionIsolate( + workflow: Workflow, + fn: () => Promise, +): Promise { + await workflow.expression.acquireIsolate(); + try { + return await fn(); + } finally { + await workflow.expression.releaseIsolate(); + } +} diff --git a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts index c4f4067e841..d774d425b98 100644 --- a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts @@ -3,6 +3,7 @@ import { generateNanoId } from '@n8n/db'; import type * as express from 'express'; import { mock } from 'jest-mock-extended'; import type { + Expression, ITaskData, IWorkflowBase, IWebhookData, @@ -46,6 +47,7 @@ describe('TestWebhooks', () => { const webhookService = mock(); const testWebhooks = new TestWebhooks( + mock(), mock(), mock(), registrations, @@ -70,7 +72,7 @@ describe('TestWebhooks', () => { }; test('if webhook is needed, should register then create webhook and return true', async () => { - const workflow = mock(); + const workflow = mock({ expression: mock() }); jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow); jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]); @@ -106,7 +108,7 @@ describe('TestWebhooks', () => { }); test('returns false if a triggerToStartFrom with triggerData is given', async () => { - const workflow = mock(); + const workflow = mock({ expression: mock() }); jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow); jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]); @@ -123,7 +125,7 @@ describe('TestWebhooks', () => { test('returns true, registers and then creates webhook if triggerToStartFrom is given with no triggerData', async () => { // ARRANGE - const workflow = mock(); + const workflow = mock({ expression: mock() }); const webhook2 = mock({ node: 'trigger', httpMethod, diff --git a/packages/cli/src/webhooks/live-webhooks.ts b/packages/cli/src/webhooks/live-webhooks.ts index a94fa853d2a..77a2dc6adb0 100644 --- a/packages/cli/src/webhooks/live-webhooks.ts +++ b/packages/cli/src/webhooks/live-webhooks.ts @@ -131,45 +131,50 @@ export class LiveWebhooks implements IWebhookManager { projectId: ownerProjectId, }); - const webhookData = this.webhookService - .getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData) - .find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; + await workflow.expression.acquireIsolate(); + try { + const webhookData = this.webhookService + .getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData) + .find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; - // Get the node which has the webhook defined to know where to start from and to - // get additional data - const workflowStartNode = workflow.getNode(webhookData.node); + // Get the node which has the webhook defined to know where to start from and to + // get additional data + const workflowStartNode = workflow.getNode(webhookData.node); - if (workflowStartNode === null) { - throw new NotFoundError('Could not find node to process webhook.'); + if (workflowStartNode === null) { + throw new NotFoundError('Could not find node to process webhook.'); + } + + if (!authAllowlistedNodes.has(workflowStartNode.type)) { + sanitizeWebhookRequest(request); + } + + return await new Promise((resolve, reject) => { + const executionMode = 'webhook'; + void WebhookHelpers.executeWebhook( + workflow, + webhookData, + workflowData, + workflowStartNode, + executionMode, + undefined, + undefined, + undefined, + request, + response, + async (error: Error | null, data: object) => { + if (error !== null) { + return reject(error); + } + // Save static data if it changed + await this.workflowStaticDataService.saveStaticData(workflow); + resolve(data); + }, + ); + }); + } finally { + await workflow.expression.releaseIsolate(); } - - if (!authAllowlistedNodes.has(workflowStartNode.type)) { - sanitizeWebhookRequest(request); - } - - return await new Promise((resolve, reject) => { - const executionMode = 'webhook'; - void WebhookHelpers.executeWebhook( - workflow, - webhookData, - workflowData, - workflowStartNode, - executionMode, - undefined, - undefined, - undefined, - request, - response, - async (error: Error | null, data: object) => { - if (error !== null) { - return reject(error); - } - // Save static data if it changed - await this.workflowStaticDataService.saveStaticData(workflow); - resolve(data); - }, - ); - }); } private async findWebhook(path: string, httpMethod: IHttpRequestMethods) { diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index 4f340a88915..4fcc0c722ad 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -1,4 +1,5 @@ import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; +import { Logger } from '@n8n/backend-common'; import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; import type express from 'express'; @@ -43,6 +44,7 @@ import type { WorkflowRequest } from '@/workflows/workflow.request'; @Service() export class TestWebhooks implements IWebhookManager { constructor( + private readonly logger: Logger, private readonly push: Push, private readonly nodeTypes: NodeTypes, private readonly registrations: TestWebhookRegistrationsService, @@ -128,59 +130,70 @@ export class TestWebhooks implements IWebhookManager { sanitizeWebhookRequest(request); } - return await new Promise(async (resolve, reject) => { - try { - const executionMode = 'manual'; - const executionId = await WebhookHelpers.executeWebhook( - workflow, - webhook, - workflowEntity, - workflowStartNode, - executionMode, - pushRef, - undefined, // IRunExecutionData - undefined, // executionId - request, - response, - (error: Error | null, data: IWebhookResponseCallbackData) => { - if (error !== null) reject(error); - else resolve(data); - }, - destinationNode, - ); - - // The workflow did not run as the request was probably setup related - // or a ping so do not resolve the promise and wait for the real webhook - // request instead. - if (executionId === undefined) return; - - // Inform editor-ui that webhook got received - if (pushRef !== undefined) { - this.push.send( - { type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } }, + await workflow.expression.acquireIsolate(); + try { + return await new Promise(async (resolve, reject) => { + try { + const executionMode = 'manual'; + const executionId = await WebhookHelpers.executeWebhook( + workflow, + webhook, + workflowEntity, + workflowStartNode, + executionMode, pushRef, + undefined, // IRunExecutionData + undefined, // executionId + request, + response, + (error: Error | null, data: IWebhookResponseCallbackData) => { + if (error !== null) reject(error); + else resolve(data); + }, + destinationNode, ); + + // The workflow did not run as the request was probably setup related + // or a ping so do not resolve the promise and wait for the real webhook + // request instead. + if (executionId === undefined) { + await workflow.expression.releaseIsolate(); + return; + } + + // Inform editor-ui that webhook got received + if (pushRef !== undefined) { + this.push.send( + { + type: 'testWebhookReceived', + data: { workflowId: webhook?.workflowId, executionId }, + }, + pushRef, + ); + } + } catch {} + + /** + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, after the test webhook has been successfully executed, + * the handler process commands the creator process to clear its test webhooks. + */ + if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) { + void this.publisher.publishCommand({ + command: 'clear-test-webhooks', + payload: { webhookKey: key, workflowEntity, pushRef }, + }); + return; } - } catch {} - /** - * Multi-main setup: In a manual webhook execution, the main process that - * handles a webhook might not be the same as the main process that created - * the webhook. If so, after the test webhook has been successfully executed, - * the handler process commands the creator process to clear its test webhooks. - */ - if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) { - void this.publisher.publishCommand({ - command: 'clear-test-webhooks', - payload: { webhookKey: key, workflowEntity, pushRef }, - }); - return; - } + this.clearTimeout(key); - this.clearTimeout(key); - - await this.deactivateWebhooks(workflow); - }); + await this.deactivateWebhooks(workflow); + }); + } finally { + await workflow.expression.releaseIsolate(); + } } @OnPubSubEvent('clear-test-webhooks', { instanceType: 'main' }) @@ -199,7 +212,12 @@ export class TestWebhooks implements IWebhookManager { const workflow = this.toWorkflow(workflowEntity); - await this.deactivateWebhooks(workflow); + await workflow.expression.acquireIsolate(); + try { + await this.deactivateWebhooks(workflow); + } finally { + await workflow.expression.releaseIsolate(); + } } clearTimeout(key: string) { @@ -294,96 +312,102 @@ export class TestWebhooks implements IWebhookManager { const workflow = this.toWorkflow(workflowEntity); - let webhooks = WebhookHelpers.getWorkflowWebhooks( - workflow, - additionalData, - destinationNode, - true, - ); + await workflow.expression.acquireIsolate(); + let webhooks: IWebhookData[]; + try { + webhooks = WebhookHelpers.getWorkflowWebhooks( + workflow, + additionalData, + destinationNode, + true, + ); - // If we have a preferred trigger with data, we don't have to listen for a - // webhook. - if (triggerToStartFrom?.data) { - return false; - } - - // If we have a preferred trigger without data we only want to listen for - // that trigger, not the other ones. - if (triggerToStartFrom) { - webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name); - } - - if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) { - return false; // no webhooks found to start a workflow - } - - const timeout = setTimeout( - async () => await this.cancelWebhook(workflow.id), - TEST_WEBHOOK_TIMEOUT, - ); - - for (const webhook of webhooks) { - const key = this.registrations.toKey(webhook); - const registrationByKey = await this.registrations.get(key); - - if (runData && webhook.node in runData) { + // If we have a preferred trigger with data, we don't have to listen for a + // webhook. + if (triggerToStartFrom?.data) { return false; } - // if registration already exists and is not a test webhook created by this user in this workflow throw an error - if ( - registrationByKey && - !webhook.webhookId && - !registrationByKey.webhook.isTest && - registrationByKey.webhook.userId !== userId && - registrationByKey.webhook.workflowId !== workflow.id - ) { - throw new WebhookPathTakenError(webhook.node); + // If we have a preferred trigger without data we only want to listen for + // that trigger, not the other ones. + if (triggerToStartFrom) { + webhooks = webhooks.filter((w) => w.node === triggerToStartFrom.name); } - webhook.path = removeTrailingSlash(webhook.path); - webhook.isTest = true; + if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) { + return false; // no webhooks found to start a workflow + } - /** - * Additional data cannot be cached because of circular refs. - * Hence store the `userId` and recreate additional data when needed. - */ - const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook; + const timeout = setTimeout( + async () => await this.cancelWebhook(workflow.id), + TEST_WEBHOOK_TIMEOUT, + ); - cacheableWebhook.userId = userId; + for (const webhook of webhooks) { + const key = this.registrations.toKey(webhook); + const registrationByKey = await this.registrations.get(key); - // TODO(CAT-1265): support destination node mode in test webhook registration. - const registration: TestWebhookRegistration = { - pushRef, - workflowEntity, - destinationNode: destinationNode?.nodeName, - webhook: cacheableWebhook as IWebhookData, - }; + if (runData && webhook.node in runData) { + return false; + } + + // if registration already exists and is not a test webhook created by this user in this workflow throw an error + if ( + registrationByKey && + !webhook.webhookId && + !registrationByKey.webhook.isTest && + registrationByKey.webhook.userId !== userId && + registrationByKey.webhook.workflowId !== workflow.id + ) { + throw new WebhookPathTakenError(webhook.node); + } + + webhook.path = removeTrailingSlash(webhook.path); + webhook.isTest = true; - try { /** - * Register the test webhook _before_ creation at third-party service - * in case service sends a confirmation request immediately on creation. + * Additional data cannot be cached because of circular refs. + * Hence store the `userId` and recreate additional data when needed. */ - await this.registrations.register(registration); + const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook; - await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual'); + cacheableWebhook.userId = userId; - cacheableWebhook.staticData = workflow.staticData; + // TODO(CAT-1265): support destination node mode in test webhook registration. + const registration: TestWebhookRegistration = { + pushRef, + workflowEntity, + destinationNode: destinationNode?.nodeName, + webhook: cacheableWebhook as IWebhookData, + }; - await this.registrations.register(registration); + try { + /** + * Register the test webhook _before_ creation at third-party service + * in case service sends a confirmation request immediately on creation. + */ + await this.registrations.register(registration); - this.timeouts[key] = timeout; - } catch (error) { - await this.deactivateWebhooks(workflow); + await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual'); - delete this.timeouts[key]; + cacheableWebhook.staticData = workflow.staticData; - throw error; + await this.registrations.register(registration); + + this.timeouts[key] = timeout; + } catch (error) { + await this.deactivateWebhooks(workflow); + + delete this.timeouts[key]; + + throw error; + } } - } - return true; + return true; + } finally { + await workflow.expression.releaseIsolate(); + } } async cancelWebhook(workflowId: string) { @@ -414,7 +438,19 @@ export class TestWebhooks implements IWebhookManager { if (!foundWebhook) { // As it removes all webhooks of the workflow execute only once - void this.deactivateWebhooks(workflow); + void (async () => { + await workflow.expression.acquireIsolate(); + try { + await this.deactivateWebhooks(workflow); + } finally { + await workflow.expression.releaseIsolate(); + } + })().catch((error) => { + this.logger.error('Failed to deactivate test webhooks on cancel', { + error, + workflowId, + }); + }); } foundWebhook = true; diff --git a/packages/cli/src/webhooks/waiting-webhooks.ts b/packages/cli/src/webhooks/waiting-webhooks.ts index ed1d8c21801..7835c43ffdd 100644 --- a/packages/cli/src/webhooks/waiting-webhooks.ts +++ b/packages/cli/src/webhooks/waiting-webhooks.ts @@ -227,67 +227,72 @@ export class WaitingWebhooks implements IWebhookManager { const additionalData = await WorkflowExecuteAdditionalData.getBase({ workflowId: workflow.id, }); - const webhookData = this.webhookService - .getNodeWebhooks(workflow, workflowStartNode, additionalData) - .find( - (webhook) => - webhook.httpMethod === req.method && - webhook.path === (suffix ?? '') && - webhook.webhookDescription.restartWebhook === true && - (webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms, - ); - - if (webhookData === undefined) { - // If no data got found it means that the execution can not be started via a webhook. - // Return 404 because we do not want to give any data if the execution exists or not. - const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`; - - if (this.isSendAndWaitRequest(workflow.nodes, suffix)) { - res.render('send-and-wait-no-action-required', { isTestWebhook: false }); - return { noWebhookResponse: true }; - } - - if (!execution.data.resultData.error && execution.status === 'waiting') { - const childNodes = workflow.getChildNodes( - execution.data.resultData.lastNodeExecuted as string, + await workflow.expression.acquireIsolate(); + try { + const webhookData = this.webhookService + .getNodeWebhooks(workflow, workflowStartNode, additionalData) + .find( + (webhook) => + webhook.httpMethod === req.method && + webhook.path === (suffix ?? '') && + webhook.webhookDescription.restartWebhook === true && + (webhook.webhookDescription.nodeType === 'form' || false) === this.includeForms, ); - const hasChildForms = childNodes.some( - (node) => - workflow.nodes[node].type === FORM_NODE_TYPE || - workflow.nodes[node].type === WAIT_NODE_TYPE, - ); + if (webhookData === undefined) { + // If no data got found it means that the execution can not be started via a webhook. + // Return 404 because we do not want to give any data if the execution exists or not. + const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`; - if (hasChildForms) { + if (this.isSendAndWaitRequest(workflow.nodes, suffix)) { + res.render('send-and-wait-no-action-required', { isTestWebhook: false }); return { noWebhookResponse: true }; } + + if (!execution.data.resultData.error && execution.status === 'waiting') { + const childNodes = workflow.getChildNodes( + execution.data.resultData.lastNodeExecuted as string, + ); + + const hasChildForms = childNodes.some( + (node) => + workflow.nodes[node].type === FORM_NODE_TYPE || + workflow.nodes[node].type === WAIT_NODE_TYPE, + ); + + if (hasChildForms) { + return { noWebhookResponse: true }; + } + } + + throw new NotFoundError(errorMessage); } - throw new NotFoundError(errorMessage); + const runExecutionData = execution.data; + + return await new Promise((resolve, reject) => { + void WebhookHelpers.executeWebhook( + workflow, + webhookData, + workflowData, + workflowStartNode, + execution.mode, + runExecutionData.pushRef, + runExecutionData, + execution.id, + req, + res, + + (error: Error | null, data: object) => { + if (error !== null) { + return reject(error); + } + resolve(data); + }, + ); + }); + } finally { + await workflow.expression.releaseIsolate(); } - - const runExecutionData = execution.data; - - return await new Promise((resolve, reject) => { - void WebhookHelpers.executeWebhook( - workflow, - webhookData, - workflowData, - workflowStartNode, - execution.mode, - runExecutionData.pushRef, - runExecutionData, - execution.id, - req, - res, - - (error: Error | null, data: object) => { - if (error !== null) { - return reject(error); - } - resolve(data); - }, - ); - }); } } diff --git a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts index 51e4b7b1921..c297b7ea93b 100644 --- a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts +++ b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts @@ -40,9 +40,15 @@ describe('ActiveWorkflows', () => { const pollNode = mock(); let activeWorkflows: ActiveWorkflows; + let acquireIsolate: jest.Mock; + let releaseIsolate: jest.Mock; beforeEach(() => { jest.clearAllMocks(); + acquireIsolate = jest.fn().mockResolvedValue(undefined); + releaseIsolate = jest.fn().mockResolvedValue(undefined); + // @ts-expect-error -- assign minimal expression stub for isolate-acquisition tests + workflow.expression = { acquireIsolate, releaseIsolate }; activeWorkflows = new ActiveWorkflows( mock(), scheduledTaskManager, diff --git a/packages/core/src/execution-engine/active-workflows.ts b/packages/core/src/execution-engine/active-workflows.ts index c908ab0911f..7923d9b56fb 100644 --- a/packages/core/src/execution-engine/active-workflows.ts +++ b/packages/core/src/execution-engine/active-workflows.ts @@ -158,7 +158,17 @@ export class ActiveWorkflows { workflowId: workflow.id, }); + // The initial activation poll runs inside ActiveWorkflowManager's + // outer acquireIsolate window, which also covers countTriggers + // afterwards. Acquiring here would release the outer bridge early + // (acquire is idempotent per caller; release deletes it). Scheduled + // polls fire from the cron scheduler's own async context outside + // that window and must acquire/release per tick — see CAT-3147. + const ownsIsolate = !testingTrigger; + try { + if (ownsIsolate) await workflow.expression.acquireIsolate(); + const pollResponse = await this.triggersAndPollers.runPoll(workflow, node, pollFunctions); if (pollResponse !== null) { @@ -172,6 +182,8 @@ export class ActiveWorkflows { throw error; } pollFunctions.__emitError(error as Error); + } finally { + if (ownsIsolate) await workflow.expression.releaseIsolate(); } }; diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index eea3b42bc5f..b8c28141774 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1433,6 +1433,8 @@ export class WorkflowExecute { // eslint-disable-next-line complexity const returnPromise = (async () => { try { + await workflow.expression.acquireIsolate(); + // Establish the execution context await establishExecutionContext( workflow, @@ -2269,6 +2271,13 @@ export class WorkflowExecute { } return fullRunData; + }) + .finally(async () => { + try { + await workflow.expression.releaseIsolate(); + } catch (error) { + Container.get(ErrorReporter).error(error); + } }); return await returnPromise.then(resolve); diff --git a/packages/workflow/src/expression.ts b/packages/workflow/src/expression.ts index 832ab92a1af..f0350a8f81c 100644 --- a/packages/workflow/src/expression.ts +++ b/packages/workflow/src/expression.ts @@ -2,6 +2,7 @@ import { DateTime, Duration, Interval } from 'luxon'; import { ApplicationError } from '@n8n/errors'; import type { IExpressionEvaluator, ObservabilityProvider } from '@n8n/expression-runtime'; +import { MemoryLimitError, SecurityViolationError, TimeoutError } from '@n8n/expression-runtime'; import { ExpressionExtensionError } from './errors/expression-extension.error'; import { ExpressionError } from './errors/expression.error'; import { evaluateExpression, setErrorHandler } from './expression-evaluator-proxy'; @@ -58,6 +59,56 @@ setErrorHandler((error: Error) => { if (isExpressionError(error)) throw error; }); +/** + * Map errors from the VM expression evaluator to host-side error types. + * + * The VM bridge can only reconstruct plain Error objects with .name set, + * because it can't import ExpressionError/ExpressionExtensionError from + * packages/workflow without creating a circular dependency. + * + * TODO: Move this reconstruction into the bridge once expression-runtime + * can depend on workflow error classes (or a shared error package exists). + */ +function mapVmError(error: unknown): Error { + if (isExpressionError(error)) return error; + + // Runtime error types (TimeoutError, MemoryLimitError, etc.) must be + // checked before the name-based reconstruction below, because they + // extend the runtime's ExpressionError and share .name === 'ExpressionError'. + if (error instanceof TimeoutError) { + const wrapped = new ExpressionError('Expression timed out'); + wrapped.cause = error; + return wrapped; + } + if (error instanceof MemoryLimitError) { + const wrapped = new ExpressionError('Expression exceeded memory limit'); + wrapped.cause = error; + return wrapped; + } + if (error instanceof SecurityViolationError) { + const wrapped = new ExpressionError(error.message); + wrapped.cause = error; + return wrapped; + } + + // Name-based reconstruction for errors that crossed the isolate boundary + if (error instanceof Error && error.name === 'ExpressionExtensionError') { + const reconstructed = new ExpressionExtensionError(error.message); + Object.assign(reconstructed, error); + return reconstructed; + } + if (error instanceof Error && error.name === 'ExpressionError') { + const reconstructed = new ExpressionError(error.message); + Object.assign(reconstructed, error); + return reconstructed; + } + + if (isSyntaxError(error)) return new ExpressionError('invalid syntax'); + + if (error instanceof Error) return error; + return new Error(String(error)); +} + /** * Creates a safe Object wrapper that removes dangerous static methods * that could be used to bypass property access sanitization. @@ -260,6 +311,27 @@ export class Expression { } } + /** + * Get the active expression evaluation implementation. + * Used for testing and verification. + */ + static getActiveImplementation(): 'legacy' | 'vm' { + if (this.shouldUseVm()) return 'vm'; + return 'legacy'; + } + + /** + * Set the expression engine programmatically. + * + * WARNING: This is a global setting — switching engines mid-execution could + * cause a workflow to evaluate some expressions with one engine and some with + * another. Only use this in benchmarks and tests, never in production code. + * In production, set `N8N_EXPRESSION_ENGINE` before process startup instead. + */ + static setExpressionEngine(engine: 'legacy' | 'vm'): void { + this.expressionEngine = engine; + } + static initializeGlobalContext(data: IDataObject) { /** * Denylist @@ -517,9 +589,33 @@ export class Expression { Expression.initializeGlobalContext(data); - // expression extensions - data.extend = extend; - data.extendOptional = extendOptional; + const usingVm = Expression.shouldUseVm(); + + // Expression extensions — only attached for the legacy engine. + // + // In the VM engine, every host function reachable from `data` becomes + // a callable target the isolate can reach via `callFunctionAtPath`. + // To minimise that surface we keep the VM-path data object as small as + // possible and let the in-isolate runtime resolve helpers itself + // (see packages/@n8n/expression-runtime/src/runtime/context.ts, where + // Tournament's polyfill rewrites bare `extend(...)` calls to the + // in-isolate copy on `target.extend`). Setting them on `data` in VM + // mode would be dead code AND an unnecessary host-callable. + if (!usingVm) { + data.extend = extend; + data.extendOptional = extendOptional; + } + + // In VM mode, strip `$jmesPath` / `$jmespath` from the data proxy. + // WorkflowDataProxy adds them, but the in-isolate `target.$jmespath` + // shadows them via Tournament's polyfill (see + // packages/@n8n/expression-runtime/src/runtime/context.ts). The delete + // makes them unreachable via direct path lookup through the bridge + // too, so the bridge can never invoke the host-side copies. + if (usingVm) { + delete data.$jmesPath; + delete data.$jmespath; + } Object.defineProperty(data, sanitizerName, { value: sanitizer, @@ -558,6 +654,24 @@ export class Expression { } private renderExpression(expression: string, data: IWorkflowDataProxyData) { + // Use VM evaluator if engine is set to 'vm' and we're not in the browser + if (Expression.expressionEngine === 'vm' && !IS_FRONTEND) { + if (!Expression.vmEvaluator) { + throw new ApplicationError( + 'N8N_EXPRESSION_ENGINE=vm is enabled but VM evaluator is not initialized. Call Expression.initExpressionEngine() during application startup.', + ); + } + + try { + const result = Expression.vmEvaluator.evaluate(expression, data, this, { + timezone: this.workflow.timezone, + }); + return result as string | null | (() => unknown); + } catch (error) { + throw mapVmError(error); + } + } + try { return evaluateExpression(expression, data); } catch (error) { diff --git a/packages/workflow/test/ExpressionExtensions/date-extensions.test.ts b/packages/workflow/test/ExpressionExtensions/date-extensions.test.ts index 27c2d80eb8e..db90e8a6232 100644 --- a/packages/workflow/test/ExpressionExtensions/date-extensions.test.ts +++ b/packages/workflow/test/ExpressionExtensions/date-extensions.test.ts @@ -2,7 +2,7 @@ import { DateTime } from 'luxon'; -import { evaluate, getLocalISOString } from './helpers'; +import { asDateTime, evaluate, getLocalISOString } from './helpers'; import { dateExtensions } from '../../src/extensions/date-extensions'; import { getGlobalState } from '../../src/global-state'; @@ -19,9 +19,13 @@ describe('Data Transformation Functions', () => { describe('.beginningOf', () => { test('.beginningOf("week") should work correctly on a date', () => { - expect(evaluate('={{ DateTime.local(2023, 1, 20).beginningOf("week") }}')).toEqual( - DateTime.local(2023, 1, 16, { zone: defaultTimezone }), + const result = asDateTime( + evaluate('={{ DateTime.local(2023, 1, 20).beginningOf("week") }}'), ); + expect(result).toBeInstanceOf(DateTime); + expect(result.toISODate()).toEqual('2023-01-16'); + expect(result.hour).toEqual(0); + expect(result.minute).toEqual(0); expect(evaluate('={{ new Date(2023, 0, 20).beginningOf("week") }}')).toEqual( DateTime.local(2023, 1, 16, { zone: defaultTimezone }).toJSDate(), @@ -57,9 +61,12 @@ describe('Data Transformation Functions', () => { }); test('.endOfMonth() should work correctly on a date', () => { - expect(evaluate('={{ DateTime.local(2023, 1, 16).endOfMonth() }}')).toEqual( - DateTime.local(2023, 1, 31, 23, 59, 59, 999, { zone: defaultTimezone }), - ); + const result = asDateTime(evaluate('={{ DateTime.local(2023, 1, 16).endOfMonth() }}')); + expect(result).toBeInstanceOf(DateTime); + expect(result.toISODate()).toEqual('2023-01-31'); + expect(result.hour).toEqual(23); + expect(result.minute).toEqual(59); + expect(evaluate('={{ new Date(2023, 0, 16).endOfMonth() }}')).toEqual( DateTime.local(2023, 1, 31, 23, 59, 59, 999, { zone: defaultTimezone }).toJSDate(), ); @@ -239,9 +246,9 @@ describe('Data Transformation Functions', () => { describe('.toDateTime', () => { test('should return itself for DateTime', () => { - const result = evaluate( - "={{ DateTime.fromFormat('01-01-2024', 'dd-MM-yyyy').toDateTime() }}", - ) as unknown as DateTime; + const result = asDateTime( + evaluate("={{ DateTime.fromFormat('01-01-2024', 'dd-MM-yyyy').toDateTime() }}"), + ); expect(result).toBeInstanceOf(DateTime); expect(result.day).toEqual(1); expect(result.month).toEqual(1); @@ -249,9 +256,7 @@ describe('Data Transformation Functions', () => { }); test('should return a DateTime for JS Date', () => { - const result = evaluate( - '={{ new Date(2024, 0, 1, 12).toDateTime() }}', - ) as unknown as DateTime; + const result = asDateTime(evaluate('={{ new Date(2024, 0, 1, 12).toDateTime() }}')); expect(result).toBeInstanceOf(DateTime); expect(result.day).toEqual(1); expect(result.month).toEqual(1); diff --git a/packages/workflow/test/ExpressionExtensions/helpers.ts b/packages/workflow/test/ExpressionExtensions/helpers.ts index d59f3c0c140..d7a56029d9f 100644 --- a/packages/workflow/test/ExpressionExtensions/helpers.ts +++ b/packages/workflow/test/ExpressionExtensions/helpers.ts @@ -1,3 +1,6 @@ +import { DateTime, Duration, Interval } from 'luxon'; +import { afterAll, beforeAll } from 'vitest'; + import type { IDataObject } from '../../src/interfaces'; import { Workflow } from '../../src/workflow'; import * as Helpers from '../helpers'; @@ -20,6 +23,15 @@ export const workflow = new Workflow({ }); export const expression = workflow.expression; +// acquireIsolate/releaseIsolate are no-ops for the legacy engine, so these +// hooks are safe to register unconditionally. +beforeAll(async () => { + await expression.acquireIsolate(); +}); +afterAll(async () => { + await expression.releaseIsolate(); +}); + export const evaluate = (value: string, values?: IDataObject[]) => expression.getParameterValue( value, @@ -32,6 +44,29 @@ export const evaluate = (value: string, values?: IDataObject[]) => {}, ); +/** + * Normalize expression results that may be Luxon instances (legacy engine) + * or ISO strings (VM engine). The VM engine serializes Luxon types to ISO + * strings at the isolate boundary. + */ +export const asDateTime = (v: unknown): DateTime => { + if (DateTime.isDateTime(v)) return v; + if (typeof v !== 'string') throw new Error(`Expected DateTime or ISO string, got ${typeof v}`); + return DateTime.fromISO(v); +}; + +export const asDuration = (v: unknown): Duration => { + if (Duration.isDuration(v)) return v; + if (typeof v !== 'string') throw new Error(`Expected Duration or ISO string, got ${typeof v}`); + return Duration.fromISO(v); +}; + +export const asInterval = (v: unknown): Interval => { + if (Interval.isInterval(v)) return v; + if (typeof v !== 'string') throw new Error(`Expected Interval or ISO string, got ${typeof v}`); + return Interval.fromISO(v); +}; + export const getLocalISOString = (date: Date) => { const offset = date.getTimezoneOffset(); const offsetAbs = Math.abs(offset); diff --git a/packages/workflow/test/ExpressionExtensions/string-extensions.test.ts b/packages/workflow/test/ExpressionExtensions/string-extensions.test.ts index b2bb16d4d9a..c4876ba9c7d 100644 --- a/packages/workflow/test/ExpressionExtensions/string-extensions.test.ts +++ b/packages/workflow/test/ExpressionExtensions/string-extensions.test.ts @@ -1,7 +1,7 @@ // @vitest-environment jsdom import { DateTime } from 'luxon'; -import { evaluate } from './helpers'; +import { asDateTime, evaluate } from './helpers'; import { ExpressionExtensionError } from '../../src/errors'; describe('Data Transformation Functions', () => { @@ -276,13 +276,17 @@ describe('Data Transformation Functions', () => { }); test('.toDateTime should work on a variety of formats', () => { - expect(evaluate('={{ "Wed, 21 Oct 2015 07:28:00 GMT".toDateTime() }}')).toBeInstanceOf( + expect( + asDateTime(evaluate('={{ "Wed, 21 Oct 2015 07:28:00 GMT".toDateTime() }}')), + ).toBeInstanceOf(DateTime); + expect(asDateTime(evaluate('={{ "2008-11-11".toDateTime() }}'))).toBeInstanceOf(DateTime); + expect(asDateTime(evaluate('={{ "1-Feb-2024".toDateTime() }}'))).toBeInstanceOf(DateTime); + expect(asDateTime(evaluate('={{ "1713976144063".toDateTime("ms") }}'))).toBeInstanceOf( + DateTime, + ); + expect(asDateTime(evaluate('={{ "31-01-2024".toDateTime("dd-MM-yyyy") }}'))).toBeInstanceOf( DateTime, ); - expect(evaluate('={{ "2008-11-11".toDateTime() }}')).toBeInstanceOf(DateTime); - expect(evaluate('={{ "1-Feb-2024".toDateTime() }}')).toBeInstanceOf(DateTime); - expect(evaluate('={{ "1713976144063".toDateTime("ms") }}')).toBeInstanceOf(DateTime); - expect(evaluate('={{ "31-01-2024".toDateTime("dd-MM-yyyy") }}')).toBeInstanceOf(DateTime); vi.useFakeTimers({ now: new Date() }); expect(() => evaluate('={{ "hi".toDateTime() }}')).toThrow( diff --git a/packages/workflow/test/expression-array-proxy-semantics.test.ts b/packages/workflow/test/expression-array-proxy-semantics.test.ts index 5a9388e12b6..62ed9cd9077 100644 --- a/packages/workflow/test/expression-array-proxy-semantics.test.ts +++ b/packages/workflow/test/expression-array-proxy-semantics.test.ts @@ -25,6 +25,16 @@ describe('Expression — array proxy semantics (engine parity)', () => { }); const expression = workflow.expression; + // acquireIsolate/releaseIsolate are no-ops for the legacy engine, so these + // hooks are safe to register unconditionally; under the vm engine they + // reserve the isolate this suite's expressions evaluate against. + beforeAll(async () => { + await expression.acquireIsolate(); + }); + afterAll(async () => { + await expression.releaseIsolate(); + }); + const evaluate = (value: string, json: unknown) => { const data: INodeExecutionData[] = [{ json: json as INodeExecutionData['json'] }]; return expression.getParameterValue(value, null, 0, 0, 'node', data, 'manual', {}); diff --git a/packages/workflow/test/expression-vm-dispatch.test.ts b/packages/workflow/test/expression-vm-dispatch.test.ts new file mode 100644 index 00000000000..1896a5201a8 --- /dev/null +++ b/packages/workflow/test/expression-vm-dispatch.test.ts @@ -0,0 +1,72 @@ +import { afterAll, beforeAll, beforeEach, afterEach, describe, expect, test } from 'vitest'; + +import { Expression } from '../src/expression'; +import { Workflow } from '../src/workflow'; +import * as Helpers from './helpers'; + +describe('Expression VM engine dispatch', () => { + let workflow: Workflow; + + beforeAll(async () => { + await Expression.initExpressionEngine({ + engine: 'vm', + bridgeTimeout: 5000, + bridgeMemoryLimit: 128, + poolSize: 1, + maxCodeCacheSize: 64, + }); + + const nodeTypes = Helpers.NodeTypes(); + workflow = new Workflow({ + nodes: [ + { + name: 'node', + typeVersion: 1, + type: 'test.set', + id: 'uuid-1', + parameters: {}, + position: [0, 0], + }, + ], + connections: {}, + active: false, + nodeTypes, + }); + }); + + afterAll(async () => { + await Expression.disposeExpressionEngine(); + }); + + beforeEach(async () => { + await workflow.expression.acquireIsolate(); + }); + + afterEach(async () => { + await workflow.expression.releaseIsolate(); + }); + + test('reports vm as active implementation while engine is set', () => { + expect(Expression.getActiveImplementation()).toBe('vm'); + }); + + test('evaluates a simple expression through the VM engine', () => { + const result = workflow.expression.getSimpleParameterValue( + workflow.nodes.node, + '={{ 7 * 6 }}', + 'manual', + {}, + ); + expect(result).toBe(42); + }); + + test('evaluates string concatenation through the VM engine', () => { + const result = workflow.expression.getSimpleParameterValue( + workflow.nodes.node, + '={{ "hello " + "world" }}', + 'manual', + {}, + ); + expect(result).toBe('hello world'); + }); +}); diff --git a/packages/workflow/test/expression-vm-errors.test.ts b/packages/workflow/test/expression-vm-errors.test.ts new file mode 100644 index 00000000000..6c22d720ef0 --- /dev/null +++ b/packages/workflow/test/expression-vm-errors.test.ts @@ -0,0 +1,213 @@ +import { TimeoutError, MemoryLimitError, SecurityViolationError } from '@n8n/expression-runtime'; +import type { IExpressionEvaluator } from '@n8n/expression-runtime'; +import { mock } from 'vitest-mock-extended'; + +import { ExpressionError } from '../src/errors/expression.error'; +import { ExpressionExtensionError } from '../src/errors/expression-extension.error'; +import { Expression } from '../src/expression'; +import { Workflow } from '../src/workflow'; +import * as Helpers from './helpers'; + +/** + * Tests that VM-specific error types from @n8n/expression-runtime + * are caught and wrapped in workflow ExpressionError instances. + * + * The runtime package defines its own ExpressionError class hierarchy + * (TimeoutError, MemoryLimitError, SecurityViolationError), which is + * different from packages/workflow's ExpressionError. Without explicit + * handling, these errors bypass the isExpressionError() check and + * propagate as raw runtime errors. + */ +describe('Expression VM error handling', () => { + const nodeTypes = Helpers.NodeTypes(); + const workflow = new Workflow({ + id: '1', + nodes: [ + { + name: 'node', + typeVersion: 1, + type: 'test.set', + id: 'uuid-1234', + position: [0, 0], + parameters: {}, + }, + ], + connections: {}, + active: false, + nodeTypes, + }); + + let originalEngine: 'legacy' | 'vm'; + let originalEvaluator: IExpressionEvaluator | undefined; + + beforeEach(async () => { + originalEngine = Expression.getActiveImplementation(); + originalEvaluator = (Expression as any).vmEvaluator; + await workflow.expression.acquireIsolate(); + }); + + afterEach(async () => { + await workflow.expression.releaseIsolate(); + Expression.setExpressionEngine(originalEngine); + (Expression as any).vmEvaluator = originalEvaluator; + }); + + function setVmEvaluator(evaluator: Pick) { + Expression.setExpressionEngine('vm'); + (Expression as any).vmEvaluator = mock(evaluator); + } + + const evaluate = (expr: string) => + workflow.expression.getParameterValue(expr, null, 0, 0, 'node', [], 'manual', {}); + + it('should wrap TimeoutError in ExpressionError', () => { + const timeoutError = new TimeoutError('Expression timed out after 5000ms', {}); + setVmEvaluator({ + evaluate: () => { + throw timeoutError; + }, + }); + + let caught: unknown; + try { + evaluate('={{ $json.id }}'); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionError); + expect((caught as ExpressionError).message).toBe('Expression timed out'); + expect((caught as ExpressionError).cause).toBe(timeoutError); + }); + + it('should wrap MemoryLimitError in ExpressionError', () => { + const memoryError = new MemoryLimitError('Expression exceeded memory limit of 128MB', {}); + setVmEvaluator({ + evaluate: () => { + throw memoryError; + }, + }); + + let caught: unknown; + try { + evaluate('={{ $json.id }}'); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionError); + expect((caught as ExpressionError).message).toBe('Expression exceeded memory limit'); + expect((caught as ExpressionError).cause).toBe(memoryError); + }); + + it('should wrap SecurityViolationError in ExpressionError', () => { + const securityError = new SecurityViolationError( + 'Cannot access "constructor" due to security concerns', + {}, + ); + setVmEvaluator({ + evaluate: () => { + throw securityError; + }, + }); + + let caught: unknown; + try { + evaluate('={{ $json.id }}'); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionError); + expect((caught as ExpressionError).message).toBe( + 'Cannot access "constructor" due to security concerns', + ); + expect((caught as ExpressionError).cause).toBe(securityError); + }); + + it('should preserve description when reconstructing ExpressionError across isolate boundary', () => { + const expressionError = new ExpressionError('something went wrong', { + description: 'A human-readable description', + }); + const connectionInputData = [ + { + json: { + get boom(): never { + throw expressionError; + }, + }, + }, + ]; + + let caught: unknown; + try { + workflow.expression.getParameterValue( + '={{ $json.boom }}', + null, + 0, + 0, + 'node', + connectionInputData, + 'manual', + {}, + ); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionError); + expect(caught).toEqual(expressionError); + }); + + it('should preserve description when reconstructing ExpressionExtensionError across isolate boundary', () => { + const expressionExtensionError = new ExpressionExtensionError('extension failed', { + description: 'Extension-specific description', + }); + const connectionInputData = [ + { + json: { + get boom(): never { + throw expressionExtensionError; + }, + }, + }, + ]; + + let caught: unknown; + try { + workflow.expression.getParameterValue( + '={{ $json.boom }}', + null, + 0, + 0, + 'node', + connectionInputData, + 'manual', + {}, + ); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionExtensionError); + expect(caught).toEqual(expressionExtensionError); + }); + + it('should convert built-in SyntaxError to ExpressionError', () => { + setVmEvaluator({ + evaluate: () => { + throw new SyntaxError('Unexpected token'); + }, + }); + + let caught: unknown; + try { + evaluate('={{ $json.id }}'); + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ExpressionError); + expect((caught as ExpressionError).message).toBe('invalid syntax'); + }); +}); diff --git a/packages/workflow/test/expression.test.ts b/packages/workflow/test/expression.test.ts index 9cb8707b50c..bd83afe5cde 100644 --- a/packages/workflow/test/expression.test.ts +++ b/packages/workflow/test/expression.test.ts @@ -2,7 +2,7 @@ import { DateTime, Duration, Interval } from 'luxon'; -import { workflow } from './ExpressionExtensions/helpers'; +import { workflow, asDuration, asInterval } from './ExpressionExtensions/helpers'; import { baseFixtures } from './ExpressionFixtures/base'; import type { ExpressionTestEvaluation, ExpressionTestTransform } from './ExpressionFixtures/base'; import * as Helpers from './helpers'; @@ -33,6 +33,13 @@ describe('Expression', () => { }); const expression = workflow.expression; + beforeAll(async () => { + await expression.acquireIsolate(); + }); + afterAll(async () => { + await expression.releaseIsolate(); + }); + const evaluate = (value: string) => expression.getParameterValue(value, null, 0, 0, 'node', [], 'manual', {}); @@ -86,32 +93,41 @@ describe('Expression', () => { ); vi.useFakeTimers({ now: new Date() }); - expect(evaluate('={{Interval.after(new Date(), 100)}}')).toEqual( - Interval.after(new Date(), 100), - ); + const intervalResult = asInterval(evaluate('={{Interval.after(new Date(), 100)}}')); + expect(intervalResult).toBeInstanceOf(Interval); + expect(intervalResult.length('milliseconds')).toEqual(100); vi.useRealTimers(); - expect(evaluate('={{Duration.fromMillis(100)}}')).toEqual(Duration.fromMillis(100)); + const durationResult = asDuration(evaluate('={{Duration.fromMillis(100)}}')); + expect(durationResult).toBeInstanceOf(Duration); + expect(durationResult.toMillis()).toEqual(100); expect(evaluate('={{new Object()}}')).toEqual(new Object()); expect(evaluate('={{new Array()}}')).toEqual([]); - expect(evaluate('={{new Int8Array()}}')).toEqual(new Int8Array()); - expect(evaluate('={{new Uint8Array()}}')).toEqual(new Uint8Array()); - expect(evaluate('={{new Uint8ClampedArray()}}')).toEqual(new Uint8ClampedArray()); - expect(evaluate('={{new Int16Array()}}')).toEqual(new Int16Array()); - expect(evaluate('={{new Uint16Array()}}')).toEqual(new Uint16Array()); - expect(evaluate('={{new Int32Array()}}')).toEqual(new Int32Array()); - expect(evaluate('={{new Uint32Array()}}')).toEqual(new Uint32Array()); - expect(evaluate('={{new Float32Array()}}')).toEqual(new Float32Array()); - expect(evaluate('={{new Float64Array()}}')).toEqual(new Float64Array()); - expect(evaluate('={{new BigInt64Array()}}')).toEqual(new BigInt64Array()); - expect(evaluate('={{new BigUint64Array()}}')).toEqual(new BigUint64Array()); + // Typed arrays: verify constructors are accessible and return correct length. + // We don't use toEqual(new Int8Array()) because the VM engine returns typed + // arrays from a different V8 realm, which breaks instanceof/toEqual despite + // being functionally identical. This is fine — typed arrays aren't a practical + // expression return type (they don't survive JSON serialization). + expect(evaluate('={{new Int8Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Uint8Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Uint8ClampedArray(3).length}}')).toEqual(3); + expect(evaluate('={{new Int16Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Uint16Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Int32Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Uint32Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Float32Array(3).length}}')).toEqual(3); + expect(evaluate('={{new Float64Array(3).length}}')).toEqual(3); + expect(evaluate('={{new BigInt64Array(3).length}}')).toEqual(3); + expect(evaluate('={{new BigUint64Array(3).length}}')).toEqual(3); expect(evaluate('={{new Map()}}')).toEqual(new Map()); - expect(evaluate('={{new WeakMap()}}')).toEqual(new WeakMap()); + // WeakMap/WeakSet are not structured-cloneable, so they can't cross + // the VM isolate boundary. Verify the constructors are accessible instead. + expect(evaluate('={{new WeakMap() instanceof WeakMap}}')).toEqual(true); expect(evaluate('={{new Set()}}')).toEqual(new Set()); - expect(evaluate('={{new WeakSet()}}')).toEqual(new WeakSet()); + expect(evaluate('={{new WeakSet() instanceof WeakSet}}')).toEqual(true); expect(evaluate('={{new Error()}}')).toEqual(new Error()); expect(evaluate('={{new TypeError()}}')).toEqual(new TypeError()); @@ -121,13 +137,16 @@ describe('Expression', () => { expect(evaluate('={{new ReferenceError()}}')).toEqual(new ReferenceError()); expect(evaluate('={{new URIError()}}')).toEqual(new URIError()); - expect(evaluate('={{Intl}}')).toEqual(Intl); + // Intl from the isolate is a different realm object; verify it's accessible + expect(evaluate('={{typeof Intl}}')).toEqual('object'); - expect(evaluate('={{new String()}}')).toEqual(new String()); - expect(evaluate("={{new RegExp('')}}")).toEqual(new RegExp('')); + expect(evaluate('={{new String().toString()}}')).toEqual(''); + expect(evaluate("={{new RegExp('').source}}")).toEqual('(?:)'); - expect(evaluate('={{Math}}')).toEqual(Math); - expect(evaluate('={{new Number()}}')).toEqual(new Number()); + // Namespace objects (Math, Atomics) come from a different V8 realm in the + // VM engine, so toEqual fails despite identical content. Verify accessibility. + expect(evaluate('={{typeof Math}}')).toEqual('object'); + expect(evaluate('={{new Number().valueOf()}}')).toEqual(0); expect(evaluate("={{BigInt('1')}}")).toEqual(BigInt('1')); expect(evaluate('={{Infinity}}')).toEqual(Infinity); expect(evaluate('={{NaN}}')).toEqual(NaN); @@ -137,12 +156,12 @@ describe('Expression', () => { expect(evaluate("={{parseInt('1', 10)}}")).toEqual(parseInt('1', 10)); expect(evaluate('={{JSON.stringify({})}}')).toEqual(JSON.stringify({})); - expect(evaluate('={{new ArrayBuffer(10)}}')).toEqual(new ArrayBuffer(10)); - expect(evaluate('={{new SharedArrayBuffer(10)}}')).toEqual(new SharedArrayBuffer(10)); - expect(evaluate('={{Atomics}}')).toEqual(Atomics); - expect(evaluate('={{new DataView(new ArrayBuffer(1))}}')).toEqual( - new DataView(new ArrayBuffer(1)), - ); + // ArrayBuffer, SharedArrayBuffer, DataView, and Atomics come from a + // different V8 realm in the VM engine. Verify accessibility instead. + expect(evaluate('={{new ArrayBuffer(10).byteLength}}')).toEqual(10); + expect(evaluate('={{new SharedArrayBuffer(10).byteLength}}')).toEqual(10); + expect(evaluate('={{typeof Atomics}}')).toEqual('object'); + expect(evaluate('={{new DataView(new ArrayBuffer(1)).byteLength}}')).toEqual(1); expect(evaluate("={{encodeURI('https://google.com')}}")).toEqual( encodeURI('https://google.com'), @@ -450,28 +469,46 @@ describe('Expression', () => { ); }); + // The $jmespath restricted-identifier defense differs by engine: + // legacy → the host wrapper (workflow-data-proxy.ts) throws + // "due to security concerns" via an explicit token denylist. + // vm → jmespath runs in-isolate with no denylist, but a restricted + // identifier can never leak a usable constructor/prototype to + // the host: `constructor` resolves to the Object constructor, + // which fails to serialize across the isolate boundary (throws), + // while `getPrototypeOf`/`prototype`/`__proto__` are key-misses + // or non-cloneable internals → the result is never a callable. + // (Structural isolation rather than a denylist — see + // packages/@n8n/expression-runtime/src/runtime/jmespath.ts.) + const expectJmespathBlocked = (expr: string) => { + if (process.env.N8N_EXPRESSION_ENGINE === 'vm') { + let result: unknown; + let threw = false; + try { + result = evaluate(expr); + } catch { + threw = true; + } + expect(threw || typeof result !== 'function').toBe(true); + } else { + expect(() => evaluate(expr)).toThrow(/due to security concerns/); + } + }; + it('should reject jmespath queries that reference restricted identifiers', () => { - expect(() => evaluate('={{ $jmespath({a:1}, "constructor") }}')).toThrow( - /due to security concerns/, - ); - expect(() => evaluate('={{ $jmespath({a:1}, "__proto__") }}')).toThrow( - /due to security concerns/, - ); - expect(() => evaluate('={{ $jmespath({a:1}, "prototype") }}')).toThrow( - /due to security concerns/, - ); + expectJmespathBlocked('={{ $jmespath({a:1}, "constructor") }}'); + expectJmespathBlocked('={{ $jmespath({a:1}, "__proto__") }}'); + expectJmespathBlocked('={{ $jmespath({a:1}, "prototype") }}'); }); it('should reject jmespath queries that reference restricted identifiers (alias)', () => { - expect(() => evaluate('={{ $jmesPath({a:1}, "getPrototypeOf") }}')).toThrow( - /due to security concerns/, - ); + expectJmespathBlocked('={{ $jmesPath({a:1}, "getPrototypeOf") }}'); }); it('should reject computed-string jmespath queries built from restricted identifiers', () => { - const payload = - '={{ $jmespath({a:1}, String.fromCharCode(99,111,110,115,116,114,117,99,116,111,114)) }}'; - expect(() => evaluate(payload)).toThrow(/due to security concerns/); + expectJmespathBlocked( + '={{ $jmespath({a:1}, String.fromCharCode(99,111,110,115,116,114,117,99,116,111,114)) }}', + ); }); it('should still allow jmespath queries that contain restricted names as substrings', () => { @@ -755,6 +792,13 @@ describe('Expression', () => { describe('Test all expression value fixtures', () => { const expression = workflow.expression; + beforeAll(async () => { + await expression.acquireIsolate(); + }); + afterAll(async () => { + await expression.releaseIsolate(); + }); + const evaluate = (value: string, data: INodeExecutionData[]) => { const itemIndex = data.length === 0 ? -1 : 0; return expression.getParameterValue(value, null, 0, itemIndex, 'node', data, 'manual', {}); diff --git a/packages/workflow/test/setup-vm-evaluator.ts b/packages/workflow/test/setup-vm-evaluator.ts new file mode 100644 index 00000000000..18c61cf997b --- /dev/null +++ b/packages/workflow/test/setup-vm-evaluator.ts @@ -0,0 +1,26 @@ +import { Expression } from '../src/expression'; + +// Only runs when N8N_EXPRESSION_ENGINE=vm is set. +// Initializes the VM evaluator once per vitest worker before all tests, +// and disposes it after. +if (process.env.N8N_EXPRESSION_ENGINE === 'vm') { + beforeAll(async () => { + await Expression.initExpressionEngine({ + engine: 'vm', + poolSize: 1, + maxCodeCacheSize: 1024, + bridgeTimeout: 5000, + bridgeMemoryLimit: 128, + }); + }); + + // Under Stryker, the worker process exits the moment vitest finishes — the + // OS reclaims isolated-vm native handles either way. Calling dispose here + // aborts the worker on Node 24 with a native finaliser assertion, which + // Stryker reports as a dry-run failure. + if (!process.env.STRYKER_RUN) { + afterAll(async () => { + await Expression.disposeExpressionEngine(); + }); + } +} diff --git a/packages/workflow/test/workflow.test.ts b/packages/workflow/test/workflow.test.ts index ea9fde77131..579d42b850f 100644 --- a/packages/workflow/test/workflow.test.ts +++ b/packages/workflow/test/workflow.test.ts @@ -1742,7 +1742,7 @@ describe('Workflow', () => { const nodeTypes = Helpers.NodeTypes(); for (const testData of tests) { - test(testData.description, () => { + test(testData.description, async () => { const nodes: INode[] = [ { name: 'Node1', @@ -1818,64 +1818,70 @@ describe('Workflow', () => { }; const workflow = new Workflow({ nodes, connections, active: false, nodeTypes }); - const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2'; + await workflow.expression.acquireIsolate(); + try { + const activeNodeName = testData.input.hasOwnProperty('Node3') ? 'Node3' : 'Node2'; - const runExecutionData = createRunExecutionData({ - resultData: { - runData: { - Node1: [ - { - source: [ - { - previousNode: 'test', - }, - ], - startTime: 1, - executionTime: 1, - executionIndex: 0, - data: { - main: [ - [ - { - json: testData.input.Node1.outputJson || testData.input.Node1.parameters, - binary: testData.input.Node1.outputBinary, - }, - ], + const runExecutionData = createRunExecutionData({ + resultData: { + runData: { + Node1: [ + { + source: [ + { + previousNode: 'test', + }, ], + startTime: 1, + executionTime: 1, + executionIndex: 0, + data: { + main: [ + [ + { + json: + testData.input.Node1.outputJson || testData.input.Node1.parameters, + binary: testData.input.Node1.outputBinary, + }, + ], + ], + }, }, - }, - ], - Node2: [], - 'Node 4 with spaces': [], + ], + Node2: [], + 'Node 4 with spaces': [], + }, }, - }, - }); + }); - const itemIndex = 0; - const runIndex = 0; - const connectionInputData: INodeExecutionData[] = - runExecutionData.resultData.runData.Node1[0].data!.main[0]!; + const itemIndex = 0; + const runIndex = 0; + const connectionInputData: INodeExecutionData[] = + runExecutionData.resultData.runData.Node1[0].data!.main[0]!; - for (const parameterName of Object.keys(testData.output)) { - const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[ - parameterName - ]; - const result = workflow.expression.getParameterValue( - parameterValue, - runExecutionData, - runIndex, - itemIndex, - activeNodeName, - connectionInputData, - 'manual', - {}, - ); - expect(result).toEqual(testData.output[parameterName]); + for (const parameterName of Object.keys(testData.output)) { + const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[ + parameterName + ]; + const result = workflow.expression.getParameterValue( + parameterValue, + runExecutionData, + runIndex, + itemIndex, + activeNodeName, + connectionInputData, + 'manual', + {}, + ); + expect(result).toEqual(testData.output[parameterName]); + } + } finally { + await workflow.expression.releaseIsolate(); } }); } - test('should also resolve all child parameters when the parent get requested', () => { + test('should also resolve all child parameters when the parent get requested', async () => { const nodes: INode[] = [ { name: 'Node1', @@ -1902,64 +1908,69 @@ describe('Workflow', () => { const connections: IConnections = {}; const workflow = new Workflow({ nodes, connections, active: false, nodeTypes }); - const activeNodeName = 'Node1'; + await workflow.expression.acquireIsolate(); + try { + const activeNodeName = 'Node1'; - const runExecutionData = createRunExecutionData({ - resultData: { - runData: { - Node1: [ - { - startTime: 1, - executionTime: 1, - executionIndex: 0, - data: { - main: [ - [ - { - json: {}, - }, + const runExecutionData = createRunExecutionData({ + resultData: { + runData: { + Node1: [ + { + startTime: 1, + executionTime: 1, + executionIndex: 0, + data: { + main: [ + [ + { + json: {}, + }, + ], ], - ], + }, + source: [], }, - source: [], - }, - ], + ], + }, }, - }, - }); + }); - const itemIndex = 0; - const runIndex = 0; - const connectionInputData: INodeExecutionData[] = - runExecutionData.resultData.runData.Node1[0].data!.main[0]!; - const parameterName = 'values'; + const itemIndex = 0; + const runIndex = 0; + const connectionInputData: INodeExecutionData[] = + runExecutionData.resultData.runData.Node1[0].data!.main[0]!; + const parameterName = 'values'; - const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[ - parameterName - ]; - const result = workflow.expression.getParameterValue( - parameterValue, - runExecutionData, - runIndex, - itemIndex, - activeNodeName, - connectionInputData, - 'manual', - {}, - ); + const parameterValue = nodes.find((node) => node.name === activeNodeName)!.parameters[ + parameterName + ]; + const result = workflow.expression.getParameterValue( + parameterValue, + runExecutionData, + runIndex, + itemIndex, + activeNodeName, + connectionInputData, + 'manual', + {}, + ); - expect(result).toEqual({ - string: [ - { - name: 'name1', - value: 'value1', - }, - { - name: 'name2', - value: 'value1A', - }, - ], - }); + expect(result).toEqual({ + string: [ + { + name: 'name1', + value: 'value1', + }, + { + name: 'name2', + value: 'value1A', + }, + ], + }); + } finally { + await workflow.expression.releaseIsolate(); + } }); }); diff --git a/packages/workflow/vitest.config.ts b/packages/workflow/vitest.config.ts index 7042490e9a8..82de49c01c7 100644 --- a/packages/workflow/vitest.config.ts +++ b/packages/workflow/vitest.config.ts @@ -1,3 +1,30 @@ -import { createVitestConfig } from '@n8n/vitest-config/node'; +import { defineConfig } from 'vitest/config'; +import { createBaseInlineConfig } from '@n8n/vitest-config/node'; -export default createVitestConfig({ include: ['test/**/*.test.ts'] }); +// On vitest 3.1.x the multi-config key is `test.workspace` (renamed to +// `test.projects` in 3.2). Each project repeats the shared inline config +// directly — vitest 3.1 workspaces do not auto-inherit from the root. +const sharedTestConfig = createBaseInlineConfig({ + include: ['test/**/*.test.ts'], + setupFiles: ['./test/setup-vm-evaluator.ts'], +}); + +export default defineConfig({ + test: { + workspace: [ + { + test: { + ...sharedTestConfig, + name: 'legacy-engine', + }, + }, + { + test: { + ...sharedTestConfig, + name: 'vm-engine', + env: { N8N_EXPRESSION_ENGINE: 'vm' }, + }, + }, + ], + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 90041d2807d..9b4c4a93039 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1614,6 +1614,9 @@ importers: '@n8n/errors': specifier: workspace:* version: link:../@n8n/errors + '@n8n/expression-runtime': + specifier: workspace:* + version: link:../@n8n/expression-runtime '@n8n/n8n-nodes-langchain': specifier: workspace:* version: link:../@n8n/nodes-langchain @@ -1635,6 +1638,9 @@ importers: '@n8n_io/license-sdk': specifier: 2.24.1 version: 2.24.1 + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 '@parcel/watcher': specifier: ^2.5.1 version: 2.5.1