n8n/packages/@n8n/ai-workflow-builder.ee/evaluations/cli/index.ts
Eugene 00014420b1
refactor(core): Remove multi-agent architecture entry point from AI workflow builder (no-changelog) (#27925)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 16:32:19 +00:00

615 lines
18 KiB
TypeScript

/**
* V2 CLI Entry Point
*
* Demonstrates how to use the v2 evaluation harness.
* Can be run directly or used as a reference for custom setups.
*/
import type { INodeTypeDescription } from 'n8n-workflow';
import pLimit from 'p-limit';
import { CodeWorkflowBuilder } from '@/code-builder';
import type { HistoryContext } from '@/code-builder';
import type { ConversationEntry } from '@/code-builder/utils/code-builder-session';
import type { CoordinationLogEntry } from '@/types/coordination';
import type { StreamChunk, WorkflowUpdateChunk } from '@/types/streaming';
import type { SimpleWorkflow } from '@/types/workflow';
import type { BuilderFeatureFlags } from '@/workflow-builder-agent';
/**
 * Type guard for SimpleWorkflow.
 *
 * Used on values of unknown provenance (e.g. the result of `JSON.parse`), so key
 * presence alone is not enough: `nodes` must be an array and `connections` must be a
 * plain (non-null, non-array) object. `name` is only checked for presence.
 *
 * @param value - Any value to inspect.
 * @returns `true` when `value` has the structural shape of a workflow.
 */
function isSimpleWorkflow(value: unknown): value is SimpleWorkflow {
	if (typeof value !== 'object' || value === null) return false;

	return (
		'name' in value &&
		'nodes' in value &&
		Array.isArray(value.nodes) &&
		'connections' in value &&
		typeof value.connections === 'object' &&
		value.connections !== null &&
		// Arrays are objects too; a connections map must not be one.
		!Array.isArray(value.connections)
	);
}
import {
argsToStageModels,
getDefaultDatasetName,
getDefaultExperimentName,
parseEvaluationArgs,
} from './argument-parser';
import { buildCIMetadata } from './ci-metadata';
import {
loadTestCasesFromCsv,
loadDefaultTestCases,
getDefaultTestCaseIds,
} from './csv-prompt-loader';
import { sendWebhookNotification } from './webhook';
import { WorkflowGenerationError } from '../errors';
import {
collectAgentTextResponse,
extractSubgraphMetrics,
getChatPayload,
} from '../harness/evaluation-helpers';
import type { DatasetInputContext } from '../harness/harness-types';
import { createLogger } from '../harness/logger';
import type { GenerationCollectors, SubgraphMetricsCollector } from '../harness/runner';
import { TokenUsageTrackingHandler } from '../harness/token-tracking-handler';
import {
runEvaluation,
createConsoleLifecycle,
mergeLifecycles,
createLLMJudgeEvaluator,
createProgrammaticEvaluator,
createPairwiseEvaluator,
createSimilarityEvaluator,
createExecutionEvaluator,
createBinaryChecksEvaluator,
type RunConfig,
type TestCase,
type Evaluator,
type EvaluationContext,
type GenerationResult,
} from '../index';
import { generateRunId, isWorkflowStateValues } from '../langsmith/types';
import { createIntrospectionAnalysisLifecycle } from '../lifecycles/introspection-analysis';
import { AGENT_TYPES, EVAL_TYPES, EVAL_USERS } from '../support/constants';
import {
setupTestEnvironment,
createAgent,
resolveNodesBasePath,
type ResolvedStageLLMs,
} from '../support/environment';
import { generateEvalPinData } from '../support/pin-data-generator';
/**
 * Narrows a streamed chunk to the variant that carries a workflow update,
 * identified by its `type` tag being `'workflow-updated'`.
 */
function isWorkflowUpdateChunk(chunk: StreamChunk): chunk is WorkflowUpdateChunk {
	const { type } = chunk;
	return type === 'workflow-updated';
}
/**
 * Type guard: does the given state object expose an array-valued `coordinationLog`?
 * Only the container is checked; individual entries are not validated.
 */
function hasCoordinationLog(
	values: unknown,
): values is { coordinationLog: CoordinationLogEntry[] } {
	if (typeof values !== 'object' || values === null) return false;
	return 'coordinationLog' in values && Array.isArray(values.coordinationLog);
}
/**
 * Derive subgraph metrics from the state's coordination log (when present) and the
 * workflow's node count, and hand them to `collector` if at least one metric is defined.
 *
 * @param collector - Callback receiving the extracted metrics.
 * @param stateValues - Agent state values; may or may not contain a coordination log.
 * @param workflow - Workflow whose node count is included in the metrics.
 */
function reportSubgraphMetrics(
	collector: SubgraphMetricsCollector,
	stateValues: unknown,
	workflow: SimpleWorkflow,
): void {
	const log = hasCoordinationLog(stateValues) ? stateValues.coordinationLog : undefined;
	const metrics = extractSubgraphMetrics(log, workflow.nodes?.length);

	const hasAnyMetric = [
		metrics.discoveryDurationMs,
		metrics.builderDurationMs,
		metrics.responderDurationMs,
		metrics.nodeCount,
	].some((metric) => metric !== undefined);

	// Nothing measurable was extracted — do not emit an empty report.
	if (!hasAnyMetric) return;

	collector(metrics);
}
/**
 * Build a workflow generator backed by the agent returned from `createAgent`.
 *
 * LangSmith tracing is handled via traceable() in the runner. Callbacks are passed
 * explicitly from the runner to keep trace context correct under high concurrency
 * (avoids AsyncLocalStorage race conditions).
 *
 * @param parsedNodeTypes - Node type descriptions forwarded to `createAgent`.
 * @param llms - Resolved per-stage LLMs forwarded to `createAgent`.
 * @param featureFlags - Optional flags forwarded to the agent and to the chat payload.
 * @returns An async function that runs one generation and resolves to the produced
 *          workflow plus the agent's collected text response.
 */
function createWorkflowGenerator(
	parsedNodeTypes: INodeTypeDescription[],
	llms: ResolvedStageLLMs,
	featureFlags?: BuilderFeatureFlags,
): (
	prompt: string,
	datasetInputContext?: DatasetInputContext,
	collectors?: GenerationCollectors,
) => Promise<GenerationResult> {
	const generate = async (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	): Promise<GenerationResult> => {
		const runId = generateRunId();
		const agent = createAgent({ parsedNodeTypes, llms, featureFlags });

		// A tracker is only attached when the caller wants token usage reported.
		const tokenTracker = collectors?.tokenUsage ? new TokenUsageTrackingHandler() : undefined;

		const payload = getChatPayload({
			evalType: EVAL_TYPES.LANGSMITH,
			message: prompt,
			workflowId: runId,
			featureFlags,
			workflowContext: datasetInputContext?.workflowContext,
			mode: datasetInputContext?.mode,
		});
		const externalCallbacks = tokenTracker ? [tokenTracker] : undefined;
		const responseStream = agent.chat(
			payload,
			EVAL_USERS.LANGSMITH,
			undefined, // abortSignal
			externalCallbacks,
			datasetInputContext?.historicalMessages,
		);
		const agentTextResponse = await collectAgentTextResponse(responseStream);

		// The final workflow is read back from the agent state once the stream is drained.
		const { values } = await agent.getState(runId, EVAL_USERS.LANGSMITH);
		if (!values || !isWorkflowStateValues(values)) {
			throw new Error('Invalid workflow state: workflow or messages missing');
		}
		const workflow = values.workflowJSON;

		// Forward accumulated token usage, skipping all-zero reports.
		if (collectors?.tokenUsage && tokenTracker) {
			const usage = tokenTracker.getUsage();
			const sawTokens = usage.inputTokens > 0 || usage.outputTokens > 0;
			if (sawTokens) {
				collectors.tokenUsage(usage);
			}
		}

		// Forward subgraph metrics derived from the coordination log.
		if (collectors?.subgraphMetrics) {
			reportSubgraphMetrics(collectors.subgraphMetrics, values, workflow);
		}

		// Forward introspection events (empty list when the state has none).
		collectors?.introspectionEvents?.(values.introspectionEvents ?? []);

		return { workflow, agentTextResponse };
	};

	return generate;
}
/**
 * Build the evaluator list for a given suite.
 *
 * Each known suite maps to an ordered list of evaluator factories; a suite that is not
 * listed yields an empty array. Factories are only invoked for the selected suite.
 *
 * @param params.suite - Suite name selecting which evaluators to create.
 * @param params.judgeLlm - LLM passed to the judge-based evaluators.
 * @param params.parsedNodeTypes - Node types passed to evaluators that need them.
 * @param params.numJudges - Judge count for the pairwise evaluator.
 * @param params.checks - Optional check names for the binary-checks evaluator.
 * @returns A fresh, mutable array of evaluators in creation order.
 */
function createEvaluators(params: {
	suite: string;
	judgeLlm: ResolvedStageLLMs['judge'];
	parsedNodeTypes: Parameters<typeof createProgrammaticEvaluator>[0];
	numJudges: number;
	checks?: string[];
}): Array<Evaluator<EvaluationContext>> {
	const { suite, judgeLlm, parsedNodeTypes, numJudges, checks } = params;

	type EvaluatorFactory = () => Evaluator<EvaluationContext>;

	const programmatic: EvaluatorFactory = () => createProgrammaticEvaluator(parsedNodeTypes);

	const factoriesBySuite = new Map<string, EvaluatorFactory[]>([
		['llm-judge', [() => createLLMJudgeEvaluator(judgeLlm, parsedNodeTypes), programmatic]],
		['pairwise', [() => createPairwiseEvaluator(judgeLlm, { numJudges }), programmatic]],
		['programmatic', [programmatic]],
		['similarity', [() => createSimilarityEvaluator()]],
		[
			'binary-checks',
			[
				() =>
					createBinaryChecksEvaluator({
						nodeTypes: parsedNodeTypes,
						llm: judgeLlm,
						checks,
					}),
			],
		],
	]);

	const factories = factoriesBySuite.get(suite) ?? [];
	return factories.map((create) => create());
}
/**
 * Translate serialized LangChain messages from a dataset into a HistoryContext.
 *
 * Walks the list once. A human message (its `id` array contains `'HumanMessage'` and
 * `kwargs.content` is a string) immediately followed by an AI message (its `id` array
 * contains `'AIMessage'`) becomes an `assistant-exchange` entry; the AI summary falls
 * back to `''` when its content is not a string. A human message without such a
 * follower becomes a `build-request` entry. Everything else is ignored.
 *
 * @param messages - Raw message objects of unknown shape.
 * @returns The history context, or `undefined` when no entries could be built.
 */
function buildHistoryContextFromMessages(messages: unknown[]): HistoryContext | undefined {
	const conversationEntries: ConversationEntry[] = [];

	let cursor = 0;
	while (cursor < messages.length) {
		const current = messages[cursor];
		cursor += 1;

		if (!isUnknownRecord(current)) continue;
		if (!(Array.isArray(current.id) && current.id.includes('HumanMessage'))) continue;

		const humanKwargs = current.kwargs;
		if (!isUnknownRecord(humanKwargs)) continue;
		const userQuery = humanKwargs.content;
		if (typeof userQuery !== 'string') continue;

		// `cursor` already points at the message after the human one (undefined past the end).
		const following = messages[cursor];
		const followedByAI =
			isUnknownRecord(following) &&
			Array.isArray(following.id) &&
			following.id.includes('AIMessage');

		if (!followedByAI || !isUnknownRecord(following)) {
			conversationEntries.push({ type: 'build-request', message: userQuery });
			continue;
		}

		const aiKwargs = following.kwargs;
		let assistantSummary = '';
		if (isUnknownRecord(aiKwargs) && typeof aiKwargs.content === 'string') {
			assistantSummary = aiKwargs.content;
		}
		conversationEntries.push({ type: 'assistant-exchange', userQuery, assistantSummary });
		cursor += 1; // The AI reply has been consumed as part of this exchange.
	}

	if (conversationEntries.length === 0) return undefined;
	return { conversationEntries };
}

/** Type guard: a non-null, non-array object, viewed as a string-keyed record. */
function isUnknownRecord(value: unknown): value is Record<string, unknown> {
	if (value === null || Array.isArray(value)) return false;
	return typeof value === 'object';
}
/**
 * Fold one stream chunk from CodeWorkflowBuilder into the running `state`.
 *
 * - Workflow-update chunks: `codeSnippet` is parsed as JSON and, if it looks like a
 *   workflow, replaces `state.workflow` (with `state.generatedCode` set from `sourceCode`).
 * - `'message'` chunks with a string `text`: the text is appended to `state.textParts`.
 * - Anything else is ignored.
 *
 * @param message - Chunk emitted by the builder stream.
 * @param state - Mutable accumulator updated in place.
 */
function processCodeBuilderMessage(
	message: StreamChunk,
	state: { workflow: SimpleWorkflow | null; generatedCode?: string; textParts: string[] },
): void {
	if (!isWorkflowUpdateChunk(message)) {
		const carriesText =
			message.type === 'message' && 'text' in message && typeof message.text === 'string';
		if (carriesText) {
			state.textParts.push(message.text);
		}
		return;
	}

	try {
		const candidate: unknown = JSON.parse(message.codeSnippet);
		if (!isSimpleWorkflow(candidate)) return;
		state.workflow = candidate;
		state.generatedCode = message.sourceCode;
	} catch {
		// Invalid JSON in codeSnippet — skip this message
	}
}
/**
 * Build a workflow generator backed by CodeWorkflowBuilder.
 *
 * The returned function streams `builder.chat(...)`, keeps the latest workflow (and its
 * source code) seen in the stream, concatenates any text chunks, and reports accumulated
 * token usage to `collectors.tokenUsage` when provided. Subgraph metrics are not reported,
 * since CodeWorkflowBuilder doesn't use coordination logs.
 *
 * @param parsedNodeTypes - Node type descriptions passed to the builder.
 * @param llms - Resolved per-stage LLMs; `llms.builder` drives the builder.
 * @param timeoutMs - Optional timeout in milliseconds. When positive, the abort signal
 *                    given to the builder is aborted after this duration so the run is
 *                    actually cancelled rather than left running in the background.
 * @param nodeDefinitionDirs - Optional directories passed through to the builder.
 * @returns An async function resolving to the workflow, its generated code and the
 *          agent's text response.
 * @throws WorkflowGenerationError (from the returned function) when the stream ends
 *         without producing a workflow.
 */
function createCodeWorkflowBuilderGenerator(
	parsedNodeTypes: INodeTypeDescription[],
	llms: ResolvedStageLLMs,
	timeoutMs?: number,
	nodeDefinitionDirs?: string[],
): (
	prompt: string,
	datasetInputContext?: DatasetInputContext,
	collectors?: GenerationCollectors,
) => Promise<GenerationResult> {
	const generate = async (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	): Promise<GenerationResult> => {
		const runId = generateRunId();

		// Running totals across every LLM call made by the builder.
		const tokenTotals = { inputTokens: 0, outputTokens: 0 };

		const builder = new CodeWorkflowBuilder({
			llm: llms.builder,
			nodeTypes: parsedNodeTypes,
			nodeDefinitionDirs,
			onTokenUsage: collectors?.tokenUsage
				? (usage) => {
						tokenTotals.inputTokens += usage.inputTokens;
						tokenTotals.outputTokens += usage.outputTokens;
					}
				: undefined,
		});

		const payload = getChatPayload({
			evalType: EVAL_TYPES.LANGSMITH,
			message: prompt,
			workflowId: runId,
			featureFlags: {},
			workflowContext: datasetInputContext?.workflowContext,
			mode: datasetInputContext?.mode,
		});

		// Prior conversation from the dataset, if any, converted for the builder.
		const priorMessages = datasetInputContext?.historicalMessages;
		const historyContext = priorMessages
			? buildHistoryContextFromMessages(priorMessages)
			: undefined;

		const streamState: {
			workflow: SimpleWorkflow | null;
			generatedCode?: string;
			textParts: string[];
		} = { workflow: null, textParts: [] };

		// The signal is handed to the builder so a timeout cancels the run itself.
		const abortController = new AbortController();
		const timer =
			timeoutMs !== undefined && timeoutMs > 0
				? setTimeout(() => {
						abortController.abort(
							new Error(`CodeWorkflowBuilder timed out after ${timeoutMs}ms`),
						);
					}, timeoutMs)
				: undefined;

		try {
			const stream = builder.chat(
				payload,
				EVAL_USERS.LANGSMITH,
				abortController.signal,
				historyContext,
			);
			for await (const output of stream) {
				for (const chunk of output.messages) {
					processCodeBuilderMessage(chunk, streamState);
				}
			}
		} finally {
			// Always disarm the timer, whether the stream completed or threw.
			if (timer !== undefined) {
				clearTimeout(timer);
			}
		}

		const { workflow, generatedCode, textParts } = streamState;
		if (!workflow) {
			throw new WorkflowGenerationError('CodeWorkflowBuilder did not produce a workflow');
		}

		// Report token usage only when something was actually counted.
		const sawTokens = tokenTotals.inputTokens > 0 || tokenTotals.outputTokens > 0;
		if (collectors?.tokenUsage && sawTokens) {
			collectors.tokenUsage({
				inputTokens: tokenTotals.inputTokens,
				outputTokens: tokenTotals.outputTokens,
			});
		}

		return {
			workflow,
			generatedCode,
			agentTextResponse: textParts.join('') || undefined,
		};
	};

	return generate;
}
/**
 * Resolve the local test cases to run, in priority order:
 * CSV file (`promptsCsv`), a bundled case selected by id (`testCase`), a single CLI
 * prompt (`prompt`), and finally all bundled default cases. When `maxExamples` is
 * truthy the result is truncated to that many entries.
 *
 * @param args - Parsed CLI arguments.
 * @returns The selected test cases.
 * @throws Error when `testCase` does not match any bundled test case id.
 */
function loadTestCases(args: ReturnType<typeof parseEvaluationArgs>): TestCase[] {
	const applyLimit = (cases: TestCase[]): TestCase[] =>
		args.maxExamples ? cases.slice(0, args.maxExamples) : cases;

	// From CSV file
	if (args.promptsCsv) {
		return applyLimit(loadTestCasesFromCsv(args.promptsCsv));
	}

	// Predefined test case by id
	if (args.testCase) {
		const match = loadDefaultTestCases().find((candidate) => candidate.id === args.testCase);
		if (!match) {
			const options = getDefaultTestCaseIds().join(', ');
			throw new Error(`Unknown --test-case "${args.testCase}". Available: ${options}`);
		}
		const selected: TestCase = {
			prompt: match.prompt,
			id: match.id,
			context: { dos: args.dos, donts: args.donts },
		};
		return applyLimit([selected]);
	}

	// Single prompt from CLI
	if (args.prompt) {
		const adHoc: TestCase = {
			prompt: args.prompt,
			context: { dos: args.dos, donts: args.donts },
		};
		return applyLimit([adHoc]);
	}

	// Default: use bundled test cases
	return applyLimit(loadDefaultTestCases());
}
/**
 * Main entry point for the v2 evaluation CLI.
 *
 * Parses CLI arguments, sets up the test environment, picks the workflow generator for
 * the requested agent type, assembles evaluators and lifecycle hooks, runs the evaluation
 * in LangSmith or local mode, optionally sends a webhook notification, and then exits the
 * process with code 0.
 *
 * @throws Error when LangSmith mode is combined with `--prompt`, `--prompts-csv` or
 *         `--test-case`, or when LangSmith mode is requested but no client is available.
 */
export async function runV2Evaluation(): Promise<void> {
	const args = parseEvaluationArgs();
	const usesLangsmith = args.backend === 'langsmith';

	if (usesLangsmith && (args.prompt || args.promptsCsv || args.testCase)) {
		throw new Error(
			'LangSmith mode requires `--dataset` and does not support `--prompt`, `--prompts-csv`, or `--test-case`',
		);
	}

	// Setup environment with per-stage model configuration
	const logger = createLogger(args.verbose);
	const env = await setupTestEnvironment(argsToStageModels(args), logger);

	// Fail early if the LangSmith backend was requested but no client could be created
	if (usesLangsmith && !env.lsClient) {
		throw new Error('LangSmith client not initialized - check LANGSMITH_API_KEY');
	}

	// Dataset name for LangSmith runs; resolved lazily so local runs never compute it
	const resolveLangsmithDataset = () => args.datasetName ?? getDefaultDatasetName(args.suite);

	// Workflow generator depends on the agent type
	const generateWorkflow =
		args.agent === AGENT_TYPES.CODE_BUILDER
			? createCodeWorkflowBuilderGenerator(
					env.parsedNodeTypes,
					env.llms,
					args.timeoutMs,
					env.nodeDefinitionDirs,
				)
			: createWorkflowGenerator(env.parsedNodeTypes, env.llms, args.featureFlags);

	// Suite-specific evaluators, followed by the execution evaluator that runs for all
	// suites — validates workflows execute with pin data
	const evaluators: Array<Evaluator<EvaluationContext>> = [
		...createEvaluators({
			suite: args.suite,
			judgeLlm: env.llms.judge,
			parsedNodeTypes: env.parsedNodeTypes,
			numJudges: args.numJudges,
			checks: args.checks,
		}),
		createExecutionEvaluator(),
	];

	const llmCallLimiter = pLimit(args.concurrency);

	// Console lifecycle, plus the introspection analysis lifecycle for that suite only
	const consoleLifecycle = createConsoleLifecycle({ verbose: args.verbose, logger });
	const introspectionLifecycle =
		args.suite === 'introspection'
			? createIntrospectionAnalysisLifecycle({
					judgeLlm: env.llms.judge,
					outputDir: args.outputDir,
					logger,
				})
			: undefined;
	const lifecycle = mergeLifecycles(consoleLifecycle, introspectionLifecycle);

	// Pin data generator for mocking service node outputs in evaluations
	const nodesBasePath = resolveNodesBasePath();
	const pinDataGenerator = async (workflow: SimpleWorkflow) => {
		return await generateEvalPinData(workflow, {
			llm: env.llms.judge,
			nodeTypes: env.parsedNodeTypes,
			nodesBasePath,
			logger,
		});
	};

	const zeroThresholdSuite = args.suite === 'introspection' || args.suite === 'binary-checks';

	const baseConfig = {
		generateWorkflow,
		evaluators,
		lifecycle,
		logger,
		outputDir: args.outputDir,
		outputCsv: args.outputCsv,
		suite: args.suite,
		timeoutMs: args.timeoutMs,
		context: { llmCallLimiter },
		passThreshold: zeroThresholdSuite ? 0 : undefined,
		pinDataGenerator,
	};

	let config: RunConfig;
	if (usesLangsmith) {
		config = {
			...baseConfig,
			mode: 'langsmith',
			dataset: resolveLangsmithDataset(),
			langsmithClient: env.lsClient!,
			langsmithOptions: {
				experimentName: args.experimentName ?? getDefaultExperimentName(args.suite),
				repetitions: args.repetitions,
				concurrency: args.concurrency,
				maxExamples: args.maxExamples,
				filters: args.filters,
				experimentMetadata: {
					...buildCIMetadata(),
					...(args.suite === 'pairwise'
						? { numJudges: args.numJudges, scoringMethod: 'hierarchical' }
						: {}),
				},
			},
		};
	} else {
		config = {
			...baseConfig,
			mode: 'local',
			dataset: loadTestCases(args),
			concurrency: args.concurrency,
		};
	}

	// Run evaluation
	const summary = await runEvaluation(config);

	if (args.webhookUrl) {
		await sendWebhookNotification({
			webhookUrl: args.webhookUrl,
			webhookSecret: args.webhookSecret,
			summary,
			dataset: usesLangsmith ? resolveLangsmithDataset() : 'local-dataset',
			suite: args.suite,
			metadata: { ...buildCIMetadata() },
			logger,
		});
	}

	// Always exit 0 on successful completion - pass/fail is informational, not an error
	process.exit(0);
}
// Run the evaluation only when this file is the process entry point (not when imported).
if (require.main === module) {
	// Log the failure (stack when available) with a verbose logger, then exit non-zero.
	const reportFailureAndExit = (error: unknown): never => {
		const logger = createLogger(true);
		let details: string;
		if (error instanceof Error) {
			details = error.stack ?? error.message;
		} else {
			details = String(error);
		}
		logger.error(`Evaluation failed: ${details}`);
		process.exit(1);
	};

	runV2Evaluation().catch(reportFailureAndExit);
}