refactor: Refine trigger naming in core and cli (#31632)

This commit is contained in:
Tomi Turtiainen 2026-06-03 14:30:45 +03:00 committed by GitHub
parent e27c4feaca
commit 163c718a3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 134 additions and 128 deletions

View File

@ -108,9 +108,9 @@ describe('ActiveWorkflowManager', () => {
'should skip inactive workflow in `%s` activation mode',
async (mode) => {
const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
const addNonWebhookTriggersSpy = jest.spyOn(
activeWorkflowManager,
'addTriggersAndPollers',
'addNonWebhookTriggers',
);
workflowRepository.findById.mockResolvedValue(
mock<WorkflowEntity>({ active: false, activeVersionId: null, activeVersion: null }),
@ -119,7 +119,7 @@ describe('ActiveWorkflowManager', () => {
const added = await activeWorkflowManager.add('some-id', mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
expect(addNonWebhookTriggersSpy).not.toHaveBeenCalled();
expect(added).toEqual({ triggersAndPollers: false, webhooks: false });
},
);
@ -128,9 +128,9 @@ describe('ActiveWorkflowManager', () => {
'should skip archived workflow in `%s` activation mode',
async (mode) => {
const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
const addNonWebhookTriggersSpy = jest.spyOn(
activeWorkflowManager,
'addTriggersAndPollers',
'addNonWebhookTriggers',
);
workflowRepository.findById.mockResolvedValue(
mock<WorkflowEntity>({
@ -144,7 +144,7 @@ describe('ActiveWorkflowManager', () => {
const added = await activeWorkflowManager.add('archived-id', mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
expect(addNonWebhookTriggersSpy).not.toHaveBeenCalled();
expect(added).toEqual({ triggersAndPollers: false, webhooks: false });
},
);
@ -168,7 +168,7 @@ describe('ActiveWorkflowManager', () => {
});
});
describe('handleAddWebhooksTriggersAndPollers', () => {
describe('handleAddWebhooksAndNonWebhookTriggers', () => {
const push = mock<Push>();
const publisher = mock<Publisher>();
@ -206,7 +206,7 @@ describe('ActiveWorkflowManager', () => {
jest.spyOn(activeWorkflowManager, 'add').mockRejectedValue(activationError);
await activeWorkflowManager.handleAddWebhooksTriggersAndPollers({
await activeWorkflowManager.handleAddWebhooksAndNonWebhookTriggers({
workflowId: 'wf-1',
activeVersionId: 'v1',
activationMode: 'activate',
@ -234,7 +234,7 @@ describe('ActiveWorkflowManager', () => {
test('should not include nodeId in broadcast when error has no node', async () => {
jest.spyOn(activeWorkflowManager, 'add').mockRejectedValue(new Error('Some error'));
await activeWorkflowManager.handleAddWebhooksTriggersAndPollers({
await activeWorkflowManager.handleAddWebhooksAndNonWebhookTriggers({
workflowId: 'wf-1',
activeVersionId: 'v1',
activationMode: 'activate',

View File

@ -653,37 +653,36 @@ export class ActiveWorkflowManager {
}
@OnLeaderTakeover()
async addAllTriggerAndPollerBasedWorkflows() {
async addAllNonWebhookTriggerWorkflows() {
await this.addActiveWorkflows('leadershipChange');
}
@OnLeaderStepdown()
@OnShutdown()
async removeAllTriggerAndPollerBasedWorkflows() {
async removeAllNonWebhookTriggerWorkflows() {
this.removeAllQueuedWorkflowActivations();
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflows.removeAllNonWebhookTriggerWorkflows();
}
/**
* Register a workflow as active.
*
* An activatable workflow may be webhook-, trigger-, or poller-based:
* An activatable workflow may start from:
*
* - A `webhook` is an HTTP-based node that can start a workflow when called
* by a third-party service.
* - A `poller` is an HTTP-based node that can start a workflow when detecting
* a change while regularly checking a third-party service.
* - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a
* time-based node like Schedule Trigger or a message-queue-based node.
* - A webhook trigger, invoked by an HTTP request.
* - A poll trigger, which regularly checks an external service.
* - An active trigger, which keeps a listener or persistent connection open.
* - A schedule trigger, which registers its own crons.
*
* Note that despite the name, most "trigger" nodes are actually webhook-based
* and so qualify as `webhook`, e.g. Stripe Trigger.
* and so qualify as webhook triggers, e.g. Stripe Trigger.
*
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
* but webhooks are registered by being entered in the `webhook_entity` table,
* since webhooks do not require continuous execution.
* Active triggers, poll triggers, and schedule triggers are registered as
* active in memory at `ActiveWorkflows`, but webhook triggers are registered
* by being entered in the `webhook_entity` table, since webhooks do not
* require continuous execution.
*
* Returns whether this operation added webhooks and/or triggers and pollers.
* Returns whether this operation added webhooks and/or non-webhook triggers.
*/
async add(
workflowId: WorkflowId,
@ -724,7 +723,7 @@ export class ActiveWorkflowManager {
let workflow: Workflow;
const shouldAddWebhooks = this.shouldAddWebhooks(activationMode);
const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers();
const shouldAddNonWebhookTriggers = this.shouldAddNonWebhookTriggers();
try {
if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.activeVersion) {
@ -767,7 +766,7 @@ export class ActiveWorkflowManager {
if (!validation.isValid) {
throw new WorkflowActivationError(
`Workflow ${formatWorkflow(dbWorkflow)} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
`Workflow ${formatWorkflow(dbWorkflow)} has no node to start the workflow - at least one active trigger, poll trigger, webhook trigger, or schedule trigger node is required`,
{ level: 'warning' },
);
}
@ -789,8 +788,8 @@ export class ActiveWorkflowManager {
);
}
// When the flag is on, trigger/poller emit callbacks re-read the published
// version from the DB so they pick up updates without deactivate/reactivate.
// When the flag is on, non-webhook trigger emit callbacks re-read the
// published version from the DB so they pick up updates without deactivate/reactivate.
// When the flag is off, they use the in-memory workflowData (same as before).
//
// Note: we intentionally load the latest published version when the trigger
@ -802,8 +801,8 @@ export class ActiveWorkflowManager {
? async () => await this.loadPublishedWorkflowData(dbWorkflow)
: async () => dbWorkflow as IWorkflowBase;
if (shouldAddTriggersAndPollers) {
added.triggersAndPollers = await this.addTriggersAndPollers(dbWorkflow, workflow, {
if (shouldAddNonWebhookTriggers) {
added.triggersAndPollers = await this.addNonWebhookTriggers(dbWorkflow, workflow, {
activationMode,
executionMode: 'trigger',
additionalData,
@ -871,7 +870,7 @@ export class ActiveWorkflowManager {
instanceType: 'main',
instanceRole: 'leader',
})
async handleAddWebhooksTriggersAndPollers({
async handleAddWebhooksAndNonWebhookTriggers({
workflowId,
activeVersionId,
activationMode,
@ -1082,17 +1081,16 @@ export class ActiveWorkflowManager {
this.removeQueuedWorkflowActivation(workflowId);
}
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
await this.removeWorkflowTriggersAndPollers(workflowId);
// If it is active in memory, it is a non-webhook trigger workflow.
await this.removeNonWebhookTriggers(workflowId);
}
@OnPubSubEvent('remove-triggers-and-pollers', { instanceType: 'main', instanceRole: 'leader' })
async handleRemoveTriggersAndPollers({
async handleRemoveNonWebhookTriggers({
workflowId,
}: PubSubCommandMap['remove-triggers-and-pollers']) {
await this.removeActivationError(workflowId);
await this.removeWorkflowTriggersAndPollers(workflowId);
await this.removeNonWebhookTriggers(workflowId);
this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } });
@ -1104,24 +1102,24 @@ export class ActiveWorkflowManager {
}
/**
* Stop running active triggers and pollers for a workflow.
* Stop running active, poll, and schedule triggers for a workflow.
*/
async removeWorkflowTriggersAndPollers(workflowId: WorkflowId) {
async removeNonWebhookTriggers(workflowId: WorkflowId) {
if (!this.activeWorkflows.isActive(workflowId)) return;
const wasRemoved = await this.activeWorkflows.remove(workflowId);
if (wasRemoved) {
this.logger.debug(`Removed triggers and pollers for workflow "${workflowId}"`, {
this.logger.debug(`Removed non-webhook triggers for workflow "${workflowId}"`, {
workflowId,
});
}
}
/**
* Register as active in memory a trigger- or poller-based workflow.
* Register a workflow's active, poll, and schedule triggers in memory.
*/
async addTriggersAndPollers(
async addNonWebhookTriggers(
dbWorkflow: WorkflowEntity,
workflow: Workflow,
{
@ -1166,7 +1164,7 @@ export class ActiveWorkflowManager {
getPollFunctions,
);
this.logger.debug(`Added triggers and pollers for workflow ${formatWorkflow(dbWorkflow)}`);
this.logger.debug(`Added non-webhook triggers for workflow ${formatWorkflow(dbWorkflow)}`);
return true;
}
@ -1189,12 +1187,12 @@ export class ActiveWorkflowManager {
}
/**
* Whether this instance may add triggers and pollers to memory.
* Whether this instance may add active, poll, and schedule triggers to memory.
*
* In both single- and multi-main setup, only the leader is allowed to manage
* triggers and pollers in memory, to ensure they are not duplicated.
* non-webhook triggers in memory, to ensure they are not duplicated.
*/
shouldAddTriggersAndPollers() {
shouldAddNonWebhookTriggers() {
return this.instanceSettings.isLeader;
}
}

View File

@ -100,7 +100,7 @@ export class Start extends BaseCommand<z.infer<typeof flagsSchema>> {
await this.externalHooks?.run('n8n.stop');
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllNonWebhookTriggerWorkflows();
if (this.instanceSettings.isMultiMain) {
await Container.get(MultiMainSetup).shutdown();

View File

@ -32,7 +32,7 @@ export class DebugController {
isLeader: this.instanceSettings.isLeader,
activeWorkflows: {
webhooks, // webhook-based active workflows
triggersAndPollers, // poller- and trigger-based active workflows
triggersAndPollers, // non-webhook trigger active workflows
},
activationErrors,
};

View File

@ -265,7 +265,7 @@ export class WorkflowValidationService {
connections: IConnections,
nodeTypes: NodeTypes,
): WorkflowValidationResult {
// Validate trigger nodes
// Validate workflow entry points: active, poll, webhook, or schedule triggers.
const triggerValidation = validateWorkflowHasTriggerLikeNode(nodes, nodeTypes, STARTING_NODES);
if (!triggerValidation.isValid) {
@ -273,7 +273,7 @@ export class WorkflowValidationService {
isValid: false,
error:
triggerValidation.error ??
'Workflow cannot be activated because it has no trigger node. At least one trigger, webhook, or polling node is required.',
'Workflow cannot be activated because it has no trigger node. At least one active trigger, poll trigger, webhook trigger, or schedule trigger node is required.',
};
}

View File

@ -145,21 +145,21 @@ describe('init()', () => {
describe('add()', () => {
describe('in single-main mode', () => {
test.each<WorkflowActivateMode>(['activate', 'update'])(
"should add webhooks, triggers and pollers for workflow in '%s' activation mode",
"should add webhooks and non-webhook triggers for workflow in '%s' activation mode",
async (mode) => {
await activeWorkflowManager.init();
const dbWorkflow = await createActiveWorkflow();
const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowManager, 'addTriggersAndPollers');
const addNonWebhookTriggersSpy = jest.spyOn(activeWorkflowManager, 'addNonWebhookTriggers');
await activeWorkflowManager.add(dbWorkflow.id, mode);
const [argWorkflow] = addWebhooksSpy.mock.calls[0];
const [_, _argWorkflow] = addTriggersAndPollersSpy.mock.calls[0];
const [_, _argWorkflow] = addNonWebhookTriggersSpy.mock.calls[0];
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
expect(addNonWebhookTriggersSpy).toHaveBeenCalledTimes(1);
if (!(argWorkflow instanceof Workflow)) fail();
if (!(_argWorkflow instanceof Workflow)) fail();
@ -262,17 +262,17 @@ describe('remove()', () => {
expect(webhookService.deleteWebhook).toHaveBeenCalledTimes(1);
});
it('should stop running triggers and pollers', async () => {
it('should stop running non-webhook triggers', async () => {
const dbWorkflow = await createActiveWorkflow();
const removeTriggersAndPollersSpy = jest.spyOn(
const removeNonWebhookTriggersSpy = jest.spyOn(
activeWorkflowManager,
'removeWorkflowTriggersAndPollers',
'removeNonWebhookTriggers',
);
await activeWorkflowManager.init();
await activeWorkflowManager.remove(dbWorkflow.id);
expect(removeTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
expect(removeNonWebhookTriggersSpy).toHaveBeenCalledTimes(1);
});
});
});

View File

@ -87,13 +87,13 @@ describe('ActiveWorkflows', () => {
pollFunctions.getNodeParameter.calledWith('pollTimes').mockReturnValue(pollTimes);
if (triggerError) {
triggersAndPollers.runTrigger.mockRejectedValueOnce(triggerError);
triggersAndPollers.runTriggerFunction.mockRejectedValueOnce(triggerError);
} else {
triggersAndPollers.runTrigger.mockResolvedValue(triggerResponse);
triggersAndPollers.runTriggerFunction.mockResolvedValue(triggerResponse);
}
if (pollError) {
triggersAndPollers.runPoll.mockRejectedValueOnce(pollError);
triggersAndPollers.runPollFunction.mockRejectedValueOnce(pollError);
} else {
getPollFunctions.mockReturnValue(pollFunctions);
}
@ -111,12 +111,12 @@ describe('ActiveWorkflows', () => {
describe('add()', () => {
describe('should activate workflow', () => {
it('with trigger nodes', async () => {
it('with trigger function nodes', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });
expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(workflow.getTriggerNodes).toHaveBeenCalled();
expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith(
expect(triggersAndPollers.runTriggerFunction).toHaveBeenCalledWith(
workflow,
triggerNode,
getTriggerFunctions,
@ -126,7 +126,7 @@ describe('ActiveWorkflows', () => {
);
});
it('with polling nodes', async () => {
it('with poll trigger nodes', async () => {
await addWorkflow({ pollNodes: [pollNode] });
expect(activeWorkflows.isActive(workflowId)).toBe(true);
@ -134,13 +134,13 @@ describe('ActiveWorkflows', () => {
expect(scheduledTaskManager.registerCron).toHaveBeenCalled();
});
it('with both trigger and polling nodes', async () => {
it('with both trigger function and poll trigger nodes', async () => {
await addWorkflow({ triggerNodes: [triggerNode], pollNodes: [pollNode] });
expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(workflow.getTriggerNodes).toHaveBeenCalled();
expect(workflow.getPollNodes).toHaveBeenCalled();
expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith(
expect(triggersAndPollers.runTriggerFunction).toHaveBeenCalledWith(
workflow,
triggerNode,
getTriggerFunctions,
@ -149,7 +149,11 @@ describe('ActiveWorkflows', () => {
activation,
);
expect(scheduledTaskManager.registerCron).toHaveBeenCalled();
expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions);
expect(triggersAndPollers.runPollFunction).toHaveBeenCalledWith(
workflow,
pollNode,
pollFunctions,
);
});
});
@ -162,8 +166,8 @@ describe('ActiveWorkflows', () => {
expect(activeWorkflows.isActive(workflowId)).toBe(false);
});
it('if polling activation fails', async () => {
const error = new Error('Failed to activate polling');
it('if poll trigger activation fails', async () => {
const error = new Error('Failed to activate poll trigger');
await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow(
WorkflowActivationError,
);
@ -211,14 +215,14 @@ describe('ActiveWorkflows', () => {
// the activation acquire/release window, so the expression bridge fails
// with "No bridge acquired for this context" on every tick.
it('should acquire and release the isolate when the scheduled poll fires', async () => {
triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll
triggersAndPollers.runPoll.mockResolvedValueOnce(null); // scheduled poll
triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll
triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // scheduled poll
await addWorkflow({ pollNodes: [pollNode] });
acquireIsolate.mockClear();
releaseIsolate.mockClear();
triggersAndPollers.runPoll.mockClear();
triggersAndPollers.runPollFunction.mockClear();
const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0];
const executeScheduledPoll = registerCronCall[1] as () => Promise<void>;
@ -228,10 +232,10 @@ describe('ActiveWorkflows', () => {
expect(acquireIsolate).toHaveBeenCalledTimes(1);
expect(releaseIsolate).toHaveBeenCalledTimes(1);
expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(1);
expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(1);
const [acquireOrder] = acquireIsolate.mock.invocationCallOrder;
const [runPollOrder] = triggersAndPollers.runPoll.mock.invocationCallOrder;
const [runPollOrder] = triggersAndPollers.runPollFunction.mock.invocationCallOrder;
const [releaseOrder] = releaseIsolate.mock.invocationCallOrder;
expect(acquireOrder).toBeLessThan(runPollOrder);
@ -242,19 +246,19 @@ describe('ActiveWorkflows', () => {
// The outer ActiveWorkflowManager.add() acquire covers the test poll
// and the subsequent countTriggers call. Nested acquire/release would
// release the outer's bridge early and break countTriggers.
triggersAndPollers.runPoll.mockResolvedValueOnce(null);
triggersAndPollers.runPollFunction.mockResolvedValueOnce(null);
await addWorkflow({ pollNodes: [pollNode] });
expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(1);
expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(1);
expect(acquireIsolate).not.toHaveBeenCalled();
expect(releaseIsolate).not.toHaveBeenCalled();
});
it('should release the isolate when __emit throws after a successful poll', async () => {
const pollData = [[{ json: { foo: 'bar' } }]];
triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll
triggersAndPollers.runPoll.mockResolvedValueOnce(pollData); // scheduled poll returns data
triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll
triggersAndPollers.runPollFunction.mockResolvedValueOnce(pollData); // scheduled poll returns data
const emitError = new Error('emit failed');
pollFunctions.__emit.mockImplementationOnce(() => {
@ -281,7 +285,7 @@ describe('ActiveWorkflows', () => {
// Without this routing, the rejection would escape the cron callback
// `() => void executeTrigger()` and become an unhandled rejection — the
// user would only see a process-level log line, not an error execution.
triggersAndPollers.runPoll.mockResolvedValueOnce(null); // initial activation test poll
triggersAndPollers.runPollFunction.mockResolvedValueOnce(null); // initial activation test poll
await addWorkflow({ pollNodes: [pollNode] });
@ -289,7 +293,7 @@ describe('ActiveWorkflows', () => {
acquireIsolate.mockClear();
releaseIsolate.mockClear();
acquireIsolate.mockRejectedValueOnce(acquireError);
triggersAndPollers.runPoll.mockClear();
triggersAndPollers.runPollFunction.mockClear();
const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0];
const executeScheduledPoll = registerCronCall[1] as () => Promise<void>;
@ -298,13 +302,13 @@ describe('ActiveWorkflows', () => {
await flushPromises();
expect(acquireIsolate).toHaveBeenCalledTimes(1);
expect(triggersAndPollers.runPoll).not.toHaveBeenCalled();
expect(triggersAndPollers.runPollFunction).not.toHaveBeenCalled();
expect(pollFunctions.__emitError).toHaveBeenCalledWith(acquireError);
});
it('should release the isolate even when the scheduled poll throws', async () => {
const error = new Error('Poll function failed');
triggersAndPollers.runPoll
triggersAndPollers.runPollFunction
.mockResolvedValueOnce(null) // initial activation test poll
.mockRejectedValueOnce(error); // scheduled poll fails
@ -333,14 +337,18 @@ describe('ActiveWorkflows', () => {
WorkflowActivationError,
);
expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions);
expect(triggersAndPollers.runPollFunction).toHaveBeenCalledWith(
workflow,
pollNode,
pollFunctions,
);
expect(pollFunctions.__emit).not.toHaveBeenCalled();
expect(pollFunctions.__emitError).not.toHaveBeenCalled();
});
it('should emit error when poll fails during regular polling', async () => {
const error = new Error('Poll function failed');
triggersAndPollers.runPoll
triggersAndPollers.runPollFunction
.mockResolvedValueOnce(null) // Succeed on first call (testing)
.mockRejectedValueOnce(error); // Fail on second call (regular polling)
@ -354,7 +362,7 @@ describe('ActiveWorkflows', () => {
await executeTrigger();
await flushPromises();
expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(2);
expect(triggersAndPollers.runPollFunction).toHaveBeenCalledTimes(2);
expect(pollFunctions.__emit).not.toHaveBeenCalled();
expect(pollFunctions.__emitError).toHaveBeenCalledWith(error);
});
@ -436,11 +444,11 @@ describe('ActiveWorkflows', () => {
});
});
describe('removeAllTriggerAndPollerBasedWorkflows()', () => {
describe('removeAllNonWebhookTriggerWorkflows()', () => {
it('should remove all active workflows', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });
await activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
await activeWorkflows.removeAllNonWebhookTriggerWorkflows();
expect(activeWorkflows.allActiveWorkflows()).toEqual([]);
expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId);

View File

@ -32,14 +32,14 @@ describe('TriggersAndPollers', () => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
});
describe('runTrigger()', () => {
describe('runTriggerFunction()', () => {
const triggerFunctions = mock<ITriggerFunctions>();
const getTriggerFunctions = vi.fn().mockReturnValue(triggerFunctions);
const triggerFn = vi.fn();
const mockEmitData: INodeExecutionData[][] = [[{ json: { data: 'test' } }]];
const runTriggerHelper = async (mode: 'manual' | 'trigger' = 'trigger') =>
await triggersAndPollers.runTrigger(
await triggersAndPollers.runTriggerFunction(
workflow,
node,
getTriggerFunctions,
@ -112,12 +112,12 @@ describe('TriggersAndPollers', () => {
});
});
describe('runPoll()', () => {
describe('runPollFunction()', () => {
const pollFunctions = mock<IPollFunctions>();
const pollFn = vi.fn();
const runPollHelper = async () =>
await triggersAndPollers.runPoll(workflow, node, pollFunctions);
await triggersAndPollers.runPollFunction(workflow, node, pollFunctions);
it('should throw error if node type does not have poll function', async () => {
await expect(runPollHelper()).rejects.toThrow(ApplicationError);

View File

@ -106,7 +106,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
// Setup Container mock for different dependencies
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn(),
runTriggerFunction: vi.fn(),
};
const mockGlobalConfigInstance = {
sentry: { backendDsn: '' },
@ -935,7 +935,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
mockNodeType.webhook = undefined;
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse),
runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse),
};
const mockGlobalConfigInstance = {
sentry: { backendDsn: '' },
@ -959,7 +959,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
'manual',
);
expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled();
expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled();
expect(result).toEqual({ data: mockTriggerData });
});
@ -970,7 +970,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
mockNodeType.webhook = undefined;
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn().mockResolvedValue(undefined), // Return undefined to trigger line 1277
runTriggerFunction: vi.fn().mockResolvedValue(undefined), // Return undefined to trigger line 1277
};
const mockGlobalConfigInstance = {
sentry: { backendDsn: '' },
@ -994,7 +994,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
'manual',
);
expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled();
expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled();
expect(result).toEqual({ data: null });
});
@ -1011,7 +1011,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
mockNodeType.webhook = undefined;
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse),
runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse),
};
const mockGlobalConfigInstance = {
sentry: { backendDsn: '' },
@ -1035,7 +1035,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
'manual',
);
expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled();
expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled();
expect(result).toEqual({ data: null, closeFunction: mockCloseFunction });
});
@ -1055,7 +1055,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
mockNodeType.webhook = undefined;
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn().mockResolvedValue(mockTriggerResponse),
runTriggerFunction: vi.fn().mockResolvedValue(mockTriggerResponse),
};
const mockGlobalConfigInstance = {
sentry: { backendDsn: '' },
@ -1079,7 +1079,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
'manual',
);
expect(mockTriggersAndPollersInstance.runTrigger).toHaveBeenCalled();
expect(mockTriggersAndPollersInstance.runTriggerFunction).toHaveBeenCalled();
expect(mockManualTriggerFunction).toHaveBeenCalled(); // Verify line 1294 was executed
expect(result).toEqual({ data: mockTriggerData, closeFunction: mockCloseFunction });
});
@ -1552,7 +1552,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
mockNodeType.webhook = undefined;
const mockTriggersAndPollersInstance = {
runTrigger: vi.fn().mockResolvedValue({
runTriggerFunction: vi.fn().mockResolvedValue({
manualTriggerResponse: Promise.resolve([[{ json: { triggered: 'data' } }]]),
}),
};

View File

@ -76,13 +76,13 @@ export class ActiveWorkflows {
getTriggerFunctions: IGetExecuteTriggerFunctions,
getPollFunctions: IGetExecutePollFunctions,
) {
const triggerNodes = workflow.getTriggerNodes();
const triggerFunctionNodes = workflow.getTriggerNodes();
const triggerResponses: ITriggerResponse[] = [];
for (const triggerNode of triggerNodes) {
for (const triggerNode of triggerFunctionNodes) {
try {
const triggerResponse = await this.triggersAndPollers.runTrigger(
const triggerResponse = await this.triggersAndPollers.runTriggerFunction(
workflow,
triggerNode,
getTriggerFunctions,
@ -105,13 +105,13 @@ export class ActiveWorkflows {
this.activeWorkflows[workflowId] = { triggerResponses };
const pollingNodes = workflow.getPollNodes();
const pollTriggerNodes = workflow.getPollNodes();
if (pollingNodes.length === 0) return;
if (pollTriggerNodes.length === 0) return;
for (const pollNode of pollingNodes) {
for (const pollNode of pollTriggerNodes) {
try {
await this.activatePolling(
await this.activatePollTrigger(
pollNode,
workflow,
additionalData,
@ -120,7 +120,8 @@ export class ActiveWorkflows {
activation,
);
} catch (e) {
// Do not mark this workflow as active if there are no triggerResponses, and any polling activation failed
// Do not mark this workflow as active if there are no active or schedule
// trigger responses, and any poll trigger activation failed.
if (triggerResponses.length === 0) {
delete this.activeWorkflows[workflowId];
}
@ -136,9 +137,9 @@ export class ActiveWorkflows {
}
/**
* Activates polling for the given node
* Activates the given poll trigger node.
*/
private async activatePolling(
private async activatePollTrigger(
node: INode,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
@ -154,11 +155,10 @@ export class ActiveWorkflows {
// Get all the trigger times
const cronExpressions = (pollTimes.item || []).map(toCronExpression);
// The trigger function to execute when the cron-time got reached
const executeTrigger = this.createPollExecuteFn(workflow, node, pollFunctions);
const executePollTrigger = this.createPollTriggerExecuteFn(workflow, node, pollFunctions);
// Execute the trigger directly to be able to know if it works
await executeTrigger(true);
// Execute the poll trigger directly to be able to know if it works.
await executePollTrigger(true);
for (const expression of cronExpressions) {
const fields = expression.split(' ');
@ -177,7 +177,7 @@ export class ActiveWorkflows {
};
this.scheduledTaskManager.registerCron(ctx, () => {
void executeTrigger();
void executePollTrigger();
});
}
}
@ -203,7 +203,7 @@ export class ActiveWorkflows {
return true;
}
async removeAllTriggerAndPollerBasedWorkflows() {
async removeAllNonWebhookTriggerWorkflows() {
const activeWorkflowIds = Object.keys(this.activeWorkflows);
if (activeWorkflowIds.length === 0) return;
@ -212,7 +212,7 @@ export class ActiveWorkflows {
await this.remove(workflowId);
}
this.logger.debug('Deactivated all trigger- and poller-based workflows', {
this.logger.debug('Deactivated all non-webhook trigger workflows', {
workflowIds: activeWorkflowIds,
});
}
@ -241,10 +241,10 @@ export class ActiveWorkflows {
}
/**
* Creates a function that executes the poll function for a given workflow
* and node and triggers a workflow execution based on the output.
* Creates a function that executes the poll() implementation for a poll
* trigger node and triggers a workflow execution based on the output.
*/
private createPollExecuteFn(
private createPollTriggerExecuteFn(
workflow: Workflow,
node: INode,
pollFunctions: IPollFunctions,
@ -260,7 +260,7 @@ export class ActiveWorkflows {
},
},
async (span) => {
this.logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, {
this.logger.debug(`Poll trigger initiated for workflow "${workflow.name}"`, {
workflowName: workflow.name,
workflowId: workflow.id,
});
@ -276,7 +276,7 @@ export class ActiveWorkflows {
try {
if (ownsIsolate) await workflow.expression.acquireIsolate();
const pollResponse = await this.triggersAndPollers.runPoll(
const pollResponse = await this.triggersAndPollers.runPollFunction(
workflow,
node,
pollFunctions,
@ -289,7 +289,7 @@ export class ActiveWorkflows {
span.setStatus({ code: SpanStatus.ok });
} catch (error) {
span.setStatus({ code: SpanStatus.error });
// If the poll function fails in the first activation
// If the poll trigger fails in the first activation
// throw the error back so we let the user know there is
// an issue with the trigger.
if (testingTrigger) {

View File

@ -21,9 +21,9 @@ import type { IGetExecuteTriggerFunctions } from './interfaces';
@Service()
export class TriggersAndPollers {
/**
* Runs the given trigger node so that it can trigger the workflow when the node has data.
* Runs the trigger() implementation for an active trigger or schedule trigger node.
*/
async runTrigger(
async runTriggerFunction(
workflow: Workflow,
node: INode,
getTriggerFunctions: IGetExecuteTriggerFunctions,
@ -90,9 +90,9 @@ export class TriggersAndPollers {
}
/**
* Runs the given poller node so that it can trigger the workflow when the node has data.
* Runs the poll() implementation for a poll trigger node.
*/
async runPoll(
async runPollFunction(
workflow: Workflow,
node: INode,
pollFunctions: IPollFunctions,

View File

@ -1193,7 +1193,7 @@ export class WorkflowExecute {
): Promise<IRunNodeResponse> {
if (mode === 'manual') {
// In manual mode start the trigger
const triggerResponse = await Container.get(TriggersAndPollers).runTrigger(
const triggerResponse = await Container.get(TriggersAndPollers).runTriggerFunction(
workflow,
node,
NodeExecuteFunctions.getExecuteTriggerFunctions,