refactor(AI Agent Node): Split node in helpers (#21363)

This commit is contained in:
Benjamin Schroth 2025-11-18 09:48:06 +01:00 committed by GitHub
parent a896417300
commit 40b1fc9a4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 3939 additions and 1926 deletions

View File

@ -1,9 +1,3 @@
import {
promptTypeOptions,
textFromGuardrailsNode,
textFromPreviousNode,
textInput,
} from '@utils/descriptions';
import { NodeConnectionTypes } from 'n8n-workflow';
import type {
IExecuteFunctions,
@ -15,6 +9,13 @@ import type {
EngineRequest,
} from 'n8n-workflow';
import {
promptTypeOptions,
textFromGuardrailsNode,
textFromPreviousNode,
textInput,
} from '@utils/descriptions';
import { toolsAgentProperties } from '../agents/ToolsAgent/V3/description';
import type { RequestResponseMetadata } from '../agents/ToolsAgent/V3/execute';
import { toolsAgentExecute } from '../agents/ToolsAgent/V3/execute';

View File

@ -1,364 +1,29 @@
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessageChunk, BaseMessage, MessageContentText } from '@langchain/core/messages';
import { AIMessage, trimMessages } from '@langchain/core/messages';
import type { ToolCall } from '@langchain/core/messages/tool';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import { type AgentRunnableSequence, createToolCallingAgent } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import type { DynamicStructuredTool, Tool } from 'langchain/tools';
import omit from 'lodash/omit';
import {
jsonParse,
NodeConnectionTypes,
nodeNameToToolName,
NodeOperationError,
sleep,
} from 'n8n-workflow';
import { sleep } from 'n8n-workflow';
import type {
EngineRequest,
GenericValue,
IDataObject,
IExecuteFunctions,
INodeExecutionData,
ISupplyDataFunctions,
EngineResponse,
} from 'n8n-workflow';
import assert from 'node:assert';
import { getPromptInputByType } from '@utils/helpers';
import {
getOptionalOutputParser,
type N8nOutputParser,
} from '@utils/output_parsers/N8nOutputParser';
import {
fixEmptyContentMessage,
getAgentStepsParser,
getChatModel,
getOptionalMemory,
getTools,
prepareMessages,
preparePrompt,
} from '../common';
import { SYSTEM_MESSAGE } from '../prompt';
type ToolCallRequest = {
tool: string;
toolInput: Record<string, unknown>;
toolCallId: string;
type?: string;
log?: string;
messageLog?: unknown[];
};
async function createEngineRequests(
toolCalls: ToolCallRequest[],
itemIndex: number,
tools: Array<DynamicStructuredTool | Tool>,
) {
return toolCalls.map((toolCall) => {
// First try to get from metadata (for toolkit tools)
const foundTool = tools.find((tool) => tool.name === toolCall.tool);
if (!foundTool) return;
const nodeName = foundTool.metadata?.sourceNodeName;
// For toolkit tools, include the tool name so the node knows which tool to execute
const input = foundTool.metadata?.isFromToolkit
? { ...toolCall.toolInput, tool: toolCall.tool }
: toolCall.toolInput;
return {
nodeName,
input,
type: NodeConnectionTypes.AiTool,
id: toolCall.toolCallId,
metadata: {
itemIndex,
},
};
});
}
/**
* Uses provided tools and tried to get tools from model metadata
* Some chat model nodes can define built-in tools in their metadata
*/
function getAllTools(model: BaseChatModel, tools: Array<DynamicStructuredTool | Tool>) {
const modelTools = (model.metadata?.tools as Tool[]) ?? [];
const allTools = [...tools, ...modelTools];
return allTools;
}
/**
* Creates an agent executor with the given configuration
*/
function createAgentSequence(
model: BaseChatModel,
tools: Array<DynamicStructuredTool | Tool>,
prompt: ChatPromptTemplate,
_options: { maxIterations?: number; returnIntermediateSteps?: boolean },
outputParser?: N8nOutputParser,
memory?: BaseChatMemory,
fallbackModel?: BaseChatModel | null,
) {
const agent = createToolCallingAgent({
llm: model,
tools: getAllTools(model, tools),
prompt,
streamRunnable: false,
});
let fallbackAgent: AgentRunnableSequence | undefined;
if (fallbackModel) {
fallbackAgent = createToolCallingAgent({
llm: fallbackModel,
tools: getAllTools(fallbackModel, tools),
prompt,
streamRunnable: false,
});
}
const runnableAgent = RunnableSequence.from([
fallbackAgent ? agent.withFallbacks([fallbackAgent]) : agent,
getAgentStepsParser(outputParser, memory),
fixEmptyContentMessage,
]) as AgentRunnableSequence;
runnableAgent.singleAction = true;
runnableAgent.streamRunnable = false;
return runnableAgent;
}
type IntermediateStep = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string;
messageLog: unknown[];
toolCallId: string;
type: string;
};
observation?: string;
};
type AgentResult = {
output: string;
intermediateSteps?: IntermediateStep[];
toolCalls?: ToolCallRequest[];
};
async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
itemIndex: number,
returnIntermediateSteps: boolean = false,
memory?: BaseChatMemory,
input?: string,
): Promise<AgentResult> {
const agentResult: AgentResult = {
output: '',
};
if (returnIntermediateSteps) {
agentResult.intermediateSteps = [];
}
const toolCalls: ToolCallRequest[] = [];
ctx.sendChunk('begin', itemIndex);
for await (const event of eventStream) {
// Stream chat model tokens as they come in
switch (event.event) {
case 'on_chat_model_stream':
const chunk = event.data?.chunk as AIMessageChunk;
if (chunk?.content) {
const chunkContent = chunk.content;
let chunkText = '';
if (Array.isArray(chunkContent)) {
for (const message of chunkContent) {
if (message?.type === 'text') {
chunkText += (message as MessageContentText)?.text;
}
}
} else if (typeof chunkContent === 'string') {
chunkText = chunkContent;
}
ctx.sendChunk('item', itemIndex, chunkText);
agentResult.output += chunkText;
}
break;
case 'on_chat_model_end':
// Capture full LLM response with tool calls for intermediate steps
if (event.data) {
const chatModelData = event.data as {
output?: { tool_calls?: ToolCall[]; content?: string };
};
const output = chatModelData.output;
// Check if this LLM response contains tool calls
if (output?.tool_calls && output.tool_calls.length > 0) {
// Collect tool calls for request building
for (const toolCall of output.tool_calls) {
toolCalls.push({
tool: toolCall.name,
toolInput: toolCall.args,
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output],
});
}
// Also add to intermediate steps if needed
if (returnIntermediateSteps) {
for (const toolCall of output.tool_calls) {
agentResult.intermediateSteps!.push({
action: {
tool: toolCall.name,
toolInput: toolCall.args,
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output], // Include the full LLM response
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
},
});
}
}
}
}
break;
case 'on_tool_end':
// Capture tool execution results and match with action
if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
const toolData = event.data as { output?: string };
// Find the matching intermediate step for this tool call
const matchingStep = agentResult.intermediateSteps!.find(
(step) => !step.observation && step.action.tool === event.name,
);
if (matchingStep) {
matchingStep.observation = toolData.output || '';
}
}
break;
default:
break;
}
}
ctx.sendChunk('end', itemIndex);
// Save conversation to memory if memory is connected
if (memory && input && agentResult.output) {
await memory.saveContext({ input }, { output: agentResult.output });
}
// Include collected tool calls in the result
if (toolCalls.length > 0) {
agentResult.toolCalls = toolCalls;
}
return agentResult;
}
export type RequestResponseMetadata = {
itemIndex?: number;
previousRequests: ToolCallData[];
iterationCount?: number;
};
type ToolCallData = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string | number | true | object;
toolCallId: IDataObject | GenericValue | GenericValue[] | IDataObject[];
type: string | number | true | object;
};
observation: string;
};
function buildSteps(
response: EngineResponse<RequestResponseMetadata> | undefined,
itemIndex: number,
): ToolCallData[] {
const steps: ToolCallData[] = [];
if (response) {
const responses = response?.actionResponses ?? [];
if (response.metadata?.previousRequests) {
steps.push(...response.metadata.previousRequests);
}
for (const tool of responses) {
if (tool.action?.metadata?.itemIndex !== itemIndex) continue;
const toolInput: IDataObject = {
...tool.action.input,
id: tool.action.id,
};
if (!toolInput || !tool.data) {
continue;
}
const step = steps.find((step) => step.action.toolCallId === toolInput.id);
if (step) {
continue;
}
// Create a synthetic AI message for the messageLog
// This represents the AI's decision to call the tool
const syntheticAIMessage = new AIMessage({
content: `Calling ${tool.action.nodeName} with input: ${JSON.stringify(toolInput)}`,
tool_calls: [
{
id: (toolInput?.id as string) ?? 'reconstructed_call',
name: nodeNameToToolName(tool.action.nodeName),
args: toolInput,
type: 'tool_call',
},
],
});
const toolResult = {
action: {
tool: nodeNameToToolName(tool.action.nodeName),
toolInput: (toolInput.input as IDataObject) || {},
log: toolInput.log || syntheticAIMessage.content,
messageLog: [syntheticAIMessage],
toolCallId: toolInput?.id,
type: toolInput.type || 'tool_call',
},
observation: JSON.stringify(tool.data?.data?.ai_tool?.[0]?.map((item) => item?.json) ?? ''),
};
steps.push(toolResult);
}
}
return steps;
}
import { buildExecutionContext, executeBatch, checkMaxIterations } from './helpers';
import type { RequestResponseMetadata } from './types';
/* -----------------------------------------------------------
Main Executor Function
----------------------------------------------------------- */
/**
* The main executor method for the Tools Agent.
* The main executor method for the Tools Agent V3.
*
* This function retrieves necessary components (model, memory, tools), prepares the prompt,
* creates the agent, and processes each input item. The error handling for each item is also
* managed here based on the node's continueOnFail setting.
* This function orchestrates the execution across input batches, handling:
* - Building shared execution context (models, memory, batching config)
* - Processing items in batches with continue-on-fail logic
* - Returning either tool call requests or node output data
*
* @param this Execute context. SupplyDataContext is passed when agent is as a tool
*
* @returns The array of execution data for all processed items
* @param this Execute context. SupplyDataContext is passed when agent is used as a tool
* @param response Optional engine response containing tool call results from previous execution
* @returns Array of execution data for all processed items, or engine request for tool calls
*/
export async function toolsAgentExecute(
this: IExecuteFunctions | ISupplyDataFunctions,
@ -366,266 +31,50 @@ export async function toolsAgentExecute(
): Promise<INodeExecutionData[][] | EngineRequest<RequestResponseMetadata>> {
this.logger.debug('Executing Tools Agent V3');
// Check max iterations if this is a continuation of a previous execution
const maxIterations = this.getNodeParameter('options.maxIterations', 0, 10) as number;
checkMaxIterations(response, maxIterations, this.getNode());
const returnData: INodeExecutionData[] = [];
let request: EngineRequest<RequestResponseMetadata> | undefined = undefined;
const items = this.getInputData();
const batchSize = this.getNodeParameter('options.batching.batchSize', 0, 1) as number;
const delayBetweenBatches = this.getNodeParameter(
'options.batching.delayBetweenBatches',
0,
0,
) as number;
const needsFallback = this.getNodeParameter('needsFallback', 0, false) as boolean;
const memory = await getOptionalMemory(this);
const model = await getChatModel(this, 0);
assert(model, 'Please connect a model to the Chat Model input');
const fallbackModel = needsFallback ? await getChatModel(this, 1) : null;
if (needsFallback && !fallbackModel) {
throw new NodeOperationError(
this.getNode(),
'Please connect a model to the Fallback Model input or disable the fallback option',
);
}
// Build execution context with shared configuration
const executionContext = await buildExecutionContext(this);
const { items, batchSize, delayBetweenBatches, model, fallbackModel, memory } = executionContext;
// Process items in batches
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
if (response && response?.metadata?.itemIndex === itemIndex) {
return null;
}
const { returnData: batchReturnData, request: batchRequest } = await executeBatch(
this,
batch,
i,
model,
fallbackModel,
memory,
response,
);
const steps = buildSteps(response, itemIndex);
// Collect results from batch
returnData.push.apply(returnData, batchReturnData);
const input = getPromptInputByType({
ctx: this,
i: itemIndex,
inputKey: 'text',
promptTypeKey: 'promptType',
});
if (input === undefined) {
throw new NodeOperationError(this.getNode(), 'The "text" parameter is empty.');
}
const outputParser = await getOptionalOutputParser(this, itemIndex);
const tools = await getTools(this, outputParser);
const options = this.getNodeParameter('options', itemIndex) as {
systemMessage?: string;
maxIterations?: number;
returnIntermediateSteps?: boolean;
passthroughBinaryImages?: boolean;
enableStreaming?: boolean;
maxTokensFromMemory?: number;
};
if (options.enableStreaming === undefined) {
options.enableStreaming = true;
}
// Prepare the prompt messages and prompt template.
const messages = await prepareMessages(this, itemIndex, {
systemMessage: options.systemMessage,
passthroughBinaryImages: options.passthroughBinaryImages ?? true,
outputParser,
});
const prompt: ChatPromptTemplate = preparePrompt(messages);
// Create executors for primary and fallback models
const executor = createAgentSequence(
model,
tools,
prompt,
options,
outputParser,
memory,
fallbackModel,
);
// Invoke with fallback logic
const invokeParams = {
steps,
input,
system_message: options.systemMessage ?? SYSTEM_MESSAGE,
formatting_instructions:
'IMPORTANT: For your response to user, you MUST use the `format_final_json_response` tool with your complete answer formatted according to the required schema. Do not attempt to format the JSON manually - always use this tool. Your response will be rejected if it is not properly formatted through this tool. Only use this tool once you are ready to provide your final answer.',
};
const executeOptions = { signal: this.getExecutionCancelSignal() };
// Check if streaming is actually available
const isStreamingAvailable = 'isStreaming' in this ? this.isStreaming?.() : undefined;
if (
'isStreaming' in this &&
options.enableStreaming &&
isStreamingAvailable &&
this.getNode().typeVersion >= 2.1
) {
let chatHistory: BaseMessage[] | undefined = undefined;
if (memory) {
// Load memory variables to respect context window length
chatHistory = await loadChatHistory(memory, model, options.maxTokensFromMemory);
}
const eventStream = executor.streamEvents(
{
...invokeParams,
chat_history: chatHistory,
},
{
version: 'v2',
...executeOptions,
},
);
const result = await processEventStream(
this,
eventStream,
itemIndex,
options.returnIntermediateSteps,
memory,
input,
);
// If result contains tool calls, build the request object like the normal flow
if (result.toolCalls && result.toolCalls.length > 0) {
const currentIteration = (response?.metadata?.iterationCount ?? 0) + 1;
// Check if we've exceeded maxIterations
if (options.maxIterations && currentIteration > options.maxIterations) {
throw new NodeOperationError(this.getNode(), 'Maximum iterations reached');
}
const actions = await createEngineRequests(result.toolCalls, itemIndex, tools);
return {
actions,
metadata: {
previousRequests: buildSteps(response, itemIndex),
iterationCount: currentIteration,
},
};
}
return result;
// Collect requests from batch
if (batchRequest) {
if (!request) {
request = batchRequest;
} else {
// Handle regular execution
let chatHistory: BaseMessage[] | undefined = undefined;
if (memory) {
// Load memory variables to respect context window length
chatHistory = await loadChatHistory(memory, model, options.maxTokensFromMemory);
}
const modelResponse = await executor.invoke({
...invokeParams,
chat_history: chatHistory,
});
if ('returnValues' in modelResponse) {
// Save conversation to memory including any tool call context
if (memory && input && modelResponse.returnValues.output) {
// If there were tool calls in this conversation, include them in the context
let fullOutput = modelResponse.returnValues.output as string;
if (steps.length > 0) {
// Include tool call information in the conversation context
const toolContext = steps
.map(
(step) =>
`Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
)
.join('; ');
fullOutput = `[Used tools: ${toolContext}] ${fullOutput}`;
}
await memory.saveContext({ input }, { output: fullOutput });
}
// Include intermediate steps if requested
const result = { ...modelResponse.returnValues };
if (options.returnIntermediateSteps && steps.length > 0) {
result.intermediateSteps = steps;
}
return result;
}
const currentIteration = (response?.metadata?.iterationCount ?? 0) + 1;
// Check if we've exceeded maxIterations
if (options.maxIterations && currentIteration > options.maxIterations) {
throw new NodeOperationError(this.getNode(), 'Maximum iterations reached');
}
const actions = await createEngineRequests(modelResponse, itemIndex, tools);
return {
actions,
metadata: {
previousRequests: buildSteps(response, itemIndex),
iterationCount: currentIteration,
},
};
request.actions.push.apply(request.actions, batchRequest.actions);
}
});
const batchResults = await Promise.allSettled(batchPromises);
// This is only used to check if the output parser is connected
// so we can parse the output if needed. Actual output parsing is done in the loop above
const outputParser = await getOptionalOutputParser(this, 0);
batchResults.forEach((result, index) => {
const itemIndex = i + index;
if (result.status === 'rejected') {
const error = result.reason as Error;
if (this.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
} as INodeExecutionData);
return;
} else {
throw new NodeOperationError(this.getNode(), error);
}
}
const response = result.value;
if ('actions' in response) {
if (!request) {
request = {
actions: response.actions,
metadata: response.metadata,
};
} else {
request.actions.push(...response.actions);
}
return;
}
// If memory and outputParser are connected, parse the output.
if (memory && outputParser) {
const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(
response.output as string,
);
response.output = parsedOutput?.output ?? parsedOutput;
}
// Omit internal keys before returning the result.
const itemResult: INodeExecutionData = {
json: omit(
response,
'system_message',
'formatting_instructions',
'input',
'chat_history',
'agent_scratchpad',
),
pairedItem: { item: itemIndex },
};
returnData.push(itemResult);
});
}
// Apply delay between batches if configured
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
}
}
// Check if we have any Request objects (tool calls)
// Return tool call request if any tools need to be executed
if (request) {
return request;
}
@ -633,24 +82,6 @@ export async function toolsAgentExecute(
// Otherwise return execution data
return [returnData];
}
async function loadChatHistory(
memory: BaseChatMemory,
model: BaseChatModel,
maxTokensFromMemory?: number,
): Promise<BaseMessage[]> {
const memoryVariables = await memory.loadMemoryVariables({});
let chatHistory = memoryVariables['chat_history'] as BaseMessage[];
if (maxTokensFromMemory) {
chatHistory = await trimMessages(chatHistory, {
strategy: 'last',
maxTokens: maxTokensFromMemory,
tokenCounter: model,
includeSystem: true,
startOn: 'human',
allowPartial: true,
});
}
return chatHistory;
}
// Re-export types for backwards compatibility
export type { RequestResponseMetadata } from './types';

View File

@ -0,0 +1,66 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseChatMemory } from 'langchain/memory';
import { NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, ISupplyDataFunctions, INodeExecutionData } from 'n8n-workflow';
import assert from 'node:assert';
import { getChatModel, getOptionalMemory } from '../../common';
/**
* Execution context that contains shared configuration needed across all items
*/
export type ToolsAgentExecutionContext = {
items: INodeExecutionData[];
batchSize: number;
delayBetweenBatches: number;
needsFallback: boolean;
model: BaseChatModel;
fallbackModel: BaseChatModel | null;
memory: BaseChatMemory | undefined;
};
/**
* Builds the execution context by collecting shared configuration
* such as models, memory, batching settings, and streaming flags.
*
* @param ctx - The execution context (IExecuteFunctions or ISupplyDataFunctions)
* @returns ExecutionContext containing all shared configuration
*/
export async function buildToolsAgentExecutionContext(
ctx: IExecuteFunctions | ISupplyDataFunctions,
): Promise<ToolsAgentExecutionContext> {
const items = ctx.getInputData();
const batchSize = ctx.getNodeParameter('options.batching.batchSize', 0, 1) as number;
const delayBetweenBatches = ctx.getNodeParameter(
'options.batching.delayBetweenBatches',
0,
0,
) as number;
const needsFallback = ctx.getNodeParameter('needsFallback', 0, false) as boolean;
const memory = await getOptionalMemory(ctx);
const model = await getChatModel(ctx, 0);
assert(model, 'Please connect a model to the Chat Model input');
let fallbackModel: BaseChatModel | null = null;
if (needsFallback) {
const maybeFallbackModel = await getChatModel(ctx, 1);
if (!maybeFallbackModel) {
throw new NodeOperationError(
ctx.getNode(),
'Please connect a model to the Fallback Model input or disable the fallback option',
);
}
fallbackModel = maybeFallbackModel;
}
return {
items,
batchSize,
delayBetweenBatches,
needsFallback,
model,
fallbackModel,
memory,
};
}

View File

@ -0,0 +1,29 @@
import type { EngineResponse } from 'n8n-workflow';
import { buildSteps } from '@utils/agent-execution';
import type { RequestResponseMetadata } from '../types';
/**
* Builds metadata for an engine request, tracking iteration count and previous requests.
*
* This helper centralizes the logic for incrementing iteration count and building
* the request history, which is used to enforce max iterations and maintain context.
*
* @param response - The optional engine response from previous tool execution
* @param itemIndex - The current item index being processed
* @returns Metadata object with previousRequests and iterationCount
*
*/
export function buildResponseMetadata(
response: EngineResponse<RequestResponseMetadata> | undefined,
itemIndex: number,
): RequestResponseMetadata {
const currentIterationCount = response?.metadata?.iterationCount ?? 0;
return {
previousRequests: buildSteps(response, itemIndex),
itemIndex,
iterationCount: currentIterationCount + 1,
};
}

View File

@ -0,0 +1,44 @@
import { NodeOperationError } from 'n8n-workflow';
import type { INode, EngineResponse } from 'n8n-workflow';
import type { RequestResponseMetadata } from '../types';
/**
* Checks if the maximum iteration limit has been reached and throws an error if so.
*
* This function is called at the start of each agent execution to enforce
* the maximum number of tool call iterations allowed.
*
* @param response - The engine response containing iteration metadata (if this is a continuation)
* @param maxIterations - The maximum number of iterations allowed
* @param node - The current node (for error context)
* @throws {NodeOperationError} When the iteration count reaches or exceeds maxIterations
*
* @example
* ```typescript
* const response: EngineResponse<RequestResponseMetadata> = {
* // ... response data
* metadata: { iterationCount: 3 }
* };
*
* // This will throw if iterationCount >= maxIterations
* checkMaxIterations(response, 2, node);
* ```
*/
export function checkMaxIterations(
response: EngineResponse<RequestResponseMetadata> | undefined,
maxIterations: number,
node: INode,
): void {
// Only check if this is a continuation (response has iteration count)
if (response?.metadata?.iterationCount === undefined) {
return;
}
if (response.metadata.iterationCount >= maxIterations) {
throw new NodeOperationError(
node,
`Max iterations (${maxIterations}) reached. The agent could not complete the task within the allowed number of iterations.`,
);
}
}

View File

@ -0,0 +1,70 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import { type AgentRunnableSequence, createToolCallingAgent } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import type { DynamicStructuredTool, Tool } from 'langchain/tools';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import { fixEmptyContentMessage, getAgentStepsParser } from '../../common';
/**
* Creates an agent sequence with the given configuration.
* The sequence includes the agent, output parser, and fallback logic.
*
* @param model - The primary chat model
* @param tools - Array of tools available to the agent
* @param prompt - The prompt template
* @param _options - Additional options (maxIterations, returnIntermediateSteps)
* @param outputParser - Optional output parser for structured responses
* @param memory - Optional memory for conversation context
* @param fallbackModel - Optional fallback model if primary fails
* @returns AgentRunnableSequence ready for execution
*/
export function createAgentSequence(
model: BaseChatModel,
tools: Array<DynamicStructuredTool | Tool>,
prompt: ChatPromptTemplate,
_options: { maxIterations?: number; returnIntermediateSteps?: boolean },
outputParser?: N8nOutputParser,
memory?: BaseChatMemory,
fallbackModel?: BaseChatModel | null,
) {
const agent = createToolCallingAgent({
llm: model,
tools: getAllTools(model, tools),
prompt,
streamRunnable: false,
});
let fallbackAgent: AgentRunnableSequence | undefined;
if (fallbackModel) {
fallbackAgent = createToolCallingAgent({
llm: fallbackModel,
tools: getAllTools(fallbackModel, tools),
prompt,
streamRunnable: false,
});
}
const runnableAgent = RunnableSequence.from([
fallbackAgent ? agent.withFallbacks([fallbackAgent]) : agent,
getAgentStepsParser(outputParser, memory),
fixEmptyContentMessage,
]) as AgentRunnableSequence;
runnableAgent.singleAction = true;
runnableAgent.streamRunnable = false;
return runnableAgent;
}
/**
* Uses provided tools and tried to get tools from model metadata
* Some chat model nodes can define built-in tools in their metadata
*/
function getAllTools(model: BaseChatModel, tools: Array<DynamicStructuredTool | Tool>) {
const modelTools = (model.metadata?.tools as Tool[]) ?? [];
const allTools = [...tools, ...modelTools];
return allTools;
}

View File

@ -0,0 +1,115 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AgentRunnableSequence } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import { NodeOperationError } from 'n8n-workflow';
import type {
IExecuteFunctions,
ISupplyDataFunctions,
INodeExecutionData,
EngineResponse,
EngineRequest,
} from 'n8n-workflow';
import { getOptionalOutputParser } from '@utils/output_parsers/N8nOutputParser';
import type { RequestResponseMetadata, AgentResult } from '../types';
import { createAgentSequence } from './createAgentSequence';
import { finalizeResult } from './finalizeResult';
import { prepareItemContext } from './prepareItemContext';
import { runAgent } from './runAgent';
type BatchResult = AgentResult | EngineRequest<RequestResponseMetadata>;
/**
* Executes a batch of items, handling both successful execution and errors.
* Applies continue-on-fail logic when errors occur.
*
* @param ctx - The execution context
* @param batch - Array of items to process in this batch
* @param startIndex - Starting index of the batch in the original items array (used to calculate itemIndex)
* @param model - Primary chat model
* @param fallbackModel - Optional fallback model
* @param memory - Optional memory for conversation context
* @param response - Optional engine response with previous tool calls
* @returns Object containing execution data and optional requests
*/
export async function executeBatch(
ctx: IExecuteFunctions | ISupplyDataFunctions,
batch: INodeExecutionData[],
startIndex: number,
model: BaseChatModel,
fallbackModel: BaseChatModel | null,
memory: BaseChatMemory | undefined,
response?: EngineResponse<RequestResponseMetadata>,
): Promise<{
returnData: INodeExecutionData[];
request: EngineRequest<RequestResponseMetadata> | undefined;
}> {
const returnData: INodeExecutionData[] = [];
let request: EngineRequest<RequestResponseMetadata> | undefined = undefined;
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = startIndex + batchItemIndex;
const itemContext = await prepareItemContext(ctx, itemIndex, response);
const { tools, prompt, options, outputParser } = itemContext;
// Create executors for primary and fallback models
const executor: AgentRunnableSequence = createAgentSequence(
model,
tools,
prompt,
options,
outputParser,
memory,
fallbackModel,
);
// Run the agent
return await runAgent(ctx, executor, itemContext, model, memory, response);
});
const batchResults = await Promise.allSettled(batchPromises);
// This is only used to check if the output parser is connected
// so we can parse the output if needed. Actual output parsing is done in the loop above
const outputParser = await getOptionalOutputParser(ctx, 0);
batchResults.forEach((result, index) => {
const itemIndex = startIndex + index;
if (result.status === 'rejected') {
const error = result.reason as Error;
if (ctx.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
} as INodeExecutionData);
return;
} else {
throw new NodeOperationError(ctx.getNode(), error);
}
}
const batchResult = result.value as BatchResult;
if (!batchResult) {
return;
}
if ('actions' in batchResult) {
if (!request) {
request = {
actions: batchResult.actions,
metadata: batchResult.metadata,
};
} else {
request.actions.push.apply(request.actions, batchResult.actions);
}
return;
}
// Finalize the result
const itemResult = finalizeResult(batchResult, itemIndex, memory, outputParser);
returnData.push(itemResult);
});
return { returnData, request };
}

View File

@ -0,0 +1,47 @@
import type { BaseChatMemory } from 'langchain/memory';
import omit from 'lodash/omit';
import { jsonParse } from 'n8n-workflow';
import type { INodeExecutionData } from 'n8n-workflow';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import type { AgentResult } from '../types';
/**
* Finalizes the result by parsing output and preparing execution data.
* Handles output parser integration and memory-based parsing.
*
* @param result - The agent result to finalize
* @param itemIndex - The current item index
* @param memory - Optional memory for parsing context
* @param outputParser - Optional output parser for structured responses
* @returns INodeExecutionData ready for output
*/
export function finalizeResult(
result: AgentResult,
itemIndex: number,
memory: BaseChatMemory | undefined,
outputParser: N8nOutputParser | undefined,
): INodeExecutionData {
// If memory and outputParser are connected, parse the output.
if (memory && outputParser) {
const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(result.output);
// Type assertion needed because parsedOutput can be various types
result.output = (parsedOutput?.output ?? parsedOutput) as unknown as string;
}
// Omit internal keys before returning the result.
const itemResult: INodeExecutionData = {
json: omit(
result,
'system_message',
'formatting_instructions',
'input',
'chat_history',
'agent_scratchpad',
),
pairedItem: { item: itemIndex },
};
return itemResult;
}

View File

@ -0,0 +1,17 @@
export { buildToolsAgentExecutionContext as buildExecutionContext } from './buildExecutionContext';
export type { ToolsAgentExecutionContext as ExecutionContext } from './buildExecutionContext';
export { createAgentSequence } from './createAgentSequence';
export { prepareItemContext } from './prepareItemContext';
export type { ItemContext } from './prepareItemContext';
export { runAgent } from './runAgent';
export { finalizeResult } from './finalizeResult';
export { executeBatch } from './executeBatch';
export { checkMaxIterations } from './checkMaxIterations';
export { buildResponseMetadata } from './buildResponseMetadata';

View File

@ -0,0 +1,78 @@
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import type { DynamicStructuredTool, Tool } from 'langchain/tools';
import { NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, ISupplyDataFunctions, EngineResponse } from 'n8n-workflow';
import { buildSteps, type ToolCallData } from '@utils/agent-execution';
import { getPromptInputByType } from '@utils/helpers';
import { getOptionalOutputParser } from '@utils/output_parsers/N8nOutputParser';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import { getTools, prepareMessages, preparePrompt } from '../../common';
import type { AgentOptions, RequestResponseMetadata } from '../types';
/**
* Context specific to a single item's processing
*/
export type ItemContext = {
itemIndex: number;
input: string;
steps: ToolCallData[];
tools: Array<DynamicStructuredTool | Tool>;
prompt: ChatPromptTemplate;
options: AgentOptions;
outputParser: N8nOutputParser | undefined;
};
/**
* Prepares the context for processing a single item.
* This includes loading steps, input, tools, prompt, and options.
*
* @param ctx - The execution context
* @param itemIndex - The index of the item to process
* @param response - Optional engine response with previous tool calls
* @returns ItemContext containing all item-specific state
*/
export async function prepareItemContext(
ctx: IExecuteFunctions | ISupplyDataFunctions,
itemIndex: number,
response?: EngineResponse<RequestResponseMetadata>,
): Promise<ItemContext> {
const steps = buildSteps(response, itemIndex);
const input = getPromptInputByType({
ctx,
i: itemIndex,
inputKey: 'text',
promptTypeKey: 'promptType',
});
if (input === undefined) {
throw new NodeOperationError(ctx.getNode(), 'The "text" parameter is empty.');
}
const outputParser = await getOptionalOutputParser(ctx, itemIndex);
const tools = await getTools(ctx, outputParser);
const options = ctx.getNodeParameter('options', itemIndex) as AgentOptions;
if (options.enableStreaming === undefined) {
options.enableStreaming = true;
}
// Prepare the prompt messages and prompt template.
const messages = await prepareMessages(ctx, itemIndex, {
systemMessage: options.systemMessage,
passthroughBinaryImages: options.passthroughBinaryImages ?? true,
outputParser,
});
const prompt: ChatPromptTemplate = preparePrompt(messages);
return {
itemIndex,
input,
steps,
tools,
prompt,
options,
outputParser,
};
}

View File

@ -0,0 +1,140 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AgentRunnableSequence } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import type {
IExecuteFunctions,
ISupplyDataFunctions,
EngineResponse,
EngineRequest,
} from 'n8n-workflow';
import {
loadMemory,
processEventStream,
createEngineRequests,
saveToMemory,
} from '@utils/agent-execution';
import { SYSTEM_MESSAGE } from '../../prompt';
import type { AgentResult, RequestResponseMetadata } from '../types';
import { buildResponseMetadata } from './buildResponseMetadata';
import type { ItemContext } from './prepareItemContext';
type RunAgentResult = AgentResult | EngineRequest<RequestResponseMetadata>;
/**
* Runs the agent for a single item, choosing between streaming or non-streaming execution.
* Handles both regular execution and execution after tool calls.
*
* @param ctx - The execution context
* @param executor - The agent runnable sequence
* @param itemContext - Context for the current item
* @param model - The chat model for token counting
* @param memory - Optional memory for conversation context
* @param response - Optional engine response with previous tool calls
* @returns AgentResult or engine request with tool calls
*/
export async function runAgent(
ctx: IExecuteFunctions | ISupplyDataFunctions,
executor: AgentRunnableSequence,
itemContext: ItemContext,
model: BaseChatModel,
memory: BaseChatMemory | undefined,
response?: EngineResponse<RequestResponseMetadata>,
): Promise<RunAgentResult> {
const { itemIndex, input, steps, tools, options } = itemContext;
const invokeParams = {
steps,
input,
system_message: options.systemMessage ?? SYSTEM_MESSAGE,
formatting_instructions:
'IMPORTANT: For your response to user, you MUST use the `format_final_json_response` tool with your complete answer formatted according to the required schema. Do not attempt to format the JSON manually - always use this tool. Your response will be rejected if it is not properly formatted through this tool. Only use this tool once you are ready to provide your final answer.',
};
const executeOptions = { signal: ctx.getExecutionCancelSignal() };
// Check if streaming is actually available
const isStreamingAvailable = 'isStreaming' in ctx ? ctx.isStreaming?.() : undefined;
if (
'isStreaming' in ctx &&
options.enableStreaming &&
isStreamingAvailable &&
ctx.getNode().typeVersion >= 2.1
) {
const chatHistory = await loadMemory(memory, model, options.maxTokensFromMemory);
const eventStream = executor.streamEvents(
{
...invokeParams,
chat_history: chatHistory,
},
{
version: 'v2',
...executeOptions,
},
);
const result = await processEventStream(
ctx,
eventStream,
itemIndex,
options.returnIntermediateSteps,
memory,
input,
);
// If result contains tool calls, build the request object like the normal flow
if (result.toolCalls && result.toolCalls.length > 0) {
const actions = await createEngineRequests(result.toolCalls, itemIndex, tools);
return {
actions,
metadata: buildResponseMetadata(response, itemIndex),
};
}
return result;
} else {
// Handle regular execution
const chatHistory = await loadMemory(memory, model, options.maxTokensFromMemory);
const modelResponse = await executor.invoke({
...invokeParams,
chat_history: chatHistory,
});
if ('returnValues' in modelResponse) {
// Save conversation to memory including any tool call context
if (memory && input && modelResponse.returnValues.output) {
// If there were tool calls in this conversation, include them in the context
let fullOutput = modelResponse.returnValues.output as string;
if (steps.length > 0) {
// Include tool call information in the conversation context
const toolContext = steps
.map(
(step) =>
`Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
)
.join('; ');
fullOutput = `[Used tools: ${toolContext}] ${fullOutput}`;
}
await saveToMemory(input, fullOutput, memory);
}
// Include intermediate steps if requested
const result = { ...modelResponse.returnValues };
if (options.returnIntermediateSteps && steps.length > 0) {
result.intermediateSteps = steps;
}
return result;
}
// If response contains tool calls, we need to return this in the right format
const actions = await createEngineRequests(modelResponse, itemIndex, tools);
return {
actions,
metadata: buildResponseMetadata(response, itemIndex),
};
}
}

View File

@ -0,0 +1,158 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { mock } from 'jest-mock-extended';
import { NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INode, INodeExecutionData } from 'n8n-workflow';
import * as commonHelpers from '../../../common';
import { buildToolsAgentExecutionContext } from '../buildExecutionContext';
jest.mock('../../../common', () => ({
getChatModel: jest.fn(),
getOptionalMemory: jest.fn(),
}));
const mockContext = mock<IExecuteFunctions>();
const mockNode = mock<INode>();
beforeEach(() => {
jest.clearAllMocks();
mockContext.getNode.mockReturnValue(mockNode);
});
describe('buildExecutionContext', () => {
it('should build execution context with default values', async () => {
const mockInputData: INodeExecutionData[] = [
{ json: { text: 'input 1' } },
{ json: { text: 'input 2' } },
];
const mockModel = mock<BaseChatModel>();
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'needsFallback') return defaultValue;
return defaultValue;
});
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
const result = await buildToolsAgentExecutionContext(mockContext);
expect(result).toEqual({
items: mockInputData,
batchSize: 1,
delayBetweenBatches: 0,
needsFallback: false,
model: mockModel,
fallbackModel: null,
memory: undefined,
});
});
it('should build execution context with custom batch settings', async () => {
const mockInputData: INodeExecutionData[] = [
{ json: { text: 'input 1' } },
{ json: { text: 'input 2' } },
];
const mockModel = mock<BaseChatModel>();
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'options.batching.batchSize') return 5;
if (param === 'options.batching.delayBetweenBatches') return 1000;
if (param === 'needsFallback') return false;
return defaultValue;
});
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
const result = await buildToolsAgentExecutionContext(mockContext);
expect(result.batchSize).toBe(5);
expect(result.delayBetweenBatches).toBe(1000);
});
it('should build execution context with fallback model when needsFallback is true', async () => {
const mockInputData: INodeExecutionData[] = [{ json: { text: 'input 1' } }];
const mockModel = mock<BaseChatModel>();
const mockFallbackModel = mock<BaseChatModel>();
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'needsFallback') return true;
return defaultValue;
});
jest
.spyOn(commonHelpers, 'getChatModel')
.mockResolvedValueOnce(mockModel)
.mockResolvedValueOnce(mockFallbackModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
const result = await buildToolsAgentExecutionContext(mockContext);
expect(result.needsFallback).toBe(true);
expect(result.model).toBe(mockModel);
expect(result.fallbackModel).toBe(mockFallbackModel);
expect(commonHelpers.getChatModel).toHaveBeenCalledWith(mockContext, 0);
expect(commonHelpers.getChatModel).toHaveBeenCalledWith(mockContext, 1);
});
it('should throw error when fallback is needed but no fallback model is provided', async () => {
const mockInputData: INodeExecutionData[] = [{ json: { text: 'input 1' } }];
const mockModel = mock<BaseChatModel>();
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return true;
return defaultValue;
});
jest
.spyOn(commonHelpers, 'getChatModel')
.mockResolvedValueOnce(mockModel)
.mockResolvedValueOnce(undefined);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
await expect(buildToolsAgentExecutionContext(mockContext)).rejects.toThrow(NodeOperationError);
});
it('should throw assertion error when no model is provided', async () => {
const mockInputData: INodeExecutionData[] = [{ json: { text: 'input 1' } }];
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((_param, _i, defaultValue) => {
return defaultValue;
});
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
await expect(buildToolsAgentExecutionContext(mockContext)).rejects.toThrow(
'Please connect a model to the Chat Model input',
);
});
it('should include memory when available', async () => {
const mockInputData: INodeExecutionData[] = [{ json: { text: 'input 1' } }];
const mockModel = mock<BaseChatModel>();
const mockMemory = mock<any>();
mockContext.getInputData.mockReturnValue(mockInputData);
mockContext.getNodeParameter.mockImplementation((_param, _i, defaultValue) => {
return defaultValue;
});
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(mockMemory);
const result = await buildToolsAgentExecutionContext(mockContext);
expect(result.memory).toBe(mockMemory);
});
});

View File

@ -0,0 +1,178 @@
import type { EngineResponse } from 'n8n-workflow';
import * as agentExecution from '@utils/agent-execution';
import type { RequestResponseMetadata } from '../../types';
import { buildResponseMetadata } from '../buildResponseMetadata';
// Mock the buildSteps function from agent-execution
jest.mock('@utils/agent-execution', () => ({
buildSteps: jest.fn((response) => {
// Mock implementation: return previous requests if they exist
if (response?.actionResponses) {
return response.actionResponses.map((ar: any) => ({
action: {
tool: ar.action.nodeName,
toolInput: ar.action.input,
log: 'mock log',
toolCallId: ar.action.id,
type: 'tool_call',
},
observation: JSON.stringify(ar.data),
}));
}
return [];
}),
}));
describe('buildIterationMetadata', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('should return metadata with iterationCount 1 when response is undefined', () => {
const result = buildResponseMetadata(undefined, 0);
expect(result).toEqual({
previousRequests: [],
itemIndex: 0,
iterationCount: 1,
});
});
it('should return metadata with iterationCount 1 when response has no metadata', () => {
const response = {
actionResponses: [],
} as unknown as EngineResponse<RequestResponseMetadata>;
const result = buildResponseMetadata(response, 0);
expect(result).toEqual({
previousRequests: [],
itemIndex: 0,
iterationCount: 1,
});
});
it('should return metadata with iterationCount 1 when response metadata has no iterationCount', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {},
};
const result = buildResponseMetadata(response, 0);
expect(result).toEqual({
previousRequests: [],
itemIndex: 0,
iterationCount: 1,
});
});
it('should increment iterationCount when response has existing iterationCount', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 3,
},
};
const result = buildResponseMetadata(response, 0);
expect(result).toEqual({
previousRequests: [],
itemIndex: 0,
iterationCount: 4,
});
});
it('should include previousRequests when response has actionResponses', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
id: 'call_123',
nodeName: 'TestTool',
input: { input: 'test data', id: 'call_123' },
metadata: { itemIndex: 0 },
actionType: 'ExecutionNodeAction',
type: 'ai_tool',
},
data: {
data: { ai_tool: [[{ json: { result: 'tool result' } }]] },
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {
iterationCount: 1,
},
};
const result = buildResponseMetadata(response, 0);
expect(result.itemIndex).toBe(0);
expect(result.iterationCount).toBe(2);
expect(result.previousRequests).toHaveLength(1);
expect(result.previousRequests?.[0]).toMatchObject({
action: {
tool: 'TestTool',
toolCallId: 'call_123',
type: 'tool_call',
},
});
});
it('should handle multiple iterations correctly', () => {
// First iteration
const result1 = buildResponseMetadata(undefined, 0);
expect(result1.iterationCount).toBe(1);
// Second iteration
const response2: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: { iterationCount: 1 },
};
const result2 = buildResponseMetadata(response2, 0);
expect(result2.iterationCount).toBe(2);
// Third iteration
const response3: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: { iterationCount: 2 },
};
const result3 = buildResponseMetadata(response3, 0);
expect(result3.iterationCount).toBe(3);
});
it('should pass correct itemIndex to buildSteps', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: { iterationCount: 1 },
};
buildResponseMetadata(response, 5);
expect(agentExecution.buildSteps).toHaveBeenCalledWith(response, 5);
});
it('should handle iterationCount starting from 0', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 0,
},
};
const result = buildResponseMetadata(response, 0);
expect(result).toEqual({
previousRequests: [],
itemIndex: 0,
iterationCount: 1,
});
});
});

View File

@ -0,0 +1,133 @@
import { mock } from 'jest-mock-extended';
import { NodeOperationError } from 'n8n-workflow';
import type { INode, EngineResponse } from 'n8n-workflow';
import type { RequestResponseMetadata } from '../../types';
import { checkMaxIterations } from '../checkMaxIterations';
describe('checkMaxIterations', () => {
const mockNode = mock<INode>();
beforeEach(() => {
jest.clearAllMocks();
});
it('should not throw when response is undefined', () => {
expect(() => {
checkMaxIterations(undefined, 10, mockNode);
}).not.toThrow();
});
it('should not throw when response metadata is undefined', () => {
const response = {
actionResponses: [],
} as unknown as EngineResponse<RequestResponseMetadata>;
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).not.toThrow();
});
it('should not throw when response metadata iterationCount is undefined', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {},
};
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).not.toThrow();
});
it('should not throw when iterationCount is below maxIterations', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 5,
},
};
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).not.toThrow();
});
it('should throw NodeOperationError when iterationCount equals maxIterations', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 10,
},
};
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).toThrow(NodeOperationError);
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).toThrow(
'Max iterations (10) reached. The agent could not complete the task within the allowed number of iterations.',
);
});
it('should throw NodeOperationError when iterationCount exceeds maxIterations', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 15,
},
};
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).toThrow(NodeOperationError);
expect(() => {
checkMaxIterations(response, 10, mockNode);
}).toThrow(
'Max iterations (10) reached. The agent could not complete the task within the allowed number of iterations.',
);
});
it('should throw with correct error message for different maxIterations values', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 5,
},
};
expect(() => {
checkMaxIterations(response, 5, mockNode);
}).toThrow(
'Max iterations (5) reached. The agent could not complete the task within the allowed number of iterations.',
);
});
it('should handle edge case of maxIterations = 0', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 0,
},
};
expect(() => {
checkMaxIterations(response, 0, mockNode);
}).toThrow(NodeOperationError);
});
it('should handle edge case of maxIterations = 1 with iterationCount = 0', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: {
iterationCount: 0,
},
};
expect(() => {
checkMaxIterations(response, 1, mockNode);
}).not.toThrow();
});
});

View File

@ -0,0 +1,205 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import { mock } from 'jest-mock-extended';
import { createToolCallingAgent } from 'langchain/agents';
import type { Tool } from 'langchain/tools';
import * as commonHelpers from '../../../common';
import { createAgentSequence } from '../createAgentSequence';
jest.mock('langchain/agents', () => ({
createToolCallingAgent: jest.fn(),
}));
jest.mock('@langchain/core/runnables', () => ({
RunnableSequence: {
from: jest.fn(),
},
}));
jest.mock('../../../common', () => ({
getAgentStepsParser: jest.fn(),
fixEmptyContentMessage: jest.fn(),
}));
describe('createAgentSequence', () => {
const mockModel = mock<BaseChatModel>();
const mockPrompt = mock<ChatPromptTemplate>();
const mockTool = mock<Tool>();
beforeEach(() => {
jest.clearAllMocks();
});
it('should create agent sequence without fallback', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
const result = createAgentSequence(mockModel, [mockTool], mockPrompt, options);
expect(createToolCallingAgent).toHaveBeenCalledWith({
llm: mockModel,
tools: [mockTool],
prompt: mockPrompt,
streamRunnable: false,
});
expect(RunnableSequence.from).toHaveBeenCalledWith([
mockAgent,
mockStepsParser,
commonHelpers.fixEmptyContentMessage,
]);
expect(result.singleAction).toBe(true);
expect(result.streamRunnable).toBe(false);
});
it('should create agent sequence with fallback model', () => {
const mockFallbackModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockFallbackAgent = mock<any>();
const mockAgentWithFallback = mock<any>();
const mockRunnableSequence = mock<any>();
const mockStepsParser = jest.fn();
mockAgent.withFallbacks = jest.fn().mockReturnValue(mockAgentWithFallback);
(createToolCallingAgent as jest.Mock)
.mockReturnValueOnce(mockAgent)
.mockReturnValueOnce(mockFallbackAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(
mockModel,
[mockTool],
mockPrompt,
options,
undefined,
undefined,
mockFallbackModel,
);
expect(createToolCallingAgent).toHaveBeenCalledTimes(2);
expect(createToolCallingAgent).toHaveBeenNthCalledWith(1, {
llm: mockModel,
tools: [mockTool],
prompt: mockPrompt,
streamRunnable: false,
});
expect(createToolCallingAgent).toHaveBeenNthCalledWith(2, {
llm: mockFallbackModel,
tools: [mockTool],
prompt: mockPrompt,
streamRunnable: false,
});
expect(mockAgent.withFallbacks).toHaveBeenCalledWith([mockFallbackAgent]);
expect(RunnableSequence.from).toHaveBeenCalledWith([
mockAgentWithFallback,
mockStepsParser,
commonHelpers.fixEmptyContentMessage,
]);
});
it('should pass output parser to getAgentStepsParser', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockOutputParser = mock<any>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(mockModel, [mockTool], mockPrompt, options, mockOutputParser);
expect(commonHelpers.getAgentStepsParser).toHaveBeenCalledWith(mockOutputParser, undefined);
});
it('should pass memory to getAgentStepsParser', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockMemory = mock<any>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(mockModel, [mockTool], mockPrompt, options, undefined, mockMemory);
expect(commonHelpers.getAgentStepsParser).toHaveBeenCalledWith(undefined, mockMemory);
});
it('should set streamRunnable to false for agents', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(mockModel, [mockTool], mockPrompt, options);
expect(createToolCallingAgent).toHaveBeenCalledWith(
expect.objectContaining({
streamRunnable: false,
}),
);
});
it('should handle null fallback model', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(mockModel, [mockTool], mockPrompt, options, undefined, undefined, null);
// Should only create one agent (no fallback)
expect(createToolCallingAgent).toHaveBeenCalledTimes(1);
expect(RunnableSequence.from).toHaveBeenCalledWith([
mockAgent,
mockStepsParser,
commonHelpers.fixEmptyContentMessage,
]);
});
it('should create sequence with multiple tools', () => {
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
const mockTool2 = mock<Tool>();
const mockStepsParser = jest.fn();
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getAgentStepsParser').mockReturnValue(mockStepsParser);
const options = { maxIterations: 10, returnIntermediateSteps: false };
createAgentSequence(mockModel, [mockTool, mockTool2], mockPrompt, options);
expect(createToolCallingAgent).toHaveBeenCalledWith(
expect.objectContaining({
tools: [mockTool, mockTool2],
}),
);
});
});

View File

@ -0,0 +1,166 @@
import { mock } from 'jest-mock-extended';
import type { BaseChatMemory } from 'langchain/memory';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import { finalizeResult } from '../finalizeResult';
describe('finalizeResult', () => {
it('should finalize result without memory or output parser', () => {
const result = {
output: 'Test output',
system_message: 'You are a helpful assistant',
formatting_instructions: 'Format as JSON',
input: 'Test input',
chat_history: [],
agent_scratchpad: 'scratch',
};
const finalized = finalizeResult(result, 0, undefined, undefined);
expect(finalized).toEqual({
json: {
output: 'Test output',
},
pairedItem: { item: 0 },
});
});
it('should omit internal keys from result', () => {
const result = {
output: 'Test output',
customField: 'custom value',
system_message: 'You are a helpful assistant',
formatting_instructions: 'Format as JSON',
input: 'Test input',
chat_history: [],
agent_scratchpad: 'scratch',
};
const finalized = finalizeResult(result, 0, undefined, undefined);
expect(finalized.json).toEqual({
output: 'Test output',
customField: 'custom value',
});
expect(finalized.json).not.toHaveProperty('system_message');
expect(finalized.json).not.toHaveProperty('formatting_instructions');
expect(finalized.json).not.toHaveProperty('input');
expect(finalized.json).not.toHaveProperty('chat_history');
expect(finalized.json).not.toHaveProperty('agent_scratchpad');
});
it('should parse output when memory and outputParser are connected', () => {
const mockMemory = mock<BaseChatMemory>();
const mockOutputParser = mock<N8nOutputParser>();
const result = {
output: JSON.stringify({ output: { result: 'parsed result' } }),
};
const finalized = finalizeResult(result, 0, mockMemory, mockOutputParser);
expect(finalized.json.output).toEqual({ result: 'parsed result' });
});
it('should handle output without nested output field when parsing', () => {
const mockMemory = mock<BaseChatMemory>();
const mockOutputParser = mock<N8nOutputParser>();
const result = {
output: JSON.stringify({ result: 'direct result' }),
};
const finalized = finalizeResult(result, 0, mockMemory, mockOutputParser);
expect(finalized.json.output).toEqual({ result: 'direct result' });
});
it('should set correct pairedItem index', () => {
const result = {
output: 'Test output',
};
const finalized = finalizeResult(result, 5, undefined, undefined);
expect(finalized.pairedItem).toEqual({ item: 5 });
});
it('should preserve intermediate steps when present', () => {
const result = {
output: 'Test output',
intermediateSteps: [
{
action: { tool: 'test_tool', toolInput: {}, log: 'log', toolCallId: 'id', type: 'type' },
observation: 'observation',
},
],
};
const finalized = finalizeResult(result, 0, undefined, undefined);
expect(finalized.json.intermediateSteps).toBeDefined();
expect(finalized.json.intermediateSteps).toHaveLength(1);
});
it('should not parse output when only memory is connected', () => {
const mockMemory = mock<BaseChatMemory>();
const result = {
output: JSON.stringify({ output: { result: 'should not parse' } }),
};
const finalized = finalizeResult(result, 0, mockMemory, undefined);
// Should remain as string
expect(typeof finalized.json.output).toBe('string');
expect(finalized.json.output).toBe(JSON.stringify({ output: { result: 'should not parse' } }));
});
it('should not parse output when only outputParser is connected', () => {
const mockOutputParser = mock<N8nOutputParser>();
const result = {
output: JSON.stringify({ output: { result: 'should not parse' } }),
};
const finalized = finalizeResult(result, 0, undefined, mockOutputParser);
// Should remain as string
expect(typeof finalized.json.output).toBe('string');
expect(finalized.json.output).toBe(JSON.stringify({ output: { result: 'should not parse' } }));
});
it('should throw error when parsing invalid JSON', () => {
const mockMemory = mock<BaseChatMemory>();
const mockOutputParser = mock<N8nOutputParser>();
const result = {
output: 'not valid JSON',
};
// jsonParse throws an error on invalid JSON
expect(() => finalizeResult(result, 0, mockMemory, mockOutputParser)).toThrow();
});
it('should handle multiple custom fields in result', () => {
const result = {
output: 'Test output',
field1: 'value1',
field2: 123,
field3: true,
field4: { nested: 'object' },
system_message: 'should be omitted',
};
const finalized = finalizeResult(result, 0, undefined, undefined);
expect(finalized.json).toEqual({
output: 'Test output',
field1: 'value1',
field2: 123,
field3: true,
field4: { nested: 'object' },
});
});
});

View File

@ -0,0 +1,214 @@
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { mock } from 'jest-mock-extended';
import type { Tool } from 'langchain/tools';
import type { IExecuteFunctions, INode, EngineResponse } from 'n8n-workflow';
import * as helpers from '@utils/helpers';
import * as outputParsers from '@utils/output_parsers/N8nOutputParser';
import * as commonHelpers from '../../../common';
import type { RequestResponseMetadata } from '../../types';
import { prepareItemContext } from '../prepareItemContext';
jest.mock('@utils/helpers', () => ({
getPromptInputByType: jest.fn(),
}));
jest.mock('@utils/output_parsers/N8nOutputParser', () => ({
getOptionalOutputParser: jest.fn(),
}));
jest.mock('../../../common', () => ({
getTools: jest.fn(),
prepareMessages: jest.fn(),
preparePrompt: jest.fn(),
}));
const mockContext = mock<IExecuteFunctions>();
const mockNode = mock<INode>();
beforeEach(() => {
jest.clearAllMocks();
mockContext.getNode.mockReturnValue(mockNode);
});
describe('processItem', () => {
it('should return null when item was already processed', async () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: { itemIndex: 0, previousRequests: [] },
};
const result = await prepareItemContext(mockContext, 0, response);
expect(result).toBeNull();
});
it('should process item and return context', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
}
return undefined;
});
const result = await prepareItemContext(mockContext, 0);
expect(result).not.toBeNull();
expect(result?.itemIndex).toBe(0);
expect(result?.input).toBe('test input');
expect(result?.tools).toEqual([mockTool]);
expect(result?.prompt).toBe(mockPrompt);
expect(result?.steps).toEqual([]);
});
it('should enable streaming by default when not specified', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'You are a helpful assistant',
// enableStreaming not set
};
}
return undefined;
});
const result = await prepareItemContext(mockContext, 0);
expect(result?.options.enableStreaming).toBe(true);
});
it('should respect enableStreaming option when set', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'You are a helpful assistant',
enableStreaming: false,
};
}
return undefined;
});
const result = await prepareItemContext(mockContext, 0);
expect(result?.options.enableStreaming).toBe(false);
});
it('should include output parser when available', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
const mockOutputParser = mock<any>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(mockOutputParser);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'You are a helpful assistant',
};
}
return undefined;
});
const result = await prepareItemContext(mockContext, 0);
expect(result?.outputParser).toBe(mockOutputParser);
});
it('should pass outputParser to prepareMessages', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
const mockOutputParser = mock<any>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(mockOutputParser);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'Test system message',
passthroughBinaryImages: false,
};
}
return undefined;
});
await prepareItemContext(mockContext, 0);
expect(commonHelpers.prepareMessages).toHaveBeenCalledWith(mockContext, 0, {
systemMessage: 'Test system message',
passthroughBinaryImages: false,
outputParser: mockOutputParser,
});
});
it('should use passthroughBinaryImages default value when not specified', async () => {
const mockTool = mock<Tool>();
const mockPrompt = mock<ChatPromptTemplate>();
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
jest.spyOn(outputParsers, 'getOptionalOutputParser').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mockTool]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mockPrompt);
mockContext.getNodeParameter.mockImplementation((param) => {
if (param === 'options') {
return {
systemMessage: 'Test system message',
// passthroughBinaryImages not set
};
}
return undefined;
});
await prepareItemContext(mockContext, 0);
expect(commonHelpers.prepareMessages).toHaveBeenCalledWith(
mockContext,
0,
expect.objectContaining({
passthroughBinaryImages: true,
}),
);
});
});

View File

@ -0,0 +1,251 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { mock } from 'jest-mock-extended';
import type { AgentRunnableSequence } from 'langchain/agents';
import type { Tool } from 'langchain/tools';
import type { IExecuteFunctions, INode, EngineResponse } from 'n8n-workflow';
import * as agentExecution from '@utils/agent-execution';
import type { RequestResponseMetadata } from '../../types';
import type { ItemContext } from '../prepareItemContext';
import { runAgent } from '../runAgent';
jest.mock('@utils/agent-execution', () => ({
loadMemory: jest.fn(),
processEventStream: jest.fn(),
buildSteps: jest.fn(),
createEngineRequests: jest.fn(),
saveToMemory: jest.fn(),
}));
const mockContext = mock<IExecuteFunctions>();
const mockNode = mock<INode>();
beforeEach(() => {
jest.clearAllMocks();
mockContext.getNode.mockReturnValue(mockNode);
mockNode.typeVersion = 3;
});
describe('runAgent - iteration count tracking', () => {
it('should set iteration count to 1 on first call (no response)', async () => {
const mockExecutor = mock<AgentRunnableSequence>({
invoke: jest.fn().mockResolvedValue([
{
toolCalls: [
{
id: 'call_123',
name: 'TestTool',
args: { input: 'test' },
type: 'tool_call',
},
],
},
]),
});
const mockModel = mock<BaseChatModel>();
const mockTool = mock<Tool>();
mockTool.name = 'TestTool';
mockTool.metadata = { sourceNodeName: 'Test Tool' };
const itemContext: ItemContext = {
itemIndex: 0,
input: 'test input',
steps: [],
tools: [mockTool],
prompt: mock(),
options: {
maxIterations: 10,
returnIntermediateSteps: false,
},
outputParser: undefined,
};
jest.spyOn(agentExecution, 'loadMemory').mockResolvedValue([]);
jest.spyOn(agentExecution, 'buildSteps').mockReturnValue([]);
jest.spyOn(agentExecution, 'createEngineRequests').mockResolvedValue([
{
actionType: 'ExecutionNodeAction' as const,
nodeName: 'Test Tool',
input: { input: 'test' },
type: 'ai_tool' as any,
id: 'call_123',
metadata: { itemIndex: 0 },
},
]);
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await runAgent(mockContext, mockExecutor, itemContext, mockModel, undefined);
expect(result).toHaveProperty('actions');
expect(result).toHaveProperty('metadata');
expect((result as any).metadata.iterationCount).toBe(1);
});
it('should increment iteration count when response is provided', async () => {
const mockExecutor = mock<AgentRunnableSequence>({
invoke: jest.fn().mockResolvedValue([
{
toolCalls: [
{
id: 'call_456',
name: 'TestTool',
args: { input: 'test2' },
type: 'tool_call',
},
],
},
]),
});
const mockModel = mock<BaseChatModel>();
const mockTool = mock<Tool>();
mockTool.name = 'TestTool';
mockTool.metadata = { sourceNodeName: 'Test Tool' };
const itemContext: ItemContext = {
itemIndex: 0,
input: 'test input',
steps: [],
tools: [mockTool],
prompt: mock(),
options: {
maxIterations: 10,
returnIntermediateSteps: false,
},
outputParser: undefined,
};
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [],
metadata: { itemIndex: 0, previousRequests: [], iterationCount: 2 },
};
jest.spyOn(agentExecution, 'loadMemory').mockResolvedValue([]);
jest.spyOn(agentExecution, 'buildSteps').mockReturnValue([]);
jest.spyOn(agentExecution, 'createEngineRequests').mockResolvedValue([
{
actionType: 'ExecutionNodeAction' as const,
nodeName: 'Test Tool',
input: { input: 'test2' },
type: 'ai_tool' as any,
id: 'call_456',
metadata: { itemIndex: 0 },
},
]);
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await runAgent(
mockContext,
mockExecutor,
itemContext,
mockModel,
undefined,
response,
);
expect(result).toHaveProperty('actions');
expect(result).toHaveProperty('metadata');
expect((result as any).metadata.iterationCount).toBe(3);
});
it('should set iteration count to 1 in streaming mode on first call', async () => {
const mockEventStream = (async function* () {})();
const mockExecutor = mock<AgentRunnableSequence>({
streamEvents: jest.fn().mockReturnValue(mockEventStream),
});
const mockModel = mock<BaseChatModel>();
const mockTool = mock<Tool>();
mockTool.name = 'TestTool';
mockTool.metadata = { sourceNodeName: 'Test Tool' };
const itemContext: ItemContext = {
itemIndex: 0,
input: 'test input',
steps: [],
tools: [mockTool],
prompt: mock(),
options: {
maxIterations: 10,
returnIntermediateSteps: false,
enableStreaming: true,
},
outputParser: undefined,
};
const mockContext = mock<IExecuteFunctions>({
getNode: jest.fn().mockReturnValue(mockNode),
isStreaming: jest.fn().mockReturnValue(true),
getExecutionCancelSignal: jest.fn().mockReturnValue(new AbortController().signal),
});
mockNode.typeVersion = 2.1;
// Mock streaming to return tool calls
jest.spyOn(agentExecution, 'loadMemory').mockResolvedValue([]);
jest.spyOn(agentExecution, 'processEventStream').mockResolvedValue({
output: '',
toolCalls: [
{
tool: 'TestTool',
toolInput: { input: 'test' },
toolCallId: 'call_123',
type: 'tool_call',
},
],
});
jest.spyOn(agentExecution, 'buildSteps').mockReturnValue([]);
jest.spyOn(agentExecution, 'createEngineRequests').mockResolvedValue([
{
actionType: 'ExecutionNodeAction' as const,
nodeName: 'Test Tool',
input: { input: 'test' },
type: 'ai_tool' as any,
id: 'call_123',
metadata: { itemIndex: 0 },
},
]);
const result = await runAgent(mockContext, mockExecutor, itemContext, mockModel, undefined);
expect(result).toHaveProperty('actions');
expect(result).toHaveProperty('metadata');
expect((result as any).metadata.iterationCount).toBe(1);
});
it('should not include iteration count when returning final result', async () => {
const mockExecutor = mock<AgentRunnableSequence>({
invoke: jest.fn().mockResolvedValue({
returnValues: {
output: 'Final answer',
},
}),
});
const mockModel = mock<BaseChatModel>();
const itemContext: ItemContext = {
itemIndex: 0,
input: 'test input',
steps: [],
tools: [],
prompt: mock(),
options: {
maxIterations: 10,
returnIntermediateSteps: false,
},
outputParser: undefined,
};
// Mock the agent to return a final result (no tool calls)
jest.spyOn(agentExecution, 'loadMemory').mockResolvedValue([]);
jest.spyOn(agentExecution, 'saveToMemory').mockResolvedValue();
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await runAgent(mockContext, mockExecutor, itemContext, mockModel, undefined);
expect(result).toHaveProperty('output');
expect(result).not.toHaveProperty('actions');
expect(result).not.toHaveProperty('metadata');
});
});

View File

@ -0,0 +1,34 @@
import type {
ToolCallData,
ToolCallRequest,
AgentResult,
RequestResponseMetadata as SharedRequestResponseMetadata,
} from '@utils/agent-execution';
// Re-export shared types for backwards compatibility
export type { ToolCallData, ToolCallRequest, AgentResult };
// Use the shared metadata type directly (it already includes previousRequests)
export type RequestResponseMetadata = SharedRequestResponseMetadata;
// Keep the IntermediateStep type for compatibility
export type IntermediateStep = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string;
messageLog: unknown[];
toolCallId: string;
type: string;
};
observation?: string;
};
export type AgentOptions = {
systemMessage?: string;
maxIterations?: number;
returnIntermediateSteps?: boolean;
passthroughBinaryImages?: boolean;
enableStreaming?: boolean;
maxTokensFromMemory?: number;
};

View File

@ -3,7 +3,6 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'
import { HumanMessage } from '@langchain/core/messages';
import type { BaseMessagePromptTemplateLike } from '@langchain/core/prompts';
import { FakeLLM, FakeStreamingChatModel } from '@langchain/core/utils/testing';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import { Buffer } from 'buffer';
import { mock } from 'jest-mock-extended';
import type { AgentAction, AgentFinish } from 'langchain/agents';
@ -14,6 +13,8 @@ import { NodeOperationError, BINARY_ENCODING, NodeConnectionTypes } from 'n8n-wo
import type { ZodType } from 'zod';
import { z } from 'zod';
import type { N8nOutputParser } from '@utils/output_parsers/N8nOutputParser';
import {
getOutputParserSchema,
extractBinaryMessages,

View File

@ -0,0 +1,76 @@
import { AIMessage } from '@langchain/core/messages';
import { nodeNameToToolName } from 'n8n-workflow';
import type { EngineResponse, IDataObject } from 'n8n-workflow';
import type { RequestResponseMetadata, ToolCallData } from './types';
/**
* Rebuilds the agent steps from previous tool call responses.
* This is used to continue agent execution after tool calls have been made.
*
* This is a generalized version that can be used across different agent types
* (Tools Agent, OpenAI Functions Agent, etc.).
*
* @param response - The engine response containing tool call results
* @param itemIndex - The current item index being processed
* @returns Array of tool call data representing the agent steps
*/
export function buildSteps(
response: EngineResponse<RequestResponseMetadata> | undefined,
itemIndex: number,
): ToolCallData[] {
const steps: ToolCallData[] = [];
if (response) {
const responses = response?.actionResponses ?? [];
if (response.metadata?.previousRequests) {
steps.push.apply(steps, response.metadata.previousRequests);
}
for (const tool of responses) {
if (tool.action?.metadata?.itemIndex !== itemIndex) continue;
const toolInput: IDataObject = {
...tool.action.input,
id: tool.action.id,
};
if (!toolInput || !tool.data) {
continue;
}
const step = steps.find((step) => step.action.toolCallId === toolInput.id);
if (step) {
continue;
}
// Create a synthetic AI message for the messageLog
// This represents the AI's decision to call the tool
const syntheticAIMessage = new AIMessage({
content: `Calling ${tool.action.nodeName} with input: ${JSON.stringify(toolInput)}`,
tool_calls: [
{
id: (toolInput?.id as string) ?? 'reconstructed_call',
name: nodeNameToToolName(tool.action.nodeName),
args: toolInput,
type: 'tool_call',
},
],
});
const toolResult = {
action: {
tool: nodeNameToToolName(tool.action.nodeName),
toolInput: (toolInput.input as IDataObject) || {},
log: toolInput.log || syntheticAIMessage.content,
messageLog: [syntheticAIMessage],
toolCallId: toolInput?.id,
type: toolInput.type || 'tool_call',
},
observation: JSON.stringify(tool.data?.data?.ai_tool?.[0]?.map((item) => item?.json) ?? ''),
};
steps.push(toolResult);
}
}
return steps;
}

View File

@ -0,0 +1,54 @@
import type { DynamicStructuredTool, Tool } from 'langchain/tools';
import { NodeConnectionTypes } from 'n8n-workflow';
import type { EngineRequest, IDataObject } from 'n8n-workflow';
import type { RequestResponseMetadata, ToolCallRequest } from './types';
/**
* Creates engine requests from tool calls.
* Maps tool call information to the format expected by the n8n engine
* for executing tool nodes.
*
* This is a generalized version that can be used across different agent types
* (Tools Agent, OpenAI Functions Agent, etc.).
*
* @param toolCalls - Array of tool call requests to convert
* @param itemIndex - The current item index
* @param tools - Array of available tools
* @returns Array of engine request objects (filtered to remove undefined entries)
*/
export async function createEngineRequests(
toolCalls: ToolCallRequest[],
itemIndex: number,
tools: Array<DynamicStructuredTool | Tool>,
): Promise<EngineRequest<RequestResponseMetadata>['actions']> {
return toolCalls
.map((toolCall) => {
// First try to get from metadata (for toolkit tools)
const foundTool = tools.find((tool) => tool.name === toolCall.tool);
if (!foundTool) return undefined;
const nodeName = foundTool.metadata?.sourceNodeName as string | undefined;
// Ensure nodeName is defined
if (!nodeName) return undefined;
// For toolkit tools, include the tool name so the node knows which tool to execute
const input = foundTool.metadata?.isFromToolkit
? { ...toolCall.toolInput, tool: toolCall.tool }
: toolCall.toolInput;
return {
actionType: 'ExecutionNodeAction' as const,
nodeName,
input: input as IDataObject,
type: NodeConnectionTypes.AiTool,
id: toolCall.toolCallId,
metadata: {
itemIndex,
},
};
})
.filter((item): item is NonNullable<typeof item> => item !== undefined);
}

View File

@ -0,0 +1,20 @@
/**
* Agent Execution Utilities
*
* This module contains generalized utilities for agent execution that can be
* reused across different agent types (Tools Agent, OpenAI Functions Agent, etc.).
*
* These utilities support engine-based tool execution, where tool calls are
* delegated to the n8n workflow engine instead of being executed inline.
*/
export { createEngineRequests } from './createEngineRequests';
export { buildSteps } from './buildSteps';
export { processEventStream } from './processEventStream';
export { loadMemory, saveToMemory, saveToolResultsToMemory } from './memoryManagement';
export type {
ToolCallRequest,
ToolCallData,
AgentResult,
RequestResponseMetadata,
} from './types';

View File

@ -0,0 +1,113 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseMessage } from '@langchain/core/messages';
import { trimMessages } from '@langchain/core/messages';
import type { BaseChatMemory } from 'langchain/memory';
import type { ToolCallData } from './types';
/**
* Loads chat history from memory and optionally trims it to fit within token limits.
*
* @param memory - The memory instance to load from
* @param model - Optional chat model for token counting (required if maxTokens is specified)
* @param maxTokens - Optional maximum number of tokens to load from memory
* @returns Array of base messages representing the chat history
*
* @example
* ```typescript
* // Load all history
* const messages = await loadMemory(memory);
*
* // Load with token limit
* const messages = await loadMemory(memory, model, 2000);
* ```
*/
export async function loadMemory(
memory?: BaseChatMemory,
model?: BaseChatModel,
maxTokens?: number,
): Promise<BaseMessage[] | undefined> {
if (!memory) {
return undefined;
}
const memoryVariables = await memory.loadMemoryVariables({});
let chatHistory = (memoryVariables['chat_history'] as BaseMessage[]) || [];
// Trim messages if token limit is specified and model is available
if (maxTokens && model) {
chatHistory = await trimMessages(chatHistory, {
strategy: 'last',
maxTokens,
tokenCounter: model,
includeSystem: true,
startOn: 'human',
allowPartial: true,
});
}
return chatHistory;
}
/**
* Saves a conversation turn (user input + agent output) to memory.
*
* @param memory - The memory instance to save to
* @param input - The user input/prompt
* @param output - The agent's output/response
*
* @example
* ```typescript
* await saveToMemory(memory, 'What is 2+2?', 'The answer is 4');
* ```
*/
export async function saveToMemory(
input: string,
output: string,
memory?: BaseChatMemory,
): Promise<void> {
if (!output || !memory) {
return;
}
await memory.saveContext({ input }, { output });
}
/**
* Saves tool call results to memory as formatted messages.
*
* This preserves the full conversation including tool interactions,
* which is important for agents that need to see their tool usage history.
*
* @param memory - The memory instance to save to
* @param input - The user input that triggered the tool calls
* @param toolResults - Array of tool call results to save
*
* @example
* ```typescript
* await saveToolResultsToMemory(memory, 'Calculate 2+2', [{
* action: {
* tool: 'calculator',
* toolInput: { expression: '2+2' },
* log: 'Using calculator',
* toolCallId: 'call_123',
* type: 'tool_call'
* },
* observation: '4'
* }]);
* ```
*/
export async function saveToolResultsToMemory(
input: string,
toolResults: ToolCallData[],
memory?: BaseChatMemory,
): Promise<void> {
if (!memory || !toolResults.length) {
return;
}
// Save each tool call as a formatted message
for (const result of toolResults) {
const toolMessage = `Tool: ${result.action.tool}, Input: ${JSON.stringify(result.action.toolInput)}, Result: ${result.observation}`;
await memory.saveContext({ input }, { output: toolMessage });
}
}

View File

@ -0,0 +1,153 @@
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
import type { BaseChatMemory } from 'langchain/memory';
import type { IExecuteFunctions } from 'n8n-workflow';
import { saveToMemory, saveToolResultsToMemory } from './memoryManagement';
import type { AgentResult, ToolCallRequest } from './types';
/**
* Processes the event stream from a streaming agent execution.
* Handles streaming chunks, tool calls, and intermediate steps.
*
* This is a generalized version that can be used across different agent types
* (Tools Agent, OpenAI Functions Agent, etc.).
*
* @param ctx - The execution context
* @param eventStream - The stream of events from the agent
* @param itemIndex - The current item index
* @param returnIntermediateSteps - Whether to capture intermediate steps
* @param memory - Optional memory for saving context
* @param input - The original input prompt
* @returns AgentResult containing output and optional tool calls/steps
*/
export async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
itemIndex: number,
returnIntermediateSteps: boolean = false,
memory?: BaseChatMemory,
input?: string,
): Promise<AgentResult> {
const agentResult: AgentResult = {
output: '',
};
if (returnIntermediateSteps) {
agentResult.intermediateSteps = [];
}
const toolCalls: ToolCallRequest[] = [];
ctx.sendChunk('begin', itemIndex);
for await (const event of eventStream) {
// Stream chat model tokens as they come in
switch (event.event) {
case 'on_chat_model_stream':
const chunk = event.data?.chunk as AIMessageChunk;
if (chunk?.content) {
const chunkContent = chunk.content;
let chunkText = '';
if (Array.isArray(chunkContent)) {
for (const message of chunkContent) {
if (message?.type === 'text') {
chunkText += (message as MessageContentText)?.text;
}
}
} else if (typeof chunkContent === 'string') {
chunkText = chunkContent;
}
ctx.sendChunk('item', itemIndex, chunkText);
agentResult.output += chunkText;
}
break;
case 'on_chat_model_end':
// Capture full LLM response with tool calls for intermediate steps
if (event.data) {
const chatModelData = event.data;
const output = chatModelData.output;
// Check if this LLM response contains tool calls
if (output?.tool_calls && output.tool_calls.length > 0) {
// Collect tool calls for request building
for (const toolCall of output.tool_calls) {
toolCalls.push({
tool: toolCall.name,
toolInput: toolCall.args,
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output],
});
}
// Also add to intermediate steps if needed
if (returnIntermediateSteps) {
for (const toolCall of output.tool_calls) {
agentResult.intermediateSteps?.push({
action: {
tool: toolCall.name,
toolInput: toolCall.args,
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output], // Include the full LLM response
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
},
observation: '',
});
}
}
}
}
break;
case 'on_tool_end':
// Capture tool execution results and match with action
if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
const toolData = event.data as { output?: string };
// Find the matching intermediate step for this tool call
const matchingStep = agentResult.intermediateSteps?.find(
(step) => !step.observation && step.action.tool === event.name,
);
if (matchingStep) {
matchingStep.observation = toolData.output || '';
// Save tool result to memory
if (matchingStep.observation && input) {
await saveToolResultsToMemory(
input,
[
{
action: matchingStep.action,
observation: matchingStep.observation,
},
],
memory,
);
}
}
}
break;
default:
break;
}
}
ctx.sendChunk('end', itemIndex);
// Save conversation to memory if memory is connected
if (input && agentResult.output) {
await saveToMemory(input, agentResult.output, memory);
}
// Include collected tool calls in the result
if (toolCalls.length > 0) {
agentResult.toolCalls = toolCalls;
}
return agentResult;
}

View File

@ -0,0 +1,541 @@
import type { EngineResponse } from 'n8n-workflow';
import { NodeConnectionTypes } from 'n8n-workflow';
import { buildSteps } from '../buildSteps';
import type { RequestResponseMetadata } from '../types';
describe('buildSteps', () => {
const itemIndex = 0;
describe('Basic functionality', () => {
it('should return empty array when response is undefined', () => {
const result = buildSteps(undefined, itemIndex);
expect(result).toEqual([]);
});
it('should build steps from engine response', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0]).toMatchObject({
action: {
tool: 'Calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
type: 'tool_call',
},
observation: JSON.stringify([{ result: '4' }]),
});
});
it('should handle multiple tool responses', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Search',
input: {
id: 'call_124',
input: { query: 'TypeScript' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_124',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { results: ['result1', 'result2'] } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(2);
expect(result[0].action.tool).toBe('Calculator');
expect(result[1].action.tool).toBe('Search');
});
it('should filter out responses for different item indexes', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Search',
input: {
id: 'call_124',
input: { query: 'TypeScript' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_124',
metadata: {
itemIndex: 1, // Different item index
},
},
data: {
data: {
ai_tool: [[{ json: { results: ['result1', 'result2'] } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, 0);
expect(result).toHaveLength(1);
expect(result[0].action.tool).toBe('Calculator');
});
it('should handle responses with minimal toolInput', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
// Missing input property - will result in empty toolInput
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
// Even with minimal input, a step is created with empty toolInput
expect(result).toHaveLength(1);
expect(result[0].action.toolInput).toEqual({});
});
describe('Previous requests handling', () => {
it('should include previous requests from metadata', () => {
const previousRequests = [
{
action: {
tool: 'previous_tool',
toolInput: { input: 'previous' },
log: 'Previous log',
toolCallId: 'call_prev',
type: 'tool_call',
},
observation: 'previous result',
},
];
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {
previousRequests,
},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(2);
expect(result[0]).toEqual(previousRequests[0]);
expect(result[1].action.tool).toBe('Calculator');
});
it('should not duplicate steps that already exist in previousRequests', () => {
const previousRequests = [
{
action: {
tool: 'calculator',
toolInput: { expression: '2+2' },
log: 'Previous log',
toolCallId: 'call_123',
type: 'tool_call',
},
observation: '4',
},
];
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123', // Same ID as in previousRequests
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {
previousRequests,
},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0]).toEqual(previousRequests[0]);
});
});
describe('Synthetic AI message creation', () => {
it('should create synthetic AI message with tool calls', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator Node',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0].action.messageLog).toBeDefined();
expect(result[0].action.messageLog).toHaveLength(1);
const message = result[0].action.messageLog![0];
expect(message).toHaveProperty('tool_calls');
expect(message.tool_calls).toHaveLength(1);
expect(message.tool_calls?.[0]).toMatchObject({
id: 'call_123',
name: 'Calculator_Node',
type: 'tool_call',
});
});
it('should use custom log if provided', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
log: 'Custom log message',
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0].action.log).toBe('Custom log message');
});
it('should use custom type if provided', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
type: 'custom_type',
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [[{ json: { result: '4' } }]],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0].action.type).toBe('custom_type');
});
});
});
describe('Observation formatting', () => {
it('should stringify tool result data correctly', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [
[
{ json: { result: '4', status: 'success' } },
{ json: { metadata: { timestamp: 123456789 } } },
],
],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0].observation).toBe(
JSON.stringify([
{ result: '4', status: 'success' },
{ metadata: { timestamp: 123456789 } },
]),
);
});
it('should handle empty ai_tool data', () => {
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: {
id: 'call_123',
input: { expression: '2+2' },
},
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
},
data: {
data: {
ai_tool: [],
},
executionTime: 0,
startTime: 0,
executionIndex: 0,
source: [],
},
},
],
metadata: {},
};
const result = buildSteps(response, itemIndex);
expect(result).toHaveLength(1);
expect(result[0].observation).toBe('""');
});
});
});

View File

@ -0,0 +1,343 @@
import { DynamicStructuredTool } from 'langchain/tools';
import { NodeConnectionTypes } from 'n8n-workflow';
import { z } from 'zod';
import { createEngineRequests } from '../createEngineRequests';
import type { ToolCallRequest } from '../types';
describe('createEngineRequests', () => {
const createMockTool = (
name: string,
metadata?: {
sourceNodeName?: string;
isFromToolkit?: boolean;
},
) => {
return new DynamicStructuredTool({
name,
description: `A test tool named ${name}`,
schema: z.object({
input: z.string(),
}),
func: async () => 'result',
metadata,
});
};
describe('Basic functionality', () => {
it('should create engine requests from tool calls', async () => {
const tools = [
createMockTool('calculator', { sourceNodeName: 'Calculator' }),
createMockTool('search', { sourceNodeName: 'Search' }),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: { expression: '2+2' },
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 0,
},
});
});
it('should handle multiple tool calls', async () => {
const tools = [
createMockTool('calculator', { sourceNodeName: 'Calculator' }),
createMockTool('search', { sourceNodeName: 'Search' }),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
{
tool: 'search',
toolInput: { query: 'TypeScript' },
toolCallId: 'call_124',
},
];
const result = await createEngineRequests(toolCalls, 1, tools);
expect(result).toHaveLength(2);
expect(result[0]).toEqual({
actionType: 'ExecutionNodeAction',
nodeName: 'Calculator',
input: { expression: '2+2' },
type: NodeConnectionTypes.AiTool,
id: 'call_123',
metadata: {
itemIndex: 1,
},
});
expect(result[1]).toEqual({
actionType: 'ExecutionNodeAction',
nodeName: 'Search',
input: { query: 'TypeScript' },
type: NodeConnectionTypes.AiTool,
id: 'call_124',
metadata: {
itemIndex: 1,
},
});
});
it('should filter out tool calls for tools that are not found', async () => {
const tools = [createMockTool('calculator', { sourceNodeName: 'Calculator' })];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
{
tool: 'nonexistent',
toolInput: { input: 'test' },
toolCallId: 'call_124',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].nodeName).toBe('Calculator');
});
it('should filter out tool calls when sourceNodeName is missing', async () => {
const tools = [
createMockTool('calculator', { sourceNodeName: 'Calculator' }),
createMockTool('tool_without_node', {}),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
{
tool: 'tool_without_node',
toolInput: { input: 'test' },
toolCallId: 'call_124',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].nodeName).toBe('Calculator');
});
it('should handle empty tool calls array', async () => {
const tools = [createMockTool('calculator', { sourceNodeName: 'Calculator' })];
const toolCalls: ToolCallRequest[] = [];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(0);
});
it('should handle empty tools array', async () => {
const tools: DynamicStructuredTool[] = [];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(0);
});
});
describe('Toolkit tools handling', () => {
it('should include tool name in input for toolkit tools', async () => {
const tools = [
createMockTool('toolkit_tool', {
sourceNodeName: 'ToolkitNode',
isFromToolkit: true,
}),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'toolkit_tool',
toolInput: { input: 'test' },
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].input).toEqual({
input: 'test',
tool: 'toolkit_tool',
});
});
it('should not include tool name in input for non-toolkit tools', async () => {
const tools = [
createMockTool('regular_tool', {
sourceNodeName: 'RegularNode',
isFromToolkit: false,
}),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'regular_tool',
toolInput: { input: 'test' },
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].input).toEqual({
input: 'test',
});
});
it('should handle mixed toolkit and regular tools', async () => {
const tools = [
createMockTool('toolkit_tool', {
sourceNodeName: 'ToolkitNode',
isFromToolkit: true,
}),
createMockTool('regular_tool', {
sourceNodeName: 'RegularNode',
isFromToolkit: false,
}),
];
const toolCalls: ToolCallRequest[] = [
{
tool: 'toolkit_tool',
toolInput: { input: 'toolkit test' },
toolCallId: 'call_123',
},
{
tool: 'regular_tool',
toolInput: { input: 'regular test' },
toolCallId: 'call_124',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(2);
expect(result[0].input).toEqual({
input: 'toolkit test',
tool: 'toolkit_tool',
});
expect(result[1].input).toEqual({
input: 'regular test',
});
});
});
describe('Item index handling', () => {
it('should correctly set itemIndex in metadata', async () => {
const tools = [createMockTool('calculator', { sourceNodeName: 'Calculator' })];
const toolCalls: ToolCallRequest[] = [
{
tool: 'calculator',
toolInput: { expression: '2+2' },
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 5, tools);
expect(result).toHaveLength(1);
expect(result[0].metadata.itemIndex).toBe(5);
});
});
describe('Complex tool inputs', () => {
it('should handle complex nested objects in tool input', async () => {
const tools = [createMockTool('complex_tool', { sourceNodeName: 'ComplexNode' })];
const toolCalls: ToolCallRequest[] = [
{
tool: 'complex_tool',
toolInput: {
nested: {
level1: {
level2: 'value',
},
},
array: [1, 2, 3],
string: 'test',
},
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].input).toEqual({
nested: {
level1: {
level2: 'value',
},
},
array: [1, 2, 3],
string: 'test',
});
});
it('should preserve all properties in toolInput', async () => {
const tools = [createMockTool('tool', { sourceNodeName: 'Node' })];
const toolCalls: ToolCallRequest[] = [
{
tool: 'tool',
toolInput: {
param1: 'value1',
param2: 42,
param3: true,
param4: null,
param5: undefined,
},
toolCallId: 'call_123',
},
];
const result = await createEngineRequests(toolCalls, 0, tools);
expect(result).toHaveLength(1);
expect(result[0].input).toEqual({
param1: 'value1',
param2: 42,
param3: true,
param4: null,
param5: undefined,
});
});
});
});

View File

@ -0,0 +1,266 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { HumanMessage, AIMessage, SystemMessage, trimMessages } from '@langchain/core/messages';
import { mock } from 'jest-mock-extended';
import type { BaseChatMemory } from 'langchain/memory';
import { loadMemory, saveToMemory, saveToolResultsToMemory } from '../memoryManagement';
import type { ToolCallData } from '../types';
jest.mock('@langchain/core/messages', () => ({
...jest.requireActual('@langchain/core/messages'),
trimMessages: jest.fn(),
}));
describe('memoryManagement', () => {
let mockMemory: jest.Mocked<BaseChatMemory>;
let mockModel: jest.Mocked<BaseChatModel>;
beforeEach(() => {
jest.clearAllMocks();
mockMemory = mock<BaseChatMemory>();
mockModel = mock<BaseChatModel>();
});
describe('loadMemory', () => {
it('should return undefined when no memory is provided', async () => {
const result = await loadMemory(undefined);
expect(result).toBeUndefined();
});
it('should load chat history from memory', async () => {
const chatHistory = [new HumanMessage('Hello'), new AIMessage('Hi there!')];
mockMemory.loadMemoryVariables.mockResolvedValue({ chat_history: chatHistory });
const result = await loadMemory(mockMemory);
expect(result).toEqual(chatHistory);
expect(mockMemory.loadMemoryVariables).toHaveBeenCalledWith({});
});
it('should return empty array when chat_history is not present', async () => {
mockMemory.loadMemoryVariables.mockResolvedValue({});
const result = await loadMemory(mockMemory);
expect(result).toEqual([]);
});
it('should trim messages when maxTokens is provided', async () => {
const chatHistory = [
new SystemMessage('System prompt'),
new HumanMessage('Hello'),
new AIMessage('Hi there!'),
new HumanMessage('How are you?'),
new AIMessage('I am doing well!'),
];
const trimmedHistory = [
new SystemMessage('System prompt'),
new HumanMessage('How are you?'),
new AIMessage('I am doing well!'),
];
mockMemory.loadMemoryVariables.mockResolvedValue({ chat_history: chatHistory });
(trimMessages as jest.Mock).mockResolvedValue(trimmedHistory);
const result = await loadMemory(mockMemory, mockModel, 2000);
expect(result).toEqual(trimmedHistory);
expect(trimMessages).toHaveBeenCalledWith(chatHistory, {
strategy: 'last',
maxTokens: 2000,
tokenCounter: mockModel,
includeSystem: true,
startOn: 'human',
allowPartial: true,
});
});
it('should not trim messages when maxTokens is not provided', async () => {
const chatHistory = [new HumanMessage('Hello'), new AIMessage('Hi there!')];
mockMemory.loadMemoryVariables.mockResolvedValue({ chat_history: chatHistory });
const result = await loadMemory(mockMemory, mockModel);
expect(result).toEqual(chatHistory);
expect(trimMessages).not.toHaveBeenCalled();
});
it('should not trim messages when model is not provided', async () => {
const chatHistory = [new HumanMessage('Hello'), new AIMessage('Hi there!')];
mockMemory.loadMemoryVariables.mockResolvedValue({ chat_history: chatHistory });
const result = await loadMemory(mockMemory, undefined, 2000);
expect(result).toEqual(chatHistory);
expect(trimMessages).not.toHaveBeenCalled();
});
});
describe('saveToMemory', () => {
it('should save conversation to memory', async () => {
const input = 'What is 2+2?';
const output = 'The answer is 4';
await saveToMemory(input, output, mockMemory);
expect(mockMemory.saveContext).toHaveBeenCalledWith({ input }, { output });
});
it('should not save when output is empty', async () => {
const input = 'What is 2+2?';
const output = '';
await saveToMemory(input, output, mockMemory);
expect(mockMemory.saveContext).not.toHaveBeenCalled();
});
it('should not save when memory is not provided', async () => {
const input = 'What is 2+2?';
const output = 'The answer is 4';
await saveToMemory(input, output, undefined);
// Should not throw error
expect(mockMemory.saveContext).not.toHaveBeenCalled();
});
it('should not save when both output and memory are missing', async () => {
const input = 'What is 2+2?';
await saveToMemory(input, '', undefined);
expect(mockMemory.saveContext).not.toHaveBeenCalled();
});
});
describe('saveToolResultsToMemory', () => {
it('should save tool results to memory', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [
{
action: {
tool: 'calculator',
toolInput: { expression: '2+2' },
log: 'Using calculator',
toolCallId: 'call_123',
type: 'tool_call',
},
observation: '4',
},
];
await saveToolResultsToMemory(input, toolResults, mockMemory);
expect(mockMemory.saveContext).toHaveBeenCalledWith(
{ input },
{
output: 'Tool: calculator, Input: {"expression":"2+2"}, Result: 4',
},
);
});
it('should save multiple tool results', async () => {
const input = 'Get weather and time';
const toolResults: ToolCallData[] = [
{
action: {
tool: 'weather',
toolInput: { location: 'New York' },
log: 'Getting weather',
toolCallId: 'call_123',
type: 'tool_call',
},
observation: 'Sunny, 72°F',
},
{
action: {
tool: 'time',
toolInput: { timezone: 'EST' },
log: 'Getting time',
toolCallId: 'call_124',
type: 'tool_call',
},
observation: '14:30',
},
];
await saveToolResultsToMemory(input, toolResults, mockMemory);
expect(mockMemory.saveContext).toHaveBeenCalledTimes(2);
expect(mockMemory.saveContext).toHaveBeenNthCalledWith(
1,
{ input },
{
output: 'Tool: weather, Input: {"location":"New York"}, Result: Sunny, 72°F',
},
);
expect(mockMemory.saveContext).toHaveBeenNthCalledWith(
2,
{ input },
{
output: 'Tool: time, Input: {"timezone":"EST"}, Result: 14:30',
},
);
});
it('should not save when memory is not provided', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [
{
action: {
tool: 'calculator',
toolInput: { expression: '2+2' },
log: 'Using calculator',
toolCallId: 'call_123',
type: 'tool_call',
},
observation: '4',
},
];
await saveToolResultsToMemory(input, toolResults, undefined);
expect(mockMemory.saveContext).not.toHaveBeenCalled();
});
it('should not save when toolResults is empty', async () => {
const input = 'Calculate 2+2';
const toolResults: ToolCallData[] = [];
await saveToolResultsToMemory(input, toolResults, mockMemory);
expect(mockMemory.saveContext).not.toHaveBeenCalled();
});
it('should handle complex tool inputs', async () => {
const input = 'Search for information';
const toolResults: ToolCallData[] = [
{
action: {
tool: 'search',
toolInput: {
query: 'typescript testing',
filters: { language: 'en', date: '2024' },
limit: 10,
},
log: 'Searching',
toolCallId: 'call_125',
type: 'tool_call',
},
observation: 'Found 10 results',
},
];
await saveToolResultsToMemory(input, toolResults, mockMemory);
expect(mockMemory.saveContext).toHaveBeenCalledWith(
{ input },
{
output:
'Tool: search, Input: {"query":"typescript testing","filters":{"language":"en","date":"2024"},"limit":10}, Result: Found 10 results',
},
);
});
});
});

View File

@ -0,0 +1,61 @@
import type { AIMessage } from '@langchain/core/messages';
import type { IDataObject, GenericValue } from 'n8n-workflow';
/**
* Represents a tool call request from an LLM.
* This is a generic format that can be used across different agent types.
*/
export type ToolCallRequest = {
/** The name of the tool to call */
tool: string;
/** The input arguments for the tool */
toolInput: Record<string, unknown>;
/** Unique identifier for this tool call */
toolCallId: string;
/** Type of the tool call (e.g., 'tool_call', 'function') */
type?: string;
/** Log message or description */
log?: string;
/** Full message log including LLM response */
messageLog?: unknown[];
};
/**
* Represents a tool call action and its observation result.
* Used for building agent steps and maintaining conversation context.
*/
export type ToolCallData = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string | number | true | object;
messageLog?: AIMessage[];
toolCallId: IDataObject | GenericValue | GenericValue[] | IDataObject[];
type: string | number | true | object;
};
observation: string;
};
/**
* Result from an agent execution, optionally including tool calls and intermediate steps.
*/
export type AgentResult = {
/** The final output from the agent */
output: string;
/** Tool calls that need to be executed */
toolCalls?: ToolCallRequest[];
/** Intermediate steps showing the agent's reasoning */
intermediateSteps?: ToolCallData[];
};
/**
* Metadata for engine requests and responses.
*/
export type RequestResponseMetadata = {
/** Item index being processed */
itemIndex?: number;
/** Previous tool call requests (for multi-turn conversations) */
previousRequests?: ToolCallData[];
/** Current iteration count (for max iterations enforcement) */
iterationCount?: number;
};