diff --git a/packages/@n8n/instance-ai/SPECS.md b/packages/@n8n/instance-ai/SPECS.md index 3d04ecb535c..d85a1779f13 100644 --- a/packages/@n8n/instance-ai/SPECS.md +++ b/packages/@n8n/instance-ai/SPECS.md @@ -455,7 +455,7 @@ pnpm typecheck - [x] Delete Mastra registration logic. - [x] Rewrite all tools to native `Tool`. - [x] Rewrite orchestration tools for native streaming and resume. -- [ ] Rewrite stream runner and stream executor for native `StreamChunk`. +- [x] Rewrite stream runner and stream executor for native `StreamChunk`. - [x] Rename internal `mastraRunId` to `agentRunId`. - [x] Implement native chunk to Instance AI event mapper. - [x] Implement TypeORM `BuiltMemory`. @@ -474,10 +474,6 @@ pnpm typecheck Current remaining cleanup: -- Remove the Mastra-shaped stream fallback (`mapMastraChunkToEvent`, - `ResumableStreamFormat = 'mastra' | 'agent'`) now that runtime execution is - native-only. -- Rename stale Mastra-only test fixture IDs/tool names and comments. - Update architecture/tooling docs that still describe the old Mastra runtime. ## Acceptance Criteria diff --git a/packages/@n8n/instance-ai/eslint.config.mjs b/packages/@n8n/instance-ai/eslint.config.mjs index 8fb01086090..454e8912b48 100644 --- a/packages/@n8n/instance-ai/eslint.config.mjs +++ b/packages/@n8n/instance-ai/eslint.config.mjs @@ -5,8 +5,8 @@ export default defineConfig(baseConfig, { ignores: ['scripts/**/*.cjs'], }, { rules: { - // Mastra tool names are kebab-case identifiers (e.g. 'list-workflows') - // which require quotes in object literals — skip naming checks for those + // Tool names may be kebab-case identifiers (e.g. 'list-workflows'), which + // require quotes in object literals. Skip naming checks for those. '@typescript-eslint/naming-convention': [ 'error', { diff --git a/packages/@n8n/instance-ai/src/constants/max-steps.ts b/packages/@n8n/instance-ai/src/constants/max-steps.ts index 4348e2783ef..2dd737b3625 100644 --- a/packages/@n8n/instance-ai/src/constants/max-steps.ts +++ b/packages/@n8n/instance-ai/src/constants/max-steps.ts @@ -1,10 +1,8 @@ /** * Maximum LLM steps (inference rounds) for each agent role. * - * Mastra's Agent.stream() defaults to stepCountIs(5) which is too low - * for most use cases. Each agent sets its own limit based on task complexity. - * - * @see https://github.com/mastra-ai/mastra/issues/2930 + * Native agent iteration limits for each role. Each agent sets its own limit + * based on task complexity. */ export const MAX_STEPS = { /** Main orchestrator — coordinates all other agents and handles direct tool calls. */ diff --git a/packages/@n8n/instance-ai/src/index.ts b/packages/@n8n/instance-ai/src/index.ts index 1847ea34b86..ef8e1be8d78 100644 --- a/packages/@n8n/instance-ai/src/index.ts +++ b/packages/@n8n/instance-ai/src/index.ts @@ -65,7 +65,7 @@ export type { } from './storage'; export { truncateToTitle, generateTitleForRun } from './memory/title-utils'; export { McpClientManager } from './mcp/mcp-client-manager'; -export { mapAgentChunkToEvent, mapMastraChunkToEvent } from './stream/map-chunk'; +export { mapAgentChunkToEvent } from './stream/map-chunk'; export { isRecord, parseSuspension, asResumable } from './utils/stream-helpers'; export { createEvalAgent, extractText, Tool, SONNET_MODEL, HAIKU_MODEL } from './utils/eval-agents'; export type { SuspensionInfo, Resumable } from './utils/stream-helpers'; diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts index 73db13e8675..8e668982ba0 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts @@ -229,6 +229,73 @@ function readableFromChunks(chunks: unknown[]) { }); } +function textChunk(text: string) { + return { type: 'text-delta', delta: text }; +} + +function errorChunk(error: unknown) { + return { type: 'error', error }; +} + +function suspensionChunk(input: { + toolCallId: string; + toolName?: string; + suspendPayload?: Record; + input?: Record; +}) { + return { + type: 'tool-call-suspended', + toolCallId: input.toolCallId, + ...(input.toolName ? { toolName: input.toolName } : {}), + ...(input.input ? { input: input.input } : {}), + suspendPayload: input.suspendPayload ?? {}, + }; +} + +function toolCallChunk(input: { + toolCallId: string; + toolName: string; + args?: Record; +}) { + return { + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-call', + toolCallId: input.toolCallId, + toolName: input.toolName, + input: input.args ?? {}, + }, + ], + }, + }; +} + +function toolResultChunk(input: { + toolCallId: string; + toolName: string; + result: unknown; + isError?: boolean; +}) { + return { + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId: input.toolCallId, + toolName: input.toolName, + result: input.result, + ...(input.isError ? { isError: true } : {}), + }, + ], + }, + }; +} + function createDeferred() { let resolve!: (value: T | PromiseLike) => void; let reject!: (reason?: unknown) => void; @@ -260,20 +327,17 @@ describe('executeResumableStream', () => { const result = await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: fromChunks([ - { type: 'text-delta', payload: { text: 'Working...' } }, - { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tool-call-1', - toolName: 'ask-user', - suspendPayload: { - requestId: 'request-1', - message: 'Need approval', - }, + textChunk('Working...'), + suspensionChunk({ + toolCallId: 'tool-call-1', + toolName: 'ask-user', + suspendPayload: { + requestId: 'request-1', + message: 'Need approval', }, - }, + }), ]), }, context: { @@ -290,7 +354,7 @@ describe('executeResumableStream', () => { expect(result).toEqual( expect.objectContaining({ status: 'suspended', - agentRunId: 'mastra-run-1', + agentRunId: 'agent-run-1', suspension: { toolCallId: 'tool-call-1', requestId: 'request-1', @@ -319,19 +383,18 @@ describe('executeResumableStream', () => { agent: {}, stream: { runId: 'agent-run-1', - streamFormat: 'agent', fullStream: fromChunks([ - { type: 'text-delta', delta: 'Working...' }, - { - type: 'tool-call-suspended', + textChunk('Working...'), + suspensionChunk({ toolCallId: 'tool-call-1', toolName: 'ask-user', input: { prompt: 'Confirm?' }, suspendPayload: { + toolCallId: 'tool-call-1', requestId: 'request-1', message: 'Need approval', }, - }, + }), ]), }, context: { @@ -386,16 +449,8 @@ describe('executeResumableStream', () => { const result = await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-1', - fullStream: fromChunks([ - { type: 'text-delta', payload: { text: 'Working...' } }, - { - type: 'error', - runId: 'mastra-run-1', - from: 'AGENT', - payload: { error: new Error('Not Found') }, - }, - ]), + runId: 'agent-run-1', + fullStream: fromChunks([textChunk('Working...'), errorChunk(new Error('Not Found'))]), }, context: { threadId: 'thread-1', @@ -409,34 +464,30 @@ describe('executeResumableStream', () => { }); expect(result.status).toBe('errored'); - expect(result.agentRunId).toBe('mastra-run-1'); + expect(result.agentRunId).toBe('agent-run-1'); }); it('auto-resumes suspended streams and surfaces queued corrections', async () => { const eventBus = createEventBus(); - const resumeStream = jest.fn().mockResolvedValue({ - runId: 'mastra-run-2', - fullStream: fromChunks([{ type: 'text-delta', payload: { text: 'Done.' } }]), - text: Promise.resolve('Done.'), + const resume = jest.fn().mockResolvedValue({ + runId: 'agent-run-2', + stream: readableFromChunks([textChunk('Done.')]), }); const waitForConfirmation = jest.fn().mockResolvedValue({ approved: true }); const result = await executeResumableStream({ - agent: { resumeStream }, + agent: { resume }, stream: { - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: fromChunks([ - { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tool-call-1', - toolName: 'pause-for-user', - suspendPayload: { - requestId: 'request-1', - message: 'Please confirm', - }, + suspensionChunk({ + toolCallId: 'tool-call-1', + toolName: 'pause-for-user', + suspendPayload: { + requestId: 'request-1', + message: 'Please confirm', }, - }, + }), ]), text: Promise.resolve('Initial text'), }, @@ -456,12 +507,13 @@ describe('executeResumableStream', () => { }); expect(waitForConfirmation).toHaveBeenCalledWith('request-1'); - expect(resumeStream).toHaveBeenCalledWith( + expect(resume).toHaveBeenCalledWith( + 'stream', { approved: true }, - { runId: 'mastra-run-1', toolCallId: 'tool-call-1' }, + { runId: 'agent-run-1', toolCallId: 'tool-call-1' }, ); expect(result.status).toBe('completed'); - expect(result.agentRunId).toBe('mastra-run-2'); + expect(result.agentRunId).toBe('agent-run-2'); await expect(result.text ?? Promise.resolve('')).resolves.toBe('Done.'); expect(eventBus.publish).toHaveBeenCalledWith( 'thread-1', @@ -485,7 +537,6 @@ describe('executeResumableStream', () => { agent: { resume }, stream: { runId: 'agent-run-1', - streamFormat: 'agent', fullStream: fromChunks([ { type: 'tool-call-suspended', @@ -535,10 +586,9 @@ describe('executeResumableStream', () => { const finishGate = createDeferred(); const approval = createDeferred>(); const waitStarted = createDeferred(); - const resumeStream = jest.fn().mockResolvedValue({ - runId: 'mastra-run-2', - fullStream: fromChunks([{ type: 'text-delta', payload: { text: 'Done.' } }]), - text: Promise.resolve('Done.'), + const resume = jest.fn().mockResolvedValue({ + runId: 'agent-run-2', + stream: readableFromChunks([textChunk('Done.')]), }); const waitForConfirmation = jest.fn().mockImplementation(async () => { waitStarted.resolve(undefined); @@ -546,21 +596,18 @@ describe('executeResumableStream', () => { }); const execution = executeResumableStream({ - agent: { resumeStream }, + agent: { resume }, stream: { - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: (async function* () { - yield { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tool-call-1', - toolName: 'pause-for-user', - suspendPayload: { - requestId: 'request-1', - message: 'Please confirm', - }, + yield suspensionChunk({ + toolCallId: 'tool-call-1', + toolName: 'pause-for-user', + suspendPayload: { + requestId: 'request-1', + message: 'Please confirm', }, - }; + }); await finishGate.promise; yield { type: 'finish', finishReason: 'tool-calls' }; })(), @@ -583,7 +630,7 @@ describe('executeResumableStream', () => { await waitStarted.promise; expect(waitForConfirmation).toHaveBeenCalledWith('request-1'); - expect(resumeStream).not.toHaveBeenCalled(); + expect(resume).not.toHaveBeenCalled(); const publishCalls = eventBus.publish.mock.calls as Array<[string, PublishedEvent]>; const confirmationEvent = publishCalls.find( ([, event]) => event.type === 'confirmation-request', @@ -597,54 +644,48 @@ describe('executeResumableStream', () => { await expect(execution).resolves.toEqual( expect.objectContaining({ status: 'completed', - agentRunId: 'mastra-run-2', + agentRunId: 'agent-run-2', }), ); - expect(resumeStream).toHaveBeenCalledWith( + expect(resume).toHaveBeenCalledWith( + 'stream', { approved: true }, - { runId: 'mastra-run-1', toolCallId: 'tool-call-1' }, + { runId: 'agent-run-1', toolCallId: 'tool-call-1' }, ); }); it('surfaces only the first actionable suspension in a drain', async () => { const eventBus = createEventBus(); const warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => {}); - const resumeStream = jest.fn().mockResolvedValue({ - runId: 'mastra-run-2', - fullStream: fromChunks([{ type: 'text-delta', payload: { text: 'Done.' } }]), - text: Promise.resolve('Done.'), + const resume = jest.fn().mockResolvedValue({ + runId: 'agent-run-2', + stream: readableFromChunks([textChunk('Done.')]), }); const waitForConfirmation = jest.fn().mockResolvedValue({ approved: true }); const onSuspension = jest.fn((_: SuspensionInfo) => undefined); try { await executeResumableStream({ - agent: { resumeStream }, + agent: { resume }, stream: { - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: fromChunks([ - { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tool-call-1', - toolName: 'pause-for-user', - suspendPayload: { - requestId: 'request-1', - message: 'First confirmation', - }, + suspensionChunk({ + toolCallId: 'tool-call-1', + toolName: 'pause-for-user', + suspendPayload: { + requestId: 'request-1', + message: 'First confirmation', }, - }, - { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tool-call-2', - toolName: 'pause-for-user', - suspendPayload: { - requestId: 'request-2', - message: 'Second confirmation', - }, + }), + suspensionChunk({ + toolCallId: 'tool-call-2', + toolName: 'pause-for-user', + suspendPayload: { + requestId: 'request-2', + message: 'Second confirmation', }, - }, + }), { type: 'finish', finishReason: 'tool-calls' }, ]), text: Promise.resolve('Initial text'), @@ -675,9 +716,10 @@ describe('executeResumableStream', () => { }); expect(waitForConfirmation).toHaveBeenCalledTimes(1); expect(waitForConfirmation).toHaveBeenCalledWith('request-1'); - expect(resumeStream).toHaveBeenCalledWith( + expect(resume).toHaveBeenCalledWith( + 'stream', { approved: true }, - { runId: 'mastra-run-1', toolCallId: 'tool-call-1' }, + { runId: 'agent-run-1', toolCallId: 'tool-call-1' }, ); const confirmationEvents = (eventBus.publish.mock.calls as Array<[string, PublishedEvent]>) @@ -704,7 +746,7 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: fromChunks([ { type: 'step-start', @@ -718,7 +760,7 @@ describe('executeResumableStream', () => { warnings: [], }, }, - { type: 'text-delta', payload: { text: 'Let me check.' } }, + textChunk('Let me check.'), { type: 'step-finish', payload: { @@ -842,7 +884,7 @@ describe('executeResumableStream', () => { stepNumber: 0, messages: [{ role: 'user', content: 'Build the workflow' }], }); - hooks?.onStreamChunk({ type: 'text-delta', payload: { text: 'Let me write it.' } }); + hooks?.onStreamChunk(textChunk('Let me write it.')); await hooks?.executionOptions.onStepFinish({ stepNumber: 0, @@ -851,7 +893,7 @@ describe('executeResumableStream', () => { toolCalls: [ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', args: { path: '/tmp/workflow.ts', content: 'export default workflow', @@ -861,7 +903,7 @@ describe('executeResumableStream', () => { toolResults: [ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', result: 'Wrote 23 bytes', }, ], @@ -886,7 +928,7 @@ describe('executeResumableStream', () => { { type: 'tool-call', toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', args: { path: '/tmp/workflow.ts', content: 'export default workflow', @@ -907,13 +949,13 @@ describe('executeResumableStream', () => { expect(llmRun?.outputs?.messages).toEqual([ { role: 'assistant', - content: 'Let me write it.\n\n[Calling tools: mastra_workspace_write_file]', + content: 'Let me write it.\n\n[Calling tools: workspace_write_file]', }, ]); expect(llmRun?.outputs?.requested_tools).toEqual([ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', }, ]); expect(llmRun?.outputs).not.toHaveProperty('tool_results'); @@ -944,7 +986,7 @@ describe('executeResumableStream', () => { stepNumber: 0, messages: [{ role: 'user', content: 'Build the workflow' }], }); - hooks?.onStreamChunk({ type: 'text-delta', payload: { text: 'Let me write it.' } }); + hooks?.onStreamChunk(textChunk('Let me write it.')); hooks?.onStreamChunk({ type: 'step-finish', payload: { @@ -964,7 +1006,7 @@ describe('executeResumableStream', () => { toolCalls: [ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', }, ], toolResults: [], @@ -996,7 +1038,7 @@ describe('executeResumableStream', () => { toolCalls: [ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', args: { path: '/tmp/workflow.ts', content: 'export default workflow', @@ -1006,7 +1048,7 @@ describe('executeResumableStream', () => { toolResults: [ { toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', result: 'Wrote 23 bytes', }, ], @@ -1026,7 +1068,7 @@ describe('executeResumableStream', () => { { type: 'tool-call', toolCallId: 'native-1', - toolName: 'mastra_workspace_write_file', + toolName: 'workspace_write_file', args: { path: '/tmp/workflow.ts', content: 'export default workflow', @@ -1222,26 +1264,18 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-3', + runId: 'agent-run-3', fullStream: fromChunks([ - { - type: 'tool-call', - payload: { - toolCallId: 'native-tool-1', - toolName: 'mastra_workspace_execute_command', - args: { - command: 'echo hello', - }, - }, - }, - { - type: 'tool-result', - payload: { - toolCallId: 'native-tool-1', - toolName: 'mastra_workspace_execute_command', - result: 'hello', - }, - }, + toolCallChunk({ + toolCallId: 'native-tool-1', + toolName: 'workspace_execute_command', + args: { command: 'echo hello' }, + }), + toolResultChunk({ + toolCallId: 'native-tool-1', + toolName: 'workspace_execute_command', + result: 'hello', + }), { type: 'finish', finishReason: 'stop' }, ]), }, @@ -1259,12 +1293,12 @@ describe('executeResumableStream', () => { const toolRun = langsmithMock .getCreatedRuns() - .find((run) => run.name === 'tool:mastra_workspace_execute_command'); + .find((run) => run.name === 'tool:workspace_execute_command'); expect(toolRun).toBeDefined(); expect(toolRun?.metadata).toEqual( expect.objectContaining({ synthetic_tool_trace: true, - tool_name: 'mastra_workspace_execute_command', + tool_name: 'workspace_execute_command', }), ); expect(toolRun?.outputs).toEqual({ @@ -1299,22 +1333,16 @@ describe('executeResumableStream', () => { }); async function* streamWithToolCall() { - yield { - type: 'tool-call', - payload: { - toolCallId: 'native-tool-turn-1', - toolName: 'mastra_workspace_execute_command', - args: { command: 'echo hello' }, - }, - }; - yield { - type: 'tool-result', - payload: { - toolCallId: 'native-tool-turn-1', - toolName: 'mastra_workspace_execute_command', - result: 'hello', - }, - }; + yield toolCallChunk({ + toolCallId: 'native-tool-turn-1', + toolName: 'workspace_execute_command', + args: { command: 'echo hello' }, + }); + yield toolResultChunk({ + toolCallId: 'native-tool-turn-1', + toolName: 'workspace_execute_command', + result: 'hello', + }); await hooks?.executionOptions.onStepFinish({ stepNumber: 0, text: 'Done.', @@ -1322,13 +1350,13 @@ describe('executeResumableStream', () => { toolCalls: [ { toolCallId: 'native-tool-turn-1', - toolName: 'mastra_workspace_execute_command', + toolName: 'workspace_execute_command', }, ], toolResults: [ { toolCallId: 'native-tool-turn-1', - toolName: 'mastra_workspace_execute_command', + toolName: 'workspace_execute_command', result: 'hello', }, ], @@ -1355,7 +1383,7 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-turn-1', + runId: 'agent-run-turn-1', fullStream: streamWithToolCall(), }, context: { @@ -1376,7 +1404,7 @@ describe('executeResumableStream', () => { .find((run) => run.name === 'llm:anthropic/claude-sonnet-4-6'); const toolRun = langsmithMock .getCreatedRuns() - .find((run) => run.name === 'tool:mastra_workspace_execute_command'); + .find((run) => run.name === 'tool:workspace_execute_command'); expect(llmRun?.parent_run_id).toBe(parentRun.id); expect(toolRun?.parent_run_id).toBe(llmRun?.id); @@ -1398,9 +1426,9 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-2', + runId: 'agent-run-2', fullStream: fromChunks([ - { type: 'text-delta', payload: { text: 'I found the matching tables.' } }, + textChunk('I found the matching tables.'), { type: 'finish', finishReason: 'stop' }, ]), steps: Promise.resolve([ @@ -1489,7 +1517,7 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-5', + runId: 'agent-run-5', fullStream: (async function* () { await prepareStep?.({ stepNumber: 0, @@ -1499,7 +1527,7 @@ describe('executeResumableStream', () => { }, messages: [{ role: 'user', content: 'Build a weather workflow' }], }); - yield { type: 'text-delta', payload: { text: 'Done.' } }; + yield textChunk('Done.'); await llmStepTraceHooks?.executionOptions.onStepFinish({ stepNumber: 0, text: 'Done.', @@ -1601,7 +1629,7 @@ describe('executeResumableStream', () => { await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-usage-v3', + runId: 'agent-run-usage-v3', fullStream: fromChunks([ { type: 'step-start', @@ -1614,7 +1642,7 @@ describe('executeResumableStream', () => { }, }, }, - { type: 'text-delta', payload: { text: 'Done.' } }, + textChunk('Done.'), { type: 'step-finish', payload: { @@ -1724,7 +1752,7 @@ describe('executeResumableStream', () => { }, messages: [{ role: 'user', content: 'Build the weather workflow' }], }); - hooks?.onStreamChunk({ type: 'text-delta', payload: { text: 'Done.' } }); + hooks?.onStreamChunk(textChunk('Done.')); await hooks?.executionOptions.onStepFinish({ stepNumber: 0, @@ -1815,7 +1843,7 @@ describe('executeResumableStream', () => { const result = await executeResumableStream({ agent: {}, stream: { - runId: 'mastra-run-suspended-usage', + runId: 'agent-run-suspended-usage', fullStream: (async function* () { await prepareStep?.({ stepNumber: 0, @@ -1825,7 +1853,7 @@ describe('executeResumableStream', () => { }, messages: [{ role: 'user', content: 'Ask me a question' }], }); - yield { type: 'text-delta', payload: { text: 'I need one detail first.' } }; + yield textChunk('I need one detail first.'); await llmStepTraceHooks?.executionOptions.onStepFinish({ stepNumber: 0, text: 'I need one detail first.', @@ -1868,26 +1896,20 @@ describe('executeResumableStream', () => { }, providerMetadata: undefined, }); - yield { - type: 'tool-call', - payload: { - toolCallId: 'ask-user-1', - toolName: 'ask-user', - args: { - questions: [{ id: 'q1', question: 'Which city?', type: 'text' }], - }, + yield toolCallChunk({ + toolCallId: 'ask-user-1', + toolName: 'ask-user', + args: { + questions: [{ id: 'q1', question: 'Which city?', type: 'text' }], }, - }; - yield { - type: 'tool-call-suspended', - payload: { - toolCallId: 'ask-user-1', - toolName: 'ask-user', - suspendPayload: { - requestId: 'req-ask-user-1', - }, + }); + yield suspensionChunk({ + toolCallId: 'ask-user-1', + toolName: 'ask-user', + suspendPayload: { + requestId: 'req-ask-user-1', }, - }; + }); })(), usage: Promise.resolve({ inputTokens: 120, diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/run-state-registry.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/run-state-registry.test.ts index fe4b8aa30cf..77e8ac384e8 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/run-state-registry.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/run-state-registry.test.ts @@ -26,7 +26,7 @@ function createSuspendedRunState( return { runId: 'run_abc', abortController: new AbortController(), - agentRunId: 'mastra-1', + agentRunId: 'agent-run-1', agent: {}, threadId: 'thread-1', user: { id: 'user-1', name: 'Alice' }, diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts index 6d592a47c46..1f5ac548c32 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts @@ -56,21 +56,16 @@ describe('streamAgentRun', () => { it('returns errored status when agent stream contains an error chunk', async () => { jest.mocked(executeResumableStream).mockResolvedValue({ status: 'errored', - agentRunId: 'mastra-run-1', + agentRunId: 'agent-run-1', workSummary: emptyWorkSummary, }); const eventBus = createEventBus(); const agent = { stream: jest.fn().mockResolvedValue({ - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: fromChunks([ - { type: 'text-delta', payload: { text: 'Hello' } }, - { - type: 'error', - runId: 'mastra-run-1', - from: 'AGENT', - payload: { error: new Error('Not Found') }, - }, + { type: 'text-delta', delta: 'Hello' }, + { type: 'error', error: new Error('Not Found') }, ]), }), }; @@ -90,20 +85,20 @@ describe('streamAgentRun', () => { ); expect(result.status).toBe('errored'); - expect(result.agentRunId).toBe('mastra-run-1'); + expect(result.agentRunId).toBe('agent-run-1'); }); it('returns completed status for successful streams', async () => { jest.mocked(executeResumableStream).mockResolvedValue({ status: 'completed', - agentRunId: 'mastra-run-1', + agentRunId: 'agent-run-1', workSummary: emptyWorkSummary, }); const eventBus = createEventBus(); const agent = { stream: jest.fn().mockResolvedValue({ - runId: 'mastra-run-1', - fullStream: fromChunks([{ type: 'text-delta', payload: { text: 'All good' } }]), + runId: 'agent-run-1', + fullStream: fromChunks([{ type: 'text-delta', delta: 'All good' }]), }), }; @@ -128,7 +123,7 @@ describe('streamAgentRun', () => { const mockedExecuteResumableStream = jest.mocked(executeResumableStream); const agent = { stream: jest.fn().mockResolvedValue({ - runId: 'mastra-run-1', + runId: 'agent-run-1', fullStream: emptyStream(), }), }; @@ -136,7 +131,7 @@ describe('streamAgentRun', () => { mockedExecuteResumableStream.mockResolvedValue({ status: 'suspended', - agentRunId: 'mastra-run-1', + agentRunId: 'agent-run-1', workSummary: emptyWorkSummary, suspension: { requestId: 'request-1', @@ -173,7 +168,7 @@ describe('streamAgentRun', () => { ); expect(result.status).toBe('suspended'); - expect(result.agentRunId).toBe('mastra-run-1'); + expect(result.agentRunId).toBe('agent-run-1'); expect(result.suspension?.requestId).toBe('request-1'); expect(result.confirmationEvent?.type).toBe('confirmation-request'); expect(result.confirmationEvent?.payload.requestId).toBe('request-1'); @@ -184,10 +179,10 @@ describe('streamAgentRun', () => { ); }); - it('passes the full Mastra stream payload through to the resumable executor', async () => { + it('passes an already-normalized native stream source through to the resumable executor', async () => { const mockedExecuteResumableStream = jest.mocked(executeResumableStream); const streamResult = { - runId: 'mastra-run-2', + runId: 'agent-run-2', fullStream: emptyStream(), text: Promise.resolve('done'), steps: Promise.resolve([{ text: 'done' }]), @@ -201,7 +196,7 @@ describe('streamAgentRun', () => { mockedExecuteResumableStream.mockResolvedValue({ status: 'completed', - agentRunId: 'mastra-run-2', + agentRunId: 'agent-run-2', text: Promise.resolve('done'), workSummary: emptyWorkSummary, }); @@ -272,7 +267,6 @@ describe('streamAgentRun', () => { expect(source).toEqual( expect.objectContaining({ runId: 'agent-run-1', - streamFormat: 'agent', }), ); await expect(collectAsyncIterable(source.fullStream)).resolves.toEqual([nativeChunk]); diff --git a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts index 764d5e30abd..4ab0d645d28 100644 --- a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts +++ b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts @@ -4,18 +4,16 @@ import type { RunTree } from 'langsmith'; import type { InstanceAiEventBus } from '../event-bus'; import type { Logger } from '../logger'; -import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../stream/map-chunk'; +import { mapAgentChunkToEvent } from '../stream/map-chunk'; import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator'; import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing'; -import { parseSuspension, resumeStream } from '../utils/stream-helpers'; +import { parseSuspension, resumeAgentStream } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; type ConfirmationRequestEvent = Extract; -export type ResumableStreamFormat = 'mastra' | 'agent'; export interface ResumableStreamSource { runId?: string; - streamFormat?: ResumableStreamFormat; fullStream: AsyncIterable; text?: Promise; steps?: Promise; @@ -168,6 +166,10 @@ function isRecord(value: unknown): value is Record { return value !== null && typeof value === 'object' && !Array.isArray(value); } +function isUnknownArray(value: unknown): value is unknown[] { + return Array.isArray(value); +} + function isAsyncIterable(value: unknown): value is AsyncIterable { return ( value !== null && @@ -257,14 +259,9 @@ async function* readableStreamToAsyncIterable(stream: ReadableStream) { } } -export function normalizeStreamSource( - result: unknown, - options?: { streamFormat?: ResumableStreamFormat }, -): ResumableStreamSource { +export function normalizeStreamSource(result: unknown): ResumableStreamSource { if (isResumableStreamSource(result)) { - return options?.streamFormat && !result.streamFormat - ? { ...result, streamFormat: options.streamFormat } - : result; + return result; } if (isNativeStreamResult(result)) { @@ -272,7 +269,6 @@ export function normalizeStreamSource( return { runId: result.runId, - streamFormat: 'agent', fullStream: readableStreamToAsyncIterable(eventStream), text: collectNativeStreamText(textStream), }; @@ -1131,6 +1127,47 @@ function getChunkPayload(chunk: unknown): Record | undefined { return isRecord(chunk.payload) ? chunk.payload : chunk; } +function getNativeToolContent(chunk: unknown, type: 'tool-call' | 'tool-result') { + if (!isRecord(chunk) || chunk.type !== 'message' || !isRecord(chunk.message)) { + return undefined; + } + + const message = chunk.message; + if (message.role !== 'tool' || !isUnknownArray(message.content)) { + return undefined; + } + + return message.content.find((part) => isRecord(part) && part.type === type); +} + +function getNativeToolCallPayload(chunk: unknown): Record | undefined { + const toolCall = getNativeToolContent(chunk, 'tool-call'); + if (!isRecord(toolCall)) { + return undefined; + } + + return { + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args: toolCall.input, + ...(toolCall.providerExecuted === true ? { providerExecuted: true } : {}), + }; +} + +function getNativeToolResultPayload(chunk: unknown): Record | undefined { + const toolResult = getNativeToolContent(chunk, 'tool-result'); + if (!isRecord(toolResult)) { + return undefined; + } + + return { + toolCallId: toolResult.toolCallId, + toolName: toolResult.toolName, + result: toolResult.result, + ...(toolResult.isError === true ? { isError: true } : {}), + }; +} + function buildSyntheticToolInputs( toolCallId: string, _toolName: string, @@ -1145,7 +1182,7 @@ function buildSyntheticToolInputs( function shouldCreateSyntheticToolTrace(payload: Record): boolean { const toolName = typeof payload.toolName === 'string' ? payload.toolName : ''; return ( - toolName.startsWith('mastra_') || + toolName.startsWith('workspace_') || SYNTHETIC_TOOL_TRACE_NAMES.has(toolName) || payload.providerExecuted === true || payload.dynamic === true @@ -1156,11 +1193,7 @@ async function startSyntheticToolTrace( chunk: unknown, records: Map, ): Promise { - if (!isRecord(chunk) || chunk.type !== 'tool-call') { - return; - } - - const payload = getChunkPayload(chunk); + const payload = getNativeToolCallPayload(chunk); if (!payload || !shouldCreateSyntheticToolTrace(payload)) { return; } @@ -1182,7 +1215,7 @@ async function startSyntheticToolTrace( tags: dedupeTags([ ...(parentRun.tags ?? []), 'tool', - ...(toolName.startsWith('mastra_') ? ['native-tool'] : []), + ...(toolName.startsWith('workspace_') ? ['native-tool'] : []), ]), metadata: { ...(parentRun.metadata ?? {}), @@ -1207,11 +1240,7 @@ async function finishSyntheticToolTrace( chunk: unknown, records: Map, ): Promise { - if (!isRecord(chunk) || (chunk.type !== 'tool-result' && chunk.type !== 'tool-error')) { - return; - } - - const payload = getChunkPayload(chunk); + const payload = getNativeToolResultPayload(chunk); if (!payload) { return; } @@ -1228,8 +1257,18 @@ async function finishSyntheticToolTrace( await startSyntheticToolTrace( { - type: 'tool-call', - payload, + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-call', + toolCallId: payload.toolCallId, + toolName: payload.toolName, + input: {}, + }, + ], + }, }, records, ); @@ -1370,25 +1409,23 @@ function updateStepRecordFromChunk( return undefined; } - const payload = isRecord(chunk.payload) ? chunk.payload : chunk; - if ((chunk.type === 'text-delta' || chunk.type === 'text') && typeof payload.text === 'string') { - record.textParts.push(payload.text); + if (chunk.type === 'text-delta' && typeof chunk.delta === 'string') { + record.textParts.push(chunk.delta); recordFirstTokenEvent(record); } - if ( - (chunk.type === 'reasoning-delta' || chunk.type === 'reasoning') && - typeof payload.text === 'string' - ) { - record.reasoningParts.push(payload.text); + if (chunk.type === 'reasoning-delta' && typeof chunk.delta === 'string') { + record.reasoningParts.push(chunk.delta); } - if (chunk.type === 'tool-call' && isRecord(payload)) { - record.toolCalls.push(toTraceObject(payload)); + const toolCallPayload = getNativeToolCallPayload(chunk); + if (toolCallPayload) { + record.toolCalls.push(toTraceObject(toolCallPayload)); } - if ((chunk.type === 'tool-result' || chunk.type === 'tool-error') && isRecord(payload)) { - record.toolResults.push(toTraceObject(payload)); + const toolResultPayload = getNativeToolResultPayload(chunk); + if (toolResultPayload) { + record.toolResults.push(toTraceObject(toolResultPayload)); } return record; @@ -2033,20 +2070,12 @@ export async function executeResumableStream( hasError = true; } - const event = - activeSource.streamFormat === 'agent' - ? mapAgentChunkToEvent( - options.context.runId, - options.context.agentId, - chunk, - currentResponseId, - ) - : mapMastraChunkToEvent( - options.context.runId, - options.context.agentId, - chunk, - currentResponseId, - ); + const event = mapAgentChunkToEvent( + options.context.runId, + options.context.agentId, + chunk, + currentResponseId, + ); if (event) { workSummaryAccumulator.observe(event); let shouldPublishEvent = true; @@ -2138,13 +2167,11 @@ export async function executeResumableStream( runId: activeAgentRunId, toolCallId: suspension.toolCallId, }; - const resumed = await resumeStream(options.agent, resumeData, { + const resumed = await resumeAgentStream(options.agent, resumeData, { ...resumeOptions, ...(options.llmStepTraceHooks?.executionOptions ?? {}), }); - const resumedSource = normalizeStreamSource(resumed, { - streamFormat: activeSource.streamFormat, - }); + const resumedSource = normalizeStreamSource(resumed); activeAgentRunId = (typeof resumedSource.runId === 'string' ? resumedSource.runId : '') || activeAgentRunId; diff --git a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts index c15fe9a4dd3..813108a0bf7 100644 --- a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts +++ b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts @@ -11,7 +11,7 @@ import { type TraceStatus, } from './resumable-stream-executor'; import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing'; -import { resumeStream } from '../utils/stream-helpers'; +import { resumeAgentStream } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; export interface StreamableAgent { @@ -63,7 +63,7 @@ export async function resumeAgentRun( const resumeTraceParent = getTraceParentRun(); return await withTraceParentContext(resumeTraceParent, async () => { const llmStepTraceHooks = createLlmStepTraceHooks(resumeTraceParent); - const resumed = await resumeStream(agent, resumeData, { + const resumed = await resumeAgentStream(agent, resumeData, { ...resumeOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); diff --git a/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts b/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts index f53f20b3193..dc6b2117270 100644 --- a/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts +++ b/packages/@n8n/instance-ai/src/stream/__tests__/map-chunk.test.ts @@ -1,1202 +1,22 @@ -import type { StreamChunk } from '@n8n/agents'; - -import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../map-chunk'; - -describe('mapMastraChunkToEvent', () => { - const runId = 'run-1'; - const agentId = 'agent-1'; - - // ----------------------------------------------------------------------- - // Null / invalid inputs - // ----------------------------------------------------------------------- - - describe('null and invalid inputs', () => { - it('returns null for null chunk', () => { - expect(mapMastraChunkToEvent(runId, agentId, null)).toBeNull(); - }); - - it('returns null for string chunk', () => { - expect(mapMastraChunkToEvent(runId, agentId, 'hello')).toBeNull(); - }); - - it('returns null for number chunk', () => { - expect(mapMastraChunkToEvent(runId, agentId, 42)).toBeNull(); - }); - - it('returns null for array chunk', () => { - expect(mapMastraChunkToEvent(runId, agentId, [1, 2, 3])).toBeNull(); - }); - - it('returns null for undefined chunk', () => { - expect(mapMastraChunkToEvent(runId, agentId, undefined)).toBeNull(); - }); - - it('returns null for object without type', () => { - expect(mapMastraChunkToEvent(runId, agentId, { payload: { text: 'hi' } })).toBeNull(); - }); - }); - - // ----------------------------------------------------------------------- - // text-delta - // ----------------------------------------------------------------------- - - describe('text-delta', () => { - it('maps chunk with payload.text', () => { - const chunk = { type: 'text-delta', payload: { text: 'hello' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'text-delta', - runId, - agentId, - payload: { text: 'hello' }, - }); - }); - - it('maps chunk with payload.textDelta', () => { - const chunk = { type: 'text-delta', payload: { textDelta: 'world' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'text-delta', - runId, - agentId, - payload: { text: 'world' }, - }); - }); - - it('prefers payload.text over payload.textDelta', () => { - const chunk = { type: 'text-delta', payload: { text: 'preferred', textDelta: 'ignored' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'text-delta', - runId, - agentId, - payload: { text: 'preferred' }, - }); - }); - - it('maps empty string text', () => { - const chunk = { type: 'text-delta', payload: { text: '' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'text-delta', - runId, - agentId, - payload: { text: '' }, - }); - }); - - it('returns null when payload has no text or textDelta', () => { - const chunk = { type: 'text-delta', payload: { other: 'value' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - - it('returns null when payload is not an object', () => { - const chunk = { type: 'text-delta', payload: 'not-an-object' }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - }); - - // ----------------------------------------------------------------------- - // reasoning-delta - // ----------------------------------------------------------------------- - - describe('reasoning-delta', () => { - it('maps chunk with type reasoning-delta', () => { - const chunk = { type: 'reasoning-delta', payload: { text: 'thinking...' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'reasoning-delta', - runId, - agentId, - payload: { text: 'thinking...' }, - }); - }); - - it('maps chunk with type reasoning (alias)', () => { - const chunk = { type: 'reasoning', payload: { text: 'also thinking' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'reasoning-delta', - runId, - agentId, - payload: { text: 'also thinking' }, - }); - }); - - it('maps reasoning with payload.textDelta', () => { - const chunk = { type: 'reasoning-delta', payload: { textDelta: 'delta text' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'reasoning-delta', - runId, - agentId, - payload: { text: 'delta text' }, - }); - }); - - it('returns null when reasoning chunk has no text', () => { - const chunk = { type: 'reasoning-delta', payload: { other: 'value' } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - }); - - // ----------------------------------------------------------------------- - // tool-call - // ----------------------------------------------------------------------- - - describe('tool-call', () => { - it('maps chunk with all fields', () => { - const chunk = { - type: 'tool-call', - payload: { - toolCallId: 'tc-1', - toolName: 'my-tool', - args: { key: 'value' }, - }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-call', - runId, - agentId, - payload: { - toolCallId: 'tc-1', - toolName: 'my-tool', - args: { key: 'value' }, - }, - }); - }); - - it('defaults toolCallId to empty string when missing', () => { - const chunk = { - type: 'tool-call', - payload: { toolName: 'my-tool', args: { a: 1 } }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.payload).toMatchObject({ toolCallId: '' }); - }); - - it('defaults toolName to empty string when missing', () => { - const chunk = { - type: 'tool-call', - payload: { toolCallId: 'tc-1', args: {} }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.payload).toMatchObject({ toolName: '' }); - }); - - it('defaults args to empty object when not a record', () => { - const chunk = { - type: 'tool-call', - payload: { toolCallId: 'tc-1', toolName: 'tool', args: 'invalid' }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.payload).toMatchObject({ args: {} }); - }); - - it('defaults args to empty object when args is an array', () => { - const chunk = { - type: 'tool-call', - payload: { toolCallId: 'tc-1', toolName: 'tool', args: [1, 2] }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.payload).toMatchObject({ args: {} }); - }); - }); - - // ----------------------------------------------------------------------- - // tool-result - // ----------------------------------------------------------------------- - - describe('tool-result', () => { - it('maps a normal tool result', () => { - const chunk = { - type: 'tool-result', - payload: { toolCallId: 'tc-1', result: { data: 'ok' } }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-result', - runId, - agentId, - payload: { toolCallId: 'tc-1', result: { data: 'ok' } }, - }); - }); - - it('maps tool-result with string result', () => { - const chunk = { - type: 'tool-result', - payload: { toolCallId: 'tc-1', result: 'done' }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-result', - runId, - agentId, - payload: { toolCallId: 'tc-1', result: 'done' }, - }); - }); - - it('maps tool-result with undefined result', () => { - const chunk = { - type: 'tool-result', - payload: { toolCallId: 'tc-1' }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result).toEqual({ - type: 'tool-result', - runId, - agentId, - payload: { toolCallId: 'tc-1', result: undefined }, - }); - }); - - it('defaults toolCallId to empty string when missing', () => { - const chunk = { - type: 'tool-result', - payload: { result: 'ok' }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.payload).toMatchObject({ toolCallId: '' }); - }); - - it('maps tool-result with isError=true to tool-error event', () => { - const chunk = { - type: 'tool-result', - payload: { toolCallId: 'tc-1', isError: true, result: 'Something went wrong' }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-error', - runId, - agentId, - payload: { toolCallId: 'tc-1', error: 'Something went wrong' }, - }); - }); - - it('uses default error message when isError=true but result is not a string', () => { - const chunk = { - type: 'tool-result', - payload: { toolCallId: 'tc-1', isError: true, result: { complex: 'error' } }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-error', - runId, - agentId, - payload: { toolCallId: 'tc-1', error: 'Tool execution failed' }, - }); - }); - }); - - // ----------------------------------------------------------------------- - // tool-error - // ----------------------------------------------------------------------- - - describe('tool-error (chunk type)', () => { - it('maps tool-error chunk without isError to tool-result', () => { - const chunk = { - type: 'tool-error', - payload: { toolCallId: 'tc-1', result: 'some result' }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-result', - runId, - agentId, - payload: { toolCallId: 'tc-1', result: 'some result' }, - }); - }); - - it('maps tool-error chunk with isError=true to tool-error event', () => { - const chunk = { - type: 'tool-error', - payload: { toolCallId: 'tc-2', isError: true, result: 'Timeout' }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'tool-error', - runId, - agentId, - payload: { toolCallId: 'tc-2', error: 'Timeout' }, - }); - }); - }); - - // ----------------------------------------------------------------------- - // tool-call-suspended (confirmation-request) - // ----------------------------------------------------------------------- - - describe('tool-call-suspended (confirmation-request)', () => { - it('maps basic confirmation with requestId and toolCallId', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - toolName: 'delete-workflow', - args: { id: 'wf-1' }, - suspendPayload: { - requestId: 'req-1', - message: 'Delete this workflow?', - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result).toEqual({ - type: 'confirmation-request', - runId, - agentId, - payload: { - requestId: 'req-1', - toolCallId: 'tc-1', - toolName: 'delete-workflow', - args: { id: 'wf-1' }, - severity: 'warning', - message: 'Delete this workflow?', - }, - }); - }); - - it('falls back requestId to toolCallId when requestId is absent', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-fallback', - toolName: 'some-tool', - suspendPayload: { - message: 'Confirm?', - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result?.type).toBe('confirmation-request'); - if (result?.type === 'confirmation-request') { - expect(result.payload.requestId).toBe('tc-fallback'); - } - }); - - it('returns null when both requestId and toolCallId are empty', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: '', - suspendPayload: { - requestId: '', - }, - }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - - it('returns null when toolCallId is missing', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - suspendPayload: { - requestId: 'req-1', - }, - }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - - it('defaults message to "Confirmation required" when missing', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: {}, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.message).toBe('Confirmation required'); - } - }); - - // Severity - - it.each(['destructive', 'warning', 'info'] as const)( - 'accepts valid severity "%s"', - (severity) => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { severity }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.severity).toBe(severity); - } - }, - ); - - it('defaults severity to warning for unknown value', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { severity: 'critical' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.severity).toBe('warning'); - } - }); - - it('defaults severity to warning when severity is missing', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: {}, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.severity).toBe('warning'); - } - }); - - // credentialRequests - - it('includes valid credentialRequests', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - credentialRequests: [ - { - credentialType: 'notionApi', - reason: 'Need Notion access', - existingCredentials: [{ id: 'cred-1', name: 'My Notion' }], - suggestedName: 'Notion Cred', - }, - ], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.credentialRequests).toEqual([ - { - credentialType: 'notionApi', - reason: 'Need Notion access', - existingCredentials: [{ id: 'cred-1', name: 'My Notion' }], - suggestedName: 'Notion Cred', - }, - ]); - } - }); - - it('filters out invalid credentialRequests', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - credentialRequests: [ - { invalid: true }, - { - credentialType: 'slackApi', - reason: 'Need Slack', - existingCredentials: [], - }, - ], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.credentialRequests).toEqual([ - { - credentialType: 'slackApi', - reason: 'Need Slack', - existingCredentials: [], - }, - ]); - } - }); - - it('omits credentialRequests when all items are invalid', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - credentialRequests: [{ invalid: true }], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('credentialRequests'); - } - }); - - // projectId - - it('includes projectId when present', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { projectId: 'proj-123' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.projectId).toBe('proj-123'); - } - }); - - it('omits projectId when not a string', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { projectId: 123 }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('projectId'); - } - }); - - // inputType - - it.each(['approval', 'text', 'questions', 'plan-review'] as const)( - 'accepts valid inputType "%s"', - (inputType) => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { inputType }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.inputType).toBe(inputType); - } - }, - ); - - it('omits inputType for invalid value', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { inputType: 'invalid-type' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('inputType'); - } - }); - - it('omits inputType when not a string', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { inputType: 42 }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('inputType'); - } - }); - - // questions - - it('includes valid questions', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - questions: [ - { id: 'q1', question: 'Which one?', type: 'single', options: ['A', 'B'] }, - { id: 'q2', question: 'Describe it', type: 'text' }, - ], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.questions).toEqual([ - { id: 'q1', question: 'Which one?', type: 'single', options: ['A', 'B'] }, - { id: 'q2', question: 'Describe it', type: 'text' }, - ]); - } - }); - - it('filters out invalid questions', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - questions: [{ id: 'q1', question: 'Valid', type: 'multi' }, { missing: 'fields' }], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.questions).toEqual([{ id: 'q1', question: 'Valid', type: 'multi' }]); - } - }); - - it('omits questions when all items are invalid', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - questions: [{ bad: true }], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('questions'); - } - }); - - // introMessage - - it('includes introMessage when present', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { introMessage: 'Please review the following' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.introMessage).toBe('Please review the following'); - } - }); - - it('omits introMessage when not a string', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { introMessage: 123 }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('introMessage'); - } - }); - - // tasks - - it('includes valid tasks', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - tasks: { - tasks: [ - { id: 't1', description: 'Build workflow', status: 'todo' }, - { id: 't2', description: 'Test workflow', status: 'in_progress' }, - ], - }, - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.tasks).toEqual({ - tasks: [ - { id: 't1', description: 'Build workflow', status: 'todo' }, - { id: 't2', description: 'Test workflow', status: 'in_progress' }, - ], - }); - } - }); - - it('omits tasks when schema validation fails', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - tasks: { tasks: [{ invalid: true }] }, - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('tasks'); - } - }); - - it('omits tasks when not a record', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { tasks: 'not-an-object' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('tasks'); - } - }); - - // domainAccess - - it('includes domainAccess with url and host', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - domainAccess: { url: 'https://example.com/api', host: 'example.com' }, - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.domainAccess).toEqual({ - url: 'https://example.com/api', - host: 'example.com', - }); - } - }); - - it('omits domainAccess when url is missing', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - domainAccess: { host: 'example.com' }, - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('domainAccess'); - } - }); - - it('omits domainAccess when host is missing', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - domainAccess: { url: 'https://example.com' }, - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('domainAccess'); - } - }); - - it('omits domainAccess when not a record', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { domainAccess: 'not-an-object' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('domainAccess'); - } - }); - - // credentialFlow - - it.each(['generic', 'finalize'] as const)( - 'includes credentialFlow with valid stage "%s"', - (stage) => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { credentialFlow: { stage } }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.credentialFlow).toEqual({ stage }); - } - }, - ); - - it('omits credentialFlow for invalid stage', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { credentialFlow: { stage: 'unknown' } }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('credentialFlow'); - } - }); - - it('omits credentialFlow when not a record', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { credentialFlow: 'generic' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('credentialFlow'); - } - }); - - // setupRequests - - it('includes valid setupRequests', () => { - const validNode = { - node: { - name: 'Slack', - type: 'n8n-nodes-base.slack', - typeVersion: 2, - parameters: {}, - position: [0, 0] as [number, number], - id: 'node-1', - }, - isTrigger: false, - }; - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - setupRequests: [validNode], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.setupRequests).toEqual([validNode]); - } - }); - - it('filters out invalid setupRequests', () => { - const validNode = { - node: { - name: 'Slack', - type: 'n8n-nodes-base.slack', - typeVersion: 2, - parameters: {}, - position: [0, 0] as [number, number], - id: 'node-1', - }, - isTrigger: false, - }; - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - setupRequests: [{ invalid: true }, validNode], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.setupRequests).toEqual([validNode]); - } - }); - - it('omits setupRequests when all items are invalid', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { - setupRequests: [{ bad: true }], - }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('setupRequests'); - } - }); - - // workflowId - - it('includes workflowId when present', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { workflowId: 'wf-abc' }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.workflowId).toBe('wf-abc'); - } - }); - - it('omits workflowId when not a string', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: { workflowId: 42 }, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload).not.toHaveProperty('workflowId'); - } - }); - - // defaults for toolName and args - - it('defaults toolName to empty string when missing from payload', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: {}, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.toolName).toBe(''); - } - }); - - it('defaults args to empty object when missing from payload', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - suspendPayload: {}, - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'confirmation-request') { - expect(result.payload.args).toEqual({}); - } - }); - - // suspendPayload missing - - it('handles missing suspendPayload gracefully', () => { - const chunk = { - type: 'tool-call-suspended', - payload: { - toolCallId: 'tc-1', - }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result).toEqual({ - type: 'confirmation-request', - runId, - agentId, - payload: { - requestId: 'tc-1', - toolCallId: 'tc-1', - toolName: '', - args: {}, - severity: 'warning', - message: 'Confirmation required', - }, - }); - }); - }); - - // ----------------------------------------------------------------------- - // error - // ----------------------------------------------------------------------- - - describe('error', () => { - it('maps string error', () => { - const chunk = { - type: 'error', - payload: { error: 'Something failed' }, - }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'error', - runId, - agentId, - payload: { content: 'Something failed' }, - }); - }); - - it('maps Error instance', () => { - const chunk = { - type: 'error', - payload: { error: new Error('Boom') }, - }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - expect(result).toEqual({ - type: 'error', - runId, - agentId, - payload: { content: 'Boom' }, - }); - }); - - it('maps Error with statusCode', () => { - const error = new Error('Rate limited'); - Object.assign(error, { statusCode: 429 }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.content).toBe('Rate limited'); - expect(result.payload.statusCode).toBe(429); - } - }); - - it('maps Error with JSON responseBody and extracts message', () => { - const error = new Error('API Error'); - Object.assign(error, { - statusCode: 400, - responseBody: JSON.stringify({ - error: { message: 'Invalid API key', type: 'invalid_request_error' }, - }), - }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.content).toBe('Invalid API key'); - expect(result.payload.statusCode).toBe(400); - expect(result.payload.technicalDetails).toBe( - JSON.stringify({ - error: { message: 'Invalid API key', type: 'invalid_request_error' }, - }), - ); - } - }); - - it('maps Error with non-JSON responseBody as technicalDetails', () => { - const error = new Error('Server error'); - Object.assign(error, { - responseBody: 'Internal Server Error', - }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.content).toBe('Server error'); - expect(result.payload.technicalDetails).toBe('Internal Server Error'); - } - }); - - it('maps Error with JSON responseBody but no error.message keeps original message', () => { - const error = new Error('Original message'); - Object.assign(error, { - responseBody: JSON.stringify({ status: 'fail' }), - }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.content).toBe('Original message'); - } - }); - - it('maps Error with anthropic URL to provider Anthropic', () => { - const error = new Error('Context length exceeded'); - Object.assign(error, { url: 'https://api.anthropic.com/v1/messages' }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.provider).toBe('Anthropic'); - } - }); - - it('maps Error with openai URL to provider OpenAI', () => { - const error = new Error('Model not found'); - Object.assign(error, { url: 'https://api.openai.com/v1/chat/completions' }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload.provider).toBe('OpenAI'); - } - }); - - it('does not set provider for unknown URLs', () => { - const error = new Error('Fail'); - Object.assign(error, { url: 'https://api.custom-llm.com/v1/chat' }); - const chunk = { type: 'error', payload: { error } }; - const result = mapMastraChunkToEvent(runId, agentId, chunk); - if (result?.type === 'error') { - expect(result.payload).not.toHaveProperty('provider'); - } - }); - - it('maps unknown error type to "Unknown error"', () => { - const chunk = { type: 'error', payload: { error: 12345 } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'error', - runId, - agentId, - payload: { content: 'Unknown error' }, - }); - }); - - it('maps null error to "Unknown error"', () => { - const chunk = { type: 'error', payload: { error: null } }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'error', - runId, - agentId, - payload: { content: 'Unknown error' }, - }); - }); - - it('maps undefined error to "Unknown error"', () => { - const chunk = { type: 'error', payload: {} }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toEqual({ - type: 'error', - runId, - agentId, - payload: { content: 'Unknown error' }, - }); - }); - }); - - // ----------------------------------------------------------------------- - // Unknown / ignored chunk types - // ----------------------------------------------------------------------- - - describe('unknown chunk types', () => { - it('returns null for step-finish type', () => { - const chunk = { type: 'step-finish', payload: {} }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - - it('returns null for finish type', () => { - const chunk = { type: 'finish', payload: {} }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - - it('returns null for completely unknown type', () => { - const chunk = { type: 'something-new', payload: {} }; - expect(mapMastraChunkToEvent(runId, agentId, chunk)).toBeNull(); - }); - }); -}); +import { mapAgentChunkToEvent } from '../map-chunk'; describe('mapAgentChunkToEvent', () => { const runId = 'run-1'; const agentId = 'agent-1'; - it('maps native text delta chunks', () => { - const chunk = { type: 'text-delta', delta: 'hello' } satisfies StreamChunk; + const map = (chunk: unknown, responseId?: string) => + mapAgentChunkToEvent(runId, agentId, chunk, responseId); - expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + it('returns null for invalid and ignored chunks', () => { + expect(map(null)).toBeNull(); + expect(map('hello')).toBeNull(); + expect(map({ payload: { text: 'hi' } })).toBeNull(); + expect(map({ type: 'finish', finishReason: 'stop' })).toBeNull(); + expect(map({ type: 'tool-call-delta', argumentsDelta: '{"x"' })).toBeNull(); + }); + + it('maps native text deltas', () => { + expect(map({ type: 'text-delta', delta: 'hello' })).toEqual({ type: 'text-delta', runId, agentId, @@ -1204,64 +24,286 @@ describe('mapAgentChunkToEvent', () => { }); }); - it('maps native tool result messages', () => { - const chunk = { - type: 'message', - message: { - role: 'tool', - content: [ - { - type: 'tool-result', - toolCallId: 'tc-1', - toolName: 'lookup', - result: { ok: true }, - }, - ], - }, - } satisfies StreamChunk; + it('maps native reasoning deltas', () => { + expect(map({ type: 'reasoning-delta', delta: 'thinking...' })).toEqual({ + type: 'reasoning-delta', + runId, + agentId, + payload: { text: 'thinking...' }, + }); + }); - expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + it('preserves responseId when provided', () => { + expect(map({ type: 'text-delta', delta: 'hello' }, 'response-1')).toEqual({ + type: 'text-delta', + runId, + agentId, + responseId: 'response-1', + payload: { text: 'hello' }, + }); + }); + + it('maps native tool call messages', () => { + expect( + map({ + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-call', + toolCallId: 'tc-1', + toolName: 'create-workflow', + input: { name: 'Workflow' }, + }, + ], + }, + }), + ).toEqual({ + type: 'tool-call', + runId, + agentId, + payload: { + toolCallId: 'tc-1', + toolName: 'create-workflow', + args: { name: 'Workflow' }, + }, + }); + }); + + it('maps native tool result messages', () => { + expect( + map({ + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId: 'tc-1', + toolName: 'create-workflow', + result: { workflowId: 'wf-1' }, + }, + ], + }, + }), + ).toEqual({ type: 'tool-result', runId, agentId, payload: { toolCallId: 'tc-1', - result: { ok: true }, + result: { workflowId: 'wf-1' }, }, }); }); - it('maps native suspended tool calls to confirmation requests', () => { - const chunk = { - type: 'tool-call-suspended', - toolCallId: 'tc-1', - toolName: 'delete-workflow', - input: { id: 'wf-1' }, - suspendPayload: { - requestId: 'req-1', - message: 'Delete this workflow?', - severity: 'destructive', + it('maps native tool result errors', () => { + expect( + map({ + type: 'message', + message: { + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId: 'tc-1', + toolName: 'create-workflow', + result: 'Could not create workflow', + isError: true, + }, + ], + }, + }), + ).toEqual({ + type: 'tool-error', + runId, + agentId, + payload: { + toolCallId: 'tc-1', + error: 'Could not create workflow', }, - } satisfies StreamChunk; + }); + }); - expect(mapAgentChunkToEvent(runId, agentId, chunk)).toEqual({ + it('maps native suspension chunks to confirmation requests', () => { + const validSetupNode = { + node: { + name: 'Slack', + type: 'n8n-nodes-base.slack', + typeVersion: 2, + parameters: {}, + position: [0, 0] as [number, number], + id: 'node-1', + }, + isTrigger: false, + }; + + expect( + map({ + type: 'tool-call-suspended', + toolCallId: 'tc-1', + toolName: 'ask-user', + input: { prompt: 'Confirm?' }, + suspendPayload: { + requestId: 'request-1', + severity: 'destructive', + message: 'Need approval', + credentialRequests: [ + { + credentialType: 'slackApi', + reason: 'Need Slack access', + existingCredentials: [{ id: 'cred-1', name: 'Main Slack' }], + suggestedName: 'Slack API', + }, + ], + projectId: 'project-1', + inputType: 'plan-review', + questions: [ + { + id: 'q1', + question: 'Which channel?', + type: 'text', + }, + ], + introMessage: 'Before continuing', + tasks: { + tasks: [{ id: 't1', description: 'Build workflow', status: 'todo' }], + }, + planItems: [ + { + id: 'task-1', + title: 'Build workflow', + kind: 'build-workflow', + spec: 'Create a workflow', + deps: [], + }, + ], + domainAccess: { url: 'https://example.com/api', host: 'example.com' }, + credentialFlow: { stage: 'generic' }, + setupRequests: [validSetupNode], + workflowId: 'wf-1', + resourceDecision: { + toolGroup: 'Local Gateway', + resource: '/tmp/file.txt', + description: 'Read /tmp/file.txt', + options: ['allow', 'deny'], + }, + }, + }), + ).toEqual({ type: 'confirmation-request', runId, agentId, payload: { - requestId: 'req-1', + requestId: 'request-1', toolCallId: 'tc-1', - toolName: 'delete-workflow', - args: { id: 'wf-1' }, + toolName: 'ask-user', + args: { prompt: 'Confirm?' }, severity: 'destructive', - message: 'Delete this workflow?', + message: 'Need approval', + credentialRequests: [ + { + credentialType: 'slackApi', + reason: 'Need Slack access', + existingCredentials: [{ id: 'cred-1', name: 'Main Slack' }], + suggestedName: 'Slack API', + }, + ], + projectId: 'project-1', + inputType: 'plan-review', + domainAccess: { url: 'https://example.com/api', host: 'example.com' }, + credentialFlow: { stage: 'generic' }, + setupRequests: [validSetupNode], + workflowId: 'wf-1', + questions: [ + { + id: 'q1', + question: 'Which channel?', + type: 'text', + }, + ], + introMessage: 'Before continuing', + tasks: { + tasks: [{ id: 't1', description: 'Build workflow', status: 'todo' }], + }, + planItems: [ + { + id: 'task-1', + title: 'Build workflow', + kind: 'build-workflow', + spec: 'Create a workflow', + deps: [], + }, + ], + resourceDecision: { + toolGroup: 'Local Gateway', + resource: '/tmp/file.txt', + description: 'Read /tmp/file.txt', + options: ['allow', 'deny'], + }, }, }); }); - it('ignores native finish chunks', () => { - const chunk = { type: 'finish', finishReason: 'stop' } satisfies StreamChunk; + it('defaults optional suspension values and filters invalid structured payloads', () => { + const result = map({ + type: 'tool-call-suspended', + toolCallId: 'tc-1', + suspendPayload: { + severity: 'unknown', + credentialRequests: [{ invalid: true }], + inputType: 'bad-input-type', + questions: [{ invalid: true }], + tasks: { tasks: [{ invalid: true }] }, + domainAccess: { url: 'https://example.com' }, + credentialFlow: { stage: 'unknown' }, + setupRequests: [{ invalid: true }], + workflowId: 42, + }, + }); - expect(mapAgentChunkToEvent(runId, agentId, chunk)).toBeNull(); + expect(result).toEqual({ + type: 'confirmation-request', + runId, + agentId, + payload: { + requestId: 'tc-1', + toolCallId: 'tc-1', + toolName: '', + args: {}, + severity: 'warning', + message: 'Confirmation required', + }, + }); + }); + + it('returns null for suspensions without a tool call id', () => { + expect( + map({ type: 'tool-call-suspended', suspendPayload: { requestId: 'request-1' } }), + ).toBeNull(); + }); + + it('maps native error chunks with provider metadata when available', () => { + const error = new Error('Provider failed') as Error & { + statusCode: number; + responseBody: string; + url: string; + }; + error.statusCode = 429; + error.responseBody = JSON.stringify({ error: { message: 'Rate limited' } }); + error.url = 'https://api.openai.com/v1/responses'; + + expect(map({ type: 'error', error })).toEqual({ + type: 'error', + runId, + agentId, + payload: { + content: 'Rate limited', + statusCode: 429, + provider: 'OpenAI', + technicalDetails: JSON.stringify({ error: { message: 'Rate limited' } }), + }, + }); }); }); diff --git a/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts b/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts index d785858fb9a..e96955bc648 100644 --- a/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts +++ b/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts @@ -23,7 +23,7 @@ export interface ConsumeWithHitlOptions { * Used to unblock HITL suspensions when a correction arrives mid-confirmation. */ waitForCorrection?: () => Promise; llmStepTraceHooks?: LlmStepTraceHooks; - /** Max iterations for the agent — passed to resumeStream so resumed streams keep the same limit. */ + /** Max iterations for the agent; passed to native stream resume so resumed streams keep the same limit. */ maxIterations?: number; /** Additional options to preserve when resuming a suspended stream. */ resumeOptions?: Record; diff --git a/packages/@n8n/instance-ai/src/stream/map-chunk.ts b/packages/@n8n/instance-ai/src/stream/map-chunk.ts index 66b52046088..38e030c1d69 100644 --- a/packages/@n8n/instance-ai/src/stream/map-chunk.ts +++ b/packages/@n8n/instance-ai/src/stream/map-chunk.ts @@ -48,8 +48,6 @@ interface ErrorInfo { technicalDetails?: string; } -/** Extract structured error info from Mastra's error chunk payload. - * Mastra sets `payload.error` to the raw Error object, not a string. */ function extractErrorInfo(error: unknown): ErrorInfo { if (typeof error === 'string') return { content: error }; @@ -88,91 +86,35 @@ function extractErrorInfo(error: unknown): ErrorInfo { return { content: 'Unknown error' }; } -/** - * Maps a Mastra fullStream chunk to our InstanceAiEvent schema. - * - * Mastra chunks have the shape: { type, runId, from, payload: { ... } } - * The actual data (textDelta, toolCallId, etc.) lives inside chunk.payload. - * - * Returns null for unrecognized chunk types (step-finish, finish, etc.) - */ -export function mapMastraChunkToEvent( +export function mapAgentChunkToEvent( runId: string, agentId: string, chunk: unknown, responseId?: string, ): InstanceAiEvent | null { - if (!isRecord(chunk)) return null; + if (!isAgentStreamChunk(chunk)) return null; - const { type } = chunk; - const payload = isRecord(chunk.payload) ? chunk.payload : {}; const base = { runId, agentId, ...(responseId ? { responseId } : {}) }; - // Mastra payload uses `text` (not `textDelta`) for text-delta chunks - const textValue = - typeof payload.text === 'string' - ? payload.text - : typeof payload.textDelta === 'string' - ? payload.textDelta - : undefined; - - if (type === 'text-delta' && textValue !== undefined) { + if (chunk.type === 'text-delta') { return { type: 'text-delta', ...base, - payload: { text: textValue }, + payload: { text: chunk.delta }, }; } - if ((type === 'reasoning-delta' || type === 'reasoning') && textValue !== undefined) { + if (chunk.type === 'reasoning-delta') { return { type: 'reasoning-delta', ...base, - payload: { text: textValue }, + payload: { text: chunk.delta }, }; } - if (type === 'tool-call') { - return { - type: 'tool-call', - ...base, - payload: { - toolCallId: typeof payload.toolCallId === 'string' ? payload.toolCallId : '', - toolName: typeof payload.toolName === 'string' ? payload.toolName : '', - args: isRecord(payload.args) ? payload.args : {}, - }, - }; - } - - if (type === 'tool-result' || type === 'tool-error') { - const toolCallId = typeof payload.toolCallId === 'string' ? payload.toolCallId : ''; - - // Mastra signals tool errors via `isError` on tool-result chunks, - // not a separate event type. Map to our `tool-error` event. - if (payload.isError === true) { - return { - type: 'tool-error', - ...base, - payload: { - toolCallId, - error: typeof payload.result === 'string' ? payload.result : 'Tool execution failed', - }, - }; - } - - return { - type: 'tool-result', - ...base, - payload: { - toolCallId, - result: payload.result, - }, - }; - } - - if (type === 'tool-call-suspended') { - const suspendPayload = isRecord(payload.suspendPayload) ? payload.suspendPayload : {}; - const toolCallId = typeof payload.toolCallId === 'string' ? payload.toolCallId : ''; + if (chunk.type === 'tool-call-suspended') { + const suspendPayload = isRecord(chunk.suspendPayload) ? chunk.suspendPayload : {}; + const toolCallId = typeof chunk.toolCallId === 'string' ? chunk.toolCallId : ''; const requestId = typeof suspendPayload.requestId === 'string' && suspendPayload.requestId @@ -187,7 +129,6 @@ export function mapMastraChunkToEvent( ? (rawSeverity as (typeof validSeverities)[number]) : 'warning'; - // Extract and validate optional credentialRequests for credential setup HITL let credentialRequests: InstanceAiCredentialRequest[] | undefined; if (Array.isArray(suspendPayload.credentialRequests)) { const parsed = suspendPayload.credentialRequests @@ -199,11 +140,9 @@ export function mapMastraChunkToEvent( } } - // Extract optional projectId for project-scoped actions const projectId = typeof suspendPayload.projectId === 'string' ? suspendPayload.projectId : undefined; - // Extract optional inputType (e.g., 'text' for ask-user, 'questions', 'plan-review', 'resource-decision') const rawInputType = typeof suspendPayload.inputType === 'string' ? suspendPayload.inputType : undefined; const validInputTypes = [ @@ -217,7 +156,6 @@ export function mapMastraChunkToEvent( ? (rawInputType as (typeof validInputTypes)[number]) : undefined; - // Extract optional structured questions (for ask-user tool with questions) let questions: Array> | undefined; if (Array.isArray(suspendPayload.questions)) { const parsed = suspendPayload.questions @@ -229,11 +167,9 @@ export function mapMastraChunkToEvent( } } - // Extract optional intro message const introMessage = typeof suspendPayload.introMessage === 'string' ? suspendPayload.introMessage : undefined; - // Extract optional task list (for plan-review) let tasks: TaskList | undefined; if (isRecord(suspendPayload.tasks)) { const parsed = taskListSchema.safeParse(suspendPayload.tasks); @@ -242,7 +178,6 @@ export function mapMastraChunkToEvent( } } - // Extract optional full planned task items (for plan-review panel details) let planItems: PlannedTaskArg[] | undefined; if (Array.isArray(suspendPayload.planItems)) { const parsed = suspendPayload.planItems @@ -254,7 +189,6 @@ export function mapMastraChunkToEvent( } } - // Extract optional domainAccess metadata (for domain-gated tools like fetch-url) const rawDomainAccess = isRecord(suspendPayload.domainAccess) ? suspendPayload.domainAccess : undefined; @@ -265,7 +199,6 @@ export function mapMastraChunkToEvent( ? { url: rawDomainAccess.url, host: rawDomainAccess.host } : undefined; - // Extract optional credentialFlow for credential setup stage const rawCredentialFlow = isRecord(suspendPayload.credentialFlow) ? suspendPayload.credentialFlow : undefined; @@ -279,7 +212,6 @@ export function mapMastraChunkToEvent( ? { stage: rawStage as 'generic' | 'finalize' } : undefined; - // Extract and validate optional setupRequests for workflow setup HITL let setupRequests: InstanceAiWorkflowSetupNode[] | undefined; if (Array.isArray(suspendPayload.setupRequests)) { const parsed = suspendPayload.setupRequests @@ -291,11 +223,9 @@ export function mapMastraChunkToEvent( } } - // Extract optional workflowId for workflow setup tool const workflowId = typeof suspendPayload.workflowId === 'string' ? suspendPayload.workflowId : undefined; - // Extract optional resourceDecision for gateway permission gating (inputType=resource-decision) let resourceDecision: GatewayConfirmationRequiredPayload | undefined; if (isRecord(suspendPayload.resourceDecision)) { const parsed = gatewayConfirmationRequiredPayloadSchema.safeParse( @@ -312,8 +242,8 @@ export function mapMastraChunkToEvent( payload: { requestId, toolCallId, - toolName: typeof payload.toolName === 'string' ? payload.toolName : '', - args: isRecord(payload.args) ? payload.args : {}, + toolName: typeof chunk.toolName === 'string' ? chunk.toolName : '', + args: isRecord(chunk.input) ? chunk.input : {}, severity, message: typeof suspendPayload.message === 'string' @@ -335,8 +265,47 @@ export function mapMastraChunkToEvent( }; } - if (type === 'error') { - const errorInfo = extractErrorInfo(payload.error); + if (chunk.type === 'message' && 'role' in chunk.message && chunk.message.role === 'tool') { + const toolCall = chunk.message.content.find((part) => part.type === 'tool-call'); + if (toolCall?.type === 'tool-call') { + return { + type: 'tool-call', + ...base, + payload: { + toolCallId: typeof toolCall.toolCallId === 'string' ? toolCall.toolCallId : '', + toolName: toolCall.toolName, + args: isRecord(toolCall.input) ? toolCall.input : {}, + }, + }; + } + + const toolResult = chunk.message.content.find((part) => part.type === 'tool-result'); + if (toolResult?.type === 'tool-result') { + if (toolResult.isError === true) { + return { + type: 'tool-error', + ...base, + payload: { + toolCallId: toolResult.toolCallId, + error: + typeof toolResult.result === 'string' ? toolResult.result : 'Tool execution failed', + }, + }; + } + + return { + type: 'tool-result', + ...base, + payload: { + toolCallId: toolResult.toolCallId, + result: toolResult.result, + }, + }; + } + } + + if (chunk.type === 'error') { + const errorInfo = extractErrorInfo(chunk.error); return { type: 'error', ...base, @@ -349,97 +318,5 @@ export function mapMastraChunkToEvent( }; } - // Other Mastra chunk types (step-finish, finish, etc.) are ignored - return null; -} - -export function mapAgentChunkToEvent( - runId: string, - agentId: string, - chunk: unknown, - responseId?: string, -): InstanceAiEvent | null { - if (!isAgentStreamChunk(chunk)) return null; - - if (chunk.type === 'text-delta') { - return mapMastraChunkToEvent( - runId, - agentId, - { type: 'text-delta', payload: { text: chunk.delta } }, - responseId, - ); - } - - if (chunk.type === 'reasoning-delta') { - return mapMastraChunkToEvent( - runId, - agentId, - { type: 'reasoning-delta', payload: { text: chunk.delta } }, - responseId, - ); - } - - if (chunk.type === 'tool-call-suspended') { - return mapMastraChunkToEvent( - runId, - agentId, - { - type: 'tool-call-suspended', - payload: { - toolCallId: chunk.toolCallId, - toolName: chunk.toolName, - args: isRecord(chunk.input) ? chunk.input : {}, - suspendPayload: isRecord(chunk.suspendPayload) ? chunk.suspendPayload : {}, - }, - }, - responseId, - ); - } - - if (chunk.type === 'message' && 'role' in chunk.message && chunk.message.role === 'tool') { - const toolCall = chunk.message.content.find((part) => part.type === 'tool-call'); - if (toolCall?.type === 'tool-call') { - return mapMastraChunkToEvent( - runId, - agentId, - { - type: 'tool-call', - payload: { - toolCallId: toolCall.toolCallId, - toolName: toolCall.toolName, - args: isRecord(toolCall.input) ? toolCall.input : {}, - }, - }, - responseId, - ); - } - - const toolResult = chunk.message.content.find((part) => part.type === 'tool-result'); - if (toolResult?.type === 'tool-result') { - return mapMastraChunkToEvent( - runId, - agentId, - { - type: 'tool-result', - payload: { - toolCallId: toolResult.toolCallId, - result: toolResult.result, - isError: toolResult.isError, - }, - }, - responseId, - ); - } - } - - if (chunk.type === 'error') { - return mapMastraChunkToEvent( - runId, - agentId, - { type: 'error', payload: { error: chunk.error } }, - responseId, - ); - } - return null; } diff --git a/packages/@n8n/instance-ai/src/tools/filesystem/create-tools-from-mcp-server.ts b/packages/@n8n/instance-ai/src/tools/filesystem/create-tools-from-mcp-server.ts index d927f778153..765ee7daa9b 100644 --- a/packages/@n8n/instance-ai/src/tools/filesystem/create-tools-from-mcp-server.ts +++ b/packages/@n8n/instance-ai/src/tools/filesystem/create-tools-from-mcp-server.ts @@ -181,7 +181,7 @@ export function createToolsFromLocalMcpServer(server: LocalMcpServer): Record { if (item.type === 'image') { return { diff --git a/packages/@n8n/instance-ai/src/tools/nodes.tool.ts b/packages/@n8n/instance-ai/src/tools/nodes.tool.ts index aa6f9d718ce..9cad2070667 100644 --- a/packages/@n8n/instance-ai/src/tools/nodes.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/nodes.tool.ts @@ -192,7 +192,7 @@ async function handleTypeDefinition( context: InstanceAiContext, input: Extract, ) { - // Mastra validates against the flattened top-level schema (required for + // Native tool validation uses the flattened top-level schema (required for // Anthropic's `type: "object"` constraint), which makes every variant field // optional. Re-assert the variant contract so missing/invalid inputs return // a structured error the model can self-correct from, instead of crashing diff --git a/packages/@n8n/instance-ai/src/tools/workflows/write-sandbox-file.tool.ts b/packages/@n8n/instance-ai/src/tools/workflows/write-sandbox-file.tool.ts index e8e85c76021..10694dfbafb 100644 --- a/packages/@n8n/instance-ai/src/tools/workflows/write-sandbox-file.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/workflows/write-sandbox-file.tool.ts @@ -2,8 +2,8 @@ * Write Sandbox File Tool * * Writes a file to the sandbox workspace. Uses command-based I/O so it works - * with both Daytona and Local sandboxes (unlike Mastra's built-in write_file - * which requires workspace.filesystem — absent on Daytona). + * with both Daytona and Local sandboxes, including environments where only + * command-based file I/O is available. */ import { Tool } from '@n8n/agents'; diff --git a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts index 0c0ca1c5545..342da8154a5 100644 --- a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts +++ b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts @@ -1,4 +1,4 @@ -import { isRecord, parseSuspension, asResumable, resumeStream } from '../stream-helpers'; +import { isRecord, parseSuspension, asResumable, resumeAgentStream } from '../stream-helpers'; describe('isRecord', () => { it('returns true for plain objects', () => { @@ -122,35 +122,25 @@ describe('parseSuspension', () => { describe('asResumable', () => { it('casts agent to Resumable interface', () => { - const agent = { resumeStream: jest.fn() }; + const agent = { resume: jest.fn() }; const resumable = asResumable(agent); - expect(resumable.resumeStream).toBe(agent.resumeStream); + expect(resumable.resume).toBe(agent.resume); }); }); -describe('resumeStream', () => { - it('uses Mastra-style resumeStream when available', async () => { - const resumed = { runId: 'run-2' }; - const agent = { resumeStream: jest.fn().mockResolvedValue(resumed) }; - - await expect(resumeStream(agent, { approved: true }, { runId: 'run-1' })).resolves.toBe( - resumed, - ); - expect(agent.resumeStream).toHaveBeenCalledWith({ approved: true }, { runId: 'run-1' }); - }); - - it('uses native agent resume in stream mode when resumeStream is absent', async () => { +describe('resumeAgentStream', () => { + it('uses native agent resume in stream mode', async () => { const resumed = { runId: 'run-2' }; const agent = { resume: jest.fn().mockResolvedValue(resumed) }; - await expect(resumeStream(agent, { approved: true }, { runId: 'run-1' })).resolves.toBe( + await expect(resumeAgentStream(agent, { approved: true }, { runId: 'run-1' })).resolves.toBe( resumed, ); expect(agent.resume).toHaveBeenCalledWith('stream', { approved: true }, { runId: 'run-1' }); }); it('throws when the agent cannot resume streams', async () => { - await expect(resumeStream({}, { approved: true }, { runId: 'run-1' })).rejects.toThrow( + await expect(resumeAgentStream({}, { approved: true }, { runId: 'run-1' })).rejects.toThrow( 'Agent does not support stream resume', ); }); diff --git a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts index 99c3769ecbf..30c690c2a44 100644 --- a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts +++ b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts @@ -32,12 +32,7 @@ export function parseSuspension(chunk: unknown): SuspensionInfo | null { return { toolCallId: tcId, requestId: reqId, toolName }; } -/** Type for Mastra's resumeStream method (not exported by the framework). */ export interface Resumable { - resumeStream?: ( - data: Record, - options: Record, - ) => Promise; resume?: ( method: 'stream', data: Record, @@ -50,7 +45,7 @@ export function asResumable(agent: unknown): Resumable { return agent as Resumable; } -export async function resumeStream( +export async function resumeAgentStream( agent: unknown, data: Record, options: Record, @@ -61,10 +56,6 @@ export async function resumeStream( const resumable = asResumable(agent); - if (typeof resumable.resumeStream === 'function') { - return await resumable.resumeStream(data, options); - } - if (typeof resumable.resume === 'function') { return await resumable.resume('stream', data, options); } diff --git a/packages/cli/src/modules/instance-ai/eval/__tests__/sub-agent-eval.service.test.ts b/packages/cli/src/modules/instance-ai/eval/__tests__/sub-agent-eval.service.test.ts index 14dc4ea0b99..869c988e227 100644 --- a/packages/cli/src/modules/instance-ai/eval/__tests__/sub-agent-eval.service.test.ts +++ b/packages/cli/src/modules/instance-ai/eval/__tests__/sub-agent-eval.service.test.ts @@ -10,15 +10,15 @@ function makeAgentResult( overrides: Partial<{ text: string; toolCalls: unknown[]; - toolResults: unknown[]; finishReason: string; }> = {}, ) { + const text = overrides.text ?? 'done'; return { - text: 'done', - toolCalls: [], - toolResults: [], - finishReason: 'stop', + runId: 'agent-run-1', + messages: [{ role: 'assistant', content: [{ type: 'text', text }] }], + toolCalls: overrides.toolCalls ?? [], + finishReason: overrides.finishReason ?? 'stop', ...overrides, }; } @@ -29,9 +29,9 @@ jest.mock('@n8n/instance-ai', () => ({ MAX_STEPS: { BUILDER: 60 }, createSubAgent: jest.fn(() => ({ generate: jest.fn().mockResolvedValue({ - text: 'done', + runId: 'agent-run-1', + messages: [{ role: 'assistant', content: [{ type: 'text', text: 'done' }] }], toolCalls: [], - toolResults: [], finishReason: 'stop', }), })), @@ -132,7 +132,7 @@ describe('SubAgentEvalService', () => { expect(result.error).toMatch(/timed out/i); }); - it('serializes mastra-shaped tool calls and results', async () => { + it('serializes native tool calls and results', async () => { adapter.createContext.mockReturnValue({ userId: user.id, workflowService: { @@ -143,12 +143,18 @@ describe('SubAgentEvalService', () => { const { createSubAgent } = jest.requireMock('@n8n/instance-ai'); createSubAgent.mockReturnValue({ - generate: jest.fn(async () => ({ - text: 'ok', - toolCalls: [{ payload: { toolName: 'nodes', args: { action: 'list' } } }], - toolResults: [{ payload: { toolName: 'nodes', result: { success: true, items: [] } } }], - finishReason: 'stop', - })), + generate: jest.fn(async () => + makeAgentResult({ + text: 'ok', + toolCalls: [ + { + tool: 'nodes', + input: { action: 'list' }, + output: { success: true, items: [] }, + }, + ], + }), + ), }); const result = await service.run(user, { role: 'builder', prompt: 'inspect' }); diff --git a/packages/cli/src/modules/instance-ai/filesystem/local-gateway.ts b/packages/cli/src/modules/instance-ai/filesystem/local-gateway.ts index bdc4ae71212..bb66264d124 100644 --- a/packages/cli/src/modules/instance-ai/filesystem/local-gateway.ts +++ b/packages/cli/src/modules/instance-ai/filesystem/local-gateway.ts @@ -48,7 +48,7 @@ export type LocalGatewayEvent = LocalGatewayRequestEvent | LocalGatewayDisconnec * 5. resolveRequest() resolves the pending promise → caller gets McpToolCallResult * * Resource-access confirmations (GATEWAY_CONFIRMATION_REQUIRED) are handled at the - * tool layer via Mastra's suspend()/resumeData mechanism — not here. + * tool layer via native agents suspend/resume data — not here. */ export class LocalGateway { private readonly pendingRequests = new Map(); @@ -126,7 +126,7 @@ export class LocalGateway { // Resolve with the result as-is (including isError responses) so the tool // layer (create-tools-from-mcp-server.ts) can inspect GATEWAY_CONFIRMATION_REQUIRED - // errors and handle them via Mastra suspend(). + // errors and handle them via native tool suspension. pending.resolve(result ?? { content: [] }); return true; } diff --git a/packages/cli/src/modules/instance-ai/instance-ai-settings.service.ts b/packages/cli/src/modules/instance-ai/instance-ai-settings.service.ts index 9022000daa1..039b61ff687 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai-settings.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai-settings.service.ts @@ -26,8 +26,8 @@ const ADMIN_SETTINGS_KEY = 'instanceAi.settings'; type UserInstanceAiPreferences = NonNullable; -/** Credential types we support and their Mastra provider mapping. */ -const CREDENTIAL_TO_MASTRA_PROVIDER: Record = { +/** Credential types we support and their model provider mapping. */ +const CREDENTIAL_TO_MODEL_PROVIDER: Record = { openAiApi: 'openai', anthropicApi: 'anthropic', googlePalmApi: 'google', @@ -40,7 +40,7 @@ const CREDENTIAL_TO_MASTRA_PROVIDER: Record = { cohereApi: 'cohere', }; -const SUPPORTED_CREDENTIAL_TYPES = Object.keys(CREDENTIAL_TO_MASTRA_PROVIDER); +const SUPPORTED_CREDENTIAL_TYPES = Object.keys(CREDENTIAL_TO_MODEL_PROVIDER); /** Fields that contain the base URL per credential type. */ const URL_FIELD_MAP: Record = { @@ -274,7 +274,7 @@ export class InstanceAiSettingsService { id: c.id, name: c.name, type: c.type, - provider: CREDENTIAL_TO_MASTRA_PROVIDER[c.type] ?? 'custom', + provider: CREDENTIAL_TO_MODEL_PROVIDER[c.type] ?? 'custom', })); } @@ -434,7 +434,7 @@ export class InstanceAiSettingsService { return this.envVarModelConfig(); } - const provider = CREDENTIAL_TO_MASTRA_PROVIDER[credential.type]; + const provider = CREDENTIAL_TO_MODEL_PROVIDER[credential.type]; if (!provider) { return this.envVarModelConfig(); } diff --git a/packages/cli/src/modules/instance-ai/instance-ai.controller.ts b/packages/cli/src/modules/instance-ai/instance-ai.controller.ts index 1dce3e585e9..91b0e67e030 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.controller.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.controller.ts @@ -557,7 +557,7 @@ export class InstanceAiController { } // Exclude snapshots for active/suspended runs — they have no matching - // assistant message in Mastra memory yet and would misalign the + // assistant message in native memory yet and would misalign the // positional snapshot-to-message matching in parseStoredMessages. const threadStatus = this.instanceAiService.getThreadStatus(threadId); const activeRunId = this.instanceAiService.getActiveRunId(threadId);