feat(core): Capture the stream chunks and save partial messages (no-changelog) (#21016)

This commit is contained in:
Jaakko Husso 2025-10-22 11:11:05 +03:00 committed by GitHub
parent 86d291233f
commit 5d431aabeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 78 additions and 25 deletions

View File

@ -39,6 +39,8 @@ import {
type IWorkflowExecuteAdditionalData,
type StartNodeData,
type IRun,
jsonParse,
StructuredChunk,
} from 'n8n-workflow';
import { v4 as uuidv4 } from 'uuid';
@ -52,6 +54,7 @@ import { getBase } from '@/workflow-execute-additional-data';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
import { ChatHubMessage } from './chat-hub-message.entity';
import { ChatHubSession } from './chat-hub-session.entity';
import type {
HumanMessagePayload,
RegenerateMessagePayload,
@ -62,7 +65,7 @@ import type {
import { ChatHubMessageRepository } from './chat-message.repository';
import { ChatHubSessionRepository } from './chat-session.repository';
import { getMaxContextWindowTokens } from './context-limits';
import { ChatHubSession } from './chat-hub-session.entity';
import { captureResponseWrites } from './stream-capturer';
const providerNodeTypeMapping: Record<ChatHubProvider, INodeTypeNameVersion> = {
openai: {
@ -578,6 +581,18 @@ export class ChatHubService {
`Starting execution of workflow "${workflowData.name}" with ID ${workflowData.id}`,
);
// Capture the streaming response as it's being generated to save
// partial messages in the database when generation gets cancelled.
let message = '';
// Accumulates the text of streamed 'item' chunks into `message`.
// The stream is newline separated JSON (jsonl), so a single write may carry
// more than one JSON object. Each line is parsed on its own so that one
// malformed or partial line doesn't discard the content of the others.
const onChunk = (chunk: string) => {
	for (const line of chunk.split('\n')) {
		if (line.trim() === '') continue;

		let data: StructuredChunk | undefined;
		try {
			data = jsonParse<StructuredChunk>(line);
		} catch {
			// Not a complete JSON object, nothing to capture from this line.
			continue;
		}

		if (data?.type === 'item' && typeof data.content === 'string') {
			message += data.content;
		}
	}
};
const stream = captureResponseWrites(res, onChunk);
const { executionId } = await this.workflowExecutionService.executeManually(
{
workflowData,
@ -587,7 +602,7 @@ export class ChatHubService {
user,
undefined,
true,
res,
stream,
);
if (!executionId) {
throw new OperationalError('There was a problem starting the chat execution.');
@ -598,7 +613,7 @@ export class ChatHubService {
sessionId,
executionId,
previousMessageId,
message: '',
message,
selectedModel,
retryOfMessageId,
status: 'running',
@ -621,9 +636,8 @@ export class ChatHubService {
}
if (execution.status === 'canceled') {
const message = 'Generation cancelled.';
await this.messageRepository.updateChatMessage(replyId, {
content: message,
content: message || 'Generation cancelled.',
status: 'cancelled',
});
return;
@ -645,13 +659,16 @@ export class ChatHubService {
throw new OperationalError(message);
}
const message = this.getAIOutput(execution);
if (!message) {
// TODO: Consider whether we could always save the output from the captured stream instead
// of parsing it from the execution data, which seems error-prone, especially with custom workflows.
// That could make handling multiple agents, multiple runs, tool executions etc. easier.
const output = this.getAIOutput(execution);
if (!output) {
throw new OperationalError('No response generated');
}
await this.messageRepository.updateChatMessage(replyId, {
content: message,
content: output,
status: 'success',
});
} catch (error: unknown) {

View File

@ -26,7 +26,7 @@ export class ChatHubMessageRepository extends Repository<ChatHubMessage> {
async updateChatMessage(
id: ChatMessageId,
fields: Partial<{ status: ChatHubMessageStatus; content: string }>,
fields: { status?: ChatHubMessageStatus; content?: string },
trx?: EntityManager,
) {
return await withTransaction(this.manager, trx, async (em) => {

View File

@ -0,0 +1,49 @@
import type { Response } from 'express';
import type { ServerResponse } from 'http';
type Write = ServerResponse['write'];
export type ChunkListenerCb = (chunk: string) => void;
/**
 * Patches `res.write` so that every chunk written to the response is also
 * handed to `onChunk` as text, then forwards the write to the original
 * implementation unchanged.
 *
 * The response object is mutated in place and returned, it is not copied.
 *
 * @param res - The response whose writes should be observed.
 * @param onChunk - Listener invoked with the text of each written chunk, before
 *   the chunk is passed on. Errors thrown (or rejected) by the listener are
 *   swallowed so they can never break the stream.
 * @returns The same `res` instance, with `write` replaced by the capturing wrapper.
 */
export function captureResponseWrites<T extends Response>(res: T, onChunk: ChunkListenerCb): T {
	const originalWrite = res.write.bind(res) as Write;

	const notifyListener = (chunk: string | Uint8Array) => {
		try {
			let text: string;
			if (typeof chunk === 'string') {
				text = chunk;
			} else if (ArrayBuffer.isView(chunk)) {
				// The `encoding` argument of `write` only applies to string chunks, binary
				// chunks are sent as-is. Decode them as UTF-8 regardless of that argument.
				text = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength).toString('utf8');
			} else {
				text = String(chunk);
			}

			const pending: unknown = onChunk(text);
			if (pending instanceof Promise) {
				pending.catch(() => {
					// Don't break the stream on asynchronous listener errors either
				});
			}
		} catch {
			// Don't break the stream on listener errors
		}
	};

	function write(chunk: string | Uint8Array, callbackFn?: (e?: Error | null) => void): boolean;
	function write(
		chunk: string | Uint8Array,
		encoding: BufferEncoding,
		callbackFn?: (e?: Error | null) => void,
	): boolean;
	function write(
		chunk: string | Uint8Array,
		encodingOrCallbackFn?: BufferEncoding | ((e?: Error | null) => void),
		callbackFn?: (e?: Error | null) => void,
	): boolean {
		// TODO: We could also change the output that gets streamed from execution engine here,
		// perhaps injecting the messageId or other metadata into the chunks. That could make
		// AI responding with multiple messages (tools, multiple agents etc) easier to handle?
		notifyListener(chunk);

		if (typeof encodingOrCallbackFn === 'function') {
			return originalWrite(chunk, encodingOrCallbackFn);
		}

		if (encodingOrCallbackFn) {
			return originalWrite(chunk, encodingOrCallbackFn, callbackFn);
		}

		// No encoding given. A callback may still have been passed in the third
		// position (`write(chunk, undefined, cb)`) and must not be dropped.
		return callbackFn ? originalWrite(chunk, callbackFn) : originalWrite(chunk);
	}

	res.write = write;

	return res;
}

View File

@ -11,7 +11,7 @@ import type {
ChatSessionId,
ChatMessageId,
} from '@n8n/api-types';
import type { StructuredChunk } from './chat.types';
import type { StructuredChunk } from 'n8n-workflow';
// Workflows stream data as newline separated JSON objects (jsonl)
const STREAM_SEPARATOR = '\n';

View File

@ -23,7 +23,8 @@ import type {
ChatSessionId,
ChatHubMessageDto,
} from '@n8n/api-types';
import type { StructuredChunk, CredentialsMap, ChatMessage, ChatConversation } from './chat.types';
import type { CredentialsMap, ChatMessage, ChatConversation } from './chat.types';
import type { StructuredChunk } from 'n8n-workflow';
export const useChatStore = defineStore(CHAT_STORE, () => {
const rootStore = useRootStore();

View File

@ -47,20 +47,6 @@ export interface StreamOutput {
messages: StreamChunk[];
}
// From @n8n/chat
export type ChunkType = 'begin' | 'item' | 'end' | 'error';
export interface StructuredChunk {
type: ChunkType;
content?: string;
metadata: {
nodeId: string;
nodeName: string;
timestamp: number;
runIndex: number;
itemIndex: number;
};
}
export interface NodeStreamingState {
nodeId: string;
chunks: string[];