refactor(instance-ai): normalize native stream results

This commit is contained in:
Oleg Ivaniv 2026-05-05 10:41:36 +02:00
parent 18f05c9a89
commit 7f37612d2f
No known key found for this signature in database
2 changed files with 126 additions and 6 deletions

View File

@ -36,6 +36,14 @@ async function* emptyStream() {
yield* [];
}
async function collectAsyncIterable(stream: AsyncIterable<unknown>) {
const chunks: unknown[] = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
return chunks;
}
describe('streamAgentRun', () => {
it('returns errored status when agent stream contains an error chunk', async () => {
jest.mocked(executeResumableStream).mockResolvedValue({
@ -210,4 +218,55 @@ describe('streamAgentRun', () => {
}),
);
});
it('normalizes native agent readable streams for the resumable executor', async () => {
const mockedExecuteResumableStream = jest.mocked(executeResumableStream);
mockedExecuteResumableStream.mockClear();
const nativeChunk = { type: 'text-delta', delta: 'All good' };
const readable = new ReadableStream<unknown>({
start(controller) {
controller.enqueue(nativeChunk);
controller.close();
},
});
const agent = {
stream: jest.fn().mockResolvedValue({
runId: 'agent-run-1',
stream: readable,
getState: jest.fn(),
}),
};
const eventBus = createEventBus();
mockedExecuteResumableStream.mockResolvedValue({
status: 'completed',
agentRunId: 'agent-run-1',
workSummary: emptyWorkSummary,
});
await streamAgentRun(
agent,
'hello',
{},
{
threadId: 'thread-1',
runId: 'run-1',
agentId: 'agent-1',
signal: new AbortController().signal,
eventBus,
logger: createLogger(),
},
);
const call = mockedExecuteResumableStream.mock.calls[0];
expect(call).toBeDefined();
const source = call?.[0].stream;
expect(source).toEqual(
expect.objectContaining({
runId: 'agent-run-1',
streamFormat: 'agent',
}),
);
await expect(collectAsyncIterable(source.fullStream)).resolves.toEqual([nativeChunk]);
});
});

View File

@ -1,4 +1,5 @@
import type { InstanceAiEvent } from '@n8n/api-types';
import type { StreamResult } from '@n8n/agents';
import type { InstanceAiEventBus } from '../event-bus';
import type { Logger } from '../logger';
@ -10,11 +11,16 @@ import {
type TraceStatus,
} from './resumable-stream-executor';
import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing';
import { asResumable } from '../utils/stream-helpers';
import { asResumable, isRecord } from '../utils/stream-helpers';
import type { SuspensionInfo } from '../utils/stream-helpers';
type StreamableAgentStreamResult = ResumableStreamSource | StreamResult;
export interface StreamableAgent {
stream: (input: unknown, options: Record<string, unknown>) => Promise<ResumableStreamSource>;
stream: (
input: unknown,
options: Record<string, unknown>,
) => Promise<StreamableAgentStreamResult>;
}
export interface StreamRunOptions {
@ -34,6 +40,59 @@ export interface StreamRunResult {
confirmationEvent?: Extract<InstanceAiEvent, { type: 'confirmation-request' }>;
}
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
return (
value !== null &&
typeof value === 'object' &&
typeof Reflect.get(value, Symbol.asyncIterator) === 'function'
);
}
function isReadableStream(value: unknown): value is ReadableStream<unknown> {
return (
value !== null &&
typeof value === 'object' &&
typeof Reflect.get(value, 'getReader') === 'function'
);
}
function isResumableStreamSource(value: unknown): value is ResumableStreamSource {
return isRecord(value) && isAsyncIterable(value.fullStream);
}
function isNativeStreamResult(value: unknown): value is StreamResult {
return isRecord(value) && isReadableStream(value.stream);
}
async function* readableStreamToAsyncIterable(stream: ReadableStream<unknown>) {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}
function normalizeStreamSource(result: StreamableAgentStreamResult): ResumableStreamSource {
if (isResumableStreamSource(result)) {
return result;
}
if (isNativeStreamResult(result)) {
return {
runId: result.runId,
streamFormat: 'agent',
fullStream: readableStreamToAsyncIterable(result.stream),
};
}
throw new Error('Unsupported agent stream result');
}
export async function streamAgentRun(
agent: StreamableAgent,
input: unknown,
@ -47,8 +106,9 @@ export async function streamAgentRun(
...streamOptions,
...(llmStepTraceHooks?.executionOptions ?? {}),
});
const agentRunId = typeof result.runId === 'string' ? result.runId : '';
return await consumeStream(agent, result, { ...options, agentRunId, llmStepTraceHooks });
const stream = normalizeStreamSource(result);
const agentRunId = typeof stream.runId === 'string' ? stream.runId : '';
return await consumeStream(agent, stream, { ...options, agentRunId, llmStepTraceHooks });
});
}
@ -65,8 +125,9 @@ export async function resumeAgentRun(
...resumeOptions,
...(llmStepTraceHooks?.executionOptions ?? {}),
});
const agentRunId = (typeof resumed.runId === 'string' && resumed.runId) || options.agentRunId;
return await consumeStream(agent, resumed, { ...options, agentRunId, llmStepTraceHooks });
const stream = normalizeStreamSource(resumed);
const agentRunId = (typeof stream.runId === 'string' && stream.runId) || options.agentRunId;
return await consumeStream(agent, stream, { ...options, agentRunId, llmStepTraceHooks });
});
}