mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-01 09:17:08 +02:00
feat(agents): Add deferred tool loading (no-changelog) (#30466)
This commit is contained in:
parent
aabc6f1697
commit
8e709ff2cf
|
|
@ -0,0 +1,134 @@
|
|||
import { expect, it } from 'vitest';
|
||||
import { z } from 'zod';
|
||||
|
||||
import {
|
||||
chunksOfType,
|
||||
collectStreamChunks,
|
||||
collectTextDeltas,
|
||||
describeIf,
|
||||
getModel,
|
||||
} from './helpers';
|
||||
import { Agent, Tool } from '../../index';
|
||||
|
||||
const describe = describeIf('anthropic');
|
||||
|
||||
describe('deferred tools integration', () => {
|
||||
// End-to-end check (generate mode): the model must discover, load, and call a deferred tool.
it('searches, loads, and uses a deferred tool in generate mode', async () => {
  const operandsSchema = z.object({
    a: z.number().describe('First number'),
    b: z.number().describe('Second number'),
  });
  const productSchema = z.object({ product: z.number() });

  const multiplyTool = new Tool('multiply_numbers')
    .description('Multiply two numbers and return their product')
    .input(operandsSchema)
    .output(productSchema)
    .handler(async ({ a, b }) => ({ product: a * b }));

  const agent = new Agent('deferred-generate-test')
    .model(getModel('anthropic'))
    .instructions(
      'You are testing deferred tool discovery. For arithmetic requests, first call search_tools, then call load_tool with the exact returned tool name, then call the loaded tool. Do not answer from mental math.',
    )
    .deferredTool(multiplyTool, { search: { topK: 2 } });

  const result = await agent.generate(
    'Use deferred tools to multiply 6 and 7. Search for the tool, load it, call it, then answer with the product.',
  );

  // All three steps of the deferred flow must show up among the recorded tool calls.
  const recordedCalls = result.toolCalls ?? [];
  const calledNames = recordedCalls.map((entry) => entry.tool);
  expect(calledNames).toEqual(
    expect.arrayContaining(['search_tools', 'load_tool', 'multiply_numbers']),
  );

  // The deferred tool itself must have produced the product.
  const multiplication = recordedCalls.find((entry) => entry.tool === 'multiply_numbers');
  expect(multiplication?.output).toEqual({ product: 42 });
});
|
||||
|
||||
// End-to-end check (stream mode): tool results for search, load, and the deferred tool must be streamed.
it('searches, loads, and uses a deferred tool in stream mode', async () => {
  const textSchema = z.object({ text: z.string().describe('Text to count') });
  const lengthSchema = z.object({ length: z.number() });

  const countCharactersTool = new Tool('count_characters')
    .description('Count the characters in a text string')
    .input(textSchema)
    .output(lengthSchema)
    .handler(async ({ text }) => ({ length: text.length }));

  const agent = new Agent('deferred-stream-test')
    .model(getModel('anthropic'))
    .instructions(
      'You are testing deferred tool discovery. For character counting requests, first call search_tools, then call load_tool with the exact returned tool name, then call the loaded tool. Do not count manually.',
    )
    .deferredTool(countCharactersTool);

  const streamed = await agent.stream(
    'Use deferred tools to count the characters in the text "n8n". Search for the tool, load it, call it, then answer with the length.',
  );
  const chunks = await collectStreamChunks(streamed.stream);

  // Only the tool-result chunks matter for verifying the deferred flow.
  const resultChunks = chunksOfType(chunks, 'tool-result');
  const resultNames = resultChunks.map((chunk) => chunk.toolName);
  expect(resultNames).toEqual(
    expect.arrayContaining(['search_tools', 'load_tool', 'count_characters']),
  );

  const countChunk = resultChunks.find((chunk) => chunk.toolName === 'count_characters');
  expect(countChunk?.output).toEqual({ length: 3 });

  // The final streamed text should mention the computed length.
  expect(collectTextDeltas(chunks)).toMatch(/3/);
});
|
||||
|
||||
// End-to-end check: a deferred tool that suspends for approval can be resumed after it was loaded.
it('resumes a suspended deferred tool after loading it', async () => {
  const deleteTool = new Tool('delete_temp_file')
    .description('Delete a temporary file at a requested path after approval')
    .input(z.object({ path: z.string().describe('Temporary file path to delete') }))
    .output(z.object({ deleted: z.boolean(), path: z.string() }))
    .suspend(z.object({ message: z.string(), severity: z.string() }))
    .resume(z.object({ approved: z.boolean() }))
    .handler(async ({ path }, ctx) => {
      const decision = ctx.resumeData;
      if (decision) {
        // Second pass: report whether the deletion was approved.
        return decision.approved ? { deleted: true, path } : { deleted: false, path };
      }

      // First pass: no resume data yet, so pause and ask for approval.
      return await ctx.suspend({
        message: `Delete temporary file "${path}"?`,
        severity: 'destructive',
      });
    });

  const agent = new Agent('deferred-suspend-test')
    .model(getModel('anthropic'))
    .instructions(
      'You are testing deferred tool discovery with approval. When asked to delete a temporary file, first call search_tools, then call load_tool with the exact returned tool name, then call delete_temp_file. Do not skip the tool.',
    )
    .deferredTool(deleteTool)
    .checkpoint('memory');

  const firstResult = await agent.generate(
    'Use deferred tools to delete /tmp/deferred-tool-test.txt. Search for the tool, load it, and call it with that exact path.',
  );

  // Exactly one suspension is expected, and it must belong to the deferred tool.
  expect(firstResult.pendingSuspend).toHaveLength(1);
  const suspended = firstResult.pendingSuspend![0];
  expect(suspended.toolName).toBe('delete_temp_file');

  const resumeResult = await agent.resume(
    'generate',
    { approved: true },
    { runId: suspended.runId, toolCallId: suspended.toolCallId },
  );

  expect(resumeResult.finishReason).not.toBe('error');

  const approvedDeletion = expect.objectContaining({
    tool: 'delete_temp_file',
    output: { deleted: true, path: '/tmp/deferred-tool-test.txt' },
  });
  expect(resumeResult.toolCalls).toEqual(expect.arrayContaining([approvedDeletion]));
});
|
||||
});
|
||||
|
|
@ -760,6 +760,205 @@ function makeGenerateWithToolCalls(
|
|||
};
|
||||
}
|
||||
|
||||
describe('AgentRuntime — deferred tool loading', () => {
|
||||
// Start every test with pristine AI SDK mocks (no queued responses, no recorded calls).
beforeEach(() => {
  streamText.mockReset();
  generateText.mockReset();
});
|
||||
|
||||
// A deferred tool must stay hidden until load_tool succeeds, then appear on the following model turn.
it('searches and loads deferred tools into the next generate iteration', async () => {
  const okHandler = async () => await Promise.resolve({ ok: true });
  const runtime = new AgentRuntime({
    name: 'test',
    model: 'openai/gpt-4o-mini',
    instructions: 'You are a test assistant.',
    tools: [makeMockTool('core_tool', okHandler)],
    deferredTools: [makeMockTool('deferred_capability', okHandler)],
  });

  // Turn 1 searches, turn 2 loads, turn 3 ends the run.
  generateText
    .mockResolvedValueOnce(
      makeGenerateWithToolCalls([
        {
          toolCallId: 'tc-search',
          toolName: 'search_tools',
          args: { query: 'deferred capability' },
        },
      ]),
    )
    .mockResolvedValueOnce(
      makeGenerateWithToolCalls([
        {
          toolCallId: 'tc-load',
          toolName: 'load_tool',
          args: { toolName: 'deferred_capability' },
        },
      ]),
    )
    .mockResolvedValueOnce(makeGenerateSuccess('ready'));

  const result = await runtime.generate('need the deferred capability');

  expect(generateText).toHaveBeenCalledTimes(3);

  const outputOf = (name: string) =>
    result.toolCalls?.find((entry) => entry.tool === name)?.output;
  const summary = {
    name: 'deferred_capability',
    description: 'Mock tool deferred_capability',
  };

  expect(outputOf('search_tools')).toEqual({
    results: [{ ...summary, loaded: false }],
  });
  expect(outputOf('load_tool')).toEqual({
    status: 'loaded',
    toolName: 'deferred_capability',
    tool: { ...summary, loaded: true },
    message: 'Tool "deferred_capability" is loaded and will be available on the next model turn.',
  });

  const invocations = generateText.mock.calls as Array<[{ tools: Record<string, unknown> }]>;
  const toolNamesAt = (turn: number) => Object.keys(invocations[turn][0].tools);
  const alwaysVisible = ['core_tool', 'search_tools', 'load_tool'];

  // The first two turns run before the load takes effect, so the deferred tool is not offered yet.
  for (const turn of [0, 1]) {
    const visible = toolNamesAt(turn);
    expect(visible).toEqual(expect.arrayContaining(alwaysVisible));
    expect(visible).not.toContain('deferred_capability');
  }

  // The third turn follows the successful load and must include the deferred tool.
  expect(toolNamesAt(2)).toEqual(
    expect.arrayContaining([...alwaysVisible, 'deferred_capability']),
  );
});
|
||||
|
||||
// Tools loaded during one run must not be offered at the start of an unrelated later run.
it('does not leak loaded deferred tools into the next generate run', async () => {
  const okHandler = async () => await Promise.resolve({ ok: true });
  const runtime = new AgentRuntime({
    name: 'test',
    model: 'openai/gpt-4o-mini',
    instructions: 'You are a test assistant.',
    tools: [makeMockTool('core_tool', okHandler)],
    deferredTools: [makeMockTool('deferred_capability', okHandler)],
  });

  // First run: load the deferred tool, then finish.
  generateText
    .mockResolvedValueOnce(
      makeGenerateWithToolCalls([
        {
          toolCallId: 'tc-load',
          toolName: 'load_tool',
          args: { toolName: 'deferred_capability' },
        },
      ]),
    )
    .mockResolvedValueOnce(makeGenerateSuccess('first done'));
  await runtime.generate('load the deferred capability');

  // Second run: forget the recorded calls so index 0 is the fresh run's first model call.
  generateText.mockClear();
  generateText.mockResolvedValueOnce(makeGenerateSuccess('second done'));
  await runtime.generate('start a fresh run');

  const invocations = generateText.mock.calls as Array<[{ tools: Record<string, unknown> }]>;
  const [firstInvocation] = invocations;
  const offeredTools = Object.keys(firstInvocation[0].tools);

  expect(offeredTools).toEqual(
    expect.arrayContaining(['core_tool', 'search_tools', 'load_tool']),
  );
  expect(offeredTools).not.toContain('deferred_capability');
});
|
||||
|
||||
// A deferred tool that suspended after being loaded must still be resolvable when the run is resumed.
it('resumes a suspended deferred tool after it has been loaded', async () => {
  const deferredTool = makeSuspendingTool('deferred_approval', async (input, ctx) => {
    if (ctx.resumeData) {
      // Resumed: echo the approval decision together with the original input value.
      const { approved } = ctx.resumeData as { approved: boolean };
      const { value } = input as { value?: string };
      return { approved, value };
    }

    // First invocation: pause until someone approves.
    return await ctx.suspend({ reason: 'approve deferred action?' });
  });

  const runtime = new AgentRuntime({
    name: 'test',
    model: 'openai/gpt-4o-mini',
    instructions: 'You are a test assistant.',
    deferredTools: [deferredTool],
    checkpointStorage: 'memory',
  });

  // Turn 1 loads the tool, turn 2 calls it (which suspends the run).
  generateText
    .mockResolvedValueOnce(
      makeGenerateWithToolCalls([
        {
          toolCallId: 'tc-load',
          toolName: 'load_tool',
          args: { toolName: 'deferred_approval' },
        },
      ]),
    )
    .mockResolvedValueOnce(
      makeGenerateWithToolCalls([
        {
          toolCallId: 'tc-deferred',
          toolName: 'deferred_approval',
          args: { value: 'needs approval' },
        },
      ]),
    );

  const firstResult = await runtime.generate('load and run the deferred approval tool');

  expect(firstResult.finishReason).toBe('tool-calls');
  expect(firstResult.pendingSuspend).toHaveLength(1);
  const suspended = firstResult.pendingSuspend![0];
  expect(suspended.toolName).toBe('deferred_approval');

  // After the resume the model answers once more and the run stops.
  generateText.mockResolvedValueOnce(makeGenerateSuccess('approved'));

  const resumeResult = await runtime.resume(
    'generate',
    { approved: true },
    { runId: suspended.runId, toolCallId: suspended.toolCallId },
  );

  expect(resumeResult.finishReason).toBe('stop');

  const approvedCall = expect.objectContaining({
    tool: 'deferred_approval',
    output: { approved: true, value: 'needs approval' },
  });
  expect(resumeResult.toolCalls).toEqual(expect.arrayContaining([approvedCall]));
});
|
||||
});
|
||||
|
||||
describe('AgentRuntime — concurrent tool execution', () => {
|
||||
beforeEach(() => {
|
||||
generateText.mockReset();
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import type {
|
|||
XaiThinkingConfig,
|
||||
} from '../types';
|
||||
import { BackgroundTaskTracker } from './background-task-tracker';
|
||||
import { DeferredToolManager } from './deferred-tool-manager';
|
||||
import { AgentEventBus } from './event-bus';
|
||||
import { toJsonValue } from './json-value';
|
||||
import { saveMessagesToThread } from './memory-store';
|
||||
|
|
@ -163,6 +164,10 @@ export interface AgentRuntimeConfig {
|
|||
instructions: string;
|
||||
instructionProviderOptions?: ProviderOptions;
|
||||
tools?: BuiltTool[];
|
||||
deferredTools?: BuiltTool[];
|
||||
toolSearch?: {
|
||||
topK?: number;
|
||||
};
|
||||
providerTools?: BuiltProviderTool[];
|
||||
memory?: BuiltMemory;
|
||||
lastMessages?: number;
|
||||
|
|
@ -315,10 +320,15 @@ export class AgentRuntime {
|
|||
|
||||
private observationTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
private deferredToolManager: DeferredToolManager | undefined;
|
||||
|
||||
/** Resolved telemetry for the current run (own config or inherited from parent). */
|
||||
|
||||
constructor(config: AgentRuntimeConfig) {
|
||||
this.config = config;
|
||||
if (config.deferredTools && config.deferredTools.length > 0) {
|
||||
this.deferredToolManager = new DeferredToolManager(config.deferredTools, config.toolSearch);
|
||||
}
|
||||
this.runState = new RunStateManager(config.checkpointStorage);
|
||||
this.eventBus = config.eventBus ?? new AgentEventBus();
|
||||
this.currentState = {
|
||||
|
|
@ -447,7 +457,10 @@ export class AgentRuntime {
|
|||
const toolCall = state.pendingToolCalls[options.toolCallId];
|
||||
if (!toolCall) throw new Error(`No tool call found for toolCallId: ${options.toolCallId}`);
|
||||
|
||||
const tool = this.config.tools?.find((t) => t.name === toolCall.toolName);
|
||||
const list = AgentMessageList.deserialize(state.messageList);
|
||||
this.hydrateDeferredToolsFromList(list);
|
||||
|
||||
const tool = this.getCurrentTools().find((t) => t.name === toolCall.toolName);
|
||||
if (!tool) throw new Error(`Tool ${toolCall.toolName} not found`);
|
||||
|
||||
let resumeData: unknown = data;
|
||||
|
|
@ -460,8 +473,6 @@ export class AgentRuntime {
|
|||
}
|
||||
|
||||
try {
|
||||
const list = AgentMessageList.deserialize(state.messageList);
|
||||
|
||||
// Merge persisted execution options with fresh caller options
|
||||
const { runId: _rid, toolCallId: _tcid, ...callerExecOptions } = options;
|
||||
const persisted = state.executionOptions ?? {};
|
||||
|
|
@ -913,15 +924,7 @@ export class AgentRuntime {
|
|||
/** Core generate loop using generateText (non-streaming). */
|
||||
private async runGenerateLoop(ctx: LoopContext): Promise<GenerateResult> {
|
||||
const { list, options, runId, pendingResume } = ctx;
|
||||
const {
|
||||
model,
|
||||
toolMap,
|
||||
aiTools,
|
||||
providerOptions,
|
||||
hasTools,
|
||||
outputSpec,
|
||||
effectiveInstructions,
|
||||
} = this.buildLoopContext({ ...options, persistence: options?.persistence });
|
||||
this.hydrateDeferredToolsFromList(list);
|
||||
|
||||
let totalUsage: TokenUsage | undefined;
|
||||
let lastFinishReason: FinishReason = 'stop';
|
||||
|
|
@ -931,8 +934,13 @@ export class AgentRuntime {
|
|||
|
||||
// Resolve pending tool calls from a resumed run before the first LLM call.
|
||||
const runTelemetry = this.resolveTelemetry(options);
|
||||
const toolCtx: ToolBatchContext = {
|
||||
toolMap,
|
||||
const staticLoopContext = this.buildStaticLoopContext({
|
||||
...options,
|
||||
persistence: options?.persistence,
|
||||
});
|
||||
const pendingLoopContext = this.buildToolLoopContext(staticLoopContext.aiProviderTools);
|
||||
const pendingToolCtx: ToolBatchContext = {
|
||||
toolMap: pendingLoopContext.toolMap,
|
||||
list,
|
||||
runId,
|
||||
telemetry: runTelemetry,
|
||||
|
|
@ -940,7 +948,10 @@ export class AgentRuntime {
|
|||
};
|
||||
|
||||
if (pendingResume) {
|
||||
const batch = await this.iteratePendingToolCallsConcurrent({ ...toolCtx, pendingResume });
|
||||
const batch = await this.iteratePendingToolCallsConcurrent({
|
||||
...pendingToolCtx,
|
||||
pendingResume,
|
||||
});
|
||||
|
||||
for (const r of batch.results) {
|
||||
toolCallSummary.push(r.toolEntry);
|
||||
|
|
@ -981,15 +992,19 @@ export class AgentRuntime {
|
|||
|
||||
this.eventBus.emit({ type: AgentEvent.TurnStart });
|
||||
|
||||
const { toolMap, aiTools, hasTools, effectiveInstructions } = this.buildToolLoopContext(
|
||||
staticLoopContext.aiProviderTools,
|
||||
);
|
||||
|
||||
const result = await generateText({
|
||||
model,
|
||||
model: staticLoopContext.model,
|
||||
messages: list.forLlm(effectiveInstructions, this.config.instructionProviderOptions),
|
||||
abortSignal: this.eventBus.signal,
|
||||
...(hasTools ? { tools: aiTools } : {}),
|
||||
...(providerOptions
|
||||
? { providerOptions: providerOptions as Record<string, JSONObject> }
|
||||
...(staticLoopContext.providerOptions
|
||||
? { providerOptions: staticLoopContext.providerOptions as Record<string, JSONObject> }
|
||||
: {}),
|
||||
...(outputSpec ? { output: outputSpec } : {}),
|
||||
...(staticLoopContext.outputSpec ? { output: staticLoopContext.outputSpec } : {}),
|
||||
...this.buildTelemetryOptions(options),
|
||||
});
|
||||
|
||||
|
|
@ -1004,7 +1019,7 @@ export class AgentRuntime {
|
|||
list.addResponse(newMessages);
|
||||
|
||||
if (aiFinishReason !== 'tool-calls') {
|
||||
if (outputSpec) {
|
||||
if (staticLoopContext.outputSpec) {
|
||||
structuredOutput = result.output;
|
||||
}
|
||||
this.emitTurnEnd(newMessages, extractSettledToolCalls(newMessages));
|
||||
|
|
@ -1012,7 +1027,11 @@ export class AgentRuntime {
|
|||
}
|
||||
|
||||
const batch = await this.iterateToolCallsConcurrent({
|
||||
...toolCtx,
|
||||
toolMap,
|
||||
list,
|
||||
runId,
|
||||
telemetry: runTelemetry,
|
||||
executionCounter: options?.executionCounter,
|
||||
toolCalls: result.toolCalls,
|
||||
});
|
||||
|
||||
|
|
@ -1141,15 +1160,7 @@ export class AgentRuntime {
|
|||
ctx: LoopContext & { writer: WritableStreamDefaultWriter<StreamChunk> },
|
||||
): Promise<void> {
|
||||
const { list, options, runId, pendingResume, writer } = ctx;
|
||||
const {
|
||||
model,
|
||||
toolMap,
|
||||
aiTools,
|
||||
providerOptions,
|
||||
hasTools,
|
||||
outputSpec,
|
||||
effectiveInstructions,
|
||||
} = this.buildLoopContext({ ...options, persistence: options?.persistence });
|
||||
this.hydrateDeferredToolsFromList(list);
|
||||
|
||||
const writeChunk = async (chunk: StreamChunk): Promise<void> => {
|
||||
await writer.write(chunk);
|
||||
|
|
@ -1177,8 +1188,13 @@ export class AgentRuntime {
|
|||
|
||||
// Resolve pending tool calls from a resumed run before the first LLM call.
|
||||
const runTelemetry = this.resolveTelemetry(options);
|
||||
const toolCtx: ToolBatchContext = {
|
||||
toolMap,
|
||||
const staticLoopContext = this.buildStaticLoopContext({
|
||||
...options,
|
||||
persistence: options?.persistence,
|
||||
});
|
||||
const pendingLoopContext = this.buildToolLoopContext(staticLoopContext.aiProviderTools);
|
||||
const pendingToolCtx: ToolBatchContext = {
|
||||
toolMap: pendingLoopContext.toolMap,
|
||||
list,
|
||||
runId,
|
||||
telemetry: runTelemetry,
|
||||
|
|
@ -1187,7 +1203,7 @@ export class AgentRuntime {
|
|||
if (pendingResume) {
|
||||
try {
|
||||
const batch = await this.iteratePendingToolCallsConcurrent({
|
||||
...toolCtx,
|
||||
...pendingToolCtx,
|
||||
pendingResume,
|
||||
});
|
||||
|
||||
|
|
@ -1248,16 +1264,19 @@ export class AgentRuntime {
|
|||
if (await handleAbort()) return;
|
||||
|
||||
this.eventBus.emit({ type: AgentEvent.TurnStart });
|
||||
const { toolMap, aiTools, hasTools, effectiveInstructions } = this.buildToolLoopContext(
|
||||
staticLoopContext.aiProviderTools,
|
||||
);
|
||||
const messages = list.forLlm(effectiveInstructions, this.config.instructionProviderOptions);
|
||||
const result = streamText({
|
||||
model,
|
||||
model: staticLoopContext.model,
|
||||
messages,
|
||||
abortSignal: this.eventBus.signal,
|
||||
...(hasTools ? { tools: aiTools } : {}),
|
||||
...(providerOptions
|
||||
? { providerOptions: providerOptions as Record<string, JSONObject> }
|
||||
...(staticLoopContext.providerOptions
|
||||
? { providerOptions: staticLoopContext.providerOptions as Record<string, JSONObject> }
|
||||
: {}),
|
||||
...(outputSpec ? { output: outputSpec } : {}),
|
||||
...(staticLoopContext.outputSpec ? { output: staticLoopContext.outputSpec } : {}),
|
||||
...this.buildTelemetryOptions(options),
|
||||
});
|
||||
|
||||
|
|
@ -1301,7 +1320,7 @@ export class AgentRuntime {
|
|||
list.addResponse(newMessages);
|
||||
|
||||
if (aiFinishReason !== 'tool-calls') {
|
||||
if (outputSpec) {
|
||||
if (staticLoopContext.outputSpec) {
|
||||
structuredOutput = await result.output;
|
||||
}
|
||||
this.emitTurnEnd(newMessages, extractSettledToolCalls(newMessages));
|
||||
|
|
@ -1311,7 +1330,14 @@ export class AgentRuntime {
|
|||
const toolCalls = await result.toolCalls;
|
||||
|
||||
try {
|
||||
const batch = await this.iterateToolCallsConcurrent({ ...toolCtx, toolCalls });
|
||||
const batch = await this.iterateToolCallsConcurrent({
|
||||
toolMap,
|
||||
list,
|
||||
runId,
|
||||
telemetry: runTelemetry,
|
||||
executionCounter: options?.executionCounter,
|
||||
toolCalls,
|
||||
});
|
||||
|
||||
if (await handleAbort()) return;
|
||||
|
||||
|
|
@ -1985,28 +2011,55 @@ export class AgentRuntime {
|
|||
};
|
||||
}
|
||||
|
||||
/** Build common LLM call dependencies shared by both the generate and stream loops. */
|
||||
private buildLoopContext(
|
||||
/** Build run-stable LLM call dependencies shared by all iterations. */
|
||||
private buildStaticLoopContext(
|
||||
execOptions?: ExecutionOptions & { persistence?: AgentPersistenceOptions },
|
||||
) {
|
||||
const allUserTools = this.config.tools ?? [];
|
||||
const aiTools = toAiSdkTools(allUserTools);
|
||||
const aiProviderTools = toAiSdkProviderTools(this.config.providerTools);
|
||||
const allTools = { ...aiTools, ...aiProviderTools };
|
||||
const model = createModel(this.config.model);
|
||||
return {
|
||||
model,
|
||||
toolMap: buildToolMap(allUserTools),
|
||||
aiTools: allTools,
|
||||
aiProviderTools,
|
||||
providerOptions: this.buildCallProviderOptions(execOptions?.providerOptions),
|
||||
hasTools: Object.keys(allTools).length > 0,
|
||||
outputSpec: this.config.structuredOutput
|
||||
? Output.object({ schema: this.config.structuredOutput })
|
||||
: undefined,
|
||||
effectiveInstructions: this.composeEffectiveInstructions(allUserTools),
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Assemble the tool-dependent pieces of an LLM call from the tools that are
 * available right now. The result is derived from `getCurrentTools()`, so it
 * reflects whatever that method returns at the moment of the call.
 *
 * @param aiProviderTools Provider tools already converted for the AI SDK; they
 *   are merged over the converted user tools.
 * @returns The tool lookup map, the merged AI SDK tool record, whether that
 *   record is non-empty, and the instructions composed for the current tools.
 */
private buildToolLoopContext(aiProviderTools: ReturnType<typeof toAiSdkProviderTools>) {
  const currentTools = this.getCurrentTools();
  // Provider tools are spread last, so they win on a name clash.
  const mergedAiTools = { ...toAiSdkTools(currentTools), ...aiProviderTools };
  const hasTools = Object.keys(mergedAiTools).length > 0;

  return {
    toolMap: buildToolMap(currentTools),
    aiTools: mergedAiTools,
    hasTools,
    effectiveInstructions: this.composeEffectiveInstructions(currentTools),
  };
}
|
||||
|
||||
/**
 * List the tools usable at this moment: the configured tools, followed by the
 * deferred-tool controller tools and any deferred tools already loaded.
 * Without a deferred tool manager (or when it holds no tools) only the
 * configured tools are returned.
 */
private getCurrentTools(): BuiltTool[] {
  const configuredTools = this.config.tools ?? [];
  const manager = this.deferredToolManager;
  if (manager === undefined || !manager.hasTools) return configuredTools;

  return configuredTools.concat(manager.getControllerTools(), manager.getLoadedTools());
}
|
||||
|
||||
/**
 * Hand the serialized messages of `list` to the deferred tool manager so it can
 * recompute which deferred tools are loaded. Does nothing when there is no
 * manager or it holds no tools.
 */
private hydrateDeferredToolsFromList(list: AgentMessageList): void {
  const manager = this.deferredToolManager;
  if (manager?.hasTools) {
    const { messages } = list.serialize();
    manager.hydrateLoadedToolsFromMessages(messages);
  }
}
|
||||
|
||||
/**
|
||||
* Merge tool-attached `systemInstruction` fragments into the agent's
|
||||
* configured instructions. Fragments are wrapped in a single
|
||||
|
|
|
|||
247
packages/@n8n/agents/src/runtime/deferred-tool-manager.ts
Normal file
247
packages/@n8n/agents/src/runtime/deferred-tool-manager.ts
Normal file
|
|
@ -0,0 +1,247 @@
|
|||
import { z } from 'zod';
|
||||
|
||||
import type { AgentDbMessage, ContentToolCall } from '../types/sdk/message';
|
||||
import type { BuiltTool } from '../types/sdk/tool';
|
||||
|
||||
/** Name of the controller tool used to search the deferred tools. */
export const SEARCH_TOOLS_TOOL_NAME = 'search_tools';

/** Name of the controller tool used to load a deferred tool by name. */
export const LOAD_TOOL_TOOL_NAME = 'load_tool';

/** Number of search results returned when no `topK` option is given. */
const DEFAULT_TOP_K = 5;

/** Short description of a deferred tool, including whether it is currently loaded. */
const toolSummarySchema = z.object({
  name: z.string(),
  description: z.string(),
  loaded: z.boolean(),
});
type ToolSummary = z.infer<typeof toolSummarySchema>;

/** Input accepted by `search_tools`. */
const searchToolsInputSchema = z.object({
  query: z.string().min(1).describe('Keywords describing the capability or integration you need'),
});

/** Output produced by `search_tools`. */
const searchToolsOutputSchema = z.object({
  results: z.array(toolSummarySchema),
});
type SearchToolsOutput = z.infer<typeof searchToolsOutputSchema>;

/** Input accepted by `load_tool`. */
const loadToolInputSchema = z.object({
  toolName: z.string().min(1).describe('Exact tool name returned by search_tools'),
});

/** Output produced by `load_tool`; `tool` and `candidates` are optional. */
const loadToolOutputSchema = z.object({
  status: z.enum(['loaded', 'already_loaded', 'not_found']),
  toolName: z.string(),
  tool: toolSummarySchema.optional(),
  candidates: z.array(toolSummarySchema).optional(),
  message: z.string(),
});
type LoadToolOutput = z.infer<typeof loadToolOutputSchema>;
|
||||
|
||||
/**
 * Type guard for plain "dictionary-like" values: any non-null object that is
 * not an array.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || Array.isArray(value)) return false;
  return typeof value === 'object';
}
|
||||
|
||||
/**
 * Split `value` into a set of distinct lower-case tokens.
 *
 * A token is a maximal run of the characters `a-z`, `0-9`, `_`, `@`, `.`, `/`
 * and `-`; every other character (including whitespace) acts as a separator,
 * so identifiers such as `multiply_numbers` stay a single token.
 *
 * @param value Text to tokenize.
 * @returns The unique non-empty tokens, in first-seen order.
 */
function tokenize(value: string): Set<string> {
  // The input is lower-cased first, so the separator pattern needs no `i` flag,
  // and tokens can never contain whitespace, so no trimming is required.
  const tokens = value
    .toLowerCase()
    .split(/[^a-z0-9_@./-]+/)
    .filter(Boolean); // drop the empty strings produced by leading/trailing separators
  return new Set(tokens);
}
|
||||
|
||||
/**
 * Compute a relevance score for `tool` against a free-text `query`.
 *
 * The score is the sum of whole-phrase signals (the lower-cased, trimmed query
 * compared against the tool name and description) and per-token signals (each
 * distinct query token compared against name and description, both as an exact
 * token and as a substring). A score of 0 means nothing matched.
 *
 * @param tool Tool whose name and description are inspected.
 * @param query Search text supplied by the caller.
 * @returns A non-negative score; higher means more relevant.
 */
function scoreTool(tool: BuiltTool, query: string): number {
  const loweredName = tool.name.toLowerCase();
  const loweredDescription = tool.description.toLowerCase();
  const phrase = query.toLowerCase().trim();
  const nameWords = tokenize(loweredName);
  const descriptionWords = tokenize(loweredDescription);

  // Whole-phrase signals: exact name match weighs most, then substring hits.
  const phraseScore =
    (loweredName === phrase ? 100 : 0) +
    (loweredName.includes(phrase) ? 40 : 0) +
    (loweredDescription.includes(phrase) ? 15 : 0);

  // Per-token signals: exact token hits and substring hits both count, so an
  // exact token match is rewarded twice.
  return Array.from(tokenize(query)).reduce(
    (total, word) =>
      total +
      (nameWords.has(word) ? 20 : 0) +
      (loweredName.includes(word) ? 10 : 0) +
      (descriptionWords.has(word) ? 8 : 0) +
      (loweredDescription.includes(word) ? 4 : 0),
    phraseScore,
  );
}
|
||||
|
||||
export interface DeferredToolManagerOptions {
|
||||
topK?: number;
|
||||
}
|
||||
|
||||
export class DeferredToolManager {
|
||||
private readonly toolsByName = new Map<string, BuiltTool>();
|
||||
|
||||
private readonly loadedToolNames = new Set<string>();
|
||||
|
||||
private readonly topK: number;
|
||||
|
||||
private readonly searchTool: BuiltTool;
|
||||
|
||||
private readonly loadTool: BuiltTool;
|
||||
|
||||
constructor(tools: BuiltTool[], options: DeferredToolManagerOptions = {}) {
|
||||
for (const tool of tools) {
|
||||
if (tool.name === SEARCH_TOOLS_TOOL_NAME || tool.name === LOAD_TOOL_TOOL_NAME) {
|
||||
throw new Error(`Deferred tool name "${tool.name}" is reserved`);
|
||||
}
|
||||
if (this.toolsByName.has(tool.name)) {
|
||||
throw new Error(`Duplicate deferred tool name "${tool.name}"`);
|
||||
}
|
||||
this.toolsByName.set(tool.name, tool);
|
||||
}
|
||||
|
||||
this.topK = options.topK ?? DEFAULT_TOP_K;
|
||||
this.searchTool = this.createSearchTool();
|
||||
this.loadTool = this.createLoadTool();
|
||||
}
|
||||
|
||||
get hasTools(): boolean {
|
||||
return this.toolsByName.size > 0;
|
||||
}
|
||||
|
||||
get totalToolCount(): number {
|
||||
return this.toolsByName.size;
|
||||
}
|
||||
|
||||
get loadedToolCount(): number {
|
||||
return this.loadedToolNames.size;
|
||||
}
|
||||
|
||||
getControllerTools(): BuiltTool[] {
|
||||
if (!this.hasTools) return [];
|
||||
return [this.searchTool, this.loadTool];
|
||||
}
|
||||
|
||||
getLoadedTools(): BuiltTool[] {
|
||||
return Array.from(this.loadedToolNames)
|
||||
.map((name) => this.toolsByName.get(name))
|
||||
.filter((tool): tool is BuiltTool => tool !== undefined);
|
||||
}
|
||||
|
||||
hydrateLoadedToolsFromMessages(messages: AgentDbMessage[]): void {
|
||||
this.loadedToolNames.clear();
|
||||
|
||||
for (const message of messages) {
|
||||
if (!('content' in message) || !Array.isArray(message.content)) continue;
|
||||
for (const block of message.content) {
|
||||
if (!this.isSuccessfulLoadToolCall(block)) continue;
|
||||
const toolName = this.getLoadedToolNameFromOutput(block.output);
|
||||
if (toolName && this.toolsByName.has(toolName)) {
|
||||
this.loadedToolNames.add(toolName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private createSearchTool(): BuiltTool {
|
||||
return {
|
||||
name: SEARCH_TOOLS_TOOL_NAME,
|
||||
description:
|
||||
'Search for additional tools that can be loaded when the current toolset is missing a capability.',
|
||||
inputSchema: searchToolsInputSchema,
|
||||
outputSchema: searchToolsOutputSchema,
|
||||
handler: async (input) => {
|
||||
const { query } = searchToolsInputSchema.parse(input);
|
||||
return await Promise.resolve(this.search(query));
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private createLoadTool(): BuiltTool {
|
||||
return {
|
||||
name: LOAD_TOOL_TOOL_NAME,
|
||||
description:
|
||||
'Load a deferred tool by exact name. The tool becomes available on the next model turn and remains available for this conversation.',
|
||||
inputSchema: loadToolInputSchema,
|
||||
outputSchema: loadToolOutputSchema,
|
||||
handler: async (input) => {
|
||||
const { toolName } = loadToolInputSchema.parse(input);
|
||||
return await Promise.resolve(this.load(toolName));
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private search(query: string): SearchToolsOutput {
|
||||
const scored = Array.from(this.toolsByName.values())
|
||||
.map((tool) => ({
|
||||
tool,
|
||||
score: scoreTool(tool, query),
|
||||
}))
|
||||
.sort((left, right) => {
|
||||
if (left.score !== right.score) return right.score - left.score;
|
||||
return left.tool.name.localeCompare(right.tool.name);
|
||||
});
|
||||
|
||||
const positiveMatches = scored.filter(({ score }) => score > 0);
|
||||
const matches = positiveMatches.length > 0 ? positiveMatches : scored;
|
||||
|
||||
return {
|
||||
results: matches.slice(0, this.topK).map(({ tool }) => this.summarizeTool(tool)),
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Marks the tool called `toolName` as loaded and reports the outcome.
 *
 * @param toolName - Exact name to look up in `toolsByName`.
 * @returns One of three outcomes, distinguished by `status`:
 *   - `'not_found'` — no such tool; `candidates` carries the results of
 *     searching with the requested name as the query.
 *   - `'already_loaded'` — the name was already in `loadedToolNames`.
 *   - `'loaded'` — the name has just been added to `loadedToolNames`.
 */
private load(toolName: string): LoadToolOutput {
	const tool = this.toolsByName.get(toolName);
	if (!tool) {
		const candidates = this.search(toolName).results;
		return {
			status: 'not_found',
			toolName,
			candidates,
			message: `Tool "${toolName}" was not found. Use search_tools to find the exact tool name.`,
		};
	}

	const wasLoaded = this.loadedToolNames.has(toolName);
	if (!wasLoaded) {
		// Record the name before summarizing so the summary reports `loaded: true`.
		this.loadedToolNames.add(toolName);
	}

	return wasLoaded
		? {
				status: 'already_loaded',
				toolName,
				tool: this.summarizeTool(tool),
				message: `Tool "${toolName}" is already loaded.`,
			}
		: {
				status: 'loaded',
				toolName,
				tool: this.summarizeTool(tool),
				message: `Tool "${toolName}" is loaded and will be available on the next model turn.`,
			};
}
|
||||
|
||||
/**
 * Reduces a tool to its name, description, and whether its name is currently
 * present in `loadedToolNames`.
 */
private summarizeTool(tool: BuiltTool): ToolSummary {
	const { name, description } = tool;
	const loaded = this.loadedToolNames.has(name);
	return { name, description, loaded };
}
|
||||
|
||||
/**
 * Type guard for a resolved call to the load tool.
 *
 * Accepts only record-like values whose `type` is `'tool-call'`, whose
 * `toolName` equals `LOAD_TOOL_TOOL_NAME`, and whose `state` is `'resolved'`.
 * The call's output is not inspected here.
 */
private isSuccessfulLoadToolCall(
	block: unknown,
): block is Extract<ContentToolCall, { state: 'resolved' }> {
	if (!isRecord(block)) return false;
	if (block.type !== 'tool-call') return false;
	if (block.toolName !== LOAD_TOOL_TOOL_NAME) return false;
	return block.state === 'resolved';
}
|
||||
|
||||
/**
 * Extracts the tool name from a load-tool output that reports success.
 *
 * @param output - Untyped value to inspect.
 * @returns `output.toolName` when `output` is record-like, its `status` is
 *   `'loaded'` or `'already_loaded'`, and `toolName` is a string; otherwise
 *   `undefined`.
 */
private getLoadedToolNameFromOutput(output: unknown): string | undefined {
	if (isRecord(output) && (output.status === 'loaded' || output.status === 'already_loaded')) {
		const candidate = output.toolName;
		if (typeof candidate === 'string') return candidate;
	}
	return undefined;
}
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ import { Memory } from './memory';
|
|||
import { Telemetry } from './telemetry';
|
||||
import { Tool, wrapToolForApproval } from './tool';
|
||||
import { AgentRuntime } from '../runtime/agent-runtime';
|
||||
import { LOAD_TOOL_TOOL_NAME, SEARCH_TOOLS_TOOL_NAME } from '../runtime/deferred-tool-manager';
|
||||
import { AgentEventBus } from '../runtime/event-bus';
|
||||
import { hasObservationStore } from '../runtime/observation-store';
|
||||
import {
|
||||
|
|
@ -50,6 +51,12 @@ const DEFAULT_LAST_MESSAGES = 10;
|
|||
|
||||
type ToolParameter = BuiltTool | { build(): BuiltTool };
|
||||
|
||||
/** Search settings accepted alongside deferred tools. */
type DeferredToolSearchOptions = {
	/**
	 * Value forwarded as the tool-search `topK` setting.
	 * NOTE(review): presumably the maximum number of search results returned — confirm against the runtime.
	 */
	topK?: number;
};

/** Optional settings accepted by `Agent.deferredTool`. */
interface DeferredToolOptions {
	/** Search-related settings; omit to leave the current setting untouched. */
	search?: DeferredToolSearchOptions;
}
|
||||
|
||||
/**
|
||||
* Lightweight read-only view of an agent's configured state.
|
||||
* Returned by `Agent.snapshot` for testing and debugging purposes.
|
||||
|
|
@ -61,7 +68,7 @@ export interface AgentSnapshot {
|
|||
model: { provider: string | null; name: string | null };
|
||||
/** Instruction text passed to `.instructions()`, or null if not set. */
|
||||
instructions: string | null;
|
||||
/** Minimal description of each registered tool. */
|
||||
/** Minimal description of each directly registered tool. */
|
||||
tools: ReadonlyArray<{ name: string; description: string | undefined }>;
|
||||
/** True when `.memory()` has been configured. */
|
||||
hasMemory: boolean;
|
||||
|
|
@ -100,6 +107,10 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
|
||||
private tools: BuiltTool[] = [];
|
||||
|
||||
private deferredTools: BuiltTool[] = [];
|
||||
|
||||
private deferredToolSearchTopK: number | undefined;
|
||||
|
||||
private providerTools: BuiltProviderTool[] = [];
|
||||
|
||||
private memoryConfig?: MemoryConfig;
|
||||
|
|
@ -191,6 +202,19 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
 * Add tools that are searchable through `search_tools` and activated on demand with `load_tool`.
 *
 * @param t - A single tool or an array of tools. Entries exposing `build` are
 *   built first; the rest are stored as given.
 * @param options - Optional settings. When `options.search.topK` is defined it
 *   replaces any value stored by an earlier call.
 * @returns This agent, for chaining.
 */
deferredTool(t: ToolParameter | ToolParameter[], options?: DeferredToolOptions): this {
	const candidates = Array.isArray(t) ? t : [t];
	// Build and store one entry at a time so a failing `build()` leaves the
	// previously stored entries in place.
	candidates.forEach((candidate) => {
		this.deferredTools.push('build' in candidate ? candidate.build() : candidate);
	});

	const requestedTopK = options?.search?.topK;
	if (requestedTopK !== undefined) {
		this.deferredToolSearchTopK = requestedTopK;
	}

	return this;
}
|
||||
|
||||
/** Add a provider-defined tool (e.g. Anthropic web search, OpenAI code interpreter). */
|
||||
providerTool(builtProviderTool: BuiltProviderTool): this {
|
||||
this.providerTools.push(builtProviderTool);
|
||||
|
|
@ -737,6 +761,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
}
|
||||
|
||||
const finalTools = [...this.tools];
|
||||
const configuredDeferredTools = [...this.deferredTools];
|
||||
|
||||
if (this.workspaceInstance) {
|
||||
const wsTools = this.workspaceInstance.getTools();
|
||||
|
|
@ -744,15 +769,21 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
}
|
||||
|
||||
let finalStaticTools = finalTools;
|
||||
let finalDeferredTools = configuredDeferredTools;
|
||||
if (this.requireToolApprovalValue) {
|
||||
finalStaticTools = finalTools.map((t) =>
|
||||
t.suspendSchema ? t : wrapToolForApproval(t, { requireApproval: true }),
|
||||
);
|
||||
finalDeferredTools = configuredDeferredTools.map((t) =>
|
||||
t.suspendSchema ? t : wrapToolForApproval(t, { requireApproval: true }),
|
||||
);
|
||||
}
|
||||
|
||||
// Validate checkpoint requirement from static tools and known MCP approval config
|
||||
// before attempting any network connections (allows fast failure).
|
||||
const staticNeedsCheckpoint = finalStaticTools.some((t) => t.suspendSchema);
|
||||
const staticNeedsCheckpoint =
|
||||
finalStaticTools.some((t) => t.suspendSchema) ||
|
||||
finalDeferredTools.some((t) => t.suspendSchema);
|
||||
const mcpNeedsCheckpoint =
|
||||
(this.requireToolApprovalValue && this.mcpClients.length > 0) ||
|
||||
this.mcpClients.some((c) => c.declaresApproval());
|
||||
|
|
@ -776,9 +807,30 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
);
|
||||
}
|
||||
|
||||
// Detect collisions between MCP tools and static tools.
|
||||
// Detect collisions between direct, deferred, and MCP tools.
|
||||
const staticNames = new Set(finalStaticTools.map((t) => t.name));
|
||||
const collisions = mcpTools.filter((t) => staticNames.has(t.name)).map((t) => t.name);
|
||||
const reservedDeferredToolNames = new Set([SEARCH_TOOLS_TOOL_NAME, LOAD_TOOL_TOOL_NAME]);
|
||||
const deferredNames = new Set<string>();
|
||||
const deferredCollisions: string[] = [];
|
||||
for (const tool of finalDeferredTools) {
|
||||
if (
|
||||
staticNames.has(tool.name) ||
|
||||
reservedDeferredToolNames.has(tool.name) ||
|
||||
deferredNames.has(tool.name)
|
||||
) {
|
||||
deferredCollisions.push(tool.name);
|
||||
}
|
||||
deferredNames.add(tool.name);
|
||||
}
|
||||
if (deferredCollisions.length > 0) {
|
||||
throw new Error(
|
||||
`Deferred tool name collision — the following tool names resolve to duplicates or reserved tools: ${deferredCollisions.join(', ')}`,
|
||||
);
|
||||
}
|
||||
|
||||
const collisions = mcpTools
|
||||
.filter((t) => staticNames.has(t.name) || deferredNames.has(t.name))
|
||||
.map((t) => t.name);
|
||||
if (collisions.length > 0) {
|
||||
throw new Error(
|
||||
`MCP tool name collision — the following tool names resolve to duplicates: ${collisions.join(', ')}`,
|
||||
|
|
@ -789,7 +841,8 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
|
||||
// Validate checkpoint again after discovering actual MCP tools
|
||||
// (catches the case where MCP tools have suspendSchema after listing).
|
||||
const allNeedCheckpoint = allTools.some((t) => t.suspendSchema);
|
||||
const allNeedCheckpoint =
|
||||
allTools.some((t) => t.suspendSchema) || finalDeferredTools.some((t) => t.suspendSchema);
|
||||
if (allNeedCheckpoint && !this.checkpointStore) {
|
||||
throw new Error(
|
||||
`Agent "${this.name}" has tools requiring approval or suspend/resume but no checkpoint storage. ` +
|
||||
|
|
@ -813,6 +866,11 @@ export class Agent implements BuiltAgent, AgentBuilder {
|
|||
model: modelConfig,
|
||||
instructions,
|
||||
tools: allTools.length > 0 ? allTools : undefined,
|
||||
deferredTools: finalDeferredTools.length > 0 ? finalDeferredTools : undefined,
|
||||
toolSearch:
|
||||
finalDeferredTools.length > 0 && this.deferredToolSearchTopK !== undefined
|
||||
? { topK: this.deferredToolSearchTopK }
|
||||
: undefined,
|
||||
instructionProviderOptions: this.instructionProviderOpts,
|
||||
providerTools: this.providerTools.length > 0 ? this.providerTools : undefined,
|
||||
memory: this.memoryConfig?.memory,
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ export interface AgentBuilder {
|
|||
model(providerOrIdOrConfig: string | ModelConfig, modelName?: string): this;
|
||||
instructions(text: string): this;
|
||||
tool(t: BuiltTool | BuiltTool[]): this;
|
||||
deferredTool(t: BuiltTool | BuiltTool[], options?: { search?: { topK?: number } }): this;
|
||||
providerTool(t: BuiltProviderTool): this;
|
||||
thinking(provider: string, config?: Record<string, unknown>): this;
|
||||
toolCallConcurrency(n: number): this;
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user