From ad31edcdd7aa27032f93afd2c8f0f24f34baa254 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 11:30:24 +0200 Subject: [PATCH] refactor(instance-ai): accumulate native stream text --- .../resumable-stream-executor.test.ts | 1 + .../runtime/__tests__/stream-runner.test.ts | 1 + .../src/runtime/resumable-stream-executor.ts | 57 ++++++++++++++++++- 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts index 5db87b6d3bf..73db13e8675 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts @@ -520,6 +520,7 @@ describe('executeResumableStream', () => { ); expect(result.status).toBe('completed'); expect(result.agentRunId).toBe('agent-run-2'); + await expect(result.text).resolves.toBe('Done.'); expect(eventBus.publish).toHaveBeenCalledWith( 'thread-1', expect.objectContaining({ diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts index c960c4eab41..66ee4b91b8b 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts @@ -277,5 +277,6 @@ describe('streamAgentRun', () => { }), ); await expect(collectAsyncIterable(source.fullStream)).resolves.toEqual([nativeChunk]); + await expect(source.text).resolves.toBe('All good'); }); }); diff --git a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts index cb36108aa99..28d43439ba4 100644 --- a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts +++ b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts @@ -192,6 +192,58 @@ function isNativeStreamResult(value: unknown): value is StreamResult { return isRecord(value) && isReadableStream(value.stream); } +function extractContentText(content: unknown): string { + if (typeof content === 'string') { + return content; + } + + if (!Array.isArray(content)) { + return ''; + } + + return content + .map((part) => { + if (!isRecord(part) || part.type !== 'text') { + return ''; + } + + return typeof part.text === 'string' ? part.text : ''; + }) + .join(''); +} + +async function collectNativeStreamText(stream: ReadableStream): Promise { + const reader = stream.getReader(); + let deltaText = ''; + let messageText = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) return deltaText || messageText; + + if (!isRecord(value)) { + continue; + } + + if (value.type === 'text-delta' && typeof value.delta === 'string') { + deltaText += value.delta; + continue; + } + + if (value.type !== 'message' || !isRecord(value.message)) { + continue; + } + + if (value.message.role === 'assistant') { + messageText += extractContentText(value.message.content); + } + } + } finally { + reader.releaseLock(); + } +} + async function* readableStreamToAsyncIterable(stream: ReadableStream) { const reader = stream.getReader(); try { @@ -216,10 +268,13 @@ export function normalizeStreamSource( } if (isNativeStreamResult(result)) { + const [eventStream, textStream] = result.stream.tee(); + return { runId: result.runId, streamFormat: 'agent', - fullStream: readableStreamToAsyncIterable(result.stream), + fullStream: readableStreamToAsyncIterable(eventStream), + text: collectNativeStreamText(textStream), }; }