refactor(instance-ai): add native stream chunk mapper

This commit is contained in:
Oleg Ivaniv 2026-05-05 10:37:25 +02:00
parent c731e4ba7f
commit 4fb69b39a1
No known key found for this signature in database
3 changed files with 171 additions and 2 deletions

View File

@ -65,7 +65,7 @@ export type {
} from './storage';
export { truncateToTitle, generateTitleForRun } from './memory/title-utils';
export { McpClientManager } from './mcp/mcp-client-manager';
export { mapMastraChunkToEvent } from './stream/map-chunk';
export { mapAgentChunkToEvent, mapMastraChunkToEvent } from './stream/map-chunk';
export { isRecord, parseSuspension, asResumable } from './utils/stream-helpers';
export { createEvalAgent, extractText, Tool, SONNET_MODEL, HAIKU_MODEL } from './utils/eval-agents';
export type { SuspensionInfo, Resumable } from './utils/stream-helpers';

View File

@ -1,4 +1,6 @@
import { mapMastraChunkToEvent } from '../map-chunk';
import type { StreamChunk } from '@n8n/agents';
import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../map-chunk';
describe('mapMastraChunkToEvent', () => {
const runId = 'run-1';
@ -1186,3 +1188,80 @@ describe('mapMastraChunkToEvent', () => {
});
});
});
describe('mapAgentChunkToEvent', () => {
const runId = 'run-1';
const agentId = 'agent-1';
it('maps native text delta chunks', () => {
const chunk = { type: 'text-delta', delta: 'hello' } satisfies StreamChunk;
expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({
type: 'text-delta',
runId,
agentId,
payload: { text: 'hello' },
});
});
it('maps native tool result messages', () => {
const chunk = {
type: 'message',
message: {
role: 'tool',
content: [
{
type: 'tool-result',
toolCallId: 'tc-1',
toolName: 'lookup',
result: { ok: true },
},
],
},
} satisfies StreamChunk;
expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({
type: 'tool-result',
runId,
agentId,
payload: {
toolCallId: 'tc-1',
result: { ok: true },
},
});
});
it('maps native suspended tool calls to confirmation requests', () => {
const chunk = {
type: 'tool-call-suspended',
toolCallId: 'tc-1',
toolName: 'delete-workflow',
input: { id: 'wf-1' },
suspendPayload: {
requestId: 'req-1',
message: 'Delete this workflow?',
severity: 'destructive',
},
} satisfies StreamChunk;
expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({
type: 'confirmation-request',
runId,
agentId,
payload: {
requestId: 'req-1',
toolCallId: 'tc-1',
toolName: 'delete-workflow',
args: { id: 'wf-1' },
severity: 'destructive',
message: 'Delete this workflow?',
},
});
});
it('ignores native finish chunks', () => {
const chunk = { type: 'finish', finishReason: 'stop' } satisfies StreamChunk;
expect(mapAgentChunkToEvent(runId, agentId, chunk)).toBeNull();
});
});

View File

@ -13,6 +13,7 @@ import type {
TaskList,
GatewayConfirmationRequiredPayload,
} from '@n8n/api-types';
import type { StreamChunk } from '@n8n/agents';
import { z } from 'zod';
const questionItemSchema = z.object({
@ -337,3 +338,92 @@ export function mapMastraChunkToEvent(
// Other Mastra chunk types (step-finish, finish, etc.) are ignored
return null;
}
export function mapAgentChunkToEvent(
runId: string,
agentId: string,
chunk: StreamChunk,
responseId?: string,
): InstanceAiEvent | null {
if (chunk.type === 'text-delta') {
return mapMastraChunkToEvent(
runId,
agentId,
{ type: 'text-delta', payload: { text: chunk.delta } },
responseId,
);
}
if (chunk.type === 'reasoning-delta') {
return mapMastraChunkToEvent(
runId,
agentId,
{ type: 'reasoning-delta', payload: { text: chunk.delta } },
responseId,
);
}
if (chunk.type === 'tool-call-suspended') {
return mapMastraChunkToEvent(
runId,
agentId,
{
type: 'tool-call-suspended',
payload: {
toolCallId: chunk.toolCallId,
toolName: chunk.toolName,
args: isRecord(chunk.input) ? chunk.input : {},
suspendPayload: isRecord(chunk.suspendPayload) ? chunk.suspendPayload : {},
},
},
responseId,
);
}
if (chunk.type === 'message' && 'role' in chunk.message && chunk.message.role === 'tool') {
const toolCall = chunk.message.content.find((part) => part.type === 'tool-call');
if (toolCall?.type === 'tool-call') {
return mapMastraChunkToEvent(
runId,
agentId,
{
type: 'tool-call',
payload: {
toolCallId: toolCall.toolCallId,
toolName: toolCall.toolName,
args: isRecord(toolCall.input) ? toolCall.input : {},
},
},
responseId,
);
}
const toolResult = chunk.message.content.find((part) => part.type === 'tool-result');
if (toolResult?.type === 'tool-result') {
return mapMastraChunkToEvent(
runId,
agentId,
{
type: 'tool-result',
payload: {
toolCallId: toolResult.toolCallId,
result: toolResult.result,
isError: toolResult.isError,
},
},
responseId,
);
}
}
if (chunk.type === 'error') {
return mapMastraChunkToEvent(
runId,
agentId,
{ type: 'error', payload: { error: chunk.error } },
responseId,
);
}
return null;
}