diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index 0e8d96fe8d2..931ddfef874 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -108,9 +108,9 @@ describe('ActiveWorkflowManager', () => { 'should skip inactive workflow in `%s` activation mode', async (mode) => { const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn( + const addNonWebhookTriggersSpy = jest.spyOn( activeWorkflowManager, - 'addTriggersAndPollers', + 'addNonWebhookTriggers', ); workflowRepository.findById.mockResolvedValue( mock({ active: false, activeVersionId: null, activeVersion: null }), @@ -119,7 +119,7 @@ describe('ActiveWorkflowManager', () => { const added = await activeWorkflowManager.add('some-id', mode); expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + expect(addNonWebhookTriggersSpy).not.toHaveBeenCalled(); expect(added).toEqual({ triggersAndPollers: false, webhooks: false }); }, ); @@ -128,9 +128,9 @@ describe('ActiveWorkflowManager', () => { 'should skip archived workflow in `%s` activation mode', async (mode) => { const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn( + const addNonWebhookTriggersSpy = jest.spyOn( activeWorkflowManager, - 'addTriggersAndPollers', + 'addNonWebhookTriggers', ); workflowRepository.findById.mockResolvedValue( mock({ @@ -144,7 +144,7 @@ describe('ActiveWorkflowManager', () => { const added = await activeWorkflowManager.add('archived-id', mode); expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + expect(addNonWebhookTriggersSpy).not.toHaveBeenCalled(); expect(added).toEqual({ triggersAndPollers: false, webhooks: false }); }, ); @@ -168,7 +168,7 @@ describe('ActiveWorkflowManager', () => { }); }); - describe('handleAddWebhooksTriggersAndPollers', () => { + describe('handleAddWebhooksAndNonWebhookTriggers', () => { const push = mock(); const publisher = mock(); @@ -206,7 +206,7 @@ describe('ActiveWorkflowManager', () => { jest.spyOn(activeWorkflowManager, 'add').mockRejectedValue(activationError); - await activeWorkflowManager.handleAddWebhooksTriggersAndPollers({ + await activeWorkflowManager.handleAddWebhooksAndNonWebhookTriggers({ workflowId: 'wf-1', activeVersionId: 'v1', activationMode: 'activate', @@ -234,7 +234,7 @@ describe('ActiveWorkflowManager', () => { test('should not include nodeId in broadcast when error has no node', async () => { jest.spyOn(activeWorkflowManager, 'add').mockRejectedValue(new Error('Some error')); - await activeWorkflowManager.handleAddWebhooksTriggersAndPollers({ + await activeWorkflowManager.handleAddWebhooksAndNonWebhookTriggers({ workflowId: 'wf-1', activeVersionId: 'v1', activationMode: 'activate', diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 1dc584781fc..48514f611ae 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -653,37 +653,36 @@ export class ActiveWorkflowManager { } @OnLeaderTakeover() - async addAllTriggerAndPollerBasedWorkflows() { + async addAllNonWebhookTriggerWorkflows() { await this.addActiveWorkflows('leadershipChange'); } @OnLeaderStepdown() @OnShutdown() - async removeAllTriggerAndPollerBasedWorkflows() { + async removeAllNonWebhookTriggerWorkflows() { this.removeAllQueuedWorkflowActivations(); - await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); + await this.activeWorkflows.removeAllNonWebhookTriggerWorkflows(); } /** * Register a workflow as active. * - * An activatable workflow may be webhook-, trigger-, or poller-based: + * An activatable workflow may start from: * - * - A `webhook` is an HTTP-based node that can start a workflow when called - * by a third-party service. - * - A `poller` is an HTTP-based node that can start a workflow when detecting - * a change while regularly checking a third-party service. - * - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a - * time-based node like Schedule Trigger or a message-queue-based node. + * - A webhook trigger, invoked by an HTTP request. + * - A poll trigger, which regularly checks an external service. + * - An active trigger, which keeps a listener or persistent connection open. + * - A schedule trigger, which registers its own crons. * * Note that despite the name, most "trigger" nodes are actually webhook-based - * and so qualify as `webhook`, e.g. Stripe Trigger. + * and so qualify as webhook triggers, e.g. Stripe Trigger. * - * Triggers and pollers are registered as active in memory at `ActiveWorkflows`, - * but webhooks are registered by being entered in the `webhook_entity` table, - * since webhooks do not require continuous execution. + * Active triggers, poll triggers, and schedule triggers are registered as + * active in memory at `ActiveWorkflows`, but webhook triggers are registered + * by being entered in the `webhook_entity` table, since webhooks do not + * require continuous execution. * - * Returns whether this operation added webhooks and/or triggers and pollers. + * Returns whether this operation added webhooks and/or non-webhook triggers. */ async add( workflowId: WorkflowId, @@ -724,7 +723,7 @@ export class ActiveWorkflowManager { let workflow: Workflow; const shouldAddWebhooks = this.shouldAddWebhooks(activationMode); - const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers(); + const shouldAddNonWebhookTriggers = this.shouldAddNonWebhookTriggers(); try { if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.activeVersion) { @@ -767,7 +766,7 @@ export class ActiveWorkflowManager { if (!validation.isValid) { throw new WorkflowActivationError( - `Workflow ${formatWorkflow(dbWorkflow)} has no node to start the workflow - at least one trigger, poller or webhook node is required`, + `Workflow ${formatWorkflow(dbWorkflow)} has no node to start the workflow - at least one active trigger, poll trigger, webhook trigger, or schedule trigger node is required`, { level: 'warning' }, ); } @@ -789,8 +788,8 @@ export class ActiveWorkflowManager { ); } - // When the flag is on, trigger/poller emit callbacks re-read the published - // version from the DB so they pick up updates without deactivate/reactivate. + // When the flag is on, non-webhook trigger emit callbacks re-read the + // published version from the DB so they pick up updates without deactivate/reactivate. // When the flag is off, they use the in-memory workflowData (same as before). // // Note: we intentionally load the latest published version when the trigger @@ -802,8 +801,8 @@ export class ActiveWorkflowManager { ? async () => await this.loadPublishedWorkflowData(dbWorkflow) : async () => dbWorkflow as IWorkflowBase; - if (shouldAddTriggersAndPollers) { - added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, { + if (shouldAddNonWebhookTriggers) { + added.triggersAndPollers = await this.addNonWebhookTriggers(dbWorkflow, workflow, { activationMode, executionMode: 'trigger', additionalData, @@ -871,7 +870,7 @@ export class ActiveWorkflowManager { instanceType: 'main', instanceRole: 'leader', }) - async handleAddWebhooksTriggersAndPollers({ + async handleAddWebhooksAndNonWebhookTriggers({ workflowId, activeVersionId, activationMode, @@ -1082,17 +1081,16 @@ export class ActiveWorkflowManager { this.removeQueuedWorkflowActivation(workflowId); } - // if it's active in memory then it's a trigger - // so remove from list of actives workflows - await this.removeWorkflowTriggersAndPollers(workflowId); + // If it is active in memory, it is a non-webhook trigger workflow. + await this.removeNonWebhookTriggers(workflowId); } @OnPubSubEvent('remove-triggers-and-pollers', { instanceType: 'main', instanceRole: 'leader' }) - async handleRemoveTriggersAndPollers({ + async handleRemoveNonWebhookTriggers({ workflowId, }: PubSubCommandMap['remove-triggers-and-pollers']) { await this.removeActivationError(workflowId); - await this.removeWorkflowTriggersAndPollers(workflowId); + await this.removeNonWebhookTriggers(workflowId); this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }); @@ -1104,24 +1102,24 @@ export class ActiveWorkflowManager { } /** - * Stop running active triggers and pollers for a workflow. + * Stop running active, poll, and schedule triggers for a workflow. */ - async removeWorkflowTriggersAndPollers(workflowId: WorkflowId) { + async removeNonWebhookTriggers(workflowId: WorkflowId) { if (!this.activeWorkflows.isActive(workflowId)) return; const wasRemoved = await this.activeWorkflows.remove(workflowId); if (wasRemoved) { - this.logger.debug(`Removed triggers and pollers for workflow "${workflowId}"`, { + this.logger.debug(`Removed non-webhook triggers for workflow "${workflowId}"`, { workflowId, }); } } /** - * Register as active in memory a trigger- or poller-based workflow. + * Register a workflow's active, poll, and schedule triggers in memory. */ - async addTriggersAndPollers( + async addNonWebhookTriggers( dbWorkflow: WorkflowEntity, workflow: Workflow, { @@ -1166,7 +1164,7 @@ export class ActiveWorkflowManager { getPollFunctions, ); - this.logger.debug(`Added triggers and pollers for workflow ${formatWorkflow(dbWorkflow)}`); + this.logger.debug(`Added non-webhook triggers for workflow ${formatWorkflow(dbWorkflow)}`); return true; } @@ -1189,12 +1187,12 @@ export class ActiveWorkflowManager { } /** - * Whether this instance may add triggers and pollers to memory. + * Whether this instance may add active, poll, and schedule triggers to memory. * * In both single- and multi-main setup, only the leader is allowed to manage - * triggers and pollers in memory, to ensure they are not duplicated. + * non-webhook triggers in memory, to ensure they are not duplicated. */ - shouldAddTriggersAndPollers() { + shouldAddNonWebhookTriggers() { return this.instanceSettings.isLeader; } } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 8f6358df281..5df6f79372d 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -100,7 +100,7 @@ export class Start extends BaseCommand> { await this.externalHooks?.run('n8n.stop'); - await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows(); + await this.activeWorkflowManager.removeAllNonWebhookTriggerWorkflows(); if (this.instanceSettings.isMultiMain) { await Container.get(MultiMainSetup).shutdown(); diff --git a/packages/cli/src/controllers/debug.controller.ts b/packages/cli/src/controllers/debug.controller.ts index 0614885c5c9..9df7878b6ce 100644 --- a/packages/cli/src/controllers/debug.controller.ts +++ b/packages/cli/src/controllers/debug.controller.ts @@ -32,7 +32,7 @@ export class DebugController { isLeader: this.instanceSettings.isLeader, activeWorkflows: { webhooks, // webhook-based active workflows - triggersAndPollers, // poller- and trigger-based active workflows + triggersAndPollers, // non-webhook trigger active workflows }, activationErrors, }; diff --git a/packages/cli/src/workflows/workflow-validation.service.ts b/packages/cli/src/workflows/workflow-validation.service.ts index a40ca866455..1ea3ff09cb7 100644 --- a/packages/cli/src/workflows/workflow-validation.service.ts +++ b/packages/cli/src/workflows/workflow-validation.service.ts @@ -265,7 +265,7 @@ export class WorkflowValidationService { connections: IConnections, nodeTypes: NodeTypes, ): WorkflowValidationResult { - // Validate trigger nodes + // Validate workflow entry points: active, poll, webhook, or schedule triggers. const triggerValidation = validateWorkflowHasTriggerLikeNode(nodes, nodeTypes, STARTING_NODES); if (!triggerValidation.isValid) { @@ -273,7 +273,7 @@ export class WorkflowValidationService { isValid: false, error: triggerValidation.error ?? - 'Workflow cannot be activated because it has no trigger node. At least one trigger, webhook, or polling node is required.', + 'Workflow cannot be activated because it has no trigger node. At least one active trigger, poll trigger, webhook trigger, or schedule trigger node is required.', }; } diff --git a/packages/cli/test/integration/active-workflow-manager.test.ts b/packages/cli/test/integration/active-workflow-manager.test.ts index 50722768c51..531ecb4e1a1 100644 --- a/packages/cli/test/integration/active-workflow-manager.test.ts +++ b/packages/cli/test/integration/active-workflow-manager.test.ts @@ -145,21 +145,21 @@ describe('init()', () => { describe('add()', () => { describe('in single-main mode', () => { test.each(['activate', 'update'])( - "should add webhooks, triggers and pollers for workflow in '%s' activation mode", + "should add webhooks and non-webhook triggers for workflow in '%s' activation mode", async (mode) => { await activeWorkflowManager.init(); const dbWorkflow = await createActiveWorkflow(); const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowManager, 'addTriggersAndPollers'); + const addNonWebhookTriggersSpy = jest.spyOn(activeWorkflowManager, 'addNonWebhookTriggers'); await activeWorkflowManager.add(dbWorkflow.id, mode); const [argWorkflow] = addWebhooksSpy.mock.calls[0]; - const [_, _argWorkflow] = addTriggersAndPollersSpy.mock.calls[0]; + const [_, _argWorkflow] = addNonWebhookTriggersSpy.mock.calls[0]; expect(addWebhooksSpy).toHaveBeenCalledTimes(1); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + expect(addNonWebhookTriggersSpy).toHaveBeenCalledTimes(1); if (!(argWorkflow instanceof Workflow)) fail(); if (!(_argWorkflow instanceof Workflow)) fail(); @@ -262,17 +262,17 @@ describe('remove()', () => { expect(webhookService.deleteWebhook).toHaveBeenCalledTimes(1); }); - it('should stop running triggers and pollers', async () => { + it('should stop running non-webhook triggers', async () => { const dbWorkflow = await createActiveWorkflow(); - const removeTriggersAndPollersSpy = jest.spyOn( + const removeNonWebhookTriggersSpy = jest.spyOn( activeWorkflowManager, - 'removeWorkflowTriggersAndPollers', + 'removeNonWebhookTriggers', ); await activeWorkflowManager.init(); await activeWorkflowManager.remove(dbWorkflow.id); - expect(removeTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + expect(removeNonWebhookTriggersSpy).toHaveBeenCalledTimes(1); }); }); }); diff --git a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts index abb315a65b6..bb1a56f1552 100644 --- a/packages/core/src/execution-engine/__tests__/active-workflows.test.ts +++ b/packages/core/src/execution-engine/__tests__/active-workflows.test.ts @@ -87,13 +87,13 @@ describe('ActiveWorkflows', () => { pollFunctions.getNodeParameter.calledWith('pollTimes').mockReturnValue(pollTimes); if (triggerError) { - triggersAndPollers.runTrigger.mockRejectedValueOnce(triggerError); + triggersAndPollers.runTriggerFunction.mockRejectedValueOnce(triggerError); } else { - triggersAndPollers.runTrigger.mockResolvedValue(triggerResponse); + triggersAndPollers.runTriggerFunction.mockResolvedValue(triggerResponse); } if (pollError) { - triggersAndPollers.runPoll.mockRejectedValueOnce(pollError); + triggersAndPollers.runPollFunction.mockRejectedValueOnce(pollError); } else { getPollFunctions.mockReturnValue(pollFunctions); } @@ -111,12 +111,12 @@ describe('ActiveWorkflows', () => { describe('add()', () => { describe('should activate workflow', () => { - it('with trigger nodes', async () => { + it('with trigger function nodes', async () => { await addWorkflow({ triggerNodes: [triggerNode] }); expect(activeWorkflows.isActive(workflowId)).toBe(true); expect(workflow.getTriggerNodes).toHaveBeenCalled(); - expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith( + expect(triggersAndPollers.runTriggerFunction).toHaveBeenCalledWith( workflow, triggerNode, getTriggerFunctions, @@ -126,7 +126,7 @@ describe('ActiveWorkflows', () => { ); }); - it('with polling nodes', async () => { + it('with poll trigger nodes', async () => { await addWorkflow({ pollNodes: [pollNode] }); expect(activeWorkflows.isActive(workflowId)).toBe(true); @@ -134,13 +134,13 @@ describe('ActiveWorkflows', () => { expect(scheduledTaskManager.registerCron).toHaveBeenCalled(); }); - it('with both trigger and polling nodes', async () => { + it('with both trigger function and poll trigger nodes', async () => { await addWorkflow({ triggerNodes: [triggerNode], pollNodes: [pollNode] }); expect(activeWorkflows.isActive(workflowId)).toBe(true); expect(workflow.getTriggerNodes).toHaveBeenCalled(); expect(workflow.getPollNodes).toHaveBeenCalled(); - expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith( + expect(triggersAndPollers.runTriggerFunction).toHaveBeenCalledWith( workflow, triggerNode, getTriggerFunctions, @@ -149,7 +149,11 @@ describe('ActiveWorkflows', () => { activation, ); expect(scheduledTaskManager.registerCron).toHaveBeenCalled(); - expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions); + expect(triggersAndPollers.runPollFunction).toHaveBeenCalledWith( + workflow, + pollNode, + pollFunctions, + ); }); }); @@ -162,8 +166,8 @@ describe('ActiveWorkflows', () => { expect(activeWorkflows.isActive(workflowId)).toBe(false); }); - it('if polling activation fails', async () => { - const error = new Error('Failed to activate polling'); + it('if poll trigger activation fails', async () => { + const error = new Error('Failed to activate poll trigger'); await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow( WorkflowActivationError, ); @@ -211,14 +215,14 @@ describe('ActiveWorkflows', () => { // the activation acquire/release window, so the expression bridge fails // with "No bridge acquired for this context" on every tick. it('should acquire and release the isolate when the scheduled poll fires', async () => { - triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll - triggersAndPollers.runPoll.mockResolvedValueOnce(null); // scheduled poll + triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll + triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // scheduled poll await addWorkflow({ pollNodes: [pollNode] }); acquireIsolate.mockClear(); releaseIsolate.mockClear(); - triggersAndPollers.runPoll.mockClear(); + triggersAndPollers.runPollFunction.mockClear(); const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0]; const executeScheduledPoll = registerCronCall[1] as () => Promise; @@ -228,10 +232,10 @@ describe('ActiveWorkflows', () => { expect(acquireIsolate).toHaveBeenCalledTimes(1); expect(releaseIsolate).toHaveBeenCalledTimes(1); - expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(1); + expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(1); const [acquireOrder] = acquireIsolate.mock.invocationCallOrder; - const [runPollOrder] = triggersAndPollers.runPoll.mock.invocationCallOrder; + const [runPollOrder] = triggersAndPollers.runPollFunction.mock.invocationCallOrder; const [releaseOrder] = releaseIsolate.mock.invocationCallOrder; expect(acquireOrder).toBeLessThan(runPollOrder); @@ -242,19 +246,19 @@ describe('ActiveWorkflows', () => { // The outer ActiveWorkflowManager.add() acquire covers the test poll // and the subsequent countTriggers call. Nested acquire/release would // release the outer's bridge early and break countTriggers. - triggersAndPollers.runPoll.mockResolvedValueOnce(null); + triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); await addWorkflow({ pollNodes: [pollNode] }); - expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(1); + expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(1); expect(acquireIsolate).not.toHaveBeenCalled(); expect(releaseIsolate).not.toHaveBeenCalled(); }); it('should release the isolate when __emit throws after a successful poll', async () => { const pollData = [[{ json: { foo: 'bar' } }]]; - triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll - triggersAndPollers.runPoll.mockResolvedValueOnce(pollData); // scheduled poll returns data + triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll + triggersAndPollers.runPollFunction.mockResolvedValueOnce(pollData); // scheduled poll returns data const emitError = new Error('emit failed'); pollFunctions.__emit.mockImplementationOnce(() => { @@ -281,7 +285,7 @@ describe('ActiveWorkflows', () => { // Without this routing, the rejection would escape the cron callback // `() => void executeTrigger()` and become an unhandled rejection — the // user would only see a process-level log line, not an error execution. - triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll + triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll await addWorkflow({ pollNodes: [pollNode] }); @@ -289,7 +293,7 @@ describe('ActiveWorkflows', () => { acquireIsolate.mockClear(); releaseIsolate.mockClear(); acquireIsolate.mockRejectedValueOnce(acquireError); - triggersAndPollers.runPoll.mockClear(); + triggersAndPollers.runPollFunction.mockClear(); const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0]; const executeScheduledPoll = registerCronCall[1] as () => Promise; @@ -298,13 +302,13 @@ describe('ActiveWorkflows', () => { await flushPromises(); expect(acquireIsolate).toHaveBeenCalledTimes(1); - expect(triggersAndPollers.runPoll).not.toHaveBeenCalled(); + expect(triggersAndPollers.runPollFunction).not.toHaveBeenCalled(); expect(pollFunctions.__emitError).toHaveBeenCalledWith(acquireError); }); it('should release the isolate even when the scheduled poll throws', async () => { const error = new Error('Poll function failed'); - triggersAndPollers.runPoll + triggersAndPollers.runPollFunction .mockResolvedValueOnce(null) // initial activation test poll .mockRejectedValueOnce(error); // scheduled poll fails @@ -333,14 +337,18 @@ describe('ActiveWorkflows', () => { WorkflowActivationError, ); - expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions); + expect(triggersAndPollers.runPollFunction).toHaveBeenCalledWith( + workflow, + pollNode, + pollFunctions, + ); expect(pollFunctions.__emit).not.toHaveBeenCalled(); expect(pollFunctions.__emitError).not.toHaveBeenCalled(); }); it('should emit error when poll fails during regular polling', async () => { const error = new Error('Poll function failed'); - triggersAndPollers.runPoll + triggersAndPollers.runPollFunction .mockResolvedValueOnce(null) // Succeed on first call (testing) .mockRejectedValueOnce(error); // Fail on second call (regular polling) @@ -354,7 +362,7 @@ describe('ActiveWorkflows', () => { await executeTrigger(); await flushPromises(); - expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(2); + expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(2); expect(pollFunctions.__emit).not.toHaveBeenCalled(); expect(pollFunctions.__emitError).toHaveBeenCalledWith(error); }); @@ -436,11 +444,11 @@ describe('ActiveWorkflows', () => { }); }); - describe('removeAllTriggerAndPollerBasedWorkflows()', () => { + describe('removeAllNonWebhookTriggerWorkflows()', () => { it('should remove all active workflows', async () => { await addWorkflow({ triggerNodes: [triggerNode] }); - await activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); + await activeWorkflows.removeAllNonWebhookTriggerWorkflows(); expect(activeWorkflows.allActiveWorkflows()).toEqual([]); expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId); diff --git a/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts b/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts index bdc9556c857..6535b4e9de4 100644 --- a/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts +++ b/packages/core/src/execution-engine/__tests__/triggers-and-pollers.test.ts @@ -32,14 +32,14 @@ describe('TriggersAndPollers', () => { nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); }); - describe('runTrigger()', () => { + describe('runTriggerFunction()', () => { const triggerFunctions = mock(); const getTriggerFunctions = vi.fn().mockReturnValue(triggerFunctions); const triggerFn = vi.fn(); const mockEmitData: INodeExecutionData[][] = [[{ json: { data: 'test' } }]]; const runTriggerHelper = async (mode: 'manual' | 'trigger' = 'trigger') => - await triggersAndPollers.runTrigger( + await triggersAndPollers.runTriggerFunction( workflow, node, getTriggerFunctions, @@ -112,12 +112,12 @@ describe('TriggersAndPollers', () => { }); }); - describe('runPoll()', () => { + describe('runPollFunction()', () => { const pollFunctions = mock(); const pollFn = vi.fn(); const runPollHelper = async () => - await triggersAndPollers.runPoll(workflow, node, pollFunctions); + await triggersAndPollers.runPollFunction(workflow, node, pollFunctions); it('should throw error if node type does not have poll function', async () => { await expect(runPollHelper()).rejects.toThrow(ApplicationError); diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts index 5e1ae053afb..29d89f178b7 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute-run-node.test.ts @@ -106,7 +106,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { // Setup Container mock for different dependencies const mockTriggersAndPollersInstance = { - runTrigger: vi.fn(), + runTriggerFunction: vi.fn(), }; const mockGlobalConfigInstance = { sentry: { backendDsn: '' }, @@ -935,7 +935,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { mockNodeType.webhook = undefined; const mockTriggersAndPollersInstance = { - runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse), + runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse), }; const mockGlobalConfigInstance = { sentry: { backendDsn: '' }, @@ -959,7 +959,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { 'manual', ); - expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled(); expect(result).toEqual({ data: mockTriggerData }); }); @@ -970,7 +970,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { mockNodeType.webhook = undefined; const mockTriggersAndPollersInstance = { - runTrigger: vi.fn().mockResolvedValue(undefined), // Return undefined to trigger line 1277 + runTriggerFunction: vi.fn().mockResolvedValue(undefined), // Return undefined to trigger line 1277 }; const mockGlobalConfigInstance = { sentry: { backendDsn: '' }, @@ -994,7 +994,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { 'manual', ); - expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled(); expect(result).toEqual({ data: null }); }); @@ -1011,7 +1011,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { mockNodeType.webhook = undefined; const mockTriggersAndPollersInstance = { - runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse), + runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse), }; const mockGlobalConfigInstance = { sentry: { backendDsn: '' }, @@ -1035,7 +1035,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { 'manual', ); - expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled(); expect(result).toEqual({ data: null, closeFunction: mockCloseFunction }); }); @@ -1055,7 +1055,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { mockNodeType.webhook = undefined; const mockTriggersAndPollersInstance = { - runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse), + runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse), }; const mockGlobalConfigInstance = { sentry: { backendDsn: '' }, @@ -1079,7 +1079,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { 'manual', ); - expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled(); + expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled(); expect(mockManualTriggerFunction).toHaveBeenCalled(); // Verify line 1294 was executed expect(result).toEqual({ data: mockTriggerData, closeFunction: mockCloseFunction }); }); @@ -1552,7 +1552,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => { mockNodeType.webhook = undefined; const mockTriggersAndPollersInstance = { - runTrigger: vi.fn().mockResolvedValue({ + runTriggerFunction: vi.fn().mockResolvedValue({ manualTriggerResponse: Promise.resolve([[{ json: { triggered: 'data' } }]]), }), }; diff --git a/packages/core/src/execution-engine/active-workflows.ts b/packages/core/src/execution-engine/active-workflows.ts index 8c6ce51a0a6..83ec96cc2c6 100644 --- a/packages/core/src/execution-engine/active-workflows.ts +++ b/packages/core/src/execution-engine/active-workflows.ts @@ -76,13 +76,13 @@ export class ActiveWorkflows { getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions, ) { - const triggerNodes = workflow.getTriggerNodes(); + const triggerFunctionNodes = workflow.getTriggerNodes(); const triggerResponses: ITriggerResponse[] = []; - for (const triggerNode of triggerNodes) { + for (const triggerNode of triggerFunctionNodes) { try { - const triggerResponse = await this.triggersAndPollers.runTrigger( + const triggerResponse = await this.triggersAndPollers.runTriggerFunction( workflow, triggerNode, getTriggerFunctions, @@ -105,13 +105,13 @@ export class ActiveWorkflows { this.activeWorkflows[workflowId] = { triggerResponses }; - const pollingNodes = workflow.getPollNodes(); + const pollTriggerNodes = workflow.getPollNodes(); - if (pollingNodes.length === 0) return; + if (pollTriggerNodes.length === 0) return; - for (const pollNode of pollingNodes) { + for (const pollNode of pollTriggerNodes) { try { - await this.activatePolling( + await this.activatePollTrigger( pollNode, workflow, additionalData, @@ -120,7 +120,8 @@ export class ActiveWorkflows { activation, ); } catch (e) { - // Do not mark this workflow as active if there are no triggerResponses, and any polling activation failed + // Do not mark this workflow as active if there are no active or schedule + // trigger responses, and any poll trigger activation failed. if (triggerResponses.length === 0) { delete this.activeWorkflows[workflowId]; } @@ -136,9 +137,9 @@ export class ActiveWorkflows { } /** - * Activates polling for the given node + * Activates the given poll trigger node. */ - private async activatePolling( + private async activatePollTrigger( node: INode, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, @@ -154,11 +155,10 @@ export class ActiveWorkflows { // Get all the trigger times const cronExpressions = (pollTimes.item || []).map(toCronExpression); - // The trigger function to execute when the cron-time got reached - const executeTrigger = this.createPollExecuteFn(workflow, node, pollFunctions); + const executePollTrigger = this.createPollTriggerExecuteFn(workflow, node, pollFunctions); - // Execute the trigger directly to be able to know if it works - await executeTrigger(true); + // Execute the poll trigger directly to be able to know if it works. + await executePollTrigger(true); for (const expression of cronExpressions) { const fields = expression.split(' '); @@ -177,7 +177,7 @@ export class ActiveWorkflows { }; this.scheduledTaskManager.registerCron(ctx, () => { - void executeTrigger(); + void executePollTrigger(); }); } } @@ -203,7 +203,7 @@ export class ActiveWorkflows { return true; } - async removeAllTriggerAndPollerBasedWorkflows() { + async removeAllNonWebhookTriggerWorkflows() { const activeWorkflowIds = Object.keys(this.activeWorkflows); if (activeWorkflowIds.length === 0) return; @@ -212,7 +212,7 @@ export class ActiveWorkflows { await this.remove(workflowId); } - this.logger.debug('Deactivated all trigger- and poller-based workflows', { + this.logger.debug('Deactivated all non-webhook trigger workflows', { workflowIds: activeWorkflowIds, }); } @@ -241,10 +241,10 @@ export class ActiveWorkflows { } /** - * Creates a function that executes the poll function for a given workflow - * and node and triggers a workflow execution based on the output. + * Creates a function that executes the poll() implementation for a poll + * trigger node and triggers a workflow execution based on the output. */ - private createPollExecuteFn( + private createPollTriggerExecuteFn( workflow: Workflow, node: INode, pollFunctions: IPollFunctions, @@ -260,7 +260,7 @@ export class ActiveWorkflows { }, }, async (span) => { - this.logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, { + this.logger.debug(`Poll trigger initiated for workflow "${workflow.name}"`, { workflowName: workflow.name, workflowId: workflow.id, }); @@ -276,7 +276,7 @@ export class ActiveWorkflows { try { if (ownsIsolate) await workflow.expression.acquireIsolate(); - const pollResponse = await this.triggersAndPollers.runPoll( + const pollResponse = await this.triggersAndPollers.runPollFunction( workflow, node, pollFunctions, @@ -289,7 +289,7 @@ export class ActiveWorkflows { span.setStatus({ code: SpanStatus.ok }); } catch (error) { span.setStatus({ code: SpanStatus.error }); - // If the poll function fails in the first activation + // If the poll trigger fails in the first activation // throw the error back so we let the user know there is // an issue with the trigger. if (testingTrigger) { diff --git a/packages/core/src/execution-engine/triggers-and-pollers.ts b/packages/core/src/execution-engine/triggers-and-pollers.ts index 9e4a393bbec..1afe8442697 100644 --- a/packages/core/src/execution-engine/triggers-and-pollers.ts +++ b/packages/core/src/execution-engine/triggers-and-pollers.ts @@ -21,9 +21,9 @@ import type { IGetExecuteTriggerFunctions } from './interfaces'; @Service() export class TriggersAndPollers { /** - * Runs the given trigger node so that it can trigger the workflow when the node has data. + * Runs the trigger() implementation for an active trigger or schedule trigger node. */ - async runTrigger( + async runTriggerFunction( workflow: Workflow, node: INode, getTriggerFunctions: IGetExecuteTriggerFunctions, @@ -90,9 +90,9 @@ export class TriggersAndPollers { } /** - * Runs the given poller node so that it can trigger the workflow when the node has data. + * Runs the poll() implementation for a poll trigger node. */ - async runPoll( + async runPollFunction( workflow: Workflow, node: INode, pollFunctions: IPollFunctions, diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 6ec81bd30ff..453be5fe305 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1193,7 +1193,7 @@ export class WorkflowExecute { ): Promise { if (mode === 'manual') { // In manual mode start the trigger - const triggerResponse = await Container.get(TriggersAndPollers).runTrigger( + const triggerResponse = await Container.get(TriggersAndPollers).runTriggerFunction( workflow, node, NodeExecuteFunctions.getExecuteTriggerFunctions,