fix: Use isolated runtimes for agent calls (no-changelog) (#31658)

This commit is contained in:
yehorkardash 2026-06-04 12:21:13 +03:00 committed by GitHub
parent 77a1b844b5
commit bcbcf7be69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 520 additions and 237 deletions

View File

@ -130,7 +130,7 @@ describe('concurrent tool execution integration', () => {
}),
]),
);
expect(agent.getState().messageList.messages).toEqual(
expect(resumed.getState().messageList.messages).toEqual(
expect.arrayContaining([
expect.objectContaining({
content: expect.arrayContaining([

View File

@ -175,51 +175,42 @@ describe('event system — stream', () => {
});
// ---------------------------------------------------------------------------
// getState()
// Result getState()
// ---------------------------------------------------------------------------
describe('getState()', () => {
it('returns idle before first run', () => {
const agent = createSimpleAgent();
const state = agent.getState();
expect(state.status).toBe('idle');
expect(state.messageList.messages).toHaveLength(0);
});
describe('result getState()', () => {
it('returns success after a successful generate()', async () => {
const agent = createSimpleAgent();
await agent.generate('Say hello');
const state = agent.getState();
const result = await agent.generate('Say hello');
const state = result.getState();
expect(state.status).toBe('success');
});
it('returns success after a completed stream()', async () => {
const agent = createSimpleAgent();
const { stream } = await agent.stream('Say hello');
const result = await agent.stream('Say hello');
const { stream } = result;
await collectStreamChunks(stream);
const state = agent.getState();
const state = result.getState();
expect(state.status).toBe('success');
});
it('state is running during the generate loop (observed via event)', async () => {
it('stream result state is running before the stream is drained', async () => {
const agent = createSimpleAgent();
let stateWhileRunning: string | undefined;
agent.on(AgentEvent.TurnStart, () => {
stateWhileRunning = agent.getState().status;
});
const result = await agent.stream('Say hello');
expect(result.getState().status).toBe('running');
await agent.generate('Say hello');
expect(stateWhileRunning).toBe('running');
await collectStreamChunks(result.stream);
expect(result.getState().status).toBe('success');
});
it('reflects resourceId and threadId from RunOptions', async () => {
const agent = createSimpleAgent();
await agent.generate('Say hello', {
const result = await agent.generate('Say hello', {
persistence: { resourceId: 'user-123', threadId: 'thread-abc' },
});
const state = agent.getState();
const state = result.getState();
expect(state.persistence?.resourceId).toBe('user-123');
expect(state.persistence?.threadId).toBe('thread-abc');
});

View File

@ -183,7 +183,7 @@ describe('external abort signal', () => {
});
expect(result.finishReason).toBe('error');
expect(agent.getState().status).toBe('cancelled');
expect(result.getState().status).toBe('cancelled');
});
it('cancels a stream() call via external AbortSignal', async () => {

View File

@ -28,6 +28,9 @@ describe('createDelegateSubAgentTool', () => {
taskPath: '/root/research_api',
runId: 'child-run-1',
answer: 'done',
getState: () => {
throw new Error('not implemented');
},
}),
});
@ -312,6 +315,11 @@ describe('generateResultToDelegateSubAgentOutput', () => {
],
finishReason: 'stop',
usage: { promptTokens: 3, completionTokens: 2, totalTokens: 5 },
getState: () => ({
status: 'success',
messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] },
pendingToolCalls: {},
}),
};
expect(
@ -333,6 +341,11 @@ describe('generateResultToDelegateSubAgentOutput', () => {
messages: [],
finishReason: 'error',
error: new Error('boom'),
getState: () => ({
status: 'failed',
messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] },
pendingToolCalls: {},
}),
};
expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toMatchObject({
@ -373,6 +386,9 @@ describe('generateResultToDelegateSubAgentOutput', () => {
suspendPayload: { message: 'Delete file?' },
},
],
getState: () => {
throw new Error('getState is not implemented');
},
};
expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toEqual({
@ -400,6 +416,9 @@ describe('generateResultToDelegateSubAgentOutput', () => {
suspendPayload: {},
},
],
getState: () => {
throw new Error('getState is not implemented');
},
};
expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toMatchObject({

View File

@ -202,6 +202,18 @@ export interface AgentRuntimeConfig {
toolCallConcurrency?: number;
titleGeneration?: TitleGenerationConfig;
telemetry?: BuiltTelemetry;
/** Existing run id to continue, used when resuming a suspended run. */
runId?: string;
/**
* Pre-fetched model cost from the catalog. When provided, skips the per-run
* catalog fetch. Set once during Agent.build() and shared across per-run runtimes.
*/
modelCost?: ModelCost;
/**
* Shared RunStateManager for suspend/resume. When provided, per-run runtimes
* use the same store so resume() can find state from a prior run.
*/
runState?: RunStateManager;
}
const MAX_LOOP_ITERATIONS = 30;
@ -318,7 +330,6 @@ type RuntimeExecutionOptions = RunOptions & ExecutionOptions & { iterationCount?
interface LoopContext {
list: AgentMessageList;
options?: RuntimeExecutionOptions;
runId: string;
pendingResume?: PendingResume;
}
@ -370,15 +381,19 @@ export class AgentRuntime {
private deferredToolManager: DeferredToolManager | undefined;
private runId: string;
/** Resolved telemetry for the current run (own config or inherited from parent). */
constructor(config: AgentRuntimeConfig) {
this.config = config;
this.runId = config.runId ?? generateRunId();
if (config.deferredTools && config.deferredTools.length > 0) {
this.deferredToolManager = new DeferredToolManager(config.deferredTools, config.toolSearch);
}
this.runState = new RunStateManager(config.checkpointStorage);
this.runState = config.runState ?? new RunStateManager(config.checkpointStorage);
this.eventBus = config.eventBus ?? new AgentEventBus();
this.modelCost = config.modelCost;
this.currentState = {
persistence: undefined,
status: 'idle',
@ -396,6 +411,7 @@ export class AgentRuntime {
* observer cycles) to settle. Safe to call multiple times.
*/
async dispose(): Promise<void> {
this.eventBus.dispose();
await this.backgroundTasks.flush();
}
@ -414,7 +430,6 @@ export class AgentRuntime {
input: AgentMessage[] | string,
options?: RunOptions & ExecutionOptions,
): Promise<GenerateResult> {
const runId = generateRunId();
let list: AgentMessageList | undefined = undefined;
try {
const initializedList = await this.initRun(input, options);
@ -422,10 +437,10 @@ export class AgentRuntime {
const rawResult = await this.withTelemetryRootSpan(
'generate',
options,
runId,
async () => await this.runGenerateLoop({ list: initializedList, options, runId }),
this.runId,
async () => await this.runGenerateLoop({ list: initializedList, options }),
);
return this.finalizeGenerate(rawResult, list, runId);
return this.finalizeGenerate(rawResult, list);
} catch (error) {
await this.flushTelemetry(options);
const isAbort = this.eventBus.isAborted;
@ -433,7 +448,13 @@ export class AgentRuntime {
if (!isAbort) {
this.eventBus.emit({ type: AgentEvent.Error, message: String(error), error });
}
return { runId, messages: list?.responseDelta() ?? [], finishReason: 'error', error };
return {
runId: this.runId,
messages: list?.responseDelta() ?? [],
finishReason: 'error',
error,
getState: () => this.getState(),
};
}
}
@ -442,7 +463,6 @@ export class AgentRuntime {
input: AgentMessage[] | string,
options?: RunOptions & ExecutionOptions,
): Promise<StreamResult> {
const runId = generateRunId();
let list: AgentMessageList;
try {
list = await this.initRun(input, options);
@ -452,10 +472,14 @@ export class AgentRuntime {
if (!isAbort) {
this.eventBus.emit({ type: AgentEvent.Error, message: String(error), error });
}
return { runId, stream: makeErrorStream(error) };
return { runId: this.runId, stream: makeErrorStream(error), getState: () => this.getState() };
}
return { runId, stream: this.startStreamLoop({ list, options, runId }) };
return {
runId: this.runId,
stream: this.startStreamLoop({ list, options }),
getState: () => this.getState(),
};
}
/**
@ -481,8 +505,9 @@ export class AgentRuntime {
data: unknown,
options: { runId: string; toolCallId: string } & ExecutionOptions,
): Promise<GenerateResult | StreamResult> {
const state = await this.runState.resume(options.runId);
if (!state) throw new Error(`No suspended run found for runId: ${options.runId}`);
this.runId = options.runId;
const state = await this.runState.resume(this.runId);
if (!state) throw new Error(`No suspended run found for runId: ${this.runId}`);
const toolCall = state.pendingToolCalls[options.toolCallId];
if (!toolCall) throw new Error(`No tool call found for toolCallId: ${options.toolCallId}`);
@ -556,29 +581,28 @@ export class AgentRuntime {
const rawResult = await this.withTelemetryRootSpan(
'generate',
resumeOptions,
options.runId,
this.runId,
async () =>
await this.runGenerateLoop({
list,
options: resumeOptions,
runId: options.runId,
pendingResume,
}),
);
if (!rawResult.pendingSuspend) {
await this.cleanupRun(options.runId);
await this.cleanupRun();
}
return this.finalizeGenerate(rawResult, list, options.runId);
return this.finalizeGenerate(rawResult, list);
}
return {
runId: options.runId,
runId: this.runId,
stream: this.startStreamLoop({
list,
options: resumeOptions,
runId: options.runId,
pendingResume,
}),
getState: () => this.getState(),
};
} catch (error) {
const isAbort = this.eventBus.isAborted;
@ -588,13 +612,14 @@ export class AgentRuntime {
}
if (method === 'generate') {
return {
runId: options.runId,
runId: this.runId,
messages: [],
finishReason: 'error' as const,
error,
getState: () => this.getState(),
};
}
return { runId: options.runId, stream: makeErrorStream(error) };
return { runId: this.runId, stream: makeErrorStream(error), getState: () => this.getState() };
}
}
@ -681,17 +706,13 @@ export class AgentRuntime {
* Post-loop finalization for generate: apply cost, set model id, roll up sub-agent usage,
* transition to success, and emit AgentEnd. Returns the finalized result.
*/
private finalizeGenerate(
result: GenerateResult,
list: AgentMessageList,
runId: string,
): GenerateResult {
result.runId = runId;
private finalizeGenerate(result: GenerateResult, list: AgentMessageList): GenerateResult {
result.runId = this.runId;
result.usage = this.applyCost(result.usage);
result.model = this.modelIdString;
this.updateState({ status: 'success', messageList: list.serialize() });
this.eventBus.emit({ type: AgentEvent.AgentEnd, messages: result.messages });
return result;
return { ...result, getState: () => this.getState() };
}
/** Resolve telemetry: own config wins, then inherited from options, then nothing. */
@ -889,7 +910,7 @@ export class AgentRuntime {
/** Core generate loop using generateText (non-streaming). */
private async runGenerateLoop(ctx: LoopContext): Promise<GenerateResult> {
const { list, options, runId, pendingResume } = ctx;
const { list, options, pendingResume } = ctx;
this.hydrateDeferredToolsFromList(list);
let totalUsage: TokenUsage | undefined;
@ -911,7 +932,7 @@ export class AgentRuntime {
const pendingToolCtx: ToolBatchContext = {
toolMap: pendingLoopContext.toolMap,
list,
runId,
runId: this.runId,
persistence: options?.persistence,
telemetry: runTelemetry,
executionCounter: options?.executionCounter,
@ -936,7 +957,6 @@ export class AgentRuntime {
options,
list,
totalUsage,
runId,
maxIterations,
iterationCount,
);
@ -953,6 +973,7 @@ export class AgentRuntime {
suspendPayload: s.payload,
resumeSchema: s.resumeSchema,
})),
getState: () => this.getState(),
};
}
}
@ -1006,7 +1027,7 @@ export class AgentRuntime {
const batch = await this.iterateToolCallsConcurrent({
toolMap,
list,
runId,
runId: this.runId,
persistence: options?.persistence,
telemetry: runTelemetry,
executionCounter: options?.executionCounter,
@ -1023,7 +1044,6 @@ export class AgentRuntime {
options,
list,
totalUsage,
runId,
maxIterations,
iterationCount + 1,
);
@ -1040,6 +1060,7 @@ export class AgentRuntime {
suspendPayload: s.payload,
resumeSchema: s.resumeSchema,
})),
getState: () => this.getState(),
};
}
@ -1071,12 +1092,13 @@ export class AgentRuntime {
}
return {
runId: runId ?? '',
runId: this.runId,
messages: list.responseDelta(),
finishReason: lastFinishReason,
usage: totalUsage,
...(structuredOutput !== undefined && { structuredOutput }),
...(toolCallSummary.length > 0 && { toolCalls: toolCallSummary }),
getState: () => this.getState(),
};
}
@ -1085,7 +1107,7 @@ export class AgentRuntime {
* Returns the readable side immediately; the loop runs in the background.
*/
private startStreamLoop(ctx: LoopContext): ReadableStream<StreamChunk> {
const { options, runId } = ctx;
const { options } = ctx;
const { readable, writable } = new TransformStream<StreamChunk, StreamChunk>();
const writer = writable.getWriter();
@ -1136,12 +1158,12 @@ export class AgentRuntime {
this.withTelemetryRootSpan(
'stream',
options,
runId,
this.runId,
async () => await this.runStreamLoop({ ...ctx, writer }),
)
.catch(async (error: unknown) => {
await this.flushTelemetry(options);
await this.cleanupRun(runId);
await this.cleanupRun();
try {
await writer.write({ type: 'error', error });
await writer.write({ type: 'finish', finishReason: 'error' });
@ -1164,7 +1186,7 @@ export class AgentRuntime {
private async runStreamLoop(
ctx: LoopContext & { writer: WritableStreamDefaultWriter<StreamChunk> },
): Promise<void> {
const { list, options, runId, pendingResume, writer } = ctx;
const { list, options, pendingResume, writer } = ctx;
this.hydrateDeferredToolsFromList(list);
const writeChunk = async (chunk: StreamChunk): Promise<void> => {
@ -1180,7 +1202,7 @@ export class AgentRuntime {
const { streamText } = loadAi();
const closeStreamWithError = async (error: unknown, status: AgentRunState): Promise<void> => {
await this.cleanupRun(runId);
await this.cleanupRun();
this.updateState({ status });
await writer.write({ type: 'error', error });
await writer.write({ type: 'finish', finishReason: 'error' });
@ -1207,7 +1229,7 @@ export class AgentRuntime {
const pendingToolCtx: ToolBatchContext = {
toolMap: pendingLoopContext.toolMap,
list,
runId,
runId: this.runId,
persistence: options?.persistence,
telemetry: runTelemetry,
executionCounter: options?.executionCounter,
@ -1248,7 +1270,6 @@ export class AgentRuntime {
options,
list,
totalUsage,
runId,
maxIterations,
iterationCount,
);
@ -1379,7 +1400,7 @@ export class AgentRuntime {
const batch = await this.iterateToolCallsConcurrent({
toolMap,
list,
runId,
runId: this.runId,
persistence: options?.persistence,
telemetry: runTelemetry,
executionCounter: options?.executionCounter,
@ -1417,7 +1438,6 @@ export class AgentRuntime {
options,
list,
totalUsage,
runId,
maxIterations,
iterationCount + 1,
);
@ -1477,7 +1497,7 @@ export class AgentRuntime {
}
}
await this.cleanupRun(runId);
await this.cleanupRun();
await this.flushTelemetry(options);
this.updateState({ status: 'success', messageList: list.serialize() });
@ -2368,19 +2388,16 @@ export class AgentRuntime {
/**
* Persist a suspended run state and update the current state snapshot.
* Returns the runId (reuses existingRunId when resuming to prevent dangling runs).
* Returns the runtime's runId.
*/
private async persistSuspension(
pendingToolCalls: Record<string, PendingToolCall>,
options: RuntimeExecutionOptions | undefined,
list: AgentMessageList,
totalUsage: TokenUsage | undefined,
existingRunId?: string,
maxIterations?: number,
iterationCount?: number,
): Promise<string> {
const runId = existingRunId ?? generateRunId();
// Persist loop controls only. providerOptions are intentionally excluded
// because they may contain sensitive data (API keys, auth headers).
const resolvedMaxIterations = maxIterations ?? options?.maxIterations;
@ -2397,16 +2414,14 @@ export class AgentRuntime {
executionOptions,
...(resolvedIterationCount !== undefined ? { iterationCount: resolvedIterationCount } : {}),
};
await this.runState.suspend(runId, state);
await this.runState.suspend(this.runId, state);
this.updateState({ status: 'suspended', pendingToolCalls, messageList: list.serialize() });
return runId;
return this.runId;
}
/** Clean up stored state for a run when it finishes without re-suspending. */
private async cleanupRun(runId: string | undefined): Promise<void> {
if (runId) {
await this.runState.complete(runId);
}
private async cleanupRun(): Promise<void> {
await this.runState.complete(this.runId);
}
/** Emit a TurnEnd event when an assistant message is present in `newMessages`. */

View File

@ -0,0 +1,129 @@
import * as aiModule from 'ai';
import type { Mock } from 'vitest';
import type { AgentRuntimeConfig } from '../../runtime/agent-runtime';
import type { AgentEventBus } from '../../runtime/event-bus';
import { AgentEvent } from '../../runtime/event-bus';
import type { StreamChunk } from '../../types';
import { Agent } from '../agent';
vi.mock('@ai-sdk/openai', () => ({
createOpenAI: () => () => ({ provider: 'openai', modelId: 'mock', specificationVersion: 'v3' }),
}));
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
type AiImport = typeof import('ai');
vi.mock('ai', async () => {
const actual = await vi.importActual<AiImport>('ai');
return {
...actual,
generateText: vi.fn(),
};
});
const { generateText } = aiModule as unknown as {
generateText: Mock;
};
type ActiveRuntime = {
bus: AgentEventBus;
};
type AgentInternals = {
ensureBuilt(): Promise<AgentRuntimeConfig>;
createRuntime(config: AgentRuntimeConfig, runId?: string): ActiveRuntime;
trackStreamRuntime(
stream: ReadableStream<StreamChunk>,
active: ActiveRuntime,
): ReadableStream<StreamChunk>;
cleanupRuntime(active: ActiveRuntime): Promise<void>;
activeRuntimes: Set<ActiveRuntime>;
};
function makeGenerateSuccess(text: string) {
return {
finishReason: 'stop',
usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 },
response: {
messages: [
{
role: 'assistant',
content: [{ type: 'text', text }],
},
],
},
toolCalls: [],
};
}
describe('Agent isolated runtimes', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('keeps result state bound to the runtime that produced it', async () => {
generateText
.mockResolvedValueOnce(makeGenerateSuccess('first response'))
.mockResolvedValueOnce(makeGenerateSuccess('second response'));
const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test');
const first = await agent.generate('first');
const second = await agent.generate('second');
expect(first.getState().messageList.messages).toEqual(
expect.arrayContaining([
expect.objectContaining({
content: expect.arrayContaining([expect.objectContaining({ text: 'first response' })]),
}),
]),
);
expect(second.getState().messageList.messages).toEqual(
expect.arrayContaining([
expect.objectContaining({
content: expect.arrayContaining([expect.objectContaining({ text: 'second response' })]),
}),
]),
);
});
it('applies event handler changes to active runtimes', async () => {
const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test');
const internals = agent as unknown as AgentInternals;
const active = internals.createRuntime(await internals.ensureBuilt());
const handler = vi.fn();
agent.on(AgentEvent.AgentEnd, handler);
active.bus.emit({ type: AgentEvent.AgentEnd, messages: [] });
agent.off(AgentEvent.AgentEnd, handler);
active.bus.emit({ type: AgentEvent.AgentEnd, messages: [] });
expect(handler).toHaveBeenCalledTimes(1);
await internals.cleanupRuntime(active);
});
it('cleans up the active runtime when a wrapped stream is cancelled', async () => {
const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test');
const internals = agent as unknown as AgentInternals;
const active = internals.createRuntime(await internals.ensureBuilt());
const sourceCancel = vi.fn();
const stream = internals.trackStreamRuntime(
new ReadableStream<StreamChunk>({
start(controller) {
controller.enqueue({ type: 'start-step' });
},
cancel: sourceCancel,
}),
active,
);
const reader = stream.getReader();
expect(internals.activeRuntimes.has(active)).toBe(true);
await reader.read();
await reader.cancel('client disconnected');
reader.releaseLock();
expect(sourceCancel).toHaveBeenCalledWith('client disconnected');
expect(internals.activeRuntimes.has(active)).toBe(false);
});
});

View File

@ -10,13 +10,17 @@ import {
type DelegateSubAgentRunner,
type DelegateSubAgentRunnerHelpers,
} from '../../runtime/delegate-sub-agent-tool';
import type { BuiltTool } from '../../types';
import type { BuiltTool, GenerateResult, SerializableAgentState } from '../../types';
import { Agent } from '../agent';
const runtimeConfigs: Array<Record<string, unknown>> = [];
let inlineChildGenerateResult:
| Awaited<ReturnType<InstanceType<typeof AgentRuntimeModule.AgentRuntime>['generate']>>
| undefined;
let inlineChildGenerateResult: GenerateResult | undefined;
const mockState = (): SerializableAgentState => ({
status: 'success',
messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] },
pendingToolCalls: {},
});
vi.mock('../../runtime/agent-runtime', async (importOriginal) => {
const actual = await importOriginal<typeof AgentRuntimeModule>();
@ -41,7 +45,8 @@ vi.mock('../../runtime/agent-runtime', async (importOriginal) => {
content: [{ type: 'text', text: 'inline answer' }],
},
],
usage: {},
usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 },
getState: mockState,
});
}
@ -67,6 +72,12 @@ const delegateInput = {
goal: 'Find the API behavior.',
};
async function buildAgentConfig(agent: Agent): Promise<AgentRuntimeModule.AgentRuntimeConfig> {
return await (
agent as unknown as { build(): Promise<AgentRuntimeModule.AgentRuntimeConfig> }
).build();
}
describe('delegate sub-agent routing', () => {
beforeEach(() => {
runtimeConfigs.length = 0;
@ -89,10 +100,10 @@ describe('delegate sub-agent routing', () => {
)
.tool(makeTool('lookup'));
await (agent as unknown as { build(): Promise<unknown> }).build();
const runtimeConfig = await buildAgentConfig(agent);
expect(runtimeConfigs).toHaveLength(1);
const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined;
expect(runtimeConfigs).toHaveLength(0);
const builtTools = runtimeConfig.tools;
const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME);
expect(delegateTool).toBeDefined();
@ -104,12 +115,10 @@ describe('delegate sub-agent routing', () => {
});
expect(hostRunSubAgent).toHaveBeenCalledOnce();
expect(hostRunSubAgent.mock.calls[0]?.[1]).toEqual(
expect.objectContaining({
runInlineSubAgent: expect.any(Function),
}),
);
expect(runtimeConfigs).toHaveLength(2);
const helpers = hostRunSubAgent.mock.calls[0]?.[1];
expect(helpers).toBeDefined();
expect(typeof helpers?.runInlineSubAgent).toBe('function');
expect(runtimeConfigs).toHaveLength(1);
});
it('runs inline delegations without a host runner when the tool is built on an Agent', async () => {
@ -119,9 +128,10 @@ describe('delegate sub-agent routing', () => {
.tool(createDelegateSubAgentTool())
.tool(makeTool('lookup'));
await (agent as unknown as { build(): Promise<unknown> }).build();
const runtimeConfig = await buildAgentConfig(agent);
const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined;
expect(runtimeConfigs).toHaveLength(0);
const builtTools = runtimeConfig.tools;
const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME);
expect(delegateTool).toBeDefined();
@ -132,7 +142,7 @@ describe('delegate sub-agent routing', () => {
answer: 'inline answer',
});
expect(runtimeConfigs).toHaveLength(2);
expect(runtimeConfigs).toHaveLength(1);
});
it('lets a host-style runner delegate inline through helpers from tool metadata', async () => {
@ -196,6 +206,7 @@ describe('delegate sub-agent routing', () => {
suspendPayload: { message: 'Delete file?' },
},
],
getState: mockState,
};
const agent = new Agent('parent')
@ -204,9 +215,9 @@ describe('delegate sub-agent routing', () => {
.tool(createDelegateSubAgentTool())
.tool(makeTool('lookup'));
await (agent as unknown as { build(): Promise<unknown> }).build();
const runtimeConfig = await buildAgentConfig(agent);
const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined;
const builtTools = runtimeConfig.tools;
const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME);
expect(delegateTool).toBeDefined();

View File

@ -1,4 +1,4 @@
import type { AgentRuntime } from '../../runtime/agent-runtime';
import type { AgentRuntimeConfig } from '../../runtime/agent-runtime';
import {
DEFAULT_EPISODIC_MEMORY_EMBEDDING_MODEL,
DEFAULT_EPISODIC_MEMORY_MAX_ENTRIES_PER_RUN,
@ -68,14 +68,9 @@ describe('Memory builder — episodic memory', () => {
.instructions('You are a test assistant.')
.memory(memory);
const runtime = await (agent as unknown as { build(): Promise<AgentRuntime> }).build();
const runtimeConfig = (
runtime as unknown as {
config: {
episodicMemory?: EpisodicMemoryConfig;
};
}
).config;
const runtimeConfig = await (
agent as unknown as { build(): Promise<AgentRuntimeConfig> }
).build();
const embedder = runtimeConfig.episodicMemory?.embedder as unknown as Record<string, unknown>;
expect(runtimeConfig.episodicMemory).toMatchObject({

View File

@ -1,6 +1,6 @@
import type { AgentRuntime } from '../../runtime/agent-runtime';
import type { AgentRuntimeConfig } from '../../runtime/agent-runtime';
import { InMemoryMemory } from '../../runtime/memory-store';
import type { BuiltMemory, MemoryConfig, ObservationalMemoryConfig } from '../../types';
import type { BuiltMemory, MemoryConfig } from '../../types';
import { Agent } from '../agent';
import {
DEFAULT_OBSERVATION_LOG_LOCK_TTL_MS,
@ -137,15 +137,9 @@ describe('Memory builder — observation log memory', () => {
.instructions('You are a test assistant.')
.memory(memory);
const runtime = await (agent as unknown as { build(): Promise<AgentRuntime> }).build();
const runtimeConfig = (
runtime as unknown as {
config: {
observationLog?: { renderTokenBudget?: number };
observationalMemory?: ObservationalMemoryConfig;
};
}
).config;
const runtimeConfig = await (
agent as unknown as { build(): Promise<AgentRuntimeConfig> }
).build();
expect(runtimeConfig.observationLog).toEqual({
renderTokenBudget: DEFAULT_OBSERVATION_LOG_RENDER_TOKEN_BUDGET,

View File

@ -1,12 +1,13 @@
import type { ProviderOptions } from '@ai-sdk/provider-utils';
import type { z } from 'zod';
import { getModelCost } from './catalog';
import type { Eval } from './eval';
import type { McpClient } from './mcp-client';
import { Memory, normalizeMemoryConfig, resolveMemoryConfigDefaults } from './memory';
import { Telemetry } from './telemetry';
import { wrapToolForApproval } from './tool';
import { AgentRuntime } from '../runtime/agent-runtime';
import { AgentRuntime, type AgentRuntimeConfig } from '../runtime/agent-runtime';
import { LOAD_TOOL_TOOL_NAME, SEARCH_TOOLS_TOOL_NAME } from '../runtime/deferred-tool-manager';
import {
DELEGATE_SUB_AGENT_TOOL_NAME,
@ -21,6 +22,7 @@ import {
} from '../runtime/delegate-sub-agent-tool';
import { RECALL_MEMORY_TOOL_NAME } from '../runtime/episodic-memory';
import { AgentEventBus } from '../runtime/event-bus';
import { RunStateManager } from '../runtime/run-state';
import { isSdkOwnedBuiltInTool } from '../runtime/sdk-owned-tool';
import { WRITE_TODOS_TOOL_NAME } from '../runtime/write-todos-tool';
import {
@ -47,13 +49,13 @@ import type {
ModelConfig,
Provider,
RunOptions,
SerializableAgentState,
StreamResult,
ThinkingConfig,
ThinkingConfigFor,
ResumeOptions,
} from '../types';
import type { AgentEvent } from '../types/runtime/event';
import type { StreamChunk } from '../types/sdk/agent';
import type { AgentBuilder } from '../types/sdk/agent-builder';
import type { AgentMessage } from '../types/sdk/message';
import type { Workspace } from '../workspace/workspace';
@ -77,6 +79,11 @@ interface DeferredToolOptions {
};
}
type ActiveRuntime = {
runtime: AgentRuntime;
bus: AgentEventBus;
};
/**
* Lightweight read-only view of an agent's configured state.
* Returned by `Agent.snapshot` for testing and debugging purposes.
@ -155,8 +162,6 @@ export class Agent implements BuiltAgent, AgentBuilder {
private thinkingConfig?: ThinkingConfig;
private runtime?: AgentRuntime;
private concurrencyValue?: number;
private telemetryBuilder?: Telemetry;
@ -171,9 +176,11 @@ export class Agent implements BuiltAgent, AgentBuilder {
private defaultExecutionOptions?: ExecutionOptions;
private buildPromise: Promise<AgentRuntime> | undefined;
private buildPromise: Promise<AgentRuntimeConfig> | undefined;
private eventBus = new AgentEventBus();
private agentHandlers = new Map<AgentEvent, Set<AgentEventHandler>>();
private activeRuntimes = new Set<ActiveRuntime>();
private workspaceInstance?: Workspace;
@ -403,7 +410,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
} else {
this.telemetryBuilder = undefined;
this.telemetryConfig = t;
this.runtime?.setTelemetry(t);
this.buildPromise = undefined;
}
return this;
}
@ -503,7 +510,15 @@ export class Agent implements BuiltAgent, AgentBuilder {
* Handlers are called synchronously during the agentic loop.
*/
on(event: AgentEvent, handler: AgentEventHandler): void {
this.eventBus.on(event, handler);
let handlers = this.agentHandlers.get(event);
if (!handlers) {
handlers = new Set();
this.agentHandlers.set(event, handlers);
}
handlers.add(handler);
for (const { bus } of this.activeRuntimes) {
bus.on(event, handler);
}
}
/**
@ -512,7 +527,15 @@ export class Agent implements BuiltAgent, AgentBuilder {
* cleanly between turns instead of accumulating on a long-lived agent.
*/
off(event: AgentEvent, handler: AgentEventHandler): void {
this.eventBus.off(event, handler);
const handlers = this.agentHandlers.get(event);
if (!handlers) return;
handlers.delete(handler);
if (handlers.size === 0) {
this.agentHandlers.delete(event);
}
for (const { bus } of this.activeRuntimes) {
bus.off(event, handler);
}
}
/**
@ -556,25 +579,14 @@ export class Agent implements BuiltAgent, AgentBuilder {
};
}
/** Return the latest state snapshot of the agent. Returns `{ status: 'idle' }` before first run. */
getState(): SerializableAgentState {
if (!this.runtime) {
return {
persistence: undefined,
status: 'idle',
messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] },
pendingToolCalls: {},
};
}
return this.runtime.getState();
}
/**
* Cancel the currently running agent.
* Synchronous sets an abort flag; the agentic loop checks it asynchronously.
*/
abort(): void {
this.eventBus.abort();
for (const { bus } of this.activeRuntimes) {
bus.abort();
}
}
/**
@ -590,7 +602,10 @@ export class Agent implements BuiltAgent, AgentBuilder {
*/
async close(): Promise<void> {
const tasks: Array<Promise<unknown>> = [];
if (this.runtime) tasks.push(this.runtime.dispose());
for (const active of this.activeRuntimes) {
active.bus.abort();
tasks.push(this.cleanupRuntime(active));
}
tasks.push(...this.mcpClients.map(async (c) => await c.close()));
await Promise.allSettled(tasks);
}
@ -600,9 +615,14 @@ export class Agent implements BuiltAgent, AgentBuilder {
input: AgentMessage[] | string,
options?: RunOptions & ExecutionOptions,
): Promise<GenerateResult> {
const runtime = await this.ensureBuilt();
const config = await this.ensureBuilt();
const active = this.createRuntime(config);
const mergedOptions = this.mergeWithDefaults(options);
return await runtime.generate(this.toMessages(input), mergedOptions);
try {
return await active.runtime.generate(this.toMessages(input), mergedOptions);
} finally {
await this.cleanupRuntime(active);
}
}
/** Stream a response. Lazy-builds on first call. */
@ -610,9 +630,16 @@ export class Agent implements BuiltAgent, AgentBuilder {
input: AgentMessage[] | string,
options?: RunOptions & ExecutionOptions,
): Promise<StreamResult> {
const runtime = await this.ensureBuilt();
const config = await this.ensureBuilt();
const active = this.createRuntime(config);
const mergedOptions = this.mergeWithDefaults(options);
return await runtime.stream(this.toMessages(input), mergedOptions);
try {
const result = await active.runtime.stream(this.toMessages(input), mergedOptions);
return { ...result, stream: this.trackStreamRuntime(result.stream, active) };
} catch (error) {
await this.cleanupRuntime(active);
throw error;
}
}
/** Resume a suspended tool call with data. Lazy-builds on first call. */
@ -631,11 +658,23 @@ export class Agent implements BuiltAgent, AgentBuilder {
data: unknown,
options: ResumeOptions & ExecutionOptions,
): Promise<GenerateResult | StreamResult> {
const runtime = await this.ensureBuilt();
const config = await this.ensureBuilt();
if (method === 'generate') {
return await runtime.resume('generate', data, options);
const active = this.createRuntime(config, options.runId);
try {
return await active.runtime.resume('generate', data, options);
} finally {
await this.cleanupRuntime(active);
}
}
const active = this.createRuntime(config, options.runId);
try {
const result = await active.runtime.resume('stream', data, options);
return { ...result, stream: this.trackStreamRuntime(result.stream, active) };
} catch (error) {
await this.cleanupRuntime(active);
throw error;
}
return await runtime.resume('stream', data, options);
}
approve(method: 'generate', options: ResumeOptions & ExecutionOptions): Promise<GenerateResult>;
@ -674,7 +713,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
* concurrent callers share one build operation. On error the promise is
* cleared so the caller can retry.
*/
private async ensureBuilt(): Promise<AgentRuntime> {
private async ensureBuilt(): Promise<AgentRuntimeConfig> {
if (!this.buildPromise) {
const p = this.build();
this.buildPromise = p;
@ -685,13 +724,76 @@ export class Agent implements BuiltAgent, AgentBuilder {
return await this.buildPromise;
}
private createRuntime(config: AgentRuntimeConfig, runId?: string): ActiveRuntime {
const bus = new AgentEventBus();
for (const [event, handlers] of this.agentHandlers) {
for (const handler of handlers) {
bus.on(event, handler);
}
}
const runtime = new AgentRuntime({ ...config, eventBus: bus, runId });
const active = { runtime, bus };
this.activeRuntimes.add(active);
return active;
}
private trackStreamRuntime(
stream: ReadableStream<StreamChunk>,
active: ActiveRuntime,
): ReadableStream<StreamChunk> {
const reader = stream.getReader();
let cleanupPromise: Promise<void> | undefined;
const cleanup = async () => {
const doCleanup = async () => {
try {
reader.releaseLock();
} catch {
// The lock may already be released after cancellation/error cleanup.
}
await this.cleanupRuntime(active);
};
cleanupPromise ??= doCleanup();
return await cleanupPromise;
};
return new ReadableStream<StreamChunk>({
async pull(controller) {
try {
const { done, value } = await reader.read();
if (done) {
controller.close();
await cleanup();
return;
}
controller.enqueue(value);
} catch (error) {
controller.error(error);
await cleanup();
}
},
async cancel(reason) {
try {
await reader.cancel(reason);
} finally {
await cleanup();
}
},
});
}
private async cleanupRuntime(active: ActiveRuntime): Promise<void> {
if (!this.activeRuntimes.delete(active)) return;
active.bus.dispose();
await active.runtime.dispose();
}
private toMessages(input: string | AgentMessage[]): AgentMessage[] {
if (Array.isArray(input)) return input;
return [{ role: 'user', content: [{ type: 'text', text: input }] }];
}
/** @internal Validate configuration and produce an AgentRuntime. Overridden by the execution engine. */
protected async build(): Promise<AgentRuntime> {
protected async build(): Promise<AgentRuntimeConfig> {
if (!this.modelConfig) {
throw new Error(`Agent "${this.name}" requires a model`);
}
@ -836,7 +938,22 @@ export class Agent implements BuiltAgent, AgentBuilder {
...(toolSearch !== undefined ? { toolSearch } : {}),
});
this.runtime = new AgentRuntime({
let modelCost: Awaited<ReturnType<typeof getModelCost>> | undefined;
try {
const modelId =
typeof modelConfig === 'string'
? modelConfig
: 'id' in modelConfig && typeof modelConfig.id === 'string'
? modelConfig.id
: undefined;
modelCost = modelId ? await getModelCost(modelId) : undefined;
} catch {
modelCost = undefined;
}
const runState = new RunStateManager(this.checkpointStore);
return {
name: this.name,
model: modelConfig,
instructions,
@ -852,13 +969,12 @@ export class Agent implements BuiltAgent, AgentBuilder {
structuredOutput: this.outputSchema,
checkpointStorage: this.checkpointStore,
thinking: this.thinkingConfig,
eventBus: this.eventBus,
toolCallConcurrency: this.concurrencyValue,
titleGeneration: memoryConfig?.titleGeneration,
telemetry,
});
return this.runtime;
telemetry: this.telemetryConfig ?? (await this.telemetryBuilder?.build()),
modelCost,
runState,
};
}
private completeInlineDelegateTools(

View File

@ -13,6 +13,7 @@ import {
parseRuntimeSkillMarkdown,
renderSkillCatalogPrompt,
} from '..';
import type { AgentRuntimeConfig } from '../../runtime/agent-runtime';
import { Agent } from '../../sdk/agent';
import { isZodSchema } from '../../utils/zod';
@ -474,8 +475,10 @@ Use the workflow SDK.`,
.model('anthropic/claude-sonnet-4-5')
.instructions('Base instructions.')
.skills(source);
const runtime = await (agent as unknown as { build(): Promise<unknown> }).build();
const instructions = (runtime as { config: { instructions: string } }).config.instructions;
const runtimeConfig = await (
agent as unknown as { build(): Promise<AgentRuntimeConfig> }
).build();
const { instructions } = runtimeConfig;
expect(prepare).toHaveBeenCalledTimes(1);
expect(instructions).toContain('name: "Summarize notes"');

View File

@ -206,6 +206,8 @@ export interface GenerateResult {
* callers can handle them without try/catch.
*/
error?: unknown;
/** Return a snapshot of the agent state for this run. */
getState(): SerializableAgentState;
}
export interface StreamResult {
@ -213,6 +215,11 @@ export interface StreamResult {
runId: string;
/** The readable stream of chunks. */
stream: ReadableStream<StreamChunk>;
/**
* Return the current agent state for this run.
* May be called while streaming or after the stream closes.
*/
getState(): SerializableAgentState;
}
export interface ResumeOptions {
@ -234,8 +241,6 @@ export interface BuiltAgent {
on(event: AgentEvent, handler: AgentEventHandler): void;
getState(): SerializableAgentState;
/** Cancel the currently running agent. Synchronous — sets an abort flag that the agentic loop checks asynchronously. */
abort(): void;

View File

@ -53,6 +53,7 @@ import { markAgentDraftDirty } from './utils/agent-draft.utils';
import { draftChatMemoryResourceId } from './utils/agent-memory-scope';
import { executionsToMessagesDto } from './utils/execution-to-message-mapper';
import { generateAgentResourceId } from './utils/agent-resource-id';
import { streamAgentChunks } from './utils/agent-stream';
import { AgentExecutionService } from './agent-execution.service';
import { AgentSkillsService } from './agent-skills.service';
import { AGENT_THREAD_PREFIX } from './builder/builder-tool-names';
@ -896,16 +897,9 @@ export class AgentsService {
executionCounter: this.createAgentExecutionCounter({ agentId }),
});
const reader = resultStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
recorder.record(value);
yield value;
}
} finally {
reader.releaseLock();
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
yield value;
}
// Always record resumed executions — even if they suspend again (chained HITL).
@ -1258,28 +1252,21 @@ export class AgentsService {
executionCounter: this.createAgentExecutionCounter({ agentId, userId }),
});
const reader = resultStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
recorder.record(value);
if (value.type === 'tool-call-suspended') {
this.logger.info('Chat: tool-call-suspended chunk received', {
agentId,
toolCallId: value.toolCallId,
toolName: value.toolName,
});
}
if (value.type === 'finish' && value.finishReason === 'max-iterations') {
for (const chunk of getMaxIterationsChunks()) {
yield chunk;
}
}
yield value;
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call-suspended') {
this.logger.info('Chat: tool-call-suspended chunk received', {
agentId,
toolCallId: value.toolCallId,
toolName: value.toolName,
});
}
} finally {
reader.releaseLock();
if (value.type === 'finish' && value.finishReason === 'max-iterations') {
for (const chunk of getMaxIterationsChunks()) {
yield chunk;
}
}
yield value;
}
// Always record — even if suspended, the pre-suspension response text
@ -1398,29 +1385,22 @@ export class AgentsService {
executionCounter: this.createAgentExecutionCounter({ agentId, userId: telemetryUserId }),
});
const reader = resultStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
recorder.record(value);
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call') {
toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input });
} else if (value.type === 'tool-result') {
const pending = toolInputs.get(value.toolCallId);
toolCalls.push({
toolName: value.toolName,
input: pending?.input ?? null,
result: value.output,
});
toolInputs.delete(value.toolCallId);
} else if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
}
if (value.type === 'tool-call') {
toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input });
} else if (value.type === 'tool-result') {
const pending = toolInputs.get(value.toolCallId);
toolCalls.push({
toolName: value.toolName,
input: pending?.input ?? null,
result: value.output,
});
toolInputs.delete(value.toolCallId);
} else if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
}
} finally {
reader.releaseLock();
}
const messageRecord = recorder.getMessageRecord();

View File

@ -20,6 +20,7 @@ import { N8NCheckpointStorage } from '../integrations/n8n-checkpoint-storage';
import { N8nMemory } from '../integrations/n8n-memory';
import type { AgentJsonConfig } from '@n8n/api-types';
import { AgentCheckpointRepository } from '../repositories/agent-checkpoint.repository';
import { streamAgentChunks } from '../utils/agent-stream';
import { buildAgentPreviewPath } from './agent-builder-preview-path';
import { buildBuilderPrompt } from './agents-builder-prompts';
import { AgentsBuilderToolsService, getAgentConfigHash } from './agents-builder-tools.service';
@ -233,15 +234,8 @@ export class AgentsBuilderService {
* plain readergenerator adapter.
*/
private async *streamFromAgent(resultStream: StreamResult): AsyncGenerator<StreamChunk> {
const reader = resultStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
} finally {
reader.releaseLock();
for await (const value of streamAgentChunks(resultStream.stream)) {
yield value;
}
}

View File

@ -42,6 +42,9 @@ const generateResult: GenerateResult = {
],
},
],
getState: () => {
throw new Error('not implemented');
},
};
const foregroundResult: SubAgentForegroundResult = {

View File

@ -420,5 +420,8 @@ function makeStreamResult(chunks: StreamChunk[]): StreamResult {
controller.close();
},
}),
getState: () => {
throw new Error('not implemented');
},
};
}

View File

@ -8,15 +8,16 @@ import {
type GenerateResult,
type SubAgentTaskPath,
} from '@n8n/agents';
import { Logger } from '@n8n/backend-common';
import type { ResolvedSubAgentSource, SubAgentSpawnRequest } from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import { Container, Service } from '@n8n/di';
import { UserError } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import { AgentExecutionService } from '../agent-execution.service';
import { ExecutionRecorder } from '../execution-recorder';
import type { MessageRecord } from '../execution-recorder';
import { ExecutionRecorder } from '../execution-recorder';
import { streamAgentChunks } from '../utils/agent-stream';
import { SubAgentSourceResolver } from './sub-agent-source-resolver';
export interface SubAgentForegroundRunContext {
@ -115,21 +116,14 @@ export class SubAgentForegroundRunner {
let structuredOutput: unknown;
let childSuspended = false;
const reader = resultStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
recorder.record(value);
if (value.type === 'tool-call-suspended') {
childSuspended = true;
}
if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
}
for await (const value of streamAgentChunks(resultStream.stream)) {
recorder.record(value);
if (value.type === 'tool-call-suspended') {
childSuspended = true;
}
if (value.type === 'finish' && value.structuredOutput !== undefined) {
structuredOutput = value.structuredOutput;
}
} finally {
reader.releaseLock();
}
const messageRecord = recorder.getMessageRecord();
@ -153,6 +147,9 @@ export class SubAgentForegroundRunner {
messages: [],
finishReason: 'error',
error: DELEGATED_CHILD_SUSPEND_UNSUPPORTED_MESSAGE,
getState: () => {
throw new Error('getState is not implemented for sub-agent foreground runner');
},
},
};
}
@ -265,6 +262,9 @@ function buildGenerateResultFromRecord(
: {}),
...(structuredOutput !== undefined ? { structuredOutput } : {}),
...(record.error !== null ? { error: record.error } : {}),
getState: () => {
throw new Error('getState is not implemented for sub-agent foreground runner');
},
};
return result;
}

View File

@ -0,0 +1,25 @@
import type { StreamChunk } from '@n8n/agents';
export async function* streamAgentChunks(
stream: ReadableStream<StreamChunk>,
): AsyncGenerator<StreamChunk> {
const reader = stream.getReader();
let streamDone = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
streamDone = true;
break;
}
yield value;
}
} finally {
try {
if (!streamDone) await reader.cancel();
} finally {
reader.releaseLock();
}
}
}