feat: Implement Agent executing tools in the engine (#20030)

This commit is contained in:
Benjamin Schroth 2025-09-29 19:22:10 +02:00 committed by GitHub
parent 7570922d16
commit fadfb756ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1770 additions and 10 deletions

View File

@ -3,6 +3,7 @@ import { VersionedNodeType } from 'n8n-workflow';
import { AgentV1 } from './V1/AgentV1.node';
import { AgentV2 } from './V2/AgentV2.node';
import { AgentV3 } from './V3/AgentV3.node';
export class Agent extends VersionedNodeType {
constructor() {
@ -27,7 +28,7 @@ export class Agent extends VersionedNodeType {
],
},
},
defaultVersion: 2.2,
defaultVersion: 3,
};
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
@ -44,6 +45,7 @@ export class Agent extends VersionedNodeType {
2: new AgentV2(baseDescription),
2.1: new AgentV2(baseDescription),
2.2: new AgentV2(baseDescription),
3: new AgentV3(baseDescription),
// IMPORTANT Reminder to update AgentTool
};

View File

@ -9,9 +9,9 @@ import type {
import { promptTypeOptions, textFromPreviousNode, textInput } from '@utils/descriptions';
import { getInputs } from './utils';
import { getToolsAgentProperties } from '../agents/ToolsAgent/V2/description';
import { toolsAgentExecute } from '../agents/ToolsAgent/V2/execute';
import { getInputs } from '../utils';
export class AgentV2 implements INodeType {
description: INodeTypeDescription;

View File

@ -0,0 +1,119 @@
import { promptTypeOptions, textFromPreviousNode, textInput } from '@utils/descriptions';
import { NodeConnectionTypes } from 'n8n-workflow';
import type {
IExecuteFunctions,
INodeExecutionData,
INodeType,
INodeTypeDescription,
INodeTypeBaseDescription,
EngineResponse,
EngineRequest,
} from 'n8n-workflow';
import { toolsAgentProperties } from '../agents/ToolsAgent/V3/description';
import type { RequestResponseMetadata } from '../agents/ToolsAgent/V3/execute';
import { toolsAgentExecute } from '../agents/ToolsAgent/V3/execute';
import { getInputs } from '../utils';
export class AgentV3 implements INodeType {
description: INodeTypeDescription;
constructor(baseDescription: INodeTypeBaseDescription) {
this.description = {
...baseDescription,
version: [3],
defaults: {
name: 'AI Agent',
color: '#404040',
},
inputs: `={{
((hasOutputParser, needsFallback) => {
${getInputs.toString()};
return getInputs(true, hasOutputParser, needsFallback);
})($parameter.hasOutputParser === undefined || $parameter.hasOutputParser === true, $parameter.needsFallback !== undefined && $parameter.needsFallback === true)
}}`,
outputs: [NodeConnectionTypes.Main],
properties: [
{
displayName:
'Tip: Get a feel for agents with our quick <a href="https://docs.n8n.io/advanced-ai/intro-tutorial/" target="_blank">tutorial</a> or see an <a href="/workflows/templates/1954" target="_blank">example</a> of how this node works',
name: 'aiAgentStarterCallout',
type: 'callout',
default: '',
},
promptTypeOptions,
{
...textFromPreviousNode,
displayOptions: {
show: {
promptType: ['auto'],
},
},
},
{
...textInput,
displayOptions: {
show: {
promptType: ['define'],
},
},
},
{
displayName: 'Require Specific Output Format',
name: 'hasOutputParser',
type: 'boolean',
default: false,
noDataExpression: true,
},
{
displayName: `Connect an <a data-action='openSelectiveNodeCreator' data-action-parameter-connectiontype='${NodeConnectionTypes.AiOutputParser}'>output parser</a> on the canvas to specify the output format you require`,
name: 'notice',
type: 'notice',
default: '',
displayOptions: {
show: {
hasOutputParser: [true],
},
},
},
{
displayName: 'Enable Fallback Model',
name: 'needsFallback',
type: 'boolean',
default: false,
noDataExpression: true,
},
{
displayName:
'Connect an additional language model on the canvas to use it as a fallback if the main model fails',
name: 'fallbackNotice',
type: 'notice',
default: '',
displayOptions: {
show: {
needsFallback: [true],
},
},
},
toolsAgentProperties,
],
hints: [
{
message:
'You are using streaming responses. Make sure to set the response mode to "Streaming Response" on the connected trigger node.',
type: 'warning',
location: 'outputPane',
whenToDisplay: 'afterExecution',
displayCondition: '={{ $parameter["enableStreaming"] === true }}',
},
],
};
}
async execute(
this: IExecuteFunctions,
response?: EngineResponse<RequestResponseMetadata>,
): Promise<INodeExecutionData[][] | EngineRequest<RequestResponseMetadata>> {
return await toolsAgentExecute.call(this, response);
}
}

View File

@ -0,0 +1,22 @@
import type { INodeProperties } from 'n8n-workflow';
import { getBatchingOptionFields } from '@utils/sharedFields';
import { commonOptions } from '../options';
const enableStreaminOption: INodeProperties = {
displayName: 'Enable Streaming',
name: 'enableStreaming',
type: 'boolean',
default: true,
description: 'Whether this agent will stream the response in real-time as it generates text',
};
export const toolsAgentProperties: INodeProperties = {
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
placeholder: 'Add Option',
options: [...commonOptions, enableStreaminOption, getBatchingOptionFields(undefined, 1)],
};

View File

@ -0,0 +1,588 @@
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
import { AIMessage } from '@langchain/core/messages';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import { getPromptInputByType } from '@utils/helpers';
import {
getOptionalOutputParser,
type N8nOutputParser,
} from '@utils/output_parsers/N8nOutputParser';
import { type AgentRunnableSequence, createToolCallingAgent } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import type { DynamicStructuredTool, Tool } from 'langchain/tools';
import omit from 'lodash/omit';
import {
jsonParse,
NodeConnectionTypes,
nodeNameToToolName,
NodeOperationError,
sleep,
} from 'n8n-workflow';
import type {
EngineRequest,
GenericValue,
IDataObject,
IExecuteFunctions,
INodeExecutionData,
ISupplyDataFunctions,
EngineResponse,
} from 'n8n-workflow';
import assert from 'node:assert';
import {
fixEmptyContentMessage,
getAgentStepsParser,
getChatModel,
getOptionalMemory,
getTools,
prepareMessages,
preparePrompt,
} from '../common';
import { SYSTEM_MESSAGE } from '../prompt';
import type { ToolCall } from '@langchain/core/messages/tool';
type ToolCallRequest = {
tool: string;
toolInput: Record<string, unknown>;
toolCallId: string;
type?: string;
log?: string;
messageLog?: unknown[];
};
function createEngineRequests(
ctx: IExecuteFunctions | ISupplyDataFunctions,
toolCalls: ToolCallRequest[],
itemIndex: number,
) {
const connectedSubnodes = ctx.getParentNodes(ctx.getNode().name, {
connectionType: NodeConnectionTypes.AiTool,
depth: 1,
});
return toolCalls.map((toolCall) => ({
nodeName:
connectedSubnodes.find(
(node: { name: string }) => nodeNameToToolName(node.name) === toolCall.tool,
)?.name ?? toolCall.tool,
input: toolCall.toolInput,
type: NodeConnectionTypes.AiTool,
id: toolCall.toolCallId,
metadata: {
itemIndex,
},
}));
}
/**
* Creates an agent executor with the given configuration
*/
function createAgentSequence(
model: BaseChatModel,
tools: Array<DynamicStructuredTool | Tool>,
prompt: ChatPromptTemplate,
_options: { maxIterations?: number; returnIntermediateSteps?: boolean },
outputParser?: N8nOutputParser,
memory?: BaseChatMemory,
fallbackModel?: BaseChatModel | null,
) {
const agent = createToolCallingAgent({
llm: model,
tools,
prompt,
streamRunnable: false,
});
let fallbackAgent: AgentRunnableSequence | undefined;
if (fallbackModel) {
fallbackAgent = createToolCallingAgent({
llm: fallbackModel,
tools,
prompt,
streamRunnable: false,
});
}
const runnableAgent = RunnableSequence.from([
fallbackAgent ? agent.withFallbacks([fallbackAgent]) : agent,
getAgentStepsParser(outputParser, memory),
fixEmptyContentMessage,
]) as AgentRunnableSequence;
runnableAgent.singleAction = true;
runnableAgent.streamRunnable = false;
return runnableAgent;
}
type IntermediateStep = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string;
messageLog: unknown[];
toolCallId: string;
type: string;
};
observation?: string;
};
type AgentResult = {
output: string;
intermediateSteps?: IntermediateStep[];
toolCalls?: ToolCallRequest[];
};
async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
itemIndex: number,
returnIntermediateSteps: boolean = false,
memory?: BaseChatMemory,
input?: string,
): Promise<AgentResult> {
const agentResult: AgentResult = {
output: '',
};
if (returnIntermediateSteps) {
agentResult.intermediateSteps = [];
}
const toolCalls: ToolCallRequest[] = [];
ctx.sendChunk('begin', itemIndex);
for await (const event of eventStream) {
// Stream chat model tokens as they come in
switch (event.event) {
case 'on_chat_model_stream':
const chunk = event.data?.chunk as AIMessageChunk;
if (chunk?.content) {
const chunkContent = chunk.content;
let chunkText = '';
if (Array.isArray(chunkContent)) {
for (const message of chunkContent) {
if (message?.type === 'text') {
chunkText += (message as MessageContentText)?.text;
}
}
} else if (typeof chunkContent === 'string') {
chunkText = chunkContent;
}
ctx.sendChunk('item', itemIndex, chunkText);
agentResult.output += chunkText;
}
break;
case 'on_chat_model_end':
// Capture full LLM response with tool calls for intermediate steps
if (event.data) {
const chatModelData = event.data as {
output?: { tool_calls?: ToolCall[]; content?: string };
};
const output = chatModelData.output;
// Check if this LLM response contains tool calls
if (output?.tool_calls && output.tool_calls.length > 0) {
// Collect tool calls for request building
for (const toolCall of output.tool_calls) {
toolCalls.push({
tool: toolCall.name,
toolInput: toolCall.args,
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output],
});
}
// Also add to intermediate steps if needed
if (returnIntermediateSteps) {
for (const toolCall of output.tool_calls) {
agentResult.intermediateSteps!.push({
action: {
tool: toolCall.name,
toolInput: toolCall.args,
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output], // Include the full LLM response
toolCallId: toolCall.id || 'unknown',
type: toolCall.type || 'tool_call',
},
});
}
}
}
}
break;
case 'on_tool_end':
// Capture tool execution results and match with action
if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
const toolData = event.data as { output?: string };
// Find the matching intermediate step for this tool call
const matchingStep = agentResult.intermediateSteps!.find(
(step) => !step.observation && step.action.tool === event.name,
);
if (matchingStep) {
matchingStep.observation = toolData.output || '';
}
}
break;
default:
break;
}
}
ctx.sendChunk('end', itemIndex);
// Save conversation to memory if memory is connected
if (memory && input && agentResult.output) {
await memory.saveContext({ input }, { output: agentResult.output });
}
// Include collected tool calls in the result
if (toolCalls.length > 0) {
agentResult.toolCalls = toolCalls;
}
return agentResult;
}
export type RequestResponseMetadata = {
itemIndex?: number;
previousRequests: ToolCallData[];
};
type ToolCallData = {
action: {
tool: string;
toolInput: Record<string, unknown>;
log: string | number | true | object;
toolCallId: IDataObject | GenericValue | GenericValue[] | IDataObject[];
type: string | number | true | object;
};
observation: string;
};
function buildSteps(
response: EngineResponse<RequestResponseMetadata> | undefined,
itemIndex: number,
): ToolCallData[] {
const steps: ToolCallData[] = [];
if (response) {
const responses = response?.actionResponses ?? [];
for (const tool of responses) {
if (tool.action?.metadata?.itemIndex !== itemIndex) continue;
const toolInput: IDataObject = {
...tool.action.input,
id: tool.action.id,
};
if (!toolInput || !tool.data) {
continue;
}
const step = steps.find((step) => step.action.toolCallId === toolInput.id);
if (step) {
continue;
}
// Create a synthetic AI message for the messageLog
// This represents the AI's decision to call the tool
const syntheticAIMessage = new AIMessage({
content: `Calling ${tool.action.nodeName} with input: ${JSON.stringify(toolInput)}`,
tool_calls: [
{
id: (toolInput?.id as string) ?? 'reconstructed_call',
name: nodeNameToToolName(tool.action.nodeName),
args: toolInput,
type: 'tool_call',
},
],
});
const toolResult = {
action: {
tool: nodeNameToToolName(tool.action.nodeName),
toolInput: (toolInput.input as IDataObject) || {},
log: toolInput.log || syntheticAIMessage.content,
messageLog: [syntheticAIMessage],
toolCallId: toolInput?.id,
type: toolInput.type || 'tool_call',
},
observation: JSON.stringify(tool.data),
};
steps.push(toolResult);
}
}
return steps;
}
/* -----------------------------------------------------------
Main Executor Function
----------------------------------------------------------- */
/**
* The main executor method for the Tools Agent.
*
* This function retrieves necessary components (model, memory, tools), prepares the prompt,
* creates the agent, and processes each input item. The error handling for each item is also
* managed here based on the node's continueOnFail setting.
*
* @param this Execute context. SupplyDataContext is passed when agent is as a tool
*
* @returns The array of execution data for all processed items
*/
export async function toolsAgentExecute(
this: IExecuteFunctions | ISupplyDataFunctions,
response?: EngineResponse<RequestResponseMetadata>,
): Promise<INodeExecutionData[][] | EngineRequest<RequestResponseMetadata>> {
this.logger.debug('Executing Tools Agent V3');
const returnData: INodeExecutionData[] = [];
let request: EngineRequest<RequestResponseMetadata> | undefined = undefined;
const items = this.getInputData();
const batchSize = this.getNodeParameter('options.batching.batchSize', 0, 1) as number;
const delayBetweenBatches = this.getNodeParameter(
'options.batching.delayBetweenBatches',
0,
0,
) as number;
const needsFallback = this.getNodeParameter('needsFallback', 0, false) as boolean;
const memory = await getOptionalMemory(this);
const model = await getChatModel(this, 0);
assert(model, 'Please connect a model to the Chat Model input');
const fallbackModel = needsFallback ? await getChatModel(this, 1) : null;
if (needsFallback && !fallbackModel) {
throw new NodeOperationError(
this.getNode(),
'Please connect a model to the Fallback Model input or disable the fallback option',
);
}
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
if (response && response?.metadata?.itemIndex === itemIndex) {
return null;
}
const steps = buildSteps(response, itemIndex);
const input = getPromptInputByType({
ctx: this,
i: itemIndex,
inputKey: 'text',
promptTypeKey: 'promptType',
});
if (input === undefined) {
throw new NodeOperationError(this.getNode(), 'The "text" parameter is empty.');
}
const outputParser = await getOptionalOutputParser(this, itemIndex);
const tools = await getTools(this, outputParser);
const options = this.getNodeParameter('options', itemIndex, { enableStreaming: true }) as {
systemMessage?: string;
maxIterations?: number;
returnIntermediateSteps?: boolean;
passthroughBinaryImages?: boolean;
enableStreaming?: boolean;
};
// Prepare the prompt messages and prompt template.
const messages = await prepareMessages(this, itemIndex, {
systemMessage: options.systemMessage,
passthroughBinaryImages: options.passthroughBinaryImages ?? true,
outputParser,
});
const prompt: ChatPromptTemplate = preparePrompt(messages);
// Create executors for primary and fallback models
const executor = createAgentSequence(
model,
tools,
prompt,
options,
outputParser,
memory,
fallbackModel,
);
// Invoke with fallback logic
const invokeParams = {
steps,
input,
system_message: options.systemMessage ?? SYSTEM_MESSAGE,
formatting_instructions:
'IMPORTANT: For your response to user, you MUST use the `format_final_json_response` tool with your complete answer formatted according to the required schema. Do not attempt to format the JSON manually - always use this tool. Your response will be rejected if it is not properly formatted through this tool. Only use this tool once you are ready to provide your final answer.',
};
const executeOptions = { signal: this.getExecutionCancelSignal() };
// Check if streaming is actually available
const isStreamingAvailable = 'isStreaming' in this ? this.isStreaming?.() : undefined;
if (
'isStreaming' in this &&
options.enableStreaming &&
isStreamingAvailable &&
this.getNode().typeVersion >= 2.1
) {
let chatHistory = undefined;
if (memory) {
// Load memory variables to respect context window length
const memoryVariables = await memory.loadMemoryVariables({});
chatHistory = memoryVariables['chat_history'];
}
const eventStream = executor.streamEvents(
{
...invokeParams,
chat_history: chatHistory,
},
{
version: 'v2',
...executeOptions,
},
);
const result = await processEventStream(
this,
eventStream,
itemIndex,
options.returnIntermediateSteps,
memory,
input,
);
// If result contains tool calls, build the request object like the normal flow
if (result.toolCalls && result.toolCalls.length > 0) {
const actions = createEngineRequests(this, result.toolCalls, itemIndex);
return {
actions,
metadata: { previousRequests: buildSteps(response, itemIndex) },
};
}
return result;
} else {
// Handle regular execution
let chatHistory = undefined;
if (memory) {
// Load memory variables to respect context window length
const memoryVariables = await memory.loadMemoryVariables({});
chatHistory = memoryVariables['chat_history'];
}
const response = await executor.invoke({
...invokeParams,
chat_history: chatHistory,
});
if ('returnValues' in response) {
// Save conversation to memory including any tool call context
if (memory && input && response.returnValues.output) {
// If there were tool calls in this conversation, include them in the context
let fullOutput = response.returnValues.output as string;
if (steps.length > 0) {
// Include tool call information in the conversation context
const toolContext = steps
.map(
(step) =>
`Tool: ${step.action.tool}, Input: ${JSON.stringify(step.action.toolInput)}, Result: ${step.observation}`,
)
.join('; ');
fullOutput = `[Used tools: ${toolContext}] ${fullOutput}`;
}
await memory.saveContext({ input }, { output: fullOutput });
}
// Include intermediate steps if requested
const result = { ...response.returnValues };
if (options.returnIntermediateSteps && steps.length > 0) {
result.intermediateSteps = steps;
}
return result;
}
// If response contains tool calls, we need to return this in the right format
const actions = createEngineRequests(this, response, itemIndex);
return {
actions,
metadata: { previousRequests: buildSteps(response, itemIndex) },
};
}
});
const batchResults = await Promise.allSettled(batchPromises);
// This is only used to check if the output parser is connected
// so we can parse the output if needed. Actual output parsing is done in the loop above
const outputParser = await getOptionalOutputParser(this, 0);
batchResults.forEach((result, index) => {
const itemIndex = i + index;
if (result.status === 'rejected') {
const error = result.reason as Error;
if (this.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
} as INodeExecutionData);
return;
} else {
throw new NodeOperationError(this.getNode(), error);
}
}
const response = result.value;
if ('actions' in response) {
if (!request) {
request = {
actions: response.actions,
metadata: response.metadata,
};
} else {
request.actions.push(...response.actions);
}
return;
}
// If memory and outputParser are connected, parse the output.
if (memory && outputParser) {
const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(
response.output as string,
);
response.output = parsedOutput?.output ?? parsedOutput;
}
// Omit internal keys before returning the result.
const itemResult: INodeExecutionData = {
json: omit(
response,
'system_message',
'formatting_instructions',
'input',
'chat_history',
'agent_scratchpad',
),
pairedItem: { item: itemIndex },
};
returnData.push(itemResult);
});
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
}
}
// Check if we have any Request objects (tool calls)
if (request) {
return request;
}
// Otherwise return execution data
return [returnData];
}

View File

@ -0,0 +1,783 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessageChunk } from '@langchain/core/messages';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import { mock } from 'jest-mock-extended';
import { createToolCallingAgent } from 'langchain/agents';
import type { BaseChatMemory } from 'langchain/memory';
import type { Tool } from 'langchain/tools';
import { NodeOperationError } from 'n8n-workflow';
import type {
ISupplyDataFunctions,
IExecuteFunctions,
INode,
EngineRequest,
EngineResponse,
INodeExecutionData,
} from 'n8n-workflow';
import * as helpers from '../../../../../utils/helpers';
import * as commonHelpers from '../../agents/ToolsAgent/common';
import { toolsAgentExecute } from '../../agents/ToolsAgent/V3/execute';
import type { RequestResponseMetadata } from '../../agents/ToolsAgent/V3/execute';
jest.mock('../../../../../utils/output_parsers/N8nOutputParser', () => ({
getOptionalOutputParser: jest.fn(),
N8nStructuredOutputParser: jest.fn(),
}));
jest.mock('langchain/agents', () => ({
AgentExecutor: {
fromAgentAndTools: jest.fn(),
},
createToolCallingAgent: jest.fn(),
}));
jest.mock('@langchain/core/runnables', () => ({
RunnableSequence: {
from: jest.fn(),
},
}));
const mockHelpers = mock<IExecuteFunctions['helpers']>();
const mockContext = mock<IExecuteFunctions>({ helpers: mockHelpers });
beforeEach(() => {
jest.clearAllMocks();
jest.resetAllMocks();
});
describe('toolsAgentExecute V3', () => {
beforeEach(() => {
jest.clearAllMocks();
mockContext.logger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
});
it('should process items sequentially when batchSize is not set', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([
{ json: { text: 'test input 1' } },
{ json: { text: 'test input 2' } },
]);
const mockModel = mock<BaseChatModel>();
mockModel.bindTools = jest.fn();
mockModel.lc_namespace = ['chat_models'];
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValueOnce({ returnValues: { output: 'success 1' } })
.mockResolvedValueOnce({ returnValues: { output: 'success 2' } });
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await toolsAgentExecute.call(mockContext);
expect(mockRunnableSequence.invoke).toHaveBeenCalledTimes(2);
expect((result as INodeExecutionData[][])[0]).toHaveLength(2);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({ output: 'success 1' });
expect((result as INodeExecutionData[][])[0][1].json).toEqual({ output: 'success 2' });
});
it('should handle fallback model when needsFallback is true', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
const mockFallbackModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockFallbackAgent = mock<any>();
mockAgent.withFallbacks = jest.fn().mockReturnValue(mockAgent);
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValue({ returnValues: { output: 'success with fallback' } });
(createToolCallingAgent as jest.Mock)
.mockReturnValueOnce(mockAgent)
.mockReturnValueOnce(mockFallbackAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest
.spyOn(commonHelpers, 'getChatModel')
.mockResolvedValueOnce(mockModel)
.mockResolvedValueOnce(mockFallbackModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return true;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await toolsAgentExecute.call(mockContext);
expect(createToolCallingAgent).toHaveBeenCalledTimes(2);
expect(mockAgent.withFallbacks).toHaveBeenCalledWith([mockFallbackAgent]);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({
output: 'success with fallback',
});
});
it('should throw error when fallback is needed but no fallback model is provided', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
jest
.spyOn(commonHelpers, 'getChatModel')
.mockResolvedValueOnce(mockModel)
.mockResolvedValueOnce(undefined);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return true;
return defaultValue;
});
await expect(toolsAgentExecute.call(mockContext)).rejects.toThrow(NodeOperationError);
});
it('should handle regular execution path without tool calls', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValue({ returnValues: { output: 'regular response' } });
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await toolsAgentExecute.call(mockContext);
expect(mockRunnableSequence.invoke).toHaveBeenCalledTimes(1);
expect((result as INodeExecutionData[][])[0]).toHaveLength(1);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({ output: 'regular response' });
});
it('should handle streaming with tool calls detected', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
mockContext.getParentNodes.mockReturnValue([
{ name: 'TestTool', type: 'Tool', typeVersion: 3, disabled: false },
]);
mockContext.isStreaming = jest.fn().mockReturnValue(true) as any;
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
// Mock streaming events with tool calls
const mockStreamEvents = async function* () {
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'I need to call a tool',
} as AIMessageChunk,
},
};
yield {
event: 'on_chat_model_end',
data: {
output: {
content: 'I need to call a tool',
tool_calls: [
{
id: 'call_123',
name: 'TestTool',
args: { input: 'test data' },
type: 'tool_call',
},
],
},
},
};
};
mockRunnableSequence.streamEvents = jest.fn().mockReturnValue(mockStreamEvents());
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return true;
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
enableStreaming: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
mockContext.sendChunk = jest.fn() as any;
const result = (await toolsAgentExecute.call(
mockContext,
)) as EngineRequest<RequestResponseMetadata>;
expect(mockContext.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 0, 'I need to call a tool');
expect(mockContext.sendChunk).toHaveBeenCalledWith('end', 0);
expect(result.actions).toBeDefined();
expect(result.actions).toHaveLength(1);
expect(result.actions[0].nodeName).toBe('TestTool');
expect(result.actions[0].input).toEqual({ input: 'test data' });
});
it('should handle streaming without tool calls', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
mockContext.isStreaming = jest.fn().mockReturnValue(true) as any;
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
// Mock streaming events without tool calls
const mockStreamEvents = async function* () {
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'Hello ',
} as AIMessageChunk,
},
};
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'world!',
} as AIMessageChunk,
},
};
};
mockRunnableSequence.streamEvents = jest.fn().mockReturnValue(mockStreamEvents());
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return true;
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
enableStreaming: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
mockContext.sendChunk = jest.fn() as any;
const result = await toolsAgentExecute.call(mockContext);
expect(mockContext.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 0, 'Hello ');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 0, 'world!');
expect(mockContext.sendChunk).toHaveBeenCalledWith('end', 0);
expect((result as INodeExecutionData[][])[0]).toHaveLength(1);
expect((result as INodeExecutionData[][])[0][0].json.output).toBe('Hello world!');
});
it('should capture intermediate steps during streaming when returnIntermediateSteps is true', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
mockContext.isStreaming = jest.fn().mockReturnValue(true) as any;
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
// Mock streaming events without tool calls at the end - just streaming response
const mockStreamEvents = async function* () {
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'Final response',
} as AIMessageChunk,
},
};
};
mockRunnableSequence.streamEvents = jest.fn().mockReturnValue(mockStreamEvents());
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return true;
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: true, // Enable intermediate steps
passthroughBinaryImages: true,
enableStreaming: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
mockContext.sendChunk = jest.fn() as any;
const result = await toolsAgentExecute.call(mockContext);
expect((result as INodeExecutionData[][])[0]).toHaveLength(1);
expect((result as INodeExecutionData[][])[0][0].json.output).toBe('Final response');
// Since we removed the tool calls from this test, there should be no intermediate steps
expect((result as INodeExecutionData[][])[0][0].json.intermediateSteps).toBeDefined();
expect((result as INodeExecutionData[][])[0][0].json.intermediateSteps).toHaveLength(0);
});
it('should handle response with previous tool calls', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValue({ returnValues: { output: 'success with tools' } });
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
// Mock a response with tool call results - but don't set itemIndex to match current item
// so it doesn't return null and skip processing
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
id: 'call_123',
nodeName: 'TestTool',
input: { input: 'test data', id: 'call_123' },
metadata: { itemIndex: 0, previousRequests: [] },
actionType: 'ExecutionNodeAction',
type: 'ai_tool',
},
data: [{ json: { result: 'tool result' } }] as any,
},
],
metadata: { itemIndex: 999, previousRequests: [] }, // Different itemIndex so it doesn't skip
};
const result = await toolsAgentExecute.call(mockContext, response);
expect(mockRunnableSequence.invoke).toHaveBeenCalledTimes(1);
expect((result as INodeExecutionData[][])[0]).toHaveLength(1);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({ output: 'success with tools' });
// Check that invoke was called with the built steps
const invokeCall = mockRunnableSequence.invoke.mock.calls[0][0];
expect(invokeCall.steps).toBeDefined();
expect(invokeCall.steps).toHaveLength(1);
expect(invokeCall.steps[0].action.tool).toBe('TestTool');
expect(invokeCall.steps[0].observation).toBe('[{"json":{"result":"tool result"}}]');
});
it('should handle memory save with tool call context', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
const mockMemory = mock<BaseChatMemory>();
mockMemory.saveContext = jest.fn() as any;
(mockMemory.loadMemoryVariables as jest.Mock) = jest
.fn()
.mockResolvedValue({ chat_history: [] });
mockMemory.chatHistory = { getMessages: jest.fn().mockResolvedValue([]) } as any;
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValue({ returnValues: { output: 'success' } });
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(mockMemory);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
// Mock a response with tool call results - use different itemIndex to avoid skipping
const response: EngineResponse<RequestResponseMetadata> = {
actionResponses: [
{
action: {
id: 'call_123',
nodeName: 'TestTool',
input: { input: 'test data', id: 'call_123' },
metadata: { itemIndex: 0, previousRequests: [] },
actionType: 'ExecutionNodeAction' as const,
type: 'ai_tool' as const,
},
data: [{ json: { result: 'tool result' } }] as any,
},
],
metadata: { itemIndex: 999, previousRequests: [] }, // Different itemIndex so it doesn't skip
};
await toolsAgentExecute.call(mockContext, response);
expect(mockMemory.saveContext).toHaveBeenCalledWith(
{ input: 'test input' },
{
output:
'[Used tools: Tool: TestTool, Input: "test data", Result: [{"json":{"result":"tool result"}}]] success',
},
);
});
it('should handle errors in batch processing when continueOnFail is true', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([
{ json: { text: 'test input 1' } },
{ json: { text: 'test input 2' } },
]);
mockContext.continueOnFail.mockReturnValue(true);
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValueOnce({ returnValues: { output: 'success' } })
.mockRejectedValueOnce(new Error('Test error'));
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.batching.batchSize') return 2;
if (param === 'options.batching.delayBetweenBatches') return 0;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await toolsAgentExecute.call(mockContext);
expect((result as INodeExecutionData[][])[0]).toHaveLength(2);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({ output: 'success' });
expect((result as INodeExecutionData[][])[0][1].json).toEqual({ error: 'Test error' });
});
it('should throw error when continueOnFail is false', async () => {
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
mockContext.continueOnFail.mockReturnValue(false);
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest.fn().mockRejectedValue(new Error('Test error'));
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return false;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
await expect(toolsAgentExecute.call(mockContext)).rejects.toThrow(NodeOperationError);
});
it('should handle streaming when not available (SupplyDataFunctions context)', async () => {
const mockSupplyDataContext = mock<ISupplyDataFunctions>();
mockSupplyDataContext.logger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};
const mockNode = mock<INode>();
mockNode.typeVersion = 3;
mockSupplyDataContext.getNode.mockReturnValue(mockNode);
mockSupplyDataContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
const mockModel = mock<BaseChatModel>();
const mockAgent = mock<any>();
const mockRunnableSequence = mock<any>();
mockRunnableSequence.singleAction = true;
mockRunnableSequence.streamRunnable = false;
mockRunnableSequence.invoke = jest
.fn()
.mockResolvedValue({ returnValues: { output: 'success' } });
(createToolCallingAgent as jest.Mock).mockReturnValue(mockAgent);
(RunnableSequence.from as jest.Mock).mockReturnValue(mockRunnableSequence);
jest.spyOn(commonHelpers, 'getChatModel').mockResolvedValue(mockModel);
jest.spyOn(commonHelpers, 'getOptionalMemory').mockResolvedValue(undefined);
jest.spyOn(commonHelpers, 'getTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(commonHelpers, 'prepareMessages').mockResolvedValue([]);
jest.spyOn(commonHelpers, 'preparePrompt').mockReturnValue(mock<ChatPromptTemplate>());
jest.spyOn(helpers, 'getPromptInputByType').mockReturnValue('test input');
mockSupplyDataContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'needsFallback') return false;
if (param === 'options.enableStreaming') return true;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
mockSupplyDataContext.getExecutionCancelSignal.mockReturnValue(new AbortController().signal);
const result = await toolsAgentExecute.call(mockSupplyDataContext);
expect(mockRunnableSequence.invoke).toHaveBeenCalledTimes(1);
expect((result as INodeExecutionData[][])[0]).toHaveLength(1);
expect((result as INodeExecutionData[][])[0][0].json).toEqual({ output: 'success' });
});
});

View File

@ -6,7 +6,7 @@ import { z } from 'zod';
import type { ZodObjectAny } from '../../../../types/types';
import { checkForStructuredTools } from '../agents/utils';
import { getInputs } from '../V2/utils';
import { getInputs } from '../utils';
describe('checkForStructuredTools', () => {
let mockNode: INode;

View File

@ -0,0 +1,95 @@
// Function used in the inputs expression to figure out which inputs to
import {
type INodeInputConfiguration,
type INodeInputFilter,
type NodeConnectionType,
} from 'n8n-workflow';
// display based on the agent type
export function getInputs(
hasMainInput?: boolean,
hasOutputParser?: boolean,
needsFallback?: boolean,
): Array<NodeConnectionType | INodeInputConfiguration> {
interface SpecialInput {
type: NodeConnectionType;
filter?: INodeInputFilter;
displayName: string;
required?: boolean;
}
const getInputData = (
inputs: SpecialInput[],
): Array<NodeConnectionType | INodeInputConfiguration> => {
return inputs.map(({ type, filter, displayName, required }) => {
const input: INodeInputConfiguration = {
type,
displayName,
required,
maxConnections: ['ai_languageModel', 'ai_memory', 'ai_outputParser'].includes(type)
? 1
: undefined,
};
if (filter) {
input.filter = filter;
}
return input;
});
};
let specialInputs: SpecialInput[] = [
{
type: 'ai_languageModel',
displayName: 'Chat Model',
required: true,
filter: {
excludedNodes: [
'@n8n/n8n-nodes-langchain.lmCohere',
'@n8n/n8n-nodes-langchain.lmOllama',
'n8n/n8n-nodes-langchain.lmOpenAi',
'@n8n/n8n-nodes-langchain.lmOpenHuggingFaceInference',
],
},
},
{
type: 'ai_languageModel',
displayName: 'Fallback Model',
required: true,
filter: {
excludedNodes: [
'@n8n/n8n-nodes-langchain.lmCohere',
'@n8n/n8n-nodes-langchain.lmOllama',
'n8n/n8n-nodes-langchain.lmOpenAi',
'@n8n/n8n-nodes-langchain.lmOpenHuggingFaceInference',
],
},
},
{
displayName: 'Memory',
type: 'ai_memory',
},
{
displayName: 'Tool',
type: 'ai_tool',
},
{
displayName: 'Output Parser',
type: 'ai_outputParser',
},
];
if (hasOutputParser === false) {
specialInputs = specialInputs.filter((input) => input.type !== 'ai_outputParser');
}
if (needsFallback === false) {
specialInputs = specialInputs.filter((input) => input.displayName !== 'Fallback Model');
}
// Note cannot use NodeConnectionType.Main
// otherwise expression won't evaluate correctly on the FE
const mainInputs = hasMainInput ? ['main' as NodeConnectionType] : [];
return [...mainInputs, ...getInputData(specialInputs)];
}

View File

@ -110,9 +110,20 @@ export abstract class NodeExecutionContext implements Omit<FunctionsBase, 'getCr
return output;
}
getParentNodes(nodeName: string, options?: { includeNodeParameters?: boolean }) {
getParentNodes(
nodeName: string,
options?: {
includeNodeParameters?: boolean;
connectionType?: NodeConnectionType;
depth?: number;
},
) {
const output: NodeTypeAndVersion[] = [];
const nodeNames = this.workflow.getParentNodes(nodeName);
const nodeNames = this.workflow.getParentNodes(
nodeName,
options?.connectionType,
options?.depth,
);
for (const n of nodeNames) {
const node = this.workflow.nodes[n];

View File

@ -904,7 +904,11 @@ export interface FunctionsBase {
): NodeTypeAndVersion[];
getParentNodes(
nodeName: string,
options?: { includeNodeParameters?: boolean },
options?: {
includeNodeParameters?: boolean;
connectionType?: NodeConnectionType;
depth?: number;
},
): NodeTypeAndVersion[];
getKnownNodeTypes(): IDataObject;
getMode?: () => WorkflowExecuteMode;

View File

@ -1040,10 +1040,19 @@ export class WorkflowDataProxy {
},
);
}
const inputData =
that.runExecutionData?.resultData.runData[that.activeNodeName]?.[runIndex].inputOverride;
const placeholdersDataInputData =
inputData?.[NodeConnectionTypes.AiTool]?.[0]?.[itemIndex].json;
const resultData = that.runExecutionData?.resultData.runData[that.activeNodeName]?.[runIndex];
let inputData;
let placeholdersDataInputData;
if (!resultData) {
inputData = this.connectionInputData?.[runIndex];
placeholdersDataInputData = inputData.json;
} else {
inputData =
that.runExecutionData?.resultData.runData[that.activeNodeName]?.[runIndex].inputOverride;
placeholdersDataInputData = inputData?.[NodeConnectionTypes.AiTool]?.[0]?.[itemIndex].json;
}
if (!placeholdersDataInputData) {
throw new ExpressionError('No execution data available', {

View File

@ -542,6 +542,133 @@ describe('WorkflowDataProxy', () => {
expect(() => getFromAIProxy().$fromAI('invalid key')).toThrow(ExpressionError);
expect(() => getFromAIProxy().$fromAI('invalid!')).toThrow(ExpressionError);
});
test('Falls back to connectionInputData when no resultData exists', () => {
// Create a workflow with connectionInputData but no resultData
const workflowWithoutResultData: IWorkflowBase = {
id: '123',
name: 'test workflow',
nodes: [
{
id: 'aiNode',
name: 'AI Node',
type: 'n8n-nodes-base.aiAgent',
typeVersion: 1,
position: [0, 0],
parameters: {},
},
],
connections: {},
active: false,
isArchived: false,
createdAt: new Date(),
updatedAt: new Date(),
};
// Create connection input data with AI query data
const connectionInputData = [
{
json: {
full_name: 'Test User',
email: 'test@example.com',
},
pairedItem: { item: 0 },
},
];
const dataProxy = new WorkflowDataProxy(
new Workflow({
id: '123',
name: 'test workflow',
nodes: workflowWithoutResultData.nodes,
connections: workflowWithoutResultData.connections,
active: false,
nodeTypes: Helpers.NodeTypes(),
}),
null, // No run execution data
0,
0,
'AI Node',
connectionInputData,
{},
'manual',
{},
undefined,
);
const proxy = dataProxy.getDataProxy();
expect(proxy.$fromAI('full_name')).toEqual('Test User');
expect(proxy.$fromAI('email')).toEqual('test@example.com');
expect(proxy.$fromAI('non_existent_key', 'description', 'string', 'default_value')).toEqual(
'default_value',
);
});
test('Returns default value when connection input data lacks expected keys', () => {
// Create a workflow with connection input data that doesn't have the expected AI keys
const workflowWithLimitedData: IWorkflowBase = {
id: '123',
name: 'test workflow',
nodes: [
{
id: 'aiNode',
name: 'AI Node',
type: 'n8n-nodes-base.aiAgent',
typeVersion: 1,
position: [0, 0],
parameters: {},
},
],
connections: {},
active: false,
isArchived: false,
createdAt: new Date(),
updatedAt: new Date(),
};
// Connection input data without the expected AI keys
const connectionInputData = [
{
json: {
some_other_field: 'other data',
regular_field: 'regular_value',
},
pairedItem: { item: 0 },
},
];
const dataProxy = new WorkflowDataProxy(
new Workflow({
id: '123',
name: 'test workflow',
nodes: workflowWithLimitedData.nodes,
connections: workflowWithLimitedData.connections,
active: false,
nodeTypes: Helpers.NodeTypes(),
}),
null, // No run execution data
0,
0,
'AI Node',
connectionInputData,
{},
'manual',
{},
undefined,
);
const proxy = dataProxy.getDataProxy();
// Should return undefined for missing keys and default value when provided
expect(proxy.$fromAI('missing_key')).toBeUndefined();
expect(proxy.$fromAI('missing_key', 'description', 'string', 'default_value')).toEqual(
'default_value',
);
// Should return existing values for keys that are present
expect(proxy.$fromAI('regular_field')).toEqual('regular_value');
});
});
describe('$rawParameter', () => {