From 7f37612d2f89635b74b74b54e8d5f37edd19ca2d Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 10:41:36 +0200 Subject: [PATCH] refactor(instance-ai): normalize native stream results --- .../runtime/__tests__/stream-runner.test.ts | 59 +++++++++++++++ .../instance-ai/src/runtime/stream-runner.ts | 73 +++++++++++++++++-- 2 files changed, 126 insertions(+), 6 deletions(-) diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts index c4ee82da8f2..1652a3bf129 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts @@ -36,6 +36,14 @@ async function* emptyStream() { yield* []; } +async function collectAsyncIterable(stream: AsyncIterable) { + const chunks: unknown[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + return chunks; +} + describe('streamAgentRun', () => { it('returns errored status when agent stream contains an error chunk', async () => { jest.mocked(executeResumableStream).mockResolvedValue({ @@ -210,4 +218,55 @@ describe('streamAgentRun', () => { }), ); }); + + it('normalizes native agent readable streams for the resumable executor', async () => { + const mockedExecuteResumableStream = jest.mocked(executeResumableStream); + mockedExecuteResumableStream.mockClear(); + const nativeChunk = { type: 'text-delta', delta: 'All good' }; + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(nativeChunk); + controller.close(); + }, + }); + const agent = { + stream: jest.fn().mockResolvedValue({ + runId: 'agent-run-1', + stream: readable, + getState: jest.fn(), + }), + }; + const eventBus = createEventBus(); + + mockedExecuteResumableStream.mockResolvedValue({ + status: 'completed', + agentRunId: 'agent-run-1', + workSummary: emptyWorkSummary, + }); + + await streamAgentRun( + agent, + 'hello', + {}, + { + threadId: 'thread-1', + runId: 'run-1', + agentId: 'agent-1', + signal: new AbortController().signal, + eventBus, + logger: createLogger(), + }, + ); + + const call = mockedExecuteResumableStream.mock.calls[0]; + expect(call).toBeDefined(); + const source = call?.[0].stream; + expect(source).toEqual( + expect.objectContaining({ + runId: 'agent-run-1', + streamFormat: 'agent', + }), + ); + await expect(collectAsyncIterable(source.fullStream)).resolves.toEqual([nativeChunk]); + }); }); diff --git a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts index fa34e81d6b5..04c8b722c6f 100644 --- a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts +++ b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts @@ -1,4 +1,5 @@ import type { InstanceAiEvent } from '@n8n/api-types'; +import type { StreamResult } from '@n8n/agents'; import type { InstanceAiEventBus } from '../event-bus'; import type { Logger } from '../logger'; @@ -10,11 +11,16 @@ import { type TraceStatus, } from './resumable-stream-executor'; import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing'; -import { asResumable } from '../utils/stream-helpers'; +import { asResumable, isRecord } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; +type StreamableAgentStreamResult = ResumableStreamSource | StreamResult; + export interface StreamableAgent { - stream: (input: unknown, options: Record) => Promise; + stream: ( + input: unknown, + options: Record, + ) => Promise; } export interface StreamRunOptions { @@ -34,6 +40,59 @@ export interface StreamRunResult { confirmationEvent?: Extract; } +function isAsyncIterable(value: unknown): value is AsyncIterable { + return ( + value !== null && + typeof value === 'object' && + typeof Reflect.get(value, Symbol.asyncIterator) === 'function' + ); +} + +function isReadableStream(value: unknown): value is ReadableStream { + return ( + value !== null && + typeof value === 'object' && + typeof Reflect.get(value, 'getReader') === 'function' + ); +} + +function isResumableStreamSource(value: unknown): value is ResumableStreamSource { + return isRecord(value) && isAsyncIterable(value.fullStream); +} + +function isNativeStreamResult(value: unknown): value is StreamResult { + return isRecord(value) && isReadableStream(value.stream); +} + +async function* readableStreamToAsyncIterable(stream: ReadableStream) { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) return; + yield value; + } + } finally { + reader.releaseLock(); + } +} + +function normalizeStreamSource(result: StreamableAgentStreamResult): ResumableStreamSource { + if (isResumableStreamSource(result)) { + return result; + } + + if (isNativeStreamResult(result)) { + return { + runId: result.runId, + streamFormat: 'agent', + fullStream: readableStreamToAsyncIterable(result.stream), + }; + } + + throw new Error('Unsupported agent stream result'); +} + export async function streamAgentRun( agent: StreamableAgent, input: unknown, @@ -47,8 +106,9 @@ export async function streamAgentRun( ...streamOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); - const agentRunId = typeof result.runId === 'string' ? result.runId : ''; - return await consumeStream(agent, result, { ...options, agentRunId, llmStepTraceHooks }); + const stream = normalizeStreamSource(result); + const agentRunId = typeof stream.runId === 'string' ? stream.runId : ''; + return await consumeStream(agent, stream, { ...options, agentRunId, llmStepTraceHooks }); }); } @@ -65,8 +125,9 @@ export async function resumeAgentRun( ...resumeOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); - const agentRunId = (typeof resumed.runId === 'string' && resumed.runId) || options.agentRunId; - return await consumeStream(agent, resumed, { ...options, agentRunId, llmStepTraceHooks }); + const stream = normalizeStreamSource(resumed); + const agentRunId = (typeof stream.runId === 'string' && stream.runId) || options.agentRunId; + return await consumeStream(agent, stream, { ...options, agentRunId, llmStepTraceHooks }); }); }