mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-29 15:57:00 +02:00
615 lines
18 KiB
TypeScript
615 lines
18 KiB
TypeScript
/**
 * V2 CLI Entry Point
 *
 * Demonstrates how to use the v2 evaluation harness.
 * Can be run directly or used as a reference for custom setups.
 */
|
|
|
|
import type { INodeTypeDescription } from 'n8n-workflow';
|
|
import pLimit from 'p-limit';
|
|
|
|
import { CodeWorkflowBuilder } from '@/code-builder';
|
|
import type { HistoryContext } from '@/code-builder';
|
|
import type { ConversationEntry } from '@/code-builder/utils/code-builder-session';
|
|
import type { CoordinationLogEntry } from '@/types/coordination';
|
|
import type { StreamChunk, WorkflowUpdateChunk } from '@/types/streaming';
|
|
import type { SimpleWorkflow } from '@/types/workflow';
|
|
import type { BuilderFeatureFlags } from '@/workflow-builder-agent';
|
|
|
|
/**
 * Type guard for SimpleWorkflow.
 *
 * Accepts only non-null, non-array objects that carry a `name` key, an array under
 * `nodes`, and a non-null, non-array object under `connections`. Checking the shape of
 * the values (not just that the keys exist) keeps payloads such as
 * `{ name: 'x', nodes: null, connections: 0 }` from being treated as a workflow.
 *
 * @param value - Arbitrary value to inspect (e.g. the result of `JSON.parse`).
 * @returns `true` when `value` is structurally usable as a SimpleWorkflow.
 */
function isSimpleWorkflow(value: unknown): value is SimpleWorkflow {
	if (typeof value !== 'object' || value === null || Array.isArray(value)) return false;

	const candidate = value as Record<string, unknown>;
	if (!('name' in candidate)) return false;

	const { nodes, connections } = candidate;
	return (
		Array.isArray(nodes) &&
		typeof connections === 'object' &&
		connections !== null &&
		!Array.isArray(connections)
	);
}
|
|
|
|
import {
|
|
argsToStageModels,
|
|
getDefaultDatasetName,
|
|
getDefaultExperimentName,
|
|
parseEvaluationArgs,
|
|
} from './argument-parser';
|
|
import { buildCIMetadata } from './ci-metadata';
|
|
import {
|
|
loadTestCasesFromCsv,
|
|
loadDefaultTestCases,
|
|
getDefaultTestCaseIds,
|
|
} from './csv-prompt-loader';
|
|
import { sendWebhookNotification } from './webhook';
|
|
import { WorkflowGenerationError } from '../errors';
|
|
import {
|
|
collectAgentTextResponse,
|
|
extractSubgraphMetrics,
|
|
getChatPayload,
|
|
} from '../harness/evaluation-helpers';
|
|
import type { DatasetInputContext } from '../harness/harness-types';
|
|
import { createLogger } from '../harness/logger';
|
|
import type { GenerationCollectors, SubgraphMetricsCollector } from '../harness/runner';
|
|
import { TokenUsageTrackingHandler } from '../harness/token-tracking-handler';
|
|
import {
|
|
runEvaluation,
|
|
createConsoleLifecycle,
|
|
mergeLifecycles,
|
|
createLLMJudgeEvaluator,
|
|
createProgrammaticEvaluator,
|
|
createPairwiseEvaluator,
|
|
createSimilarityEvaluator,
|
|
createExecutionEvaluator,
|
|
createBinaryChecksEvaluator,
|
|
type RunConfig,
|
|
type TestCase,
|
|
type Evaluator,
|
|
type EvaluationContext,
|
|
type GenerationResult,
|
|
} from '../index';
|
|
import { generateRunId, isWorkflowStateValues } from '../langsmith/types';
|
|
import { createIntrospectionAnalysisLifecycle } from '../lifecycles/introspection-analysis';
|
|
import { AGENT_TYPES, EVAL_TYPES, EVAL_USERS } from '../support/constants';
|
|
import {
|
|
setupTestEnvironment,
|
|
createAgent,
|
|
resolveNodesBasePath,
|
|
type ResolvedStageLLMs,
|
|
} from '../support/environment';
|
|
import { generateEvalPinData } from '../support/pin-data-generator';
|
|
|
|
/**
 * Narrows a streamed chunk to a workflow update by inspecting its `type` discriminator.
 */
function isWorkflowUpdateChunk(chunk: StreamChunk): chunk is WorkflowUpdateChunk {
	const { type } = chunk;
	return type === 'workflow-updated';
}
|
|
|
|
/**
 * Type guard: does the given state value expose an array under `coordinationLog`?
 * Anything that is not a non-null object is rejected outright; the array's entries
 * themselves are not inspected.
 */
function hasCoordinationLog(
	values: unknown,
): values is { coordinationLog: CoordinationLogEntry[] } {
	const isObject = typeof values === 'object' && values !== null;
	return isObject && Array.isArray((values as Record<string, unknown>).coordinationLog);
}
|
|
|
|
/**
 * Derive subgraph metrics from the state's coordination log (when one is present) and
 * the workflow's node count, then hand them to the collector.
 * The collector is only invoked when at least one of the metrics is defined.
 */
function reportSubgraphMetrics(
	collector: SubgraphMetricsCollector,
	stateValues: unknown,
	workflow: SimpleWorkflow,
): void {
	const log = hasCoordinationLog(stateValues) ? stateValues.coordinationLog : undefined;
	const metrics = extractSubgraphMetrics(log, workflow.nodes?.length);

	const reportable = [
		metrics.discoveryDurationMs,
		metrics.builderDurationMs,
		metrics.responderDurationMs,
		metrics.nodeCount,
	];

	// Nothing measured — skip the collector entirely
	if (reportable.every((metric) => metric === undefined)) return;

	collector(metrics);
}
|
|
|
|
/**
 * Build the workflow generator used for the multi-agent system.
 *
 * LangSmith tracing is handled via traceable() in the runner. Callbacks are passed
 * explicitly from the runner to ensure correct trace context under high concurrency
 * (avoids AsyncLocalStorage race conditions).
 *
 * Each invocation of the returned function creates a fresh agent and run id, drives one
 * chat turn, reads the resulting state, and reports to whichever collectors were supplied.
 */
function createWorkflowGenerator(
	parsedNodeTypes: INodeTypeDescription[],
	llms: ResolvedStageLLMs,
	featureFlags?: BuilderFeatureFlags,
): (
	prompt: string,
	datasetInputContext?: DatasetInputContext,
	collectors?: GenerationCollectors,
) => Promise<GenerationResult> {
	return async (prompt, datasetInputContext, collectors) => {
		const runId = generateRunId();
		const agent = createAgent({ parsedNodeTypes, llms, featureFlags });

		// Only track tokens when somebody is listening; the handler captures usage from
		// all LLM calls (supervisor, discovery, builder, responder agents)
		const tokenTracker = collectors?.tokenUsage ? new TokenUsageTrackingHandler() : undefined;

		const payload = getChatPayload({
			evalType: EVAL_TYPES.LANGSMITH,
			message: prompt,
			workflowId: runId,
			featureFlags,
			workflowContext: datasetInputContext?.workflowContext,
			mode: datasetInputContext?.mode,
		});
		const externalCallbacks = tokenTracker ? [tokenTracker] : undefined;

		const responseStream = agent.chat(
			payload,
			EVAL_USERS.LANGSMITH,
			undefined, // abortSignal
			externalCallbacks,
			datasetInputContext?.historicalMessages,
		);
		const agentTextResponse = await collectAgentTextResponse(responseStream);

		const { values } = await agent.getState(runId, EVAL_USERS.LANGSMITH);
		if (!values || !isWorkflowStateValues(values)) {
			throw new Error('Invalid workflow state: workflow or messages missing');
		}

		const workflow = values.workflowJSON;

		// Token usage accumulated across all agents; skipped when nothing was recorded
		if (tokenTracker && collectors?.tokenUsage) {
			const usage = tokenTracker.getUsage();
			const hasUsage = usage.inputTokens > 0 || usage.outputTokens > 0;
			if (hasUsage) {
				collectors.tokenUsage(usage);
			}
		}

		// Subgraph metrics derived from the coordination log
		if (collectors?.subgraphMetrics) {
			reportSubgraphMetrics(collectors.subgraphMetrics, values, workflow);
		}

		// Introspection events (empty list when the state carries none)
		collectors?.introspectionEvents?.(values.introspectionEvents ?? []);

		return { workflow, agentTextResponse };
	};
}
|
|
|
|
/**
 * Pick the evaluator set for a suite.
 * A suite name without a dedicated case yields an empty list.
 */
function createEvaluators(params: {
	suite: string;
	judgeLlm: ResolvedStageLLMs['judge'];
	parsedNodeTypes: Parameters<typeof createProgrammaticEvaluator>[0];
	numJudges: number;
	checks?: string[];
}): Array<Evaluator<EvaluationContext>> {
	const { suite, judgeLlm, parsedNodeTypes, numJudges, checks } = params;

	switch (suite) {
		case 'llm-judge':
			return [
				createLLMJudgeEvaluator(judgeLlm, parsedNodeTypes),
				createProgrammaticEvaluator(parsedNodeTypes),
			];
		case 'pairwise':
			return [
				createPairwiseEvaluator(judgeLlm, { numJudges }),
				createProgrammaticEvaluator(parsedNodeTypes),
			];
		case 'programmatic':
			return [createProgrammaticEvaluator(parsedNodeTypes)];
		case 'similarity':
			return [createSimilarityEvaluator()];
		case 'binary-checks':
			return [
				createBinaryChecksEvaluator({
					nodeTypes: parsedNodeTypes,
					llm: judgeLlm,
					checks,
				}),
			];
		default:
			return [];
	}
}
|
|
|
|
/**
 * Turn raw LangChain messages from a dataset into a HistoryContext for the code builder.
 *
 * Walks the list looking for human messages with string content. A human message that is
 * immediately followed by an AI message becomes an `assistant-exchange` entry (the AI
 * message is consumed with it); any other human message becomes a `build-request` entry.
 * Returns `undefined` when the input is empty or yields no entries.
 */
function buildHistoryContextFromMessages(messages: unknown[]): HistoryContext | undefined {
	if (messages.length === 0) return undefined;

	// A message's role is recognised by the marker string contained in its `id` array
	const hasRole = (candidate: unknown, marker: string): candidate is Record<string, unknown> =>
		isUnknownRecord(candidate) && Array.isArray(candidate.id) && candidate.id.includes(marker);

	// String content lives under `kwargs.content`; anything else counts as absent
	const textOf = (record: Record<string, unknown>): string | undefined => {
		const { kwargs } = record;
		return isUnknownRecord(kwargs) && typeof kwargs.content === 'string'
			? kwargs.content
			: undefined;
	};

	const conversationEntries: ConversationEntry[] = [];

	let cursor = 0;
	while (cursor < messages.length) {
		const current = messages[cursor];
		cursor += 1;

		if (!hasRole(current, 'HumanMessage')) continue;

		const userQuery = textOf(current);
		if (userQuery === undefined) continue;

		// `cursor` already points at the message after the human one (undefined past the end)
		const following = messages[cursor];
		if (hasRole(following, 'AIMessage')) {
			conversationEntries.push({
				type: 'assistant-exchange',
				userQuery,
				assistantSummary: textOf(following) ?? '',
			});
			cursor += 1; // The AI message has been paired — step over it
		} else {
			conversationEntries.push({ type: 'build-request', message: userQuery });
		}
	}

	return conversationEntries.length > 0 ? { conversationEntries } : undefined;
}
|
|
|
|
/** Narrows a value to a keyed object: typeof 'object', non-null, and not an array. */
function isUnknownRecord(value: unknown): value is Record<string, unknown> {
	if (value === null || Array.isArray(value)) return false;
	return typeof value === 'object';
}
|
|
|
|
/**
 * Fold one stream message from CodeWorkflowBuilder into the running state.
 *
 * - Workflow update chunks: `codeSnippet` is parsed as JSON; when the result passes
 *   `isSimpleWorkflow` it replaces `state.workflow` and `state.generatedCode` is set
 *   from the chunk's `sourceCode`. Unparseable snippets are ignored.
 * - `message` chunks with string `text`: the text is appended to `state.textParts`.
 */
function processCodeBuilderMessage(
	message: StreamChunk,
	state: { workflow: SimpleWorkflow | null; generatedCode?: string; textParts: string[] },
) {
	if (!isWorkflowUpdateChunk(message)) {
		if (message.type === 'message' && 'text' in message && typeof message.text === 'string') {
			state.textParts.push(message.text);
		}
		return;
	}

	let parsed: unknown;
	try {
		parsed = JSON.parse(message.codeSnippet);
	} catch {
		// Invalid JSON in codeSnippet — skip this message
		return;
	}

	if (isSimpleWorkflow(parsed)) {
		state.workflow = parsed;
		state.generatedCode = message.sourceCode;
	}
}
|
|
|
|
/**
 * Build a generator function backed by CodeWorkflowBuilder.
 *
 * CodeWorkflowBuilder coordinates planning and coding agents to generate workflows via
 * TypeScript SDK code and emits workflow JSON directly in the stream. The returned
 * GenerationResult includes the source code for artifact saving.
 * Subgraph metrics are not applicable since CodeWorkflowBuilder doesn't use coordination logs.
 *
 * @param timeoutMs - Optional timeout in milliseconds. When provided (and positive), the
 *                    agent is aborted once it exceeds this duration, so the generator
 *                    actually stops instead of continuing to run after timeout rejection.
 * @throws WorkflowGenerationError when the stream ends without a valid workflow.
 */
function createCodeWorkflowBuilderGenerator(
	parsedNodeTypes: INodeTypeDescription[],
	llms: ResolvedStageLLMs,
	timeoutMs?: number,
	nodeDefinitionDirs?: string[],
): (
	prompt: string,
	datasetInputContext?: DatasetInputContext,
	collectors?: GenerationCollectors,
) => Promise<GenerationResult> {
	return async (prompt, datasetInputContext, collectors) => {
		const runId = generateRunId();

		// Running totals across every LLM call made by the builder
		const tokenTotals = { inputTokens: 0, outputTokens: 0 };

		const builder = new CodeWorkflowBuilder({
			llm: llms.builder,
			nodeTypes: parsedNodeTypes,
			nodeDefinitionDirs,
			onTokenUsage: collectors?.tokenUsage
				? (usage) => {
						tokenTotals.inputTokens += usage.inputTokens;
						tokenTotals.outputTokens += usage.outputTokens;
					}
				: undefined,
		});

		const payload = getChatPayload({
			evalType: EVAL_TYPES.LANGSMITH,
			message: prompt,
			workflowId: runId,
			featureFlags: {},
			workflowContext: datasetInputContext?.workflowContext,
			mode: datasetInputContext?.mode,
		});

		// History context is only built when the dataset supplies prior messages
		const historicalMessages = datasetInputContext?.historicalMessages;
		const historyContext = historicalMessages
			? buildHistoryContextFromMessages(historicalMessages)
			: undefined;

		const streamState: Parameters<typeof processCodeBuilderMessage>[1] = {
			workflow: null,
			textParts: [],
		};

		// The AbortController lets the timeout (or an error) actually cancel the agent.
		// Without it, the agent continues running even after Promise.race rejects,
		// causing the full timeout duration to elapse before the error surfaces.
		const abortController = new AbortController();
		const timeoutId =
			timeoutMs !== undefined && timeoutMs > 0
				? setTimeout(() => {
						abortController.abort(
							new Error(`CodeWorkflowBuilder timed out after ${timeoutMs}ms`),
						);
					}, timeoutMs)
				: undefined;

		try {
			const stream = builder.chat(
				payload,
				EVAL_USERS.LANGSMITH,
				abortController.signal,
				historyContext,
			);
			for await (const { messages } of stream) {
				for (const message of messages) {
					processCodeBuilderMessage(message, streamState);
				}
			}
		} finally {
			// Always disarm the timer, whether the stream finished or threw
			if (timeoutId !== undefined) {
				clearTimeout(timeoutId);
			}
		}

		const { workflow, generatedCode, textParts } = streamState;
		if (!workflow) {
			throw new WorkflowGenerationError('CodeWorkflowBuilder did not produce a workflow');
		}

		// Report the accumulated totals, unless nothing was recorded
		const sawTokens = tokenTotals.inputTokens > 0 || tokenTotals.outputTokens > 0;
		if (collectors?.tokenUsage && sawTokens) {
			collectors.tokenUsage({ ...tokenTotals });
		}

		return {
			workflow,
			generatedCode,
			agentTextResponse: textParts.join('') || undefined,
		};
	};
}
|
|
|
|
/**
 * Resolve the local test cases for a run.
 *
 * Sources are tried in this order: a prompts CSV, a predefined test case selected by id,
 * a single prompt from the CLI, and finally the bundled defaults. Whatever list results
 * is truncated to `args.maxExamples` when that value is truthy.
 *
 * @throws Error when `args.testCase` names an id that is not among the default cases.
 */
function loadTestCases(args: ReturnType<typeof parseEvaluationArgs>): TestCase[] {
	const { maxExamples, dos, donts } = args;
	const capToMaxExamples = (cases: TestCase[]): TestCase[] =>
		maxExamples ? cases.slice(0, maxExamples) : cases;

	// From CSV file
	if (args.promptsCsv) {
		return capToMaxExamples(loadTestCasesFromCsv(args.promptsCsv));
	}

	// Predefined test case by id
	if (args.testCase) {
		const match = loadDefaultTestCases().find((tc) => tc.id === args.testCase);
		if (!match) {
			const options = getDefaultTestCaseIds().join(', ');
			throw new Error(`Unknown --test-case "${args.testCase}". Available: ${options}`);
		}

		return capToMaxExamples([{ prompt: match.prompt, id: match.id, context: { dos, donts } }]);
	}

	// Single prompt from CLI
	if (args.prompt) {
		return capToMaxExamples([{ prompt: args.prompt, context: { dos, donts } }]);
	}

	// Default: use bundled test cases
	return capToMaxExamples(loadDefaultTestCases());
}
|
|
|
|
/**
 * Main entry point for v2 evaluation CLI.
 *
 * Parses CLI arguments, sets up the environment, wires the generator, evaluators,
 * lifecycle and pin data generator into a run config (LangSmith or local), runs the
 * evaluation, optionally sends a webhook notification, and exits the process with code 0.
 *
 * @throws Error when the LangSmith backend is combined with `--prompt`, `--prompts-csv`
 *         or `--test-case`, or when the LangSmith client is not available.
 */
export async function runV2Evaluation(): Promise<void> {
	const args = parseEvaluationArgs();
	const isLangsmith = args.backend === 'langsmith';

	if (isLangsmith && (args.prompt || args.promptsCsv || args.testCase)) {
		throw new Error(
			'LangSmith mode requires `--dataset` and does not support `--prompt`, `--prompts-csv`, or `--test-case`',
		);
	}

	// Environment setup with per-stage model configuration
	const logger = createLogger(args.verbose);
	const stageModels = argsToStageModels(args);
	const env = await setupTestEnvironment(stageModels, logger);

	// Fail early if the langsmith backend was requested without a usable client
	if (isLangsmith && !env.lsClient) {
		throw new Error('LangSmith client not initialized - check LANGSMITH_API_KEY');
	}

	// Workflow generator depends on the agent type
	const generateWorkflow =
		args.agent === AGENT_TYPES.CODE_BUILDER
			? createCodeWorkflowBuilderGenerator(
					env.parsedNodeTypes,
					env.llms,
					args.timeoutMs,
					env.nodeDefinitionDirs,
				)
			: createWorkflowGenerator(env.parsedNodeTypes, env.llms, args.featureFlags);

	// Suite-specific evaluators, followed by the execution evaluator which runs for
	// all suites — it validates workflows execute with pin data
	const evaluators: Array<Evaluator<EvaluationContext>> = [
		...createEvaluators({
			suite: args.suite,
			judgeLlm: env.llms.judge,
			parsedNodeTypes: env.parsedNodeTypes,
			numJudges: args.numJudges,
			checks: args.checks,
		}),
		createExecutionEvaluator(),
	];

	const llmCallLimiter = pLimit(args.concurrency);

	// Console lifecycle, merged with the introspection analysis lifecycle when applicable
	const consoleLifecycle = createConsoleLifecycle({ verbose: args.verbose, logger });
	const introspectionLifecycle =
		args.suite === 'introspection'
			? createIntrospectionAnalysisLifecycle({
					judgeLlm: env.llms.judge,
					outputDir: args.outputDir,
					logger,
				})
			: undefined;
	const mergedLifecycle = mergeLifecycles(consoleLifecycle, introspectionLifecycle);

	// Pin data generator for mocking service node outputs in evaluations
	const nodesBasePath = resolveNodesBasePath();
	const pinDataGenerator = async (workflow: SimpleWorkflow) => {
		return await generateEvalPinData(workflow, {
			llm: env.llms.judge,
			nodeTypes: env.parsedNodeTypes,
			nodesBasePath,
			logger,
		});
	};

	const zeroThresholdSuites: string[] = ['introspection', 'binary-checks'];
	const baseConfig = {
		generateWorkflow,
		evaluators,
		lifecycle: mergedLifecycle,
		logger,
		outputDir: args.outputDir,
		outputCsv: args.outputCsv,
		suite: args.suite,
		timeoutMs: args.timeoutMs,
		context: { llmCallLimiter },
		passThreshold: zeroThresholdSuites.includes(args.suite) ? 0 : undefined,
		pinDataGenerator,
	};

	let config: RunConfig;
	if (isLangsmith) {
		config = {
			...baseConfig,
			mode: 'langsmith',
			dataset: args.datasetName ?? getDefaultDatasetName(args.suite),
			// Presence was verified right after environment setup
			langsmithClient: env.lsClient!,
			langsmithOptions: {
				experimentName: args.experimentName ?? getDefaultExperimentName(args.suite),
				repetitions: args.repetitions,
				concurrency: args.concurrency,
				maxExamples: args.maxExamples,
				filters: args.filters,
				experimentMetadata: {
					...buildCIMetadata(),
					...(args.suite === 'pairwise' && {
						numJudges: args.numJudges,
						scoringMethod: 'hierarchical',
					}),
				},
			},
		};
	} else {
		config = {
			...baseConfig,
			mode: 'local',
			dataset: loadTestCases(args),
			concurrency: args.concurrency,
		};
	}

	// Run evaluation
	const summary = await runEvaluation(config);

	if (args.webhookUrl) {
		const dataset = isLangsmith
			? (args.datasetName ?? getDefaultDatasetName(args.suite))
			: 'local-dataset';

		await sendWebhookNotification({
			webhookUrl: args.webhookUrl,
			webhookSecret: args.webhookSecret,
			summary,
			dataset,
			suite: args.suite,
			metadata: { ...buildCIMetadata() },
			logger,
		});
	}

	// Always exit 0 on successful completion - pass/fail is informational, not an error
	process.exit(0);
}
|
|
|
|
// Entry-point guard: start the evaluation only when this module is executed directly.
// Any failure is logged (stack when available) and the process exits with code 1.
if (require.main === module) {
	void runV2Evaluation().catch((error: unknown) => {
		const details = error instanceof Error ? (error.stack ?? error.message) : String(error);
		createLogger(true).error(`Evaluation failed: ${details}`);
		process.exit(1);
	});
}
|