feat(core): Add Execute workflow mcp tool (no-changelog) (#21840)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
This commit is contained in:
Milorad FIlipović 2025-11-17 12:40:55 +01:00 committed by GitHub
parent 0ab07f0478
commit acdec59a12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1270 additions and 15 deletions

View File

@ -0,0 +1,803 @@
import { mockInstance } from '@n8n/backend-test-utils';
import { User } from '@n8n/db';
import {
CHAT_TRIGGER_NODE_TYPE,
FORM_TRIGGER_NODE_TYPE,
MANUAL_TRIGGER_NODE_TYPE,
WEBHOOK_NODE_TYPE,
type INode,
type IWorkflowExecutionDataProcess,
UnexpectedError,
UserError,
} from 'n8n-workflow';
import { createWorkflow } from './mock.utils';
import { createExecuteWorkflowTool, executeWorkflow } from '../tools/execute-workflow.tool';
import { ActiveExecutions } from '@/active-executions';
import { Telemetry } from '@/telemetry';
import { WorkflowRunner } from '@/workflow-runner';
import { WorkflowFinderService } from '@/workflows/workflow-finder.service';
describe('execute-workflow MCP tool', () => {
const user = Object.assign(new User(), { id: 'user-1' });
let workflowFinderService: WorkflowFinderService;
let activeExecutions: ActiveExecutions;
let workflowRunner: WorkflowRunner;
let telemetry: Telemetry;
beforeEach(() => {
workflowFinderService = mockInstance(WorkflowFinderService);
activeExecutions = mockInstance(ActiveExecutions);
workflowRunner = mockInstance(WorkflowRunner);
telemetry = mockInstance(Telemetry, {
track: jest.fn(),
});
});
describe('smoke tests', () => {
test('it creates tool correctly', () => {
const tool = createExecuteWorkflowTool(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
telemetry,
);
expect(tool.name).toBe('execute_workflow');
expect(tool.config).toBeDefined();
expect(typeof tool.config.description).toBe('string');
expect(tool.config.description).toBe(
'Execute a workflow by ID. Before executing always ensure you know the input schema by first using the get_workflow_details tool and consulting workflow description',
);
expect(tool.config.inputSchema).toBeDefined();
expect(tool.config.outputSchema).toBeDefined();
expect(typeof tool.handler).toBe('function');
});
});
describe('handler tests', () => {
describe('workflow validation', () => {
test('throws error when workflow is not found', async () => {
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(null);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'missing-workflow',
undefined,
),
).rejects.toThrow(UserError);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'missing-workflow',
undefined,
),
).rejects.toThrow('Workflow not found');
});
test('throws error when workflow is archived', async () => {
const workflow = createWorkflow({ isArchived: true });
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'archived-workflow',
undefined,
),
).rejects.toThrow(UserError);
});
test('throws error when workflow is not available in MCP', async () => {
const workflow = createWorkflow({ settings: { availableInMCP: false } });
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'unavailable-workflow',
undefined,
),
).rejects.toThrow(UserError);
});
test('throws error when workflow has unsupported trigger nodes', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'Manual',
type: MANUAL_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'unsupported-trigger',
undefined,
),
).rejects.toThrow(UserError);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'unsupported-trigger',
undefined,
),
).rejects.toThrow(/Only workflows with the following trigger nodes can be executed/);
});
test('throws error when no supported trigger node is found', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'Webhook',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: true, // disabled node should not be considered
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'disabled-trigger',
undefined,
),
).rejects.toThrow(UserError);
});
});
describe('webhook trigger execution', () => {
test('executes workflow with webhook trigger and webhook data', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-123');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: {
resultData: {
runData: {
WebhookNode: [{ data: { main: [[{ json: { result: 'webhook success' } }]] } }],
},
},
},
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'webhook-workflow',
{
type: 'webhook',
webhookData: {
method: 'POST',
headers: { 'content-type': 'application/json' },
query: { page: '1' },
body: { message: 'test' },
},
},
);
expect(result).toMatchObject({
success: true,
executionId: 'exec-123',
result: expect.objectContaining({
runData: expect.any(Object),
}),
});
// Verify the runner was called with correct pin data
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
expect(runCall.startNodes).toEqual([{ name: 'WebhookNode', sourceData: null }]);
expect(runCall.pinData).toMatchObject({
WebhookNode: [
{
json: {
headers: { 'content-type': 'application/json' },
query: { page: '1' },
body: { message: 'test' },
},
},
],
});
});
test('executes workflow with webhook trigger and default GET method', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-456');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'webhook-workflow',
{
type: 'webhook',
webhookData: {
method: 'GET',
query: { id: '123' },
},
},
);
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
expect(runCall.pinData).toMatchObject({
WebhookNode: [
{
json: {
headers: {},
query: { id: '123' },
body: {},
},
},
],
});
});
});
describe('chat trigger execution', () => {
test('executes workflow with chat trigger and chat input', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'ChatNode',
type: CHAT_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-789');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'chat-workflow',
{
type: 'chat',
chatInput: 'Hello, how can I help?',
},
);
expect(result).toMatchObject({
success: true,
executionId: 'exec-789',
});
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
expect(runCall.executionMode).toBe('chat');
expect(runCall.pinData).toMatchObject({
ChatNode: [
{
json: {
sessionId: expect.stringMatching(/^mcp-session-\d+$/),
action: 'sendMessage',
chatInput: 'Hello, how can I help?',
},
},
],
});
});
});
describe('form trigger execution', () => {
test('executes workflow with form trigger and form data', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'FormNode',
type: FORM_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-101');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'form-workflow',
{
type: 'form',
formData: {
name: 'John Doe',
email: 'john@example.com',
age: 30,
},
},
);
expect(result).toMatchObject({
success: true,
executionId: 'exec-101',
});
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
expect(runCall.executionMode).toBe('trigger');
expect(runCall.pinData).toMatchObject({
FormNode: [
{
json: {
submittedAt: expect.stringMatching(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/),
formMode: 'mcp',
name: 'John Doe',
email: 'john@example.com',
age: 30,
},
},
],
});
});
});
describe('execution results handling', () => {
test('handles successful execution', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-success');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: {
resultData: {
runData: {
WebhookNode: [{ data: { main: [[{ json: { output: 'success' } }]] } }],
},
},
},
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'success-workflow',
undefined,
);
expect(result).toMatchObject({
success: true,
executionId: 'exec-success',
result: expect.objectContaining({
runData: expect.any(Object),
}),
});
});
test('handles execution with error status', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-error');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'error',
data: {
resultData: {
error: {
message: 'Workflow execution failed',
name: 'ExecutionError',
},
},
},
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'error-workflow',
undefined,
);
expect(result).toMatchObject({
success: false,
executionId: 'exec-error',
error: {
message: 'Workflow execution failed',
name: 'ExecutionError',
},
});
});
test('handles execution with result data error', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-data-error');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: {
resultData: {
error: {
message: 'Node execution failed',
name: 'NodeExecutionError',
},
},
},
});
const result = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'data-error-workflow',
undefined,
);
expect(result).toMatchObject({
success: false,
executionId: 'exec-data-error',
error: {
message: 'Node execution failed',
name: 'NodeExecutionError',
},
});
});
test('handles workflow returning undefined data', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-no-data');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue(undefined);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'no-data-workflow',
undefined,
),
).rejects.toThrow(UnexpectedError);
await expect(
executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'no-data-workflow',
undefined,
),
).rejects.toThrow('Workflow did not return any data');
});
});
describe('workflow with no inputs', () => {
test('executes workflow without any inputs', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-no-inputs');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'no-inputs-workflow',
undefined,
);
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
expect(runCall.pinData).toMatchObject({
WebhookNode: [
{
json: {
headers: {},
query: {},
body: {},
},
},
],
});
});
});
describe('telemetry tracking', () => {
test('tracks successful execution with tool handler', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-telemetry');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
const tool = createExecuteWorkflowTool(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
telemetry,
);
// Call through the tool handler to test telemetry
await tool.handler(
{ workflowId: 'telemetry-workflow', inputs: { type: 'chat', chatInput: 'test' } },
{} as any,
);
expect(telemetry.track).toHaveBeenCalledWith(
'User called mcp tool',
expect.objectContaining({
user_id: 'user-1',
tool_name: 'execute_workflow',
parameters: {
workflowId: 'telemetry-workflow',
inputs: { type: 'chat', parameter_count: 1 },
},
results: {
success: true,
data: {
executionId: 'exec-telemetry',
},
},
}),
);
});
test('tracks failed execution with tool handler', async () => {
const error = new UserError('Test error');
(workflowFinderService.findWorkflowForUser as jest.Mock).mockRejectedValue(error);
const tool = createExecuteWorkflowTool(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
telemetry,
);
// Call through the tool handler to test telemetry
await tool.handler({ workflowId: 'error-tracking' }, {} as any);
expect(telemetry.track).toHaveBeenCalledWith(
'User called mcp tool',
expect.objectContaining({
user_id: 'user-1',
tool_name: 'execute_workflow',
parameters: {
workflowId: 'error-tracking',
inputs: undefined,
},
results: {
success: false,
error: 'Test error',
},
}),
);
});
});
describe('multiple trigger nodes', () => {
test('uses first eligible trigger node when multiple are present', async () => {
const workflow = createWorkflow({
nodes: [
{
id: 'node-1',
name: 'Manual',
type: MANUAL_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [0, 0],
disabled: false,
parameters: {},
} as INode,
{
id: 'node-2',
name: 'WebhookNode',
type: WEBHOOK_NODE_TYPE,
typeVersion: 1,
position: [100, 0],
disabled: false,
parameters: {},
} as INode,
{
id: 'node-3',
name: 'ChatNode',
type: CHAT_TRIGGER_NODE_TYPE,
typeVersion: 1,
position: [200, 0],
disabled: false,
parameters: {},
} as INode,
],
});
(workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow);
(workflowRunner.run as jest.Mock).mockResolvedValue('exec-multi');
(activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({
status: 'success',
data: { resultData: {} },
});
await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
'multi-trigger-workflow',
undefined,
);
const runCall = (workflowRunner.run as jest.Mock).mock
.calls[0][0] as IWorkflowExecutionDataProcess;
// Should use the WebhookNode (first eligible trigger)
expect(runCall.startNodes).toEqual([{ name: 'WebhookNode', sourceData: null }]);
expect(runCall.pinData).toHaveProperty('WebhookNode');
expect(runCall.executionMode).toBe('webhook');
});
});
});
});

View File

@ -0,0 +1,19 @@
import { Time } from '@n8n/constants';
import { UserError } from 'n8n-workflow';
/**
* Error thrown when MCP workflow execution times out
*/
export class McpExecutionTimeoutError extends UserError {
executionId: string | null;
timeoutMs: number;
constructor(executionId: string | null, timeoutMs: number) {
const timeoutSeconds = timeoutMs / Time.milliseconds.toSeconds;
super(`Workflow execution timed out after ${timeoutSeconds} seconds`);
this.name = 'McpExecutionTimeoutError';
this.executionId = executionId;
this.timeoutMs = timeoutMs;
}
}

View File

@ -3,12 +3,15 @@ import { GlobalConfig } from '@n8n/config';
import { User } from '@n8n/db';
import { Service } from '@n8n/di';
import { createExecuteWorkflowTool } from './tools/execute-workflow.tool';
import { createWorkflowDetailsTool } from './tools/get-workflow-details.tool';
import { createSearchWorkflowsTool } from './tools/search-workflows.tool';
import { ActiveExecutions } from '@/active-executions';
import { CredentialsService } from '@/credentials/credentials.service';
import { UrlService } from '@/services/url.service';
import { Telemetry } from '@/telemetry';
import { WorkflowRunner } from '@/workflow-runner';
import { WorkflowFinderService } from '@/workflows/workflow-finder.service';
import { WorkflowService } from '@/workflows/workflow.service';
@ -19,8 +22,10 @@ export class McpService {
private readonly workflowService: WorkflowService,
private readonly urlService: UrlService,
private readonly credentialsService: CredentialsService,
private readonly activeExecutions: ActiveExecutions,
private readonly globalConfig: GlobalConfig,
private readonly telemetry: Telemetry,
private readonly workflowRunner: WorkflowRunner,
) {}
getServer(user: User) {
@ -40,6 +45,19 @@ export class McpService {
workflowSearchTool.handler,
);
const executeWorkflowTool = createExecuteWorkflowTool(
user,
this.workflowFinderService,
this.activeExecutions,
this.workflowRunner,
this.telemetry,
);
server.registerTool(
executeWorkflowTool.name,
executeWorkflowTool.config,
executeWorkflowTool.handler,
);
const workflowDetailsTool = createWorkflowDetailsTool(
user,
this.urlService.getWebhookBaseUrl(),

View File

@ -8,7 +8,7 @@ import { UpdateWorkflowAvailabilityDto } from './dto/update-workflow-availabilit
import { McpServerApiKeyService } from './mcp-api-key.service';
import { SUPPORTED_MCP_TRIGGERS } from './mcp.constants';
import { McpSettingsService } from './mcp.settings.service';
import { isWorkflowEligibleForMCPAccess } from './mcp.utils';
import { findMcpSupportedTrigger } from './mcp.utils';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
@ -83,9 +83,9 @@ export class McpSettingsController {
throw new BadRequestError('MCP access can only be set for active workflows');
}
const isEligible = isWorkflowEligibleForMCPAccess(workflow);
const supportedTrigger = findMcpSupportedTrigger(workflow);
if (!isEligible) {
if (!supportedTrigger) {
throw new BadRequestError(
`MCP access can only be set for active workflows with one of the following trigger nodes: ${Object.values(SUPPORTED_MCP_TRIGGERS).join(', ')}.`,
);

View File

@ -82,6 +82,11 @@ export type UserCalledMCPToolEventPayload = {
};
};
export type ExecuteWorkflowsInputMeta = {
type: 'webhook' | 'chat' | 'schedule' | 'form';
parameter_count: number;
};
type SupportedTriggerNodeTypes = keyof typeof SUPPORTED_MCP_TRIGGERS;
export type MCPTriggersMap = {

View File

@ -1,5 +1,6 @@
import type { AuthenticatedRequest, WorkflowEntity } from '@n8n/db';
import type { Request } from 'express';
import type { INode } from 'n8n-workflow';
import { SUPPORTED_MCP_TRIGGERS } from './mcp.constants';
import { isRecord, isJSONRPCRequest } from './mcp.typeguards';
@ -47,17 +48,14 @@ export const getToolArguments = (body: unknown): Record<string, unknown> => {
};
/**
* Determines if MCP access can be toggled for a given workflow.
* Workflow is eligible if it contains at least one of these (enabled) trigger nodes:
* Finds the first supported trigger node in the workflow.
* Workflow is eligible for MCP access if it contains at least one of these (enabled) trigger nodes:
* - Schedule trigger
* - Webhook trigger
* - Form trigger
* - Chat trigger
* @param workflow
*/
export const isWorkflowEligibleForMCPAccess = (workflow: WorkflowEntity): boolean => {
export const findMcpSupportedTrigger = (workflow: WorkflowEntity): INode | undefined => {
const triggerNodeTypes = Object.keys(SUPPORTED_MCP_TRIGGERS);
return workflow.nodes.some(
(node) => triggerNodeTypes.includes(node.type) && node.disabled !== true,
);
return workflow.nodes.find((node) => triggerNodeTypes.includes(node.type) && !node.disabled);
};

View File

@ -0,0 +1,408 @@
import { Time } from '@n8n/constants';
import type { User } from '@n8n/db';
import moment from 'moment-timezone';
import {
CHAT_TRIGGER_NODE_TYPE,
FORM_TRIGGER_NODE_TYPE,
WEBHOOK_NODE_TYPE,
type INode,
type IPinData,
type IRunExecutionData,
type IWorkflowExecutionDataProcess,
type WorkflowExecuteMode,
UserError,
UnexpectedError,
TimeoutExecutionCancelledError,
ensureError,
jsonStringify,
SCHEDULE_TRIGGER_NODE_TYPE,
} from 'n8n-workflow';
import z from 'zod';
import { SUPPORTED_MCP_TRIGGERS, USER_CALLED_MCP_TOOL_EVENT } from '../mcp.constants';
import { McpExecutionTimeoutError } from '../mcp.errors';
import type {
ExecuteWorkflowsInputMeta,
ToolDefinition,
UserCalledMCPToolEventPayload,
} from '../mcp.types';
import { findMcpSupportedTrigger } from '../mcp.utils';
import type { ActiveExecutions } from '@/active-executions';
import type { Telemetry } from '@/telemetry';
import type { WorkflowRunner } from '@/workflow-runner';
import type { WorkflowFinderService } from '@/workflows/workflow-finder.service';
const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * Time.minutes.toMilliseconds; // 5 minutes
const inputSchema = z.object({
workflowId: z.string().describe('The ID of the workflow to execute'),
inputs: z
.discriminatedUnion('type', [
z.object({
type: z.literal('chat'),
chatInput: z.string().describe('Input for chat-based workflows'),
}),
z.object({
type: z.literal('form'),
formData: z.record(z.unknown()).describe('Input data for form-based workflows'),
}),
z.object({
type: z.literal('webhook'),
webhookData: z
.object({
method: z
.enum(['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS'])
.optional()
.default('GET')
.describe('HTTP method (defaults to GET)'),
query: z.record(z.string()).optional().describe('Query string parameters'),
body: z
.record(z.unknown())
.optional()
.describe('Request body data (main webhook payload)'),
headers: z
.record(z.string())
.optional()
.describe('HTTP headers (e.g., authorization, content-type)'),
})
.describe('Input data for webhook-based workflows'),
}),
])
.optional()
.describe('Inputs to provide to the workflow.'),
});
type ExecuteWorkflowOutput = {
success: boolean;
executionId: string | null;
result?: IRunExecutionData['resultData'];
error?: unknown;
};
const outputSchema = {
success: z.boolean(),
executionId: z.string().nullable().optional(),
result: z.unknown().optional().describe('Workflow execution result data'),
error: z.unknown().optional(),
} satisfies z.ZodRawShape;
export const createExecuteWorkflowTool = (
user: User,
workflowFinderService: WorkflowFinderService,
activeExecutions: ActiveExecutions,
workflowRunner: WorkflowRunner,
telemetry: Telemetry,
): ToolDefinition<typeof inputSchema.shape> => ({
name: 'execute_workflow',
config: {
description:
'Execute a workflow by ID. Before executing always ensure you know the input schema by first using the get_workflow_details tool and consulting workflow description',
inputSchema: inputSchema.shape,
outputSchema,
annotations: {
title: 'Execute Workflow',
readOnlyHint: false, // Can read and write data via workflows
destructiveHint: true, // Can cause changes in external systems via workflows
idempotentHint: true, // Safe to retry multiple times
openWorldHint: true, // Can access external systems via workflows
},
},
handler: async ({ workflowId, inputs }) => {
const telemetryPayload: UserCalledMCPToolEventPayload = {
user_id: user.id,
tool_name: 'execute_workflow',
parameters: { workflowId, inputs: getInputMetaData(inputs) },
};
try {
const output = await executeWorkflow(
user,
workflowFinderService,
activeExecutions,
workflowRunner,
workflowId,
inputs,
);
telemetryPayload.results = {
success: output.success,
data: {
executionId: output.executionId,
},
};
telemetry.track(USER_CALLED_MCP_TOOL_EVENT, telemetryPayload);
return {
content: [{ type: 'text', text: jsonStringify(output) }],
structuredContent: output,
};
} catch (er) {
const error = ensureError(er);
const isTimeout = error instanceof McpExecutionTimeoutError;
const output: ExecuteWorkflowOutput = {
success: false,
executionId: isTimeout ? error.executionId : null,
error: isTimeout
? `Workflow execution timed out after ${WORKFLOW_EXECUTION_TIMEOUT_MS / Time.milliseconds.toSeconds} seconds`
: error.message,
};
telemetryPayload.results = {
success: false,
error: isTimeout ? 'Workflow execution timed out' : error.message,
};
telemetry.track(USER_CALLED_MCP_TOOL_EVENT, telemetryPayload);
return {
content: [{ type: 'text', text: jsonStringify(output) }],
structuredContent: output,
};
}
},
});
/**
* Executes a workflow for the given user with provided inputs.
* In order to "synchronously" execute the workflow,
* it is mapping mcp tool inputs to trigger node pin data and starting execution from there.
* LIMITATION: Does not properly support workflows with multiple triggers.
*/
export const executeWorkflow = async (
user: User,
workflowFinderService: WorkflowFinderService,
activeExecutions: ActiveExecutions,
workflowRunner: WorkflowRunner,
workflowId: string,
inputs?: z.infer<typeof inputSchema>['inputs'],
): Promise<ExecuteWorkflowOutput> => {
const workflow = await workflowFinderService.findWorkflowForUser(workflowId, user, [
'workflow:execute',
]);
if (!workflow || workflow.isArchived) {
throw new UserError('Workflow not found');
}
if (!workflow.settings?.availableInMCP) {
throw new UserError(
'Workflow is not available for execution via MCP. Enable access in the workflow settings to make it available.',
);
}
const triggerNode = findMcpSupportedTrigger(workflow);
if (!triggerNode) {
throw new UserError(
`Only workflows with the following trigger nodes can be executed: ${Object.values(SUPPORTED_MCP_TRIGGERS).join(', ')}.`,
);
}
const runData: IWorkflowExecutionDataProcess = {
executionMode: getExecutionModeForTrigger(triggerNode),
workflowData: workflow,
userId: user.id,
};
// Set the trigger node as the start node and pin data for it
// This will enable us to run the workflow from the trigger node with the provided inputs without waiting for an actual trigger event
runData.startNodes = [{ name: triggerNode.name, sourceData: null }];
runData.pinData = getPinDataForTrigger(triggerNode, inputs);
runData.executionData = {
startData: {},
resultData: {
pinData: runData.pinData,
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [
{
node: triggerNode,
data: {
main: [runData.pinData[triggerNode.name]],
},
source: null,
},
],
waitingExecution: {},
waitingExecutionSource: {},
},
};
const executionId = await workflowRunner.run(runData);
// Create a timeout promise
let timeoutId: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new McpExecutionTimeoutError(executionId, WORKFLOW_EXECUTION_TIMEOUT_MS));
}, WORKFLOW_EXECUTION_TIMEOUT_MS);
});
try {
const data = await Promise.race([
activeExecutions.getPostExecutePromise(executionId),
timeoutPromise,
]);
// Executed successfully before timeout: clear the timeout
clearTimeout(timeoutId);
if (data === undefined) {
throw new UnexpectedError('Workflow did not return any data');
}
return {
success: data.status !== 'error' && !data.data.resultData?.error,
executionId,
result: data.data.resultData,
error: data.data.resultData?.error,
};
} catch (error) {
if (timeoutId) clearTimeout(timeoutId);
// If we hit the timeout, attempt to stop the execution
if (error instanceof McpExecutionTimeoutError) {
try {
const cancellationError = new TimeoutExecutionCancelledError(error.executionId!);
activeExecutions.stopExecution(error.executionId!, cancellationError);
} catch (stopError) {
throw new UnexpectedError(
`Failed to stop timed-out execution [id: ${error.executionId}]: ${ensureError(stopError).message}`,
);
}
}
// Re-throw the error to be handled by the caller
throw error;
}
};
/**
* Gets the execution mode based on the trigger node type.
*/
const getExecutionModeForTrigger = (node: INode): WorkflowExecuteMode => {
switch (node.type) {
case WEBHOOK_NODE_TYPE:
return 'webhook';
case CHAT_TRIGGER_NODE_TYPE:
return 'chat';
case FORM_TRIGGER_NODE_TYPE:
return 'trigger';
default:
return 'trigger';
}
};
/**
* Constructs pin data for the trigger node based on provided inputs.
*/
const getPinDataForTrigger = (
node: INode,
inputs: z.infer<typeof inputSchema>['inputs'],
): IPinData => {
switch (node.type) {
case WEBHOOK_NODE_TYPE: {
// For webhook triggers, provide default empty values if no inputs or wrong type
const webhookData = inputs?.type === 'webhook' ? inputs.webhookData : undefined;
return {
[node.name]: [
{
json: {
headers: webhookData?.headers ?? {},
query: webhookData?.query ?? {},
body: webhookData?.body ?? {},
},
},
],
};
}
case CHAT_TRIGGER_NODE_TYPE:
if (!inputs || inputs.type !== 'chat') return {};
return {
[node.name]: [
{
json: {
sessionId: `mcp-session-${Date.now()}`,
action: 'sendMessage',
chatInput: inputs.chatInput,
},
},
],
};
case FORM_TRIGGER_NODE_TYPE:
if (!inputs || inputs.type !== 'form') return {};
return {
[node.name]: [
{
json: {
submittedAt: new Date().toISOString(),
formMode: 'mcp',
...(inputs.formData ?? {}),
},
},
],
};
case SCHEDULE_TRIGGER_NODE_TYPE: {
// For schedule triggers, we don't map any inputs but we can add expected datetime info
const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
const momentTz = moment.tz(timezone);
return {
[node.name]: [
{
json: {
timestamp: momentTz.toISOString(true),
'Readable date': momentTz.format('MMMM Do YYYY, h:mm:ss a'),
'Readable time': momentTz.format('h:mm:ss a'),
'Day of week': momentTz.format('dddd'),
Year: momentTz.format('YYYY'),
Month: momentTz.format('MMMM'),
'Day of month': momentTz.format('DD'),
Hour: momentTz.format('HH'),
Minute: momentTz.format('mm'),
Second: momentTz.format('ss'),
Timezone: `${timezone} (UTC${momentTz.format('Z')})`,
},
},
],
};
}
default:
return {};
}
};
/**
* Reduce inputs to metadata that will be sent to telemetry.
*/
const getInputMetaData = (
inputs: z.infer<typeof inputSchema>['inputs'],
): ExecuteWorkflowsInputMeta | undefined => {
if (!inputs) {
return undefined;
}
switch (inputs.type) {
case 'chat':
return {
type: 'chat',
parameter_count: 1,
};
case 'form':
return {
type: 'form',
parameter_count: Object.keys(inputs.formData ?? {}).length,
};
case 'webhook':
return {
type: 'webhook',
parameter_count: [
inputs.webhookData?.body ? Object.keys(inputs.webhookData.body).length : 0,
inputs.webhookData?.query ? Object.keys(inputs.webhookData.query).length : 0,
inputs.webhookData?.headers ? Object.keys(inputs.webhookData.headers).length : 0,
].reduce((a, b) => a + b, 0),
};
default:
return undefined;
}
};

View File

@ -35,11 +35,15 @@ const connectionString = computed(() => {
{
"mcpServers": {
"n8n-mcp": {
"type": "http",
"url": "${props.serverUrl}",
"headers": {
"Authorization": "Bearer ${apiKeyText.value}"
}
"command": "npx",
"args": [
"-y",
"supergateway",
"--streamableHttp",
"${props.serverUrl}",
"--header",
"authorization:Bearer ${apiKeyText.value}"
]
}
}
}