refactor(instance-ai): route native stream events

This commit is contained in:
Oleg Ivaniv 2026-05-05 10:40:10 +02:00
parent 69bc96a5c8
commit 18f05c9a89
No known key found for this signature in database
3 changed files with 103 additions and 9 deletions

View File

@ -301,6 +301,74 @@ describe('executeResumableStream', () => {
expect(result.confirmationEvent?.payload.requestId).toBe('request-1');
});
it('maps native agent stream chunks when stream source is native', async () => {
const eventBus = createEventBus();
const result = await executeResumableStream({
agent: {},
stream: {
runId: 'agent-run-1',
streamFormat: 'agent',
fullStream: fromChunks([
{ type: 'text-delta', delta: 'Working...' },
{
type: 'tool-call-suspended',
toolCallId: 'tool-call-1',
toolName: 'ask-user',
input: { prompt: 'Confirm?' },
suspendPayload: {
requestId: 'request-1',
message: 'Need approval',
},
},
]),
},
context: {
threadId: 'thread-1',
runId: 'run-1',
agentId: 'agent-1',
eventBus,
signal: new AbortController().signal,
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn() },
},
control: { mode: 'manual' },
});
expect(result).toEqual(
expect.objectContaining({
status: 'suspended',
agentRunId: 'agent-run-1',
suspension: {
toolCallId: 'tool-call-1',
requestId: 'request-1',
toolName: 'ask-user',
},
}),
);
expect(eventBus.publish).toHaveBeenCalledWith(
'thread-1',
expect.objectContaining({
type: 'text-delta',
runId: 'run-1',
agentId: 'agent-1',
payload: { text: 'Working...' },
}),
);
expect(eventBus.publish).not.toHaveBeenCalledWith(
'thread-1',
expect.objectContaining({ type: 'confirmation-request' }),
);
expect(result.confirmationEvent?.type).toBe('confirmation-request');
expect(result.confirmationEvent?.payload).toEqual(
expect.objectContaining({
requestId: 'request-1',
toolCallId: 'tool-call-1',
toolName: 'ask-user',
args: { prompt: 'Confirm?' },
}),
);
});
it('returns errored status when stream contains an error chunk', async () => {
const eventBus = createEventBus();

View File

@ -3,16 +3,18 @@ import type { RunTree } from 'langsmith';
import type { InstanceAiEventBus } from '../event-bus';
import type { Logger } from '../logger';
import { mapMastraChunkToEvent } from '../stream/map-chunk';
import { mapAgentChunkToEvent, mapMastraChunkToEvent } from '../stream/map-chunk';
import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator';
import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing';
import { asResumable, parseSuspension } from '../utils/stream-helpers';
import type { SuspensionInfo } from '../utils/stream-helpers';
type ConfirmationRequestEvent = Extract<InstanceAiEvent, { type: 'confirmation-request' }>;
export type ResumableStreamFormat = 'mastra' | 'agent';
export interface ResumableStreamSource {
runId?: string;
streamFormat?: ResumableStreamFormat;
fullStream: AsyncIterable<unknown>;
text?: Promise<string>;
steps?: Promise<unknown[]>;
@ -1917,12 +1919,20 @@ export async function executeResumableStream(
hasError = true;
}
const event = mapMastraChunkToEvent(
options.context.runId,
options.context.agentId,
chunk,
currentResponseId,
);
const event =
activeSource.streamFormat === 'agent'
? mapAgentChunkToEvent(
options.context.runId,
options.context.agentId,
chunk,
currentResponseId,
)
: mapMastraChunkToEvent(
options.context.runId,
options.context.agentId,
chunk,
currentResponseId,
);
if (event) {
workSummaryAccumulator.observe(event);
let shouldPublishEvent = true;
@ -2020,7 +2030,7 @@ export async function executeResumableStream(
});
activeAgentRunId = (typeof resumed.runId === 'string' ? resumed.runId : '') || activeAgentRunId;
activeSource = resumed;
activeSource = { ...resumed, streamFormat: activeSource.streamFormat };
activeStream = resumed.fullStream;
text = resumed.text;
}

View File

@ -27,6 +27,20 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return value !== null && typeof value === 'object' && !Array.isArray(value);
}
const agentStreamChunkTypes = new Set<string>([
'finish',
'text-delta',
'reasoning-delta',
'tool-call-delta',
'error',
'message',
'tool-call-suspended',
]);
function isAgentStreamChunk(value: unknown): value is StreamChunk {
return isRecord(value) && typeof value.type === 'string' && agentStreamChunkTypes.has(value.type);
}
interface ErrorInfo {
content: string;
statusCode?: number;
@ -342,9 +356,11 @@ export function mapMastraChunkToEvent(
export function mapAgentChunkToEvent(
runId: string,
agentId: string,
chunk: StreamChunk,
chunk: unknown,
responseId?: string,
): InstanceAiEvent | null {
if (!isAgentStreamChunk(chunk)) return null;
if (chunk.type === 'text-delta') {
return mapMastraChunkToEvent(
runId,