diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts index ecbe55e918b..5db87b6d3bf 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts @@ -218,6 +218,17 @@ async function* fromChunks(chunks: unknown[]) { } } +function readableFromChunks(chunks: unknown[]) { + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); +} + function createDeferred() { let resolve!: (value: T | PromiseLike) => void; let reject!: (reason?: unknown) => void; @@ -461,6 +472,63 @@ describe('executeResumableStream', () => { ); }); + it('auto-resumes native agent streams', async () => { + const eventBus = createEventBus(); + const resume = jest.fn().mockResolvedValue({ + runId: 'agent-run-2', + stream: readableFromChunks([{ type: 'text-delta', delta: 'Done.' }]), + getState: jest.fn(), + }); + const waitForConfirmation = jest.fn().mockResolvedValue({ approved: true }); + + const result = await executeResumableStream({ + agent: { resume }, + stream: { + runId: 'agent-run-1', + streamFormat: 'agent', + fullStream: fromChunks([ + { + type: 'tool-call-suspended', + toolCallId: 'tool-call-1', + toolName: 'pause-for-user', + suspendPayload: { + requestId: 'request-1', + message: 'Please confirm', + }, + }, + ]), + }, + context: { + threadId: 'thread-1', + runId: 'run-1', + agentId: 'agent-1', + eventBus, + signal: new AbortController().signal, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() }, + }, + control: { + mode: 'auto', + waitForConfirmation, + }, + }); + + expect(waitForConfirmation).toHaveBeenCalledWith('request-1'); + expect(resume).toHaveBeenCalledWith( + 'stream', + { approved: true }, + { runId: 'agent-run-1', toolCallId: 'tool-call-1' }, + ); + expect(result.status).toBe('completed'); + expect(result.agentRunId).toBe('agent-run-2'); + expect(eventBus.publish).toHaveBeenCalledWith( + 'thread-1', + expect.objectContaining({ + type: 'text-delta', + payload: { text: 'Done.' }, + }), + ); + }); + it('registers auto confirmations before the stream finishes draining', async () => { const eventBus = createEventBus(); const finishGate = createDeferred(); diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts index 1652a3bf129..c960c4eab41 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts @@ -2,10 +2,19 @@ import type { WorkSummary } from '../../stream/work-summary-accumulator'; import { executeResumableStream } from '../resumable-stream-executor'; import { streamAgentRun } from '../stream-runner'; -jest.mock('../resumable-stream-executor', () => ({ - executeResumableStream: jest.fn(), - createLlmStepTraceHooks: jest.fn(), -})); +jest.mock('../resumable-stream-executor', () => { + const actual = + // eslint-disable-next-line @typescript-eslint/no-require-imports + jest.requireActual( + '../resumable-stream-executor', + ); + + return { + ...actual, + executeResumableStream: jest.fn(), + createLlmStepTraceHooks: jest.fn(), + }; +}); const emptyWorkSummary: WorkSummary = { toolCalls: [], totalToolCalls: 0, totalToolErrors: 0 }; diff --git a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts index 8bf9b9ef20e..cb36108aa99 100644 --- a/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts +++ b/packages/@n8n/instance-ai/src/runtime/resumable-stream-executor.ts @@ -1,4 +1,5 @@ import type { InstanceAiEvent } from '@n8n/api-types'; +import type { StreamResult } from '@n8n/agents'; import type { RunTree } from 'langsmith'; import type { InstanceAiEventBus } from '../event-bus'; @@ -6,7 +7,7 @@ import type { Logger } from '../logger'; import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../stream/map-chunk'; import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator'; import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing'; -import { asResumable, parseSuspension } from '../utils/stream-helpers'; +import { parseSuspension, resumeStream } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; type ConfirmationRequestEvent = Extract; @@ -167,6 +168,64 @@ function isRecord(value: unknown): value is Record { return value !== null && typeof value === 'object' && !Array.isArray(value); } +function isAsyncIterable(value: unknown): value is AsyncIterable { + return ( + value !== null && + typeof value === 'object' && + typeof Reflect.get(value, Symbol.asyncIterator) === 'function' + ); +} + +function isReadableStream(value: unknown): value is ReadableStream { + return ( + value !== null && + typeof value === 'object' && + typeof Reflect.get(value, 'getReader') === 'function' + ); +} + +function isResumableStreamSource(value: unknown): value is ResumableStreamSource { + return isRecord(value) && isAsyncIterable(value.fullStream); +} + +function isNativeStreamResult(value: unknown): value is StreamResult { + return isRecord(value) && isReadableStream(value.stream); +} + +async function* readableStreamToAsyncIterable(stream: ReadableStream) { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) return; + yield value; + } + } finally { + reader.releaseLock(); + } +} + +export function normalizeStreamSource( + result: unknown, + options?: { streamFormat?: ResumableStreamFormat }, +): ResumableStreamSource { + if (isResumableStreamSource(result)) { + return options?.streamFormat && !result.streamFormat + ? { ...result, streamFormat: options.streamFormat } + : result; + } + + if (isNativeStreamResult(result)) { + return { + runId: result.runId, + streamFormat: 'agent', + fullStream: readableStreamToAsyncIterable(result.stream), + }; + } + + throw new Error('Unsupported agent stream result'); +} + function getFiniteNumber(value: unknown): number | undefined { return typeof value === 'number' && Number.isFinite(value) ? value : undefined; } @@ -2024,15 +2083,19 @@ export async function executeResumableStream( runId: activeAgentRunId, toolCallId: suspension.toolCallId, }; - const resumed = await asResumable(options.agent).resumeStream(resumeData, { + const resumed = await resumeStream(options.agent, resumeData, { ...resumeOptions, ...(options.llmStepTraceHooks?.executionOptions ?? {}), }); + const resumedSource = normalizeStreamSource(resumed, { + streamFormat: activeSource.streamFormat, + }); - activeAgentRunId = (typeof resumed.runId === 'string' ? resumed.runId : '') || activeAgentRunId; - activeSource = { ...resumed, streamFormat: activeSource.streamFormat }; - activeStream = resumed.fullStream; - text = resumed.text; + activeAgentRunId = + (typeof resumedSource.runId === 'string' ? resumedSource.runId : '') || activeAgentRunId; + activeSource = resumedSource; + activeStream = resumedSource.fullStream; + text = resumedSource.text; } } diff --git a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts index 04c8b722c6f..c15fe9a4dd3 100644 --- a/packages/@n8n/instance-ai/src/runtime/stream-runner.ts +++ b/packages/@n8n/instance-ai/src/runtime/stream-runner.ts @@ -1,5 +1,4 @@ import type { InstanceAiEvent } from '@n8n/api-types'; -import type { StreamResult } from '@n8n/agents'; import type { InstanceAiEventBus } from '../event-bus'; import type { Logger } from '../logger'; @@ -7,20 +6,16 @@ import { createLlmStepTraceHooks, executeResumableStream, type LlmStepTraceHooks, + normalizeStreamSource, type ResumableStreamSource, type TraceStatus, } from './resumable-stream-executor'; import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing'; -import { asResumable, isRecord } from '../utils/stream-helpers'; +import { resumeStream } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; -type StreamableAgentStreamResult = ResumableStreamSource | StreamResult; - export interface StreamableAgent { - stream: ( - input: unknown, - options: Record, - ) => Promise; + stream: (input: unknown, options: Record) => Promise; } export interface StreamRunOptions { @@ -40,59 +35,6 @@ export interface StreamRunResult { confirmationEvent?: Extract; } -function isAsyncIterable(value: unknown): value is AsyncIterable { - return ( - value !== null && - typeof value === 'object' && - typeof Reflect.get(value, Symbol.asyncIterator) === 'function' - ); -} - -function isReadableStream(value: unknown): value is ReadableStream { - return ( - value !== null && - typeof value === 'object' && - typeof Reflect.get(value, 'getReader') === 'function' - ); -} - -function isResumableStreamSource(value: unknown): value is ResumableStreamSource { - return isRecord(value) && isAsyncIterable(value.fullStream); -} - -function isNativeStreamResult(value: unknown): value is StreamResult { - return isRecord(value) && isReadableStream(value.stream); -} - -async function* readableStreamToAsyncIterable(stream: ReadableStream) { - const reader = stream.getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) return; - yield value; - } - } finally { - reader.releaseLock(); - } -} - -function normalizeStreamSource(result: StreamableAgentStreamResult): ResumableStreamSource { - if (isResumableStreamSource(result)) { - return result; - } - - if (isNativeStreamResult(result)) { - return { - runId: result.runId, - streamFormat: 'agent', - fullStream: readableStreamToAsyncIterable(result.stream), - }; - } - - throw new Error('Unsupported agent stream result'); -} - export async function streamAgentRun( agent: StreamableAgent, input: unknown, @@ -121,7 +63,7 @@ export async function resumeAgentRun( const resumeTraceParent = getTraceParentRun(); return await withTraceParentContext(resumeTraceParent, async () => { const llmStepTraceHooks = createLlmStepTraceHooks(resumeTraceParent); - const resumed = await asResumable(agent).resumeStream(resumeData, { + const resumed = await resumeStream(agent, resumeData, { ...resumeOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); diff --git a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts index e33d07c7868..0c0ca1c5545 100644 --- a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts +++ b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts @@ -1,4 +1,4 @@ -import { isRecord, parseSuspension, asResumable } from '../stream-helpers'; +import { isRecord, parseSuspension, asResumable, resumeStream } from '../stream-helpers'; describe('isRecord', () => { it('returns true for plain objects', () => { @@ -127,3 +127,31 @@ describe('asResumable', () => { expect(resumable.resumeStream).toBe(agent.resumeStream); }); }); + +describe('resumeStream', () => { + it('uses Mastra-style resumeStream when available', async () => { + const resumed = { runId: 'run-2' }; + const agent = { resumeStream: jest.fn().mockResolvedValue(resumed) }; + + await expect(resumeStream(agent, { approved: true }, { runId: 'run-1' })).resolves.toBe( + resumed, + ); + expect(agent.resumeStream).toHaveBeenCalledWith({ approved: true }, { runId: 'run-1' }); + }); + + it('uses native agent resume in stream mode when resumeStream is absent', async () => { + const resumed = { runId: 'run-2' }; + const agent = { resume: jest.fn().mockResolvedValue(resumed) }; + + await expect(resumeStream(agent, { approved: true }, { runId: 'run-1' })).resolves.toBe( + resumed, + ); + expect(agent.resume).toHaveBeenCalledWith('stream', { approved: true }, { runId: 'run-1' }); + }); + + it('throws when the agent cannot resume streams', async () => { + await expect(resumeStream({}, { approved: true }, { runId: 'run-1' })).rejects.toThrow( + 'Agent does not support stream resume', + ); + }); +}); diff --git a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts index cec7046052c..a2e10f55e1e 100644 --- a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts +++ b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts @@ -34,13 +34,38 @@ export function parseSuspension(chunk: unknown): SuspensionInfo | null { /** Type for Mastra's resumeStream method (not exported by the framework). */ export interface Resumable { - resumeStream: ( + resumeStream?: ( data: Record, options: Record, - ) => Promise<{ runId?: string; fullStream: AsyncIterable; text: Promise }>; + ) => Promise; + resume?: ( + method: 'stream', + data: Record, + options: Record, + ) => Promise; } /** Cast an agent to Resumable for suspend/resume operations. */ export function asResumable(agent: unknown): Resumable { return agent as Resumable; } + +export async function resumeStream( + agent: unknown, + data: Record, + options: Record, +): Promise { + if (!isRecord(agent)) { + throw new Error('Agent does not support stream resume'); + } + + if (typeof agent.resumeStream === 'function') { + return await agent.resumeStream(data, options); + } + + if (typeof agent.resume === 'function') { + return await agent.resume('stream', data, options); + } + + throw new Error('Agent does not support stream resume'); +}