From 4fb69b39a110a79148f3c79fe6ca536afa46b7d1 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 10:37:25 +0200 Subject: [PATCH] refactor(instance-ai): add native stream chunk mapper --- packages/@n8n/instance-ai/src/index.ts | 2 +- .../src/stream/__tests__/map-chunk.test.ts | 81 ++++++++++++++++- .../@n8n/instance-ai/src/stream/map-chunk.ts | 90 +++++++++++++++++++ 3 files changed, 171 insertions(+), 2 deletions(-) diff --git a/packages/@n8n/instance-ai/src/index.ts b/packages/@n8n/instance-ai/src/index.ts index 24b98cb6079..bc0eaa09145 100644 --- a/packages/@n8n/instance-ai/src/index.ts +++ b/packages/@n8n/instance-ai/src/index.ts @@ -65,7 +65,7 @@ export type { } from './storage'; export { truncateToTitle, generateTitleForRun } from './memory/title-utils'; export { McpClientManager } from './mcp/mcp-client-manager'; -export { mapMastraChunkToEvent } from './stream/map-chunk'; +export { mapAgentChunkToEvent, mapMastraChunkToEvent } from './stream/map-chunk'; export { isRecord, parseSuspension, asResumable } from './utils/stream-helpers'; export { createEvalAgent, extractText, Tool, SONNET_MODEL, HAIKU_MODEL } from './utils/eval-agents'; export type { SuspensionInfo, Resumable } from './utils/stream-helpers'; diff --git a/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts b/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts index f50325f93f0..f53f20b3193 100644 --- a/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts +++ b/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts @@ -1,4 +1,6 @@ -import { mapMastraChunkToEvent } from '../map-chunk'; +import type { StreamChunk } from '@n8n/agents'; + +import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../map-chunk'; describe('mapMastraChunkToEvent', () => { const runId = 'run-1'; @@ -1186,3 +1188,80 @@ describe('mapMastraChunkToEvent', () => { }); }); }); + +describe('mapAgentChunkToEvent', () => { + const runId = 'run-1'; + const agentId = 'agent-1'; + + it('maps native text delta chunks', () => { + const chunk = { type: 'text-delta', delta: 'hello' } satisfies StreamChunk; + + expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + type: 'text-delta', + runId, + agentId, + payload: { text: 'hello' }, + }); + }); + + it('maps native tool result messages', () => { + const chunk = { + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId: 'tc-1', + toolName: 'lookup', + result: { ok: true }, + }, + ], + }, + } satisfies StreamChunk; + + expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + type: 'tool-result', + runId, + agentId, + payload: { + toolCallId: 'tc-1', + result: { ok: true }, + }, + }); + }); + + it('maps native suspended tool calls to confirmation requests', () => { + const chunk = { + type: 'tool-call-suspended', + toolCallId: 'tc-1', + toolName: 'delete-workflow', + input: { id: 'wf-1' }, + suspendPayload: { + requestId: 'req-1', + message: 'Delete this workflow?', + severity: 'destructive', + }, + } satisfies StreamChunk; + + expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + type: 'confirmation-request', + runId, + agentId, + payload: { + requestId: 'req-1', + toolCallId: 'tc-1', + toolName: 'delete-workflow', + args: { id: 'wf-1' }, + severity: 'destructive', + message: 'Delete this workflow?', + }, + }); + }); + + it('ignores native finish chunks', () => { + const chunk = { type: 'finish', finishReason: 'stop' } satisfies StreamChunk; + + expect(mapAgentChunkToEvent(runId, agentId, chunk)).toBeNull(); + }); +}); diff --git a/packages/@n8n/instance-ai/src/stream/map-chunk.ts b/packages/@n8n/instance-ai/src/stream/map-chunk.ts index cea9d68b793..c6d6a35ff2b 100644 --- a/packages/@n8n/instance-ai/src/stream/map-chunk.ts +++ b/packages/@n8n/instance-ai/src/stream/map-chunk.ts @@ -13,6 +13,7 @@ import type { TaskList, GatewayConfirmationRequiredPayload, } from '@n8n/api-types'; +import type { StreamChunk } from '@n8n/agents'; import { z } from 'zod'; const questionItemSchema = z.object({ @@ -337,3 +338,92 @@ export function mapMastraChunkToEvent( // Other Mastra chunk types (step-finish, finish, etc.) are ignored return null; } + +export function mapAgentChunkToEvent( + runId: string, + agentId: string, + chunk: StreamChunk, + responseId?: string, +): InstanceAiEvent | null { + if (chunk.type === 'text-delta') { + return mapMastraChunkToEvent( + runId, + agentId, + { type: 'text-delta', payload: { text: chunk.delta } }, + responseId, + ); + } + + if (chunk.type === 'reasoning-delta') { + return mapMastraChunkToEvent( + runId, + agentId, + { type: 'reasoning-delta', payload: { text: chunk.delta } }, + responseId, + ); + } + + if (chunk.type === 'tool-call-suspended') { + return mapMastraChunkToEvent( + runId, + agentId, + { + type: 'tool-call-suspended', + payload: { + toolCallId: chunk.toolCallId, + toolName: chunk.toolName, + args: isRecord(chunk.input) ? chunk.input : {}, + suspendPayload: isRecord(chunk.suspendPayload) ? chunk.suspendPayload : {}, + }, + }, + responseId, + ); + } + + if (chunk.type === 'message' && 'role' in chunk.message && chunk.message.role === 'tool') { + const toolCall = chunk.message.content.find((part) => part.type === 'tool-call'); + if (toolCall?.type === 'tool-call') { + return mapMastraChunkToEvent( + runId, + agentId, + { + type: 'tool-call', + payload: { + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args: isRecord(toolCall.input) ? toolCall.input : {}, + }, + }, + responseId, + ); + } + + const toolResult = chunk.message.content.find((part) => part.type === 'tool-result'); + if (toolResult?.type === 'tool-result') { + return mapMastraChunkToEvent( + runId, + agentId, + { + type: 'tool-result', + payload: { + toolCallId: toolResult.toolCallId, + result: toolResult.result, + isError: toolResult.isError, + }, + }, + responseId, + ); + } + } + + if (chunk.type === 'error') { + return mapMastraChunkToEvent( + runId, + agentId, + { type: 'error', payload: { error: chunk.error } }, + responseId, + ); + } + + return null; +}