From acdec59a123ed38717a30e4eca7ff8275f650149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milorad=20FIlipovi=C4=87?= Date: Mon, 17 Nov 2025 12:40:55 +0100 Subject: [PATCH] feat(core): Add Execute workflow mcp tool (no-changelog) (#21840) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- .../__tests__/execute-workflow.tool.test.ts | 803 ++++++++++++++++++ packages/cli/src/modules/mcp/mcp.errors.ts | 19 + packages/cli/src/modules/mcp/mcp.service.ts | 18 + .../modules/mcp/mcp.settings.controller.ts | 6 +- packages/cli/src/modules/mcp/mcp.types.ts | 5 + packages/cli/src/modules/mcp/mcp.utils.ts | 12 +- .../mcp/tools/execute-workflow.tool.ts | 408 +++++++++ .../AccessTokenConnectionInstructions.vue | 14 +- 8 files changed, 1270 insertions(+), 15 deletions(-) create mode 100644 packages/cli/src/modules/mcp/__tests__/execute-workflow.tool.test.ts create mode 100644 packages/cli/src/modules/mcp/mcp.errors.ts create mode 100644 packages/cli/src/modules/mcp/tools/execute-workflow.tool.ts diff --git a/packages/cli/src/modules/mcp/__tests__/execute-workflow.tool.test.ts b/packages/cli/src/modules/mcp/__tests__/execute-workflow.tool.test.ts new file mode 100644 index 00000000000..466866d9f5c --- /dev/null +++ b/packages/cli/src/modules/mcp/__tests__/execute-workflow.tool.test.ts @@ -0,0 +1,803 @@ +import { mockInstance } from '@n8n/backend-test-utils'; +import { User } from '@n8n/db'; +import { + CHAT_TRIGGER_NODE_TYPE, + FORM_TRIGGER_NODE_TYPE, + MANUAL_TRIGGER_NODE_TYPE, + WEBHOOK_NODE_TYPE, + type INode, + type IWorkflowExecutionDataProcess, + UnexpectedError, + UserError, +} from 'n8n-workflow'; + +import { createWorkflow } from './mock.utils'; +import { createExecuteWorkflowTool, executeWorkflow } from '../tools/execute-workflow.tool'; + +import { ActiveExecutions } from '@/active-executions'; +import { Telemetry } from '@/telemetry'; +import { WorkflowRunner } from '@/workflow-runner'; +import { WorkflowFinderService } from '@/workflows/workflow-finder.service'; + +describe('execute-workflow MCP tool', () => { + const user = Object.assign(new User(), { id: 'user-1' }); + let workflowFinderService: WorkflowFinderService; + let activeExecutions: ActiveExecutions; + let workflowRunner: WorkflowRunner; + let telemetry: Telemetry; + + beforeEach(() => { + workflowFinderService = mockInstance(WorkflowFinderService); + activeExecutions = mockInstance(ActiveExecutions); + workflowRunner = mockInstance(WorkflowRunner); + telemetry = mockInstance(Telemetry, { + track: jest.fn(), + }); + }); + + describe('smoke tests', () => { + test('it creates tool correctly', () => { + const tool = createExecuteWorkflowTool( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + telemetry, + ); + + expect(tool.name).toBe('execute_workflow'); + expect(tool.config).toBeDefined(); + expect(typeof tool.config.description).toBe('string'); + expect(tool.config.description).toBe( + 'Execute a workflow by ID. Before executing always ensure you know the input schema by first using the get_workflow_details tool and consulting workflow description', + ); + expect(tool.config.inputSchema).toBeDefined(); + expect(tool.config.outputSchema).toBeDefined(); + expect(typeof tool.handler).toBe('function'); + }); + }); + + describe('handler tests', () => { + describe('workflow validation', () => { + test('throws error when workflow is not found', async () => { + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(null); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'missing-workflow', + undefined, + ), + ).rejects.toThrow(UserError); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'missing-workflow', + undefined, + ), + ).rejects.toThrow('Workflow not found'); + }); + + test('throws error when workflow is archived', async () => { + const workflow = createWorkflow({ isArchived: true }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'archived-workflow', + undefined, + ), + ).rejects.toThrow(UserError); + }); + + test('throws error when workflow is not available in MCP', async () => { + const workflow = createWorkflow({ settings: { availableInMCP: false } }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'unavailable-workflow', + undefined, + ), + ).rejects.toThrow(UserError); + }); + + test('throws error when workflow has unsupported trigger nodes', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'Manual', + type: MANUAL_TRIGGER_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'unsupported-trigger', + undefined, + ), + ).rejects.toThrow(UserError); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'unsupported-trigger', + undefined, + ), + ).rejects.toThrow(/Only workflows with the following trigger nodes can be executed/); + }); + + test('throws error when no supported trigger node is found', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'Webhook', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: true, // disabled node should not be considered + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'disabled-trigger', + undefined, + ), + ).rejects.toThrow(UserError); + }); + }); + + describe('webhook trigger execution', () => { + test('executes workflow with webhook trigger and webhook data', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-123'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { + resultData: { + runData: { + WebhookNode: [{ data: { main: [[{ json: { result: 'webhook success' } }]] } }], + }, + }, + }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'webhook-workflow', + { + type: 'webhook', + webhookData: { + method: 'POST', + headers: { 'content-type': 'application/json' }, + query: { page: '1' }, + body: { message: 'test' }, + }, + }, + ); + + expect(result).toMatchObject({ + success: true, + executionId: 'exec-123', + result: expect.objectContaining({ + runData: expect.any(Object), + }), + }); + + // Verify the runner was called with correct pin data + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + expect(runCall.startNodes).toEqual([{ name: 'WebhookNode', sourceData: null }]); + expect(runCall.pinData).toMatchObject({ + WebhookNode: [ + { + json: { + headers: { 'content-type': 'application/json' }, + query: { page: '1' }, + body: { message: 'test' }, + }, + }, + ], + }); + }); + + test('executes workflow with webhook trigger and default GET method', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-456'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'webhook-workflow', + { + type: 'webhook', + webhookData: { + method: 'GET', + query: { id: '123' }, + }, + }, + ); + + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + expect(runCall.pinData).toMatchObject({ + WebhookNode: [ + { + json: { + headers: {}, + query: { id: '123' }, + body: {}, + }, + }, + ], + }); + }); + }); + + describe('chat trigger execution', () => { + test('executes workflow with chat trigger and chat input', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'ChatNode', + type: CHAT_TRIGGER_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-789'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'chat-workflow', + { + type: 'chat', + chatInput: 'Hello, how can I help?', + }, + ); + + expect(result).toMatchObject({ + success: true, + executionId: 'exec-789', + }); + + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + expect(runCall.executionMode).toBe('chat'); + expect(runCall.pinData).toMatchObject({ + ChatNode: [ + { + json: { + sessionId: expect.stringMatching(/^mcp-session-\d+$/), + action: 'sendMessage', + chatInput: 'Hello, how can I help?', + }, + }, + ], + }); + }); + }); + + describe('form trigger execution', () => { + test('executes workflow with form trigger and form data', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'FormNode', + type: FORM_TRIGGER_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-101'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'form-workflow', + { + type: 'form', + formData: { + name: 'John Doe', + email: 'john@example.com', + age: 30, + }, + }, + ); + + expect(result).toMatchObject({ + success: true, + executionId: 'exec-101', + }); + + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + expect(runCall.executionMode).toBe('trigger'); + expect(runCall.pinData).toMatchObject({ + FormNode: [ + { + json: { + submittedAt: expect.stringMatching(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/), + formMode: 'mcp', + name: 'John Doe', + email: 'john@example.com', + age: 30, + }, + }, + ], + }); + }); + }); + + describe('execution results handling', () => { + test('handles successful execution', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-success'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { + resultData: { + runData: { + WebhookNode: [{ data: { main: [[{ json: { output: 'success' } }]] } }], + }, + }, + }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'success-workflow', + undefined, + ); + + expect(result).toMatchObject({ + success: true, + executionId: 'exec-success', + result: expect.objectContaining({ + runData: expect.any(Object), + }), + }); + }); + + test('handles execution with error status', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-error'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'error', + data: { + resultData: { + error: { + message: 'Workflow execution failed', + name: 'ExecutionError', + }, + }, + }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'error-workflow', + undefined, + ); + + expect(result).toMatchObject({ + success: false, + executionId: 'exec-error', + error: { + message: 'Workflow execution failed', + name: 'ExecutionError', + }, + }); + }); + + test('handles execution with result data error', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-data-error'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { + resultData: { + error: { + message: 'Node execution failed', + name: 'NodeExecutionError', + }, + }, + }, + }); + + const result = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'data-error-workflow', + undefined, + ); + + expect(result).toMatchObject({ + success: false, + executionId: 'exec-data-error', + error: { + message: 'Node execution failed', + name: 'NodeExecutionError', + }, + }); + }); + + test('handles workflow returning undefined data', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-no-data'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue(undefined); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'no-data-workflow', + undefined, + ), + ).rejects.toThrow(UnexpectedError); + + await expect( + executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'no-data-workflow', + undefined, + ), + ).rejects.toThrow('Workflow did not return any data'); + }); + }); + + describe('workflow with no inputs', () => { + test('executes workflow without any inputs', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-no-inputs'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'no-inputs-workflow', + undefined, + ); + + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + expect(runCall.pinData).toMatchObject({ + WebhookNode: [ + { + json: { + headers: {}, + query: {}, + body: {}, + }, + }, + ], + }); + }); + }); + + describe('telemetry tracking', () => { + test('tracks successful execution with tool handler', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-telemetry'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + const tool = createExecuteWorkflowTool( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + telemetry, + ); + + // Call through the tool handler to test telemetry + await tool.handler( + { workflowId: 'telemetry-workflow', inputs: { type: 'chat', chatInput: 'test' } }, + {} as any, + ); + + expect(telemetry.track).toHaveBeenCalledWith( + 'User called mcp tool', + expect.objectContaining({ + user_id: 'user-1', + tool_name: 'execute_workflow', + parameters: { + workflowId: 'telemetry-workflow', + inputs: { type: 'chat', parameter_count: 1 }, + }, + results: { + success: true, + data: { + executionId: 'exec-telemetry', + }, + }, + }), + ); + }); + + test('tracks failed execution with tool handler', async () => { + const error = new UserError('Test error'); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockRejectedValue(error); + + const tool = createExecuteWorkflowTool( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + telemetry, + ); + + // Call through the tool handler to test telemetry + await tool.handler({ workflowId: 'error-tracking' }, {} as any); + + expect(telemetry.track).toHaveBeenCalledWith( + 'User called mcp tool', + expect.objectContaining({ + user_id: 'user-1', + tool_name: 'execute_workflow', + parameters: { + workflowId: 'error-tracking', + inputs: undefined, + }, + results: { + success: false, + error: 'Test error', + }, + }), + ); + }); + }); + + describe('multiple trigger nodes', () => { + test('uses first eligible trigger node when multiple are present', async () => { + const workflow = createWorkflow({ + nodes: [ + { + id: 'node-1', + name: 'Manual', + type: MANUAL_TRIGGER_NODE_TYPE, + typeVersion: 1, + position: [0, 0], + disabled: false, + parameters: {}, + } as INode, + { + id: 'node-2', + name: 'WebhookNode', + type: WEBHOOK_NODE_TYPE, + typeVersion: 1, + position: [100, 0], + disabled: false, + parameters: {}, + } as INode, + { + id: 'node-3', + name: 'ChatNode', + type: CHAT_TRIGGER_NODE_TYPE, + typeVersion: 1, + position: [200, 0], + disabled: false, + parameters: {}, + } as INode, + ], + }); + (workflowFinderService.findWorkflowForUser as jest.Mock).mockResolvedValue(workflow); + (workflowRunner.run as jest.Mock).mockResolvedValue('exec-multi'); + (activeExecutions.getPostExecutePromise as jest.Mock).mockResolvedValue({ + status: 'success', + data: { resultData: {} }, + }); + + await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + 'multi-trigger-workflow', + undefined, + ); + + const runCall = (workflowRunner.run as jest.Mock).mock + .calls[0][0] as IWorkflowExecutionDataProcess; + // Should use the WebhookNode (first eligible trigger) + expect(runCall.startNodes).toEqual([{ name: 'WebhookNode', sourceData: null }]); + expect(runCall.pinData).toHaveProperty('WebhookNode'); + expect(runCall.executionMode).toBe('webhook'); + }); + }); + }); +}); diff --git a/packages/cli/src/modules/mcp/mcp.errors.ts b/packages/cli/src/modules/mcp/mcp.errors.ts new file mode 100644 index 00000000000..a937d149cfa --- /dev/null +++ b/packages/cli/src/modules/mcp/mcp.errors.ts @@ -0,0 +1,19 @@ +import { Time } from '@n8n/constants'; +import { UserError } from 'n8n-workflow'; + +/** + * Error thrown when MCP workflow execution times out + */ +export class McpExecutionTimeoutError extends UserError { + executionId: string | null; + timeoutMs: number; + + constructor(executionId: string | null, timeoutMs: number) { + const timeoutSeconds = timeoutMs / Time.milliseconds.toSeconds; + super(`Workflow execution timed out after ${timeoutSeconds} seconds`); + + this.name = 'McpExecutionTimeoutError'; + this.executionId = executionId; + this.timeoutMs = timeoutMs; + } +} diff --git a/packages/cli/src/modules/mcp/mcp.service.ts b/packages/cli/src/modules/mcp/mcp.service.ts index 126715fd846..ea786bf3824 100644 --- a/packages/cli/src/modules/mcp/mcp.service.ts +++ b/packages/cli/src/modules/mcp/mcp.service.ts @@ -3,12 +3,15 @@ import { GlobalConfig } from '@n8n/config'; import { User } from '@n8n/db'; import { Service } from '@n8n/di'; +import { createExecuteWorkflowTool } from './tools/execute-workflow.tool'; import { createWorkflowDetailsTool } from './tools/get-workflow-details.tool'; import { createSearchWorkflowsTool } from './tools/search-workflows.tool'; +import { ActiveExecutions } from '@/active-executions'; import { CredentialsService } from '@/credentials/credentials.service'; import { UrlService } from '@/services/url.service'; import { Telemetry } from '@/telemetry'; +import { WorkflowRunner } from '@/workflow-runner'; import { WorkflowFinderService } from '@/workflows/workflow-finder.service'; import { WorkflowService } from '@/workflows/workflow.service'; @@ -19,8 +22,10 @@ export class McpService { private readonly workflowService: WorkflowService, private readonly urlService: UrlService, private readonly credentialsService: CredentialsService, + private readonly activeExecutions: ActiveExecutions, private readonly globalConfig: GlobalConfig, private readonly telemetry: Telemetry, + private readonly workflowRunner: WorkflowRunner, ) {} getServer(user: User) { @@ -40,6 +45,19 @@ export class McpService { workflowSearchTool.handler, ); + const executeWorkflowTool = createExecuteWorkflowTool( + user, + this.workflowFinderService, + this.activeExecutions, + this.workflowRunner, + this.telemetry, + ); + server.registerTool( + executeWorkflowTool.name, + executeWorkflowTool.config, + executeWorkflowTool.handler, + ); + const workflowDetailsTool = createWorkflowDetailsTool( user, this.urlService.getWebhookBaseUrl(), diff --git a/packages/cli/src/modules/mcp/mcp.settings.controller.ts b/packages/cli/src/modules/mcp/mcp.settings.controller.ts index f3b6aa5e792..db20f5fa59d 100644 --- a/packages/cli/src/modules/mcp/mcp.settings.controller.ts +++ b/packages/cli/src/modules/mcp/mcp.settings.controller.ts @@ -8,7 +8,7 @@ import { UpdateWorkflowAvailabilityDto } from './dto/update-workflow-availabilit import { McpServerApiKeyService } from './mcp-api-key.service'; import { SUPPORTED_MCP_TRIGGERS } from './mcp.constants'; import { McpSettingsService } from './mcp.settings.service'; -import { isWorkflowEligibleForMCPAccess } from './mcp.utils'; +import { findMcpSupportedTrigger } from './mcp.utils'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; @@ -83,9 +83,9 @@ export class McpSettingsController { throw new BadRequestError('MCP access can only be set for active workflows'); } - const isEligible = isWorkflowEligibleForMCPAccess(workflow); + const supportedTrigger = findMcpSupportedTrigger(workflow); - if (!isEligible) { + if (!supportedTrigger) { throw new BadRequestError( `MCP access can only be set for active workflows with one of the following trigger nodes: ${Object.values(SUPPORTED_MCP_TRIGGERS).join(', ')}.`, ); diff --git a/packages/cli/src/modules/mcp/mcp.types.ts b/packages/cli/src/modules/mcp/mcp.types.ts index b9714862f41..a8987557b80 100644 --- a/packages/cli/src/modules/mcp/mcp.types.ts +++ b/packages/cli/src/modules/mcp/mcp.types.ts @@ -82,6 +82,11 @@ export type UserCalledMCPToolEventPayload = { }; }; +export type ExecuteWorkflowsInputMeta = { + type: 'webhook' | 'chat' | 'schedule' | 'form'; + parameter_count: number; +}; + type SupportedTriggerNodeTypes = keyof typeof SUPPORTED_MCP_TRIGGERS; export type MCPTriggersMap = { diff --git a/packages/cli/src/modules/mcp/mcp.utils.ts b/packages/cli/src/modules/mcp/mcp.utils.ts index 54d7c5dc2d4..3d9eb72ebf8 100644 --- a/packages/cli/src/modules/mcp/mcp.utils.ts +++ b/packages/cli/src/modules/mcp/mcp.utils.ts @@ -1,5 +1,6 @@ import type { AuthenticatedRequest, WorkflowEntity } from '@n8n/db'; import type { Request } from 'express'; +import type { INode } from 'n8n-workflow'; import { SUPPORTED_MCP_TRIGGERS } from './mcp.constants'; import { isRecord, isJSONRPCRequest } from './mcp.typeguards'; @@ -47,17 +48,14 @@ export const getToolArguments = (body: unknown): Record => { }; /** - * Determines if MCP access can be toggled for a given workflow. - * Workflow is eligible if it contains at least one of these (enabled) trigger nodes: + * Finds the first supported trigger node in the workflow. + * Workflow is eligible for MCP access if it contains at least one of these (enabled) trigger nodes: * - Schedule trigger * - Webhook trigger * - Form trigger * - Chat trigger - * @param workflow */ -export const isWorkflowEligibleForMCPAccess = (workflow: WorkflowEntity): boolean => { +export const findMcpSupportedTrigger = (workflow: WorkflowEntity): INode | undefined => { const triggerNodeTypes = Object.keys(SUPPORTED_MCP_TRIGGERS); - return workflow.nodes.some( - (node) => triggerNodeTypes.includes(node.type) && node.disabled !== true, - ); + return workflow.nodes.find((node) => triggerNodeTypes.includes(node.type) && !node.disabled); }; diff --git a/packages/cli/src/modules/mcp/tools/execute-workflow.tool.ts b/packages/cli/src/modules/mcp/tools/execute-workflow.tool.ts new file mode 100644 index 00000000000..47095ad2597 --- /dev/null +++ b/packages/cli/src/modules/mcp/tools/execute-workflow.tool.ts @@ -0,0 +1,408 @@ +import { Time } from '@n8n/constants'; +import type { User } from '@n8n/db'; +import moment from 'moment-timezone'; +import { + CHAT_TRIGGER_NODE_TYPE, + FORM_TRIGGER_NODE_TYPE, + WEBHOOK_NODE_TYPE, + type INode, + type IPinData, + type IRunExecutionData, + type IWorkflowExecutionDataProcess, + type WorkflowExecuteMode, + UserError, + UnexpectedError, + TimeoutExecutionCancelledError, + ensureError, + jsonStringify, + SCHEDULE_TRIGGER_NODE_TYPE, +} from 'n8n-workflow'; +import z from 'zod'; + +import { SUPPORTED_MCP_TRIGGERS, USER_CALLED_MCP_TOOL_EVENT } from '../mcp.constants'; +import { McpExecutionTimeoutError } from '../mcp.errors'; +import type { + ExecuteWorkflowsInputMeta, + ToolDefinition, + UserCalledMCPToolEventPayload, +} from '../mcp.types'; +import { findMcpSupportedTrigger } from '../mcp.utils'; + +import type { ActiveExecutions } from '@/active-executions'; +import type { Telemetry } from '@/telemetry'; +import type { WorkflowRunner } from '@/workflow-runner'; +import type { WorkflowFinderService } from '@/workflows/workflow-finder.service'; + +const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * Time.minutes.toMilliseconds; // 5 minutes + +const inputSchema = z.object({ + workflowId: z.string().describe('The ID of the workflow to execute'), + inputs: z + .discriminatedUnion('type', [ + z.object({ + type: z.literal('chat'), + chatInput: z.string().describe('Input for chat-based workflows'), + }), + z.object({ + type: z.literal('form'), + formData: z.record(z.unknown()).describe('Input data for form-based workflows'), + }), + z.object({ + type: z.literal('webhook'), + webhookData: z + .object({ + method: z + .enum(['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS']) + .optional() + .default('GET') + .describe('HTTP method (defaults to GET)'), + query: z.record(z.string()).optional().describe('Query string parameters'), + body: z + .record(z.unknown()) + .optional() + .describe('Request body data (main webhook payload)'), + headers: z + .record(z.string()) + .optional() + .describe('HTTP headers (e.g., authorization, content-type)'), + }) + .describe('Input data for webhook-based workflows'), + }), + ]) + .optional() + .describe('Inputs to provide to the workflow.'), +}); + +type ExecuteWorkflowOutput = { + success: boolean; + executionId: string | null; + result?: IRunExecutionData['resultData']; + error?: unknown; +}; + +const outputSchema = { + success: z.boolean(), + executionId: z.string().nullable().optional(), + result: z.unknown().optional().describe('Workflow execution result data'), + error: z.unknown().optional(), +} satisfies z.ZodRawShape; + +export const createExecuteWorkflowTool = ( + user: User, + workflowFinderService: WorkflowFinderService, + activeExecutions: ActiveExecutions, + workflowRunner: WorkflowRunner, + telemetry: Telemetry, +): ToolDefinition => ({ + name: 'execute_workflow', + config: { + description: + 'Execute a workflow by ID. Before executing always ensure you know the input schema by first using the get_workflow_details tool and consulting workflow description', + inputSchema: inputSchema.shape, + outputSchema, + annotations: { + title: 'Execute Workflow', + readOnlyHint: false, // Can read and write data via workflows + destructiveHint: true, // Can cause changes in external systems via workflows + idempotentHint: true, // Safe to retry multiple times + openWorldHint: true, // Can access external systems via workflows + }, + }, + handler: async ({ workflowId, inputs }) => { + const telemetryPayload: UserCalledMCPToolEventPayload = { + user_id: user.id, + tool_name: 'execute_workflow', + parameters: { workflowId, inputs: getInputMetaData(inputs) }, + }; + try { + const output = await executeWorkflow( + user, + workflowFinderService, + activeExecutions, + workflowRunner, + workflowId, + inputs, + ); + + telemetryPayload.results = { + success: output.success, + data: { + executionId: output.executionId, + }, + }; + telemetry.track(USER_CALLED_MCP_TOOL_EVENT, telemetryPayload); + + return { + content: [{ type: 'text', text: jsonStringify(output) }], + structuredContent: output, + }; + } catch (er) { + const error = ensureError(er); + const isTimeout = error instanceof McpExecutionTimeoutError; + const output: ExecuteWorkflowOutput = { + success: false, + executionId: isTimeout ? error.executionId : null, + error: isTimeout + ? `Workflow execution timed out after ${WORKFLOW_EXECUTION_TIMEOUT_MS / Time.milliseconds.toSeconds} seconds` + : error.message, + }; + + telemetryPayload.results = { + success: false, + error: isTimeout ? 'Workflow execution timed out' : error.message, + }; + telemetry.track(USER_CALLED_MCP_TOOL_EVENT, telemetryPayload); + + return { + content: [{ type: 'text', text: jsonStringify(output) }], + structuredContent: output, + }; + } + }, +}); + +/** + * Executes a workflow for the given user with provided inputs. + * In order to "synchronously" execute the workflow, + * it is mapping mcp tool inputs to trigger node pin data and starting execution from there. + * LIMITATION: Does not properly support workflows with multiple triggers. + */ +export const executeWorkflow = async ( + user: User, + workflowFinderService: WorkflowFinderService, + activeExecutions: ActiveExecutions, + workflowRunner: WorkflowRunner, + workflowId: string, + inputs?: z.infer['inputs'], +): Promise => { + const workflow = await workflowFinderService.findWorkflowForUser(workflowId, user, [ + 'workflow:execute', + ]); + + if (!workflow || workflow.isArchived) { + throw new UserError('Workflow not found'); + } + + if (!workflow.settings?.availableInMCP) { + throw new UserError( + 'Workflow is not available for execution via MCP. Enable access in the workflow settings to make it available.', + ); + } + + const triggerNode = findMcpSupportedTrigger(workflow); + + if (!triggerNode) { + throw new UserError( + `Only workflows with the following trigger nodes can be executed: ${Object.values(SUPPORTED_MCP_TRIGGERS).join(', ')}.`, + ); + } + + const runData: IWorkflowExecutionDataProcess = { + executionMode: getExecutionModeForTrigger(triggerNode), + workflowData: workflow, + userId: user.id, + }; + + // Set the trigger node as the start node and pin data for it + // This will enable us to run the workflow from the trigger node with the provided inputs without waiting for an actual trigger event + runData.startNodes = [{ name: triggerNode.name, sourceData: null }]; + runData.pinData = getPinDataForTrigger(triggerNode, inputs); + + runData.executionData = { + startData: {}, + resultData: { + pinData: runData.pinData, + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack: [ + { + node: triggerNode, + data: { + main: [runData.pinData[triggerNode.name]], + }, + source: null, + }, + ], + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + const executionId = await workflowRunner.run(runData); + + // Create a timeout promise + let timeoutId: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject(new McpExecutionTimeoutError(executionId, WORKFLOW_EXECUTION_TIMEOUT_MS)); + }, WORKFLOW_EXECUTION_TIMEOUT_MS); + }); + + try { + const data = await Promise.race([ + activeExecutions.getPostExecutePromise(executionId), + timeoutPromise, + ]); + + // Executed successfully before timeout: clear the timeout + clearTimeout(timeoutId); + + if (data === undefined) { + throw new UnexpectedError('Workflow did not return any data'); + } + + return { + success: data.status !== 'error' && !data.data.resultData?.error, + executionId, + result: data.data.resultData, + error: data.data.resultData?.error, + }; + } catch (error) { + if (timeoutId) clearTimeout(timeoutId); + + // If we hit the timeout, attempt to stop the execution + if (error instanceof McpExecutionTimeoutError) { + try { + const cancellationError = new TimeoutExecutionCancelledError(error.executionId!); + activeExecutions.stopExecution(error.executionId!, cancellationError); + } catch (stopError) { + throw new UnexpectedError( + `Failed to stop timed-out execution [id: ${error.executionId}]: ${ensureError(stopError).message}`, + ); + } + } + // Re-throw the error to be handled by the caller + throw error; + } +}; + +/** + * Gets the execution mode based on the trigger node type. + */ +const getExecutionModeForTrigger = (node: INode): WorkflowExecuteMode => { + switch (node.type) { + case WEBHOOK_NODE_TYPE: + return 'webhook'; + case CHAT_TRIGGER_NODE_TYPE: + return 'chat'; + case FORM_TRIGGER_NODE_TYPE: + return 'trigger'; + default: + return 'trigger'; + } +}; + +/** + * Constructs pin data for the trigger node based on provided inputs. + */ +const getPinDataForTrigger = ( + node: INode, + inputs: z.infer['inputs'], +): IPinData => { + switch (node.type) { + case WEBHOOK_NODE_TYPE: { + // For webhook triggers, provide default empty values if no inputs or wrong type + const webhookData = inputs?.type === 'webhook' ? inputs.webhookData : undefined; + return { + [node.name]: [ + { + json: { + headers: webhookData?.headers ?? {}, + query: webhookData?.query ?? {}, + body: webhookData?.body ?? {}, + }, + }, + ], + }; + } + case CHAT_TRIGGER_NODE_TYPE: + if (!inputs || inputs.type !== 'chat') return {}; + return { + [node.name]: [ + { + json: { + sessionId: `mcp-session-${Date.now()}`, + action: 'sendMessage', + chatInput: inputs.chatInput, + }, + }, + ], + }; + case FORM_TRIGGER_NODE_TYPE: + if (!inputs || inputs.type !== 'form') return {}; + return { + [node.name]: [ + { + json: { + submittedAt: new Date().toISOString(), + formMode: 'mcp', + ...(inputs.formData ?? {}), + }, + }, + ], + }; + case SCHEDULE_TRIGGER_NODE_TYPE: { + // For schedule triggers, we don't map any inputs but we can add expected datetime info + const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone; + const momentTz = moment.tz(timezone); + return { + [node.name]: [ + { + json: { + timestamp: momentTz.toISOString(true), + 'Readable date': momentTz.format('MMMM Do YYYY, h:mm:ss a'), + 'Readable time': momentTz.format('h:mm:ss a'), + 'Day of week': momentTz.format('dddd'), + Year: momentTz.format('YYYY'), + Month: momentTz.format('MMMM'), + 'Day of month': momentTz.format('DD'), + Hour: momentTz.format('HH'), + Minute: momentTz.format('mm'), + Second: momentTz.format('ss'), + Timezone: `${timezone} (UTC${momentTz.format('Z')})`, + }, + }, + ], + }; + } + default: + return {}; + } +}; + +/** + * Reduce inputs to metadata that will be sent to telemetry. + */ +const getInputMetaData = ( + inputs: z.infer['inputs'], +): ExecuteWorkflowsInputMeta | undefined => { + if (!inputs) { + return undefined; + } + switch (inputs.type) { + case 'chat': + return { + type: 'chat', + parameter_count: 1, + }; + case 'form': + return { + type: 'form', + parameter_count: Object.keys(inputs.formData ?? {}).length, + }; + case 'webhook': + return { + type: 'webhook', + parameter_count: [ + inputs.webhookData?.body ? Object.keys(inputs.webhookData.body).length : 0, + inputs.webhookData?.query ? Object.keys(inputs.webhookData.query).length : 0, + inputs.webhookData?.headers ? Object.keys(inputs.webhookData.headers).length : 0, + ].reduce((a, b) => a + b, 0), + }; + default: + return undefined; + } +}; diff --git a/packages/frontend/editor-ui/src/features/ai/mcpAccess/components/connectionInstructions/AccessTokenConnectionInstructions.vue b/packages/frontend/editor-ui/src/features/ai/mcpAccess/components/connectionInstructions/AccessTokenConnectionInstructions.vue index ef1d1d011fc..f9f36afeb74 100644 --- a/packages/frontend/editor-ui/src/features/ai/mcpAccess/components/connectionInstructions/AccessTokenConnectionInstructions.vue +++ b/packages/frontend/editor-ui/src/features/ai/mcpAccess/components/connectionInstructions/AccessTokenConnectionInstructions.vue @@ -35,11 +35,15 @@ const connectionString = computed(() => { { "mcpServers": { "n8n-mcp": { - "type": "http", - "url": "${props.serverUrl}", - "headers": { - "Authorization": "Bearer ${apiKeyText.value}" - } + "command": "npx", + "args": [ + "-y", + "supergateway", + "--streamableHttp", + "${props.serverUrl}", + "--header", + "authorization:Bearer ${apiKeyText.value}" + ] } } }