From bcbcf7be69ecf6039f00dd6693b563170f2b9fcb Mon Sep 17 00:00:00 2001 From: yehorkardash Date: Thu, 4 Jun 2026 12:21:13 +0300 Subject: [PATCH] fix: Use isolated runtimes for agent calls (no-changelog) (#31658) --- .../concurrent-tool-execution.test.ts | 2 +- .../integration/events-and-abort.test.ts | 37 ++-- .../integration/provider-options.test.ts | 2 +- .../__tests__/delegate-sub-agent-tool.test.ts | 19 ++ .../@n8n/agents/src/runtime/agent-runtime.ts | 121 ++++++----- .../__tests__/agent-isolated-runtime.test.ts | 129 ++++++++++++ .../delegate-sub-agent-routing.test.ts | 49 +++-- .../__tests__/memory-builder-episodic.test.ts | 13 +- .../memory-builder-observational.test.ts | 16 +- packages/@n8n/agents/src/sdk/agent.ts | 194 ++++++++++++++---- .../skills/__tests__/runtime-skills.test.ts | 7 +- packages/@n8n/agents/src/types/sdk/agent.ts | 9 +- .../cli/src/modules/agents/agents.service.ts | 84 +++----- .../agents/builder/agents-builder.service.ts | 12 +- .../__tests__/delegate-sub-agent-tool.test.ts | 3 + .../sub-agent-foreground-runner.test.ts | 3 + .../sub-agents/sub-agent-foreground-runner.ts | 32 +-- .../src/modules/agents/utils/agent-stream.ts | 25 +++ 18 files changed, 520 insertions(+), 237 deletions(-) create mode 100644 packages/@n8n/agents/src/sdk/__tests__/agent-isolated-runtime.test.ts create mode 100644 packages/cli/src/modules/agents/utils/agent-stream.ts diff --git a/packages/@n8n/agents/src/__tests__/integration/concurrent-tool-execution.test.ts b/packages/@n8n/agents/src/__tests__/integration/concurrent-tool-execution.test.ts index d612f6d75f4..75d8b1d7fef 100644 --- a/packages/@n8n/agents/src/__tests__/integration/concurrent-tool-execution.test.ts +++ b/packages/@n8n/agents/src/__tests__/integration/concurrent-tool-execution.test.ts @@ -130,7 +130,7 @@ describe('concurrent tool execution integration', () => { }), ]), ); - expect(agent.getState().messageList.messages).toEqual( + expect(resumed.getState().messageList.messages).toEqual( expect.arrayContaining([ expect.objectContaining({ content: expect.arrayContaining([ diff --git a/packages/@n8n/agents/src/__tests__/integration/events-and-abort.test.ts b/packages/@n8n/agents/src/__tests__/integration/events-and-abort.test.ts index 8b96f462cec..522c6b70d53 100644 --- a/packages/@n8n/agents/src/__tests__/integration/events-and-abort.test.ts +++ b/packages/@n8n/agents/src/__tests__/integration/events-and-abort.test.ts @@ -175,51 +175,42 @@ describe('event system — stream', () => { }); // --------------------------------------------------------------------------- -// getState() +// Result getState() // --------------------------------------------------------------------------- -describe('getState()', () => { - it('returns idle before first run', () => { - const agent = createSimpleAgent(); - const state = agent.getState(); - expect(state.status).toBe('idle'); - expect(state.messageList.messages).toHaveLength(0); - }); - +describe('result getState()', () => { it('returns success after a successful generate()', async () => { const agent = createSimpleAgent(); - await agent.generate('Say hello'); - const state = agent.getState(); + const result = await agent.generate('Say hello'); + const state = result.getState(); expect(state.status).toBe('success'); }); it('returns success after a completed stream()', async () => { const agent = createSimpleAgent(); - const { stream } = await agent.stream('Say hello'); + const result = await agent.stream('Say hello'); + const { stream } = result; await collectStreamChunks(stream); - const state = agent.getState(); + const state = result.getState(); expect(state.status).toBe('success'); }); - it('state is running during the generate loop (observed via event)', async () => { + it('stream result state is running before the stream is drained', async () => { const agent = createSimpleAgent(); - let stateWhileRunning: string | undefined; - agent.on(AgentEvent.TurnStart, () => { - stateWhileRunning = agent.getState().status; - }); + const result = await agent.stream('Say hello'); + expect(result.getState().status).toBe('running'); - await agent.generate('Say hello'); - - expect(stateWhileRunning).toBe('running'); + await collectStreamChunks(result.stream); + expect(result.getState().status).toBe('success'); }); it('reflects resourceId and threadId from RunOptions', async () => { const agent = createSimpleAgent(); - await agent.generate('Say hello', { + const result = await agent.generate('Say hello', { persistence: { resourceId: 'user-123', threadId: 'thread-abc' }, }); - const state = agent.getState(); + const state = result.getState(); expect(state.persistence?.resourceId).toBe('user-123'); expect(state.persistence?.threadId).toBe('thread-abc'); }); diff --git a/packages/@n8n/agents/src/__tests__/integration/provider-options.test.ts b/packages/@n8n/agents/src/__tests__/integration/provider-options.test.ts index bcdece99156..b307a73cb5e 100644 --- a/packages/@n8n/agents/src/__tests__/integration/provider-options.test.ts +++ b/packages/@n8n/agents/src/__tests__/integration/provider-options.test.ts @@ -183,7 +183,7 @@ describe('external abort signal', () => { }); expect(result.finishReason).toBe('error'); - expect(agent.getState().status).toBe('cancelled'); + expect(result.getState().status).toBe('cancelled'); }); it('cancels a stream() call via external AbortSignal', async () => { diff --git a/packages/@n8n/agents/src/runtime/__tests__/delegate-sub-agent-tool.test.ts b/packages/@n8n/agents/src/runtime/__tests__/delegate-sub-agent-tool.test.ts index 079f5ef6df4..13e5e2d35cd 100644 --- a/packages/@n8n/agents/src/runtime/__tests__/delegate-sub-agent-tool.test.ts +++ b/packages/@n8n/agents/src/runtime/__tests__/delegate-sub-agent-tool.test.ts @@ -28,6 +28,9 @@ describe('createDelegateSubAgentTool', () => { taskPath: '/root/research_api', runId: 'child-run-1', answer: 'done', + getState: () => { + throw new Error('not implemented'); + }, }), }); @@ -312,6 +315,11 @@ describe('generateResultToDelegateSubAgentOutput', () => { ], finishReason: 'stop', usage: { promptTokens: 3, completionTokens: 2, totalTokens: 5 }, + getState: () => ({ + status: 'success', + messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] }, + pendingToolCalls: {}, + }), }; expect( @@ -333,6 +341,11 @@ describe('generateResultToDelegateSubAgentOutput', () => { messages: [], finishReason: 'error', error: new Error('boom'), + getState: () => ({ + status: 'failed', + messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] }, + pendingToolCalls: {}, + }), }; expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toMatchObject({ @@ -373,6 +386,9 @@ describe('generateResultToDelegateSubAgentOutput', () => { suspendPayload: { message: 'Delete file?' }, }, ], + getState: () => { + throw new Error('getState is not implemented'); + }, }; expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toEqual({ @@ -400,6 +416,9 @@ describe('generateResultToDelegateSubAgentOutput', () => { suspendPayload: {}, }, ], + getState: () => { + throw new Error('getState is not implemented'); + }, }; expect(generateResultToDelegateSubAgentOutput('/root/x_0', result)).toMatchObject({ diff --git a/packages/@n8n/agents/src/runtime/agent-runtime.ts b/packages/@n8n/agents/src/runtime/agent-runtime.ts index ba29920e402..d259717b94e 100644 --- a/packages/@n8n/agents/src/runtime/agent-runtime.ts +++ b/packages/@n8n/agents/src/runtime/agent-runtime.ts @@ -202,6 +202,18 @@ export interface AgentRuntimeConfig { toolCallConcurrency?: number; titleGeneration?: TitleGenerationConfig; telemetry?: BuiltTelemetry; + /** Existing run id to continue, used when resuming a suspended run. */ + runId?: string; + /** + * Pre-fetched model cost from the catalog. When provided, skips the per-run + * catalog fetch. Set once during Agent.build() and shared across per-run runtimes. + */ + modelCost?: ModelCost; + /** + * Shared RunStateManager for suspend/resume. When provided, per-run runtimes + * use the same store so resume() can find state from a prior run. + */ + runState?: RunStateManager; } const MAX_LOOP_ITERATIONS = 30; @@ -318,7 +330,6 @@ type RuntimeExecutionOptions = RunOptions & ExecutionOptions & { iterationCount? interface LoopContext { list: AgentMessageList; options?: RuntimeExecutionOptions; - runId: string; pendingResume?: PendingResume; } @@ -370,15 +381,19 @@ export class AgentRuntime { private deferredToolManager: DeferredToolManager | undefined; + private runId: string; + /** Resolved telemetry for the current run (own config or inherited from parent). */ constructor(config: AgentRuntimeConfig) { this.config = config; + this.runId = config.runId ?? generateRunId(); if (config.deferredTools && config.deferredTools.length > 0) { this.deferredToolManager = new DeferredToolManager(config.deferredTools, config.toolSearch); } - this.runState = new RunStateManager(config.checkpointStorage); + this.runState = config.runState ?? new RunStateManager(config.checkpointStorage); this.eventBus = config.eventBus ?? new AgentEventBus(); + this.modelCost = config.modelCost; this.currentState = { persistence: undefined, status: 'idle', @@ -396,6 +411,7 @@ export class AgentRuntime { * observer cycles) to settle. Safe to call multiple times. */ async dispose(): Promise { + this.eventBus.dispose(); await this.backgroundTasks.flush(); } @@ -414,7 +430,6 @@ export class AgentRuntime { input: AgentMessage[] | string, options?: RunOptions & ExecutionOptions, ): Promise { - const runId = generateRunId(); let list: AgentMessageList | undefined = undefined; try { const initializedList = await this.initRun(input, options); @@ -422,10 +437,10 @@ export class AgentRuntime { const rawResult = await this.withTelemetryRootSpan( 'generate', options, - runId, - async () => await this.runGenerateLoop({ list: initializedList, options, runId }), + this.runId, + async () => await this.runGenerateLoop({ list: initializedList, options }), ); - return this.finalizeGenerate(rawResult, list, runId); + return this.finalizeGenerate(rawResult, list); } catch (error) { await this.flushTelemetry(options); const isAbort = this.eventBus.isAborted; @@ -433,7 +448,13 @@ export class AgentRuntime { if (!isAbort) { this.eventBus.emit({ type: AgentEvent.Error, message: String(error), error }); } - return { runId, messages: list?.responseDelta() ?? [], finishReason: 'error', error }; + return { + runId: this.runId, + messages: list?.responseDelta() ?? [], + finishReason: 'error', + error, + getState: () => this.getState(), + }; } } @@ -442,7 +463,6 @@ export class AgentRuntime { input: AgentMessage[] | string, options?: RunOptions & ExecutionOptions, ): Promise { - const runId = generateRunId(); let list: AgentMessageList; try { list = await this.initRun(input, options); @@ -452,10 +472,14 @@ export class AgentRuntime { if (!isAbort) { this.eventBus.emit({ type: AgentEvent.Error, message: String(error), error }); } - return { runId, stream: makeErrorStream(error) }; + return { runId: this.runId, stream: makeErrorStream(error), getState: () => this.getState() }; } - return { runId, stream: this.startStreamLoop({ list, options, runId }) }; + return { + runId: this.runId, + stream: this.startStreamLoop({ list, options }), + getState: () => this.getState(), + }; } /** @@ -481,8 +505,9 @@ export class AgentRuntime { data: unknown, options: { runId: string; toolCallId: string } & ExecutionOptions, ): Promise { - const state = await this.runState.resume(options.runId); - if (!state) throw new Error(`No suspended run found for runId: ${options.runId}`); + this.runId = options.runId; + const state = await this.runState.resume(this.runId); + if (!state) throw new Error(`No suspended run found for runId: ${this.runId}`); const toolCall = state.pendingToolCalls[options.toolCallId]; if (!toolCall) throw new Error(`No tool call found for toolCallId: ${options.toolCallId}`); @@ -556,29 +581,28 @@ export class AgentRuntime { const rawResult = await this.withTelemetryRootSpan( 'generate', resumeOptions, - options.runId, + this.runId, async () => await this.runGenerateLoop({ list, options: resumeOptions, - runId: options.runId, pendingResume, }), ); if (!rawResult.pendingSuspend) { - await this.cleanupRun(options.runId); + await this.cleanupRun(); } - return this.finalizeGenerate(rawResult, list, options.runId); + return this.finalizeGenerate(rawResult, list); } return { - runId: options.runId, + runId: this.runId, stream: this.startStreamLoop({ list, options: resumeOptions, - runId: options.runId, pendingResume, }), + getState: () => this.getState(), }; } catch (error) { const isAbort = this.eventBus.isAborted; @@ -588,13 +612,14 @@ export class AgentRuntime { } if (method === 'generate') { return { - runId: options.runId, + runId: this.runId, messages: [], finishReason: 'error' as const, error, + getState: () => this.getState(), }; } - return { runId: options.runId, stream: makeErrorStream(error) }; + return { runId: this.runId, stream: makeErrorStream(error), getState: () => this.getState() }; } } @@ -681,17 +706,13 @@ export class AgentRuntime { * Post-loop finalization for generate: apply cost, set model id, roll up sub-agent usage, * transition to success, and emit AgentEnd. Returns the finalized result. */ - private finalizeGenerate( - result: GenerateResult, - list: AgentMessageList, - runId: string, - ): GenerateResult { - result.runId = runId; + private finalizeGenerate(result: GenerateResult, list: AgentMessageList): GenerateResult { + result.runId = this.runId; result.usage = this.applyCost(result.usage); result.model = this.modelIdString; this.updateState({ status: 'success', messageList: list.serialize() }); this.eventBus.emit({ type: AgentEvent.AgentEnd, messages: result.messages }); - return result; + return { ...result, getState: () => this.getState() }; } /** Resolve telemetry: own config wins, then inherited from options, then nothing. */ @@ -889,7 +910,7 @@ export class AgentRuntime { /** Core generate loop using generateText (non-streaming). */ private async runGenerateLoop(ctx: LoopContext): Promise { - const { list, options, runId, pendingResume } = ctx; + const { list, options, pendingResume } = ctx; this.hydrateDeferredToolsFromList(list); let totalUsage: TokenUsage | undefined; @@ -911,7 +932,7 @@ export class AgentRuntime { const pendingToolCtx: ToolBatchContext = { toolMap: pendingLoopContext.toolMap, list, - runId, + runId: this.runId, persistence: options?.persistence, telemetry: runTelemetry, executionCounter: options?.executionCounter, @@ -936,7 +957,6 @@ export class AgentRuntime { options, list, totalUsage, - runId, maxIterations, iterationCount, ); @@ -953,6 +973,7 @@ export class AgentRuntime { suspendPayload: s.payload, resumeSchema: s.resumeSchema, })), + getState: () => this.getState(), }; } } @@ -1006,7 +1027,7 @@ export class AgentRuntime { const batch = await this.iterateToolCallsConcurrent({ toolMap, list, - runId, + runId: this.runId, persistence: options?.persistence, telemetry: runTelemetry, executionCounter: options?.executionCounter, @@ -1023,7 +1044,6 @@ export class AgentRuntime { options, list, totalUsage, - runId, maxIterations, iterationCount + 1, ); @@ -1040,6 +1060,7 @@ export class AgentRuntime { suspendPayload: s.payload, resumeSchema: s.resumeSchema, })), + getState: () => this.getState(), }; } @@ -1071,12 +1092,13 @@ export class AgentRuntime { } return { - runId: runId ?? '', + runId: this.runId, messages: list.responseDelta(), finishReason: lastFinishReason, usage: totalUsage, ...(structuredOutput !== undefined && { structuredOutput }), ...(toolCallSummary.length > 0 && { toolCalls: toolCallSummary }), + getState: () => this.getState(), }; } @@ -1085,7 +1107,7 @@ export class AgentRuntime { * Returns the readable side immediately; the loop runs in the background. */ private startStreamLoop(ctx: LoopContext): ReadableStream { - const { options, runId } = ctx; + const { options } = ctx; const { readable, writable } = new TransformStream(); const writer = writable.getWriter(); @@ -1136,12 +1158,12 @@ export class AgentRuntime { this.withTelemetryRootSpan( 'stream', options, - runId, + this.runId, async () => await this.runStreamLoop({ ...ctx, writer }), ) .catch(async (error: unknown) => { await this.flushTelemetry(options); - await this.cleanupRun(runId); + await this.cleanupRun(); try { await writer.write({ type: 'error', error }); await writer.write({ type: 'finish', finishReason: 'error' }); @@ -1164,7 +1186,7 @@ export class AgentRuntime { private async runStreamLoop( ctx: LoopContext & { writer: WritableStreamDefaultWriter }, ): Promise { - const { list, options, runId, pendingResume, writer } = ctx; + const { list, options, pendingResume, writer } = ctx; this.hydrateDeferredToolsFromList(list); const writeChunk = async (chunk: StreamChunk): Promise => { @@ -1180,7 +1202,7 @@ export class AgentRuntime { const { streamText } = loadAi(); const closeStreamWithError = async (error: unknown, status: AgentRunState): Promise => { - await this.cleanupRun(runId); + await this.cleanupRun(); this.updateState({ status }); await writer.write({ type: 'error', error }); await writer.write({ type: 'finish', finishReason: 'error' }); @@ -1207,7 +1229,7 @@ export class AgentRuntime { const pendingToolCtx: ToolBatchContext = { toolMap: pendingLoopContext.toolMap, list, - runId, + runId: this.runId, persistence: options?.persistence, telemetry: runTelemetry, executionCounter: options?.executionCounter, @@ -1248,7 +1270,6 @@ export class AgentRuntime { options, list, totalUsage, - runId, maxIterations, iterationCount, ); @@ -1379,7 +1400,7 @@ export class AgentRuntime { const batch = await this.iterateToolCallsConcurrent({ toolMap, list, - runId, + runId: this.runId, persistence: options?.persistence, telemetry: runTelemetry, executionCounter: options?.executionCounter, @@ -1417,7 +1438,6 @@ export class AgentRuntime { options, list, totalUsage, - runId, maxIterations, iterationCount + 1, ); @@ -1477,7 +1497,7 @@ export class AgentRuntime { } } - await this.cleanupRun(runId); + await this.cleanupRun(); await this.flushTelemetry(options); this.updateState({ status: 'success', messageList: list.serialize() }); @@ -2368,19 +2388,16 @@ export class AgentRuntime { /** * Persist a suspended run state and update the current state snapshot. - * Returns the runId (reuses existingRunId when resuming to prevent dangling runs). + * Returns the runtime's runId. */ private async persistSuspension( pendingToolCalls: Record, options: RuntimeExecutionOptions | undefined, list: AgentMessageList, totalUsage: TokenUsage | undefined, - existingRunId?: string, maxIterations?: number, iterationCount?: number, ): Promise { - const runId = existingRunId ?? generateRunId(); - // Persist loop controls only. providerOptions are intentionally excluded // because they may contain sensitive data (API keys, auth headers). const resolvedMaxIterations = maxIterations ?? options?.maxIterations; @@ -2397,16 +2414,14 @@ export class AgentRuntime { executionOptions, ...(resolvedIterationCount !== undefined ? { iterationCount: resolvedIterationCount } : {}), }; - await this.runState.suspend(runId, state); + await this.runState.suspend(this.runId, state); this.updateState({ status: 'suspended', pendingToolCalls, messageList: list.serialize() }); - return runId; + return this.runId; } /** Clean up stored state for a run when it finishes without re-suspending. */ - private async cleanupRun(runId: string | undefined): Promise { - if (runId) { - await this.runState.complete(runId); - } + private async cleanupRun(): Promise { + await this.runState.complete(this.runId); } /** Emit a TurnEnd event when an assistant message is present in `newMessages`. */ diff --git a/packages/@n8n/agents/src/sdk/__tests__/agent-isolated-runtime.test.ts b/packages/@n8n/agents/src/sdk/__tests__/agent-isolated-runtime.test.ts new file mode 100644 index 00000000000..d6fc1c06f44 --- /dev/null +++ b/packages/@n8n/agents/src/sdk/__tests__/agent-isolated-runtime.test.ts @@ -0,0 +1,129 @@ +import * as aiModule from 'ai'; +import type { Mock } from 'vitest'; + +import type { AgentRuntimeConfig } from '../../runtime/agent-runtime'; +import type { AgentEventBus } from '../../runtime/event-bus'; +import { AgentEvent } from '../../runtime/event-bus'; +import type { StreamChunk } from '../../types'; +import { Agent } from '../agent'; + +vi.mock('@ai-sdk/openai', () => ({ + createOpenAI: () => () => ({ provider: 'openai', modelId: 'mock', specificationVersion: 'v3' }), +})); + +// eslint-disable-next-line @typescript-eslint/consistent-type-imports +type AiImport = typeof import('ai'); + +vi.mock('ai', async () => { + const actual = await vi.importActual('ai'); + return { + ...actual, + generateText: vi.fn(), + }; +}); + +const { generateText } = aiModule as unknown as { + generateText: Mock; +}; + +type ActiveRuntime = { + bus: AgentEventBus; +}; + +type AgentInternals = { + ensureBuilt(): Promise; + createRuntime(config: AgentRuntimeConfig, runId?: string): ActiveRuntime; + trackStreamRuntime( + stream: ReadableStream, + active: ActiveRuntime, + ): ReadableStream; + cleanupRuntime(active: ActiveRuntime): Promise; + activeRuntimes: Set; +}; + +function makeGenerateSuccess(text: string) { + return { + finishReason: 'stop', + usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 }, + response: { + messages: [ + { + role: 'assistant', + content: [{ type: 'text', text }], + }, + ], + }, + toolCalls: [], + }; +} + +describe('Agent isolated runtimes', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('keeps result state bound to the runtime that produced it', async () => { + generateText + .mockResolvedValueOnce(makeGenerateSuccess('first response')) + .mockResolvedValueOnce(makeGenerateSuccess('second response')); + const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test'); + + const first = await agent.generate('first'); + const second = await agent.generate('second'); + + expect(first.getState().messageList.messages).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + content: expect.arrayContaining([expect.objectContaining({ text: 'first response' })]), + }), + ]), + ); + expect(second.getState().messageList.messages).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + content: expect.arrayContaining([expect.objectContaining({ text: 'second response' })]), + }), + ]), + ); + }); + + it('applies event handler changes to active runtimes', async () => { + const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test'); + const internals = agent as unknown as AgentInternals; + const active = internals.createRuntime(await internals.ensureBuilt()); + const handler = vi.fn(); + + agent.on(AgentEvent.AgentEnd, handler); + active.bus.emit({ type: AgentEvent.AgentEnd, messages: [] }); + agent.off(AgentEvent.AgentEnd, handler); + active.bus.emit({ type: AgentEvent.AgentEnd, messages: [] }); + + expect(handler).toHaveBeenCalledTimes(1); + await internals.cleanupRuntime(active); + }); + + it('cleans up the active runtime when a wrapped stream is cancelled', async () => { + const agent = new Agent('agent').model('openai/gpt-4o-mini').instructions('test'); + const internals = agent as unknown as AgentInternals; + const active = internals.createRuntime(await internals.ensureBuilt()); + const sourceCancel = vi.fn(); + const stream = internals.trackStreamRuntime( + new ReadableStream({ + start(controller) { + controller.enqueue({ type: 'start-step' }); + }, + cancel: sourceCancel, + }), + active, + ); + const reader = stream.getReader(); + + expect(internals.activeRuntimes.has(active)).toBe(true); + await reader.read(); + await reader.cancel('client disconnected'); + reader.releaseLock(); + + expect(sourceCancel).toHaveBeenCalledWith('client disconnected'); + expect(internals.activeRuntimes.has(active)).toBe(false); + }); +}); diff --git a/packages/@n8n/agents/src/sdk/__tests__/delegate-sub-agent-routing.test.ts b/packages/@n8n/agents/src/sdk/__tests__/delegate-sub-agent-routing.test.ts index 3f584a9341d..54e9908ac9a 100644 --- a/packages/@n8n/agents/src/sdk/__tests__/delegate-sub-agent-routing.test.ts +++ b/packages/@n8n/agents/src/sdk/__tests__/delegate-sub-agent-routing.test.ts @@ -10,13 +10,17 @@ import { type DelegateSubAgentRunner, type DelegateSubAgentRunnerHelpers, } from '../../runtime/delegate-sub-agent-tool'; -import type { BuiltTool } from '../../types'; +import type { BuiltTool, GenerateResult, SerializableAgentState } from '../../types'; import { Agent } from '../agent'; const runtimeConfigs: Array> = []; -let inlineChildGenerateResult: - | Awaited['generate']>> - | undefined; +let inlineChildGenerateResult: GenerateResult | undefined; + +const mockState = (): SerializableAgentState => ({ + status: 'success', + messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] }, + pendingToolCalls: {}, +}); vi.mock('../../runtime/agent-runtime', async (importOriginal) => { const actual = await importOriginal(); @@ -41,7 +45,8 @@ vi.mock('../../runtime/agent-runtime', async (importOriginal) => { content: [{ type: 'text', text: 'inline answer' }], }, ], - usage: {}, + usage: { promptTokens: 0, completionTokens: 0, totalTokens: 0 }, + getState: mockState, }); } @@ -67,6 +72,12 @@ const delegateInput = { goal: 'Find the API behavior.', }; +async function buildAgentConfig(agent: Agent): Promise { + return await ( + agent as unknown as { build(): Promise } + ).build(); +} + describe('delegate sub-agent routing', () => { beforeEach(() => { runtimeConfigs.length = 0; @@ -89,10 +100,10 @@ describe('delegate sub-agent routing', () => { ) .tool(makeTool('lookup')); - await (agent as unknown as { build(): Promise }).build(); + const runtimeConfig = await buildAgentConfig(agent); - expect(runtimeConfigs).toHaveLength(1); - const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined; + expect(runtimeConfigs).toHaveLength(0); + const builtTools = runtimeConfig.tools; const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME); expect(delegateTool).toBeDefined(); @@ -104,12 +115,10 @@ describe('delegate sub-agent routing', () => { }); expect(hostRunSubAgent).toHaveBeenCalledOnce(); - expect(hostRunSubAgent.mock.calls[0]?.[1]).toEqual( - expect.objectContaining({ - runInlineSubAgent: expect.any(Function), - }), - ); - expect(runtimeConfigs).toHaveLength(2); + const helpers = hostRunSubAgent.mock.calls[0]?.[1]; + expect(helpers).toBeDefined(); + expect(typeof helpers?.runInlineSubAgent).toBe('function'); + expect(runtimeConfigs).toHaveLength(1); }); it('runs inline delegations without a host runner when the tool is built on an Agent', async () => { @@ -119,9 +128,10 @@ describe('delegate sub-agent routing', () => { .tool(createDelegateSubAgentTool()) .tool(makeTool('lookup')); - await (agent as unknown as { build(): Promise }).build(); + const runtimeConfig = await buildAgentConfig(agent); - const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined; + expect(runtimeConfigs).toHaveLength(0); + const builtTools = runtimeConfig.tools; const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME); expect(delegateTool).toBeDefined(); @@ -132,7 +142,7 @@ describe('delegate sub-agent routing', () => { answer: 'inline answer', }); - expect(runtimeConfigs).toHaveLength(2); + expect(runtimeConfigs).toHaveLength(1); }); it('lets a host-style runner delegate inline through helpers from tool metadata', async () => { @@ -196,6 +206,7 @@ describe('delegate sub-agent routing', () => { suspendPayload: { message: 'Delete file?' }, }, ], + getState: mockState, }; const agent = new Agent('parent') @@ -204,9 +215,9 @@ describe('delegate sub-agent routing', () => { .tool(createDelegateSubAgentTool()) .tool(makeTool('lookup')); - await (agent as unknown as { build(): Promise }).build(); + const runtimeConfig = await buildAgentConfig(agent); - const builtTools = runtimeConfigs[0]?.tools as BuiltTool[] | undefined; + const builtTools = runtimeConfig.tools; const delegateTool = builtTools?.find((tool) => tool.name === DELEGATE_SUB_AGENT_TOOL_NAME); expect(delegateTool).toBeDefined(); diff --git a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-episodic.test.ts b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-episodic.test.ts index 2ab279372f7..bf89ea51a20 100644 --- a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-episodic.test.ts +++ b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-episodic.test.ts @@ -1,4 +1,4 @@ -import type { AgentRuntime } from '../../runtime/agent-runtime'; +import type { AgentRuntimeConfig } from '../../runtime/agent-runtime'; import { DEFAULT_EPISODIC_MEMORY_EMBEDDING_MODEL, DEFAULT_EPISODIC_MEMORY_MAX_ENTRIES_PER_RUN, @@ -68,14 +68,9 @@ describe('Memory builder — episodic memory', () => { .instructions('You are a test assistant.') .memory(memory); - const runtime = await (agent as unknown as { build(): Promise }).build(); - const runtimeConfig = ( - runtime as unknown as { - config: { - episodicMemory?: EpisodicMemoryConfig; - }; - } - ).config; + const runtimeConfig = await ( + agent as unknown as { build(): Promise } + ).build(); const embedder = runtimeConfig.episodicMemory?.embedder as unknown as Record; expect(runtimeConfig.episodicMemory).toMatchObject({ diff --git a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts index 778ce81e295..448bef1356c 100644 --- a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts +++ b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts @@ -1,6 +1,6 @@ -import type { AgentRuntime } from '../../runtime/agent-runtime'; +import type { AgentRuntimeConfig } from '../../runtime/agent-runtime'; import { InMemoryMemory } from '../../runtime/memory-store'; -import type { BuiltMemory, MemoryConfig, ObservationalMemoryConfig } from '../../types'; +import type { BuiltMemory, MemoryConfig } from '../../types'; import { Agent } from '../agent'; import { DEFAULT_OBSERVATION_LOG_LOCK_TTL_MS, @@ -137,15 +137,9 @@ describe('Memory builder — observation log memory', () => { .instructions('You are a test assistant.') .memory(memory); - const runtime = await (agent as unknown as { build(): Promise }).build(); - const runtimeConfig = ( - runtime as unknown as { - config: { - observationLog?: { renderTokenBudget?: number }; - observationalMemory?: ObservationalMemoryConfig; - }; - } - ).config; + const runtimeConfig = await ( + agent as unknown as { build(): Promise } + ).build(); expect(runtimeConfig.observationLog).toEqual({ renderTokenBudget: DEFAULT_OBSERVATION_LOG_RENDER_TOKEN_BUDGET, diff --git a/packages/@n8n/agents/src/sdk/agent.ts b/packages/@n8n/agents/src/sdk/agent.ts index 6bae29c5696..3cb2ab6992c 100644 --- a/packages/@n8n/agents/src/sdk/agent.ts +++ b/packages/@n8n/agents/src/sdk/agent.ts @@ -1,12 +1,13 @@ import type { ProviderOptions } from '@ai-sdk/provider-utils'; import type { z } from 'zod'; +import { getModelCost } from './catalog'; import type { Eval } from './eval'; import type { McpClient } from './mcp-client'; import { Memory, normalizeMemoryConfig, resolveMemoryConfigDefaults } from './memory'; import { Telemetry } from './telemetry'; import { wrapToolForApproval } from './tool'; -import { AgentRuntime } from '../runtime/agent-runtime'; +import { AgentRuntime, type AgentRuntimeConfig } from '../runtime/agent-runtime'; import { LOAD_TOOL_TOOL_NAME, SEARCH_TOOLS_TOOL_NAME } from '../runtime/deferred-tool-manager'; import { DELEGATE_SUB_AGENT_TOOL_NAME, @@ -21,6 +22,7 @@ import { } from '../runtime/delegate-sub-agent-tool'; import { RECALL_MEMORY_TOOL_NAME } from '../runtime/episodic-memory'; import { AgentEventBus } from '../runtime/event-bus'; +import { RunStateManager } from '../runtime/run-state'; import { isSdkOwnedBuiltInTool } from '../runtime/sdk-owned-tool'; import { WRITE_TODOS_TOOL_NAME } from '../runtime/write-todos-tool'; import { @@ -47,13 +49,13 @@ import type { ModelConfig, Provider, RunOptions, - SerializableAgentState, StreamResult, ThinkingConfig, ThinkingConfigFor, ResumeOptions, } from '../types'; import type { AgentEvent } from '../types/runtime/event'; +import type { StreamChunk } from '../types/sdk/agent'; import type { AgentBuilder } from '../types/sdk/agent-builder'; import type { AgentMessage } from '../types/sdk/message'; import type { Workspace } from '../workspace/workspace'; @@ -77,6 +79,11 @@ interface DeferredToolOptions { }; } +type ActiveRuntime = { + runtime: AgentRuntime; + bus: AgentEventBus; +}; + /** * Lightweight read-only view of an agent's configured state. * Returned by `Agent.snapshot` for testing and debugging purposes. @@ -155,8 +162,6 @@ export class Agent implements BuiltAgent, AgentBuilder { private thinkingConfig?: ThinkingConfig; - private runtime?: AgentRuntime; - private concurrencyValue?: number; private telemetryBuilder?: Telemetry; @@ -171,9 +176,11 @@ export class Agent implements BuiltAgent, AgentBuilder { private defaultExecutionOptions?: ExecutionOptions; - private buildPromise: Promise | undefined; + private buildPromise: Promise | undefined; - private eventBus = new AgentEventBus(); + private agentHandlers = new Map>(); + + private activeRuntimes = new Set(); private workspaceInstance?: Workspace; @@ -403,7 +410,7 @@ export class Agent implements BuiltAgent, AgentBuilder { } else { this.telemetryBuilder = undefined; this.telemetryConfig = t; - this.runtime?.setTelemetry(t); + this.buildPromise = undefined; } return this; } @@ -503,7 +510,15 @@ export class Agent implements BuiltAgent, AgentBuilder { * Handlers are called synchronously during the agentic loop. */ on(event: AgentEvent, handler: AgentEventHandler): void { - this.eventBus.on(event, handler); + let handlers = this.agentHandlers.get(event); + if (!handlers) { + handlers = new Set(); + this.agentHandlers.set(event, handlers); + } + handlers.add(handler); + for (const { bus } of this.activeRuntimes) { + bus.on(event, handler); + } } /** @@ -512,7 +527,15 @@ export class Agent implements BuiltAgent, AgentBuilder { * cleanly between turns instead of accumulating on a long-lived agent. */ off(event: AgentEvent, handler: AgentEventHandler): void { - this.eventBus.off(event, handler); + const handlers = this.agentHandlers.get(event); + if (!handlers) return; + handlers.delete(handler); + if (handlers.size === 0) { + this.agentHandlers.delete(event); + } + for (const { bus } of this.activeRuntimes) { + bus.off(event, handler); + } } /** @@ -556,25 +579,14 @@ export class Agent implements BuiltAgent, AgentBuilder { }; } - /** Return the latest state snapshot of the agent. Returns `{ status: 'idle' }` before first run. */ - getState(): SerializableAgentState { - if (!this.runtime) { - return { - persistence: undefined, - status: 'idle', - messageList: { messages: [], historyIds: [], inputIds: [], responseIds: [] }, - pendingToolCalls: {}, - }; - } - return this.runtime.getState(); - } - /** * Cancel the currently running agent. * Synchronous — sets an abort flag; the agentic loop checks it asynchronously. */ abort(): void { - this.eventBus.abort(); + for (const { bus } of this.activeRuntimes) { + bus.abort(); + } } /** @@ -590,7 +602,10 @@ export class Agent implements BuiltAgent, AgentBuilder { */ async close(): Promise { const tasks: Array> = []; - if (this.runtime) tasks.push(this.runtime.dispose()); + for (const active of this.activeRuntimes) { + active.bus.abort(); + tasks.push(this.cleanupRuntime(active)); + } tasks.push(...this.mcpClients.map(async (c) => await c.close())); await Promise.allSettled(tasks); } @@ -600,9 +615,14 @@ export class Agent implements BuiltAgent, AgentBuilder { input: AgentMessage[] | string, options?: RunOptions & ExecutionOptions, ): Promise { - const runtime = await this.ensureBuilt(); + const config = await this.ensureBuilt(); + const active = this.createRuntime(config); const mergedOptions = this.mergeWithDefaults(options); - return await runtime.generate(this.toMessages(input), mergedOptions); + try { + return await active.runtime.generate(this.toMessages(input), mergedOptions); + } finally { + await this.cleanupRuntime(active); + } } /** Stream a response. Lazy-builds on first call. */ @@ -610,9 +630,16 @@ export class Agent implements BuiltAgent, AgentBuilder { input: AgentMessage[] | string, options?: RunOptions & ExecutionOptions, ): Promise { - const runtime = await this.ensureBuilt(); + const config = await this.ensureBuilt(); + const active = this.createRuntime(config); const mergedOptions = this.mergeWithDefaults(options); - return await runtime.stream(this.toMessages(input), mergedOptions); + try { + const result = await active.runtime.stream(this.toMessages(input), mergedOptions); + return { ...result, stream: this.trackStreamRuntime(result.stream, active) }; + } catch (error) { + await this.cleanupRuntime(active); + throw error; + } } /** Resume a suspended tool call with data. Lazy-builds on first call. */ @@ -631,11 +658,23 @@ export class Agent implements BuiltAgent, AgentBuilder { data: unknown, options: ResumeOptions & ExecutionOptions, ): Promise { - const runtime = await this.ensureBuilt(); + const config = await this.ensureBuilt(); if (method === 'generate') { - return await runtime.resume('generate', data, options); + const active = this.createRuntime(config, options.runId); + try { + return await active.runtime.resume('generate', data, options); + } finally { + await this.cleanupRuntime(active); + } + } + const active = this.createRuntime(config, options.runId); + try { + const result = await active.runtime.resume('stream', data, options); + return { ...result, stream: this.trackStreamRuntime(result.stream, active) }; + } catch (error) { + await this.cleanupRuntime(active); + throw error; } - return await runtime.resume('stream', data, options); } approve(method: 'generate', options: ResumeOptions & ExecutionOptions): Promise; @@ -674,7 +713,7 @@ export class Agent implements BuiltAgent, AgentBuilder { * concurrent callers share one build operation. On error the promise is * cleared so the caller can retry. */ - private async ensureBuilt(): Promise { + private async ensureBuilt(): Promise { if (!this.buildPromise) { const p = this.build(); this.buildPromise = p; @@ -685,13 +724,76 @@ export class Agent implements BuiltAgent, AgentBuilder { return await this.buildPromise; } + private createRuntime(config: AgentRuntimeConfig, runId?: string): ActiveRuntime { + const bus = new AgentEventBus(); + for (const [event, handlers] of this.agentHandlers) { + for (const handler of handlers) { + bus.on(event, handler); + } + } + const runtime = new AgentRuntime({ ...config, eventBus: bus, runId }); + const active = { runtime, bus }; + this.activeRuntimes.add(active); + return active; + } + + private trackStreamRuntime( + stream: ReadableStream, + active: ActiveRuntime, + ): ReadableStream { + const reader = stream.getReader(); + let cleanupPromise: Promise | undefined; + const cleanup = async () => { + const doCleanup = async () => { + try { + reader.releaseLock(); + } catch { + // The lock may already be released after cancellation/error cleanup. + } + await this.cleanupRuntime(active); + }; + cleanupPromise ??= doCleanup(); + return await cleanupPromise; + }; + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read(); + if (done) { + controller.close(); + await cleanup(); + return; + } + controller.enqueue(value); + } catch (error) { + controller.error(error); + await cleanup(); + } + }, + async cancel(reason) { + try { + await reader.cancel(reason); + } finally { + await cleanup(); + } + }, + }); + } + + private async cleanupRuntime(active: ActiveRuntime): Promise { + if (!this.activeRuntimes.delete(active)) return; + active.bus.dispose(); + await active.runtime.dispose(); + } + private toMessages(input: string | AgentMessage[]): AgentMessage[] { if (Array.isArray(input)) return input; return [{ role: 'user', content: [{ type: 'text', text: input }] }]; } /** @internal Validate configuration and produce an AgentRuntime. Overridden by the execution engine. */ - protected async build(): Promise { + protected async build(): Promise { if (!this.modelConfig) { throw new Error(`Agent "${this.name}" requires a model`); } @@ -836,7 +938,22 @@ export class Agent implements BuiltAgent, AgentBuilder { ...(toolSearch !== undefined ? { toolSearch } : {}), }); - this.runtime = new AgentRuntime({ + let modelCost: Awaited> | undefined; + try { + const modelId = + typeof modelConfig === 'string' + ? modelConfig + : 'id' in modelConfig && typeof modelConfig.id === 'string' + ? modelConfig.id + : undefined; + modelCost = modelId ? await getModelCost(modelId) : undefined; + } catch { + modelCost = undefined; + } + + const runState = new RunStateManager(this.checkpointStore); + + return { name: this.name, model: modelConfig, instructions, @@ -852,13 +969,12 @@ export class Agent implements BuiltAgent, AgentBuilder { structuredOutput: this.outputSchema, checkpointStorage: this.checkpointStore, thinking: this.thinkingConfig, - eventBus: this.eventBus, toolCallConcurrency: this.concurrencyValue, titleGeneration: memoryConfig?.titleGeneration, - telemetry, - }); - - return this.runtime; + telemetry: this.telemetryConfig ?? (await this.telemetryBuilder?.build()), + modelCost, + runState, + }; } private completeInlineDelegateTools( diff --git a/packages/@n8n/agents/src/skills/__tests__/runtime-skills.test.ts b/packages/@n8n/agents/src/skills/__tests__/runtime-skills.test.ts index 14cd680b157..df326568e8a 100644 --- a/packages/@n8n/agents/src/skills/__tests__/runtime-skills.test.ts +++ b/packages/@n8n/agents/src/skills/__tests__/runtime-skills.test.ts @@ -13,6 +13,7 @@ import { parseRuntimeSkillMarkdown, renderSkillCatalogPrompt, } from '..'; +import type { AgentRuntimeConfig } from '../../runtime/agent-runtime'; import { Agent } from '../../sdk/agent'; import { isZodSchema } from '../../utils/zod'; @@ -474,8 +475,10 @@ Use the workflow SDK.`, .model('anthropic/claude-sonnet-4-5') .instructions('Base instructions.') .skills(source); - const runtime = await (agent as unknown as { build(): Promise }).build(); - const instructions = (runtime as { config: { instructions: string } }).config.instructions; + const runtimeConfig = await ( + agent as unknown as { build(): Promise } + ).build(); + const { instructions } = runtimeConfig; expect(prepare).toHaveBeenCalledTimes(1); expect(instructions).toContain('name: "Summarize notes"'); diff --git a/packages/@n8n/agents/src/types/sdk/agent.ts b/packages/@n8n/agents/src/types/sdk/agent.ts index 72e3dbc532b..e19e25aaf29 100644 --- a/packages/@n8n/agents/src/types/sdk/agent.ts +++ b/packages/@n8n/agents/src/types/sdk/agent.ts @@ -206,6 +206,8 @@ export interface GenerateResult { * callers can handle them without try/catch. */ error?: unknown; + /** Return a snapshot of the agent state for this run. */ + getState(): SerializableAgentState; } export interface StreamResult { @@ -213,6 +215,11 @@ export interface StreamResult { runId: string; /** The readable stream of chunks. */ stream: ReadableStream; + /** + * Return the current agent state for this run. + * May be called while streaming or after the stream closes. + */ + getState(): SerializableAgentState; } export interface ResumeOptions { @@ -234,8 +241,6 @@ export interface BuiltAgent { on(event: AgentEvent, handler: AgentEventHandler): void; - getState(): SerializableAgentState; - /** Cancel the currently running agent. Synchronous — sets an abort flag that the agentic loop checks asynchronously. */ abort(): void; diff --git a/packages/cli/src/modules/agents/agents.service.ts b/packages/cli/src/modules/agents/agents.service.ts index 19b7c6e933a..e5f7748380e 100644 --- a/packages/cli/src/modules/agents/agents.service.ts +++ b/packages/cli/src/modules/agents/agents.service.ts @@ -53,6 +53,7 @@ import { markAgentDraftDirty } from './utils/agent-draft.utils'; import { draftChatMemoryResourceId } from './utils/agent-memory-scope'; import { executionsToMessagesDto } from './utils/execution-to-message-mapper'; import { generateAgentResourceId } from './utils/agent-resource-id'; +import { streamAgentChunks } from './utils/agent-stream'; import { AgentExecutionService } from './agent-execution.service'; import { AgentSkillsService } from './agent-skills.service'; import { AGENT_THREAD_PREFIX } from './builder/builder-tool-names'; @@ -896,16 +897,9 @@ export class AgentsService { executionCounter: this.createAgentExecutionCounter({ agentId }), }); - const reader = resultStream.stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - recorder.record(value); - yield value; - } - } finally { - reader.releaseLock(); + for await (const value of streamAgentChunks(resultStream.stream)) { + recorder.record(value); + yield value; } // Always record resumed executions — even if they suspend again (chained HITL). @@ -1258,28 +1252,21 @@ export class AgentsService { executionCounter: this.createAgentExecutionCounter({ agentId, userId }), }); - const reader = resultStream.stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - recorder.record(value); - if (value.type === 'tool-call-suspended') { - this.logger.info('Chat: tool-call-suspended chunk received', { - agentId, - toolCallId: value.toolCallId, - toolName: value.toolName, - }); - } - if (value.type === 'finish' && value.finishReason === 'max-iterations') { - for (const chunk of getMaxIterationsChunks()) { - yield chunk; - } - } - yield value; + for await (const value of streamAgentChunks(resultStream.stream)) { + recorder.record(value); + if (value.type === 'tool-call-suspended') { + this.logger.info('Chat: tool-call-suspended chunk received', { + agentId, + toolCallId: value.toolCallId, + toolName: value.toolName, + }); } - } finally { - reader.releaseLock(); + if (value.type === 'finish' && value.finishReason === 'max-iterations') { + for (const chunk of getMaxIterationsChunks()) { + yield chunk; + } + } + yield value; } // Always record — even if suspended, the pre-suspension response text @@ -1398,29 +1385,22 @@ export class AgentsService { executionCounter: this.createAgentExecutionCounter({ agentId, userId: telemetryUserId }), }); - const reader = resultStream.stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - recorder.record(value); + for await (const value of streamAgentChunks(resultStream.stream)) { + recorder.record(value); - if (value.type === 'tool-call') { - toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input }); - } else if (value.type === 'tool-result') { - const pending = toolInputs.get(value.toolCallId); - toolCalls.push({ - toolName: value.toolName, - input: pending?.input ?? null, - result: value.output, - }); - toolInputs.delete(value.toolCallId); - } else if (value.type === 'finish' && value.structuredOutput !== undefined) { - structuredOutput = value.structuredOutput; - } + if (value.type === 'tool-call') { + toolInputs.set(value.toolCallId, { toolName: value.toolName, input: value.input }); + } else if (value.type === 'tool-result') { + const pending = toolInputs.get(value.toolCallId); + toolCalls.push({ + toolName: value.toolName, + input: pending?.input ?? null, + result: value.output, + }); + toolInputs.delete(value.toolCallId); + } else if (value.type === 'finish' && value.structuredOutput !== undefined) { + structuredOutput = value.structuredOutput; } - } finally { - reader.releaseLock(); } const messageRecord = recorder.getMessageRecord(); diff --git a/packages/cli/src/modules/agents/builder/agents-builder.service.ts b/packages/cli/src/modules/agents/builder/agents-builder.service.ts index f7d113ae17c..f3e96abf672 100644 --- a/packages/cli/src/modules/agents/builder/agents-builder.service.ts +++ b/packages/cli/src/modules/agents/builder/agents-builder.service.ts @@ -20,6 +20,7 @@ import { N8NCheckpointStorage } from '../integrations/n8n-checkpoint-storage'; import { N8nMemory } from '../integrations/n8n-memory'; import type { AgentJsonConfig } from '@n8n/api-types'; import { AgentCheckpointRepository } from '../repositories/agent-checkpoint.repository'; +import { streamAgentChunks } from '../utils/agent-stream'; import { buildAgentPreviewPath } from './agent-builder-preview-path'; import { buildBuilderPrompt } from './agents-builder-prompts'; import { AgentsBuilderToolsService, getAgentConfigHash } from './agents-builder-tools.service'; @@ -233,15 +234,8 @@ export class AgentsBuilderService { * plain reader→generator adapter. */ private async *streamFromAgent(resultStream: StreamResult): AsyncGenerator { - const reader = resultStream.stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - yield value; - } - } finally { - reader.releaseLock(); + for await (const value of streamAgentChunks(resultStream.stream)) { + yield value; } } diff --git a/packages/cli/src/modules/agents/sub-agents/__tests__/delegate-sub-agent-tool.test.ts b/packages/cli/src/modules/agents/sub-agents/__tests__/delegate-sub-agent-tool.test.ts index 854a120e316..6cb5ee6b89a 100644 --- a/packages/cli/src/modules/agents/sub-agents/__tests__/delegate-sub-agent-tool.test.ts +++ b/packages/cli/src/modules/agents/sub-agents/__tests__/delegate-sub-agent-tool.test.ts @@ -42,6 +42,9 @@ const generateResult: GenerateResult = { ], }, ], + getState: () => { + throw new Error('not implemented'); + }, }; const foregroundResult: SubAgentForegroundResult = { diff --git a/packages/cli/src/modules/agents/sub-agents/__tests__/sub-agent-foreground-runner.test.ts b/packages/cli/src/modules/agents/sub-agents/__tests__/sub-agent-foreground-runner.test.ts index dd43330c5c8..9045fcf9c5f 100644 --- a/packages/cli/src/modules/agents/sub-agents/__tests__/sub-agent-foreground-runner.test.ts +++ b/packages/cli/src/modules/agents/sub-agents/__tests__/sub-agent-foreground-runner.test.ts @@ -420,5 +420,8 @@ function makeStreamResult(chunks: StreamChunk[]): StreamResult { controller.close(); }, }), + getState: () => { + throw new Error('not implemented'); + }, }; } diff --git a/packages/cli/src/modules/agents/sub-agents/sub-agent-foreground-runner.ts b/packages/cli/src/modules/agents/sub-agents/sub-agent-foreground-runner.ts index 111b47d4f70..4bebb6a77a6 100644 --- a/packages/cli/src/modules/agents/sub-agents/sub-agent-foreground-runner.ts +++ b/packages/cli/src/modules/agents/sub-agents/sub-agent-foreground-runner.ts @@ -8,15 +8,16 @@ import { type GenerateResult, type SubAgentTaskPath, } from '@n8n/agents'; -import { Logger } from '@n8n/backend-common'; import type { ResolvedSubAgentSource, SubAgentSpawnRequest } from '@n8n/api-types'; +import { Logger } from '@n8n/backend-common'; import { Container, Service } from '@n8n/di'; import { UserError } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; import { AgentExecutionService } from '../agent-execution.service'; -import { ExecutionRecorder } from '../execution-recorder'; import type { MessageRecord } from '../execution-recorder'; +import { ExecutionRecorder } from '../execution-recorder'; +import { streamAgentChunks } from '../utils/agent-stream'; import { SubAgentSourceResolver } from './sub-agent-source-resolver'; export interface SubAgentForegroundRunContext { @@ -115,21 +116,14 @@ export class SubAgentForegroundRunner { let structuredOutput: unknown; let childSuspended = false; - const reader = resultStream.stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - recorder.record(value); - if (value.type === 'tool-call-suspended') { - childSuspended = true; - } - if (value.type === 'finish' && value.structuredOutput !== undefined) { - structuredOutput = value.structuredOutput; - } + for await (const value of streamAgentChunks(resultStream.stream)) { + recorder.record(value); + if (value.type === 'tool-call-suspended') { + childSuspended = true; + } + if (value.type === 'finish' && value.structuredOutput !== undefined) { + structuredOutput = value.structuredOutput; } - } finally { - reader.releaseLock(); } const messageRecord = recorder.getMessageRecord(); @@ -153,6 +147,9 @@ export class SubAgentForegroundRunner { messages: [], finishReason: 'error', error: DELEGATED_CHILD_SUSPEND_UNSUPPORTED_MESSAGE, + getState: () => { + throw new Error('getState is not implemented for sub-agent foreground runner'); + }, }, }; } @@ -265,6 +262,9 @@ function buildGenerateResultFromRecord( : {}), ...(structuredOutput !== undefined ? { structuredOutput } : {}), ...(record.error !== null ? { error: record.error } : {}), + getState: () => { + throw new Error('getState is not implemented for sub-agent foreground runner'); + }, }; return result; } diff --git a/packages/cli/src/modules/agents/utils/agent-stream.ts b/packages/cli/src/modules/agents/utils/agent-stream.ts new file mode 100644 index 00000000000..f3ec3f5e1e9 --- /dev/null +++ b/packages/cli/src/modules/agents/utils/agent-stream.ts @@ -0,0 +1,25 @@ +import type { StreamChunk } from '@n8n/agents'; + +export async function* streamAgentChunks( + stream: ReadableStream, +): AsyncGenerator { + const reader = stream.getReader(); + let streamDone = false; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + streamDone = true; + break; + } + yield value; + } + } finally { + try { + if (!streamDone) await reader.cancel(); + } finally { + reader.releaseLock(); + } + } +}