ci(ai-builder): Parallelize Instance AI eval CI across multiple n8n containers (#29545)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
José Braulio González Valido 2026-04-30 08:22:28 +01:00 committed by GitHub
parent 0dbe6c533e
commit 4fd68bfc99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 702 additions and 174 deletions

View File

@ -266,10 +266,15 @@ jobs:
ref: ${{ needs.install-and-build.outputs.commit_sha }}
secrets: inherit
# Depends on prepare-docker so the eval workflow can load the SHA-keyed image cache.
# prepare-docker may be skipped (its filter excludes .github/**); the eval falls back to a local build.
instance-ai-workflow-evals:
name: Instance AI Workflow Evals
needs: install-and-build
needs: [install-and-build, prepare-docker]
if: >-
!cancelled() &&
needs.install-and-build.result == 'success' &&
(needs.prepare-docker.result == 'success' || needs.prepare-docker.result == 'skipped') &&
needs.install-and-build.outputs.instance_ai_workflow_eval == 'true' &&
github.repository == 'n8n-io/n8n' &&
(github.event_name != 'pull_request' || !github.event.pull_request.head.repo.fork)

View File

@ -29,6 +29,12 @@ jobs:
name: 'Run Evals'
runs-on: blacksmith-4vcpu-ubuntu-2204
timeout-minutes: 45
env:
# Each port hosts an independent n8n container. The eval CLI's
# work-stealing allocator dispatches builds across them, capped per-lane.
# 9 lanes on 4vcpu — builds are LLM-bound so CPU headroom is sufficient;
# bump back to 8vcpu if contention shows up.
LANE_PORTS: '5678,5679,5680,5681,5682,5683,5684,5685,5686'
permissions:
contents: read
pull-requests: write
@ -45,56 +51,80 @@ jobs:
with:
build-command: 'pnpm build'
- name: Build Docker image
# Cache populated by prepare-docker; fallback covers PRs that only touch this workflow file.
- name: Load n8n Docker image
id: load-image
continue-on-error: true
uses: ./.github/actions/load-n8n-docker
- name: Build Docker image (fallback on cache miss)
if: steps.load-image.outcome == 'failure'
run: pnpm build:docker
env:
INCLUDE_TEST_CONTROLLER: 'true'
- name: Start n8n container
- name: Start n8n containers
env:
EVALS_ANTHROPIC_KEY: ${{ secrets.EVALS_ANTHROPIC_KEY }}
N8N_LICENSE_ACTIVATION_KEY: ${{ secrets.N8N_LICENSE_ACTIVATION_KEY }}
N8N_LICENSE_CERT: ${{ secrets.N8N_LICENSE_CERT }}
N8N_ENCRYPTION_KEY: ${{ secrets.N8N_ENCRYPTION_KEY }}
run: |
docker run -d --name n8n-eval \
-e E2E_TESTS=true \
-e N8N_ENABLED_MODULES=instance-ai \
-e N8N_AI_ENABLED=true \
-e N8N_INSTANCE_AI_MODEL_API_KEY=${{ secrets.EVALS_ANTHROPIC_KEY }} \
-e N8N_LICENSE_ACTIVATION_KEY=${{ secrets.N8N_LICENSE_ACTIVATION_KEY }} \
-e N8N_LICENSE_CERT=${{ secrets.N8N_LICENSE_CERT }} \
-e N8N_ENCRYPTION_KEY=${{ secrets.N8N_ENCRYPTION_KEY }} \
-p 5678:5678 \
n8nio/n8n:local
echo "Waiting for n8n to be ready..."
for i in $(seq 1 60); do
if curl -s http://localhost:5678/healthz/readiness -o /dev/null -w "%{http_code}" | grep -q 200; then
echo "n8n ready after ${i}s"
exit 0
fi
sleep 1
IFS=',' read -ra PORTS <<< "$LANE_PORTS"
for i in "${!PORTS[@]}"; do
port="${PORTS[$i]}"
docker run -d --name "n8n-eval-$((i+1))" \
-e E2E_TESTS=true \
-e N8N_ENABLED_MODULES=instance-ai \
-e N8N_AI_ENABLED=true \
-e N8N_INSTANCE_AI_MODEL_API_KEY="$EVALS_ANTHROPIC_KEY" \
-e N8N_AI_ASSISTANT_BASE_URL="" \
-e N8N_LICENSE_ACTIVATION_KEY="$N8N_LICENSE_ACTIVATION_KEY" \
-e N8N_LICENSE_CERT="$N8N_LICENSE_CERT" \
-e N8N_ENCRYPTION_KEY="$N8N_ENCRYPTION_KEY" \
-p "$port:5678" \
n8nio/n8n:local
done
# 120s budget per port: containers booting in parallel on a shared
# 4vcpu runner contend for CPU/disk during n8n's startup (DB migrations,
# license init), so each takes longer than a solo boot.
for port in "${PORTS[@]}"; do
ready=false
for i in $(seq 1 120); do
if curl -s "http://localhost:$port/healthz/readiness" -o /dev/null -w "%{http_code}" | grep -q 200; then
echo "n8n on port $port ready after ${i}s"
ready=true
break
fi
sleep 1
done
if [ "$ready" != "true" ]; then
echo "::error::n8n on port $port failed to start within 120s"
for n in $(docker ps -aq --filter "name=n8n-eval-"); do
echo "Logs for $n:"
docker logs "$n" --tail 30 || true
done
exit 1
fi
done
echo "::error::n8n failed to start within 60s"
docker logs n8n-eval --tail 30
exit 1
- name: Create test user
- name: Create test users
run: |
curl -sf -X POST http://localhost:5678/rest/e2e/reset \
-H "Content-Type: application/json" \
-d '{
"owner":{"email":"nathan@n8n.io","password":"PlaywrightTest123","firstName":"Eval","lastName":"Owner"},
"admin":{"email":"admin@n8n.io","password":"PlaywrightTest123","firstName":"Admin","lastName":"User"},
"members":[],
"chat":{"email":"chat@n8n.io","password":"PlaywrightTest123","firstName":"Chat","lastName":"User"}
}'
IFS=',' read -ra PORTS <<< "$LANE_PORTS"
for port in "${PORTS[@]}"; do
curl -sf -X POST "http://localhost:$port/rest/e2e/reset" \
-H "Content-Type: application/json" \
-d '{
"owner":{"email":"nathan@n8n.io","password":"PlaywrightTest123","firstName":"Eval","lastName":"Owner"},
"admin":{"email":"admin@n8n.io","password":"PlaywrightTest123","firstName":"Admin","lastName":"User"},
"members":[],
"chat":{"email":"chat@n8n.io","password":"PlaywrightTest123","firstName":"Chat","lastName":"User"}
}'
done
- name: Run Instance AI Evals
continue-on-error: true
working-directory: packages/@n8n/instance-ai
run: >-
pnpm eval:instance-ai
--base-url http://localhost:5678
--concurrency 4
--verbose
--iterations 3
${{ inputs.filter && format('--filter "{0}"', inputs.filter) || '' }}
env:
N8N_INSTANCE_AI_MODEL_API_KEY: ${{ secrets.EVALS_ANTHROPIC_KEY }}
LANGSMITH_TRACING: 'true'
@ -102,10 +132,28 @@ jobs:
LANGSMITH_API_KEY: ${{ secrets.EVALS_LANGSMITH_API_KEY }}
LANGSMITH_REVISION_ID: ${{ github.sha }}
LANGSMITH_BRANCH: ${{ github.head_ref || github.ref_name }}
run: |
IFS=',' read -ra PORTS <<< "$LANE_PORTS"
URLS=()
for port in "${PORTS[@]}"; do
URLS+=("http://localhost:$port")
done
BASE_URLS=$(IFS=,; printf '%s' "${URLS[*]}")
pnpm eval:instance-ai \
--base-url "$BASE_URLS" \
--concurrency 32 \
--verbose \
--iterations 3 \
${{ inputs.filter && format('--filter "{0}"', inputs.filter) || '' }}
- name: Stop n8n container
- name: Stop n8n containers
if: ${{ always() }}
run: docker stop n8n-eval && docker rm n8n-eval || true
run: |
mapfile -t ids < <(docker ps -aq --filter "name=n8n-eval-")
if [ "${#ids[@]}" -gt 0 ]; then
docker stop "${ids[@]}" 2>/dev/null || true
docker rm "${ids[@]}" 2>/dev/null || true
fi
- name: Post eval results to PR
if: ${{ always() && github.event_name == 'pull_request' }}

View File

@ -0,0 +1,39 @@
import { parseCliArgs } from '../cli/args';
describe('parseCliArgs --base-url', () => {
it('defaults to a single localhost URL when --base-url is not provided', () => {
const args = parseCliArgs([]);
expect(args.baseUrls).toEqual(['http://localhost:5678']);
});
it('accepts a single URL', () => {
const args = parseCliArgs(['--base-url', 'http://localhost:5678']);
expect(args.baseUrls).toEqual(['http://localhost:5678']);
});
it('splits comma-separated URLs into a list of lanes', () => {
const args = parseCliArgs([
'--base-url',
'http://localhost:5678,http://localhost:5679,http://localhost:5680',
]);
expect(args.baseUrls).toEqual([
'http://localhost:5678',
'http://localhost:5679',
'http://localhost:5680',
]);
});
it('trims surrounding whitespace from each URL', () => {
const args = parseCliArgs(['--base-url', ' http://localhost:5678 , http://localhost:5679 ']);
expect(args.baseUrls).toEqual(['http://localhost:5678', 'http://localhost:5679']);
});
it('drops empty entries from a stray comma', () => {
const args = parseCliArgs(['--base-url', 'http://localhost:5678,,http://localhost:5679']);
expect(args.baseUrls).toEqual(['http://localhost:5678', 'http://localhost:5679']);
});
it('rejects a non-URL entry', () => {
expect(() => parseCliArgs(['--base-url', 'http://localhost:5678,not-a-url'])).toThrow();
});
});

View File

@ -0,0 +1,94 @@
import { LaneAllocator, type AllocatableLane } from '../cli/lane-allocator';
interface TestLane extends AllocatableLane {
id: number;
}
function newLanes(count: number): TestLane[] {
return Array.from({ length: count }, (_, i) => ({
id: i,
activeBuilds: 0,
inflightPrompts: new Set<string>(),
}));
}
describe('LaneAllocator', () => {
it('spreads builds across lanes by picking the least-loaded eligible lane', async () => {
const lanes = newLanes(3);
const a = new LaneAllocator(lanes, 4);
const l1 = await a.acquire('p1');
const l2 = await a.acquire('p2');
const l3 = await a.acquire('p3');
expect([l1.id, l2.id, l3.id]).toEqual([0, 1, 2]);
expect(lanes.map((l) => l.activeBuilds)).toEqual([1, 1, 1]);
});
it('skips a lane already running the same prompt', async () => {
const lanes = newLanes(2);
const a = new LaneAllocator(lanes, 4);
const l1 = await a.acquire('p1');
const l2 = await a.acquire('p1');
expect(l1.id).toBe(0);
expect(l2.id).toBe(1);
expect(lanes[0].inflightPrompts.has('p1')).toBe(true);
expect(lanes[1].inflightPrompts.has('p1')).toBe(true);
});
it('queues acquires when no lane can serve the prompt', async () => {
const lanes = newLanes(1);
const a = new LaneAllocator(lanes, 4);
await a.acquire('p1');
let resolvedSecond = false;
const second = a.acquire('p1').then((l) => {
resolvedSecond = true;
return l;
});
await new Promise((r) => setImmediate(r));
expect(resolvedSecond).toBe(false);
a.release(lanes[0], 'p1');
const lane = await second;
expect(lane.id).toBe(0);
expect(lanes[0].inflightPrompts.has('p1')).toBe(true);
});
it('respects maxConcurrentBuilds per lane', async () => {
const lanes = newLanes(1);
const a = new LaneAllocator(lanes, 2);
await a.acquire('p1');
await a.acquire('p2');
let resolved = false;
const blocked = a.acquire('p3').then((l) => {
resolved = true;
return l;
});
await new Promise((r) => setImmediate(r));
expect(resolved).toBe(false);
a.release(lanes[0], 'p1');
await blocked;
expect(resolved).toBe(true);
});
it('skips queued waiters with conflicting prompts when a lane frees up', async () => {
const lanes = newLanes(1);
const a = new LaneAllocator(lanes, 2);
await a.acquire('p1');
await a.acquire('p2');
const order: string[] = [];
const w1 = a.acquire('p1').then((l) => {
order.push('p1');
return l;
});
const w3 = a.acquire('p3').then((l) => {
order.push('p3');
return l;
});
await new Promise((r) => setImmediate(r));
expect(order).toEqual([]);
a.release(lanes[0], 'p2');
await w3;
expect(order).toEqual(['p3']);
a.release(lanes[0], 'p1');
await w1;
expect(order).toEqual(['p3', 'p1']);
});
});

View File

@ -0,0 +1,117 @@
import { expandWithIterations, partitionRoundRobin } from '../cli/lanes';
describe('partitionRoundRobin', () => {
it('splits 9 items into 3 lanes by index modulo', () => {
const items = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'];
expect(partitionRoundRobin(items, 3)).toEqual([
['a', 'd', 'g'],
['b', 'e', 'h'],
['c', 'f', 'i'],
]);
});
it('returns a single bucket containing every item when laneCount is 1', () => {
const items = [1, 2, 3, 4, 5];
expect(partitionRoundRobin(items, 1)).toEqual([[1, 2, 3, 4, 5]]);
});
it('returns empty buckets for lanes that get no items when laneCount > items.length', () => {
const items = ['only'];
expect(partitionRoundRobin(items, 3)).toEqual([['only'], [], []]);
});
it('returns laneCount empty buckets when items is empty', () => {
expect(partitionRoundRobin([], 3)).toEqual([[], [], []]);
});
it('preserves item identity (no clone)', () => {
const a = { id: 'a' };
const b = { id: 'b' };
const buckets = partitionRoundRobin([a, b], 2);
expect(buckets[0][0]).toBe(a);
expect(buckets[1][0]).toBe(b);
});
it('throws when laneCount < 1', () => {
expect(() => partitionRoundRobin([1, 2], 0)).toThrow(/laneCount must be >= 1/);
expect(() => partitionRoundRobin([1, 2], -1)).toThrow(/laneCount must be >= 1/);
});
it('reconstructs source order when re-sorted by an embedded original index', () => {
// Mirrors runDirectLoop's flow: tag each item with its origIdx, partition
// across lanes, flatten lane outputs, sort back by origIdx.
const items = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'];
const indexed = items.map((value, origIdx) => ({ value, origIdx }));
const buckets = partitionRoundRobin(indexed, 3);
const flat = buckets.flat();
flat.sort((x, y) => x.origIdx - y.origIdx);
expect(flat.map((x) => x.value)).toEqual(items);
});
});
describe('expandWithIterations', () => {
type Item = { file: string; scen: string; iter?: number };
const tag = (item: Item, iter: number): Item => ({ ...item, iter });
const getFile = (item: Item): string => item.file;
it('round-robins across files in the first round', () => {
const items: Item[] = [
{ file: 'A', scen: '1' },
{ file: 'A', scen: '2' },
{ file: 'B', scen: '1' },
{ file: 'C', scen: '1' },
];
const out = [...expandWithIterations(items, getFile, 1, tag)];
// Round 1 yields one scenario per file in insertion order, then round 2 picks up A's second scenario.
expect(out.map((x) => `${x.file}.${x.scen}`)).toEqual(['A.1', 'B.1', 'C.1', 'A.2']);
});
it('iter-interleaves per scenario before moving on', () => {
const items: Item[] = [
{ file: 'A', scen: '1' },
{ file: 'B', scen: '1' },
];
const out = [...expandWithIterations(items, getFile, 3, tag)];
expect(out.map((x) => `${x.file}.${x.scen}.${String(x.iter)}`)).toEqual([
'A.1.0',
'A.1.1',
'A.1.2',
'B.1.0',
'B.1.1',
'B.1.2',
]);
});
it('skips files that ran out of scenarios in later rounds', () => {
const items: Item[] = [
{ file: 'A', scen: '1' },
{ file: 'A', scen: '2' },
{ file: 'B', scen: '1' },
];
const out = [...expandWithIterations(items, getFile, 1, tag)];
// Round 1: A.1, B.1. Round 2: A.2 (B has no second scenario, skipped).
expect(out.map((x) => `${x.file}.${x.scen}`)).toEqual(['A.1', 'B.1', 'A.2']);
});
it('yields nothing for empty input', () => {
expect([...expandWithIterations<Item>([], getFile, 3, tag)]).toEqual([]);
});
it('yields nothing when iterations is 0', () => {
const items: Item[] = [{ file: 'A', scen: '1' }];
expect([...expandWithIterations(items, getFile, 0, tag)]).toEqual([]);
});
it('first wave covers all files after enough items pulled', () => {
const items: Item[] = [];
for (const f of ['A', 'B', 'C', 'D', 'E']) {
for (const s of ['1', '2', '3']) items.push({ file: f, scen: s });
}
const out = [...expandWithIterations(items, getFile, 3, tag)];
// Total: 5 files × 3 scenarios × 3 iters = 45 yielded items.
expect(out).toHaveLength(45);
// First 5×3 = 15 items cover one scenario per file × all 3 iterations.
const firstWave = out.slice(0, 15).map((x) => x.file);
expect(new Set(firstWave)).toEqual(new Set(['A', 'B', 'C', 'D', 'E']));
});
});

View File

@ -14,7 +14,11 @@ import { z } from 'zod';
export interface CliArgs {
/** TimeoutMs is defined per iteration, not as the total timeout for all iterations */
timeoutMs: number;
baseUrl: string;
/** One or more n8n base URLs. Multi-lane runs use a work-stealing allocator
* that dispatches each build to a lane that isn't already running its
* prompt, capped per-lane at MAX_CONCURRENT_BUILDS=4. Pass comma-separated
* to `--base-url`. */
baseUrls: string[];
email?: string;
password?: string;
verbose: boolean;
@ -26,7 +30,8 @@ export interface CliArgs {
outputDir?: string;
/** LangSmith dataset name (synced from JSON test cases before each run) */
dataset: string;
/** Max concurrent scenarios in evaluate(). Builds are separately limited to 4 by semaphore. */
/** Max concurrent target() calls in LangSmith evaluate(). Build concurrency is
* enforced separately by the LaneAllocator (cap=4 per lane). */
concurrency: number;
/** LangSmith experiment name prefix (auto-generated if not set) */
experimentName?: string;
@ -41,7 +46,7 @@ export interface CliArgs {
const cliArgsSchema = z.object({
timeoutMs: z.number().int().positive().default(600_000),
baseUrl: z.string().url().default('http://localhost:5678'),
baseUrls: z.array(z.string().url()).min(1).default(['http://localhost:5678']),
email: z.string().optional(),
password: z.string().optional(),
verbose: z.boolean().default(false),
@ -64,7 +69,7 @@ export function parseCliArgs(argv: string[]): CliArgs {
return {
timeoutMs: validated.timeoutMs,
baseUrl: validated.baseUrl,
baseUrls: validated.baseUrls,
email: validated.email,
password: validated.password,
verbose: validated.verbose,
@ -84,7 +89,7 @@ export function parseCliArgs(argv: string[]): CliArgs {
interface RawArgs {
timeoutMs: number;
baseUrl: string;
baseUrls: string[];
email?: string;
password?: string;
verbose: boolean;
@ -100,7 +105,7 @@ interface RawArgs {
function parseRawArgs(argv: string[]): RawArgs {
const result: RawArgs = {
timeoutMs: 600_000,
baseUrl: 'http://localhost:5678',
baseUrls: ['http://localhost:5678'],
verbose: false,
keepWorkflows: false,
outputDir: undefined,
@ -119,10 +124,15 @@ function parseRawArgs(argv: string[]): RawArgs {
i++;
break;
case '--base-url':
result.baseUrl = nextArg(argv, i, '--base-url');
case '--base-url': {
const raw = nextArg(argv, i, '--base-url');
result.baseUrls = raw
.split(',')
.map((s) => s.trim())
.filter((s) => s.length > 0);
i++;
break;
}
case '--email':
result.email = nextArg(argv, i, '--email');

View File

@ -14,13 +14,14 @@ import { evaluate } from 'langsmith/evaluation';
import type { EvaluationResult } from 'langsmith/evaluation';
import type { Example, Run } from 'langsmith/schemas';
import { traceable } from 'langsmith/traceable';
import pLimit from 'p-limit';
import { join } from 'path';
import { z } from 'zod';
import { aggregateResults, passAtK, passHatK } from './aggregator';
import { parseCliArgs } from './args';
import { buildCIMetadata, computeExperimentPrefix } from './ci-metadata';
import { LaneAllocator } from './lane-allocator';
import { expandWithIterations, partitionRoundRobin } from './lanes';
import { N8nClient } from '../clients/n8n-client';
import { seedCredentials, cleanupCredentials } from '../credentials/seeder';
import { loadWorkflowTestCasesWithFiles } from '../data/workflows';
@ -112,21 +113,46 @@ const runInputsSchema = z
/** Target input shape with the iteration index we inject for multi-run. */
type TargetInputs = DatasetExampleInputs & { _iteration?: number };
interface Lane {
client: N8nClient;
preRunWorkflowIds: Set<string>;
claimedWorkflowIds: Set<string>;
seedResult: { seededTypes: string[]; credentialIds: string[] };
}
interface RunConfig {
args: ReturnType<typeof parseCliArgs>;
lanes: Lane[];
logger: EvalLogger;
}
async function main(): Promise<void> {
const args = parseCliArgs(process.argv.slice(2));
const logger = createLogger(args.verbose);
const client = new N8nClient(args.baseUrl);
logger.info(`Authenticating with ${args.baseUrl}...`);
await client.login(args.email, args.password);
logger.success('Authenticated');
// One lane per base URL. The LangSmith path then uses a work-stealing
// allocator (lane-allocator.ts) to dispatch builds across lanes; the direct
// path partitions test cases statically per lane.
const lanes: Lane[] = await Promise.all(
args.baseUrls.map(async (baseUrl, idx) => {
const tag =
args.baseUrls.length > 1
? ` [lane ${String(idx + 1)}/${String(args.baseUrls.length)}]`
: '';
const client = new N8nClient(baseUrl);
logger.info(`Authenticating with ${baseUrl}...${tag}`);
await client.login(args.email, args.password);
logger.success(`Authenticated${tag}`);
logger.info('Seeding credentials...');
const seedResult = await seedCredentials(client, undefined, logger);
logger.info(`Seeded ${String(seedResult.credentialIds.length)} credential(s)`);
logger.info(`Seeding credentials...${tag}`);
const seedResult = await seedCredentials(client, undefined, logger);
logger.info(`Seeded ${String(seedResult.credentialIds.length)} credential(s)${tag}`);
const preRunWorkflowIds = await snapshotWorkflowIds(client);
const claimedWorkflowIds = new Set<string>();
const preRunWorkflowIds = await snapshotWorkflowIds(client);
const claimedWorkflowIds = new Set<string>();
return { client, preRunWorkflowIds, claimedWorkflowIds, seedResult };
}),
);
const startTime = Date.now();
@ -137,24 +163,10 @@ async function main(): Promise<void> {
if (hasLangSmith) {
logger.info('LangSmith API key detected, using evaluate() with experiment tracking');
evaluation = await runWithLangSmith({
args,
client,
preRunWorkflowIds,
claimedWorkflowIds,
logger,
seedResult,
});
evaluation = await runWithLangSmith({ args, lanes, logger });
} else {
logger.info('No LANGSMITH_API_KEY, running direct loop (results in eval-results.json only)');
evaluation = await runDirectLoop({
args,
client,
preRunWorkflowIds,
claimedWorkflowIds,
logger,
seedResult,
});
evaluation = await runDirectLoop({ args, lanes, logger });
}
const totalDuration = Date.now() - startTime;
@ -164,7 +176,11 @@ async function main(): Promise<void> {
console.log(`Report: ${htmlPath}`);
printSummary(evaluation);
} finally {
await cleanupCredentials(client, seedResult.credentialIds).catch(() => {});
await Promise.all(
lanes.map(async (lane) => {
await cleanupCredentials(lane.client, lane.seedResult.credentialIds).catch(() => {});
}),
);
}
}
@ -172,79 +188,111 @@ async function main(): Promise<void> {
// LangSmith mode: evaluate() with dataset sync, tracing, experiments
// ---------------------------------------------------------------------------
interface RunConfig {
args: ReturnType<typeof parseCliArgs>;
client: N8nClient;
preRunWorkflowIds: Set<string>;
claimedWorkflowIds: Set<string>;
logger: EvalLogger;
seedResult: { seededTypes: string[]; credentialIds: string[] };
}
async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation> {
const { args, client, preRunWorkflowIds, claimedWorkflowIds, logger } = config;
const { args, lanes, logger } = config;
const lsClient = new Client();
const datasetName = await syncDataset(lsClient, args.dataset, logger, args.filter);
const testCasesWithFiles = loadWorkflowTestCasesWithFiles(args.filter);
const buildLimiter = pLimit(MAX_CONCURRENT_BUILDS);
// Keyed by `${iteration}:${prompt}` so the same prompt gets a fresh build
// per iteration — pass@k captures real builder variance.
const buildCache = new Map<string, Promise<BuildResult>>();
const buildDurations = new Map<string, number>();
// LaneState carries the allocator-managed counters (activeBuilds,
// inflightPrompts) plus the lane's traced LangSmith wrappers. `runner` is
// the underlying Lane (n8n client, credential state) — named distinctly so
// it doesn't shadow the iteration variable `lane` in lanes.map().
interface LaneState {
runner: Lane;
activeBuilds: number;
inflightPrompts: Set<string>;
tracedBuild: (prompt: string) => Promise<BuildResult>;
tracedExecute: (execArgs: {
workflowId: string;
scenario: TestScenario;
workflowJsons: BuildResult['workflowJsons'];
}) => Promise<Awaited<ReturnType<typeof executeScenario>>>;
}
// Traceable wraps the actual build call *inside* the limiter — otherwise the
// LangSmith span would include queue-wait time, which accumulates across
// iterations as later builds queue behind earlier ones.
const tracedBuildWorkflow = traceable(
async (prompt: string) =>
await buildWorkflow({
client,
prompt,
timeoutMs: args.timeoutMs,
preRunWorkflowIds,
claimedWorkflowIds,
logger,
}),
{ name: 'workflow_build', run_type: 'chain', client: lsClient },
);
const laneStates: LaneState[] = lanes.map((lane, idx) => {
const laneNum = idx + 1;
const laneTag = lanes.length > 1 ? ` [lane ${String(laneNum)}/${String(lanes.length)}]` : '';
return {
runner: lane,
activeBuilds: 0,
inflightPrompts: new Set<string>(),
tracedBuild: traceable(
async (prompt: string) =>
await buildWorkflow({
client: lane.client,
prompt,
timeoutMs: args.timeoutMs,
preRunWorkflowIds: lane.preRunWorkflowIds,
claimedWorkflowIds: lane.claimedWorkflowIds,
logger,
laneTag,
}),
{
name: 'workflow_build',
run_type: 'chain',
client: lsClient,
metadata: { lane: laneNum },
},
),
tracedExecute: traceable(
async (execArgs: {
workflowId: string;
scenario: TestScenario;
workflowJsons: BuildResult['workflowJsons'];
}) =>
await executeScenario(
lane.client,
execArgs.workflowId,
execArgs.scenario,
execArgs.workflowJsons,
logger,
args.timeoutMs,
),
{
name: 'scenario_execution',
run_type: 'chain',
client: lsClient,
metadata: { lane: laneNum },
},
),
};
});
// Work-stealing: each build acquires a lane that isn't already running its
// prompt, runs there (capped per-lane), then releases. Scenarios re-use the
// lane that built their workflow.
const allocator = new LaneAllocator(laneStates, MAX_CONCURRENT_BUILDS);
const buildCache = new Map<
string,
Promise<{ build: BuildResult; lane: LaneState; buildDurationMs: number }>
>();
const buildDurations = new Map<string, number>();
async function getOrBuild(
prompt: string,
iteration: number,
): Promise<{ build: BuildResult; buildDurationMs?: number }> {
): Promise<{ build: BuildResult; lane: LaneState; buildDurationMs: number }> {
const key = `${String(iteration)}:${prompt}`;
const existing = buildCache.get(key);
if (existing) return { build: await existing };
const promise = buildLimiter(async () => {
const start = Date.now();
const build = await tracedBuildWorkflow(prompt);
buildDurations.set(key, Date.now() - start);
return build;
});
if (existing) return await existing;
const promise = (async () => {
const lane = await allocator.acquire(prompt);
try {
const start = Date.now();
const build = await lane.tracedBuild(prompt);
const buildDurationMs = Date.now() - start;
buildDurations.set(key, buildDurationMs);
return { build, lane, buildDurationMs };
} finally {
allocator.release(lane, prompt);
}
})();
buildCache.set(key, promise);
const build = await promise;
return { build, buildDurationMs: buildDurations.get(key) };
return await promise;
}
const traceableExecute = traceable(
async (execArgs: {
workflowId: string;
scenario: TestScenario;
workflowJsons: BuildResult['workflowJsons'];
}) =>
await executeScenario(
client,
execArgs.workflowId,
execArgs.scenario,
execArgs.workflowJsons,
logger,
args.timeoutMs,
),
{ name: 'scenario_execution', run_type: 'chain', client: lsClient },
);
const target = async (inputs: TargetInputs): Promise<TargetOutput> => {
const iteration = inputs._iteration ?? 0;
const scenario: TestScenario = {
@ -254,7 +302,11 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
successCriteria: inputs.successCriteria,
};
const { build, buildDurationMs } = await getOrBuild(inputs.prompt, iteration);
const {
build,
lane: builtOnLane,
buildDurationMs,
} = await getOrBuild(inputs.prompt, iteration);
if (!build.success || !build.workflowId) {
return {
@ -274,7 +326,7 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
const nodeCount = build.workflowJsons[0]?.nodes.length ?? 0;
let result;
try {
result = await traceableExecute({
result = await builtOnLane.tracedExecute({
workflowId: build.workflowId,
scenario,
workflowJsons: build.workflowJsons,
@ -356,7 +408,7 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
const experimentPrefix = args.experimentName ?? computeExperimentPrefix();
logger.info(
`Starting evaluate() with concurrency=${String(args.concurrency)}, builds limited to ${String(MAX_CONCURRENT_BUILDS)}, iterations=${String(args.iterations)}`,
`Starting evaluate() with concurrency=${String(args.concurrency)}, ${String(lanes.length)} lane(s) × ${String(MAX_CONCURRENT_BUILDS)} concurrent builds, iterations=${String(args.iterations)}`,
);
// Always filter the LangSmith dataset by the local file slugs. The local
@ -381,6 +433,7 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
filter: args.filter ?? 'all',
concurrency: args.concurrency,
maxBuilds: MAX_CONCURRENT_BUILDS,
lanes: lanes.length,
iterations: args.iterations,
...buildCIMetadata(),
},
@ -417,10 +470,10 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
} finally {
if (!args.keepWorkflows) {
await Promise.all(
[...buildCache.values()].map(async (buildPromise) => {
[...buildCache.values()].map(async (promise) => {
try {
const build = await buildPromise;
await cleanupBuild(client, build, logger);
const { build, lane } = await promise;
await cleanupBuild(lane.runner.client, build, logger);
} catch {
// Best-effort
}
@ -431,14 +484,10 @@ async function runWithLangSmith(config: RunConfig): Promise<MultiRunEvaluation>
}
/**
* Expand a source example stream into N copies, tagging each with `_iteration`
* so the target function can key its build cache by iteration and we can
* reshape runs back into per-iteration groups afterwards. All N copies share
* the source example's id, so LangSmith's UI groups them naturally by
`reference_example_id`, which is useful for pass@k visualization.
*
* The source is buffered into memory once before the first yield: we need to
* emit each example N times, and an AsyncIterable can only be consumed once.
* Expand a source example stream into N copies, tagging each with `_iteration`.
* Round-robins scenarios across test cases and iter-interleaves per scenario
so the in-flight set spans both dimensions. Same-prompt concentration is handled
by the work-stealing allocator at build time.
*/
async function* expandExamplesForIterations(
source: AsyncIterable<Example>,
@ -446,11 +495,12 @@ async function* expandExamplesForIterations(
): AsyncIterable<Example> {
const cached: Example[] = [];
for await (const ex of source) cached.push(ex);
for (let i = 0; i < iterations; i++) {
for (const ex of cached) {
yield { ...ex, inputs: { ...ex.inputs, _iteration: i } };
}
}
yield* expandWithIterations(
cached,
(ex) => (typeof ex.inputs?.testCaseFile === 'string' ? ex.inputs.testCaseFile : 'unknown'),
iterations,
(ex, i) => ({ ...ex, inputs: { ...ex.inputs, _iteration: i } }),
);
}
function filteredExamplesIterable(
@ -639,7 +689,7 @@ function reshapeLangSmithRuns(
// ---------------------------------------------------------------------------
async function runDirectLoop(config: RunConfig): Promise<MultiRunEvaluation> {
const { args, client, preRunWorkflowIds, claimedWorkflowIds, logger, seedResult } = config;
const { args, lanes, logger } = config;
const testCasesWithFiles = loadWorkflowTestCasesWithFiles(args.filter);
if (testCasesWithFiles.length === 0) {
@ -652,30 +702,47 @@ async function runDirectLoop(config: RunConfig): Promise<MultiRunEvaluation> {
0,
);
logger.info(
`Running ${String(testCasesWithFiles.length)} test case(s) with ${String(totalScenarios)} scenario(s) × ${String(args.iterations)} iteration(s)`,
`Running ${String(testCasesWithFiles.length)} test case(s) with ${String(totalScenarios)} scenario(s) × ${String(args.iterations)} iteration(s) across ${String(lanes.length)} lane(s)`,
);
// Distribute test cases across lanes by source-order index. Each bucket carries
// the original index so we can re-sort lane outputs back to source order — the
// aggregator indexes per-iteration results positionally.
const indexed = testCasesWithFiles.map((tc, origIdx) => ({ tc, origIdx }));
const buckets = partitionRoundRobin(indexed, lanes.length);
const allRunResults: WorkflowTestCaseResult[][] = [];
for (let iter = 0; iter < args.iterations; iter++) {
if (args.iterations > 1) {
logger.info(`--- Iteration #${String(iter + 1)}/${String(args.iterations)} ---`);
}
const results = await runWithConcurrency(
testCasesWithFiles,
async ({ testCase }) =>
await runWorkflowTestCase({
client,
testCase,
timeoutMs: args.timeoutMs,
seededCredentialTypes: seedResult.seededTypes,
preRunWorkflowIds,
claimedWorkflowIds,
logger,
keepWorkflows: args.keepWorkflows,
}),
MAX_CONCURRENT_BUILDS,
const laneResults = await Promise.all(
lanes.map(async (lane, laneIdx) => {
const bucket = buckets[laneIdx];
const laneTag =
lanes.length > 1 ? ` [lane ${String(laneIdx + 1)}/${String(lanes.length)}]` : '';
const results = await runWithConcurrency(
bucket,
async ({ tc }) =>
await runWorkflowTestCase({
client: lane.client,
testCase: tc.testCase,
timeoutMs: args.timeoutMs,
seededCredentialTypes: lane.seedResult.seededTypes,
preRunWorkflowIds: lane.preRunWorkflowIds,
claimedWorkflowIds: lane.claimedWorkflowIds,
logger,
keepWorkflows: args.keepWorkflows,
laneTag,
}),
MAX_CONCURRENT_BUILDS,
);
return bucket.map((b, i) => ({ origIdx: b.origIdx, result: results[i] }));
}),
);
allRunResults.push(results);
const flat = laneResults.flat();
flat.sort((a, b) => a.origIdx - b.origIdx);
allRunResults.push(flat.map((x) => x.result));
}
return aggregateResults(allRunResults, args.iterations);

View File

@ -0,0 +1,72 @@
// Pull-based lane allocator. Each lane caps at `maxConcurrentBuilds` and never
// runs the same prompt twice concurrently — pairing those rules eliminates the
// same-prompt concentration that breaks the agent under load.
export interface AllocatableLane {
activeBuilds: number;
inflightPrompts: Set<string>;
}
interface Waiter<L> {
prompt: string;
resolve: (lane: L) => void;
}
export class LaneAllocator<L extends AllocatableLane> {
private readonly waiters: Array<Waiter<L>> = [];
constructor(
private readonly lanes: L[],
private readonly maxConcurrentBuilds: number,
) {}
async acquire(prompt: string): Promise<L> {
const lane = this.findFree(prompt);
if (lane) {
this.markBusy(lane, prompt);
return lane;
}
return await new Promise<L>((resolve) => {
this.waiters.push({ prompt, resolve });
});
}
release(lane: L, prompt: string): void {
lane.activeBuilds--;
lane.inflightPrompts.delete(prompt);
this.wakeNext(lane);
}
private findFree(prompt: string): L | undefined {
// Least-loaded policy: spread builds evenly across lanes rather than
// filling lane 0 to cap before touching lane 1. Avoids hot-spotting.
let best: L | undefined;
for (const lane of this.lanes) {
if (!this.canRun(lane, prompt)) continue;
if (best === undefined || lane.activeBuilds < best.activeBuilds) best = lane;
}
return best;
}
private canRun(lane: L, prompt: string): boolean {
return lane.activeBuilds < this.maxConcurrentBuilds && !lane.inflightPrompts.has(prompt);
}
private markBusy(lane: L, prompt: string): void {
lane.activeBuilds++;
lane.inflightPrompts.add(prompt);
}
private wakeNext(lane: L): void {
// Wake the first waiter this lane can now serve. FIFO ordering.
for (let i = 0; i < this.waiters.length; i++) {
const w = this.waiters[i];
if (this.canRun(lane, w.prompt)) {
this.waiters.splice(i, 1);
this.markBusy(lane, w.prompt);
w.resolve(lane);
return;
}
}
}
}
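
A minimal usage sketch of the allocator, mirroring the acquire/try/finally/release pattern that getOrBuild uses in index.ts above. The EvalLane shape, the lane URLs, and the buildOnLane placeholder are illustrative assumptions, not part of this change:

import { LaneAllocator, type AllocatableLane } from './lane-allocator';

// Hypothetical lane shape: the allocator only needs the two counters;
// callers attach whatever per-lane state they want (here, a base URL).
interface EvalLane extends AllocatableLane {
  baseUrl: string;
}

const lanes: EvalLane[] = ['http://localhost:5678', 'http://localhost:5679'].map((baseUrl) => ({
  baseUrl,
  activeBuilds: 0,
  inflightPrompts: new Set<string>(),
}));

const allocator = new LaneAllocator(lanes, 4); // cap of 4 concurrent builds per lane

async function buildSomewhere(prompt: string): Promise<void> {
  // Blocks until some lane has spare capacity and is not already running this prompt.
  const lane = await allocator.acquire(prompt);
  try {
    // ...dispatch the build against lane.baseUrl (illustrative placeholder)...
  } finally {
    // Always release so queued waiters get woken, even if the build throws.
    allocator.release(lane, prompt);
  }
}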

View File

@ -0,0 +1,59 @@
// ---------------------------------------------------------------------------
// Lane partitioning helpers for multi-container eval runs
//
// Pure functions, intentionally separated from index.ts so unit tests can
// import them without triggering main()'s side effects.
// ---------------------------------------------------------------------------
/**
* Partition `items` into `laneCount` round-robin buckets by source-order index.
* Item at index i goes to bucket `i % laneCount`.
*
* Empty buckets are returned (not omitted) when laneCount > items.length so
* callers can safely zip buckets with their lanes.
*/
export function partitionRoundRobin<T>(items: T[], laneCount: number): T[][] {
if (laneCount < 1) {
throw new Error(`laneCount must be >= 1, got ${String(laneCount)}`);
}
return Array.from({ length: laneCount }, (_, laneIdx) =>
items.filter((_, i) => i % laneCount === laneIdx),
);
}
/**
* Yield items grouped by file in a round-robin order across files, with each
item duplicated `iterations` times via `tag`. Pure ordering logic; the caller
* provides the file accessor and the tagger.
*
* Order: round 1 = first item of each group, round 2 = second item of each
* group, etc. Within each yielded item, all `iterations` copies are emitted
* consecutively before moving to the next item.
*/
export function* expandWithIterations<T>(
items: T[],
getFile: (item: T) => string,
iterations: number,
tag: (item: T, iter: number) => T,
): IterableIterator<T> {
const byFile = new Map<string, T[]>();
for (const item of items) {
const file = getFile(item);
let group = byFile.get(file);
if (!group) {
group = [];
byFile.set(file, group);
}
group.push(item);
}
const groups = [...byFile.values()];
const maxScenarios = groups.reduce((m, g) => Math.max(m, g.length), 0);
for (let s = 0; s < maxScenarios; s++) {
for (const group of groups) {
if (s < group.length) {
const item = group[s];
for (let i = 0; i < iterations; i++) yield tag(item, i);
}
}
}
}
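
For orientation, a condensed sketch of how runDirectLoop above consumes partitionRoundRobin: tag each item with its source index, split across lanes, then re-sort the flattened lane outputs back to source order so positional aggregation still works. Item, Outcome, and runLane are stand-in names for the real test-case and result types, assumed here for illustration:

import { partitionRoundRobin } from './lanes';

// Hypothetical minimal shapes, standing in for the real test-case/result types.
interface Item { name: string }
interface Outcome { ok: boolean }

declare function runLane(items: Item[], laneIdx: number): Promise<Outcome[]>;

async function runAcrossLanes(items: Item[], laneCount: number): Promise<Outcome[]> {
  // Carry the original index so results can be re-sorted after fan-out.
  const indexed = items.map((item, origIdx) => ({ item, origIdx }));
  const buckets = partitionRoundRobin(indexed, laneCount);
  const perLane = await Promise.all(
    buckets.map(async (bucket, laneIdx) => {
      const results = await runLane(bucket.map((b) => b.item), laneIdx);
      // runLane is assumed to preserve input order, so zip by position.
      return bucket.map((b, i) => ({ origIdx: b.origIdx, result: results[i] }));
    }),
  );
  const flat = perLane.flat();
  flat.sort((a, b) => a.origIdx - b.origIdx);
  return flat.map((x) => x.result);
}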

View File

@ -50,6 +50,8 @@ interface WorkflowTestCaseConfig {
claimedWorkflowIds: Set<string>;
logger: EvalLogger;
keepWorkflows: boolean;
/** Optional " [lane N/M]" suffix appended to per-build log lines. */
laneTag?: string;
}
/**
@ -76,6 +78,7 @@ export async function runWorkflowTestCase(
preRunWorkflowIds: config.preRunWorkflowIds,
claimedWorkflowIds: config.claimedWorkflowIds,
logger,
laneTag: config.laneTag,
});
if (!build.success || !build.workflowId) {
@ -116,7 +119,7 @@ export async function runWorkflowTestCase(
const scenarioMs = Date.now() - scenarioStart;
logger.info(
` Scenarios done: ${String(result.scenarioResults.length)} scenarios [${String(Math.round(scenarioMs / 1000))}s]`,
` Scenarios done: ${String(result.scenarioResults.length)} scenarios [${String(Math.round(scenarioMs / 1000))}s]${config.laneTag ?? ''}`,
);
if (!config.keepWorkflows) {
@ -147,6 +150,8 @@ export interface BuildWorkflowConfig {
preRunWorkflowIds: Set<string>;
claimedWorkflowIds: Set<string>;
logger: EvalLogger;
/** Optional " [lane N/M]" suffix appended to the build log line. */
laneTag?: string;
}
/**
@ -165,7 +170,7 @@ export async function buildWorkflow(config: BuildWorkflowConfig): Promise<BuildR
try {
const buildStart = Date.now();
logger.info(` Building workflow: "${truncate(prompt, 60)}"`);
logger.info(` Building workflow: "${truncate(prompt, 60)}"${config.laneTag ?? ''}`);
const ssePromise = startSseConnection(client, threadId, events, abortController.signal).catch(
() => {},
@ -656,6 +661,12 @@ async function waitForBackgroundTasks(config: WaitConfig, timeoutMs: number): Pr
config.logger.verbose('Sub-agent(s) detected -- waiting for background tasks...');
// Log on count change, plus a heartbeat every 20s so a long stable wait still
// emits a liveness signal without spamming every poll interval.
const HEARTBEAT_MS = 20_000;
let lastLoggedKey = '';
let lastLogAt = 0;
while (Date.now() < deadline) {
await processConfirmationRequests(config);
@ -673,9 +684,15 @@ async function waitForBackgroundTasks(config: WaitConfig, timeoutMs: number): Pr
return;
}
config.logger.verbose(
`Waiting for ${String(restRunning.length)} REST task(s), ${String(ssePending.length)} SSE agent(s)`,
);
const key = `${String(restRunning.length)}/${String(ssePending.length)}`;
const now = Date.now();
if (key !== lastLoggedKey || now - lastLogAt >= HEARTBEAT_MS) {
config.logger.verbose(
`Waiting for ${String(restRunning.length)} REST task(s), ${String(ssePending.length)} SSE agent(s)`,
);
lastLoggedKey = key;
lastLogAt = now;
}
await delay(BACKGROUND_TASK_POLL_INTERVAL_MS);
}