mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-29 15:57:00 +02:00
fix(core): Drain webhook close functions to prevent MCP connection leaks (backport to 1.x) (#31188)
Some checks failed
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.13.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
CI: Python / Checks (push) Has been cancelled
Some checks failed
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.13.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
CI: Python / Checks (push) Has been cancelled
Co-authored-by: Garrit Franke <32395585+garritfra@users.noreply.github.com> Co-authored-by: Garrit Franke <garrit.franke@n8n.io> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
fd770d3887
commit
69b2fbd102
|
|
@ -367,6 +367,7 @@ export class McpClientTool implements INodeType {
|
|||
this.logger.debug('McpClientTool: Successfully connected to MCP Server');
|
||||
|
||||
if (!mcpTools?.length) {
|
||||
await client.close();
|
||||
return setError(
|
||||
new NodeOperationError(node, 'MCP Server returned no tools', {
|
||||
itemIndex,
|
||||
|
|
@ -376,25 +377,30 @@ export class McpClientTool implements INodeType {
|
|||
);
|
||||
}
|
||||
|
||||
const tools = mcpTools.map((tool) =>
|
||||
logWrapper(
|
||||
mcpToolToDynamicTool(
|
||||
tool,
|
||||
createCallTool(tool.name, client, config.timeout, (errorMessage) => {
|
||||
const error = new NodeOperationError(node, errorMessage, { itemIndex });
|
||||
void this.addOutputData(NodeConnectionTypes.AiTool, itemIndex, error);
|
||||
this.logger.error(`McpClientTool: Tool "${tool.name}" failed to execute`, { error });
|
||||
}),
|
||||
try {
|
||||
const tools = mcpTools.map((tool) =>
|
||||
logWrapper(
|
||||
mcpToolToDynamicTool(
|
||||
tool,
|
||||
createCallTool(tool.name, client, config.timeout, (errorMessage) => {
|
||||
const error = new NodeOperationError(node, errorMessage, { itemIndex });
|
||||
void this.addOutputData(NodeConnectionTypes.AiTool, itemIndex, error);
|
||||
this.logger.error(`McpClientTool: Tool "${tool.name}" failed to execute`, { error });
|
||||
}),
|
||||
),
|
||||
this,
|
||||
),
|
||||
this,
|
||||
),
|
||||
);
|
||||
);
|
||||
|
||||
this.logger.debug(`McpClientTool: Connected to MCP Server with ${tools.length} tools`);
|
||||
this.logger.debug(`McpClientTool: Connected to MCP Server with ${tools.length} tools`);
|
||||
|
||||
const toolkit = new McpToolkit(tools);
|
||||
const toolkit = new McpToolkit(tools);
|
||||
|
||||
return { response: toolkit, closeFunction: async () => await client.close() };
|
||||
return { response: toolkit, closeFunction: async () => await client.close() };
|
||||
} catch (e) {
|
||||
await client.close();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
|
||||
|
|
|
|||
|
|
@ -404,6 +404,54 @@ describe('McpClientTool', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('should close client when MCP server returns no tools', async () => {
|
||||
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
|
||||
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({ tools: [] });
|
||||
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
|
||||
|
||||
await expect(
|
||||
new McpClientTool().supplyData.call(
|
||||
mock<ISupplyDataFunctions>({
|
||||
getNode: jest.fn(() => mock<INode>({ typeVersion: 1, name: 'MCP Client' })),
|
||||
logger: { debug: jest.fn(), error: jest.fn() },
|
||||
addInputData: jest.fn(() => ({ index: 0 })),
|
||||
addOutputData: jest.fn(),
|
||||
}),
|
||||
0,
|
||||
),
|
||||
).rejects.toThrow('MCP Server returned no tools');
|
||||
|
||||
expect(closeSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should call client.close() when closeFunction is invoked', async () => {
|
||||
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
|
||||
jest.spyOn(Client.prototype, 'listTools').mockResolvedValue({
|
||||
tools: [
|
||||
{
|
||||
name: 'MyTool1',
|
||||
description: 'MyTool1 does something',
|
||||
inputSchema: { type: 'object', properties: { input: { type: 'string' } } },
|
||||
},
|
||||
],
|
||||
});
|
||||
const closeSpy = jest.spyOn(Client.prototype, 'close').mockResolvedValue();
|
||||
|
||||
const supplyDataResult = await new McpClientTool().supplyData.call(
|
||||
mock<ISupplyDataFunctions>({
|
||||
getNode: jest.fn(() => mock<INode>({ typeVersion: 1, name: 'McpClientTool' })),
|
||||
logger: { debug: jest.fn(), error: jest.fn() },
|
||||
addInputData: jest.fn(() => ({ index: 0 })),
|
||||
}),
|
||||
0,
|
||||
);
|
||||
|
||||
expect(supplyDataResult.closeFunction).toBeDefined();
|
||||
await supplyDataResult.closeFunction?.();
|
||||
|
||||
expect(closeSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should support setting a timeout', async () => {
|
||||
jest.spyOn(Client.prototype, 'connect').mockResolvedValue();
|
||||
const callToolSpy = jest
|
||||
|
|
|
|||
|
|
@ -374,6 +374,38 @@ describe('WebhookService', () => {
|
|||
expect(result).toEqual(responseData);
|
||||
expect(nodeType.webhook).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('should run close functions after webhook completes', async () => {
|
||||
const closeFunction = jest.fn().mockResolvedValue(undefined);
|
||||
const nodeType = mock<INodeType>({
|
||||
webhook: jest.fn().mockImplementation(async function (this: any) {
|
||||
this.closeFunctions.push(closeFunction);
|
||||
return responseData;
|
||||
}),
|
||||
});
|
||||
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||
|
||||
await webhookService.runWebhook(workflow, webhookData, node, additionalData, 'trigger', null);
|
||||
|
||||
expect(closeFunction).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('should run close functions even when webhook throws', async () => {
|
||||
const closeFunction = jest.fn().mockResolvedValue(undefined);
|
||||
const nodeType = mock<INodeType>({
|
||||
webhook: jest.fn().mockImplementation(async function (this: any) {
|
||||
this.closeFunctions.push(closeFunction);
|
||||
throw new Error('webhook failed');
|
||||
}),
|
||||
});
|
||||
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||
|
||||
await expect(
|
||||
webhookService.runWebhook(workflow, webhookData, node, additionalData, 'trigger', null),
|
||||
).rejects.toThrow('webhook failed');
|
||||
|
||||
expect(closeFunction).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('findCached()', () => {
|
||||
|
|
|
|||
|
|
@ -371,18 +371,32 @@ export class WebhookService {
|
|||
});
|
||||
}
|
||||
|
||||
const closeFunctions: Array<() => Promise<void>> = [];
|
||||
const context = new WebhookContext(
|
||||
workflow,
|
||||
node,
|
||||
additionalData,
|
||||
mode,
|
||||
webhookData,
|
||||
[],
|
||||
closeFunctions,
|
||||
runExecutionData ?? null,
|
||||
);
|
||||
|
||||
return nodeType instanceof Node
|
||||
? await nodeType.webhook(context)
|
||||
: ((await nodeType.webhook.call(context)) as IWebhookResponseData);
|
||||
try {
|
||||
return nodeType instanceof Node
|
||||
? await nodeType.webhook(context)
|
||||
: ((await nodeType.webhook.call(context)) as IWebhookResponseData);
|
||||
} finally {
|
||||
const settledResults = await Promise.allSettled(closeFunctions.map(async (fn) => await fn()));
|
||||
for (const result of settledResults) {
|
||||
if (result.status === 'rejected') {
|
||||
this.logger.error('Failed to run webhook close function', {
|
||||
error: ensureError(result.reason),
|
||||
nodeName: node.name,
|
||||
nodeType: node.type,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user