From fe635a754eec88376504d2bdf9a888b9f326d3c8 Mon Sep 17 00:00:00 2001 From: mfsiega <93014743+mfsiega@users.noreply.github.com> Date: Mon, 1 Jun 2026 13:24:18 +0200 Subject: [PATCH] feat(core): Read from workflow_published_version for webhooks/triggers/pollers (no-changelog) (#26896) Co-authored-by: Claude Opus 4.6 --- .../workflow-published-version.repository.ts | 17 +++ .../__tests__/active-workflow-manager.test.ts | 19 +++- packages/cli/src/active-workflow-manager.ts | 100 +++++++++++++++--- .../webhooks/__tests__/live-webhooks.test.ts | 60 +++++++++++ packages/cli/src/webhooks/live-webhooks.ts | 79 +++++++++----- .../workflow-published-data.service.test.ts | 67 ++++++++++++ .../workflow-published-data.service.ts | 42 ++++++++ .../workflow-published-data.service.test.ts | 92 ++++++++++++++++ .../api/workflow-publication-service.spec.ts | 67 ++++++++++++ 9 files changed, 498 insertions(+), 45 deletions(-) create mode 100644 packages/cli/src/workflows/__tests__/workflow-published-data.service.test.ts create mode 100644 packages/cli/src/workflows/workflow-published-data.service.ts create mode 100644 packages/cli/test/integration/workflows/workflow-published-data.service.test.ts create mode 100644 packages/testing/playwright/tests/e2e/api/workflow-publication-service.spec.ts diff --git a/packages/@n8n/db/src/repositories/workflow-published-version.repository.ts b/packages/@n8n/db/src/repositories/workflow-published-version.repository.ts index 803ccea36de..87ffa178223 100644 --- a/packages/@n8n/db/src/repositories/workflow-published-version.repository.ts +++ b/packages/@n8n/db/src/repositories/workflow-published-version.repository.ts @@ -24,4 +24,21 @@ export class WorkflowPublishedVersionRepository extends Repository { + return await this.findOne({ + where: { workflowId }, + relations: { + workflow: { shared: { project: true } }, + publishedVersion: true, + }, + }); + } } diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index 61e397425a7..0e8d96fe8d2 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -1,6 +1,7 @@ /* eslint-disable @typescript-eslint/unbound-method */ import type { Logger } from '@n8n/backend-common'; import { mockLogger } from '@n8n/backend-test-utils'; +import type { WorkflowsConfig } from '@n8n/config'; import type { WorkflowEntity, WorkflowHistory, WorkflowRepository } from '@n8n/db'; import { mock } from 'jest-mock-extended'; import type { ActiveWorkflows, InstanceSettings } from 'n8n-core'; @@ -33,6 +34,7 @@ describe('ActiveWorkflowManager', () => { const instanceSettings = mock({ isMultiMain: false }); const nodeTypes = mock(); const workflowRepository = mock(); + const workflowsConfig = mock({ useWorkflowPublicationService: false }); beforeEach(() => { jest.clearAllMocks(); @@ -52,6 +54,7 @@ describe('ActiveWorkflowManager', () => { mock(), instanceSettings, mock(), + workflowsConfig, mock(), mock(), mock(), @@ -191,6 +194,7 @@ describe('ActiveWorkflowManager', () => { mock(), mock(), mock(), + mock(), ); }); @@ -351,11 +355,12 @@ describe('ActiveWorkflowManager', () => { workflowExecutionService, instanceSettings, mock(), - mock(), + workflowsConfig, mock(), eventService, mock(), mock(), + mock(), ); }); @@ -377,11 +382,14 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); context.emit(triggerData); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(workflowStaticDataService.saveStaticData).toHaveBeenCalledWith(workflow); expect(workflowExecutionService.runWorkflow).toHaveBeenCalledWith( workflowData, @@ -393,8 +401,6 @@ describe('ActiveWorkflowManager', () => { undefined, ); - await new Promise((resolve) => setTimeout(resolve, 0)); - expect(eventService.emit).toHaveBeenCalledWith('workflow-executed', { workflowId: workflowData.id, workflowName: workflowData.name, @@ -417,11 +423,14 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); context.emit(triggerData, undefined, undefined, 'wf-1:node-1:1700000000000'); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(workflowExecutionService.runWorkflow).toHaveBeenCalledWith( workflowData, node, @@ -451,6 +460,7 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); @@ -479,6 +489,7 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); @@ -518,6 +529,7 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); @@ -555,6 +567,7 @@ describe('ActiveWorkflowManager', () => { additionalData, mode, activation, + async () => workflowData, ); const context = getTriggerFunctions(workflow, node, additionalData, mode, activation); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 066836eb159..1dc584781fc 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -65,6 +65,7 @@ import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import { WebhookService } from '@/webhooks/webhook.service'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { WorkflowExecutionService } from '@/workflows/workflow-execution.service'; +import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { getErrorDescription, getErrorNodeId } from '@/workflows/utils'; import { formatWorkflow } from '@/workflows/workflow.formatter'; @@ -100,6 +101,7 @@ export class ActiveWorkflowManager { private readonly push: Push, private readonly eventService: EventService, private readonly storageConfig: StorageConfig, + private readonly workflowPublishedDataService: WorkflowPublishedDataService, private readonly eventBus: MessageEventBus, ) { this.logger = this.logger.scoped(['workflow-activation']); @@ -308,6 +310,11 @@ export class ActiveWorkflowManager { additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, + // TODO(CAT-3202): this callback lets us switch between reading from + // the in-memory workflowData (flag off) and the workflow published data + // service (flag on). Once the feature flag is removed, we'll call the + // service directly and this parameter will go away. + resolveWorkflowData: () => Promise, ): IGetExecutePollFunctions { return (workflow: Workflow, node: INode) => { const __emit = ( @@ -317,13 +324,20 @@ export class ActiveWorkflowManager { ) => { this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); void this.workflowStaticDataService.saveStaticData(workflow); - const executePromise = this.workflowExecutionService.runWorkflow( - workflowData, - node, - data, - additionalData, - mode, - responsePromise, + + // TODO(CAT-3202): resolves workflow data via callback so we + // can feature-flag between in-memory data and the published data + // service. Once the flag is removed, we'll call the service directly. + const executePromise = resolveWorkflowData().then( + async (freshWorkflowData) => + await this.workflowExecutionService.runWorkflow( + freshWorkflowData, + node, + data, + additionalData, + mode, + responsePromise, + ), ); if (donePromise) { @@ -359,6 +373,11 @@ export class ActiveWorkflowManager { additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, + // TODO(CAT-3202): this callback lets us switch between reading from + // the in-memory workflowData (flag off) and the workflow published data + // service (flag on). Once the feature flag is removed, we'll call the + // service directly and this parameter will go away. + resolveWorkflowData: () => Promise, ): IGetExecuteTriggerFunctions { return (workflow: Workflow, node: INode) => { const emit = ( @@ -370,15 +389,21 @@ export class ActiveWorkflowManager { this.logger.debug(`Received trigger for workflow "${workflow.name}"`); void this.workflowStaticDataService.saveStaticData(workflow); - const executePromise = this.workflowExecutionService - .runWorkflow( - workflowData, - node, - data, - additionalData, - mode, - responsePromise, - deduplicationKey, + // TODO(CAT-3202): resolves workflow data via callback so we + // can feature-flag between in-memory data and the published data + // service. Once the flag is removed, we'll call the service directly. + const executePromise = resolveWorkflowData() + .then( + async (freshWorkflowData) => + await this.workflowExecutionService.runWorkflow( + freshWorkflowData, + node, + data, + additionalData, + mode, + responsePromise, + deduplicationKey, + ), ) .catch((error: unknown) => { if (error instanceof DuplicateExecutionError) { @@ -500,6 +525,30 @@ export class ActiveWorkflowManager { executeErrorWorkflow(workflowData, fullRunData, mode); } + /** + * Load the published workflow nodes/connections from the + * `workflow_published_version` table. The passed-in workflow is used + * for all other fields (staticData, settings, etc.). + * + * TODO: Add error handling / fallback strategy for transient DB failures. + */ + private async loadPublishedWorkflowData( + initialWorkflowData: IWorkflowDb, + ): Promise { + const publishedData = await this.workflowPublishedDataService.getPublishedWorkflowData( + initialWorkflowData.id, + ); + + if (!publishedData) { + throw new UnexpectedError('Published version not found for workflow', { + extra: { workflowId: initialWorkflowData.id }, + }); + } + + const { nodes, connections } = publishedData.publishedVersion; + return { ...initialWorkflowData, nodes, connections }; + } + private isActivationInProgress = false; /** @@ -695,6 +744,7 @@ export class ActiveWorkflowManager { } const { nodes, connections } = dbWorkflow.activeVersion; + dbWorkflow.nodes = nodes; dbWorkflow.connections = connections; @@ -739,11 +789,25 @@ export class ActiveWorkflowManager { ); } + // When the flag is on, trigger/poller emit callbacks re-read the published + // version from the DB so they pick up updates without deactivate/reactivate. + // When the flag is off, they use the in-memory workflowData (same as before). + // + // Note: we intentionally load the latest published version when the trigger + // fires so all triggers are updated at the same time without any downtime. + // The workflow publication service is responsible for ensuring that + // removed/disabled triggers in a new workflow version are deactivated before + // updating the published version. + const resolveWorkflowData = this.workflowsConfig.useWorkflowPublicationService + ? async () => await this.loadPublishedWorkflowData(dbWorkflow) + : async () => dbWorkflow as IWorkflowBase; + if (shouldAddTriggersAndPollers) { added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, { activationMode, executionMode: 'trigger', additionalData, + resolveWorkflowData, }); } @@ -1064,10 +1128,12 @@ export class ActiveWorkflowManager { activationMode, executionMode, additionalData, + resolveWorkflowData, }: { activationMode: WorkflowActivateMode; executionMode: WorkflowExecuteMode; additionalData: IWorkflowExecuteAdditionalData; + resolveWorkflowData: () => Promise; }, ) { const getTriggerFunctions = this.getExecuteTriggerFunctions( @@ -1075,6 +1141,7 @@ export class ActiveWorkflowManager { additionalData, executionMode, activationMode, + resolveWorkflowData, ); const getPollFunctions = this.getExecutePollFunctions( @@ -1082,6 +1149,7 @@ export class ActiveWorkflowManager { additionalData, executionMode, activationMode, + resolveWorkflowData, ); if (workflow.getTriggerNodes().length === 0 && workflow.getPollNodes().length === 0) { diff --git a/packages/cli/src/webhooks/__tests__/live-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/live-webhooks.test.ts index 9c556075b66..e136880ca59 100644 --- a/packages/cli/src/webhooks/__tests__/live-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/live-webhooks.test.ts @@ -1,4 +1,5 @@ import { mockLogger } from '@n8n/backend-test-utils'; +import type { WorkflowsConfig } from '@n8n/config'; import type { WebhookEntity, WorkflowEntity, WorkflowHistory, WorkflowRepository } from '@n8n/db'; import type { Response } from 'express'; import { mock } from 'jest-mock-extended'; @@ -20,6 +21,7 @@ import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import type { WebhookService } from '@/webhooks/webhook.service'; import type { WebhookRequest } from '@/webhooks/webhook.types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; +import type { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service'; import type { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; jest.mock('@/webhooks/webhook-helpers'); @@ -34,6 +36,8 @@ describe('LiveWebhooks', () => { const webhookService = mock(); const nodeTypes = mock(); const workflowStaticDataService = mock(); + const workflowsConfig = mock({ useWorkflowPublicationService: false }); + const workflowPublishedDataService = mock(); let liveWebhooks: LiveWebhooks; @@ -45,6 +49,8 @@ describe('LiveWebhooks', () => { webhookService, workflowRepository, workflowStaticDataService, + workflowsConfig, + workflowPublishedDataService, ); // Mock WorkflowExecuteAdditionalData.getBase to avoid DI issues @@ -256,6 +262,60 @@ describe('LiveWebhooks', () => { }); }); + describe('executeWebhook (with publication service)', () => { + beforeEach(() => { + Object.assign(workflowsConfig, { useWorkflowPublicationService: true }); + }); + + afterEach(() => { + Object.assign(workflowsConfig, { useWorkflowPublicationService: false }); + }); + + it('should use published version nodes when executing webhook', async () => { + const activeNodes: INode[] = [ + { + id: 'webhook-node-active', + name: NODE_NAME, + type: 'n8n-nodes-base.webhook', + typeVersion: 1, + position: [100, 200], + parameters: { path: WEBHOOK_PATH, httpMethod: 'GET' }, + }, + ]; + + const workflowEntity = mock({ + id: WORKFLOW_ID, + name: 'Test Workflow', + active: true, + activeVersionId: 'v1', + isArchived: false, + shared: [{ role: 'workflow:owner', project: { id: 'project-1', projectRelations: [] } }], + }); + + const publishedVersion = mock({ + versionId: 'v1', + workflowId: WORKFLOW_ID, + nodes: activeNodes, + connections: {}, + }); + workflowPublishedDataService.getPublishedWorkflowData.mockResolvedValue({ + workflow: workflowEntity, + publishedVersion, + }); + + let capturedNodes: INode[] = []; + const request = setupExecuteWebhookMocks(workflowEntity, { + onExecuteWebhook: ({ workflow }) => { + capturedNodes = Object.values(workflow.nodes); + }, + }); + + await liveWebhooks.executeWebhook(request, mock()); + + expect(capturedNodes[0].id).toBe('webhook-node-active'); + }); + }); + describe('findAccessControlOptions', () => { const httpMethod: IHttpRequestMethods = 'GET'; diff --git a/packages/cli/src/webhooks/live-webhooks.ts b/packages/cli/src/webhooks/live-webhooks.ts index 5903fd0b26e..35cfce92849 100644 --- a/packages/cli/src/webhooks/live-webhooks.ts +++ b/packages/cli/src/webhooks/live-webhooks.ts @@ -1,5 +1,6 @@ import { Logger } from '@n8n/backend-common'; -import { WorkflowRepository } from '@n8n/db'; +import { WorkflowsConfig } from '@n8n/config'; +import { WorkflowRepository, type WorkflowEntity, type WorkflowHistory } from '@n8n/db'; import { Service } from '@n8n/di'; import type { Response } from 'express'; import { Workflow, CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow'; @@ -11,6 +12,7 @@ import { NodeTypes } from '@/node-types'; import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import { WebhookService } from '@/webhooks/webhook.service'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; +import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { authAllowlistedNodes } from './constants'; @@ -37,6 +39,8 @@ export class LiveWebhooks implements IWebhookManager { private readonly webhookService: WebhookService, private readonly workflowRepository: WorkflowRepository, private readonly workflowStaticDataService: WorkflowStaticDataService, + private readonly workflowsConfig: WorkflowsConfig, + private readonly workflowPublishedDataService: WorkflowPublishedDataService, ) {} async getWebhookMethods(path: string) { @@ -98,33 +102,14 @@ export class LiveWebhooks implements IWebhookManager { }); } - const workflowData = await this.workflowRepository.findOne({ - where: { id: webhook.workflowId }, - relations: { - activeVersion: true, - shared: true, - }, - }); - - if (workflowData === null) { - throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`); - } - - if (!workflowData.activeVersion) { - throw new NotFoundError( - `Active version not found for workflow with id "${webhook.workflowId}"`, - ); - } - - const { nodes, connections } = workflowData.activeVersion; + const { workflow: workflowData, publishedVersion } = await this.loadWebhookExecutionData( + webhook.workflowId, + ); + const { nodes, connections } = publishedVersion; // Create a clean workflowData object with only activeVersion nodes/connections // This prevents any downstream code from accidentally using the draft nodes - const activeWorkflowData: IWorkflowBase = { - ...workflowData, - nodes, - connections, - }; + const activeWorkflowData: IWorkflowBase = { ...workflowData, nodes, connections }; const workflow = new Workflow({ id: webhook.workflowId, @@ -137,7 +122,7 @@ export class LiveWebhooks implements IWebhookManager { settings: workflowData.settings, }); - const ownerProjectId = workflowData.shared.find( + const ownerProjectId = workflowData.shared?.find( (share) => share.role === 'workflow:owner', )?.projectId; const additionalData = await WorkflowExecuteAdditionalData.getBase({ @@ -200,6 +185,48 @@ export class LiveWebhooks implements IWebhookManager { } } + private async loadWebhookExecutionData( + workflowId: string, + ): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> { + return this.workflowsConfig.useWorkflowPublicationService + ? await this.loadFromPublishedVersion(workflowId) + : await this.loadFromActiveVersion(workflowId); + } + + /** + * New path for the workflow publication service. Behind a flag, disabled + * by default. + */ + private async loadFromPublishedVersion( + workflowId: string, + ): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> { + const publishedData = + await this.workflowPublishedDataService.getPublishedWorkflowData(workflowId); + if (publishedData === null) { + throw new NotFoundError(`Published version not found for workflow with id "${workflowId}"`); + } + return { workflow: publishedData.workflow, publishedVersion: publishedData.publishedVersion }; + } + + /** + * Old path, before the workflow publication service. Currently the default. + */ + private async loadFromActiveVersion( + workflowId: string, + ): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> { + const workflowData = await this.workflowRepository.findOne({ + where: { id: workflowId }, + relations: { activeVersion: true, shared: true }, + }); + if (workflowData === null) { + throw new NotFoundError(`Could not find workflow with id "${workflowId}"`); + } + if (!workflowData.activeVersion) { + throw new NotFoundError(`Active version not found for workflow with id "${workflowId}"`); + } + return { workflow: workflowData, publishedVersion: workflowData.activeVersion }; + } + private async findWebhook(path: string, httpMethod: IHttpRequestMethods) { // Remove trailing slash if (path.endsWith('/')) { diff --git a/packages/cli/src/workflows/__tests__/workflow-published-data.service.test.ts b/packages/cli/src/workflows/__tests__/workflow-published-data.service.test.ts new file mode 100644 index 00000000000..b4610568f5b --- /dev/null +++ b/packages/cli/src/workflows/__tests__/workflow-published-data.service.test.ts @@ -0,0 +1,67 @@ +import type { WorkflowPublishedVersionRepository, WorkflowPublishedVersion } from '@n8n/db'; +import { mock } from 'jest-mock-extended'; +import type { ErrorReporter } from 'n8n-core'; + +import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service'; + +describe('WorkflowPublishedDataService', () => { + const workflowPublishedVersionRepository = mock(); + const errorReporter = mock(); + let service: WorkflowPublishedDataService; + + beforeEach(() => { + jest.clearAllMocks(); + service = new WorkflowPublishedDataService(errorReporter, workflowPublishedVersionRepository); + }); + + // Verifies that we hit the repository and return the data it provides. + test('should return published data when record exists', async () => { + const nodes = [ + { + id: 'node-1', + name: 'Test', + type: 'n8n-nodes-base.noOp', + typeVersion: 1, + position: [0, 0] as [number, number], + parameters: {}, + }, + ]; + const connections = {}; + + const record = mock(); + Object.defineProperty(record, 'publishedVersion', { + value: { nodes, connections, name: 'v1' }, + }); + Object.defineProperty(record, 'workflow', { + value: { name: 'Workflow Name', staticData: undefined, settings: {}, shared: [] }, + }); + workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(record); + + const result = await service.getPublishedWorkflowData('wf-1'); + + expect(result).not.toBeNull(); + expect(result!.workflow.name).toBe('Workflow Name'); + expect(result!.publishedVersion.nodes).toBe(nodes); + expect(result!.publishedVersion.connections).toBe(connections); + }); + + test('should return null when no record exists', async () => { + workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(null); + + const result = await service.getPublishedWorkflowData('wf-1'); + + expect(result).toBeNull(); + }); + + test('should return null when publishedVersion relation is missing', async () => { + const record = mock(); + // Simulate unloaded relation — TypeORM returns undefined for unloaded relations + Object.defineProperty(record, 'publishedVersion', { value: undefined }); + Object.defineProperty(record, 'workflow', { value: { name: 'Workflow Name' } }); + workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(record); + + const result = await service.getPublishedWorkflowData('wf-1'); + + expect(result).toBeNull(); + }); +}); diff --git a/packages/cli/src/workflows/workflow-published-data.service.ts b/packages/cli/src/workflows/workflow-published-data.service.ts new file mode 100644 index 00000000000..f45ee9f88c1 --- /dev/null +++ b/packages/cli/src/workflows/workflow-published-data.service.ts @@ -0,0 +1,42 @@ +import { + WorkflowPublishedVersionRepository, + type WorkflowEntity, + type WorkflowHistory, +} from '@n8n/db'; +import { Service } from '@n8n/di'; +import { ErrorReporter } from 'n8n-core'; +import { UnexpectedError } from 'n8n-workflow'; + +@Service() +export class WorkflowPublishedDataService { + constructor( + private readonly errorReporter: ErrorReporter, + private readonly workflowPublishedVersionRepository: WorkflowPublishedVersionRepository, + ) {} + + /** + * Resolves a workflow's published version: returns the workflow entity and + * the `WorkflowHistory` row that the `workflow_published_version` mapping + * currently points at. + */ + async getPublishedWorkflowData( + workflowId: string, + ): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory } | null> { + const record = + await this.workflowPublishedVersionRepository.getPublishedVersionWithRelations(workflowId); + + // This should not happen: only triggers read from this service, and they + // only do so when the flag is on; the publication service stops triggers + // before deleting the record. If we hit this, we have a real bug. + if (!record?.publishedVersion || !record.workflow) { + this.errorReporter.error( + new UnexpectedError('Published version record not found for workflow', { + extra: { workflowId }, + }), + ); + return null; + } + + return { workflow: record.workflow, publishedVersion: record.publishedVersion }; + } +} diff --git a/packages/cli/test/integration/workflows/workflow-published-data.service.test.ts b/packages/cli/test/integration/workflows/workflow-published-data.service.test.ts new file mode 100644 index 00000000000..9ddb6c00ba4 --- /dev/null +++ b/packages/cli/test/integration/workflows/workflow-published-data.service.test.ts @@ -0,0 +1,92 @@ +import { createWorkflowWithHistory, setActiveVersion, testDb } from '@n8n/backend-test-utils'; +import { WorkflowPublishedVersionRepository } from '@n8n/db'; +import { Container } from '@n8n/di'; +import type { INode } from 'n8n-workflow'; +import { v4 as uuid } from 'uuid'; + +import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service'; + +import { createOwner } from '../shared/db/users'; +import { createWorkflowHistoryItem } from '../shared/db/workflow-history'; + +let workflowPublishedVersionRepository: WorkflowPublishedVersionRepository; +let workflowPublishedDataService: WorkflowPublishedDataService; + +beforeAll(async () => { + await testDb.init(); + workflowPublishedVersionRepository = Container.get(WorkflowPublishedVersionRepository); + workflowPublishedDataService = Container.get(WorkflowPublishedDataService); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +const makeNode = (name: string): INode => ({ + id: uuid(), + name, + type: 'n8n-nodes-base.noOp', + typeVersion: 1, + position: [0, 0], + parameters: {}, +}); + +describe('WorkflowPublishedDataService', () => { + test('should read nodes/connections from workflow_published_version table', async () => { + const owner = await createOwner(); + const workflow = await createWorkflowWithHistory({}, owner); + await setActiveVersion(workflow.id, workflow.versionId); + + // Write to published version table + await workflowPublishedVersionRepository.setPublishedVersion(workflow.id, workflow.versionId); + + const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id); + + expect(result).not.toBeNull(); + expect(result!.workflow.id).toBe(workflow.id); + expect(result!.publishedVersion.nodes).toEqual( + expect.arrayContaining([expect.objectContaining({ type: 'n8n-nodes-base.scheduleTrigger' })]), + ); + }); + + test('should return data from a different version than activeVersion when published_version table points elsewhere', async () => { + const owner = await createOwner(); + const workflow = await createWorkflowWithHistory({}, owner); + await setActiveVersion(workflow.id, workflow.versionId); + + // Create a second history version with different nodes + const alternateVersionId = uuid(); + const alternateNodes = [makeNode('Alternate Node')]; + await createWorkflowHistoryItem(workflow.id, { + versionId: alternateVersionId, + nodes: alternateNodes, + connections: {}, + }); + + // Point the published version table to the alternate version + // (NOT the activeVersion). This proves we're reading from the table. + await workflowPublishedVersionRepository.setPublishedVersion(workflow.id, alternateVersionId); + + const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id); + + expect(result).not.toBeNull(); + // Should have the alternate nodes, NOT the original activeVersion nodes + expect(result!.publishedVersion.nodes).toEqual( + expect.arrayContaining([expect.objectContaining({ name: 'Alternate Node' })]), + ); + expect(result!.publishedVersion.nodes).not.toEqual( + expect.arrayContaining([expect.objectContaining({ type: 'n8n-nodes-base.scheduleTrigger' })]), + ); + }); + + test('should return null when published_version table has no record', async () => { + const owner = await createOwner(); + const workflow = await createWorkflowWithHistory({}, owner); + await setActiveVersion(workflow.id, workflow.versionId); + // Don't write to published version table + + const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id); + + expect(result).toBeNull(); + }); +}); diff --git a/packages/testing/playwright/tests/e2e/api/workflow-publication-service.spec.ts b/packages/testing/playwright/tests/e2e/api/workflow-publication-service.spec.ts new file mode 100644 index 00000000000..5f57a4d049d --- /dev/null +++ b/packages/testing/playwright/tests/e2e/api/workflow-publication-service.spec.ts @@ -0,0 +1,67 @@ +import { test, expect } from '../../../fixtures/base'; + +test.use({ + capability: { + env: { + TEST_ISOLATION: 'workflow-publication-service', + N8N_USE_WORKFLOW_PUBLICATION_SERVICE: 'true', + }, + }, +}); + +test.describe( + 'Workflow Publication Service', + { + annotation: [{ type: 'owner', description: 'Catalysts' }], + }, + () => { + test('webhook fires the published version even when the draft has diverged', async ({ + api, + }) => { + const { workflowId, webhookPath } = await api.workflows.importWorkflowFromFile( + 'simple-webhook-test.json', + ); + + // Fetch the activated workflow, then update the draft so it no longer + // matches the version recorded in workflow_published_version. + const published = await api.workflows.getWorkflow(workflowId); + const divergedNodes = published.nodes.map((node) => + node.name === 'Set Response' + ? { + ...node, + parameters: { + assignments: { + assignments: [ + { + id: 'draft-only-marker', + name: 'result', + value: 'draft-only', + type: 'string', + }, + ], + }, + options: {}, + }, + } + : node, + ); + await api.workflows.update(workflowId, published.versionId!, { nodes: divergedNodes }); + + // With the publication service flag on, the trigger path resolves + // workflow data via workflow_published_version. The published nodes + // (not the just-saved draft) should be the ones that execute. + const response = await api.webhooks.trigger(`/webhook/${webhookPath}`, { + method: 'POST', + data: { message: 'from-publication-test' }, + }); + expect(response.ok()).toBe(true); + + const execution = await api.workflows.waitForExecution(workflowId, 5000); + expect(execution.status).toBe('success'); + + const details = await api.workflows.getExecution(execution.id); + expect(details.data).toContain('Webhook received'); + expect(details.data).not.toContain('draft-only'); + }); + }, +);