From 18f05c9a899fb2af15a96b52d019763ac8e9eced Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 10:40:10 +0200 Subject: [PATCH] refactor(instance-ai): route native stream events --- .../resumable-stream-executor.test.ts | 68 +++++++++++++++++++ .../src/runtime/resumable-stream-executor.ts | 26 ++++--- .../@n8n/instance-ai/src/stream/map-chunk.ts | 18 ++++- 3 files changed, 103 insertions(+), 9 deletions(-) diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts index 4bf141c1688..ecbe55e918b 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts @@ -301,6 +301,74 @@ describe('executeResumableStream', () => { expect(result.confirmationEvent?.payload.requestId).toBe('request-1'); }); + it('maps native agent stream chunks when stream source is native', async () => { + const eventBus = createEventBus(); + + const result = await executeResumableStream({ + agent: {}, + stream: { + runId: 'agent-run-1', + streamFormat: 'agent', + fullStream: fromChunks([ + { type: 'text-delta', delta: 'Working...' }, + { + type: 'tool-call-suspended', + toolCallId: 'tool-call-1', + toolName: 'ask-user', + input: { prompt: 'Confirm?' }, + suspendPayload: { + requestId: 'request-1', + message: 'Need approval', + }, + }, + ]), + }, + context: { + threadId: 'thread-1', + runId: 'run-1', + agentId: 'agent-1', + eventBus, + signal: new AbortController().signal, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }, + }, + control: { mode: 'manual' }, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: 'suspended', + agentRunId: 'agent-run-1', + suspension: { + toolCallId: 'tool-call-1', + requestId: 'request-1', + toolName: 'ask-user', + }, + }), + ); + expect(eventBus.publish).toHaveBeenCalledWith( + 'thread-1', + expect.objectContaining({ + type: 'text-delta', + runId: 'run-1', + agentId: 'agent-1', + payload: { text: 'Working...' }, + }), + ); + expect(eventBus.publish).not.toHaveBeenCalledWith( + 'thread-1', + expect.objectContaining({ type: 'confirmation-request' }), + ); + expect(result.confirmationEvent?.type).toBe('confirmation-request'); + expect(result.confirmationEvent?.payload).toEqual( + expect.objectContaining({ + requestId: 'request-1', + toolCallId: 'tool-call-1', + toolName: 'ask-user', + args: { prompt: 'Confirm?' }, + }), + ); + }); + it('returns errored status when stream contains an error chunk', async () => { const eventBus = createEventBus(); diff --git a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts index 75771c12f1e..8bf9b9ef20e 100644 --- a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts +++ b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts @@ -3,16 +3,18 @@ import type { RunTree } from 'langsmith'; import type { InstanceAiEventBus } from '../event-bus'; import type { Logger } from '../logger'; -import { mapMastraChunkToEvent } from '../stream/map-chunk'; +import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../stream/map-chunk'; import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator'; import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing'; import { asResumable, parseSuspension } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; type ConfirmationRequestEvent = Extract; +export type ResumableStreamFormat = 'mastra' | 'agent'; export interface ResumableStreamSource { runId?: string; + streamFormat?: ResumableStreamFormat; fullStream: AsyncIterable; text?: Promise; steps?: Promise; @@ -1917,12 +1919,20 @@ export async function executeResumableStream( hasError = true; } - const event = mapMastraChunkToEvent( - options.context.runId, - options.context.agentId, - chunk, - currentResponseId, - ); + const event = + activeSource.streamFormat === 'agent' + ? mapAgentChunkToEvent( + options.context.runId, + options.context.agentId, + chunk, + currentResponseId, + ) + : mapMastraChunkToEvent( + options.context.runId, + options.context.agentId, + chunk, + currentResponseId, + ); if (event) { workSummaryAccumulator.observe(event); let shouldPublishEvent = true; @@ -2020,7 +2030,7 @@ export async function executeResumableStream( }); activeAgentRunId = (typeof resumed.runId === 'string' ? resumed.runId : '') || activeAgentRunId; - activeSource = resumed; + activeSource = { ...resumed, streamFormat: activeSource.streamFormat }; activeStream = resumed.fullStream; text = resumed.text; } diff --git a/packages/@n8n/instance-ai/src/stream/map-chunk.ts b/packages/@n8n/instance-ai/src/stream/map-chunk.ts index c6d6a35ff2b..5e28b78e069 100644 --- a/packages/@n8n/instance-ai/src/stream/map-chunk.ts +++ b/packages/@n8n/instance-ai/src/stream/map-chunk.ts @@ -27,6 +27,20 @@ function isRecord(value: unknown): value is Record { return value !== null && typeof value === 'object' && !Array.isArray(value); } +const agentStreamChunkTypes = new Set([ + 'finish', + 'text-delta', + 'reasoning-delta', + 'tool-call-delta', + 'error', + 'message', + 'tool-call-suspended', +]); + +function isAgentStreamChunk(value: unknown): value is StreamChunk { + return isRecord(value) && typeof value.type === 'string' && agentStreamChunkTypes.has(value.type); +} + interface ErrorInfo { content: string; statusCode?: number; @@ -342,9 +356,11 @@ export function mapMastraChunkToEvent( export function mapAgentChunkToEvent( runId: string, agentId: string, - chunk: StreamChunk, + chunk: unknown, responseId?: string, ): InstanceAiEvent | null { + if (!isAgentStreamChunk(chunk)) return null; + if (chunk.type === 'text-delta') { return mapMastraChunkToEvent( runId,