mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 10:39:23 +02:00
feat: Allow cancelling HITL messages (no-changelog) (#31561)
This commit is contained in:
parent
a3f663d1c2
commit
86e42203ff
|
|
@ -111,7 +111,7 @@ class EngineAgent extends Agent {
|
|||
## Testing
|
||||
|
||||
- Unit tests live in `src/__tests__/`, integration tests in `src/__tests__/integration/`
|
||||
- Unit tests use Jest (`pnpm test`)
|
||||
- Unit tests use Vitest (`pnpm test`)
|
||||
- Integration tests use Vitest (`pnpm test:integration`) with real LLM calls
|
||||
- A `.env` file at the package root is loaded automatically by the vitest config.
|
||||
Always assume it exists when running integration tests. Never commit it.
|
||||
|
|
@ -133,7 +133,7 @@ class EngineAgent extends Agent {
|
|||
cd packages/@n8n/agents
|
||||
pnpm build # rimraf dist && tsc -p tsconfig.build.json → dist/
|
||||
pnpm typecheck # tsc --noEmit
|
||||
pnpm test # jest (unit)
|
||||
pnpm test # vitest (unit)
|
||||
```
|
||||
|
||||
## PR naming convention
|
||||
|
|
|
|||
|
|
@ -44,4 +44,11 @@ export default defineConfig(
|
|||
'n8n-local-rules/no-uncaught-json-parse': 'off',
|
||||
},
|
||||
},
|
||||
{
|
||||
files: ['**/*.test.ts'],
|
||||
rules: {
|
||||
'@typescript-eslint/no-unsafe-assignment': 'warn',
|
||||
'@typescript-eslint/no-unsafe-member-access': 'warn',
|
||||
},
|
||||
},
|
||||
);
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import {
|
|||
createAgentWithConcurrentMixedTools,
|
||||
collectTextDeltas,
|
||||
} from './helpers';
|
||||
import { createCancellation } from '../../index';
|
||||
import type { StreamChunk } from '../../index';
|
||||
|
||||
const describe = describeIf('anthropic');
|
||||
|
|
@ -99,6 +100,51 @@ describe('concurrent tool execution integration', () => {
|
|||
expect(remainingIds).not.toContain(firstToolCallId);
|
||||
});
|
||||
|
||||
it('cancels one of multiple suspended delete_file tool calls and resolves the batch', async () => {
|
||||
const agent = createAgentWithConcurrentInterruptibleCalls('anthropic');
|
||||
|
||||
const first = await agent.generate(
|
||||
'Delete these two files: /tmp/cancel-a.txt and /tmp/cancel-b.txt. You MUST call delete_file for each file in a single turn using parallel tool calls.',
|
||||
);
|
||||
|
||||
expect(first.finishReason).toBe('tool-calls');
|
||||
expect(first.pendingSuspend).toBeDefined();
|
||||
expect(first.pendingSuspend!.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
const resumed = await agent.resume(
|
||||
'generate',
|
||||
createCancellation('Cancel the delete operation. Do not delete any of the files.'),
|
||||
{ runId, toolCallId },
|
||||
);
|
||||
|
||||
expect(resumed.finishReason).toBe('stop');
|
||||
expect(resumed.pendingSuspend).toBeUndefined();
|
||||
expect(resumed.toolCalls).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
tool: 'delete_file',
|
||||
output:
|
||||
'[Tool call cancelled. User said: "Cancel the delete operation. Do not delete any of the files."]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
expect(agent.getState().messageList.messages).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
content: expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
type: 'tool-call',
|
||||
output: '[Skipped: a sibling tool call was cancelled]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('resumes all suspended tools one by one until the LLM loop continues (stream)', async () => {
|
||||
const agent = createAgentWithConcurrentInterruptibleCalls('anthropic');
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import {
|
|||
createAgentWithMixedTools,
|
||||
createAgentWithParallelInterruptibleCalls,
|
||||
} from './helpers';
|
||||
import { createCancellation } from '../../index';
|
||||
import type { StreamChunk } from '../../index';
|
||||
|
||||
const describe = describeIf('anthropic');
|
||||
|
|
@ -88,6 +89,34 @@ describe('tool interrupt integration', () => {
|
|||
expect(resumedTypes).toContain('text-delta');
|
||||
});
|
||||
|
||||
it('cancels a suspended delete_file tool call and continues', async () => {
|
||||
const agent = createAgentWithInterruptibleTool('anthropic');
|
||||
|
||||
const first = await agent.generate('Delete the file /tmp/cancel-me.txt');
|
||||
expect(first.finishReason).toBe('tool-calls');
|
||||
expect(first.pendingSuspend).toHaveLength(1);
|
||||
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
const resumed = await agent.resume(
|
||||
'generate',
|
||||
createCancellation('Do not delete the file. Tell me the deletion was cancelled.'),
|
||||
{ runId, toolCallId },
|
||||
);
|
||||
|
||||
expect(resumed.finishReason).toBe('stop');
|
||||
expect(resumed.pendingSuspend).toBeUndefined();
|
||||
expect(resumed.toolCalls).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
tool: 'delete_file',
|
||||
output:
|
||||
'[Tool call cancelled. User said: "Do not delete the file. Tell me the deletion was cancelled."]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('resumes each pending tool call one by one when multiple tool calls are suspended', async () => {
|
||||
const agent = createAgentWithParallelInterruptibleCalls('anthropic');
|
||||
|
||||
|
|
|
|||
|
|
@ -98,6 +98,8 @@ export {
|
|||
OBSERVATION_LOG_STATUSES,
|
||||
} from './types';
|
||||
|
||||
export { createCancellation, isCancellation, CANCELLATION_TYPE } from './sdk/cancellation';
|
||||
export type { Cancellation } from './sdk/cancellation';
|
||||
export { Tool, wrapToolForApproval } from './sdk/tool';
|
||||
export { Memory } from './sdk/memory';
|
||||
export { Guardrail } from './sdk/guardrail';
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import * as aiModule from 'ai';
|
|||
import type { Mock, MockedFunction } from 'vitest';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { createCancellation } from '../../sdk/cancellation';
|
||||
import { isLlmMessage } from '../../sdk/message';
|
||||
import { Tool, Tool as ToolBuilder } from '../../sdk/tool';
|
||||
import { AgentEvent } from '../../types/runtime/event';
|
||||
|
|
@ -1178,6 +1179,140 @@ describe('AgentRuntime — concurrent tool execution', () => {
|
|||
expect(third.finishReason).toBe('stop');
|
||||
});
|
||||
|
||||
it('cancels a suspended tool before resume validation and adds the user message', async () => {
|
||||
const handler = vi.fn(async (_input, ctx: InterruptibleToolContext) => {
|
||||
if (ctx.resumeData) return { approved: true };
|
||||
return await ctx.suspend({ reason: 'needs approval' });
|
||||
});
|
||||
const suspendTool = makeSuspendingTool('suspend_tool', handler);
|
||||
const receivedMessages: unknown[] = [];
|
||||
|
||||
const { runtime } = createRuntimeWithTools([suspendTool], Infinity);
|
||||
generateText.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'suspend_tool', args: { value: 'a' } },
|
||||
]),
|
||||
);
|
||||
|
||||
const first = await runtime.generate('run tools');
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
generateText.mockImplementationOnce(async ({ messages }: { messages: unknown[] }) => {
|
||||
receivedMessages.push(...messages);
|
||||
return await Promise.resolve(makeGenerateSuccess('Cancelled'));
|
||||
});
|
||||
|
||||
const result = await runtime.resume('generate', createCancellation('Do not run this tool'), {
|
||||
runId,
|
||||
toolCallId,
|
||||
});
|
||||
|
||||
expect(result.finishReason).toBe('stop');
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
expect(result.toolCalls).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
tool: 'suspend_tool',
|
||||
output: '[Tool call cancelled. User said: "Do not run this tool"]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
expect(receivedMessages).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
role: 'user',
|
||||
content: expect.arrayContaining([
|
||||
expect.objectContaining({ type: 'text', text: 'Do not run this tool' }),
|
||||
]),
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('streams cancellation as a normal tool result on resume', async () => {
|
||||
const handler = vi.fn(async (_input, ctx: InterruptibleToolContext) => {
|
||||
if (ctx.resumeData) return { approved: true };
|
||||
return await ctx.suspend({ reason: 'needs approval' });
|
||||
});
|
||||
const suspendTool = makeSuspendingTool('suspend_tool', handler);
|
||||
|
||||
const { runtime } = createRuntimeWithTools([suspendTool], Infinity);
|
||||
generateText.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'suspend_tool', args: { value: 'a' } },
|
||||
]),
|
||||
);
|
||||
|
||||
const first = await runtime.generate('run tools');
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
streamText.mockReturnValueOnce(makeStreamSuccess('Cancelled'));
|
||||
|
||||
const resumed = await runtime.resume('stream', createCancellation('Stop this action'), {
|
||||
runId,
|
||||
toolCallId,
|
||||
});
|
||||
const chunks = await collectChunks(resumed.stream as ReadableStream<unknown>);
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
expect(chunks).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
type: 'tool-result',
|
||||
toolCallId,
|
||||
toolName: 'suspend_tool',
|
||||
output: '[Tool call cancelled. User said: "Stop this action"]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('streams skipped sibling tool results when cancelling one of multiple suspensions', async () => {
|
||||
const handler = vi.fn(async (_input, ctx: InterruptibleToolContext) => {
|
||||
if (ctx.resumeData) return { approved: true };
|
||||
return await ctx.suspend({ reason: 'needs approval' });
|
||||
});
|
||||
const suspendTool = makeSuspendingTool('suspend_tool', handler);
|
||||
|
||||
const { runtime } = createRuntimeWithTools([suspendTool], Infinity);
|
||||
generateText.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'suspend_tool', args: { value: 'a' } },
|
||||
{ toolCallId: 'tc-2', toolName: 'suspend_tool', args: { value: 'b' } },
|
||||
]),
|
||||
);
|
||||
|
||||
const first = await runtime.generate('run tools');
|
||||
const { runId } = first.pendingSuspend![0];
|
||||
streamText.mockReturnValueOnce(makeStreamSuccess('Cancelled'));
|
||||
|
||||
const resumed = await runtime.resume('stream', createCancellation('Stop this action'), {
|
||||
runId,
|
||||
toolCallId: 'tc-1',
|
||||
});
|
||||
const chunks = await collectChunks(resumed.stream as ReadableStream<unknown>);
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
expect(chunks).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
type: 'tool-result',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'suspend_tool',
|
||||
output: '[Tool call cancelled. User said: "Stop this action"]',
|
||||
canceled: true,
|
||||
}),
|
||||
expect.objectContaining({
|
||||
type: 'tool-result',
|
||||
toolCallId: 'tc-2',
|
||||
toolName: 'suspend_tool',
|
||||
output: '[Skipped: a sibling tool call was cancelled]',
|
||||
canceled: true,
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('bounded concurrency (2) batches respects the limit', async () => {
|
||||
const batchSizes: number[] = [];
|
||||
let activeConcurrency = 0;
|
||||
|
|
@ -3741,3 +3876,230 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
expect(callArgs.experimental_telemetry).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Cancellation (Feature 1: cancel suspended tool via user message)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('AgentRuntime.resume() with createCancellation() — auto-bypass', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
/** A tool that suspends on first call and returns on resume. */
|
||||
function makeSuspendToolForCancel(): BuiltTool {
|
||||
return {
|
||||
name: 'interactive_tool',
|
||||
description: 'A tool that suspends',
|
||||
inputSchema: z.object({ prompt: z.string() }),
|
||||
suspendSchema: z.object({ prompt: z.string() }),
|
||||
resumeSchema: z.object({ answer: z.string() }),
|
||||
handler: async (_input: unknown, ctx: unknown) => {
|
||||
const { suspend, resumeData } = ctx as InterruptibleToolContext;
|
||||
if (!resumeData) {
|
||||
return await suspend({ prompt: 'What should I do?' });
|
||||
}
|
||||
return { result: (resumeData as { answer: string }).answer };
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
it('auto-bypass: does NOT call the tool handler on cancellation', async () => {
|
||||
const handlerSpy = vi.fn().mockImplementation(async (_input: unknown, ctx: unknown) => {
|
||||
const { suspend, resumeData } = ctx as InterruptibleToolContext;
|
||||
if (!resumeData) {
|
||||
return await suspend({ prompt: 'What should I do?' });
|
||||
}
|
||||
return { result: (resumeData as { answer: string }).answer };
|
||||
});
|
||||
const tool: BuiltTool = {
|
||||
name: 'interactive_tool',
|
||||
description: 'A tool that suspends',
|
||||
inputSchema: z.object({ prompt: z.string() }),
|
||||
suspendSchema: z.object({ prompt: z.string() }),
|
||||
resumeSchema: z.object({ answer: z.string() }),
|
||||
handler: handlerSpy,
|
||||
};
|
||||
|
||||
const { runtime } = createRuntimeWithTools([tool], 1);
|
||||
|
||||
generateText
|
||||
.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'interactive_tool', args: { prompt: 'continue?' } },
|
||||
]),
|
||||
)
|
||||
.mockResolvedValueOnce(makeGenerateSuccess('Done after cancel'));
|
||||
|
||||
const first = await runtime.generate('start', {});
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
|
||||
// Reset call count to check the handler is NOT called on resume
|
||||
handlerSpy.mockClear();
|
||||
|
||||
const resumed = await runtime.resume(
|
||||
'generate',
|
||||
createCancellation('do something else instead'),
|
||||
{ runId, toolCallId },
|
||||
);
|
||||
|
||||
// Handler should NOT have been called for the resume
|
||||
expect(handlerSpy).not.toHaveBeenCalled();
|
||||
// The generation should have continued after cancellation
|
||||
expect(resumed.finishReason).toBe('stop');
|
||||
});
|
||||
|
||||
it('auto-bypass: injects the steering message and the LLM sees it', async () => {
|
||||
const tool = makeSuspendToolForCancel();
|
||||
const { runtime } = createRuntimeWithTools([tool], 1);
|
||||
|
||||
generateText
|
||||
.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'interactive_tool', args: { prompt: 'continue?' } },
|
||||
]),
|
||||
)
|
||||
.mockResolvedValueOnce(makeGenerateSuccess('Understood, doing something else'));
|
||||
|
||||
const first = await runtime.generate('start');
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
|
||||
await runtime.resume('generate', createCancellation('pivot to plan B'), { runId, toolCallId });
|
||||
|
||||
// The second generateText call should include the user steering message
|
||||
const secondCallMessages = (
|
||||
generateText.mock.calls[1][0] as { messages: Array<{ role: string; content: unknown }> }
|
||||
).messages;
|
||||
const userMessages = secondCallMessages.filter((m) => m.role === 'user');
|
||||
const steeringMsg = userMessages.find((m) =>
|
||||
JSON.stringify(m.content).includes('pivot to plan B'),
|
||||
);
|
||||
expect(steeringMsg).toBeDefined();
|
||||
});
|
||||
|
||||
it('stream: emits a cancellation tool result then continues', async () => {
|
||||
const tool = makeSuspendToolForCancel();
|
||||
const { runtime } = createRuntimeWithTools([tool], 1);
|
||||
|
||||
streamText
|
||||
.mockReturnValueOnce({
|
||||
fullStream: makeChunkStream([{ type: 'text-delta', textDelta: 'thinking...' }]),
|
||||
finishReason: Promise.resolve('tool-calls'),
|
||||
usage: Promise.resolve({ inputTokens: 10, outputTokens: 5, totalTokens: 15 }),
|
||||
response: Promise.resolve({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'interactive_tool',
|
||||
args: { prompt: 'continue?' },
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
toolCalls: Promise.resolve([
|
||||
{ toolCallId: 'tc-1', toolName: 'interactive_tool', input: { prompt: 'continue?' } },
|
||||
]),
|
||||
})
|
||||
.mockReturnValueOnce(makeStreamSuccess('Redirected'));
|
||||
|
||||
const firstResult = await runtime.stream('start');
|
||||
const firstChunks = await collectChunks(firstResult.stream);
|
||||
|
||||
const suspendChunk = firstChunks.find((c) => c.type === 'tool-call-suspended');
|
||||
expect(suspendChunk).toBeDefined();
|
||||
const { runId, toolCallId } = suspendChunk as Extract<
|
||||
StreamChunk,
|
||||
{ type: 'tool-call-suspended' }
|
||||
>;
|
||||
|
||||
const resumed = await runtime.resume('stream', createCancellation('go another direction'), {
|
||||
runId,
|
||||
toolCallId,
|
||||
});
|
||||
const resumedChunks = await collectChunks(resumed.stream);
|
||||
|
||||
const cancellationResult = resumedChunks.find(
|
||||
(c) => c.type === 'tool-result' && c.toolCallId === 'tc-1',
|
||||
);
|
||||
expect(cancellationResult).toEqual(
|
||||
expect.objectContaining({
|
||||
type: 'tool-result',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'interactive_tool',
|
||||
output: '[Tool call cancelled. User said: "go another direction"]',
|
||||
canceled: true,
|
||||
}),
|
||||
);
|
||||
|
||||
// Generation should continue after the cancellation result
|
||||
const textChunks = resumedChunks.filter((c) => c.type === 'text-delta');
|
||||
expect(textChunks.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('rejects with an error if the checkpoint is not found', async () => {
|
||||
const { runtime } = createRuntimeWithTools([makeSuspendToolForCancel()], 1);
|
||||
await expect(
|
||||
runtime.resume('generate', createCancellation('no checkpoint'), {
|
||||
runId: 'nonexistent',
|
||||
toolCallId: 'tc-1',
|
||||
}),
|
||||
).rejects.toThrow('No suspended run found for runId: nonexistent');
|
||||
});
|
||||
});
|
||||
|
||||
describe('AgentRuntime.resume() with createCancellation() — manual handling (handleCancellation)', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it('calls the tool handler with ctx.cancellation set', async () => {
|
||||
const handlerSpy = vi.fn().mockImplementation(async (_input: unknown, ctx: unknown) => {
|
||||
const { suspend, resumeData, cancellation } = ctx as InterruptibleToolContext;
|
||||
if (cancellation) {
|
||||
// Manual cleanup path — return a note for the LLM
|
||||
return `Cancelled: ${cancellation.message}`;
|
||||
}
|
||||
if (!resumeData) {
|
||||
return await suspend({ prompt: 'Confirm?' });
|
||||
}
|
||||
return 'done';
|
||||
});
|
||||
const tool: BuiltTool = {
|
||||
name: 'manual_cancel_tool',
|
||||
description: 'A tool with manual cancellation',
|
||||
inputSchema: z.object({ value: z.string() }),
|
||||
suspendSchema: z.object({ prompt: z.string() }),
|
||||
resumeSchema: z.object({ confirmed: z.boolean() }),
|
||||
handleCancellation: true,
|
||||
handler: handlerSpy,
|
||||
};
|
||||
|
||||
const { runtime } = createRuntimeWithTools([tool], 1);
|
||||
|
||||
generateText
|
||||
.mockResolvedValueOnce(
|
||||
makeGenerateWithToolCalls([
|
||||
{ toolCallId: 'tc-1', toolName: 'manual_cancel_tool', args: { value: 'x' } },
|
||||
]),
|
||||
)
|
||||
.mockResolvedValueOnce(makeGenerateSuccess('Done after manual cancel'));
|
||||
|
||||
const first = await runtime.generate('test');
|
||||
const { runId, toolCallId } = first.pendingSuspend![0];
|
||||
|
||||
handlerSpy.mockClear();
|
||||
|
||||
await runtime.resume('generate', createCancellation('user said stop'), { runId, toolCallId });
|
||||
|
||||
// Handler SHOULD have been called for the resume
|
||||
expect(handlerSpy).toHaveBeenCalledTimes(1);
|
||||
const callCtx = handlerSpy.mock.calls[0][1] as InterruptibleToolContext;
|
||||
expect(callCtx.cancellation).toEqual({ message: 'user said stop' });
|
||||
expect(callCtx.resumeData).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -80,6 +80,7 @@ import {
|
|||
toAiSdkProviderTools,
|
||||
toAiSdkTools,
|
||||
} from './tool-adapter';
|
||||
import { isCancellation } from '../sdk/cancellation';
|
||||
import { Telemetry } from '../sdk/telemetry';
|
||||
import { AgentEvent } from '../types/runtime/event';
|
||||
import type { AgentEventData } from '../types/runtime/event';
|
||||
|
|
@ -266,6 +267,13 @@ type ToolCallOutcome =
|
|||
payload: unknown;
|
||||
resumeSchema: JsonSchema7Type;
|
||||
}
|
||||
| {
|
||||
outcome: 'cancelled';
|
||||
toolEntry: ToolResultEntry;
|
||||
modelOutput: string;
|
||||
userMessage: string;
|
||||
canceled: true;
|
||||
}
|
||||
| { outcome: 'error'; error: unknown }
|
||||
| { outcome: 'noop' }; // tool call shouldn't be saved or logged anywhere, usually means that if was executed by AI SDK
|
||||
|
||||
|
|
@ -490,7 +498,8 @@ export class AgentRuntime {
|
|||
if (!toolForValidation) throw new Error(`Tool ${toolCall.toolName} not found`);
|
||||
|
||||
let resumeData: unknown = data;
|
||||
if (toolForValidation.resumeSchema) {
|
||||
|
||||
if (!isCancellation(resumeData) && toolForValidation.resumeSchema) {
|
||||
const parseResult = await parseWithSchema(toolForValidation.resumeSchema, data);
|
||||
if (!parseResult.success) {
|
||||
throw new Error(`Invalid resume payload: ${parseResult.error}`);
|
||||
|
|
@ -1334,6 +1343,7 @@ export class AgentRuntime {
|
|||
toolCallId: r.toolCallId,
|
||||
toolName: r.toolName,
|
||||
output: r.modelOutput,
|
||||
...(r.toolEntry.canceled ? { canceled: true } : {}),
|
||||
});
|
||||
if (r.customMessage) {
|
||||
await writer.write({ type: 'message', message: r.customMessage });
|
||||
|
|
@ -1502,6 +1512,7 @@ export class AgentRuntime {
|
|||
toolCallId: r.toolCallId,
|
||||
toolName: r.toolName,
|
||||
output: r.modelOutput,
|
||||
...(r.toolEntry.canceled ? { canceled: true } : {}),
|
||||
});
|
||||
if (r.customMessage) {
|
||||
await writer.write({ type: 'message', message: r.customMessage });
|
||||
|
|
@ -2138,6 +2149,42 @@ export class AgentRuntime {
|
|||
modelOutput: processResult.modelOutput,
|
||||
customMessage: processResult.customMessage,
|
||||
});
|
||||
} else if (processResult.outcome === 'cancelled') {
|
||||
results.push({
|
||||
toolCallId: resumedEntry.toolCallId,
|
||||
toolName: resumedToolName,
|
||||
input: resumedEntry.input,
|
||||
toolEntry: processResult.toolEntry,
|
||||
modelOutput: processResult.modelOutput,
|
||||
});
|
||||
list.addInput([
|
||||
{ role: 'user', content: [{ type: 'text', text: processResult.userMessage }] },
|
||||
]);
|
||||
|
||||
for (const id of Object.keys(pendingResume.pendingToolCalls)) {
|
||||
if (id !== resumedId) {
|
||||
const siblingEntry = pendingResume.pendingToolCalls[id];
|
||||
const modelOutput = '[Skipped: a sibling tool call was cancelled]';
|
||||
list.setToolCallResult(id, modelOutput, {
|
||||
canceled: true,
|
||||
});
|
||||
results.push({
|
||||
toolCallId: siblingEntry.toolCallId,
|
||||
toolName: siblingEntry.toolName,
|
||||
input: siblingEntry.input,
|
||||
toolEntry: {
|
||||
tool: siblingEntry.toolName,
|
||||
input: siblingEntry.input,
|
||||
output: modelOutput,
|
||||
transformed: false,
|
||||
canceled: true,
|
||||
},
|
||||
modelOutput,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return { results, suspensions, errors, pending };
|
||||
} else if (processResult.outcome === 'error') {
|
||||
errors.push({
|
||||
toolCallId: resumedEntry.toolCallId,
|
||||
|
|
@ -2274,6 +2321,31 @@ export class AgentRuntime {
|
|||
return { outcome: 'noop' };
|
||||
}
|
||||
|
||||
if (isCancellation(resumeData) && !builtTool.handleCancellation) {
|
||||
const modelOutput = `[Tool call cancelled. User said: "${resumeData.message}"]`;
|
||||
this.eventBus.emit({
|
||||
type: AgentEvent.ToolExecutionEnd,
|
||||
toolCallId,
|
||||
toolName,
|
||||
result: modelOutput,
|
||||
isError: false,
|
||||
});
|
||||
list.setToolCallResult(toolCallId, modelOutput, { canceled: true });
|
||||
return {
|
||||
outcome: 'cancelled',
|
||||
toolEntry: {
|
||||
tool: toolName,
|
||||
input: toolInput,
|
||||
output: modelOutput,
|
||||
transformed: false,
|
||||
canceled: true,
|
||||
},
|
||||
modelOutput,
|
||||
userMessage: resumeData.message,
|
||||
canceled: true,
|
||||
};
|
||||
}
|
||||
|
||||
if (countToolCall) {
|
||||
this.incrementToolCallCount(executionCounter);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -146,7 +146,11 @@ export class AgentMessageList {
|
|||
* Returns the mutated host message, or `undefined` if the toolCallId is
|
||||
* not found (internal invariant violation — caller should log/throw).
|
||||
*/
|
||||
setToolCallResult(toolCallId: string, output: JSONValue): AgentDbMessage | undefined {
|
||||
setToolCallResult(
|
||||
toolCallId: string,
|
||||
output: JSONValue,
|
||||
options?: { canceled?: boolean },
|
||||
): AgentDbMessage | undefined {
|
||||
const host = this.findToolCallHost(toolCallId);
|
||||
if (!host) return undefined;
|
||||
|
||||
|
|
@ -156,6 +160,11 @@ export class AgentMessageList {
|
|||
const mutableBlock = block;
|
||||
mutableBlock.state = 'resolved';
|
||||
(mutableBlock as Extract<ContentToolCall, { state: 'resolved' }>).output = output;
|
||||
if (options?.canceled) {
|
||||
(mutableBlock as Extract<ContentToolCall, { state: 'resolved' }>).canceled = true;
|
||||
} else if ('canceled' in mutableBlock) {
|
||||
delete (mutableBlock as { canceled?: boolean }).canceled;
|
||||
}
|
||||
if ('error' in mutableBlock) {
|
||||
delete (mutableBlock as { error: unknown }).error;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import type { JSONSchema7 } from 'json-schema';
|
|||
import { z } from 'zod';
|
||||
|
||||
import { loadAi } from './lazy-ai';
|
||||
import { isCancellation } from '../sdk/cancellation';
|
||||
import {
|
||||
type BuiltProviderTool,
|
||||
type BuiltTool,
|
||||
|
|
@ -119,11 +120,13 @@ export async function executeTool(
|
|||
}
|
||||
|
||||
if (builtTool.suspendSchema) {
|
||||
const isCancelled = isCancellation(resumeData);
|
||||
const ctx: InterruptibleToolContext = {
|
||||
suspend: async (payload: unknown): Promise<never> => {
|
||||
return await Promise.resolve({ [SUSPEND_BRAND]: true, payload } as never);
|
||||
},
|
||||
resumeData,
|
||||
resumeData: isCancelled ? undefined : resumeData,
|
||||
cancellation: isCancelled ? { message: resumeData.message } : undefined,
|
||||
parentTelemetry,
|
||||
toolCallId,
|
||||
runId: executionContext.runId,
|
||||
|
|
|
|||
55
packages/@n8n/agents/src/sdk/__tests__/cancellation.test.ts
Normal file
55
packages/@n8n/agents/src/sdk/__tests__/cancellation.test.ts
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
import { createCancellation, isCancellation, CANCELLATION_TYPE } from '../cancellation';
|
||||
|
||||
describe('createCancellation', () => {
|
||||
it('creates an object with the correct _type and message', () => {
|
||||
const c = createCancellation('do something else');
|
||||
expect(c._type).toBe(CANCELLATION_TYPE);
|
||||
expect(c.message).toBe('do something else');
|
||||
});
|
||||
|
||||
it('is detected by isCancellation', () => {
|
||||
const c = createCancellation('steer me');
|
||||
expect(isCancellation(c)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isCancellation', () => {
|
||||
it('returns true for a valid cancellation object', () => {
|
||||
expect(isCancellation({ _type: 'agent.cancellation', message: 'hello' })).toBe(true);
|
||||
});
|
||||
|
||||
it('returns false for null', () => {
|
||||
expect(isCancellation(null)).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false for undefined', () => {
|
||||
expect(isCancellation(undefined)).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false for a plain resume payload', () => {
|
||||
expect(isCancellation({ approved: true })).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false when _type is wrong', () => {
|
||||
expect(isCancellation({ _type: 'something.else', message: 'hi' })).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false when message is missing', () => {
|
||||
expect(isCancellation({ _type: 'agent.cancellation' })).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false when message is not a string', () => {
|
||||
expect(isCancellation({ _type: 'agent.cancellation', message: 42 })).toBe(false);
|
||||
});
|
||||
|
||||
it('survives a JSON round-trip (simulating HTTP wire format)', () => {
|
||||
const original = createCancellation('change direction');
|
||||
const serialized = JSON.stringify(original);
|
||||
// eslint-disable-next-line n8n-local-rules/no-uncaught-json-parse
|
||||
const deserialized = JSON.parse(serialized) as unknown;
|
||||
expect(isCancellation(deserialized)).toBe(true);
|
||||
expect((deserialized as ReturnType<typeof createCancellation>).message).toBe(
|
||||
'change direction',
|
||||
);
|
||||
});
|
||||
});
|
||||
29
packages/@n8n/agents/src/sdk/cancellation.ts
Normal file
29
packages/@n8n/agents/src/sdk/cancellation.ts
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* Pass as `resumeData` to `agent.resume()` to cancel a suspended tool call
|
||||
* and steer the agent with a new message instead of answering the tool.
|
||||
*
|
||||
* Uses a JSON-serializable `_type` string so it survives HTTP round-trips —
|
||||
* frontend code can construct `{ _type: 'agent.cancellation', message }`
|
||||
* without importing this package.
|
||||
*/
|
||||
|
||||
export const CANCELLATION_TYPE = 'agent.cancellation' as const;
|
||||
|
||||
export interface Cancellation {
|
||||
readonly _type: typeof CANCELLATION_TYPE;
|
||||
/** The user's steering message provided when cancelling. */
|
||||
readonly message: string;
|
||||
}
|
||||
|
||||
export function createCancellation(message: string): Cancellation {
|
||||
return { _type: CANCELLATION_TYPE, message };
|
||||
}
|
||||
|
||||
export function isCancellation(value: unknown): value is Cancellation {
|
||||
return (
|
||||
typeof value === 'object' &&
|
||||
value !== null &&
|
||||
(value as Record<string, unknown>)._type === CANCELLATION_TYPE &&
|
||||
typeof (value as Record<string, unknown>).message === 'string'
|
||||
);
|
||||
}
|
||||
|
|
@ -125,6 +125,8 @@ export class Tool<
|
|||
|
||||
private providerOptionsValue?: Record<string, JSONObject>;
|
||||
|
||||
private handleCancellationValue?: boolean;
|
||||
|
||||
private requireApprovalValue?: boolean;
|
||||
|
||||
private needsApprovalFnValue?: (args: unknown) => Promise<boolean> | boolean;
|
||||
|
|
@ -214,6 +216,15 @@ export class Tool<
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Opt in to handle cancellations in the tool handler (`ctx.cancellation`).
|
||||
* By default, the runtime bypasses the handler and injects the steering message directly.
|
||||
*/
|
||||
handleCancellation(): this {
|
||||
this.handleCancellationValue = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Require human approval before this tool executes. Mutually exclusive with .suspend()/.resume(). */
|
||||
requireApproval(): this {
|
||||
this.requireApprovalValue = true;
|
||||
|
|
@ -281,6 +292,7 @@ export class Tool<
|
|||
systemInstruction: this.systemInstructionText,
|
||||
suspendSchema: this.suspendSchemaValue,
|
||||
resumeSchema: this.resumeSchemaValue,
|
||||
handleCancellation: this.handleCancellationValue,
|
||||
toMessage: this.toMessageFn as (output: unknown) => AgentMessage | undefined,
|
||||
toModelOutput: this.toModelOutputFn as ((output: unknown) => unknown) | undefined,
|
||||
handler: this.handlerFn as (
|
||||
|
|
|
|||
|
|
@ -117,6 +117,7 @@ export type StreamChunk = ContentMetadata &
|
|||
toolName: string;
|
||||
output: unknown;
|
||||
isError?: boolean;
|
||||
canceled?: boolean;
|
||||
}
|
||||
| {
|
||||
type: 'tool-call-suspended';
|
||||
|
|
@ -171,6 +172,7 @@ export interface ToolResultEntry {
|
|||
input: unknown;
|
||||
output: unknown;
|
||||
transformed?: boolean;
|
||||
canceled?: boolean;
|
||||
}
|
||||
|
||||
export interface GenerateResult {
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ export type ContentToolCall = ContentMetadata & {
|
|||
providerExecuted?: boolean;
|
||||
} & (
|
||||
| { state: 'pending' }
|
||||
| { state: 'resolved'; output: JSONValue }
|
||||
| { state: 'resolved'; output: JSONValue; canceled?: boolean }
|
||||
| { state: 'rejected'; error: string }
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -51,8 +51,10 @@ export interface InterruptibleToolContext<S = unknown, R = unknown> {
|
|||
* the execution engine to halt. Code after `return await ctx.suspend()` is unreachable.
|
||||
*/
|
||||
suspend: (payload: S) => Promise<never>;
|
||||
/** Data from the consumer after resume. Undefined on first invocation. */
|
||||
/** Data from the consumer after resume. Undefined on first invocation or when cancelled. */
|
||||
resumeData: R | undefined;
|
||||
/** Set when the resume was a cancellation and the tool opted in via `.handleCancellation()`. */
|
||||
cancellation?: { message: string };
|
||||
/** AI SDK tool call ID for the current local tool execution. */
|
||||
toolCallId?: string;
|
||||
/** Agent run ID for the current execution. */
|
||||
|
|
@ -80,6 +82,8 @@ export interface BuiltTool {
|
|||
readonly systemInstruction?: string;
|
||||
readonly suspendSchema?: ZodType | JSONSchema7;
|
||||
readonly resumeSchema?: ZodType | JSONSchema7;
|
||||
/** When `true`, the handler is called on cancellation with `ctx.cancellation` set instead of being bypassed. */
|
||||
readonly handleCancellation?: boolean;
|
||||
readonly withDefaultApproval?: boolean;
|
||||
readonly toMessage?: (output: unknown) => AgentMessage | undefined;
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -106,10 +106,18 @@ export type AskQuestionResume = z.infer<typeof askQuestionResumeSchema>;
|
|||
// Discriminated union of all resume payloads (used by AgentBuildResumeDto)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const cancellationResumeSchema = z.object({
|
||||
_type: z.literal('agent.cancellation'),
|
||||
message: z.string().min(1),
|
||||
});
|
||||
|
||||
export type CancellationResumeData = z.infer<typeof cancellationResumeSchema>;
|
||||
|
||||
export const interactiveResumeDataSchema = z.union([
|
||||
askLlmResumeSchema,
|
||||
askCredentialResumeSchema,
|
||||
askQuestionResumeSchema,
|
||||
cancellationResumeSchema,
|
||||
]);
|
||||
|
||||
export type InteractiveResumeData = z.infer<typeof interactiveResumeDataSchema>;
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export type AgentSseEvent =
|
|||
toolName: string;
|
||||
output: unknown;
|
||||
isError?: boolean;
|
||||
canceled?: boolean;
|
||||
}
|
||||
| { type: 'tool-call-suspended'; payload: ToolSuspendedPayload }
|
||||
| { type: 'message'; message: AgentSseMessage }
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ export {
|
|||
askQuestionOptionSchema,
|
||||
askQuestionInputSchema,
|
||||
askQuestionResumeSchema,
|
||||
cancellationResumeSchema,
|
||||
interactiveResumeDataSchema,
|
||||
type InteractiveToolName,
|
||||
type AskLlmInput,
|
||||
|
|
@ -30,5 +31,6 @@ export {
|
|||
type AskQuestionOption,
|
||||
type AskQuestionInput,
|
||||
type AskQuestionResume,
|
||||
type CancellationResumeData,
|
||||
type InteractiveResumeData,
|
||||
} from '../agent-builder-interactive';
|
||||
|
|
|
|||
|
|
@ -138,6 +138,7 @@ export interface AgentPersistedMessageContentPart {
|
|||
input?: unknown;
|
||||
state?: string;
|
||||
output?: unknown;
|
||||
canceled?: boolean;
|
||||
error?: string;
|
||||
/** Epoch ms when the tool handler started executing. */
|
||||
startTime?: number;
|
||||
|
|
|
|||
|
|
@ -162,16 +162,19 @@ function emitToolChunk(
|
|||
endTime: chunk.endTime,
|
||||
});
|
||||
break;
|
||||
case 'tool-result':
|
||||
case 'tool-result': {
|
||||
const toolResultChunk = chunk as typeof chunk & { canceled?: boolean };
|
||||
send({
|
||||
type: 'tool-result',
|
||||
toolCallId: chunk.toolCallId,
|
||||
toolName: chunk.toolName,
|
||||
output: chunk.output,
|
||||
...(chunk.isError !== undefined && { isError: chunk.isError }),
|
||||
...(toolResultChunk.canceled !== undefined && { canceled: toolResultChunk.canceled }),
|
||||
});
|
||||
onToolEvent?.toolResult?.(chunk.toolName);
|
||||
break;
|
||||
}
|
||||
case 'tool-call-suspended': {
|
||||
const payload: ToolSuspendedPayload = {
|
||||
toolCallId: chunk.toolCallId,
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import AgentChatPanel from '../components/AgentChatPanel.vue';
|
|||
const sendMessageMock = vi.fn();
|
||||
const stopGeneratingMock = vi.fn();
|
||||
const loadHistoryMock = vi.fn();
|
||||
const cancelAndSteerMock = vi.fn();
|
||||
const messagesMock = ref<ChatMessage[]>([]);
|
||||
const isStreamingMock = ref(false);
|
||||
|
||||
|
|
@ -54,6 +55,7 @@ vi.mock('../composables/useAgentChatStream', () => ({
|
|||
sendMessage: sendMessageMock,
|
||||
stopGenerating: stopGeneratingMock,
|
||||
resume: vi.fn(),
|
||||
cancelAndSteer: cancelAndSteerMock,
|
||||
dismissFatalError: vi.fn(),
|
||||
}),
|
||||
}));
|
||||
|
|
@ -241,21 +243,28 @@ describe('AgentChatPanel', () => {
|
|||
expect(loadHistoryMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('disables chat and blocks sending while an interactive question is unresolved', async () => {
|
||||
it('enables chat input and shows answer-question placeholder while an interactive question is unresolved', () => {
|
||||
messagesMock.value = [openInteractiveMessage()];
|
||||
|
||||
const wrapper = mountPanel();
|
||||
const chatInput = wrapper.findComponent({ name: 'ChatInputBase' });
|
||||
|
||||
expect(chatInput.props('disabled')).toBe(true);
|
||||
expect(chatInput.props('canSubmit')).toBe(false);
|
||||
// Input should be ENABLED so the user can cancel and steer
|
||||
expect(chatInput.props('disabled')).toBe(false);
|
||||
expect(chatInput.props('placeholder')).toBe('agents.chat.answerQuestionPlaceholder');
|
||||
});
|
||||
|
||||
it('calls cancelAndSteer (not sendMessage) when the user submits while an interactive question is open', async () => {
|
||||
messagesMock.value = [openInteractiveMessage()];
|
||||
|
||||
const wrapper = mountPanel();
|
||||
|
||||
(
|
||||
wrapper.vm as unknown as { sendMessageFromOutside: (message: string) => void }
|
||||
).sendMessageFromOutside('answer through chat');
|
||||
).sendMessageFromOutside('go another direction');
|
||||
await flushPromises();
|
||||
|
||||
expect(cancelAndSteerMock).toHaveBeenCalledWith('go another direction');
|
||||
expect(sendMessageMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
|
|
@ -285,15 +294,15 @@ describe('AgentChatPanel', () => {
|
|||
});
|
||||
|
||||
it.each([ASK_LLM_TOOL_NAME, ASK_CREDENTIAL_TOOL_NAME])(
|
||||
'disables chat while %s is unresolved',
|
||||
'enables chat input while %s is unresolved (cancel-and-steer mode)',
|
||||
(toolName) => {
|
||||
messagesMock.value = [openInteractiveMessage(toolName)];
|
||||
|
||||
const wrapper = mountPanel();
|
||||
const chatInput = wrapper.findComponent({ name: 'ChatInputBase' });
|
||||
|
||||
expect(chatInput.props('disabled')).toBe(true);
|
||||
expect(chatInput.props('canSubmit')).toBe(false);
|
||||
// Input should be enabled — the user can cancel and steer
|
||||
expect(chatInput.props('disabled')).toBe(false);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import {
|
|||
ASK_CREDENTIAL_TOOL_NAME,
|
||||
ASK_LLM_TOOL_NAME,
|
||||
ASK_QUESTION_TOOL_NAME,
|
||||
type AgentPersistedMessageContentPart,
|
||||
type AgentPersistedMessageDto,
|
||||
} from '@n8n/api-types';
|
||||
|
||||
|
|
@ -195,6 +196,33 @@ describe('convertDbMessages — interactive turn synthesis', () => {
|
|||
expect(tc?.output).toEqual([{ name: 'Slack' }]);
|
||||
});
|
||||
|
||||
it('treats cancelled resolved tool calls as cancelled', () => {
|
||||
const dbMessages: AgentPersistedMessageDto[] = [
|
||||
{
|
||||
id: 'm1',
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolName: 'delete_file',
|
||||
toolCallId: 'tc-cancel',
|
||||
input: { path: '/tmp/foo.txt' },
|
||||
state: 'resolved',
|
||||
output: 'The sibling tool call was skipped',
|
||||
canceled: true,
|
||||
} as AgentPersistedMessageContentPart,
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
const chat = convertDbMessages(dbMessages);
|
||||
expect(chat).toHaveLength(1);
|
||||
const tc = chat[0].toolCalls?.[0];
|
||||
expect(tc?.state).toBe('cancelled');
|
||||
expect(tc?.output).toBe('The sibling tool call was skipped');
|
||||
expect(tc?.canceled).toBe(true);
|
||||
});
|
||||
|
||||
it('renders a resolved-but-failed delegate_subagent call as an error', () => {
|
||||
const dbMessages: AgentPersistedMessageDto[] = [
|
||||
{
|
||||
|
|
|
|||
|
|
@ -403,6 +403,37 @@ describe('useAgentChatStream — SDK-aligned event handling', () => {
|
|||
expect(assistant.toolCalls?.[0].output).toBe(42);
|
||||
});
|
||||
|
||||
it('marks cancellation tool results as cancelled instead of done', async () => {
|
||||
const events: AgentSseEvent[] = [
|
||||
{ type: 'start-step' },
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-cancel',
|
||||
toolName: 'delete_file',
|
||||
input: { path: '/tmp/a.txt' },
|
||||
},
|
||||
{ type: 'finish-step' },
|
||||
{
|
||||
type: 'tool-result',
|
||||
toolCallId: 'tc-cancel',
|
||||
toolName: 'delete_file',
|
||||
output: 'The tool call was cancelled',
|
||||
canceled: true,
|
||||
} as AgentSseEvent,
|
||||
{ type: 'done' },
|
||||
];
|
||||
globalThis.fetch = vi.fn(async () => makeSseResponse(events)) as typeof fetch;
|
||||
|
||||
const hook = buildHook();
|
||||
await hook.sendMessage('delete file');
|
||||
await nextTick();
|
||||
|
||||
const assistant = hook.messages.value[1];
|
||||
expect(assistant.toolCalls?.[0].state).toBe('cancelled');
|
||||
expect(assistant.toolCalls?.[0].output).toBe('The tool call was cancelled');
|
||||
expect(assistant.toolCalls?.[0].canceled).toBe(true);
|
||||
});
|
||||
|
||||
it('flips a ToolCall to done on tool-execution-end before the batched tool-result arrives', async () => {
|
||||
const events: AgentSseEvent[] = [
|
||||
{ type: 'start-step' },
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ const {
|
|||
sendMessage,
|
||||
stopGenerating,
|
||||
resume,
|
||||
cancelAndSteer,
|
||||
dismissFatalError,
|
||||
} = useAgentChatStream({
|
||||
projectId: toRef(props, 'projectId'),
|
||||
|
|
@ -132,14 +133,15 @@ watch(isStreaming, (v) => emit('update:streaming', v));
|
|||
|
||||
async function onSubmit() {
|
||||
const text = inputText.value.trim();
|
||||
if (
|
||||
!text ||
|
||||
isStreaming.value ||
|
||||
isPreparingToSend.value ||
|
||||
isBuilderReadOnly.value ||
|
||||
hasOpenInteractiveQuestion.value
|
||||
)
|
||||
if (!text || isStreaming.value || isPreparingToSend.value || isBuilderReadOnly.value) return;
|
||||
|
||||
// When there is an open interactive question, the user's message cancels
|
||||
// the suspended tool and steers the agent in a new direction.
|
||||
if (hasOpenInteractiveQuestion.value) {
|
||||
inputText.value = '';
|
||||
await cancelAndSteer(text);
|
||||
return;
|
||||
}
|
||||
|
||||
isPreparingToSend.value = true;
|
||||
try {
|
||||
|
|
@ -171,7 +173,6 @@ async function onSubmit() {
|
|||
}
|
||||
|
||||
function sendMessageFromOutside(message: string) {
|
||||
if (hasOpenInteractiveQuestion.value) return;
|
||||
inputText.value = message;
|
||||
void onSubmit();
|
||||
}
|
||||
|
|
@ -286,17 +287,10 @@ onBeforeUnmount(() => {
|
|||
:placeholder="chatPlaceholder"
|
||||
:is-streaming="messagingState === 'receiving'"
|
||||
:can-submit="
|
||||
!hasOpenInteractiveQuestion &&
|
||||
!isStreaming &&
|
||||
!isPreparingToSend &&
|
||||
!isBuilderReadOnly &&
|
||||
inputText.trim().length > 0
|
||||
!isStreaming && !isPreparingToSend && !isBuilderReadOnly && inputText.trim().length > 0
|
||||
"
|
||||
:disabled="
|
||||
isBuilderReadOnly ||
|
||||
hasOpenInteractiveQuestion ||
|
||||
isPreparingToSend ||
|
||||
(isStreaming && messagingState !== 'receiving')
|
||||
isBuilderReadOnly || isPreparingToSend || (isStreaming && messagingState !== 'receiving')
|
||||
"
|
||||
data-testid="chat-input"
|
||||
@submit="onSubmit"
|
||||
|
|
|
|||
|
|
@ -90,6 +90,12 @@ function toolDuration(tc: ToolCall): string {
|
|||
size="large"
|
||||
:class="$style.indicatorError"
|
||||
/>
|
||||
<N8nIcon
|
||||
v-else-if="tc.state === 'cancelled'"
|
||||
icon="circle-x"
|
||||
size="large"
|
||||
:class="$style.indicatorCancelled"
|
||||
/>
|
||||
<N8nTooltip
|
||||
v-else-if="tc.state === 'suspended'"
|
||||
placement="top"
|
||||
|
|
@ -209,6 +215,10 @@ function toolDuration(tc: ToolCall): string {
|
|||
color: var(--text-color--danger);
|
||||
}
|
||||
|
||||
.indicatorCancelled {
|
||||
color: var(--text-color--subtler);
|
||||
}
|
||||
|
||||
.indicatorLoading {
|
||||
color: var(--text-color);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ export interface ToolCall {
|
|||
toolCallId: string;
|
||||
input?: unknown;
|
||||
output?: unknown;
|
||||
canceled?: boolean;
|
||||
state: ToolCallState;
|
||||
/** Epoch ms when the tool started executing (live: client clock; reload: recorded). */
|
||||
startTime?: number;
|
||||
|
|
@ -64,6 +65,8 @@ interface InteractivePayloadBase {
|
|||
runId?: string;
|
||||
/** Wall-clock timestamp when the user submitted; absent when card is open. */
|
||||
resolvedAt?: number;
|
||||
/** Set when the tool was cancelled via a steering message rather than answered. */
|
||||
cancelled?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -290,13 +293,16 @@ export function convertDbMessages(dbMessages: AgentPersistedMessageDto[]): ChatM
|
|||
} else if (part.type === 'tool-call' && part.toolName) {
|
||||
let state: ToolCallState;
|
||||
let output: unknown;
|
||||
const canceled = part.canceled === true;
|
||||
if (part.state === 'resolved') {
|
||||
output = part.output;
|
||||
// A failed delegation resolves like any other tool, so detect it
|
||||
// from the output and render it as an error to match the live run.
|
||||
state = isFailedDelegateOutput(part.toolName, part.output)
|
||||
? TOOL_CALL_STATE.ERROR
|
||||
: TOOL_CALL_STATE.DONE;
|
||||
if (canceled) {
|
||||
state = TOOL_CALL_STATE.CANCELLED;
|
||||
} else if (isFailedDelegateOutput(part.toolName, part.output)) {
|
||||
state = TOOL_CALL_STATE.ERROR;
|
||||
} else {
|
||||
state = TOOL_CALL_STATE.DONE;
|
||||
}
|
||||
} else if (part.state === 'rejected') {
|
||||
state = TOOL_CALL_STATE.ERROR;
|
||||
output = part.error;
|
||||
|
|
@ -310,6 +316,7 @@ export function convertDbMessages(dbMessages: AgentPersistedMessageDto[]): ChatM
|
|||
toolCallId: part.toolCallId ?? '',
|
||||
input: part.input,
|
||||
...(output !== undefined && { output }),
|
||||
...(canceled && { canceled }),
|
||||
state,
|
||||
...(part.startTime !== undefined && { startTime: part.startTime }),
|
||||
...(part.endTime !== undefined && { endTime: part.endTime }),
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import type {
|
|||
AgentBuilderOpenSuspension,
|
||||
AgentPersistedMessageDto,
|
||||
AgentSseEvent,
|
||||
CancellationResumeData,
|
||||
} from '@n8n/api-types';
|
||||
import { useToast } from '@/app/composables/useToast';
|
||||
import {
|
||||
|
|
@ -47,6 +48,19 @@ export interface UseAgentChatStreamParams {
|
|||
onHistoryLoaded?: (count: number) => void;
|
||||
}
|
||||
|
||||
type ResumePayload =
|
||||
| {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
resumeData: unknown;
|
||||
}
|
||||
| {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
cancelled: true;
|
||||
text: string;
|
||||
};
|
||||
|
||||
export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
||||
const rootStore = useRootStore();
|
||||
const locale = useI18n();
|
||||
|
|
@ -264,7 +278,8 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
);
|
||||
if (
|
||||
existing.state !== TOOL_CALL_STATE.RUNNING &&
|
||||
existing.state !== TOOL_CALL_STATE.DONE
|
||||
existing.state !== TOOL_CALL_STATE.DONE &&
|
||||
existing.state !== TOOL_CALL_STATE.CANCELLED
|
||||
) {
|
||||
existing.state = TOOL_CALL_STATE.PENDING;
|
||||
}
|
||||
|
|
@ -277,7 +292,11 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
const found = findToolCallById(event.toolCallId);
|
||||
if (found) {
|
||||
found.tc.startTime = event.startTime;
|
||||
if (found.tc.state !== TOOL_CALL_STATE.DONE && found.tc.state !== TOOL_CALL_STATE.ERROR) {
|
||||
if (
|
||||
found.tc.state !== TOOL_CALL_STATE.DONE &&
|
||||
found.tc.state !== TOOL_CALL_STATE.ERROR &&
|
||||
found.tc.state !== TOOL_CALL_STATE.CANCELLED
|
||||
) {
|
||||
found.tc.state = TOOL_CALL_STATE.RUNNING;
|
||||
}
|
||||
}
|
||||
|
|
@ -305,9 +324,15 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
case 'tool-result': {
|
||||
const found = findToolCallById(event.toolCallId);
|
||||
if (found) {
|
||||
const toolResultEvent = event as typeof event & { canceled?: boolean };
|
||||
found.tc.output = event.output;
|
||||
const failed = event.isError || isFailedDelegateOutput(found.tc.tool, event.output);
|
||||
found.tc.state = failed ? TOOL_CALL_STATE.ERROR : TOOL_CALL_STATE.DONE;
|
||||
found.tc.state = failed
|
||||
? TOOL_CALL_STATE.ERROR
|
||||
: toolResultEvent.canceled === true
|
||||
? TOOL_CALL_STATE.CANCELLED
|
||||
: TOOL_CALL_STATE.DONE;
|
||||
found.tc.canceled = toolResultEvent.canceled === true;
|
||||
found.tc.displaySummary = summariseToolCall(found.tc.tool, event.output, found.tc.input);
|
||||
// If this was an interactive tool call, the result IS the user's
|
||||
// resume payload — refresh the card so it flips to its resolved
|
||||
|
|
@ -425,10 +450,6 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Run a request against a build/chat/resume endpoint
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
function finalizeStream(session: StreamSession): void {
|
||||
for (const msg of session.minted) {
|
||||
if (msg.status === CHAT_MESSAGE_STATUS.STREAMING) msg.status = CHAT_MESSAGE_STATUS.SUCCESS;
|
||||
|
|
@ -511,63 +532,102 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
await postAndConsume(url, body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume a suspended build interaction. Posts to the build/resume endpoint
|
||||
* and re-enters the same SSE handler. The `runId` is required — it comes
|
||||
* from the original `tool-call-suspended` chunk (live) or from the
|
||||
* `openSuspensions` sidecar applied during history reload.
|
||||
*/
|
||||
async function resume(payload: {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
resumeData: unknown;
|
||||
}): Promise<void> {
|
||||
// Optimistic update — the backend emits a matching `tool-result` on the
|
||||
// resume stream, but that arrives only after round-trip. Flipping state
|
||||
// here stops the spinner/clock indicator and disables the card so the
|
||||
// user sees immediate feedback on submit.
|
||||
//
|
||||
// Snapshot the pre-flight state so we can roll back if the resume POST
|
||||
// or the SSE stream fails. Otherwise a transport/expired-checkpoint
|
||||
// error would leave the card permanently disabled and the user with
|
||||
// no way to retry.
|
||||
async function resume(payload: ResumePayload): Promise<void> {
|
||||
const isCancellation = 'cancelled' in payload;
|
||||
const text = isCancellation ? payload.text.trim() : '';
|
||||
if (isCancellation && !text) return;
|
||||
|
||||
const found = findToolCallById(payload.toolCallId);
|
||||
const snapshot = found
|
||||
? {
|
||||
tc: found.tc,
|
||||
prevState: found.tc.state,
|
||||
prevOutput: found.tc.output,
|
||||
prevCanceled: found.tc.canceled,
|
||||
prevSummary: found.tc.displaySummary,
|
||||
msg: found.msg,
|
||||
prevStatus: found.msg.status,
|
||||
prevInteractive: found.msg.interactive,
|
||||
}
|
||||
: null;
|
||||
let optimisticUserMessageId: string | undefined;
|
||||
|
||||
if (found) {
|
||||
found.tc.state = TOOL_CALL_STATE.DONE;
|
||||
found.tc.output = payload.resumeData;
|
||||
found.tc.displaySummary = summariseToolCall(
|
||||
found.tc.tool,
|
||||
payload.resumeData,
|
||||
found.tc.input,
|
||||
);
|
||||
const updated = rebuildInteractiveFromHistory(found.tc);
|
||||
if (updated) found.msg.interactive = updated;
|
||||
if (isCancellation) {
|
||||
found.tc.state = TOOL_CALL_STATE.CANCELLED;
|
||||
found.tc.canceled = true;
|
||||
if (found.msg.interactive) {
|
||||
found.msg.interactive = {
|
||||
...found.msg.interactive,
|
||||
resolvedAt: Date.now(),
|
||||
cancelled: true,
|
||||
};
|
||||
}
|
||||
} else {
|
||||
found.tc.state = TOOL_CALL_STATE.DONE;
|
||||
found.tc.canceled = false;
|
||||
found.tc.output = payload.resumeData;
|
||||
found.tc.displaySummary = summariseToolCall(
|
||||
found.tc.tool,
|
||||
payload.resumeData,
|
||||
found.tc.input,
|
||||
);
|
||||
const updated = rebuildInteractiveFromHistory(found.tc);
|
||||
if (updated) found.msg.interactive = updated;
|
||||
}
|
||||
if (found.msg.status === CHAT_MESSAGE_STATUS.AWAITING_USER)
|
||||
found.msg.status = CHAT_MESSAGE_STATUS.SUCCESS;
|
||||
}
|
||||
|
||||
const resumeData: unknown = isCancellation
|
||||
? ({
|
||||
_type: 'agent.cancellation',
|
||||
message: text,
|
||||
} satisfies CancellationResumeData)
|
||||
: payload.resumeData;
|
||||
|
||||
if (isCancellation) {
|
||||
optimisticUserMessageId = crypto.randomUUID();
|
||||
fatalError.value = null;
|
||||
messages.value.push({
|
||||
id: optimisticUserMessageId,
|
||||
role: 'user',
|
||||
content: text,
|
||||
status: 'success',
|
||||
});
|
||||
}
|
||||
|
||||
const { baseUrl } = rootStore.restApiContext;
|
||||
const url = `${baseUrl}/projects/${params.projectId.value}/agents/v2/${params.agentId.value}/build/resume`;
|
||||
const { ok } = await postAndConsume(url, payload);
|
||||
const { ok } = await postAndConsume(url, {
|
||||
runId: payload.runId,
|
||||
toolCallId: payload.toolCallId,
|
||||
resumeData,
|
||||
});
|
||||
|
||||
if (!ok && snapshot) {
|
||||
snapshot.tc.state = snapshot.prevState;
|
||||
snapshot.tc.output = snapshot.prevOutput;
|
||||
snapshot.tc.canceled = snapshot.prevCanceled;
|
||||
snapshot.tc.displaySummary = snapshot.prevSummary;
|
||||
snapshot.msg.status = snapshot.prevStatus;
|
||||
snapshot.msg.interactive = snapshot.prevInteractive;
|
||||
}
|
||||
if (!ok && optimisticUserMessageId) {
|
||||
messages.value = messages.value.filter((m) => m.id !== optimisticUserMessageId);
|
||||
}
|
||||
}
|
||||
|
||||
async function cancelAndSteer(text: string): Promise<void> {
|
||||
const openMsg = messages.value.find((m) => m.interactive && !m.interactive.resolvedAt);
|
||||
if (!openMsg?.interactive?.runId) return;
|
||||
|
||||
await resume({
|
||||
runId: openMsg.interactive.runId,
|
||||
toolCallId: openMsg.interactive.toolCallId,
|
||||
cancelled: true,
|
||||
text,
|
||||
});
|
||||
}
|
||||
|
||||
async function sendMessage(text: string): Promise<void> {
|
||||
|
|
@ -602,6 +662,7 @@ export function useAgentChatStream(params: UseAgentChatStreamParams) {
|
|||
sendMessage,
|
||||
stopGenerating,
|
||||
resume,
|
||||
cancelAndSteer,
|
||||
dismissFatalError,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ export const TOOL_CALL_STATE = {
|
|||
RUNNING: 'running',
|
||||
SUSPENDED: 'suspended',
|
||||
DONE: 'done',
|
||||
CANCELLED: 'cancelled',
|
||||
ERROR: 'error',
|
||||
} as const;
|
||||
export type ToolCallState = (typeof TOOL_CALL_STATE)[keyof typeof TOOL_CALL_STATE];
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user