From 5d431aabeb9a1ca560bc97d0c9fd60ea72c95064 Mon Sep 17 00:00:00 2001 From: Jaakko Husso Date: Wed, 22 Oct 2025 11:11:05 +0300 Subject: [PATCH] feat(core): Capture the stream chunks and save partial messages (no-changelog) (#21016) --- .../src/modules/chat-hub/chat-hub.service.ts | 33 ++++++++++--- .../chat-hub/chat-message.repository.ts | 2 +- .../src/modules/chat-hub/stream-capturer.ts | 49 +++++++++++++++++++ .../src/features/ai/chatHub/chat.api.ts | 2 +- .../src/features/ai/chatHub/chat.store.ts | 3 +- .../src/features/ai/chatHub/chat.types.ts | 14 ------ 6 files changed, 78 insertions(+), 25 deletions(-) create mode 100644 packages/cli/src/modules/chat-hub/stream-capturer.ts diff --git a/packages/cli/src/modules/chat-hub/chat-hub.service.ts b/packages/cli/src/modules/chat-hub/chat-hub.service.ts index 7af3032734f..48a0e32e6aa 100644 --- a/packages/cli/src/modules/chat-hub/chat-hub.service.ts +++ b/packages/cli/src/modules/chat-hub/chat-hub.service.ts @@ -39,6 +39,8 @@ import { type IWorkflowExecuteAdditionalData, type StartNodeData, type IRun, + jsonParse, + StructuredChunk, } from 'n8n-workflow'; import { v4 as uuidv4 } from 'uuid'; @@ -52,6 +54,7 @@ import { getBase } from '@/workflow-execute-additional-data'; import { WorkflowExecutionService } from '@/workflows/workflow-execution.service'; import { ChatHubMessage } from './chat-hub-message.entity'; +import { ChatHubSession } from './chat-hub-session.entity'; import type { HumanMessagePayload, RegenerateMessagePayload, @@ -62,7 +65,7 @@ import type { import { ChatHubMessageRepository } from './chat-message.repository'; import { ChatHubSessionRepository } from './chat-session.repository'; import { getMaxContextWindowTokens } from './context-limits'; -import { ChatHubSession } from './chat-hub-session.entity'; +import { captureResponseWrites } from './stream-capturer'; const providerNodeTypeMapping: Record = { openai: { @@ -578,6 +581,18 @@ export class ChatHubService { `Starting execution of workflow 
"${workflowData.name}" with ID ${workflowData.id}`, ); + // Capture the streaming response as it's being generated to save + // partial messages in the database when generation gets cancelled. + let message = ''; + const onChunk = (chunk: string) => { + const data = jsonParse(chunk); + if (data && data.type === 'item' && typeof data.content === 'string') { + message += data.content; + } + }; + + const stream = captureResponseWrites(res, onChunk); + const { executionId } = await this.workflowExecutionService.executeManually( { workflowData, @@ -587,7 +602,7 @@ export class ChatHubService { user, undefined, true, - res, + stream, ); if (!executionId) { throw new OperationalError('There was a problem starting the chat execution.'); @@ -598,7 +613,7 @@ export class ChatHubService { sessionId, executionId, previousMessageId, - message: '', + message, selectedModel, retryOfMessageId, status: 'running', @@ -621,9 +636,8 @@ export class ChatHubService { } if (execution.status === 'canceled') { - const message = 'Generation cancelled.'; await this.messageRepository.updateChatMessage(replyId, { - content: message, + content: message || 'Generation cancelled.', status: 'cancelled', }); return; @@ -645,13 +659,16 @@ export class ChatHubService { throw new OperationalError(message); } - const message = this.getAIOutput(execution); - if (!message) { + // TODO: We should consider whether we can just always save the output from the captured stream instead + // of parsing it from execution data, which seems error-prone, especially with custom workflows. + // That could make handling multiple agents, multiple runs, tool executions, etc. easier...? 
+ const output = this.getAIOutput(execution); + if (!output) { throw new OperationalError('No response generated'); } await this.messageRepository.updateChatMessage(replyId, { - content: message, + content: output, status: 'success', }); } catch (error: unknown) { diff --git a/packages/cli/src/modules/chat-hub/chat-message.repository.ts b/packages/cli/src/modules/chat-hub/chat-message.repository.ts index 0f7ad06afcd..b9dc8e799d2 100644 --- a/packages/cli/src/modules/chat-hub/chat-message.repository.ts +++ b/packages/cli/src/modules/chat-hub/chat-message.repository.ts @@ -26,7 +26,7 @@ export class ChatHubMessageRepository extends Repository { async updateChatMessage( id: ChatMessageId, - fields: Partial<{ status: ChatHubMessageStatus; content: string }>, + fields: { status?: ChatHubMessageStatus; content?: string }, trx?: EntityManager, ) { return await withTransaction(this.manager, trx, async (em) => { diff --git a/packages/cli/src/modules/chat-hub/stream-capturer.ts b/packages/cli/src/modules/chat-hub/stream-capturer.ts new file mode 100644 index 00000000000..8926c347709 --- /dev/null +++ b/packages/cli/src/modules/chat-hub/stream-capturer.ts @@ -0,0 +1,49 @@ +import type { Response } from 'express'; +import type { ServerResponse } from 'http'; + +type Write = ServerResponse['write']; + +export type ChunkListenerCb = (chunk: string) => void; + +export function captureResponseWrites(res: T, onChunk: ChunkListenerCb): T { + const originalWrite = res.write.bind(res) as Write; + + const writeListener = (chunk: string | Buffer, enc?: BufferEncoding) => { + try { + const text = Buffer.isBuffer(chunk) ? chunk.toString(enc ?? 
'utf8') : String(chunk); + void onChunk(text); + } catch { + // Don't break the stream on listener errors + } + }; + + function write(chunk: string | Buffer, callbackFn?: (e?: Error | null) => void): boolean; + function write( + chunk: string | Buffer, + encoding: BufferEncoding, + callbackFn?: (e?: Error | null) => void, + ): boolean; + function write( + chunk: string | Buffer, + encodingOrCallbackFn?: BufferEncoding | ((e?: Error | null) => void), + callbackFn?: (e?: Error | null) => void, + ): boolean { + // TODO: We could also change the output that gets streamed from execution engine here, + // perhaps injecting the messageId or other metadata into the chunks. That could make + // AI responding with multiple messages (tools, multiple agents etc) easier to handle? + if (!encodingOrCallbackFn) { + writeListener(chunk); + return originalWrite(chunk); + } else if (typeof encodingOrCallbackFn === 'function') { + writeListener(chunk); + return originalWrite(chunk, encodingOrCallbackFn); + } else { + writeListener(chunk, encodingOrCallbackFn); + return originalWrite(chunk, encodingOrCallbackFn, callbackFn); + } + } + + res.write = write; + + return res; +} diff --git a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.api.ts b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.api.ts index b86df5daf14..5fefe78700e 100644 --- a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.api.ts +++ b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.api.ts @@ -11,7 +11,7 @@ import type { ChatSessionId, ChatMessageId, } from '@n8n/api-types'; -import type { StructuredChunk } from './chat.types'; +import type { StructuredChunk } from 'n8n-workflow'; // Workflows stream data as newline separated JSON objects (jsonl) const STREAM_SEPARATOR = '\n'; diff --git a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.store.ts b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.store.ts index 7f712e93ded..342a460f66c 100644 --- 
a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.store.ts +++ b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.store.ts @@ -23,7 +23,8 @@ import type { ChatSessionId, ChatHubMessageDto, } from '@n8n/api-types'; -import type { StructuredChunk, CredentialsMap, ChatMessage, ChatConversation } from './chat.types'; +import type { CredentialsMap, ChatMessage, ChatConversation } from './chat.types'; +import type { StructuredChunk } from 'n8n-workflow'; export const useChatStore = defineStore(CHAT_STORE, () => { const rootStore = useRootStore(); diff --git a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.types.ts b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.types.ts index 1111961993b..148feee318f 100644 --- a/packages/frontend/editor-ui/src/features/ai/chatHub/chat.types.ts +++ b/packages/frontend/editor-ui/src/features/ai/chatHub/chat.types.ts @@ -47,20 +47,6 @@ export interface StreamOutput { messages: StreamChunk[]; } -// From @n8n/chat -export type ChunkType = 'begin' | 'item' | 'end' | 'error'; -export interface StructuredChunk { - type: ChunkType; - content?: string; - metadata: { - nodeId: string; - nodeName: string; - timestamp: number; - runIndex: number; - itemIndex: number; - }; -} - export interface NodeStreamingState { nodeId: string; chunks: string[];