n8n/packages/@n8n/ai-workflow-builder.ee/evaluations/harness/runner.ts
2026-03-31 07:49:10 +00:00

1651 lines
47 KiB
TypeScript

import { AIMessage, HumanMessage, type BaseMessage } from '@langchain/core/messages';
import { evaluate } from 'langsmith/evaluation';
import type { Run, Example } from 'langsmith/schemas';
import { traceable } from 'langsmith/traceable';
import type { IPinData } from 'n8n-workflow';
import pLimit from 'p-limit';
import { runWithOptionalLimiter, withTimeout } from './evaluation-helpers';
import { toLangsmithEvaluationResult } from './feedback';
import {
isGenerationResult,
type DatasetInputContext,
type Evaluator,
type TestCase,
type EvaluationContext,
type GlobalRunContext,
type TestCaseContext,
type Feedback,
type RunConfig,
type LocalRunConfig,
type LangsmithRunConfig,
type ExampleResult,
type RunSummary,
type EvaluationLifecycle,
type LangsmithExampleFilters,
type LlmCallLimiter,
type GenerationResult,
} from './harness-types';
import type { EvalLogger } from './logger';
import { createArtifactSaver, type ArtifactSaver } from './output';
import {
calculateWeightedScore,
selectScoringItems,
calculateFiniteAverage,
} from './score-calculator';
import type { IntrospectionEvent } from '../../src/tools/introspect.tool.js';
import type { SimpleWorkflow } from '../../src/types/workflow';
import type { ChatPayload } from '../../src/workflow-builder-agent';
import { extractMessageContent } from '../langsmith/types';
/** Minimum example score that counts as 'pass' when the run config does not supply its own `passThreshold`. */
const DEFAULT_PASS_THRESHOLD = 0.7;
/**
* Callback to collect token usage from generation.
* Called after each workflow generation with the token counts.
*
* It is handed to `generateWorkflow` as part of the `collectors` argument;
* the runner's own implementation simply keeps the most recent values.
*/
export type TokenUsageCollector = (usage: { inputTokens: number; outputTokens: number }) => void;
/**
* Callback to collect subgraph metrics from generation.
* Called after each workflow generation with timing and node count data.
*
* Every field is optional: a generator reports only what it measured.
* Durations are in milliseconds (they are converted to seconds only when
* turned into LangSmith feedback).
*/
export type SubgraphMetricsCollector = (metrics: {
discoveryDurationMs?: number;
builderDurationMs?: number;
responderDurationMs?: number;
nodeCount?: number;
}) => void;
/**
* Callback to collect introspection events from generation.
* Called after each workflow generation with the events array.
*
* The runner stores the array as-is and forwards it on the example result.
*/
export type IntrospectionEventsCollector = (events: IntrospectionEvent[]) => void;
/**
* Combined collectors for workflow generation metrics.
*
* Passed as the optional third argument of `generateWorkflow`. Each collector
* is optional, so a generator must tolerate any of them being absent.
*/
export interface GenerationCollectors {
/** Receives input/output token counts of the generation. */
tokenUsage?: TokenUsageCollector;
/** Receives per-subgraph durations (ms) and the node count. */
subgraphMetrics?: SubgraphMetricsCollector;
/** Receives the introspection events emitted during generation. */
introspectionEvents?: IntrospectionEventsCollector;
}
/**
 * Runs every evaluator against a single generated workflow, all at once.
 *
 * A failing evaluator never aborts the others: whatever it throws (or whatever
 * `withTimeout` rejects with) is reported through `lifecycle.onEvaluatorError`
 * and replaced by one zero-score feedback item with metric `error`.
 *
 * @param workflow - the generated workflow under evaluation
 * @param evaluators - evaluators to run
 * @param context - evaluation context handed to each evaluator
 * @param timeoutMs - per-evaluator timeout forwarded to `withTimeout`
 * @param lifecycle - optional hooks notified per evaluator
 * @returns feedback of all evaluators, flattened in evaluator order
 */
async function evaluateWithPlugins(
	workflow: SimpleWorkflow,
	evaluators: Array<Evaluator<EvaluationContext>>,
	context: EvaluationContext,
	timeoutMs: number | undefined,
	lifecycle?: Partial<EvaluationLifecycle>,
): Promise<Feedback[]> {
	const runOne = async (plugin: Evaluator<EvaluationContext>): Promise<Feedback[]> => {
		try {
			const items = await withTimeout({
				promise: plugin.evaluate(workflow, context),
				timeoutMs,
				label: `evaluator:${plugin.name}`,
			});
			lifecycle?.onEvaluatorComplete?.(plugin.name, items);
			return items;
		} catch (caught) {
			// Normalise non-Error throwables so hooks and feedback always see a message.
			const failure = caught instanceof Error ? caught : new Error(String(caught));
			lifecycle?.onEvaluatorError?.(plugin.name, failure);
			return [
				{
					evaluator: plugin.name,
					metric: 'error',
					score: 0,
					kind: 'score',
					comment: failure.message,
				},
			];
		}
	};

	const perEvaluator = await Promise.all(evaluators.map(async (plugin) => await runOne(plugin)));
	return perEvaluator.flat();
}
/**
 * Overall score of one example: delegates to the evaluator-weighted scoring
 * implemented by `calculateWeightedScore`.
 */
function calculateExampleScore(feedback: Feedback[]): number {
	const weightedScore = calculateWeightedScore(feedback);
	return weightedScore;
}
/**
 * Maps an example score to its pass/fail status.
 * The threshold is inclusive: a score equal to `passThreshold` passes.
 */
function determineStatus(args: { score: number; passThreshold: number }): 'pass' | 'fail' {
	if (args.score >= args.passThreshold) {
		return 'pass';
	}
	return 'fail';
}
/** True when at least one feedback item carries the `error` metric. */
function hasErrorFeedback(feedback: Feedback[]): boolean {
	for (const item of feedback) {
		if (item.metric === 'error') return true;
	}
	return false;
}
/**
 * Converts a duration from milliseconds to seconds.
 *
 * LangSmith scores must stay within [-99999.9999, 99999.9999]; reporting
 * latencies in seconds (good for roughly 27 hours) instead of milliseconds
 * keeps long generations inside that range.
 */
function msToSeconds(ms: number): number {
	const msPerSecond = 1000;
	return ms / msPerSecond;
}
/**
 * Turns collected subgraph metrics into feedback items attributed to the
 * `metrics` evaluator (kind `metric`).
 *
 * Only metrics that are present (not `undefined`) produce an item. Latencies
 * are emitted in seconds under `*_latency_s` keys so they fit LangSmith's
 * score range; `node_count` is emitted unchanged.
 */
function createMetricsFeedback(args: {
	discoveryDurationMs?: number;
	builderDurationMs?: number;
	responderDurationMs?: number;
	nodeCount?: number;
}): Feedback[] {
	// Output order matters for consumers comparing runs: discovery, builder, responder, node count.
	const latencies: Array<[string, number | undefined]> = [
		['discovery_latency_s', args.discoveryDurationMs],
		['builder_latency_s', args.builderDurationMs],
		['responder_latency_s', args.responderDurationMs],
	];

	const items: Feedback[] = [];
	for (const [metric, durationMs] of latencies) {
		if (durationMs === undefined) continue;
		items.push({
			evaluator: 'metrics',
			metric,
			score: msToSeconds(durationMs),
			kind: 'metric',
		});
	}

	if (args.nodeCount !== undefined) {
		items.push({
			evaluator: 'metrics',
			metric: 'node_count',
			score: args.nodeCount,
			kind: 'metric',
		});
	}
	return items;
}
/**
 * Assembles the evaluation context passed to every evaluator.
 *
 * Layering, later entries win on key clashes: `prompt`, then the global
 * context, then the test-case context, then the individual optional fields.
 * An optional field is only added when it is truthy (and, for
 * `referenceWorkflows`, non-empty), so absent data never shows up as a key.
 */
function buildContext(args: {
	prompt: string;
	globalContext?: GlobalRunContext;
	testCaseContext?: TestCaseContext;
	referenceWorkflows?: SimpleWorkflow[];
	generatedCode?: string;
	agentTextResponse?: string;
	pinData?: IPinData;
	datasetInputContext?: DatasetInputContext;
}): EvaluationContext {
	const referencePart = args.referenceWorkflows?.length
		? { referenceWorkflows: args.referenceWorkflows }
		: {};
	const codePart = args.generatedCode ? { generatedCode: args.generatedCode } : {};
	const responsePart = args.agentTextResponse
		? { agentTextResponse: args.agentTextResponse }
		: {};
	const pinDataPart = args.pinData ? { pinData: args.pinData } : {};
	const datasetPart = args.datasetInputContext
		? { datasetInputContext: args.datasetInputContext }
		: {};

	return {
		prompt: args.prompt,
		...(args.globalContext ?? {}),
		...(args.testCaseContext ?? {}),
		...referencePart,
		...codePart,
		...responsePart,
		...pinDataPart,
		...datasetPart,
	};
}
/** Type guard: a non-null object that is not an array. */
function isUnknownRecord(value: unknown): value is Record<string, unknown> {
	if (value === null || typeof value !== 'object') return false;
	return !Array.isArray(value);
}
/** Type guard: any array, with no assumption about its element type. */
function isUnknownArray(value: unknown): value is unknown[] {
	const isArray = Array.isArray(value);
	return isArray;
}
/** Returns `value` when it is a plain record, otherwise a fresh empty object. */
function asRecord(value: unknown): Record<string, unknown> {
	if (isUnknownRecord(value)) return value;
	return {};
}
/** Type guard: an array of exactly two finite numbers (e.g. a node position). */
function isNumberArray2(value: unknown): value is [number, number] {
	if (!Array.isArray(value) || value.length !== 2) return false;
	const first: unknown = value[0];
	const second: unknown = value[1];
	const firstOk = typeof first === 'number' && Number.isFinite(first);
	const secondOk = typeof second === 'number' && Number.isFinite(second);
	return firstOk && secondOk;
}
/**
 * Structural check for a workflow node: non-empty string `name` and `type`,
 * a finite numeric `typeVersion`, and a `position` of two finite numbers.
 */
function isNodeLike(value: unknown): boolean {
	if (!isUnknownRecord(value)) return false;
	const { name, type, typeVersion, position } = value;
	if (typeof name !== 'string' || name.length === 0) return false;
	if (typeof type !== 'string' || type.length === 0) return false;
	if (typeof typeVersion !== 'number' || !Number.isFinite(typeVersion)) return false;
	return isNumberArray2(position);
}
/**
 * Structural check for a workflow `connections` map:
 * record (per node) -> record (per connection type) -> array (per output)
 * -> array of connection objects. An empty record is valid.
 */
function isConnectionsLike(value: unknown): boolean {
	if (!isUnknownRecord(value)) return false;

	// One output: an array whose entries are all plain objects.
	const isOutput = (output: unknown): boolean => {
		if (!Array.isArray(output)) return false;
		for (let i = 0; i < output.length; i++) {
			if (!isUnknownRecord(output[i])) return false;
		}
		return true;
	};

	// All outputs of one connection type: an array of valid outputs.
	const isOutputList = (outputs: unknown): boolean => {
		if (!Array.isArray(outputs)) return false;
		for (let i = 0; i < outputs.length; i++) {
			if (!isOutput(outputs[i])) return false;
		}
		return true;
	};

	return Object.values(value).every(
		(perNode) =>
			isUnknownRecord(perNode) &&
			Object.values(perNode).every((outputs) => isOutputList(outputs)),
	);
}
/**
 * Type guard for `SimpleWorkflow`: a record with a `nodes` array of node-like
 * entries and a well-formed `connections` map.
 */
function isSimpleWorkflow(value: unknown): value is SimpleWorkflow {
	if (!isUnknownRecord(value)) return false;
	const { nodes, connections } = value;
	return (
		Array.isArray(nodes) &&
		isConnectionsLike(connections) &&
		nodes.every((node) => isNodeLike(node))
	);
}
/** Reads `metadata.notion_id` when it is a string; `undefined` otherwise. */
function getNotionIdFromMetadata(metadata: unknown): string | undefined {
	const { notion_id: notionId } = asRecord(metadata);
	if (typeof notionId === 'string') return notionId;
	return undefined;
}
/**
 * Reads the string entries of `metadata.categories`.
 * Returns `undefined` when the field is not an array or holds no strings.
 */
function getCategoriesFromMetadata(metadata: unknown): string[] | undefined {
	const raw = asRecord(metadata).categories;
	if (!Array.isArray(raw)) return undefined;

	const names: string[] = [];
	for (const entry of raw) {
		if (typeof entry === 'string') names.push(entry);
	}
	return names.length === 0 ? undefined : names;
}
/**
 * Pulls the `dos` / `donts` strings out of `exampleInputs.evals`.
 * A key is only present in the result when the source value is a string.
 */
function getEvalsFromExampleInputs(exampleInputs: unknown): { dos?: string; donts?: string } {
	const { dos, donts } = asRecord(asRecord(exampleInputs).evals);
	return {
		...(typeof dos === 'string' ? { dos } : {}),
		...(typeof donts === 'string' ? { donts } : {}),
	};
}
/**
 * Type guard for `Feedback`: string `evaluator` and `metric`, numeric `score`,
 * and a `kind` of 'score', 'metric' or 'detail'.
 */
function isFeedback(value: unknown): value is Feedback {
	if (!isUnknownRecord(value)) return false;
	const { evaluator, metric, score, kind } = value;
	if (typeof evaluator !== 'string' || typeof metric !== 'string') return false;
	if (typeof score !== 'number') return false;
	return kind === 'score' || kind === 'metric' || kind === 'detail';
}
/**
 * Decides whether a dataset example satisfies every filter that is set.
 *
 * - `notionId`: exact match against `metadata.notion_id`
 * - `technique`: must be one of `metadata.categories`
 * - `doSearch` / `dontSearch`: case-insensitive substring of
 *   `inputs.evals.dos` / `inputs.evals.donts` (a missing text never matches)
 *
 * Unset (falsy) filters are ignored.
 */
function exampleMatchesFilters(example: Example, filters: LangsmithExampleFilters): boolean {
	const { notionId, technique, doSearch, dontSearch } = filters;

	if (notionId && getNotionIdFromMetadata(example.metadata) !== notionId) return false;

	if (technique) {
		const categories = getCategoriesFromMetadata(example.metadata);
		if (!categories?.includes(technique)) return false;
	}

	if (!doSearch && !dontSearch) return true;

	const evals = getEvalsFromExampleInputs(example.inputs);
	const containsIgnoringCase = (text: string | undefined, needle: string): boolean =>
		(text ?? '').toLowerCase().includes(needle.toLowerCase());

	if (doSearch && !containsIgnoringCase(evals.dos, doSearch)) return false;
	if (dontSearch && !containsIgnoringCase(evals.donts, dontSearch)) return false;
	return true;
}
/**
 * Loads examples of a LangSmith dataset, optionally filtered and capped.
 *
 * @param params.lsClient - minimal LangSmith client (dataset lookup + example listing)
 * @param params.datasetName - dataset to read
 * @param params.maxExamples - cap on the number of returned examples (falsy = no cap)
 * @param params.filters - client-side filters; when set, examples are scanned until
 *   enough matches are found
 * @returns the selected examples, in listing order
 * @throws Error when filters are set and nothing matched, or when a cap is set
 *   (without filters) and the dataset yielded no examples
 */
async function loadExamplesFromDataset(params: {
	lsClient: {
		readDataset: (args: { datasetName: string }) => Promise<{ id: string }>;
		listExamples: (args: { datasetId: string; limit?: number }) => AsyncIterable<Example>;
	};
	datasetName: string;
	maxExamples?: number;
	filters?: LangsmithExampleFilters;
}): Promise<Example[]> {
	const { lsClient, datasetName, maxExamples, filters } = params;
	const { id: datasetId } = await lsClient.readDataset({ datasetName });

	// The cap is only pushed down to the API when nothing is filtered client-side;
	// with filters the cap counts matches, not scanned examples.
	const listArgs: { datasetId: string; limit?: number } =
		!filters && maxExamples ? { datasetId, limit: maxExamples } : { datasetId };

	const selected: Example[] = [];
	let scannedCount = 0;
	for await (const candidate of lsClient.listExamples(listArgs)) {
		scannedCount += 1;
		if (filters && !exampleMatchesFilters(candidate, filters)) continue;
		selected.push(candidate);
		if (maxExamples && selected.length >= maxExamples) break;
	}

	if (selected.length > 0) return selected;

	if (filters) {
		const described: string[] = [];
		if (filters.notionId) described.push(`id:${filters.notionId}`);
		if (filters.technique) described.push(`technique:${filters.technique}`);
		if (filters.doSearch) described.push(`do:${filters.doSearch}`);
		if (filters.dontSearch) described.push(`dont:${filters.dontSearch}`);
		throw new Error(
			`No examples matched filters (${described.join(', ')}) in dataset "${datasetName}" (scanned ${scannedCount})`,
		);
	}

	if (maxExamples) {
		throw new Error(`No examples found in dataset "${datasetName}"`);
	}
	return selected;
}
/**
 * Decides what to hand to LangSmith as the data source.
 *
 * Without a positive `maxExamples` and without filters the dataset name is
 * returned unchanged. Otherwise the examples are preloaded (filtered and/or
 * capped) and returned as an array.
 *
 * @throws Error the "no examples" errors from loading are rethrown as-is; any
 *   other failure is wrapped as `Dataset "<name>" not found: <message>`
 */
async function resolveLangsmithData(params: {
	dataset: string;
	langsmithOptions: LangsmithRunConfig['langsmithOptions'];
	lsClient: {
		readDataset: (args: { datasetName: string }) => Promise<{ id: string }>;
		listExamples: (args: { datasetId: string; limit?: number }) => AsyncIterable<Example>;
	};
	logger: EvalLogger;
}): Promise<string | Example[]> {
	const { dataset: datasetName, langsmithOptions, lsClient, logger } = params;
	const { maxExamples, filters } = langsmithOptions;

	const hasPositiveCap = typeof maxExamples === 'number' && maxExamples > 0;
	if (!hasPositiveCap && filters === undefined) return datasetName;

	const loadingMessage = filters
		? `Loading examples from dataset "${datasetName}" with filters...`
		: `Loading up to ${maxExamples} examples from dataset "${datasetName}"...`;
	logger.info(loadingMessage);

	try {
		return await loadExamplesFromDataset({ lsClient, datasetName, maxExamples, filters });
	} catch (error) {
		const errorMessage = error instanceof Error ? error.message : String(error);
		// Empty-selection errors already carry a precise message; everything else
		// is reported as a dataset lookup failure.
		const isEmptySelection = [
			'No examples matched filters',
			'No examples found in dataset',
		].some((prefix) => errorMessage.startsWith(prefix));
		if (!isEmptySelection) {
			throw new Error(`Dataset "${datasetName}" not found: ${errorMessage}`);
		}
		throw error instanceof Error ? error : new Error(errorMessage);
	}
}
/**
 * Builds the per-test-case context from raw LangSmith example inputs.
 *
 * Copies `dos` / `donts` when they are strings, reference workflows when they
 * validate as `SimpleWorkflow`, and `annotations` when it is a plain object.
 * Anything malformed is silently left out.
 */
function extractContextFromLangsmithInputs(inputs: unknown): TestCaseContext {
	const { dos, donts, referenceWorkflows, referenceWorkflow, annotations } = asRecord(inputs);
	const context: TestCaseContext = {};

	if (typeof dos === 'string') context.dos = dos;
	if (typeof donts === 'string') context.donts = donts;

	// Datasets may carry either the array form or the legacy single workflow;
	// a valid array takes precedence.
	if (
		Array.isArray(referenceWorkflows) &&
		referenceWorkflows.every((candidate) => isSimpleWorkflow(candidate))
	) {
		context.referenceWorkflows = referenceWorkflows;
	} else if (isSimpleWorkflow(referenceWorkflow)) {
		context.referenceWorkflows = [referenceWorkflow];
	}

	// Annotations feed the binary-checks evaluator.
	if (isUnknownRecord(annotations)) {
		context.annotations = annotations;
	}
	return context;
}
/**
* Collected metrics from workflow generation.
*
* Filled in by the collectors from `createMetricsCollectors`; a field stays
* `undefined` until (and unless) the generator reports it.
*/
interface CollectedMetrics {
/** Input tokens reported through the `tokenUsage` collector. */
genInputTokens?: number;
/** Output tokens reported through the `tokenUsage` collector. */
genOutputTokens?: number;
/** Subgraph durations in milliseconds, from the `subgraphMetrics` collector. */
discoveryDurationMs?: number;
builderDurationMs?: number;
responderDurationMs?: number;
/** Node count reported through the `subgraphMetrics` collector. */
nodeCount?: number;
/** Events reported through the `introspectionEvents` collector. */
introspectionEvents?: IntrospectionEvent[];
}
/**
 * Creates a fresh set of generation collectors backed by one shared metrics
 * object.
 *
 * Each collector overwrites its fields on every call (last report wins).
 * `getMetrics` returns the live object, not a copy.
 */
function createMetricsCollectors(): {
	collectors: GenerationCollectors;
	getMetrics: () => CollectedMetrics;
} {
	const collected: CollectedMetrics = {};
	const getMetrics = (): CollectedMetrics => collected;

	return {
		collectors: {
			tokenUsage: ({ inputTokens, outputTokens }) => {
				collected.genInputTokens = inputTokens;
				collected.genOutputTokens = outputTokens;
			},
			subgraphMetrics: ({
				discoveryDurationMs,
				builderDurationMs,
				responderDurationMs,
				nodeCount,
			}) => {
				collected.discoveryDurationMs = discoveryDurationMs;
				collected.builderDurationMs = builderDurationMs;
				collected.responderDurationMs = responderDurationMs;
				collected.nodeCount = nodeCount;
			},
			introspectionEvents: (events) => {
				collected.introspectionEvents = events;
			},
		},
		getMetrics,
	};
}
/**
 * Extracts the subgraph metrics (durations + node count) from the collected
 * metrics. Returns `undefined` when none of the four values was reported.
 */
function buildSubgraphMetrics(metrics: CollectedMetrics): ExampleResult['subgraphMetrics'] {
	const { discoveryDurationMs, builderDurationMs, responderDurationMs, nodeCount } = metrics;
	const reported = [discoveryDurationMs, builderDurationMs, responderDurationMs, nodeCount];
	if (reported.every((value) => value === undefined)) {
		return undefined;
	}
	return { discoveryDurationMs, builderDurationMs, responderDurationMs, nodeCount };
}
/**
 * Wraps an error message into the single zero-score `error` feedback item the
 * runner reports when an example fails outright.
 */
function createErrorFeedback(errorMessage: string): Feedback[] {
	const item: Feedback = {
		evaluator: 'runner',
		metric: 'error',
		score: 0,
		kind: 'score',
		comment: errorMessage,
	};
	return [item];
}
/**
* Execute the success path for a local example (generation + evaluation).
* Extracted to reduce complexity of runLocalExample.
*
* Steps, in order: generate the workflow (with metrics collectors attached),
* optionally generate pin data, build the evaluation context, run all
* evaluators, then score the example.
*
* Errors from generation (including whatever `withTimeout` rejects with) are
* NOT caught here; they propagate to the caller. Evaluator failures, by
* contrast, are turned into `error` feedback by `evaluateWithPlugins`, which
* makes the returned status 'error' rather than throwing.
*
* @returns the example result, including timings, token usage, subgraph
* metrics and the generated workflow
*/
async function runLocalExampleSuccess(args: {
index: number;
startTime: number;
testCase: TestCase;
generateWorkflow: (
prompt: string,
datasetInputContext?: DatasetInputContext,
collectors?: GenerationCollectors,
) => Promise<SimpleWorkflow | GenerationResult>;
evaluators: Array<Evaluator<EvaluationContext>>;
globalContext?: GlobalRunContext;
passThreshold: number;
timeoutMs: number | undefined;
lifecycle?: Partial<EvaluationLifecycle>;
pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult> {
const {
index,
startTime,
testCase,
generateWorkflow,
evaluators,
globalContext,
passThreshold,
timeoutMs,
lifecycle,
pinDataGenerator,
} = args;
// Generate workflow with metrics collection
const genStartTime = Date.now();
const { collectors, getMetrics } = createMetricsCollectors();
// The generation call is routed through the shared LLM call limiter when one
// is configured on the global context.
const genResult = await runWithOptionalLimiter(async () => {
return await withTimeout({
promise: generateWorkflow(testCase.prompt, testCase.context?.datasetInputContext, collectors),
timeoutMs,
label: 'workflow_generation',
});
}, globalContext?.llmCallLimiter);
// Measured around the limiter call, so it includes any time spent inside it.
const genDurationMs = Date.now() - genStartTime;
// Extract workflow and optional generated code
// (generators may return either a bare workflow or a GenerationResult wrapper)
const workflow = isGenerationResult(genResult) ? genResult.workflow : genResult;
const generatedCode = isGenerationResult(genResult) ? genResult.generatedCode : undefined;
const agentTextResponse = isGenerationResult(genResult) ? genResult.agentTextResponse : undefined;
lifecycle?.onWorkflowGenerated?.(workflow, genDurationMs);
// Generate pin data for service nodes (best-effort)
let pinData: IPinData | undefined;
if (pinDataGenerator) {
try {
pinData = await pinDataGenerator(workflow);
} catch {
// Pin data generation is best-effort — don't fail the evaluation
}
}
const context = buildContext({
prompt: testCase.prompt,
// The explicit timeoutMs overrides any timeoutMs already on the global context.
globalContext: {
...(globalContext ?? {}),
timeoutMs,
},
testCaseContext: testCase.context,
referenceWorkflows: testCase.referenceWorkflows,
generatedCode,
agentTextResponse,
pinData,
datasetInputContext: testCase.context?.datasetInputContext,
});
// Run evaluators in parallel
const evalStartTime = Date.now();
const feedback = await evaluateWithPlugins(workflow, evaluators, context, timeoutMs, lifecycle);
const evalDurationMs = Date.now() - evalStartTime;
// Calculate result
const score = calculateExampleScore(feedback);
// Any `error` feedback forces status 'error', regardless of the score.
const status = hasErrorFeedback(feedback) ? 'error' : determineStatus({ score, passThreshold });
// Total duration is measured from the caller-supplied start time.
const durationMs = Date.now() - startTime;
const metrics = getMetrics();
return {
index,
prompt: testCase.prompt,
status,
score,
feedback,
durationMs,
generationDurationMs: genDurationMs,
evaluationDurationMs: evalDurationMs,
generationInputTokens: metrics.genInputTokens,
generationOutputTokens: metrics.genOutputTokens,
subgraphMetrics: buildSubgraphMetrics(metrics),
introspectionEvents: metrics.introspectionEvents,
workflow,
generatedCode,
agentTextResponse,
};
}
/**
 * Runs one local example end to end and never rejects for generation or
 * evaluation failures: any error thrown on the success path is converted into
 * an `error` result with score 0.
 *
 * Every result (success or error) is saved through `artifactSaver` and then
 * reported through `lifecycle.onExampleComplete`.
 *
 * NOTE(review): saving and notifying happen inside the `try`, so a throw from
 * `saveExample` / `onExampleComplete` on a successful result is also turned
 * into an error result, and both are then invoked again for that error result.
 */
async function runLocalExample(args: {
	index: number;
	total: number;
	testCase: TestCase;
	generateWorkflow: (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	) => Promise<SimpleWorkflow | GenerationResult>;
	evaluators: Array<Evaluator<EvaluationContext>>;
	globalContext?: GlobalRunContext;
	passThreshold: number;
	timeoutMs: number | undefined;
	lifecycle?: Partial<EvaluationLifecycle>;
	artifactSaver?: ArtifactSaver | null;
	pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult> {
	const { index, total, testCase, lifecycle, artifactSaver } = args;
	const startTime = Date.now();
	lifecycle?.onExampleStart?.(index, total, testCase.prompt);

	// Persist first, then notify, then hand the same result back.
	const publish = (outcome: ExampleResult): ExampleResult => {
		artifactSaver?.saveExample(outcome);
		lifecycle?.onExampleComplete?.(index, outcome);
		return outcome;
	};

	try {
		const outcome = await runLocalExampleSuccess({
			index,
			startTime,
			testCase,
			generateWorkflow: args.generateWorkflow,
			evaluators: args.evaluators,
			globalContext: args.globalContext,
			passThreshold: args.passThreshold,
			timeoutMs: args.timeoutMs,
			lifecycle,
			pinDataGenerator: args.pinDataGenerator,
		});
		return publish(outcome);
	} catch (error) {
		const durationMs = Date.now() - startTime;
		const errorMessage = error instanceof Error ? error.message : String(error);
		return publish({
			index,
			prompt: testCase.prompt,
			status: 'error',
			score: 0,
			feedback: createErrorFeedback(errorMessage),
			durationMs,
			error: errorMessage,
		});
	}
}
/**
 * Creates an artifact saver when an output directory was configured;
 * returns `null` otherwise so callers can skip saving with `?.`.
 */
function createArtifactSaverIfRequested(args: {
	outputDir?: string;
	logger: EvalLogger;
}): ArtifactSaver | null {
	if (args.outputDir) {
		return createArtifactSaver({ outputDir: args.outputDir, logger: args.logger });
	}
	return null;
}
/**
 * Runs all test cases locally with at most `concurrency` examples in flight
 * (default 1).
 *
 * @returns one result per test case, in the order of `testCases`; example
 *   indices are 1-based
 */
async function runLocalDataset(params: {
	testCases: TestCase[];
	generateWorkflow: (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	) => Promise<SimpleWorkflow | GenerationResult>;
	evaluators: Array<Evaluator<EvaluationContext>>;
	globalContext?: GlobalRunContext;
	passThreshold: number;
	timeoutMs: number | undefined;
	lifecycle?: Partial<EvaluationLifecycle>;
	artifactSaver: ArtifactSaver | null;
	concurrency?: number;
	pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult[]> {
	const { testCases, concurrency = 1 } = params;

	// pLimit queues the examples; Promise.all keeps the results in input order.
	const limitExamples = pLimit(concurrency);
	const pending = testCases.map(
		async (testCase, position) =>
			await limitExamples(
				async () =>
					await runLocalExample({
						index: position + 1,
						total: testCases.length,
						testCase,
						generateWorkflow: params.generateWorkflow,
						evaluators: params.evaluators,
						globalContext: params.globalContext,
						passThreshold: params.passThreshold,
						timeoutMs: params.timeoutMs,
						lifecycle: params.lifecycle,
						artifactSaver: params.artifactSaver,
						pinDataGenerator: params.pinDataGenerator,
					}),
			),
	);
	return await Promise.all(pending);
}
/**
 * Aggregates example results into a run summary: counts per status, the plain
 * average of the example scores (0 for an empty run) and the summed duration.
 */
function buildRunSummary(results: ExampleResult[]): RunSummary {
	let passed = 0;
	let failed = 0;
	let errors = 0;
	let scoreSum = 0;
	let totalDurationMs = 0;

	for (const { status, score, durationMs } of results) {
		if (status === 'pass') passed += 1;
		else if (status === 'fail') failed += 1;
		else if (status === 'error') errors += 1;
		scoreSum += score;
		totalDurationMs += durationMs;
	}

	const totalExamples = results.length;
	return {
		totalExamples,
		passed,
		failed,
		errors,
		averageScore: totalExamples > 0 ? scoreSum / totalExamples : 0,
		totalDurationMs,
	};
}
/**
 * Computes, for every evaluator, the average of its per-example averages.
 *
 * For each example the feedback is grouped by evaluator; the scoring items of
 * each group (as chosen by `selectScoringItems`) are averaged with
 * `calculateFiniteAverage`. Those per-example values are then averaged per
 * evaluator across all examples in which the evaluator produced feedback.
 *
 * @param results - example results to aggregate
 * @param logger - optional logger; receives verbose traces of the final values
 * @returns map of evaluator name to its average
 */
function computeEvaluatorAverages(
	results: ExampleResult[],
	logger?: EvalLogger,
): Record<string, number> {
	const scoresByEvaluator: Record<string, number[]> = {};

	for (const { feedback } of results) {
		// Bucket this example's feedback by evaluator name.
		const grouped: Record<string, Feedback[]> = {};
		for (const item of feedback) {
			const bucket = grouped[item.evaluator];
			if (bucket) bucket.push(item);
			else grouped[item.evaluator] = [item];
		}

		// One averaged value per evaluator for this example.
		for (const [evaluatorName, items] of Object.entries(grouped)) {
			const exampleAverage = calculateFiniteAverage(selectScoringItems(items));
			const collected = scoresByEvaluator[evaluatorName];
			if (collected) collected.push(exampleAverage);
			else scoresByEvaluator[evaluatorName] = [exampleAverage];
		}
	}

	const evaluatorAverages: Record<string, number> = {};
	for (const [name, scores] of Object.entries(scoresByEvaluator)) {
		let total = 0;
		for (const value of scores) total += value;
		const avg = total / scores.length;
		logger?.verbose(
			`[computeEvaluatorAverages] Final avg for "${name}": ${scores.join(', ')} -> ${avg}`,
		);
		evaluatorAverages[name] = avg;
	}
	logger?.verbose(`[computeEvaluatorAverages] Final result: ${JSON.stringify(evaluatorAverages)}`);
	return evaluatorAverages;
}
/**
* Inputs for `buildLangsmithSummary`.
*/
interface LangsmithSummaryParams {
/** Running totals accumulated over the run. */
stats: {
total: number;
passed: number;
failed: number;
errors: number;
/** Sum of example scores; divided by `total` to get the average. */
scoreSum: number;
/** Sum of durations in milliseconds. */
durationSumMs: number;
};
/** LangSmith identifiers; only reported when all three are present. */
langsmithData: {
experimentName?: string;
experimentId?: string;
datasetId?: string;
};
/** Optional per-evaluator averages, passed through to the summary unchanged. */
evaluatorAverages?: Record<string, number>;
}
/**
 * Builds the run summary for a LangSmith run from accumulated totals.
 *
 * The average score is `scoreSum / total` (0 for an empty run). The
 * `langsmith` section is only attached when experiment name, experiment id
 * and dataset id are all available.
 */
function buildLangsmithSummary(params: LangsmithSummaryParams): RunSummary {
	const { total, passed, failed, errors, scoreSum, durationSumMs } = params.stats;
	const { experimentName, experimentId, datasetId } = params.langsmithData;

	return {
		totalExamples: total,
		passed,
		failed,
		errors,
		averageScore: total > 0 ? scoreSum / total : 0,
		totalDurationMs: durationSumMs,
		evaluatorAverages: params.evaluatorAverages,
		...(experimentName && experimentId && datasetId
			? { langsmith: { experimentName, experimentId, datasetId } }
			: {}),
	};
}
/**
* Run evaluation in local mode.
*
* Runs every test case of `config.dataset` through generation and evaluation,
* builds the run summary, and optionally writes artifacts (`outputDir`) and a
* CSV file (`outputCsv`).
*
* Lifecycle order: `onStart` -> per-example hooks -> `onEnd` (awaited).
* An empty dataset only produces a warning; the run still completes with an
* empty summary.
*
* @returns the summary of the run
*/
async function runLocal(config: LocalRunConfig): Promise<RunSummary> {
const {
dataset,
generateWorkflow,
evaluators,
context: globalContext,
passThreshold = DEFAULT_PASS_THRESHOLD,
timeoutMs,
lifecycle,
outputDir,
outputCsv,
suite,
logger,
concurrency = 1,
pinDataGenerator,
} = config;
const testCases: TestCase[] = dataset;
if (testCases.length === 0) {
logger.warn('No test cases provided');
}
// Ensure an LLM call limiter always exists (defaults to 4 concurrent calls)
// and make the run-level timeout win over any timeoutMs in the given context.
const effectiveGlobalContext: GlobalRunContext = {
...(globalContext ?? {}),
llmCallLimiter: globalContext?.llmCallLimiter ?? pLimit(4),
timeoutMs,
};
// Create artifact saver if outputDir is provided
const artifactSaver = createArtifactSaverIfRequested({ outputDir, logger });
lifecycle?.onStart?.(config);
const results = await runLocalDataset({
testCases,
generateWorkflow,
evaluators,
globalContext: effectiveGlobalContext,
passThreshold,
timeoutMs,
lifecycle,
artifactSaver,
concurrency,
pinDataGenerator,
});
const summary = buildRunSummary(results);
// Save summary to disk if outputDir is provided
artifactSaver?.saveSummary(summary, results);
// Write CSV if requested
if (outputCsv) {
// Loaded lazily so the CSV writer is only imported when actually needed.
const { writeResultsCsv } = await import('./csv-writer.js');
// Map suite to CSV format
// (only these three suites are forwarded; any other value becomes undefined)
const csvSuite =
suite === 'llm-judge' || suite === 'pairwise' || suite === 'binary-checks'
? suite
: undefined;
writeResultsCsv(results, outputCsv, { suite: csvSuite });
logger.info(`Results written to: ${outputCsv}`);
}
await lifecycle?.onEnd?.(summary);
return summary;
}
/**
* Output from LangSmith target function.
*
* The `feedback` array is what the LangSmith feedback extractor later reads
* back from the root run's outputs.
*/
interface LangsmithTargetOutput {
/** The generated workflow. */
workflow: SimpleWorkflow;
/** The prompt the workflow was generated from. */
prompt: string;
/** Evaluator feedback computed inside the target function. */
feedback: Feedback[];
}
/**
* Input from LangSmith dataset.
* Supports both direct prompt string and messages array format.
*/
interface LangsmithDatasetInput {
/** Direct prompt; takes precedence over `messages` when it is a non-empty string. */
prompt?: string;
/** Messages format; the first message is used as the prompt. */
messages?: BaseMessage[];
/** Evaluation criteria; `dos` / `donts` strings are read from here for example filtering. */
evals?: Record<string, unknown>;
/** Existing workflow; only used when it validates as a SimpleWorkflow. */
workflowJSON?: unknown;
/** Agent workflow context; only used when it is a plain object. */
workflowContext?: unknown;
workflowOperations?: unknown[];
/** Only 'build' and 'plan' are recognised; other values are ignored. */
mode?: string;
/** Injected by enrichExamplesWithHistory - historical messages from outputs */
_historicalMessages?: unknown[];
[key: string]: unknown;
}
/**
 * Resolves the prompt of a dataset example.
 *
 * A non-empty `prompt` string wins; otherwise the content of the first entry
 * of `messages` is used.
 *
 * @throws Error when neither a prompt nor a non-empty messages array exists
 */
function extractPrompt(inputs: LangsmithDatasetInput): string {
	const { prompt, messages } = inputs;

	if (typeof prompt === 'string' && prompt) {
		return prompt;
	}
	if (Array.isArray(messages) && messages.length > 0) {
		return extractMessageContent(messages[0]);
	}
	throw new Error('No prompt found in inputs - expected "prompt" string or "messages" array');
}
/**
 * Pre-process LangSmith examples to extract conversation history from outputs
 * and inject it into inputs for the target function.
 *
 * The dataset format has:
 * - inputs.messages[0]: The latest user turn
 * - outputs.messages: The FULL conversation (all prior turns + latest + AI response)
 *
 * The latest turn is located in `outputs.messages` by matching its content,
 * searching from the END of the conversation. Searching backwards matters when
 * the user repeats a message ("yes", "continue", ...): the first occurrence
 * belongs to an earlier turn and would cut the history short. Messages tagged
 * as `AIMessage` are skipped so an AI reply that merely echoes the user's text
 * is never mistaken for the user turn.
 *
 * Everything before the located turn is injected as `inputs._historicalMessages`.
 * Examples without usable history are returned unchanged (same object).
 *
 * @param examples - examples as loaded from the dataset
 * @returns a new array; enriched examples are shallow copies
 */
function enrichExamplesWithHistory(examples: Example[]): Example[] {
	return examples.map((example) => {
		const outputMessages = (example.outputs as Record<string, unknown> | undefined)?.messages;
		if (!Array.isArray(outputMessages) || outputMessages.length <= 1) {
			return example; // No history to extract
		}
		const inputMessages = (example.inputs as Record<string, unknown> | undefined)?.messages;
		if (!Array.isArray(inputMessages) || inputMessages.length === 0) {
			return example;
		}

		// Content of the latest turn, in LangChain's serialized form ({ kwargs: { content } }).
		const inputMsg = inputMessages[0] as Record<string, unknown> | undefined;
		const inputContent = (inputMsg?.kwargs as Record<string, unknown> | undefined)?.content;
		if (typeof inputContent !== 'string') return example;

		// Walk backwards to find the most recent non-AI message with that content.
		let latestTurnIndex = -1;
		for (let i = outputMessages.length - 1; i >= 0; i--) {
			const candidate: unknown = outputMessages[i];
			if (!isUnknownRecord(candidate) || !isUnknownRecord(candidate.kwargs)) continue;
			if (candidate.kwargs.content !== inputContent) continue;
			const classPath: unknown[] = Array.isArray(candidate.id) ? candidate.id : [];
			if (classPath[classPath.length - 1] === 'AIMessage') continue;
			latestTurnIndex = i;
			break;
		}
		if (latestTurnIndex <= 0) return example; // Not found, or nothing precedes the latest turn

		// Extract all messages before the latest turn as historical
		const historicalMessages = outputMessages.slice(0, latestTurnIndex);
		return {
			...example,
			inputs: {
				...example.inputs,
				_historicalMessages: historicalMessages,
			},
		};
	});
}
/**
 * Rebuilds `BaseMessage` instances from LangChain's raw `lc` serialization.
 *
 * Dataset messages look like `{id, lc: 1, type: "constructor", kwargs: {content, ...}}`
 * where the last entry of the `id` array names the class
 * (e.g. ["langchain_core", "messages", "HumanMessage"]).
 *
 * Only `HumanMessage` and `AIMessage` entries are kept; everything else (tool
 * calls/results, malformed entries) is dropped to simplify the context passed
 * to the code builder. Non-string content becomes an empty string.
 */
function deserializeLcMessages(rawMessages: unknown[]): BaseMessage[] {
	const restored: BaseMessage[] = [];

	for (const raw of rawMessages) {
		if (!isUnknownRecord(raw)) continue;

		const classPath: unknown[] = Array.isArray(raw.id) ? raw.id : [];
		const className = classPath[classPath.length - 1];
		if (className !== 'HumanMessage' && className !== 'AIMessage') continue;

		const payload = isUnknownRecord(raw.kwargs) ? raw.kwargs : {};
		const text = typeof payload.content === 'string' ? payload.content : '';

		restored.push(className === 'HumanMessage' ? new HumanMessage(text) : new AIMessage(text));
	}
	return restored;
}
/**
 * Extract DatasetInputContext from LangSmith dataset inputs.
 * Captures the full agent context (workflowContext, existing workflow, mode)
 * needed to replay the generation realistically.
 *
 * Each field is validated independently and only copied when well-formed:
 * - `workflowContext`: must be a plain object
 * - `workflowJSON`: must validate as a SimpleWorkflow (stored as `existingWorkflow`)
 * - `mode`: must be 'build' or 'plan'
 * - `_historicalMessages`: must be a non-empty array (deserialized to messages)
 *
 * There is deliberately no up-front "is anything present" shortcut: a
 * present-but-falsy field (e.g. `mode: ''`) must not hide valid fields that
 * come after it.
 *
 * @returns the extracted context, or `undefined` when nothing usable was found
 */
function extractDatasetInputContext(
	inputs: LangsmithDatasetInput,
): DatasetInputContext | undefined {
	const context: DatasetInputContext = {};

	if (isUnknownRecord(inputs.workflowContext)) {
		context.workflowContext = inputs.workflowContext as ChatPayload['workflowContext'];
	}
	if (isSimpleWorkflow(inputs.workflowJSON)) {
		context.existingWorkflow = inputs.workflowJSON;
	}
	if (inputs.mode === 'build' || inputs.mode === 'plan') {
		context.mode = inputs.mode;
	}
	if (Array.isArray(inputs._historicalMessages) && inputs._historicalMessages.length > 0) {
		context.historicalMessages = deserializeLcMessages(inputs._historicalMessages);
	}

	return Object.keys(context).length > 0 ? context : undefined;
}
/**
 * Creates the LangSmith evaluator that republishes feedback already computed
 * inside the target function.
 *
 * The evaluator reads `rootRun.outputs.feedback`; when that is an array made
 * up entirely of valid feedback items, each is converted with
 * `toLangsmithEvaluationResult`. Otherwise a single zero-score
 * `evaluationError` result is returned.
 */
function createLangsmithFeedbackExtractor(): (
	rootRun: Run,
	_example?: Example,
) => Promise<Array<{ key: string; score: number; comment?: string }>> {
	return async (rootRun: Run, _example?: Example) => {
		const { outputs } = rootRun;

		if (
			isUnknownRecord(outputs) &&
			isUnknownArray(outputs.feedback) &&
			outputs.feedback.every(isFeedback)
		) {
			return outputs.feedback.map((item) => toLangsmithEvaluationResult(item));
		}

		return [
			{
				key: 'evaluationError',
				score: 0,
				comment: 'No feedback found in target output',
			},
		];
	};
}
/**
 * Repeats preloaded examples `repetitions` times (whole list after whole list).
 * A dataset name (string) or a repetition count of 1 or less is returned as-is.
 */
function applyRepetitions(data: string | Example[], repetitions: number): string | Example[] {
	if (repetitions <= 1 || !Array.isArray(data)) return data;
	const examples: Example[] = data;
	return Array.from({ length: repetitions }).flatMap(() => examples);
}
/**
 * Describes the active example filters for run metadata.
 *
 * - no active filter: `{ runType: 'full' }`
 * - otherwise: `runType` is `by-<label>[-and-<label>...]` and `filterValue`
 *   lists `<label>:<value>` pairs separated by spaces
 *
 * Labels, in fixed order: `id` (notionId), `category` (technique), `do`
 * (doSearch), `dont` (dontSearch).
 */
function computeFilterMetadata(filters?: LangsmithExampleFilters): {
	runType: string;
	filterValue?: string;
} {
	const labelled: Array<[string, string | undefined]> = [
		['id', filters?.notionId],
		['category', filters?.technique],
		['do', filters?.doSearch],
		['dont', filters?.dontSearch],
	];
	const active = labelled.filter(([, value]) => Boolean(value));

	if (active.length === 0) return { runType: 'full' };

	return {
		runType: `by-${active.map(([label]) => label).join('-and-')}`,
		filterValue: active.map(([label, value]) => `${label}:${value}`).join(' '),
	};
}
/**
 * Emit verbose log lines describing where the evaluation data comes from.
 *
 * For a pre-loaded array, logs the example count and the ids of the first 20
 * examples. For any other value, logs that the dataset is streamed.
 */
function logLangsmithInputsSummary(logger: EvalLogger, effectiveData: string | Example[]): void {
	if (Array.isArray(effectiveData)) {
		logger.verbose(`Data source: preloaded examples (${effectiveData.length})`);

		// Only the first 20 ids are included in the log line.
		const previewIds = effectiveData
			.slice(0, 20)
			.map((example) => example.id)
			.join(', ');
		logger.verbose(`Example IDs in data: ${previewIds}`);
		return;
	}

	logger.verbose('Data source: dataset (streaming)');
}
/**
 * Run LangSmith `evaluate()` for the given target, wait for pending trace
 * batches to flush, and return identifiers for the resulting experiment.
 *
 * The feedback extractor is registered as the only evaluator. `numRepetitions`
 * is passed to `evaluate()` only when `effectiveData` is not an array and more
 * than one repetition is configured.
 *
 * @param params.targetCallCount - Getter read after `evaluate()` resolves; used only for logging.
 * @returns `experimentName` from the evaluate result, plus `experimentId` and
 * `datasetId` when they could be read from the result's `manager`; both stay
 * `undefined` otherwise.
 */
async function runLangsmithEvaluateAndFlush(params: {
target: (inputs: LangsmithDatasetInput) => Promise<LangsmithTargetOutput>;
effectiveData: string | Example[];
feedbackExtractor: ReturnType<typeof createLangsmithFeedbackExtractor>;
langsmithOptions: LangsmithRunConfig['langsmithOptions'];
lsClient: LangsmithRunConfig['langsmithClient'];
logger: EvalLogger;
targetCallCount: () => number;
}): Promise<{
experimentName?: string;
experimentId?: string;
datasetId?: string;
}> {
const {
target,
effectiveData,
feedbackExtractor,
langsmithOptions,
lsClient,
logger,
targetCallCount,
} = params;
// For a non-array data source the count is unknown here, so the literal 'dataset' is logged instead.
const exampleCount = Array.isArray(effectiveData) ? effectiveData.length : 'dataset';
logger.info(
`Starting LangSmith evaluate() with ${exampleCount} examples, ${langsmithOptions.repetitions} repetitions, concurrency ${langsmithOptions.concurrency}...`,
);
const { runType, filterValue } = computeFilterMetadata(langsmithOptions.filters);
const evalStartTime = Date.now();
const experimentResults = await evaluate(target, {
data: effectiveData,
evaluators: [feedbackExtractor],
experimentPrefix: langsmithOptions.experimentName,
// Repetitions are applied explicitly when pre-loading examples to keep behavior consistent.
// When streaming from a dataset name, the SDK may support repetitions internally.
...(!Array.isArray(effectiveData) &&
langsmithOptions.repetitions > 1 && { numRepetitions: langsmithOptions.repetitions }),
maxConcurrency: langsmithOptions.concurrency,
client: lsClient,
metadata: {
repetitions: langsmithOptions.repetitions,
concurrency: langsmithOptions.concurrency,
runType,
...(filterValue && { filterValue }),
// Spread last, so caller-supplied metadata overrides the keys above on collision.
...langsmithOptions.experimentMetadata,
},
});
logger.info(
`Evaluation completed in ${((Date.now() - evalStartTime) / 1000).toFixed(1)}s (target called ${targetCallCount()} times)`,
);
// Flush pending traces to ensure all data is sent to LangSmith
logger.verbose('Flushing pending trace batches...');
const flushStartTime = Date.now();
await lsClient.awaitPendingTraceBatches();
logger.verbose(`Flush completed in ${((Date.now() - flushStartTime) / 1000).toFixed(1)}s`);
const experimentName = experimentResults.experimentName;
logger.info(`Experiment completed: ${experimentName}`);
let experimentId: string | undefined;
let datasetId: string | undefined;
// `manager` is reached through a cast because it is not on the typed result surface used here.
// NOTE(review): `_getExperiment` looks like an SDK-internal member; re-check this on langsmith upgrades.
// Any failure is logged at verbose level and the IDs are left undefined.
try {
const manager = (
experimentResults as unknown as {
manager?: { _getExperiment?: () => { id: string }; datasetId?: Promise<string> };
}
).manager;
if (manager?._getExperiment) {
experimentId = manager._getExperiment().id;
}
if (manager?.datasetId) {
datasetId = await manager.datasetId;
}
} catch {
logger.verbose('Could not extract LangSmith IDs from experiment results');
}
return { experimentName, experimentId, datasetId };
}
/**
 * Stats tracker for LangSmith evaluation.
 * Holds running totals that `updateStats` mutates once per recorded example.
 */
interface LangsmithStats {
// Number of examples recorded so far (incremented on every `updateStats` call).
total: number;
// Examples recorded with status 'pass'.
passed: number;
// Examples recorded with status 'fail'.
failed: number;
// Examples recorded with any other status (i.e. 'error').
errors: number;
// Sum of the scores of all recorded examples.
scoreSum: number;
// Sum of the durations, in milliseconds, of all recorded examples.
durationSumMs: number;
}
/**
 * Record one finished example in the running totals.
 *
 * Mutates `stats` in place: bumps the overall count, adds `score` and
 * `durationMs` to their sums, and increments the counter matching `status`.
 */
function updateStats(
	stats: LangsmithStats,
	status: 'pass' | 'fail' | 'error',
	score: number,
	durationMs: number,
): void {
	stats.total += 1;
	stats.scoreSum += score;
	stats.durationSumMs += durationMs;

	switch (status) {
		case 'pass':
			stats.passed += 1;
			break;
		case 'fail':
			stats.failed += 1;
			break;
		default:
			// Anything that is neither 'pass' nor 'fail' counts as an error.
			stats.errors += 1;
	}
}
/**
 * Resolve the data source for a LangSmith run and attach conversation history
 * to pre-loaded examples.
 *
 * If resolution still yields a dataset name while `maxExamples` (> 0) or
 * `filters` were requested, the examples are loaded explicitly so those limits
 * can be honored. A dataset name is otherwise returned untouched; an example
 * array is passed through `enrichExamplesWithHistory`.
 */
async function resolveAndEnrichLangsmithData(params: {
	dataset: string;
	langsmithOptions: LangsmithRunConfig['langsmithOptions'];
	lsClient: LangsmithRunConfig['langsmithClient'];
	logger: EvalLogger;
}): Promise<string | Example[]> {
	const { dataset, langsmithOptions, lsClient, logger } = params;
	const resolved = await resolveLangsmithData({ dataset, langsmithOptions, lsClient, logger });

	// Streaming by name cannot honor limits or filters, so those force a preload.
	const { maxExamples, filters } = langsmithOptions;
	const limitsRequested = (maxExamples ?? 0) > 0 || filters !== undefined;

	const data =
		typeof resolved === 'string' && limitsRequested
			? await loadExamplesFromDataset({
					lsClient,
					datasetName: resolved,
					maxExamples,
					filters,
				})
			: resolved;

	if (!Array.isArray(data)) return data;
	return enrichExamplesWithHistory(data);
}
/**
 * Run evaluation in LangSmith mode.
 *
 * Builds a `target` function that, for each dataset example, generates a
 * workflow, optionally generates pin data, runs all evaluators, and records an
 * `ExampleResult`. The target is handed to LangSmith `evaluate()` via
 * `runLangsmithEvaluateAndFlush`. Afterwards a `RunSummary` is built from the
 * accumulated stats and captured results, artifacts and an optional CSV are
 * written, and `lifecycle.onEnd` is awaited.
 *
 * Side effects: sets `process.env.LANGSMITH_TRACING` to `'true'`.
 *
 * @throws Error when `config.dataset` is not a string.
 */
async function runLangsmith(config: LangsmithRunConfig): Promise<RunSummary> {
const {
dataset,
generateWorkflow,
evaluators,
context: globalContext,
outputDir,
outputCsv,
suite,
passThreshold = DEFAULT_PASS_THRESHOLD,
timeoutMs,
langsmithOptions,
langsmithClient: lsClient,
lifecycle,
logger,
pinDataGenerator,
} = config;
// Enable tracing (required in langsmith 0.4.x)
process.env.LANGSMITH_TRACING = 'true';
lifecycle?.onStart?.(config);
// A caller-provided limiter wins; otherwise one is created from the configured concurrency.
const effectiveGlobalContext: GlobalRunContext = {
...(globalContext ?? {}),
llmCallLimiter: globalContext?.llmCallLimiter ?? pLimit(langsmithOptions.concurrency),
timeoutMs,
};
const artifactSaver = createArtifactSaverIfRequested({ outputDir, logger });
// Filled by `target` on both success and error paths; consumed after evaluate() finishes.
const capturedResults: ExampleResult[] = [];
// Create traceable wrappers ONCE outside target function to avoid context leaking
// when running concurrent evaluations. Pass all parameters explicitly (no closures).
// IMPORTANT: Get callbacks INSIDE the traceable wrapper where AsyncLocalStorage context
// is correctly set, then pass them explicitly to genFn to avoid race conditions.
const traceableGenerateWorkflow = traceable(
async (args: {
prompt: string;
genFn: (
prompt: string,
datasetInputContext?: DatasetInputContext,
collectors?: GenerationCollectors,
) => Promise<SimpleWorkflow | GenerationResult>;
collectors?: GenerationCollectors;
limiter?: LlmCallLimiter;
genTimeoutMs?: number;
datasetInputContext?: DatasetInputContext;
}): Promise<SimpleWorkflow | GenerationResult> => {
// The timeout wrapper sits inside the limiter callback.
return await runWithOptionalLimiter(async () => {
return await withTimeout({
promise: args.genFn(args.prompt, args.datasetInputContext, args.collectors),
timeoutMs: args.genTimeoutMs,
label: 'workflow_generation',
});
}, args.limiter);
},
{
name: 'workflow_generation',
run_type: 'chain',
client: lsClient,
},
);
// Separate traceable wrapper for evaluation so it appears as a sibling to
// workflow_generation in LangSmith traces (not nested under it).
const traceableEvaluateWorkflow = traceable(
async (args: {
workflow: SimpleWorkflow;
evaluators: Array<Evaluator<EvaluationContext>>;
context: EvaluationContext;
evalTimeoutMs?: number;
evalLifecycle?: Partial<EvaluationLifecycle>;
}): Promise<Feedback[]> => {
return await evaluateWithPlugins(
args.workflow,
args.evaluators,
args.context,
args.evalTimeoutMs,
args.evalLifecycle,
);
},
{
name: 'workflow_evaluation',
run_type: 'chain',
client: lsClient,
},
);
// Create target function that does ALL work (generation + evaluation)
// NOTE: Do NOT wrap target with traceable() - evaluate() handles tracing automatically.
let targetCallCount = 0;
// Assigned below, once the data has been resolved and before evaluate() starts.
let totalExamples = 0;
const stats: LangsmithStats = {
total: 0,
passed: 0,
failed: 0,
errors: 0,
scoreSum: 0,
durationSumMs: 0,
};
const target = async (inputs: LangsmithDatasetInput): Promise<LangsmithTargetOutput> => {
// The index is the 1-based order in which target was invoked, not the example's dataset position.
targetCallCount++;
const index = targetCallCount;
// Extract prompt from inputs (supports both direct prompt and messages array)
const prompt = extractPrompt(inputs);
const datasetInputContext = extractDatasetInputContext(inputs);
// Split the `evals` entry from the remaining input keys; both are merged again below.
const { evals: datasetContext, ...rest } = inputs;
lifecycle?.onExampleStart?.(index, totalExamples, prompt);
const startTime = Date.now();
const genStart = Date.now();
const { collectors, getMetrics } = createMetricsCollectors();
try {
const genResult = await traceableGenerateWorkflow({
prompt,
genFn: generateWorkflow,
collectors,
limiter: effectiveGlobalContext.llmCallLimiter,
genTimeoutMs: timeoutMs,
datasetInputContext,
});
const genDurationMs = Date.now() - genStart;
// Extract workflow and optional generated code
const workflow = isGenerationResult(genResult) ? genResult.workflow : genResult;
const generatedCode = isGenerationResult(genResult) ? genResult.generatedCode : undefined;
const agentTextResponse = isGenerationResult(genResult)
? genResult.agentTextResponse
: undefined;
lifecycle?.onWorkflowGenerated?.(workflow, genDurationMs);
// Generate pin data for service nodes (best-effort)
let pinData: IPinData | undefined;
if (pinDataGenerator) {
try {
pinData = await pinDataGenerator(workflow);
} catch {
// Pin data generation is best-effort — don't fail the evaluation
}
}
// `rest` is spread last, so top-level input keys override same-named keys from `evals`.
const extracted = extractContextFromLangsmithInputs({
...asRecord(datasetContext),
...asRecord(rest),
});
const context = buildContext({
prompt,
globalContext: effectiveGlobalContext,
testCaseContext: extracted,
generatedCode,
agentTextResponse,
pinData,
datasetInputContext,
});
// Run all evaluators in parallel (wrapped in traceable so it appears
// as a sibling to workflow_generation in LangSmith traces).
// Fall back to direct call if traceable wrapper fails.
const evalStart = Date.now();
let feedback: Feedback[];
try {
feedback = await traceableEvaluateWorkflow({
workflow,
evaluators,
context,
evalTimeoutMs: timeoutMs,
evalLifecycle: lifecycle,
});
} catch {
// Any rejection from the traced call lands here, so the evaluators are run again untraced.
feedback = await evaluateWithPlugins(workflow, evaluators, context, timeoutMs, lifecycle);
}
const evalDurationMs = Date.now() - evalStart;
const totalDurationMs = Date.now() - startTime;
const score = calculateExampleScore(feedback);
// Error feedback takes precedence over the pass-threshold comparison.
const status = hasErrorFeedback(feedback)
? 'error'
: determineStatus({ score, passThreshold });
updateStats(stats, status, score, totalDurationMs);
const metrics = getMetrics();
const result: ExampleResult = {
index,
prompt,
status,
score,
feedback,
durationMs: totalDurationMs,
generationDurationMs: genDurationMs,
evaluationDurationMs: evalDurationMs,
generationInputTokens: metrics.genInputTokens,
generationOutputTokens: metrics.genOutputTokens,
subgraphMetrics: buildSubgraphMetrics(metrics),
introspectionEvents: metrics.introspectionEvents,
workflow,
generatedCode,
agentTextResponse,
};
artifactSaver?.saveExample(result);
capturedResults.push(result);
lifecycle?.onExampleComplete?.(index, result);
// Create metrics feedback for LangSmith (subgraph timing + node count)
const metricsFeedback = createMetricsFeedback(metrics);
return {
workflow,
prompt,
// Include metrics feedback in the array sent to LangSmith
feedback: [...feedback, ...metricsFeedback],
};
} catch (error) {
// Anything thrown in the try block becomes an error result with a placeholder
// empty workflow, instead of propagating out of target.
const errorMessage = error instanceof Error ? error.message : String(error);
const workflow: SimpleWorkflow = { name: 'Evaluation Error', nodes: [], connections: {} };
const feedback = createErrorFeedback(errorMessage);
const totalDurationMs = Date.now() - startTime;
// Measured from generation start to the failure, whichever step in the try block threw.
const genDurationMs = Date.now() - genStart;
updateStats(stats, 'error', 0, totalDurationMs);
const result: ExampleResult = {
index,
prompt,
status: 'error',
score: 0,
feedback,
durationMs: totalDurationMs,
generationDurationMs: genDurationMs,
workflow,
error: errorMessage,
};
artifactSaver?.saveExample(result);
capturedResults.push(result);
lifecycle?.onExampleComplete?.(index, result);
return { workflow, prompt, feedback };
}
};
const feedbackExtractor = createLangsmithFeedbackExtractor();
if (typeof dataset !== 'string') {
throw new Error('LangSmith mode requires dataset to be a dataset name string');
}
const data = await resolveAndEnrichLangsmithData({ dataset, langsmithOptions, lsClient, logger });
const effectiveData = applyRepetitions(data, langsmithOptions.repetitions);
// Stays 0 when the data is not a pre-loaded array; that value is what onExampleStart then receives.
totalExamples = Array.isArray(effectiveData) ? effectiveData.length : 0;
logLangsmithInputsSummary(logger, effectiveData);
const { experimentName, experimentId, datasetId } = await runLangsmithEvaluateAndFlush({
target,
effectiveData,
feedbackExtractor,
langsmithOptions,
lsClient,
logger,
targetCallCount: () => targetCallCount,
});
// Compute evaluator averages from captured results
const evaluatorAverages = computeEvaluatorAverages(capturedResults, logger);
const summary: RunSummary = buildLangsmithSummary({
stats,
langsmithData: { experimentName, experimentId, datasetId },
evaluatorAverages,
});
if (artifactSaver) {
artifactSaver.saveSummary(summary, capturedResults);
}
// Write CSV if requested
if (outputCsv) {
// The CSV writer is imported only when CSV output was requested.
const { writeResultsCsv } = await import('./csv-writer.js');
// Map suite to CSV format
const csvSuite =
suite === 'llm-judge' || suite === 'pairwise' || suite === 'binary-checks'
? suite
: undefined;
writeResultsCsv(capturedResults, outputCsv, { suite: csvSuite });
logger.info(`Results written to: ${outputCsv}`);
}
await lifecycle?.onEnd?.(summary);
return summary;
}
/**
 * Main entry point for running evaluations.
 *
 * Dispatches on `config.mode`: `'langsmith'` goes to `runLangsmith`, every
 * other configuration goes to `runLocal`.
 */
export async function runEvaluation(config: RunConfig): Promise<RunSummary> {
	return config.mode === 'langsmith' ? await runLangsmith(config) : await runLocal(config);
}