mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-03 18:27:09 +02:00
feat(core): Read from workflow_published_version for webhooks/triggers/pollers (no-changelog) (#26896)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
4cdd079385
commit
fe635a754e
|
|
@ -24,4 +24,21 @@ export class WorkflowPublishedVersionRepository extends Repository<WorkflowPubli
|
|||
});
|
||||
return record?.publishedVersionId ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the published version record with the related workflow entity
|
||||
* (including shared/project relations) and the workflow history version
|
||||
* (which contains the published nodes/connections).
|
||||
*/
|
||||
async getPublishedVersionWithRelations(
|
||||
workflowId: string,
|
||||
): Promise<WorkflowPublishedVersion | null> {
|
||||
return await this.findOne({
|
||||
where: { workflowId },
|
||||
relations: {
|
||||
workflow: { shared: { project: true } },
|
||||
publishedVersion: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import { mockLogger } from '@n8n/backend-test-utils';
|
||||
import type { WorkflowsConfig } from '@n8n/config';
|
||||
import type { WorkflowEntity, WorkflowHistory, WorkflowRepository } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ActiveWorkflows, InstanceSettings } from 'n8n-core';
|
||||
|
|
@ -33,6 +34,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
const instanceSettings = mock<InstanceSettings>({ isMultiMain: false });
|
||||
const nodeTypes = mock<NodeTypes>();
|
||||
const workflowRepository = mock<WorkflowRepository>();
|
||||
const workflowsConfig = mock<WorkflowsConfig>({ useWorkflowPublicationService: false });
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
|
|
@ -52,6 +54,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
mock(),
|
||||
instanceSettings,
|
||||
mock(),
|
||||
workflowsConfig,
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
|
|
@ -191,6 +194,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
);
|
||||
});
|
||||
|
||||
|
|
@ -351,11 +355,12 @@ describe('ActiveWorkflowManager', () => {
|
|||
workflowExecutionService,
|
||||
instanceSettings,
|
||||
mock(),
|
||||
mock(),
|
||||
workflowsConfig,
|
||||
mock(),
|
||||
eventService,
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
);
|
||||
});
|
||||
|
||||
|
|
@ -377,11 +382,14 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
context.emit(triggerData);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
expect(workflowStaticDataService.saveStaticData).toHaveBeenCalledWith(workflow);
|
||||
expect(workflowExecutionService.runWorkflow).toHaveBeenCalledWith(
|
||||
workflowData,
|
||||
|
|
@ -393,8 +401,6 @@ describe('ActiveWorkflowManager', () => {
|
|||
undefined,
|
||||
);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
expect(eventService.emit).toHaveBeenCalledWith('workflow-executed', {
|
||||
workflowId: workflowData.id,
|
||||
workflowName: workflowData.name,
|
||||
|
|
@ -417,11 +423,14 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
context.emit(triggerData, undefined, undefined, 'wf-1:node-1:1700000000000');
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
expect(workflowExecutionService.runWorkflow).toHaveBeenCalledWith(
|
||||
workflowData,
|
||||
node,
|
||||
|
|
@ -451,6 +460,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
|
|
@ -479,6 +489,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
|
|
@ -518,6 +529,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
|
|
@ -555,6 +567,7 @@ describe('ActiveWorkflowManager', () => {
|
|||
additionalData,
|
||||
mode,
|
||||
activation,
|
||||
async () => workflowData,
|
||||
);
|
||||
const context = getTriggerFunctions(workflow, node, additionalData, mode, activation);
|
||||
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ import * as WebhookHelpers from '@/webhooks/webhook-helpers';
|
|||
import { WebhookService } from '@/webhooks/webhook.service';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
|
||||
import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service';
|
||||
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
import { getErrorDescription, getErrorNodeId } from '@/workflows/utils';
|
||||
import { formatWorkflow } from '@/workflows/workflow.formatter';
|
||||
|
|
@ -100,6 +101,7 @@ export class ActiveWorkflowManager {
|
|||
private readonly push: Push,
|
||||
private readonly eventService: EventService,
|
||||
private readonly storageConfig: StorageConfig,
|
||||
private readonly workflowPublishedDataService: WorkflowPublishedDataService,
|
||||
private readonly eventBus: MessageEventBus,
|
||||
) {
|
||||
this.logger = this.logger.scoped(['workflow-activation']);
|
||||
|
|
@ -308,6 +310,11 @@ export class ActiveWorkflowManager {
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
mode: WorkflowExecuteMode,
|
||||
activation: WorkflowActivateMode,
|
||||
// TODO(CAT-3202): this callback lets us switch between reading from
|
||||
// the in-memory workflowData (flag off) and the workflow published data
|
||||
// service (flag on). Once the feature flag is removed, we'll call the
|
||||
// service directly and this parameter will go away.
|
||||
resolveWorkflowData: () => Promise<IWorkflowBase>,
|
||||
): IGetExecutePollFunctions {
|
||||
return (workflow: Workflow, node: INode) => {
|
||||
const __emit = (
|
||||
|
|
@ -317,13 +324,20 @@ export class ActiveWorkflowManager {
|
|||
) => {
|
||||
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
|
||||
void this.workflowStaticDataService.saveStaticData(workflow);
|
||||
const executePromise = this.workflowExecutionService.runWorkflow(
|
||||
workflowData,
|
||||
node,
|
||||
data,
|
||||
additionalData,
|
||||
mode,
|
||||
responsePromise,
|
||||
|
||||
// TODO(CAT-3202): resolves workflow data via callback so we
|
||||
// can feature-flag between in-memory data and the published data
|
||||
// service. Once the flag is removed, we'll call the service directly.
|
||||
const executePromise = resolveWorkflowData().then(
|
||||
async (freshWorkflowData) =>
|
||||
await this.workflowExecutionService.runWorkflow(
|
||||
freshWorkflowData,
|
||||
node,
|
||||
data,
|
||||
additionalData,
|
||||
mode,
|
||||
responsePromise,
|
||||
),
|
||||
);
|
||||
|
||||
if (donePromise) {
|
||||
|
|
@ -359,6 +373,11 @@ export class ActiveWorkflowManager {
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
mode: WorkflowExecuteMode,
|
||||
activation: WorkflowActivateMode,
|
||||
// TODO(CAT-3202): this callback lets us switch between reading from
|
||||
// the in-memory workflowData (flag off) and the workflow published data
|
||||
// service (flag on). Once the feature flag is removed, we'll call the
|
||||
// service directly and this parameter will go away.
|
||||
resolveWorkflowData: () => Promise<IWorkflowBase>,
|
||||
): IGetExecuteTriggerFunctions {
|
||||
return (workflow: Workflow, node: INode) => {
|
||||
const emit = (
|
||||
|
|
@ -370,15 +389,21 @@ export class ActiveWorkflowManager {
|
|||
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
||||
void this.workflowStaticDataService.saveStaticData(workflow);
|
||||
|
||||
const executePromise = this.workflowExecutionService
|
||||
.runWorkflow(
|
||||
workflowData,
|
||||
node,
|
||||
data,
|
||||
additionalData,
|
||||
mode,
|
||||
responsePromise,
|
||||
deduplicationKey,
|
||||
// TODO(CAT-3202): resolves workflow data via callback so we
|
||||
// can feature-flag between in-memory data and the published data
|
||||
// service. Once the flag is removed, we'll call the service directly.
|
||||
const executePromise = resolveWorkflowData()
|
||||
.then(
|
||||
async (freshWorkflowData) =>
|
||||
await this.workflowExecutionService.runWorkflow(
|
||||
freshWorkflowData,
|
||||
node,
|
||||
data,
|
||||
additionalData,
|
||||
mode,
|
||||
responsePromise,
|
||||
deduplicationKey,
|
||||
),
|
||||
)
|
||||
.catch((error: unknown) => {
|
||||
if (error instanceof DuplicateExecutionError) {
|
||||
|
|
@ -500,6 +525,30 @@ export class ActiveWorkflowManager {
|
|||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the published workflow nodes/connections from the
|
||||
* `workflow_published_version` table. The passed-in workflow is used
|
||||
* for all other fields (staticData, settings, etc.).
|
||||
*
|
||||
* TODO: Add error handling / fallback strategy for transient DB failures.
|
||||
*/
|
||||
private async loadPublishedWorkflowData(
|
||||
initialWorkflowData: IWorkflowDb,
|
||||
): Promise<IWorkflowBase> {
|
||||
const publishedData = await this.workflowPublishedDataService.getPublishedWorkflowData(
|
||||
initialWorkflowData.id,
|
||||
);
|
||||
|
||||
if (!publishedData) {
|
||||
throw new UnexpectedError('Published version not found for workflow', {
|
||||
extra: { workflowId: initialWorkflowData.id },
|
||||
});
|
||||
}
|
||||
|
||||
const { nodes, connections } = publishedData.publishedVersion;
|
||||
return { ...initialWorkflowData, nodes, connections };
|
||||
}
|
||||
|
||||
private isActivationInProgress = false;
|
||||
|
||||
/**
|
||||
|
|
@ -695,6 +744,7 @@ export class ActiveWorkflowManager {
|
|||
}
|
||||
|
||||
const { nodes, connections } = dbWorkflow.activeVersion;
|
||||
|
||||
dbWorkflow.nodes = nodes;
|
||||
dbWorkflow.connections = connections;
|
||||
|
||||
|
|
@ -739,11 +789,25 @@ export class ActiveWorkflowManager {
|
|||
);
|
||||
}
|
||||
|
||||
// When the flag is on, trigger/poller emit callbacks re-read the published
|
||||
// version from the DB so they pick up updates without deactivate/reactivate.
|
||||
// When the flag is off, they use the in-memory workflowData (same as before).
|
||||
//
|
||||
// Note: we intentionally load the latest published version when the trigger
|
||||
// fires so all triggers are updated at the same time without any downtime.
|
||||
// The workflow publication service is responsible for ensuring that
|
||||
// removed/disabled triggers in a new workflow version are deactivated before
|
||||
// updating the published version.
|
||||
const resolveWorkflowData = this.workflowsConfig.useWorkflowPublicationService
|
||||
? async () => await this.loadPublishedWorkflowData(dbWorkflow)
|
||||
: async () => dbWorkflow as IWorkflowBase;
|
||||
|
||||
if (shouldAddTriggersAndPollers) {
|
||||
added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||
activationMode,
|
||||
executionMode: 'trigger',
|
||||
additionalData,
|
||||
resolveWorkflowData,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -1064,10 +1128,12 @@ export class ActiveWorkflowManager {
|
|||
activationMode,
|
||||
executionMode,
|
||||
additionalData,
|
||||
resolveWorkflowData,
|
||||
}: {
|
||||
activationMode: WorkflowActivateMode;
|
||||
executionMode: WorkflowExecuteMode;
|
||||
additionalData: IWorkflowExecuteAdditionalData;
|
||||
resolveWorkflowData: () => Promise<IWorkflowBase>;
|
||||
},
|
||||
) {
|
||||
const getTriggerFunctions = this.getExecuteTriggerFunctions(
|
||||
|
|
@ -1075,6 +1141,7 @@ export class ActiveWorkflowManager {
|
|||
additionalData,
|
||||
executionMode,
|
||||
activationMode,
|
||||
resolveWorkflowData,
|
||||
);
|
||||
|
||||
const getPollFunctions = this.getExecutePollFunctions(
|
||||
|
|
@ -1082,6 +1149,7 @@ export class ActiveWorkflowManager {
|
|||
additionalData,
|
||||
executionMode,
|
||||
activationMode,
|
||||
resolveWorkflowData,
|
||||
);
|
||||
|
||||
if (workflow.getTriggerNodes().length === 0 && workflow.getPollNodes().length === 0) {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { mockLogger } from '@n8n/backend-test-utils';
|
||||
import type { WorkflowsConfig } from '@n8n/config';
|
||||
import type { WebhookEntity, WorkflowEntity, WorkflowHistory, WorkflowRepository } from '@n8n/db';
|
||||
import type { Response } from 'express';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
|
|
@ -20,6 +21,7 @@ import * as WebhookHelpers from '@/webhooks/webhook-helpers';
|
|||
import type { WebhookService } from '@/webhooks/webhook.service';
|
||||
import type { WebhookRequest } from '@/webhooks/webhook.types';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import type { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service';
|
||||
import type { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
|
||||
jest.mock('@/webhooks/webhook-helpers');
|
||||
|
|
@ -34,6 +36,8 @@ describe('LiveWebhooks', () => {
|
|||
const webhookService = mock<WebhookService>();
|
||||
const nodeTypes = mock<NodeTypes>();
|
||||
const workflowStaticDataService = mock<WorkflowStaticDataService>();
|
||||
const workflowsConfig = mock<WorkflowsConfig>({ useWorkflowPublicationService: false });
|
||||
const workflowPublishedDataService = mock<WorkflowPublishedDataService>();
|
||||
|
||||
let liveWebhooks: LiveWebhooks;
|
||||
|
||||
|
|
@ -45,6 +49,8 @@ describe('LiveWebhooks', () => {
|
|||
webhookService,
|
||||
workflowRepository,
|
||||
workflowStaticDataService,
|
||||
workflowsConfig,
|
||||
workflowPublishedDataService,
|
||||
);
|
||||
|
||||
// Mock WorkflowExecuteAdditionalData.getBase to avoid DI issues
|
||||
|
|
@ -256,6 +262,60 @@ describe('LiveWebhooks', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('executeWebhook (with publication service)', () => {
|
||||
beforeEach(() => {
|
||||
Object.assign(workflowsConfig, { useWorkflowPublicationService: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
Object.assign(workflowsConfig, { useWorkflowPublicationService: false });
|
||||
});
|
||||
|
||||
it('should use published version nodes when executing webhook', async () => {
|
||||
const activeNodes: INode[] = [
|
||||
{
|
||||
id: 'webhook-node-active',
|
||||
name: NODE_NAME,
|
||||
type: 'n8n-nodes-base.webhook',
|
||||
typeVersion: 1,
|
||||
position: [100, 200],
|
||||
parameters: { path: WEBHOOK_PATH, httpMethod: 'GET' },
|
||||
},
|
||||
];
|
||||
|
||||
const workflowEntity = mock<WorkflowEntity>({
|
||||
id: WORKFLOW_ID,
|
||||
name: 'Test Workflow',
|
||||
active: true,
|
||||
activeVersionId: 'v1',
|
||||
isArchived: false,
|
||||
shared: [{ role: 'workflow:owner', project: { id: 'project-1', projectRelations: [] } }],
|
||||
});
|
||||
|
||||
const publishedVersion = mock<WorkflowHistory>({
|
||||
versionId: 'v1',
|
||||
workflowId: WORKFLOW_ID,
|
||||
nodes: activeNodes,
|
||||
connections: {},
|
||||
});
|
||||
workflowPublishedDataService.getPublishedWorkflowData.mockResolvedValue({
|
||||
workflow: workflowEntity,
|
||||
publishedVersion,
|
||||
});
|
||||
|
||||
let capturedNodes: INode[] = [];
|
||||
const request = setupExecuteWebhookMocks(workflowEntity, {
|
||||
onExecuteWebhook: ({ workflow }) => {
|
||||
capturedNodes = Object.values(workflow.nodes);
|
||||
},
|
||||
});
|
||||
|
||||
await liveWebhooks.executeWebhook(request, mock<Response>());
|
||||
|
||||
expect(capturedNodes[0].id).toBe('webhook-node-active');
|
||||
});
|
||||
});
|
||||
|
||||
describe('findAccessControlOptions', () => {
|
||||
const httpMethod: IHttpRequestMethods = 'GET';
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { WorkflowRepository } from '@n8n/db';
|
||||
import { WorkflowsConfig } from '@n8n/config';
|
||||
import { WorkflowRepository, type WorkflowEntity, type WorkflowHistory } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import type { Response } from 'express';
|
||||
import { Workflow, CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow';
|
||||
|
|
@ -11,6 +12,7 @@ import { NodeTypes } from '@/node-types';
|
|||
import * as WebhookHelpers from '@/webhooks/webhook-helpers';
|
||||
import { WebhookService } from '@/webhooks/webhook.service';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service';
|
||||
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
|
||||
import { authAllowlistedNodes } from './constants';
|
||||
|
|
@ -37,6 +39,8 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
private readonly webhookService: WebhookService,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
||||
private readonly workflowsConfig: WorkflowsConfig,
|
||||
private readonly workflowPublishedDataService: WorkflowPublishedDataService,
|
||||
) {}
|
||||
|
||||
async getWebhookMethods(path: string) {
|
||||
|
|
@ -98,33 +102,14 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
});
|
||||
}
|
||||
|
||||
const workflowData = await this.workflowRepository.findOne({
|
||||
where: { id: webhook.workflowId },
|
||||
relations: {
|
||||
activeVersion: true,
|
||||
shared: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (workflowData === null) {
|
||||
throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`);
|
||||
}
|
||||
|
||||
if (!workflowData.activeVersion) {
|
||||
throw new NotFoundError(
|
||||
`Active version not found for workflow with id "${webhook.workflowId}"`,
|
||||
);
|
||||
}
|
||||
|
||||
const { nodes, connections } = workflowData.activeVersion;
|
||||
const { workflow: workflowData, publishedVersion } = await this.loadWebhookExecutionData(
|
||||
webhook.workflowId,
|
||||
);
|
||||
const { nodes, connections } = publishedVersion;
|
||||
|
||||
// Create a clean workflowData object with only activeVersion nodes/connections
|
||||
// This prevents any downstream code from accidentally using the draft nodes
|
||||
const activeWorkflowData: IWorkflowBase = {
|
||||
...workflowData,
|
||||
nodes,
|
||||
connections,
|
||||
};
|
||||
const activeWorkflowData: IWorkflowBase = { ...workflowData, nodes, connections };
|
||||
|
||||
const workflow = new Workflow({
|
||||
id: webhook.workflowId,
|
||||
|
|
@ -137,7 +122,7 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
settings: workflowData.settings,
|
||||
});
|
||||
|
||||
const ownerProjectId = workflowData.shared.find(
|
||||
const ownerProjectId = workflowData.shared?.find(
|
||||
(share) => share.role === 'workflow:owner',
|
||||
)?.projectId;
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
|
|
@ -200,6 +185,48 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
}
|
||||
}
|
||||
|
||||
private async loadWebhookExecutionData(
|
||||
workflowId: string,
|
||||
): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> {
|
||||
return this.workflowsConfig.useWorkflowPublicationService
|
||||
? await this.loadFromPublishedVersion(workflowId)
|
||||
: await this.loadFromActiveVersion(workflowId);
|
||||
}
|
||||
|
||||
/**
|
||||
* New path for the workflow publication service. Behind a flag, disabled
|
||||
* by default.
|
||||
*/
|
||||
private async loadFromPublishedVersion(
|
||||
workflowId: string,
|
||||
): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> {
|
||||
const publishedData =
|
||||
await this.workflowPublishedDataService.getPublishedWorkflowData(workflowId);
|
||||
if (publishedData === null) {
|
||||
throw new NotFoundError(`Published version not found for workflow with id "${workflowId}"`);
|
||||
}
|
||||
return { workflow: publishedData.workflow, publishedVersion: publishedData.publishedVersion };
|
||||
}
|
||||
|
||||
/**
|
||||
* Old path, before the workflow publication service. Currently the default.
|
||||
*/
|
||||
private async loadFromActiveVersion(
|
||||
workflowId: string,
|
||||
): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory }> {
|
||||
const workflowData = await this.workflowRepository.findOne({
|
||||
where: { id: workflowId },
|
||||
relations: { activeVersion: true, shared: true },
|
||||
});
|
||||
if (workflowData === null) {
|
||||
throw new NotFoundError(`Could not find workflow with id "${workflowId}"`);
|
||||
}
|
||||
if (!workflowData.activeVersion) {
|
||||
throw new NotFoundError(`Active version not found for workflow with id "${workflowId}"`);
|
||||
}
|
||||
return { workflow: workflowData, publishedVersion: workflowData.activeVersion };
|
||||
}
|
||||
|
||||
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
|
||||
// Remove trailing slash
|
||||
if (path.endsWith('/')) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
import type { WorkflowPublishedVersionRepository, WorkflowPublishedVersion } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ErrorReporter } from 'n8n-core';
|
||||
|
||||
import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service';
|
||||
|
||||
describe('WorkflowPublishedDataService', () => {
|
||||
const workflowPublishedVersionRepository = mock<WorkflowPublishedVersionRepository>();
|
||||
const errorReporter = mock<ErrorReporter>();
|
||||
let service: WorkflowPublishedDataService;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
service = new WorkflowPublishedDataService(errorReporter, workflowPublishedVersionRepository);
|
||||
});
|
||||
|
||||
// Verifies that we hit the repository and return the data it provides.
|
||||
test('should return published data when record exists', async () => {
|
||||
const nodes = [
|
||||
{
|
||||
id: 'node-1',
|
||||
name: 'Test',
|
||||
type: 'n8n-nodes-base.noOp',
|
||||
typeVersion: 1,
|
||||
position: [0, 0] as [number, number],
|
||||
parameters: {},
|
||||
},
|
||||
];
|
||||
const connections = {};
|
||||
|
||||
const record = mock<WorkflowPublishedVersion>();
|
||||
Object.defineProperty(record, 'publishedVersion', {
|
||||
value: { nodes, connections, name: 'v1' },
|
||||
});
|
||||
Object.defineProperty(record, 'workflow', {
|
||||
value: { name: 'Workflow Name', staticData: undefined, settings: {}, shared: [] },
|
||||
});
|
||||
workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(record);
|
||||
|
||||
const result = await service.getPublishedWorkflowData('wf-1');
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result!.workflow.name).toBe('Workflow Name');
|
||||
expect(result!.publishedVersion.nodes).toBe(nodes);
|
||||
expect(result!.publishedVersion.connections).toBe(connections);
|
||||
});
|
||||
|
||||
test('should return null when no record exists', async () => {
|
||||
workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(null);
|
||||
|
||||
const result = await service.getPublishedWorkflowData('wf-1');
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
|
||||
test('should return null when publishedVersion relation is missing', async () => {
|
||||
const record = mock<WorkflowPublishedVersion>();
|
||||
// Simulate unloaded relation — TypeORM returns undefined for unloaded relations
|
||||
Object.defineProperty(record, 'publishedVersion', { value: undefined });
|
||||
Object.defineProperty(record, 'workflow', { value: { name: 'Workflow Name' } });
|
||||
workflowPublishedVersionRepository.getPublishedVersionWithRelations.mockResolvedValue(record);
|
||||
|
||||
const result = await service.getPublishedWorkflowData('wf-1');
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
import {
|
||||
WorkflowPublishedVersionRepository,
|
||||
type WorkflowEntity,
|
||||
type WorkflowHistory,
|
||||
} from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import { ErrorReporter } from 'n8n-core';
|
||||
import { UnexpectedError } from 'n8n-workflow';
|
||||
|
||||
@Service()
|
||||
export class WorkflowPublishedDataService {
|
||||
constructor(
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
private readonly workflowPublishedVersionRepository: WorkflowPublishedVersionRepository,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Resolves a workflow's published version: returns the workflow entity and
|
||||
* the `WorkflowHistory` row that the `workflow_published_version` mapping
|
||||
* currently points at.
|
||||
*/
|
||||
async getPublishedWorkflowData(
|
||||
workflowId: string,
|
||||
): Promise<{ workflow: WorkflowEntity; publishedVersion: WorkflowHistory } | null> {
|
||||
const record =
|
||||
await this.workflowPublishedVersionRepository.getPublishedVersionWithRelations(workflowId);
|
||||
|
||||
// This should not happen: only triggers read from this service, and they
|
||||
// only do so when the flag is on; the publication service stops triggers
|
||||
// before deleting the record. If we hit this, we have a real bug.
|
||||
if (!record?.publishedVersion || !record.workflow) {
|
||||
this.errorReporter.error(
|
||||
new UnexpectedError('Published version record not found for workflow', {
|
||||
extra: { workflowId },
|
||||
}),
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
return { workflow: record.workflow, publishedVersion: record.publishedVersion };
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,92 @@
|
|||
import { createWorkflowWithHistory, setActiveVersion, testDb } from '@n8n/backend-test-utils';
|
||||
import { WorkflowPublishedVersionRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import type { INode } from 'n8n-workflow';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { WorkflowPublishedDataService } from '@/workflows/workflow-published-data.service';
|
||||
|
||||
import { createOwner } from '../shared/db/users';
|
||||
import { createWorkflowHistoryItem } from '../shared/db/workflow-history';
|
||||
|
||||
let workflowPublishedVersionRepository: WorkflowPublishedVersionRepository;
|
||||
let workflowPublishedDataService: WorkflowPublishedDataService;
|
||||
|
||||
beforeAll(async () => {
|
||||
await testDb.init();
|
||||
workflowPublishedVersionRepository = Container.get(WorkflowPublishedVersionRepository);
|
||||
workflowPublishedDataService = Container.get(WorkflowPublishedDataService);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
const makeNode = (name: string): INode => ({
|
||||
id: uuid(),
|
||||
name,
|
||||
type: 'n8n-nodes-base.noOp',
|
||||
typeVersion: 1,
|
||||
position: [0, 0],
|
||||
parameters: {},
|
||||
});
|
||||
|
||||
describe('WorkflowPublishedDataService', () => {
|
||||
test('should read nodes/connections from workflow_published_version table', async () => {
|
||||
const owner = await createOwner();
|
||||
const workflow = await createWorkflowWithHistory({}, owner);
|
||||
await setActiveVersion(workflow.id, workflow.versionId);
|
||||
|
||||
// Write to published version table
|
||||
await workflowPublishedVersionRepository.setPublishedVersion(workflow.id, workflow.versionId);
|
||||
|
||||
const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id);
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
expect(result!.workflow.id).toBe(workflow.id);
|
||||
expect(result!.publishedVersion.nodes).toEqual(
|
||||
expect.arrayContaining([expect.objectContaining({ type: 'n8n-nodes-base.scheduleTrigger' })]),
|
||||
);
|
||||
});
|
||||
|
||||
test('should return data from a different version than activeVersion when published_version table points elsewhere', async () => {
|
||||
const owner = await createOwner();
|
||||
const workflow = await createWorkflowWithHistory({}, owner);
|
||||
await setActiveVersion(workflow.id, workflow.versionId);
|
||||
|
||||
// Create a second history version with different nodes
|
||||
const alternateVersionId = uuid();
|
||||
const alternateNodes = [makeNode('Alternate Node')];
|
||||
await createWorkflowHistoryItem(workflow.id, {
|
||||
versionId: alternateVersionId,
|
||||
nodes: alternateNodes,
|
||||
connections: {},
|
||||
});
|
||||
|
||||
// Point the published version table to the alternate version
|
||||
// (NOT the activeVersion). This proves we're reading from the table.
|
||||
await workflowPublishedVersionRepository.setPublishedVersion(workflow.id, alternateVersionId);
|
||||
|
||||
const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id);
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
// Should have the alternate nodes, NOT the original activeVersion nodes
|
||||
expect(result!.publishedVersion.nodes).toEqual(
|
||||
expect.arrayContaining([expect.objectContaining({ name: 'Alternate Node' })]),
|
||||
);
|
||||
expect(result!.publishedVersion.nodes).not.toEqual(
|
||||
expect.arrayContaining([expect.objectContaining({ type: 'n8n-nodes-base.scheduleTrigger' })]),
|
||||
);
|
||||
});
|
||||
|
||||
test('should return null when published_version table has no record', async () => {
|
||||
const owner = await createOwner();
|
||||
const workflow = await createWorkflowWithHistory({}, owner);
|
||||
await setActiveVersion(workflow.id, workflow.versionId);
|
||||
// Don't write to published version table
|
||||
|
||||
const result = await workflowPublishedDataService.getPublishedWorkflowData(workflow.id);
|
||||
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,67 @@
|
|||
import { test, expect } from '../../../fixtures/base';
|
||||
|
||||
test.use({
|
||||
capability: {
|
||||
env: {
|
||||
TEST_ISOLATION: 'workflow-publication-service',
|
||||
N8N_USE_WORKFLOW_PUBLICATION_SERVICE: 'true',
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
test.describe(
|
||||
'Workflow Publication Service',
|
||||
{
|
||||
annotation: [{ type: 'owner', description: 'Catalysts' }],
|
||||
},
|
||||
() => {
|
||||
test('webhook fires the published version even when the draft has diverged', async ({
|
||||
api,
|
||||
}) => {
|
||||
const { workflowId, webhookPath } = await api.workflows.importWorkflowFromFile(
|
||||
'simple-webhook-test.json',
|
||||
);
|
||||
|
||||
// Fetch the activated workflow, then update the draft so it no longer
|
||||
// matches the version recorded in workflow_published_version.
|
||||
const published = await api.workflows.getWorkflow(workflowId);
|
||||
const divergedNodes = published.nodes.map((node) =>
|
||||
node.name === 'Set Response'
|
||||
? {
|
||||
...node,
|
||||
parameters: {
|
||||
assignments: {
|
||||
assignments: [
|
||||
{
|
||||
id: 'draft-only-marker',
|
||||
name: 'result',
|
||||
value: 'draft-only',
|
||||
type: 'string',
|
||||
},
|
||||
],
|
||||
},
|
||||
options: {},
|
||||
},
|
||||
}
|
||||
: node,
|
||||
);
|
||||
await api.workflows.update(workflowId, published.versionId!, { nodes: divergedNodes });
|
||||
|
||||
// With the publication service flag on, the trigger path resolves
|
||||
// workflow data via workflow_published_version. The published nodes
|
||||
// (not the just-saved draft) should be the ones that execute.
|
||||
const response = await api.webhooks.trigger(`/webhook/${webhookPath}`, {
|
||||
method: 'POST',
|
||||
data: { message: 'from-publication-test' },
|
||||
});
|
||||
expect(response.ok()).toBe(true);
|
||||
|
||||
const execution = await api.workflows.waitForExecution(workflowId, 5000);
|
||||
expect(execution.status).toBe('success');
|
||||
|
||||
const details = await api.workflows.getExecution(execution.id);
|
||||
expect(details.data).toContain('Webhook received');
|
||||
expect(details.data).not.toContain('draft-only');
|
||||
});
|
||||
},
|
||||
);
|
||||
Loading…
Reference in New Issue
Block a user