mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-27 14:57:21 +02:00
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Benjamin Schroth <benjamin@n8n.io>
95 lines
3.1 KiB
TypeScript
95 lines
3.1 KiB
TypeScript
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
|
|
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
|
|
import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
|
|
import type { IExecuteFunctions } from 'n8n-workflow';
|
|
|
|
import type { AgentResult, ToolCallRequest } from './types';
|
|
|
|
/**
|
|
* Processes the event stream from a streaming agent execution.
|
|
* Handles streaming chunks, tool calls, and intermediate steps.
|
|
*
|
|
* This is a generalized version that can be used across different agent types
|
|
* (Tools Agent, OpenAI Functions Agent, etc.).
|
|
*
|
|
* @param ctx - The execution context
|
|
* @param eventStream - The stream of events from the agent
|
|
* @param itemIndex - The current item index
|
|
* @returns AgentResult containing output and optional tool calls/steps
|
|
*/
|
|
export async function processEventStream(
|
|
ctx: IExecuteFunctions,
|
|
eventStream: IterableReadableStream<StreamEvent>,
|
|
itemIndex: number,
|
|
): Promise<AgentResult> {
|
|
const agentResult: AgentResult = {
|
|
output: '',
|
|
};
|
|
|
|
const toolCalls: ToolCallRequest[] = [];
|
|
|
|
ctx.sendChunk('begin', itemIndex);
|
|
for await (const event of eventStream) {
|
|
// Stream chat model tokens as they come in
|
|
switch (event.event) {
|
|
case 'on_chat_model_stream':
|
|
const chunk = event.data?.chunk as AIMessageChunk;
|
|
if (chunk?.content) {
|
|
const chunkContent = chunk.content;
|
|
let chunkText = '';
|
|
if (Array.isArray(chunkContent)) {
|
|
for (const message of chunkContent) {
|
|
if (message?.type === 'text') {
|
|
chunkText += (message as MessageContentText)?.text;
|
|
}
|
|
}
|
|
} else if (typeof chunkContent === 'string') {
|
|
chunkText = chunkContent;
|
|
}
|
|
ctx.sendChunk('item', itemIndex, chunkText);
|
|
|
|
agentResult.output += chunkText;
|
|
}
|
|
break;
|
|
case 'on_chat_model_end':
|
|
// Capture full LLM response with tool calls for intermediate steps
|
|
if (event.data) {
|
|
const chatModelData = event.data;
|
|
const output = chatModelData.output;
|
|
|
|
// Check if this LLM response contains tool calls
|
|
if (output?.tool_calls && output.tool_calls.length > 0) {
|
|
// Collect tool calls for request building
|
|
// Note: For Gemini, we pass additional_kwargs to ALL tool calls
|
|
// so the signature can be applied to each when rebuilding
|
|
for (const toolCall of output.tool_calls) {
|
|
toolCalls.push({
|
|
tool: toolCall.name,
|
|
toolInput: toolCall.args,
|
|
toolCallId: toolCall.id || 'unknown',
|
|
type: toolCall.type || 'tool_call',
|
|
log:
|
|
output.content ||
|
|
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
|
|
messageLog: [output],
|
|
// Pass additional_kwargs to ALL tool calls so signature is available
|
|
additionalKwargs: output.additional_kwargs as Record<string, unknown> | undefined,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
ctx.sendChunk('end', itemIndex);
|
|
|
|
// Include collected tool calls in the result
|
|
if (toolCalls.length > 0) {
|
|
agentResult.toolCalls = toolCalls;
|
|
}
|
|
|
|
return agentResult;
|
|
}
|