mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-05 02:59:27 +02:00
1651 lines
47 KiB
TypeScript
1651 lines
47 KiB
TypeScript
import { AIMessage, HumanMessage, type BaseMessage } from '@langchain/core/messages';
|
|
import { evaluate } from 'langsmith/evaluation';
|
|
import type { Run, Example } from 'langsmith/schemas';
|
|
import { traceable } from 'langsmith/traceable';
|
|
import type { IPinData } from 'n8n-workflow';
|
|
import pLimit from 'p-limit';
|
|
|
|
import { runWithOptionalLimiter, withTimeout } from './evaluation-helpers';
|
|
import { toLangsmithEvaluationResult } from './feedback';
|
|
import {
|
|
isGenerationResult,
|
|
type DatasetInputContext,
|
|
type Evaluator,
|
|
type TestCase,
|
|
type EvaluationContext,
|
|
type GlobalRunContext,
|
|
type TestCaseContext,
|
|
type Feedback,
|
|
type RunConfig,
|
|
type LocalRunConfig,
|
|
type LangsmithRunConfig,
|
|
type ExampleResult,
|
|
type RunSummary,
|
|
type EvaluationLifecycle,
|
|
type LangsmithExampleFilters,
|
|
type LlmCallLimiter,
|
|
type GenerationResult,
|
|
} from './harness-types';
|
|
import type { EvalLogger } from './logger';
|
|
import { createArtifactSaver, type ArtifactSaver } from './output';
|
|
import {
|
|
calculateWeightedScore,
|
|
selectScoringItems,
|
|
calculateFiniteAverage,
|
|
} from './score-calculator';
|
|
import type { IntrospectionEvent } from '../../src/tools/introspect.tool.js';
|
|
import type { SimpleWorkflow } from '../../src/types/workflow';
|
|
import type { ChatPayload } from '../../src/workflow-builder-agent';
|
|
import { extractMessageContent } from '../langsmith/types';
|
|
|
|
/**
 * Pass threshold used when the run config does not supply `passThreshold`:
 * an example whose score is >= this value is classified as `pass`.
 */
const DEFAULT_PASS_THRESHOLD = 0.7;

/**
 * Receives the token counts reported for one workflow generation.
 */
export type TokenUsageCollector = (usage: {
	inputTokens: number;
	outputTokens: number;
}) => void;

/**
 * Receives per-subgraph durations (in milliseconds) and the produced node
 * count for one workflow generation. Every field is optional.
 */
export type SubgraphMetricsCollector = (metrics: {
	discoveryDurationMs?: number;
	builderDurationMs?: number;
	responderDurationMs?: number;
	nodeCount?: number;
}) => void;

/**
 * Receives the introspection events recorded during one workflow generation.
 */
export type IntrospectionEventsCollector = (events: IntrospectionEvent[]) => void;

/**
 * Optional callbacks handed to a workflow generator so it can report metrics
 * back to the evaluation runner.
 */
export interface GenerationCollectors {
	/** Called with input/output token counts. */
	tokenUsage?: TokenUsageCollector;
	/** Called with subgraph timings and node count. */
	subgraphMetrics?: SubgraphMetricsCollector;
	/** Called with the collected introspection events. */
	introspectionEvents?: IntrospectionEventsCollector;
}
|
|
|
|
/**
 * Run all evaluators against one workflow concurrently and return their
 * feedback flattened in evaluator order.
 *
 * Each evaluator call is wrapped in `withTimeout`. An evaluator that throws or
 * times out does not abort the others: it is reported via
 * `lifecycle.onEvaluatorError` and contributes a single `error` feedback item
 * (score 0) carrying the error message.
 */
async function evaluateWithPlugins(
	workflow: SimpleWorkflow,
	evaluators: Array<Evaluator<EvaluationContext>>,
	context: EvaluationContext,
	timeoutMs: number | undefined,
	lifecycle?: Partial<EvaluationLifecycle>,
): Promise<Feedback[]> {
	const runSingle = async (evaluator: Evaluator<EvaluationContext>): Promise<Feedback[]> => {
		try {
			const produced = await withTimeout({
				promise: evaluator.evaluate(workflow, context),
				timeoutMs,
				label: `evaluator:${evaluator.name}`,
			});
			lifecycle?.onEvaluatorComplete?.(evaluator.name, produced);
			return produced;
		} catch (caught) {
			const failure = caught instanceof Error ? caught : new Error(String(caught));
			lifecycle?.onEvaluatorError?.(evaluator.name, failure);
			return [
				{
					evaluator: evaluator.name,
					metric: 'error',
					score: 0,
					kind: 'score',
					comment: failure.message,
				},
			];
		}
	};

	const perEvaluator = await Promise.all(evaluators.map(runSingle));
	return perEvaluator.flat();
}
|
|
|
|
/**
 * Overall score for one example, delegated to the evaluator-weighted
 * calculation from ./score-calculator.
 */
function calculateExampleScore(feedback: Feedback[]): number {
	const weightedScore = calculateWeightedScore(feedback);
	return weightedScore;
}

/**
 * Classify an example: `pass` when its score is at or above the threshold,
 * `fail` otherwise (including a NaN score).
 */
function determineStatus(args: { score: number; passThreshold: number }): 'pass' | 'fail' {
	if (args.score >= args.passThreshold) {
		return 'pass';
	}
	return 'fail';
}

/** True when any feedback item has the `error` metric. */
function hasErrorFeedback(feedback: Feedback[]): boolean {
	for (const item of feedback) {
		if (item.metric === 'error') return true;
	}
	return false;
}
|
|
|
|
/**
 * Convert a millisecond duration to seconds.
 *
 * LangSmith feedback scores must lie within [-99999.9999, 99999.9999]. Storing
 * latencies in seconds keeps durations up to roughly 27 hours in range, while
 * milliseconds would leave the range after about 100 seconds.
 */
function msToSeconds(ms: number): number {
	const msPerSecond = 1000;
	return ms / msPerSecond;
}

/**
 * Turn subgraph metrics into `metrics`-evaluator feedback items of kind `metric`.
 *
 * Latencies are reported in seconds (see msToSeconds); the node count is
 * reported as-is. Items are only created for values that are defined, in the
 * order: discovery, builder, responder latency, then node count.
 */
function createMetricsFeedback(args: {
	discoveryDurationMs?: number;
	builderDurationMs?: number;
	responderDurationMs?: number;
	nodeCount?: number;
}): Feedback[] {
	const latencies: Array<[Feedback['metric'], number | undefined]> = [
		['discovery_latency_s', args.discoveryDurationMs],
		['builder_latency_s', args.builderDurationMs],
		['responder_latency_s', args.responderDurationMs],
	];

	const feedback: Feedback[] = [];
	for (const [metric, durationMs] of latencies) {
		if (durationMs === undefined) continue;
		feedback.push({
			evaluator: 'metrics',
			metric,
			score: msToSeconds(durationMs),
			kind: 'metric',
		});
	}

	if (args.nodeCount !== undefined) {
		feedback.push({
			evaluator: 'metrics',
			metric: 'node_count',
			score: args.nodeCount,
			kind: 'metric',
		});
	}

	return feedback;
}
|
|
|
|
/**
 * Assemble the EvaluationContext passed to every evaluator.
 *
 * Layering order (later wins): `prompt`, then the global run context, then the
 * test-case context, then the optional per-example fields. An optional field
 * is only attached when it is truthy (or, for `referenceWorkflows`, non-empty),
 * so an empty value never overwrites what the test-case context supplied.
 */
function buildContext(args: {
	prompt: string;
	globalContext?: GlobalRunContext;
	testCaseContext?: TestCaseContext;
	referenceWorkflows?: SimpleWorkflow[];
	generatedCode?: string;
	agentTextResponse?: string;
	pinData?: IPinData;
	datasetInputContext?: DatasetInputContext;
}): EvaluationContext {
	const { prompt, globalContext, testCaseContext } = args;

	const overrides = {
		...(args.referenceWorkflows?.length ? { referenceWorkflows: args.referenceWorkflows } : undefined),
		...(args.generatedCode ? { generatedCode: args.generatedCode } : undefined),
		...(args.agentTextResponse ? { agentTextResponse: args.agentTextResponse } : undefined),
		...(args.pinData ? { pinData: args.pinData } : undefined),
		...(args.datasetInputContext ? { datasetInputContext: args.datasetInputContext } : undefined),
	};

	return { prompt, ...globalContext, ...testCaseContext, ...overrides };
}
|
|
|
|
/** True for non-null, non-array objects. */
function isUnknownRecord(value: unknown): value is Record<string, unknown> {
	if (value === null || typeof value !== 'object') return false;
	return !Array.isArray(value);
}

/** Array type guard (thin wrapper around Array.isArray). */
function isUnknownArray(value: unknown): value is unknown[] {
	return Array.isArray(value);
}

/** Return `value` when it is a plain record, otherwise a fresh empty object. */
function asRecord(value: unknown): Record<string, unknown> {
	if (isUnknownRecord(value)) return value;
	return {};
}

/** True for an array of exactly two finite numbers (e.g. a node position). */
function isNumberArray2(value: unknown): value is [number, number] {
	if (!Array.isArray(value) || value.length !== 2) return false;
	const first: unknown = value[0];
	const second: unknown = value[1];
	return (
		typeof first === 'number' &&
		Number.isFinite(first) &&
		typeof second === 'number' &&
		Number.isFinite(second)
	);
}

/**
 * Minimal node-shape check: non-empty `name` and `type` strings, a finite
 * numeric `typeVersion`, and a two-number `position`.
 */
function isNodeLike(value: unknown): boolean {
	if (!isUnknownRecord(value)) return false;
	const { name, type, typeVersion, position } = value;
	const hasName = typeof name === 'string' && name !== '';
	const hasType = typeof type === 'string' && type !== '';
	const hasVersion = typeof typeVersion === 'number' && Number.isFinite(typeVersion);
	return hasName && hasType && hasVersion && isNumberArray2(position);
}

/**
 * Structural check for a connections map:
 * record (per source node) -> record (per connection type) -> array of
 * outputs -> array of connection records. Connection fields are not inspected.
 */
function isConnectionsLike(value: unknown): boolean {
	if (!isUnknownRecord(value)) return false;

	// Every entry of `list` must be a record (holes count as invalid).
	const isRecordList = (list: unknown): boolean => {
		if (!Array.isArray(list)) return false;
		for (const item of list) {
			if (!isUnknownRecord(item)) return false;
		}
		return true;
	};

	// Every output of a connection type must itself be a list of records.
	const isOutputGroups = (groups: unknown): boolean => {
		if (!Array.isArray(groups)) return false;
		for (const group of groups) {
			if (!isRecordList(group)) return false;
		}
		return true;
	};

	return Object.values(value).every(
		(byType) => isUnknownRecord(byType) && Object.values(byType).every(isOutputGroups),
	);
}

/** Runtime check that `value` looks like a SimpleWorkflow (nodes + connections). */
function isSimpleWorkflow(value: unknown): value is SimpleWorkflow {
	return (
		isUnknownRecord(value) &&
		Array.isArray(value.nodes) &&
		isConnectionsLike(value.connections) &&
		value.nodes.every(isNodeLike)
	);
}
|
|
|
|
/** Read `metadata.notion_id` when it is a string. */
function getNotionIdFromMetadata(metadata: unknown): string | undefined {
	const { notion_id: notionId } = asRecord(metadata);
	return typeof notionId === 'string' ? notionId : undefined;
}

/**
 * Read the string entries of `metadata.categories`; undefined when the field
 * is not an array or contains no strings.
 */
function getCategoriesFromMetadata(metadata: unknown): string[] | undefined {
	const { categories } = asRecord(metadata);
	if (!Array.isArray(categories)) return undefined;

	const labels: string[] = [];
	for (const entry of categories) {
		if (typeof entry === 'string') labels.push(entry);
	}
	return labels.length === 0 ? undefined : labels;
}

/** Read the string `dos` / `donts` fields from `inputs.evals`, omitting any that are absent. */
function getEvalsFromExampleInputs(exampleInputs: unknown): { dos?: string; donts?: string } {
	const { dos, donts } = asRecord(asRecord(exampleInputs).evals);
	return {
		...(typeof dos === 'string' ? { dos } : {}),
		...(typeof donts === 'string' ? { donts } : {}),
	};
}
|
|
|
|
/**
 * Runtime guard for a Feedback item: string `evaluator` and `metric`, numeric
 * `score`, and a `kind` of 'score', 'metric' or 'detail'.
 */
function isFeedback(value: unknown): value is Feedback {
	if (!isUnknownRecord(value)) return false;
	return (
		typeof value.evaluator === 'string' &&
		typeof value.metric === 'string' &&
		typeof value.score === 'number' &&
		(value.kind === 'score' || value.kind === 'metric' || value.kind === 'detail')
	);
}
|
|
|
|
/**
 * Check a LangSmith example against every provided filter (logical AND).
 *
 * - notionId: exact match on `metadata.notion_id`
 * - technique: must appear in `metadata.categories`
 * - doSearch / dontSearch: case-insensitive substring of `inputs.evals.dos` / `.donts`
 *
 * Filters that are unset (or empty strings) are ignored.
 */
function exampleMatchesFilters(example: Example, filters: LangsmithExampleFilters): boolean {
	const { notionId, technique, doSearch, dontSearch } = filters;

	if (notionId && getNotionIdFromMetadata(example.metadata) !== notionId) return false;

	if (technique && !(getCategoriesFromMetadata(example.metadata) ?? []).includes(technique)) {
		return false;
	}

	if (!doSearch && !dontSearch) return true;

	const { dos, donts } = getEvalsFromExampleInputs(example.inputs);
	const contains = (text: string | undefined, needle: string): boolean =>
		(text ?? '').toLowerCase().includes(needle.toLowerCase());

	if (doSearch && !contains(dos, doSearch)) return false;
	if (dontSearch && !contains(donts, dontSearch)) return false;
	return true;
}
|
|
|
|
/**
 * Load examples from a LangSmith dataset, optionally filtered and capped.
 *
 * - Without `filters`, a positive `maxExamples` is forwarded to `listExamples`
 *   as `limit`.
 * - With `filters`, examples are scanned client-side (no server-side limit)
 *   and the first `maxExamples` matches are kept.
 * - A `maxExamples` that is not a positive number means "no cap"; previously a
 *   negative value was forwarded as a server limit or stopped the scan after
 *   the first match.
 *
 * @throws Error when filters are given and nothing matched, or when a cap was
 *   requested without filters and the dataset yielded no examples. Errors from
 *   `readDataset` / `listExamples` propagate unchanged.
 */
async function loadExamplesFromDataset(params: {
	lsClient: {
		readDataset: (args: { datasetName: string }) => Promise<{ id: string }>;
		listExamples: (args: { datasetId: string; limit?: number }) => AsyncIterable<Example>;
	};
	datasetName: string;
	maxExamples?: number;
	filters?: LangsmithExampleFilters;
}): Promise<Example[]> {
	const { lsClient, datasetName, maxExamples, filters } = params;

	// Only a positive number is a usable cap (NaN, 0 and negatives mean "no cap").
	const limit = typeof maxExamples === 'number' && maxExamples > 0 ? maxExamples : undefined;

	const datasetInfo = await lsClient.readDataset({ datasetName });
	const matches: Example[] = [];

	let scanned = 0;
	const listArgs: { datasetId: string; limit?: number } = { datasetId: datasetInfo.id };
	// Without filters every listed example is kept, so the cap can be applied server-side.
	if (!filters && limit !== undefined) listArgs.limit = limit;

	for await (const example of lsClient.listExamples(listArgs)) {
		scanned++;
		if (filters && !exampleMatchesFilters(example, filters)) continue;
		matches.push(example);
		if (limit !== undefined && matches.length >= limit) break;
	}

	if (filters && matches.length === 0) {
		const filterSummary = [
			filters.notionId ? `id:${filters.notionId}` : undefined,
			filters.technique ? `technique:${filters.technique}` : undefined,
			filters.doSearch ? `do:${filters.doSearch}` : undefined,
			filters.dontSearch ? `dont:${filters.dontSearch}` : undefined,
		]
			.filter((v): v is string => v !== undefined)
			.join(', ');

		throw new Error(
			`No examples matched filters (${filterSummary}) in dataset "${datasetName}" (scanned ${scanned})`,
		);
	}

	if (!filters && limit !== undefined && matches.length === 0) {
		throw new Error(`No examples found in dataset "${datasetName}"`);
	}

	return matches;
}
|
|
|
|
/**
 * Decide what to hand to LangSmith as evaluation data.
 *
 * Returns the dataset name unchanged when neither a positive `maxExamples`
 * nor `filters` is configured. Otherwise it loads the matching examples
 * eagerly via loadExamplesFromDataset.
 *
 * @throws the original "No examples matched filters" / "No examples found in
 *   dataset" errors unchanged; any other failure is rethrown as
 *   `Dataset "<name>" not found: <message>`.
 */
async function resolveLangsmithData(params: {
	dataset: string;
	langsmithOptions: LangsmithRunConfig['langsmithOptions'];
	lsClient: {
		readDataset: (args: { datasetName: string }) => Promise<{ id: string }>;
		listExamples: (args: { datasetId: string; limit?: number }) => AsyncIterable<Example>;
	};
	logger: EvalLogger;
}): Promise<string | Example[]> {
	const { dataset: datasetName, langsmithOptions, lsClient, logger } = params;
	const { maxExamples, filters } = langsmithOptions;

	const hasPositiveLimit = typeof maxExamples === 'number' && maxExamples > 0;
	if (!hasPositiveLimit && filters === undefined) return datasetName;

	const loadingMessage = filters
		? `Loading examples from dataset "${datasetName}" with filters...`
		: `Loading up to ${maxExamples} examples from dataset "${datasetName}"...`;
	logger.info(loadingMessage);

	try {
		return await loadExamplesFromDataset({ lsClient, datasetName, maxExamples, filters });
	} catch (error) {
		const errorMessage = error instanceof Error ? error.message : String(error);
		const isEmptyResultError = ['No examples matched filters', 'No examples found in dataset'].some(
			(prefix) => errorMessage.startsWith(prefix),
		);
		if (isEmptyResultError) {
			throw error instanceof Error ? error : new Error(errorMessage);
		}
		throw new Error(`Dataset "${datasetName}" not found: ${errorMessage}`);
	}
}
|
|
|
|
/**
 * Build the per-test-case context from a LangSmith example's inputs.
 *
 * Copies the string `dos` / `donts` fields, the reference workflow(s), and the
 * `annotations` object. Missing or malformed fields are left out.
 *
 * Reference workflows: a NON-EMPTY `referenceWorkflows` array whose entries all
 * pass isSimpleWorkflow is used as-is. Otherwise the legacy single
 * `referenceWorkflow` is wrapped in an array when it is valid. An empty array is
 * ignored, so it neither yields `referenceWorkflows: []` nor hides a valid
 * legacy reference.
 */
function extractContextFromLangsmithInputs(inputs: unknown): TestCaseContext {
	const record = asRecord(inputs);
	const context: TestCaseContext = {};

	if (typeof record.dos === 'string') context.dos = record.dos;
	if (typeof record.donts === 'string') context.donts = record.donts;

	// Support both legacy referenceWorkflow (single) and referenceWorkflows (array) from dataset
	const { referenceWorkflows, referenceWorkflow } = record;
	if (
		Array.isArray(referenceWorkflows) &&
		referenceWorkflows.length > 0 &&
		referenceWorkflows.every((wf) => isSimpleWorkflow(wf))
	) {
		context.referenceWorkflows = referenceWorkflows;
	} else if (isSimpleWorkflow(referenceWorkflow)) {
		// Convert legacy single reference to array
		context.referenceWorkflows = [referenceWorkflow];
	}

	// Extract annotations for binary-checks evaluator
	if (
		record.annotations &&
		typeof record.annotations === 'object' &&
		!Array.isArray(record.annotations)
	) {
		context.annotations = record.annotations as Record<string, unknown>;
	}

	return context;
}
|
|
|
|
/**
 * Values captured from a generator's collector callbacks for one example.
 */
interface CollectedMetrics {
	/** Input tokens reported via `tokenUsage`. */
	genInputTokens?: number;
	/** Output tokens reported via `tokenUsage`. */
	genOutputTokens?: number;
	/** Discovery subgraph duration in milliseconds. */
	discoveryDurationMs?: number;
	/** Builder subgraph duration in milliseconds. */
	builderDurationMs?: number;
	/** Responder subgraph duration in milliseconds. */
	responderDurationMs?: number;
	/** Number of nodes reported via `subgraphMetrics`. */
	nodeCount?: number;
	/** Events reported via `introspectionEvents`. */
	introspectionEvents?: IntrospectionEvent[];
}

/**
 * Create collector callbacks that write into one shared CollectedMetrics
 * object. Each callback overwrites the fields it owns. `getMetrics` returns
 * that same (live) object.
 */
function createMetricsCollectors(): {
	collectors: GenerationCollectors;
	getMetrics: () => CollectedMetrics;
} {
	const collected: CollectedMetrics = {};

	return {
		collectors: {
			tokenUsage: (usage) => {
				collected.genInputTokens = usage.inputTokens;
				collected.genOutputTokens = usage.outputTokens;
			},
			subgraphMetrics: (reported) => {
				collected.discoveryDurationMs = reported.discoveryDurationMs;
				collected.builderDurationMs = reported.builderDurationMs;
				collected.responderDurationMs = reported.responderDurationMs;
				collected.nodeCount = reported.nodeCount;
			},
			introspectionEvents: (events) => {
				collected.introspectionEvents = events;
			},
		},
		getMetrics: () => collected,
	};
}

/**
 * Extract the subgraph timing/node-count fields, or undefined when none of
 * them was reported.
 */
function buildSubgraphMetrics(metrics: CollectedMetrics): ExampleResult['subgraphMetrics'] {
	const subgraph = {
		discoveryDurationMs: metrics.discoveryDurationMs,
		builderDurationMs: metrics.builderDurationMs,
		responderDurationMs: metrics.responderDurationMs,
		nodeCount: metrics.nodeCount,
	};
	const anyReported = Object.values(subgraph).some((entry) => entry !== undefined);
	return anyReported ? subgraph : undefined;
}
|
|
|
|
/**
 * Build the single-item feedback list recorded when an example fails before
 * evaluation: a `runner` / `error` score of 0 carrying the error message.
 */
function createErrorFeedback(errorMessage: string): Feedback[] {
	const runnerError: Feedback = {
		evaluator: 'runner',
		metric: 'error',
		score: 0,
		kind: 'score',
		comment: errorMessage,
	};
	return [runnerError];
}
|
|
|
|
/**
 * Execute the success path for a local example (generation + evaluation).
 * Extracted to reduce complexity of runLocalExample.
 *
 * Steps:
 * 1. Generate the workflow through the optional LLM-call limiter, under `timeoutMs`.
 * 2. Optionally generate pin data (best-effort).
 * 3. Run all evaluators.
 * 4. Score the feedback and assemble the ExampleResult.
 *
 * Errors from generation propagate to the caller. Evaluator failures are
 * converted to `error` feedback by evaluateWithPlugins.
 *
 * `generationDurationMs` is measured inside the limiter callback, so waiting
 * for a free limiter slot is not counted as generation time.
 */
async function runLocalExampleSuccess(args: {
	index: number;
	startTime: number;
	testCase: TestCase;
	generateWorkflow: (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	) => Promise<SimpleWorkflow | GenerationResult>;
	evaluators: Array<Evaluator<EvaluationContext>>;
	globalContext?: GlobalRunContext;
	passThreshold: number;
	timeoutMs: number | undefined;
	lifecycle?: Partial<EvaluationLifecycle>;
	pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult> {
	const {
		index,
		startTime,
		testCase,
		generateWorkflow,
		evaluators,
		globalContext,
		passThreshold,
		timeoutMs,
		lifecycle,
		pinDataGenerator,
	} = args;

	// Generate workflow with metrics collection
	const { collectors, getMetrics } = createMetricsCollectors();

	// The clock starts inside the limiter callback (as does the timeout), so queue
	// wait behind other LLM calls is excluded. NOTE(review): this assumes
	// runWithOptionalLimiter only invokes the callback once a slot is free.
	let genDurationMs = 0;
	const genResult = await runWithOptionalLimiter(async () => {
		const genStartTime = Date.now();
		const generated = await withTimeout({
			promise: generateWorkflow(testCase.prompt, testCase.context?.datasetInputContext, collectors),
			timeoutMs,
			label: 'workflow_generation',
		});
		genDurationMs = Date.now() - genStartTime;
		return generated;
	}, globalContext?.llmCallLimiter);

	// Extract workflow and optional generated code
	const workflow = isGenerationResult(genResult) ? genResult.workflow : genResult;
	const generatedCode = isGenerationResult(genResult) ? genResult.generatedCode : undefined;
	const agentTextResponse = isGenerationResult(genResult) ? genResult.agentTextResponse : undefined;

	lifecycle?.onWorkflowGenerated?.(workflow, genDurationMs);

	// Generate pin data for service nodes (best-effort)
	let pinData: IPinData | undefined;
	if (pinDataGenerator) {
		try {
			pinData = await pinDataGenerator(workflow);
		} catch {
			// Pin data generation is best-effort — don't fail the evaluation
		}
	}

	const context = buildContext({
		prompt: testCase.prompt,
		globalContext: {
			...(globalContext ?? {}),
			timeoutMs,
		},
		testCaseContext: testCase.context,
		referenceWorkflows: testCase.referenceWorkflows,
		generatedCode,
		agentTextResponse,
		pinData,
		datasetInputContext: testCase.context?.datasetInputContext,
	});

	// Run evaluators in parallel
	const evalStartTime = Date.now();
	const feedback = await evaluateWithPlugins(workflow, evaluators, context, timeoutMs, lifecycle);
	const evalDurationMs = Date.now() - evalStartTime;

	// Calculate result: any error feedback forces status 'error' regardless of score
	const score = calculateExampleScore(feedback);
	const status = hasErrorFeedback(feedback) ? 'error' : determineStatus({ score, passThreshold });
	const durationMs = Date.now() - startTime;
	const metrics = getMetrics();

	return {
		index,
		prompt: testCase.prompt,
		status,
		score,
		feedback,
		durationMs,
		generationDurationMs: genDurationMs,
		evaluationDurationMs: evalDurationMs,
		generationInputTokens: metrics.genInputTokens,
		generationOutputTokens: metrics.genOutputTokens,
		subgraphMetrics: buildSubgraphMetrics(metrics),
		introspectionEvents: metrics.introspectionEvents,
		workflow,
		generatedCode,
		agentTextResponse,
	};
}
|
|
|
|
/**
 * Run one local example end to end and always resolve with an ExampleResult.
 *
 * Any error from runLocalExampleSuccess becomes an `error` result (score 0)
 * with runner error feedback. The result is then saved and reported exactly
 * once.
 *
 * Saving and reporting happen outside the try/catch. Previously, a throwing
 * `saveExample` or `onExampleComplete` on the success path was caught, which
 * caused a successful example to be re-saved and re-reported as an error.
 */
async function runLocalExample(args: {
	index: number;
	total: number;
	testCase: TestCase;
	generateWorkflow: (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	) => Promise<SimpleWorkflow | GenerationResult>;
	evaluators: Array<Evaluator<EvaluationContext>>;
	globalContext?: GlobalRunContext;
	passThreshold: number;
	timeoutMs: number | undefined;
	lifecycle?: Partial<EvaluationLifecycle>;
	artifactSaver?: ArtifactSaver | null;
	pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult> {
	const {
		index,
		total,
		testCase,
		generateWorkflow,
		evaluators,
		globalContext,
		passThreshold,
		timeoutMs,
		lifecycle,
		artifactSaver,
		pinDataGenerator,
	} = args;

	const startTime = Date.now();
	lifecycle?.onExampleStart?.(index, total, testCase.prompt);

	let result: ExampleResult;
	try {
		result = await runLocalExampleSuccess({
			index,
			startTime,
			testCase,
			generateWorkflow,
			evaluators,
			globalContext,
			passThreshold,
			timeoutMs,
			lifecycle,
			pinDataGenerator,
		});
	} catch (error) {
		const errorMessage = error instanceof Error ? error.message : String(error);
		result = {
			index,
			prompt: testCase.prompt,
			status: 'error',
			score: 0,
			feedback: createErrorFeedback(errorMessage),
			durationMs: Date.now() - startTime,
			error: errorMessage,
		};
	}

	// Persist and report once, whatever the outcome.
	artifactSaver?.saveExample(result);
	lifecycle?.onExampleComplete?.(index, result);
	return result;
}
|
|
|
|
/**
 * Create an ArtifactSaver writing to `outputDir`, or return null when no
 * output directory was requested (an empty string counts as "not requested").
 * Callers use optional chaining on the result, so a null saver skips all
 * artifact writes.
 */
function createArtifactSaverIfRequested(args: {
	outputDir?: string;
	logger: EvalLogger;
}): ArtifactSaver | null {
	const { outputDir, logger } = args;
	return outputDir ? createArtifactSaver({ outputDir, logger }) : null;
}
|
|
|
|
/**
 * Run every local test case through runLocalExample, with at most
 * `concurrency` (default 1) examples in flight at once.
 *
 * Results are returned in test-case order. The indices passed to
 * runLocalExample are 1-based.
 */
async function runLocalDataset(params: {
	testCases: TestCase[];
	generateWorkflow: (
		prompt: string,
		datasetInputContext?: DatasetInputContext,
		collectors?: GenerationCollectors,
	) => Promise<SimpleWorkflow | GenerationResult>;
	evaluators: Array<Evaluator<EvaluationContext>>;
	globalContext?: GlobalRunContext;
	passThreshold: number;
	timeoutMs: number | undefined;
	lifecycle?: Partial<EvaluationLifecycle>;
	artifactSaver: ArtifactSaver | null;
	concurrency?: number;
	pinDataGenerator?: (workflow: SimpleWorkflow) => Promise<IPinData>;
}): Promise<ExampleResult[]> {
	const {
		testCases,
		generateWorkflow,
		evaluators,
		globalContext,
		passThreshold,
		timeoutMs,
		lifecycle,
		artifactSaver,
		concurrency = 1,
		pinDataGenerator,
	} = params;

	const limitExample = pLimit(concurrency);
	const total = testCases.length;

	const pending = testCases.map((testCase, position) =>
		limitExample(() =>
			runLocalExample({
				index: position + 1,
				total,
				testCase,
				generateWorkflow,
				evaluators,
				globalContext,
				passThreshold,
				timeoutMs,
				lifecycle,
				artifactSaver,
				pinDataGenerator,
			}),
		),
	);

	return await Promise.all(pending);
}
|
|
|
|
/**
 * Aggregate example results into a RunSummary.
 *
 * Counts each status, computes the arithmetic mean score (0 for no results),
 * and sums every example's duration.
 */
function buildRunSummary(results: ExampleResult[]): RunSummary {
	let passed = 0;
	let failed = 0;
	let errors = 0;
	let scoreSum = 0;
	let totalDurationMs = 0;

	for (const result of results) {
		if (result.status === 'pass') passed++;
		else if (result.status === 'fail') failed++;
		else if (result.status === 'error') errors++;
		scoreSum += result.score;
		totalDurationMs += result.durationMs;
	}

	return {
		totalExamples: results.length,
		passed,
		failed,
		errors,
		averageScore: results.length > 0 ? scoreSum / results.length : 0,
		totalDurationMs,
	};
}
|
|
|
|
/**
 * Compute the average score per evaluator across all example results.
 *
 * For each example, feedback is grouped by evaluator and reduced to a single
 * per-example score (selectScoringItems + calculateFiniteAverage). Those
 * per-example scores are then averaged per evaluator.
 *
 * Grouping uses Map and the result is built with Object.fromEntries. Evaluator
 * names are data: on a plain object, a name such as "constructor" would
 * resolve to an Object.prototype member and break the bookkeeping.
 */
function computeEvaluatorAverages(
	results: ExampleResult[],
	logger?: EvalLogger,
): Record<string, number> {
	const scoresByEvaluator = new Map<string, number[]>();

	for (const result of results) {
		// Group feedback by evaluator
		const byEvaluator = new Map<string, Feedback[]>();
		for (const fb of result.feedback) {
			const group = byEvaluator.get(fb.evaluator);
			if (group) group.push(fb);
			else byEvaluator.set(fb.evaluator, [fb]);
		}

		// Calculate per-evaluator average for this example
		for (const [evaluator, items] of byEvaluator) {
			const avg = calculateFiniteAverage(selectScoringItems(items));
			const scores = scoresByEvaluator.get(evaluator);
			if (scores) scores.push(avg);
			else scoresByEvaluator.set(evaluator, [avg]);
		}
	}

	// Compute overall average per evaluator (every entry holds at least one score)
	const entries: Array<[string, number]> = [];
	for (const [name, scores] of scoresByEvaluator) {
		const avg = scores.reduce((a, b) => a + b, 0) / scores.length;
		logger?.verbose(
			`[computeEvaluatorAverages] Final avg for "${name}": ${scores.join(', ')} -> ${avg}`,
		);
		entries.push([name, avg]);
	}
	const evaluatorAverages: Record<string, number> = Object.fromEntries(entries);

	logger?.verbose(`[computeEvaluatorAverages] Final result: ${JSON.stringify(evaluatorAverages)}`);
	return evaluatorAverages;
}
|
|
|
|
/** Inputs for buildLangsmithSummary. */
interface LangsmithSummaryParams {
	/** Aggregated counters; `scoreSum` is divided by `total` for the average. */
	stats: {
		total: number;
		passed: number;
		failed: number;
		errors: number;
		scoreSum: number;
		durationSumMs: number;
	};
	/** LangSmith identifiers; attached to the summary only when all three are present. */
	langsmithData: {
		experimentName?: string;
		experimentId?: string;
		datasetId?: string;
	};
	evaluatorAverages?: Record<string, number>;
}

/**
 * Turn LangSmith-mode counters into a RunSummary. The `langsmith` section is
 * added only when the experiment name, experiment id and dataset id are all
 * non-empty.
 */
function buildLangsmithSummary(params: LangsmithSummaryParams): RunSummary {
	const { stats, evaluatorAverages } = params;
	const { experimentName, experimentId, datasetId } = params.langsmithData;

	const averageScore = stats.total > 0 ? stats.scoreSum / stats.total : 0;
	const summary: RunSummary = {
		totalExamples: stats.total,
		passed: stats.passed,
		failed: stats.failed,
		errors: stats.errors,
		averageScore,
		totalDurationMs: stats.durationSumMs,
		evaluatorAverages,
	};

	if (!experimentName || !experimentId || !datasetId) {
		return summary;
	}

	summary.langsmith = { experimentName, experimentId, datasetId };
	return summary;
}
|
|
|
|
/**
 * Run evaluation in local mode.
 *
 * Steps:
 * 1. Execute every test case in `config.dataset` (bounded by `concurrency`).
 * 2. Build a RunSummary.
 * 3. Optionally persist artifacts (`outputDir`) and a CSV report (`outputCsv`).
 * 4. Call `lifecycle.onEnd`.
 *
 * When the config provides no `llmCallLimiter`, a pLimit(4) limiter is created
 * for the run.
 */
async function runLocal(config: LocalRunConfig): Promise<RunSummary> {
	const {
		dataset,
		generateWorkflow,
		evaluators,
		context: globalContext,
		passThreshold = DEFAULT_PASS_THRESHOLD,
		timeoutMs,
		lifecycle,
		outputDir,
		outputCsv,
		suite,
		logger,
		concurrency = 1,
		pinDataGenerator,
	} = config;

	const testCases: TestCase[] = dataset;
	if (testCases.length === 0) logger.warn('No test cases provided');

	const effectiveGlobalContext: GlobalRunContext = {
		...globalContext,
		llmCallLimiter: globalContext?.llmCallLimiter ?? pLimit(4),
		timeoutMs,
	};

	const artifactSaver = createArtifactSaverIfRequested({ outputDir, logger });

	lifecycle?.onStart?.(config);

	const results = await runLocalDataset({
		testCases,
		generateWorkflow,
		evaluators,
		globalContext: effectiveGlobalContext,
		passThreshold,
		timeoutMs,
		lifecycle,
		artifactSaver,
		concurrency,
		pinDataGenerator,
	});
	const summary = buildRunSummary(results);

	artifactSaver?.saveSummary(summary, results);

	if (outputCsv) {
		const { writeResultsCsv } = await import('./csv-writer.js');
		// Only these suites have a dedicated CSV layout; anything else uses the default.
		const csvSuite =
			suite === 'llm-judge' || suite === 'pairwise' || suite === 'binary-checks'
				? suite
				: undefined;
		writeResultsCsv(results, outputCsv, { suite: csvSuite });
		logger.info(`Results written to: ${outputCsv}`);
	}

	await lifecycle?.onEnd?.(summary);
	return summary;
}
|
|
|
|
/**
 * Output of the LangSmith target function for one example.
 */
interface LangsmithTargetOutput {
	/** The generated workflow. */
	workflow: SimpleWorkflow;
	/** The prompt the workflow was generated from. */
	prompt: string;
	/** Feedback produced for the generated workflow. */
	feedback: Feedback[];
}

/**
 * Inputs of one LangSmith dataset example.
 * The prompt is given either as `prompt` or as the first entry of `messages`.
 */
interface LangsmithDatasetInput {
	/** Direct prompt text. */
	prompt?: string;
	/** Message-array form; the first message carries the prompt. */
	messages?: BaseMessage[];
	evals?: Record<string, unknown>;
	workflowJSON?: unknown;
	workflowContext?: unknown;
	workflowOperations?: unknown[];
	mode?: string;
	/** Injected by enrichExamplesWithHistory - historical messages from outputs */
	_historicalMessages?: unknown[];
	[key: string]: unknown;
}
|
|
|
|
/**
 * Return the prompt text of a dataset example.
 *
 * Uses a non-empty `prompt` string when present. Otherwise it uses the
 * content of the first entry of a non-empty `messages` array.
 *
 * @throws Error when neither form is available.
 */
function extractPrompt(inputs: LangsmithDatasetInput): string {
	if (typeof inputs.prompt === 'string' && inputs.prompt.length > 0) {
		return inputs.prompt;
	}

	const { messages } = inputs;
	if (Array.isArray(messages) && messages.length > 0) {
		return extractMessageContent(messages[0]);
	}

	throw new Error('No prompt found in inputs - expected "prompt" string or "messages" array');
}
|
|
|
|
/**
 * Pre-process LangSmith examples to extract conversation history from outputs
 * and inject it into inputs for the target function.
 *
 * The dataset format has:
 * - inputs.messages[0]: The latest user turn
 * - outputs.messages: The FULL conversation (all prior turns + latest + AI response)
 *
 * The latest turn is located in outputs.messages by matching the serialized
 * `kwargs.content` of inputs.messages[0]. The search runs from the END, so that
 * if the same text was also sent in an earlier turn, the most recent
 * occurrence is used and the earlier turns stay in the history.
 *
 * Everything before that index is injected as `inputs._historicalMessages`.
 * Examples without usable history are returned unchanged (same object).
 */
function enrichExamplesWithHistory(examples: Example[]): Example[] {
	// `kwargs.content` of an lc-serialized message, or undefined if absent.
	const serializedContent = (msg: unknown): unknown => {
		if (!isUnknownRecord(msg)) return undefined;
		const kwargs = msg.kwargs;
		return isUnknownRecord(kwargs) ? kwargs.content : undefined;
	};

	return examples.map((example) => {
		const outputs: unknown = example.outputs;
		const outputMessages = isUnknownRecord(outputs) ? outputs.messages : undefined;
		if (!Array.isArray(outputMessages) || outputMessages.length <= 1) {
			return example; // No history to extract
		}

		const inputs: unknown = example.inputs;
		const inputMessages = isUnknownRecord(inputs) ? inputs.messages : undefined;
		if (!Array.isArray(inputMessages) || inputMessages.length === 0) {
			return example;
		}

		// Get the content of the latest turn from inputs
		const inputContent = serializedContent(inputMessages[0]);
		if (typeof inputContent !== 'string') return example;

		// Find the LAST output message with that content (the latest turn)
		let latestTurnIndex = -1;
		for (let i = outputMessages.length - 1; i >= 0; i--) {
			if (serializedContent(outputMessages[i]) === inputContent) {
				latestTurnIndex = i;
				break;
			}
		}

		if (latestTurnIndex <= 0) return example; // No history before the latest turn

		// Extract all messages before the latest turn as historical
		const historicalMessages = outputMessages.slice(0, latestTurnIndex);

		return {
			...example,
			inputs: {
				...example.inputs,
				_historicalMessages: historicalMessages,
			},
		};
	});
}
|
|
|
|
/**
 * Deserialize raw LangChain `lc` serialization format messages into BaseMessage instances.
 * Dataset messages use the format: {id, lc: 1, type: "constructor", kwargs: {content, ...}}
 * with the class ID in `id` array (e.g. ["langchain_core", "messages", "HumanMessage"]).
 *
 * Only keeps HumanMessage and AIMessage — tool calls/results are stripped
 * to simplify the context passed to the code builder.
 *
 * `kwargs.content` may be a plain string or an array of content blocks (LangChain's
 * message content format). For arrays, the text blocks are concatenated, so AI turns
 * stored in block form keep their text. Messages that carry no text at all (e.g. AI
 * turns that only contained tool calls) are dropped rather than emitted as empty turns.
 *
 * @param rawMessages - Serialized messages, in conversation order.
 * @returns Human/AI messages with their text content, in the original order.
 */
function deserializeLcMessages(rawMessages: unknown[]): BaseMessage[] {
	const messages: BaseMessage[] = [];

	for (const raw of rawMessages) {
		if (!isUnknownRecord(raw)) continue;

		// The class name is the last segment of the `id` path.
		const id: unknown[] = Array.isArray(raw.id) ? raw.id : [];
		const className: unknown = id[id.length - 1];
		if (className !== 'HumanMessage' && className !== 'AIMessage') continue;

		const kwargs: Record<string, unknown> = isUnknownRecord(raw.kwargs) ? raw.kwargs : {};
		const content = lcContentToText(kwargs.content);
		if (content === '') continue;

		messages.push(className === 'HumanMessage' ? new HumanMessage(content) : new AIMessage(content));
	}

	return messages;
}

/** Flatten serialized message content (string or content-block array) to plain text. */
function lcContentToText(content: unknown): string {
	if (typeof content === 'string') return content;
	if (!Array.isArray(content)) return '';

	return content
		.map((block: unknown) => {
			if (typeof block === 'string') return block;
			if (isUnknownRecord(block) && block.type === 'text' && typeof block.text === 'string') {
				return block.text;
			}
			// Non-text blocks (tool_use, images, thinking, ...) are intentionally skipped.
			return '';
		})
		.join('');
}
|
|
|
|
/**
 * Extract DatasetInputContext from LangSmith dataset inputs.
 * Captures the full agent context (workflowContext, existing workflow, mode,
 * conversation history) needed to replay the generation realistically.
 *
 * Each field is validated on its own. There is deliberately no up-front
 * "any context present?" shortcut: a `??` chain stops at the first non-nullish
 * value even when that value is falsy (e.g. `workflowContext: ''`), which used
 * to discard an otherwise valid `workflowJSON`, `mode` or history.
 *
 * @param inputs - Raw example inputs from LangSmith.
 * @returns The populated context, or `undefined` when no field is usable.
 */
function extractDatasetInputContext(
	inputs: LangsmithDatasetInput,
): DatasetInputContext | undefined {
	const context: DatasetInputContext = {};

	if (isUnknownRecord(inputs.workflowContext)) {
		context.workflowContext = inputs.workflowContext as ChatPayload['workflowContext'];
	}

	if (isSimpleWorkflow(inputs.workflowJSON)) {
		context.existingWorkflow = inputs.workflowJSON;
	}

	// Any other mode string is ignored.
	if (inputs.mode === 'build' || inputs.mode === 'plan') {
		context.mode = inputs.mode;
	}

	if (Array.isArray(inputs._historicalMessages) && inputs._historicalMessages.length > 0) {
		context.historicalMessages = deserializeLcMessages(inputs._historicalMessages);
	}

	return Object.keys(context).length > 0 ? context : undefined;
}
|
|
|
|
/**
 * Build the LangSmith evaluator that forwards the feedback the target already computed.
 *
 * The returned function reads `outputs.feedback` from the root run. When every item
 * is valid feedback, each item is converted with `toLangsmithEvaluationResult`.
 * Otherwise a single `evaluationError` result with score 0 is reported.
 */
function createLangsmithFeedbackExtractor(): (
	rootRun: Run,
	_example?: Example,
) => Promise<Array<{ key: string; score: number; comment?: string }>> {
	return async (rootRun: Run, _example?: Example) => {
		const runOutputs = rootRun.outputs;

		if (
			isUnknownRecord(runOutputs) &&
			isUnknownArray(runOutputs.feedback) &&
			runOutputs.feedback.every(isFeedback)
		) {
			return runOutputs.feedback.map((item) => toLangsmithEvaluationResult(item));
		}

		// Missing or malformed feedback: surface it as an explicit zero-score result.
		return [
			{
				key: 'evaluationError',
				score: 0,
				comment: 'No feedback found in target output',
			},
		];
	};
}
|
|
|
|
/**
 * Repeat preloaded examples `repetitions` times, in order.
 *
 * Dataset names (strings) and repetition counts of 1 or less are returned as-is.
 * The array is concatenated with itself, so repeated entries are the same
 * object references.
 */
function applyRepetitions(data: string | Example[], repetitions: number): string | Example[] {
	if (!Array.isArray(data) || repetitions <= 1) {
		return data;
	}

	const examples = data;
	return Array.from({ length: repetitions }).flatMap(() => examples);
}
|
|
|
|
/**
 * Derive experiment metadata describing which example filters were applied.
 *
 * Active (truthy) filters are listed in a fixed order: id, category, do, dont.
 * Without active filters the run type is `full` and no `filterValue` key is returned.
 *
 * @example { notionId: '42', technique: 'rag' }
 *   → { runType: 'by-id-and-category', filterValue: 'id:42 category:rag' }
 */
function computeFilterMetadata(filters?: LangsmithExampleFilters): {
	runType: string;
	filterValue?: string;
} {
	if (!filters) return { runType: 'full' };

	const candidates = [
		['id', filters.notionId],
		['category', filters.technique],
		['do', filters.doSearch],
		['dont', filters.dontSearch],
	] as const;

	const labels: string[] = [];
	const labelledValues: string[] = [];
	for (const [label, value] of candidates) {
		if (!value) continue;
		labels.push(label);
		labelledValues.push(`${label}:${value}`);
	}

	if (labels.length === 0) return { runType: 'full' };

	return {
		runType: `by-${labels.join('-and-')}`,
		filterValue: labelledValues.join(' '),
	};
}
|
|
|
|
/**
 * Log (at verbose level) where the evaluation data comes from.
 *
 * For preloaded examples, the count and the IDs of at most the first 20 examples are logged.
 */
function logLangsmithInputsSummary(logger: EvalLogger, effectiveData: string | Example[]): void {
	if (Array.isArray(effectiveData)) {
		const previewIds = effectiveData.slice(0, 20).map((example) => example.id);
		logger.verbose(`Data source: preloaded examples (${effectiveData.length})`);
		logger.verbose(`Example IDs in data: ${previewIds.join(', ')}`);
	} else {
		logger.verbose('Data source: dataset (streaming)');
	}
}
|
|
|
|
/**
 * Run LangSmith `evaluate()` with the given target, then flush pending traces.
 *
 * Repetitions are passed to the SDK (`numRepetitions`) only when streaming by dataset
 * name; preloaded arrays are expected to be repeated already. After the flush, the
 * experiment and dataset IDs are read from SDK internals on a best-effort basis.
 *
 * @returns The experiment name, plus the experiment/dataset IDs when they could be read.
 */
async function runLangsmithEvaluateAndFlush(params: {
	target: (inputs: LangsmithDatasetInput) => Promise<LangsmithTargetOutput>;
	effectiveData: string | Example[];
	feedbackExtractor: ReturnType<typeof createLangsmithFeedbackExtractor>;
	langsmithOptions: LangsmithRunConfig['langsmithOptions'];
	lsClient: LangsmithRunConfig['langsmithClient'];
	logger: EvalLogger;
	targetCallCount: () => number;
}): Promise<{
	experimentName?: string;
	experimentId?: string;
	datasetId?: string;
}> {
	const { target, effectiveData, feedbackExtractor, langsmithOptions, lsClient, logger, targetCallCount } =
		params;

	const secondsSince = (startedAt: number): string => ((Date.now() - startedAt) / 1000).toFixed(1);

	logger.info(
		`Starting LangSmith evaluate() with ${Array.isArray(effectiveData) ? effectiveData.length : 'dataset'} examples, ${langsmithOptions.repetitions} repetitions, concurrency ${langsmithOptions.concurrency}...`,
	);

	const { runType, filterValue } = computeFilterMetadata(langsmithOptions.filters);

	// Repetitions are applied explicitly when pre-loading examples to keep behavior consistent.
	// When streaming from a dataset name, the SDK may support repetitions internally.
	const sdkRepetitions =
		!Array.isArray(effectiveData) && langsmithOptions.repetitions > 1
			? { numRepetitions: langsmithOptions.repetitions }
			: {};

	const evaluationStartedAt = Date.now();
	const experimentResults = await evaluate(target, {
		data: effectiveData,
		evaluators: [feedbackExtractor],
		experimentPrefix: langsmithOptions.experimentName,
		...sdkRepetitions,
		maxConcurrency: langsmithOptions.concurrency,
		client: lsClient,
		metadata: {
			repetitions: langsmithOptions.repetitions,
			concurrency: langsmithOptions.concurrency,
			runType,
			...(filterValue ? { filterValue } : {}),
			...langsmithOptions.experimentMetadata,
		},
	});
	logger.info(
		`Evaluation completed in ${secondsSince(evaluationStartedAt)}s (target called ${targetCallCount()} times)`,
	);

	// Flush pending traces to ensure all data is sent to LangSmith
	logger.verbose('Flushing pending trace batches...');
	const flushStartedAt = Date.now();
	await lsClient.awaitPendingTraceBatches();
	logger.verbose(`Flush completed in ${secondsSince(flushStartedAt)}s`);

	const { experimentName } = experimentResults;
	logger.info(`Experiment completed: ${experimentName}`);

	let experimentId: string | undefined;
	let datasetId: string | undefined;
	try {
		// `manager` is not part of the public SDK surface, hence the structural cast and the try/catch.
		const { manager } = experimentResults as unknown as {
			manager?: { _getExperiment?: () => { id: string }; datasetId?: Promise<string> };
		};
		experimentId = manager?._getExperiment?.().id;
		datasetId = manager?.datasetId ? await manager.datasetId : undefined;
	} catch {
		logger.verbose('Could not extract LangSmith IDs from experiment results');
	}

	return { experimentName, experimentId, datasetId };
}
|
|
|
|
/**
 * Running totals for a LangSmith evaluation, updated once per example by updateStats.
 */
interface LangsmithStats {
	/** Examples whose status was 'pass'. */
	passed: number;
	/** Examples whose status was 'fail'. */
	failed: number;
	/** Examples whose status was 'error'. */
	errors: number;
	/** Number of examples recorded so far (passed + failed + errors). */
	total: number;
	/** Sum of per-example scores, including errored examples. */
	scoreSum: number;
	/** Sum of per-example wall-clock durations, in milliseconds. */
	durationSumMs: number;
}
|
|
|
|
/**
 * Record one example in the running stats.
 *
 * `total`, `scoreSum` and `durationSumMs` always grow; exactly one of the
 * passed/failed/errors counters is incremented, depending on `status`.
 */
function updateStats(
	stats: LangsmithStats,
	status: 'pass' | 'fail' | 'error',
	score: number,
	durationMs: number,
): void {
	stats.total += 1;
	stats.scoreSum += score;
	stats.durationSumMs += durationMs;

	switch (status) {
		case 'pass':
			stats.passed += 1;
			break;
		case 'fail':
			stats.failed += 1;
			break;
		default:
			stats.errors += 1;
	}
}
|
|
|
|
/**
 * Resolve LangSmith dataset examples and enrich them with conversation history.
 *
 * If `maxExamples` (> 0) or `filters` were requested but resolution still yielded a
 * dataset name, the examples are preloaded so those limits can be honored instead
 * of streaming the whole dataset. Preloaded arrays get history injected; a dataset
 * name is returned untouched.
 */
async function resolveAndEnrichLangsmithData(params: {
	dataset: string;
	langsmithOptions: LangsmithRunConfig['langsmithOptions'];
	lsClient: LangsmithRunConfig['langsmithClient'];
	logger: EvalLogger;
}): Promise<string | Example[]> {
	const { dataset, langsmithOptions, lsClient, logger } = params;
	const resolved = await resolveLangsmithData({ dataset, langsmithOptions, lsClient, logger });

	const wantsLimit = (langsmithOptions.maxExamples ?? 0) > 0;
	const wantsFilters = langsmithOptions.filters !== undefined;

	const data =
		typeof resolved === 'string' && (wantsLimit || wantsFilters)
			? await loadExamplesFromDataset({
					lsClient,
					datasetName: resolved,
					maxExamples: langsmithOptions.maxExamples,
					filters: langsmithOptions.filters,
				})
			: resolved;

	if (!Array.isArray(data)) {
		return data;
	}
	return enrichExamplesWithHistory(data);
}
|
|
|
|
/**
 * Run evaluation in LangSmith mode.
 *
 * Each dataset example is processed by a `target` callback handed to LangSmith's
 * `evaluate()`. The target generates a workflow, runs all evaluators on it, and
 * returns the feedback for LangSmith to record. Results are also captured locally
 * to build the returned summary, optional artifacts and an optional CSV.
 *
 * Each example is recorded exactly once: stats, artifact, captured result and
 * `onExampleComplete`. Recording happens after generation/evaluation has settled,
 * so a failure while recording cannot re-enter the error branch and count the
 * same example twice.
 *
 * @param config - LangSmith run configuration; `dataset` must be a dataset name.
 * @returns Summary of the run, including LangSmith experiment identifiers when available.
 * @throws Error when `dataset` is not a string.
 */
async function runLangsmith(config: LangsmithRunConfig): Promise<RunSummary> {
	const {
		dataset,
		generateWorkflow,
		evaluators,
		context: globalContext,
		outputDir,
		outputCsv,
		suite,
		passThreshold = DEFAULT_PASS_THRESHOLD,
		timeoutMs,
		langsmithOptions,
		langsmithClient: lsClient,
		lifecycle,
		logger,
		pinDataGenerator,
	} = config;

	// Enable tracing (required in langsmith 0.4.x)
	process.env.LANGSMITH_TRACING = 'true';

	lifecycle?.onStart?.(config);

	// One shared limiter for all examples; defaults to the evaluate() concurrency.
	const effectiveGlobalContext: GlobalRunContext = {
		...(globalContext ?? {}),
		llmCallLimiter: globalContext?.llmCallLimiter ?? pLimit(langsmithOptions.concurrency),
		timeoutMs,
	};

	const artifactSaver = createArtifactSaverIfRequested({ outputDir, logger });
	const capturedResults: ExampleResult[] = [];

	// Create traceable wrappers ONCE outside target function to avoid context leaking
	// when running concurrent evaluations. Pass all parameters explicitly (no closures).
	// IMPORTANT: Get callbacks INSIDE the traceable wrapper where AsyncLocalStorage context
	// is correctly set, then pass them explicitly to genFn to avoid race conditions.
	const traceableGenerateWorkflow = traceable(
		async (args: {
			prompt: string;
			genFn: (
				prompt: string,
				datasetInputContext?: DatasetInputContext,
				collectors?: GenerationCollectors,
			) => Promise<SimpleWorkflow | GenerationResult>;
			collectors?: GenerationCollectors;
			limiter?: LlmCallLimiter;
			genTimeoutMs?: number;
			datasetInputContext?: DatasetInputContext;
		}): Promise<SimpleWorkflow | GenerationResult> => {
			return await runWithOptionalLimiter(async () => {
				return await withTimeout({
					promise: args.genFn(args.prompt, args.datasetInputContext, args.collectors),
					timeoutMs: args.genTimeoutMs,
					label: 'workflow_generation',
				});
			}, args.limiter);
		},
		{
			name: 'workflow_generation',
			run_type: 'chain',
			client: lsClient,
		},
	);

	// Separate traceable wrapper for evaluation so it appears as a sibling to
	// workflow_generation in LangSmith traces (not nested under it).
	const traceableEvaluateWorkflow = traceable(
		async (args: {
			workflow: SimpleWorkflow;
			evaluators: Array<Evaluator<EvaluationContext>>;
			context: EvaluationContext;
			evalTimeoutMs?: number;
			evalLifecycle?: Partial<EvaluationLifecycle>;
		}): Promise<Feedback[]> => {
			return await evaluateWithPlugins(
				args.workflow,
				args.evaluators,
				args.context,
				args.evalTimeoutMs,
				args.evalLifecycle,
			);
		},
		{
			name: 'workflow_evaluation',
			run_type: 'chain',
			client: lsClient,
		},
	);

	// Create target function that does ALL work (generation + evaluation)
	// NOTE: Do NOT wrap target with traceable() - evaluate() handles tracing automatically.
	let targetCallCount = 0;
	let totalExamples = 0;
	const stats: LangsmithStats = {
		total: 0,
		passed: 0,
		failed: 0,
		errors: 0,
		scoreSum: 0,
		durationSumMs: 0,
	};
	const target = async (inputs: LangsmithDatasetInput): Promise<LangsmithTargetOutput> => {
		targetCallCount++;
		const index = targetCallCount;
		// Extract prompt from inputs (supports both direct prompt and messages array)
		const prompt = extractPrompt(inputs);
		const datasetInputContext = extractDatasetInputContext(inputs);
		const { evals: datasetContext, ...rest } = inputs;

		lifecycle?.onExampleStart?.(index, totalExamples, prompt);
		const startTime = Date.now();
		const genStart = Date.now();
		// Set only once generation has finished, so the error path can report the real
		// generation time instead of including pin-data / evaluation time.
		let genDurationMs: number | undefined;
		const { collectors, getMetrics } = createMetricsCollectors();

		// Filled by exactly one of the two branches below and recorded once afterwards.
		let result: ExampleResult;
		let output: LangsmithTargetOutput;

		try {
			const genResult = await traceableGenerateWorkflow({
				prompt,
				genFn: generateWorkflow,
				collectors,
				limiter: effectiveGlobalContext.llmCallLimiter,
				genTimeoutMs: timeoutMs,
				datasetInputContext,
			});
			genDurationMs = Date.now() - genStart;

			// Extract workflow and optional generated code
			const workflow = isGenerationResult(genResult) ? genResult.workflow : genResult;
			const generatedCode = isGenerationResult(genResult) ? genResult.generatedCode : undefined;
			const agentTextResponse = isGenerationResult(genResult)
				? genResult.agentTextResponse
				: undefined;

			lifecycle?.onWorkflowGenerated?.(workflow, genDurationMs);

			// Generate pin data for service nodes (best-effort)
			let pinData: IPinData | undefined;
			if (pinDataGenerator) {
				try {
					pinData = await pinDataGenerator(workflow);
				} catch {
					// Pin data generation is best-effort — don't fail the evaluation
				}
			}

			const extracted = extractContextFromLangsmithInputs({
				...asRecord(datasetContext),
				...asRecord(rest),
			});
			const context = buildContext({
				prompt,
				globalContext: effectiveGlobalContext,
				testCaseContext: extracted,
				generatedCode,
				agentTextResponse,
				pinData,
				datasetInputContext,
			});

			// Run all evaluators in parallel (wrapped in traceable so it appears
			// as a sibling to workflow_generation in LangSmith traces).
			// Fall back to direct call if traceable wrapper fails.
			// NOTE(review): the fallback re-runs every evaluator (and its lifecycle hooks);
			// if the wrapper failed part-way through evaluation, work may be repeated.
			const evalStart = Date.now();
			let feedback: Feedback[];
			try {
				feedback = await traceableEvaluateWorkflow({
					workflow,
					evaluators,
					context,
					evalTimeoutMs: timeoutMs,
					evalLifecycle: lifecycle,
				});
			} catch {
				feedback = await evaluateWithPlugins(workflow, evaluators, context, timeoutMs, lifecycle);
			}
			const evalDurationMs = Date.now() - evalStart;
			const totalDurationMs = Date.now() - startTime;

			const score = calculateExampleScore(feedback);
			const status = hasErrorFeedback(feedback)
				? 'error'
				: determineStatus({ score, passThreshold });
			const metrics = getMetrics();

			result = {
				index,
				prompt,
				status,
				score,
				feedback,
				durationMs: totalDurationMs,
				generationDurationMs: genDurationMs,
				evaluationDurationMs: evalDurationMs,
				generationInputTokens: metrics.genInputTokens,
				generationOutputTokens: metrics.genOutputTokens,
				subgraphMetrics: buildSubgraphMetrics(metrics),
				introspectionEvents: metrics.introspectionEvents,
				workflow,
				generatedCode,
				agentTextResponse,
			};

			// Metrics feedback (subgraph timing + node count) is only sent to LangSmith.
			output = {
				workflow,
				prompt,
				feedback: [...feedback, ...createMetricsFeedback(metrics)],
			};

			// Last statement of the success path, so nothing after it can push this
			// example through the error branch and count it a second time.
			updateStats(stats, status, score, totalDurationMs);
		} catch (error) {
			const errorMessage = error instanceof Error ? error.message : String(error);
			const workflow: SimpleWorkflow = { name: 'Evaluation Error', nodes: [], connections: {} };
			const feedback = createErrorFeedback(errorMessage);
			const totalDurationMs = Date.now() - startTime;

			result = {
				index,
				prompt,
				status: 'error',
				score: 0,
				feedback,
				durationMs: totalDurationMs,
				// Keep the measured value when generation itself succeeded.
				generationDurationMs: genDurationMs ?? Date.now() - genStart,
				workflow,
				error: errorMessage,
			};
			output = { workflow, prompt, feedback };

			updateStats(stats, 'error', 0, totalDurationMs);
		}

		artifactSaver?.saveExample(result);
		capturedResults.push(result);
		lifecycle?.onExampleComplete?.(index, result);

		return output;
	};

	const feedbackExtractor = createLangsmithFeedbackExtractor();

	if (typeof dataset !== 'string') {
		throw new Error('LangSmith mode requires dataset to be a dataset name string');
	}

	const data = await resolveAndEnrichLangsmithData({ dataset, langsmithOptions, lsClient, logger });

	const effectiveData = applyRepetitions(data, langsmithOptions.repetitions);

	// Unknown (0) when streaming by dataset name.
	totalExamples = Array.isArray(effectiveData) ? effectiveData.length : 0;

	logLangsmithInputsSummary(logger, effectiveData);
	const { experimentName, experimentId, datasetId } = await runLangsmithEvaluateAndFlush({
		target,
		effectiveData,
		feedbackExtractor,
		langsmithOptions,
		lsClient,
		logger,
		targetCallCount: () => targetCallCount,
	});

	// Compute evaluator averages from captured results
	const evaluatorAverages = computeEvaluatorAverages(capturedResults, logger);

	const summary: RunSummary = buildLangsmithSummary({
		stats,
		langsmithData: { experimentName, experimentId, datasetId },
		evaluatorAverages,
	});

	if (artifactSaver) {
		artifactSaver.saveSummary(summary, capturedResults);
	}

	// Write CSV if requested
	if (outputCsv) {
		const { writeResultsCsv } = await import('./csv-writer.js');
		// Map suite to CSV format
		const csvSuite =
			suite === 'llm-judge' || suite === 'pairwise' || suite === 'binary-checks'
				? suite
				: undefined;
		writeResultsCsv(capturedResults, outputCsv, { suite: csvSuite });
		logger.info(`Results written to: ${outputCsv}`);
	}

	await lifecycle?.onEnd?.(summary);

	return summary;
}
|
|
|
|
/**
 * Main entry point for running evaluations.
 *
 * Dispatches on `config.mode`: 'langsmith' runs against a LangSmith dataset;
 * every other mode runs locally.
 */
export async function runEvaluation(config: RunConfig): Promise<RunSummary> {
	switch (config.mode) {
		case 'langsmith':
			return await runLangsmith(config);
		default:
			return await runLocal(config);
	}
}
|