refactor(instance-ai): accumulate native stream text

This commit is contained in:
Oleg Ivaniv 2026-05-05 11:30:24 +02:00
parent e7c854eadc
commit ad31edcdd7
No known key found for this signature in database
3 changed files with 58 additions and 1 deletions

View File

@ -520,6 +520,7 @@ describe('executeResumableStream', () => {
);
expect(result.status).toBe('completed');
expect(result.agentRunId).toBe('agent-run-2');
await expect(result.text).resolves.toBe('Done.');
expect(eventBus.publish).toHaveBeenCalledWith(
'thread-1',
expect.objectContaining({

View File

@ -277,5 +277,6 @@ describe('streamAgentRun', () => {
}),
);
await expect(collectAsyncIterable(source.fullStream)).resolves.toEqual([nativeChunk]);
await expect(source.text).resolves.toBe('All good');
});
});

View File

@ -192,6 +192,58 @@ function isNativeStreamResult(value: unknown): value is StreamResult {
return isRecord(value) && isReadableStream(value.stream);
}
function extractContentText(content: unknown): string {
if (typeof content === 'string') {
return content;
}
if (!Array.isArray(content)) {
return '';
}
return content
.map((part) => {
if (!isRecord(part) || part.type !== 'text') {
return '';
}
return typeof part.text === 'string' ? part.text : '';
})
.join('');
}
async function collectNativeStreamText(stream: ReadableStream<unknown>): Promise<string> {
const reader = stream.getReader();
let deltaText = '';
let messageText = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) return deltaText || messageText;
if (!isRecord(value)) {
continue;
}
if (value.type === 'text-delta' && typeof value.delta === 'string') {
deltaText += value.delta;
continue;
}
if (value.type !== 'message' || !isRecord(value.message)) {
continue;
}
if (value.message.role === 'assistant') {
messageText += extractContentText(value.message.content);
}
}
} finally {
reader.releaseLock();
}
}
async function* readableStreamToAsyncIterable(stream: ReadableStream<unknown>) {
const reader = stream.getReader();
try {
@ -216,10 +268,13 @@ export function normalizeStreamSource(
}
if (isNativeStreamResult(result)) {
const [eventStream, textStream] = result.stream.tee();
return {
runId: result.runId,
streamFormat: 'agent',
fullStream: readableStreamToAsyncIterable(result.stream),
fullStream: readableStreamToAsyncIterable(eventStream),
text: collectNativeStreamText(textStream),
};
}