n8n/packages/@n8n/instance-ai/evaluations/harness/runner.ts
José Braulio González Valido 30d9a168bc
feat(ai-builder): Add --prebuilt-workflows flag for eval CLI (no-changelog) (#29830)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 11:47:29 +00:00

600 lines
18 KiB
TypeScript

// ---------------------------------------------------------------------------
// Workflow test case evaluation orchestrator
//
// Manages the full lifecycle of a workflow test case evaluation:
// authentication, SSE capture, workflow build, scenario execution with
// LLM-mocked HTTP, checklist verification, and result aggregation.
// ---------------------------------------------------------------------------
import type { InstanceAiEvalExecutionResult } from '@n8n/api-types';
import crypto from 'node:crypto';
import { setTimeout as delay } from 'node:timers/promises';
import { SSE_SETTLE_DELAY_MS, startSseConnection, waitForAllActivity } from './chat-loop';
import { type EvalLogger } from './logger';
import { fetchPrebuiltBuild } from './prebuilt-workflows';
import { verifyChecklist } from '../checklist/verifier';
import type { N8nClient, WorkflowResponse } from '../clients/n8n-client';
import { extractOutcomeFromEvents } from '../outcome/event-parser';
import { buildAgentOutcome, extractWorkflowIdsFromMessages } from '../outcome/workflow-discovery';
import type {
ChecklistItem,
CapturedEvent,
ScenarioResult,
TestScenario,
WorkflowTestCase,
WorkflowTestCaseResult,
} from '../types';
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/**
 * Fallback timeout (15 minutes) used when no timeout is supplied
 * (or, for runWorkflowTestCase, when a non-positive one is supplied).
 */
const DEFAULT_TIMEOUT_MS = 15 * 60 * 1_000;
/** Max concurrent scenario executions per test case */
const MAX_CONCURRENT_SCENARIOS = 99;
// ---------------------------------------------------------------------------
// Workflow test case runner — build once, run scenarios against it
// ---------------------------------------------------------------------------
/** Inputs for runWorkflowTestCase(). */
interface WorkflowTestCaseConfig {
	/** n8n API client used for the build, scenario execution and cleanup. */
	client: N8nClient;
	/** Test case to evaluate: supplies the build prompt and the scenarios to run. */
	testCase: WorkflowTestCase;
	/** Timeout in ms for the build and each scenario; non-positive values fall back to the default. */
	timeoutMs: number;
	/** When true, created workflows/data tables are left in place after the run. */
	keepWorkflows: boolean;
	/** Destination for progress and error log lines. */
	logger: EvalLogger;
	/** Forwarded to the build step (presumably workflow IDs that existed before the run — confirm). */
	preRunWorkflowIds: Set<string>;
	/** Forwarded to the build step (presumably IDs already attributed to other builds — confirm). */
	claimedWorkflowIds: Set<string>;
	/** Not read by runWorkflowTestCase itself. */
	seededCredentialTypes: string[];
	/** Optional " [lane N/M]" suffix appended to per-build log lines. */
	laneTag?: string;
	/**
	 * When set, skip the orchestrator build and verify this existing workflow
	 * instead. The harness leaves it in place — caller owns its lifecycle.
	 */
	prebuiltWorkflowId?: string;
}
/**
 * All-in-one test case runner: build workflow + run all scenarios + cleanup.
 * Used by the CLI. The split API (buildWorkflow + executeScenario + cleanupBuild)
 * is available for custom orchestration (e.g. LangSmith evaluate).
 *
 * Scenario errors never reject this function: each one is converted into a
 * failed ScenarioResult. Unless `keepWorkflows` is set, cleanupBuild() runs on
 * every exit path — including a failed build, which may still have created
 * data tables that would otherwise leak.
 *
 * @param config Client, test case, timeouts and bookkeeping sets for the run.
 * @returns Build status plus one ScenarioResult per scenario (empty when the build failed).
 */
export async function runWorkflowTestCase(
	config: WorkflowTestCaseConfig,
): Promise<WorkflowTestCaseResult> {
	const { client, testCase, logger } = config;
	// Non-positive timeouts mean "use the default".
	const timeoutMs = config.timeoutMs > 0 ? config.timeoutMs : DEFAULT_TIMEOUT_MS;
	const result: WorkflowTestCaseResult = {
		testCase,
		workflowBuildSuccess: false,
		scenarioResults: [],
	};
	const build = config.prebuiltWorkflowId
		? await fetchPrebuiltBuild(client, config.prebuiltWorkflowId, logger)
		: await buildWorkflow({
				client,
				prompt: testCase.prompt,
				timeoutMs,
				preRunWorkflowIds: config.preRunWorkflowIds,
				claimedWorkflowIds: config.claimedWorkflowIds,
				logger,
				laneTag: config.laneTag,
			});
	try {
		// Capture in a const so the narrowing survives into the scenario closure.
		const workflowId = build.workflowId;
		if (!build.success || !workflowId) {
			result.buildError = build.error;
			return result;
		}
		result.workflowBuildSuccess = true;
		result.workflowId = workflowId;
		result.workflowJson = build.workflowJsons[0];
		const scenarioStart = Date.now();
		result.scenarioResults = await runWithConcurrency(
			testCase.scenarios,
			async (scenario) => {
				try {
					return await executeScenario(
						client,
						workflowId,
						scenario,
						build.workflowJsons,
						logger,
						timeoutMs,
					);
				} catch (error: unknown) {
					// Isolate failures so one broken scenario does not abort the others.
					const errorMessage = error instanceof Error ? error.message : String(error);
					logger.error(` ERROR [${scenario.name}]: ${errorMessage}`);
					return {
						scenario,
						success: false,
						score: 0,
						reasoning: `Error: ${errorMessage}`,
					} satisfies ScenarioResult;
				}
			},
			MAX_CONCURRENT_SCENARIOS,
		);
		const scenarioMs = Date.now() - scenarioStart;
		logger.info(
			` Scenarios done: ${String(result.scenarioResults.length)} scenarios [${String(Math.round(scenarioMs / 1000))}s]${config.laneTag ?? ''}`,
		);
		return result;
	} finally {
		// Runs for failed builds too: buildWorkflow reports data tables it created
		// even when no workflow was produced.
		if (!config.keepWorkflows) {
			await cleanupBuild(client, build, logger);
		}
	}
}
// ---------------------------------------------------------------------------
// Split API: build once, run scenarios independently
// ---------------------------------------------------------------------------
/** Outcome of a build, consumed by executeScenario() and cleanupBuild(). */
export interface BuildResult {
	/** Whether the build produced a workflow to run scenarios against. */
	success: boolean;
	/** Human-readable failure reason, set when `success` is false. */
	error?: string;
	/** ID of the primary workflow under test. */
	workflowId?: string;
	/** JSON of the built workflow(s); empty when the build failed. */
	workflowJsons: WorkflowResponse[];
	/** IDs to pass to cleanupBuild() — workflows to delete. */
	createdWorkflowIds: string[];
	/** IDs to pass to cleanupBuild() — data tables to delete (may be set even on failure). */
	createdDataTableIds: string[];
}
/** Inputs for buildWorkflow(). */
export interface BuildWorkflowConfig {
	/** n8n API client used to talk to Instance AI. */
	client: N8nClient;
	/** Destination for build progress log lines. */
	logger: EvalLogger;
	/** Prompt sent to the agent on a fresh eval thread. */
	prompt: string;
	/** Build timeout in ms; DEFAULT_TIMEOUT_MS when omitted. */
	timeoutMs?: number;
	/** Forwarded to buildAgentOutcome() (presumably IDs that existed before the run — confirm). */
	preRunWorkflowIds: Set<string>;
	/** Forwarded to buildAgentOutcome() (presumably IDs already claimed by other builds — confirm). */
	claimedWorkflowIds: Set<string>;
	/** Optional " [lane N/M]" suffix appended to the build log line. */
	laneTag?: string;
}
/**
 * Build a workflow via Instance AI. Returns the workflow ID for use with
 * executeScenario(). Call cleanupBuild() when done.
 *
 * Sends `prompt` on a fresh `eval-<uuid>` thread while capturing that
 * thread's SSE events, waits for all agent activity (bounded by `timeoutMs`,
 * default DEFAULT_TIMEOUT_MS), then resolves which workflows the agent created.
 * Errors are caught and reported as `{ success: false, error }`. When no
 * workflow was produced, any data tables the agent created are still returned
 * so the caller can clean them up.
 */
export async function buildWorkflow(config: BuildWorkflowConfig): Promise<BuildResult> {
	const { client, prompt, logger } = config;
	const threadId = `eval-${crypto.randomUUID()}`;
	const startTime = Date.now();
	const timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
	const abortController = new AbortController();
	const events: CapturedEvent[] = [];
	const approvedRequests = new Set<string>();
	try {
		logger.info(` Building workflow: "${truncate(prompt, 60)}"${config.laneTag ?? ''}`);
		// Start capturing events before the prompt is sent; the settle delay
		// presumably gives the stream time to attach. SSE errors are swallowed so
		// the build proceeds with whatever events were captured.
		const ssePromise = startSseConnection(client, threadId, events, abortController.signal).catch(
			() => {},
		);
		await delay(SSE_SETTLE_DELAY_MS);
		await client.sendMessage(threadId, prompt);
		await waitForAllActivity({
			client,
			threadId,
			events,
			approvedRequests,
			startTime,
			timeoutMs,
			logger,
		});
		abortController.abort();
		await ssePromise;
		// Thread messages only narrow the result; a failed fetch means "no filter".
		let threadMessages;
		try {
			threadMessages = await client.getThreadMessages(threadId);
		} catch {
			threadMessages = { messages: [] };
		}
		const messageWorkflowIds = extractWorkflowIdsFromMessages(threadMessages.messages);
		const outcome = await buildAgentOutcome(
			client,
			extractOutcomeFromEvents(events),
			config.preRunWorkflowIds,
			config.claimedWorkflowIds,
		);
		// When the agent's messages reference workflows, keep only those.
		if (messageWorkflowIds.length > 0) {
			const messageWfSet = new Set(messageWorkflowIds);
			outcome.workflowsCreated = outcome.workflowsCreated.filter((wf) => messageWfSet.has(wf.id));
			outcome.workflowJsons = outcome.workflowJsons.filter(
				(wf) => typeof wf.id === 'string' && messageWfSet.has(wf.id),
			);
		}
		if (outcome.workflowsCreated.length === 0) {
			return {
				success: false,
				error: describeBuildFailure(events),
				workflowJsons: [],
				createdWorkflowIds: [],
				createdDataTableIds: outcome.dataTablesCreated,
			};
		}
		const primary = outcome.workflowsCreated[0];
		const buildMs = Date.now() - startTime;
		logger.info(
			` Workflow built: ${primary.name} (${String(primary.nodeCount)} nodes) [${String(Math.round(buildMs / 1000))}s]`,
		);
		return {
			success: true,
			workflowId: primary.id,
			workflowJsons: outcome.workflowJsons,
			createdWorkflowIds: outcome.workflowsCreated.map((wf) => wf.id),
			createdDataTableIds: outcome.dataTablesCreated,
		};
	} catch (error: unknown) {
		return {
			success: false,
			error: error instanceof Error ? error.message : String(error),
			workflowJsons: [],
			createdWorkflowIds: [],
			createdDataTableIds: [],
		};
	} finally {
		// Close the SSE stream on every exit path; abort() is a no-op if already aborted.
		abortController.abort();
	}
}

/**
 * Summarize why a build produced no workflow: tool errors first, then the
 * agent's streamed text (first 500 chars), else a generic message.
 */
function describeBuildFailure(events: CapturedEvent[]): string {
	const toolErrors = events.filter((e) => e.type === 'tool-error').map((e) => toolErrorMessage(e));
	if (toolErrors.length > 0) {
		return `Tool errors: ${toolErrors.join('; ')}`;
	}
	const agentText = events
		.filter((e) => e.type === 'text-delta')
		.map((e) => textDeltaOf(e))
		.join('');
	if (agentText.length > 0) {
		return `Agent response: ${agentText.slice(0, 500)}`;
	}
	return 'No workflow produced — no error details captured';
}

/** Extract a readable message from a `tool-error` event (payload object, else event data). */
function toolErrorMessage(event: CapturedEvent): string {
	const { data } = event;
	const payload =
		typeof data.payload === 'object' && data.payload !== null
			? (data.payload as Record<string, unknown>)
			: data;
	const toolError = payload.error ?? payload.message;
	if (typeof toolError === 'string') return toolError;
	// An Error-like object carries its text in `message`; surface it instead of the fallback.
	if (typeof toolError === 'object' && toolError !== null) {
		const nested = (toolError as Record<string, unknown>).message;
		if (typeof nested === 'string') return nested;
	}
	return 'unknown tool error';
}

/** Extract the text chunk from a `text-delta` event (`data.text`, else `data.payload.text`). */
function textDeltaOf(event: CapturedEvent): string {
	const { data } = event;
	if (typeof data.text === 'string') return data.text;
	if (
		typeof data.payload === 'object' &&
		data.payload !== null &&
		'text' in (data.payload as Record<string, unknown>)
	) {
		return String((data.payload as Record<string, unknown>).text);
	}
	return '';
}
/**
 * Execute a single scenario against a pre-built workflow and verify the result.
 *
 * Public entry point of the split API; delegates to the internal scenario
 * runner (which takes the scenario before the workflow ID).
 *
 * @param client n8n API client.
 * @param workflowId ID of the already-built workflow to execute.
 * @param scenario Scenario supplying data setup and success criteria.
 * @param workflowJsons Workflow JSON(s) from the build, used for verification context.
 * @param logger Destination for per-scenario log lines.
 * @param timeoutMs Optional execution timeout forwarded to the client.
 */
export async function executeScenario(
	client: N8nClient,
	workflowId: string,
	scenario: TestScenario,
	workflowJsons: WorkflowResponse[],
	logger: EvalLogger,
	timeoutMs?: number,
): Promise<ScenarioResult> {
	const scenarioResult = await runScenario(
		client,
		scenario,
		workflowId,
		workflowJsons,
		logger,
		timeoutMs,
	);
	return scenarioResult;
}
/**
 * Clean up workflows and data tables created during a build.
 *
 * Best-effort: every deletion is attempted independently and API failures are
 * reported through `logger.verbose` instead of being thrown. The "Cleaned up"
 * line counts only data tables that were actually deleted.
 *
 * @param client n8n API client used for the deletions.
 * @param build Build whose `createdWorkflowIds` / `createdDataTableIds` are removed.
 * @param logger Receives verbose cleanup diagnostics.
 */
export async function cleanupBuild(
	client: N8nClient,
	build: BuildResult,
	logger: EvalLogger,
): Promise<void> {
	const describe = (error: unknown): string =>
		error instanceof Error ? error.message : String(error);
	for (const id of build.createdWorkflowIds) {
		try {
			await client.deleteWorkflow(id);
		} catch (error: unknown) {
			// Best-effort cleanup — note the failure and keep going.
			logger.verbose(` Failed to delete workflow ${id}: ${describe(error)}`);
		}
	}
	if (build.createdDataTableIds.length === 0) return;
	try {
		const projectId = await client.getPersonalProjectId();
		let deletedCount = 0;
		for (const dtId of build.createdDataTableIds) {
			try {
				await client.deleteDataTable(projectId, dtId);
				deletedCount++;
			} catch (error: unknown) {
				// Best-effort cleanup — note the failure and keep going.
				logger.verbose(` Failed to delete data table ${dtId}: ${describe(error)}`);
			}
		}
		// Count only successful deletions so the log does not overstate cleanup.
		logger.verbose(` Cleaned up ${String(deletedCount)} data table(s)`);
	} catch (error: unknown) {
		// Non-fatal — project ID lookup may fail
		logger.verbose(` Skipped data table cleanup: ${describe(error)}`);
	}
}
// ---------------------------------------------------------------------------
// Scenario execution (internal)
// ---------------------------------------------------------------------------
/**
 * Run one scenario end to end: execute the workflow with LLM-mocked HTTP,
 * then ask the LLM checklist verifier whether the scenario's success criteria
 * were met, using an artifact built from the execution result.
 *
 * Logs execution/verification timings, plus the verifier's reasoning on failure.
 * A missing verifier result counts as a failure with category
 * 'verification_failure'.
 */
async function runScenario(
	client: N8nClient,
	scenario: TestScenario,
	workflowId: string,
	workflowJsons: WorkflowResponse[],
	logger: EvalLogger,
	timeoutMs?: number,
): Promise<ScenarioResult> {
	const toSeconds = (ms: number): string => String(Math.round(ms / 1000));
	const label = `[${scenario.name}]`;

	// 1. Execute the workflow against the scenario's data setup.
	const executionStartedAt = Date.now();
	const evalResult = await client.executeWithLlmMock(workflowId, scenario.dataSetup, timeoutMs);
	const executionMs = Date.now() - executionStartedAt;
	const executedNodeCount = Object.keys(evalResult.nodeResults).length;
	logger.info(` ${label} exec=${toSeconds(executionMs)}s (${String(executedNodeCount)} nodes)`);

	// 2. Verify the outcome with a single LLM-judged checklist item.
	const verificationStartedAt = Date.now();
	const checklist: ChecklistItem[] = [
		{
			id: 1,
			description: scenario.successCriteria,
			category: 'execution',
			strategy: 'llm',
		},
	];
	const verdicts = await verifyChecklist(
		checklist,
		buildVerificationArtifact(scenario, evalResult, workflowJsons),
		workflowJsons,
	);
	const verificationMs = Date.now() - verificationStartedAt;

	// 3. Interpret the verdict.
	const verdict = verdicts[0];
	const passed = verdicts.length > 0 && verdicts[0].pass;
	const reasoning = verdict?.reasoning ?? 'No verification result — LLM verifier returned empty';
	const failureCategory = verdict ? (verdict.failureCategory ?? undefined) : 'verification_failure';
	const rootCause = verdict?.rootCause;
	const categorySuffix = failureCategory ? ` [${failureCategory}]` : '';
	logger.info(
		` ${label} ${passed ? 'PASS' : 'FAIL'}${categorySuffix} verify=${toSeconds(verificationMs)}s`,
	);
	if (!passed) logger.info(` ${label} ${reasoning}`);

	return {
		scenario,
		success: passed,
		evalResult,
		score: passed ? 1 : 0,
		reasoning,
		failureCategory,
		rootCause,
	};
}
// ---------------------------------------------------------------------------
// Verification artifact builder
// ---------------------------------------------------------------------------
/**
 * Build a rich verification artifact from the execution result.
 * Includes execution trace with mock responses, config issues,
 * and pre-analysis flags so the verifier can diagnose root causes.
 *
 * Sections (Markdown): Scenario, Pre-analysis (only when flags exist),
 * Execution summary, Errors (only when present), Workflow structure (only when
 * a workflow JSON is available), and a per-node Execution trace ordered by
 * node start time.
 */
function buildVerificationArtifact(
	scenario: TestScenario,
	evalResult: InstanceAiEvalExecutionResult,
	workflowJsons: WorkflowResponse[],
): string {
	const sections: string[] = [];
	// --- Scenario context ---
	// Separate name and description; previously they were concatenated with no
	// separator (e.g. "happy pathValid order...").
	const title = scenario.description
		? `${scenario.name} — ${scenario.description}`
		: scenario.name;
	sections.push('## Scenario', '', `**Name:** ${title}`, `**Data setup:** ${scenario.dataSetup}`, '');
	// --- Pre-analysis: flag known issues programmatically ---
	const preAnalysis: string[] = [];
	// Flag Phase 1 failures — these cause empty trigger data and cascade failures
	for (const warning of evalResult.hints.warnings) {
		preAnalysis.push(`⚠ FRAMEWORK ISSUE: ${warning}`);
	}
	if (Object.keys(evalResult.hints.triggerContent).length === 0) {
		preAnalysis.push(
			'⚠ FRAMEWORK ISSUE: Trigger content is empty — the start node received no input data. All downstream failures are likely caused by this, not by the workflow builder.',
		);
	}
	for (const [nodeName, nr] of Object.entries(evalResult.nodeResults)) {
		const configIssues = formatConfigIssues(nr.configIssues);
		if (configIssues !== undefined) {
			preAnalysis.push(`⚠ BUILDER ISSUE: "${nodeName}" has missing config: ${configIssues}`);
		}
		for (const req of nr.interceptedRequests) {
			// The mock layer marks failed LLM mock generation with `_evalMockError`.
			if (
				typeof req.mockResponse === 'object' &&
				req.mockResponse !== null &&
				'_evalMockError' in (req.mockResponse as Record<string, unknown>)
			) {
				const msg = (req.mockResponse as Record<string, unknown>).message;
				const msgStr = typeof msg === 'string' ? msg : 'unknown';
				preAnalysis.push(
					`⚠ MOCK ISSUE: "${nodeName}" ${req.method} ${req.url} → mock generation failed: ${msgStr}`,
				);
			}
		}
	}
	if (preAnalysis.length > 0) {
		sections.push('## Pre-analysis (automated flags)', '', ...preAnalysis, '');
	}
	// --- Execution summary ---
	const mockedNodes: string[] = [];
	const pinnedNodes: string[] = [];
	const realNodes: string[] = [];
	for (const [nodeName, nr] of Object.entries(evalResult.nodeResults)) {
		if (nr.executionMode === 'mocked') mockedNodes.push(nodeName);
		else if (nr.executionMode === 'pinned') pinnedNodes.push(nodeName);
		else realNodes.push(nodeName);
	}
	sections.push(
		'## Execution summary',
		'',
		`**Status:** ${evalResult.success ? 'success' : 'failed'}`,
		`**Mocked nodes** (HTTP intercepted, responses generated by LLM): ${mockedNodes.join(', ') || 'none'}`,
		`**Pinned nodes** (trigger data provided, not executed): ${pinnedNodes.join(', ') || 'none'}`,
		`**Real nodes** (executed with actual logic on mock/pinned data): ${realNodes.join(', ') || 'none'}`,
		'',
	);
	if (evalResult.errors.length > 0) {
		sections.push('## Errors', '', ...evalResult.errors.map((e) => `- ${e}`), '');
	}
	// --- Build a node config lookup from workflow JSON (only the first workflow) ---
	const nodeConfigs = new Map<string, Record<string, unknown>>();
	const wf = workflowJsons[0];
	if (wf) {
		for (const node of wf.nodes) {
			if (node.name && node.parameters) {
				nodeConfigs.set(node.name, { type: node.type, parameters: node.parameters });
			}
		}
	}
	// --- Workflow structure: ALL nodes and connections ---
	const executedNodeNames = new Set(Object.keys(evalResult.nodeResults));
	if (wf) {
		sections.push('## Workflow structure (all nodes)', '');
		for (const node of wf.nodes) {
			const ran = node.name ? executedNodeNames.has(node.name) : false;
			const status = ran ? 'EXECUTED' : 'DID NOT RUN';
			sections.push(`- **${node.name ?? '(unnamed)'}** (${node.type}) — ${status}`);
		}
		sections.push('', '**Connections:**', ...jsonBlock(wf.connections), '');
	}
	// --- Execution trace: per-node detail (sorted by execution order) ---
	sections.push('## Execution trace', '');
	const sortedNodeResults = Object.entries(evalResult.nodeResults).sort(
		([, a], [, b]) => (a.startTime ?? 0) - (b.startTime ?? 0),
	);
	for (const [nodeName, nr] of sortedNodeResults) {
		sections.push(`### ${nodeName} [${nr.executionMode}]`);
		// Node configuration (from workflow JSON)
		const nodeConfig = nodeConfigs.get(nodeName);
		if (nodeConfig) {
			sections.push('**Node config:**', ...jsonBlock(nodeConfig));
		}
		const configIssues = formatConfigIssues(nr.configIssues);
		if (configIssues !== undefined) {
			sections.push(`**Config issues:** ${configIssues}`);
		}
		// Intercepted requests + mock responses (for mocked nodes)
		for (const req of nr.interceptedRequests) {
			sections.push(`**Request:** ${req.method} ${req.url}`);
			if (req.requestBody) {
				sections.push(...jsonBlock(req.requestBody));
			}
			if (req.mockResponse) {
				sections.push('**Mock response:**', ...jsonBlock(req.mockResponse));
			}
		}
		// Node output
		if (nr.output !== null && nr.output !== undefined) {
			sections.push('**Output:**', ...jsonBlock(nr.output));
		} else {
			sections.push('**Output:** none');
		}
		sections.push('');
	}
	return sections.join('\n');
}

/**
 * Flatten a node's config issues into one "; "-separated string, or return
 * undefined when there are none (missing object or no keys).
 */
function formatConfigIssues(issues: object | null | undefined): string | undefined {
	if (!issues || Object.keys(issues).length === 0) return undefined;
	return Object.values(issues).flat().join('; ');
}

/** Render a value as a fenced, pretty-printed JSON code block (three lines). */
function jsonBlock(value: unknown): string[] {
	return ['```json', JSON.stringify(value, null, 2), '```'];
}
// ---------------------------------------------------------------------------
// Concurrency control
// ---------------------------------------------------------------------------
/**
 * Run tasks with bounded concurrency. Like Promise.all but limits how many
 * tasks execute simultaneously to avoid API rate limits.
 *
 * Results are returned in input order regardless of completion order. A
 * `limit` below 1 (or NaN) is treated as 1, so every item is still processed;
 * previously such limits spawned no workers and returned an array of holes.
 * If a task rejects, the returned promise rejects with that error; the other
 * workers are not cancelled and keep running.
 *
 * @param items Inputs to process.
 * @param fn Async task applied to each item.
 * @param limit Maximum number of tasks in flight at once.
 * @returns One result per item, index-aligned with `items`.
 */
export async function runWithConcurrency<T, R>(
	items: T[],
	fn: (item: T) => Promise<R>,
	limit: number,
): Promise<R[]> {
	const results = new Array<R>(items.length);
	const workerCount = Math.min(items.length, Math.max(1, Math.floor(limit) || 1));
	let nextIndex = 0;
	// Each worker claims the next index synchronously before awaiting, so no
	// two workers ever process the same item.
	const worker = async (): Promise<void> => {
		while (nextIndex < items.length) {
			const index = nextIndex++;
			results[index] = await fn(items[index]);
		}
	};
	await Promise.all(Array.from({ length: workerCount }, () => worker()));
	return results;
}
// ---------------------------------------------------------------------------
// Utility helpers
// ---------------------------------------------------------------------------
/**
 * Shorten `text` to at most `maxLength` UTF-16 code units, appending "..."
 * when anything was cut. Backs off one unit rather than splitting a surrogate
 * pair, so emoji and other astral characters never become lone surrogates.
 */
function truncate(text: string, maxLength: number): string {
	if (text.length <= maxLength) return text;
	let end = maxLength;
	const lastCode = text.charCodeAt(end - 1);
	// 0xD800–0xDBFF is a high surrogate: cutting here would orphan it.
	if (end > 0 && lastCode >= 0xd800 && lastCode <= 0xdbff) end -= 1;
	return text.slice(0, end) + '...';
}