feat(core): Add eval collection service + REST + runner integration (TRUST-72) (#30218)

This commit is contained in:
Arvin A 2026-05-14 12:48:46 +02:00 committed by GitHub
parent 339b6136c6
commit e630e7e489
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 2045 additions and 12 deletions

View File

@ -28,6 +28,7 @@ export type PubSubEventName =
| 'reload-sso-provisioning-configuration'
| 'reload-source-control-config'
| 'cancel-test-run'
| 'cancel-collection'
| 'agent-chat-integration-changed'
| 'agent-config-changed';

View File

@ -0,0 +1,507 @@
import type { CreateEvaluationCollectionPayload } from '@n8n/api-types';
import type {
EvaluationCollection,
EvaluationCollectionRepository,
EvaluationConfig,
EvaluationConfigRepository,
TestRun,
TestRunRepository,
User,
WorkflowHistory,
WorkflowHistoryRepository,
WorkflowPublishedVersion,
WorkflowPublishedVersionRepository,
} from '@n8n/db';
import { mock } from 'jest-mock-extended';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type { Telemetry } from '@/telemetry';
import type { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
import { EvaluationCollectionService } from '../evaluation-collection.service';
import type { TestRunnerService } from '../test-runner/test-runner.service.ee';
function makeConfig(over: Partial<EvaluationConfig> = {}): EvaluationConfig {
return {
id: 'cfg-1',
workflowId: 'wf-1',
name: 'cfg',
status: 'valid',
invalidReason: null,
datasetSource: 'data_table',
datasetRef: { dataTableId: 'dt-1' },
startNodeName: 'Start',
endNodeName: 'End',
metrics: [],
createdAt: new Date('2026-01-01T00:00:00Z'),
updatedAt: new Date('2026-01-02T00:00:00Z'),
workflow: undefined as never,
...over,
} as EvaluationConfig;
}
function makeCollection(over: Partial<EvaluationCollection> = {}): EvaluationCollection {
return {
id: 'col-1',
name: 'My collection',
description: null,
workflowId: 'wf-1',
evaluationConfigId: 'cfg-1',
createdById: 'user-1',
insightsCache: null,
createdAt: new Date('2026-02-01T00:00:00Z'),
updatedAt: new Date('2026-02-01T00:00:00Z'),
workflow: undefined as never,
evaluationConfig: undefined as never,
...over,
} as EvaluationCollection;
}
function makeTestRun(over: Partial<TestRun> = {}): TestRun {
return {
id: 'tr-1',
status: 'completed',
workflowId: 'wf-1',
evaluationConfigId: 'cfg-1',
workflowVersionId: 'wfv-1',
collectionId: null,
evaluationConfigSnapshot: null,
metrics: { accuracy: 0.9 },
runAt: new Date('2026-03-01T00:00:00Z'),
completedAt: new Date('2026-03-01T00:01:00Z'),
createdAt: new Date('2026-03-01T00:00:00Z'),
updatedAt: new Date('2026-03-01T00:01:00Z'),
errorCode: null,
errorDetails: null,
cancelRequested: false,
runningInstanceId: null,
workflow: undefined as never,
testCaseExecutions: [],
evaluationConfig: null,
collection: null,
...over,
} as TestRun;
}
const user = mock<User>({ id: 'user-1' });
function makePayload(
over: Partial<CreateEvaluationCollectionPayload> = {},
): CreateEvaluationCollectionPayload {
return {
name: 'My collection',
evaluationConfigId: 'cfg-1',
versions: [{ workflowVersionId: 'wfv-1' }],
...over,
};
}
describe('EvaluationCollectionService', () => {
let service: EvaluationCollectionService;
let collectionRepo: jest.Mocked<EvaluationCollectionRepository>;
let testRunRepo: jest.Mocked<TestRunRepository>;
let evalConfigRepo: jest.Mocked<EvaluationConfigRepository>;
let workflowHistoryRepo: jest.Mocked<WorkflowHistoryRepository>;
let publishedVersionRepo: jest.Mocked<WorkflowPublishedVersionRepository>;
let workflowHistoryService: jest.Mocked<WorkflowHistoryService>;
let testRunnerService: jest.Mocked<TestRunnerService>;
let telemetry: jest.Mocked<Telemetry>;
beforeEach(() => {
collectionRepo = mock<EvaluationCollectionRepository>();
testRunRepo = mock<TestRunRepository>();
evalConfigRepo = mock<EvaluationConfigRepository>();
workflowHistoryRepo = mock<WorkflowHistoryRepository>();
publishedVersionRepo = mock<WorkflowPublishedVersionRepository>();
workflowHistoryService = mock<WorkflowHistoryService>();
testRunnerService = mock<TestRunnerService>();
telemetry = mock<Telemetry>();
service = new EvaluationCollectionService(
collectionRepo,
testRunRepo,
evalConfigRepo,
workflowHistoryRepo,
publishedVersionRepo,
workflowHistoryService,
testRunnerService,
telemetry,
);
evalConfigRepo.findByIdAndWorkflowId.mockResolvedValue(makeConfig());
workflowHistoryService.findVersion.mockResolvedValue(
mock<WorkflowHistory>({ versionId: 'wfv-1' }),
);
publishedVersionRepo.findOneBy.mockResolvedValue(null);
workflowHistoryService.snapshotCurrent.mockResolvedValue({ versionId: 'wfv-snap' });
testRunRepo.findOneBy.mockResolvedValue(makeTestRun());
testRunRepo.find.mockResolvedValue([]);
collectionRepo.createCollection.mockImplementation(async (input) =>
makeCollection({ id: input.id, name: input.name, description: input.description }),
);
collectionRepo.getDetailByIdAndWorkflowId.mockResolvedValue({
collection: makeCollection(),
runs: [],
});
testRunnerService.startTestRun.mockResolvedValue({
testRun: makeTestRun({ id: 'tr-new', status: 'new' }),
finished: Promise.resolve(),
});
});
describe('createCollection', () => {
it('rejects when the evaluation config does not belong to the workflow', async () => {
evalConfigRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
await expect(service.createCollection(user, 'wf-1', makePayload())).rejects.toThrow(
NotFoundError,
);
expect(collectionRepo.createCollection).not.toHaveBeenCalled();
});
it('rejects when an existingTestRunId belongs to a different workflow', async () => {
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowId: 'other-wf' }));
await expect(
service.createCollection(
user,
'wf-1',
makePayload({
versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-other' }],
}),
),
).rejects.toThrow(BadRequestError);
expect(collectionRepo.createCollection).not.toHaveBeenCalled();
});
it('rejects when an existingTestRunId belongs to a different evaluation config', async () => {
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ evaluationConfigId: 'cfg-other' }));
await expect(
service.createCollection(
user,
'wf-1',
makePayload({
versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-x' }],
}),
),
).rejects.toThrow(BadRequestError);
});
it('rejects when existingTestRunId was executed against a different workflow version than requested', async () => {
// User asks "compare on wfv-A" but supplies a run that executed
// on wfv-B. Without this guard the service would silently attach
// the wfv-B run and never schedule wfv-A — the "collection on
// wfv-A" would actually be "collection on wfv-B".
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: 'wfv-B' }));
await expect(
service.createCollection(
user,
'wf-1',
makePayload({
versions: [{ workflowVersionId: 'wfv-A', existingTestRunId: 'tr-x' }],
}),
),
).rejects.toThrow(BadRequestError);
expect(collectionRepo.createCollection).not.toHaveBeenCalled();
});
it('rejects when existingTestRunId is unpinned (legacy run with no workflowVersionId)', async () => {
// Unpinned runs can have executed against any historical workflow
// state, so they break collection comparability by construction.
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: null }));
await expect(
service.createCollection(
user,
'wf-1',
makePayload({
versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-legacy' }],
}),
),
).rejects.toThrow(BadRequestError);
});
it('attaches existing runs and schedules new runs for missing versions', async () => {
await service.createCollection(
user,
'wf-1',
makePayload({
versions: [
{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-existing' },
{ workflowVersionId: 'wfv-2' },
],
}),
);
expect(collectionRepo.createCollection).toHaveBeenCalledTimes(1);
expect(collectionRepo.addRunsToCollection).toHaveBeenCalledWith(expect.any(String), [
'tr-existing',
]);
expect(testRunnerService.startTestRun).toHaveBeenCalledTimes(1);
expect(testRunnerService.startTestRun).toHaveBeenCalledWith(
user,
'wf-1',
expect.any(Number),
expect.any(Boolean),
expect.objectContaining({
workflowVersionId: 'wfv-2',
evaluationConfigId: 'cfg-1',
collectionId: expect.any(String),
evaluationConfigSnapshot: expect.objectContaining({ id: 'cfg-1' }),
}),
);
});
it('snapshots a new workflow version for "current draft" entries before scheduling', async () => {
await service.createCollection(
user,
'wf-1',
makePayload({
versions: [{ workflowVersionId: null }],
}),
);
expect(workflowHistoryService.snapshotCurrent).toHaveBeenCalledTimes(1);
expect(workflowHistoryService.snapshotCurrent).toHaveBeenCalledWith('wf-1');
expect(testRunnerService.startTestRun).toHaveBeenCalledWith(
user,
'wf-1',
expect.any(Number),
expect.any(Boolean),
expect.objectContaining({ workflowVersionId: 'wfv-snap' }),
);
});
it('emits Eval collection created telemetry with expected counts', async () => {
await service.createCollection(
user,
'wf-1',
makePayload({
versions: [
{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-existing' },
{ workflowVersionId: 'wfv-2' },
{ workflowVersionId: null },
],
}),
);
expect(telemetry.track).toHaveBeenCalledWith(
'Eval collection created',
expect.objectContaining({
version_count: 3,
existing_run_count: 1,
new_run_count: 2,
evaluation_config_id: 'cfg-1',
dataset_id: 'dt-1',
}),
);
});
});
describe('deleteCollection', () => {
it('emits Eval collection deleted telemetry with runs_unlinked', async () => {
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
collectionRepo.deleteByIdAndWorkflowId.mockResolvedValueOnce({
deleted: true,
runsUnlinked: 3,
});
await service.deleteCollection(user, 'wf-1', 'col-1');
expect(telemetry.track).toHaveBeenCalledWith(
'Eval collection deleted',
expect.objectContaining({ collection_id: 'col-1', runs_unlinked: 3 }),
);
});
it('broadcasts cancel-collection when any run is still active', async () => {
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
testRunRepo.find.mockResolvedValueOnce([{ id: 'tr-active' } as TestRun]);
collectionRepo.deleteByIdAndWorkflowId.mockResolvedValueOnce({
deleted: true,
runsUnlinked: 1,
});
await service.deleteCollection(user, 'wf-1', 'col-1');
expect(testRunnerService.cancelCollection).toHaveBeenCalledWith('col-1');
});
it('throws NotFoundError when collection does not exist', async () => {
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
await expect(service.deleteCollection(user, 'wf-1', 'col-x')).rejects.toThrow(NotFoundError);
});
it('does not cancel runs when the collection belongs to a different workflow', async () => {
// A caller with `workflow:update` on wf-1 must not be able to
// trigger cancellation side effects on a collection owned by
// wf-other just by guessing its id — ownership has to be verified
// before the active-runs query reaches the cancel path.
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
await expect(
service.deleteCollection(user, 'wf-1', 'col-from-other-workflow'),
).rejects.toThrow(NotFoundError);
expect(testRunRepo.find).not.toHaveBeenCalled();
expect(testRunnerService.cancelCollection).not.toHaveBeenCalled();
expect(collectionRepo.deleteByIdAndWorkflowId).not.toHaveBeenCalled();
});
});
describe('addRunToCollection', () => {
it('rejects when the run belongs to a different evaluation config', async () => {
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ evaluationConfigId: 'cfg-other' }));
await expect(service.addRunToCollection('wf-1', 'col-1', 'tr-bad')).rejects.toThrow(
BadRequestError,
);
});
it('rejects when the run is unpinned (legacy run with no workflowVersionId)', async () => {
// Mirrors the create-path invariant: an unpinned run could have
// executed against any historical workflow state and therefore
// cannot satisfy the collection's comparability promise.
collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: null }));
await expect(service.addRunToCollection('wf-1', 'col-1', 'tr-legacy')).rejects.toThrow(
BadRequestError,
);
expect(collectionRepo.addRunsToCollection).not.toHaveBeenCalled();
});
});
describe('getEvalVersions', () => {
it('annotates the highest-scoring version as best and runs below 0.6 as critical', async () => {
const versions: WorkflowHistory[] = [
mock<WorkflowHistory>({
versionId: 'wfv-a',
name: 'A',
autosaved: false,
createdAt: new Date('2026-04-01'),
}),
mock<WorkflowHistory>({
versionId: 'wfv-b',
name: 'B',
autosaved: false,
createdAt: new Date('2026-04-02'),
}),
];
workflowHistoryRepo.find.mockResolvedValueOnce(versions);
// Single bulk lookup, descending by createdAt — service picks the
// first row per `workflowVersionId` it sees, which is the latest.
testRunRepo.find.mockResolvedValueOnce([
makeTestRun({ id: 'tr-a', workflowVersionId: 'wfv-a', metrics: { acc: 0.9 } }),
makeTestRun({ id: 'tr-b', workflowVersionId: 'wfv-b', metrics: { acc: 0.5 } }),
]);
const result = await service.getEvalVersions('wf-1', 'cfg-1');
const a = result.versions.find((v) => v.workflowVersionId === 'wfv-a');
const b = result.versions.find((v) => v.workflowVersionId === 'wfv-b');
expect(a?.lastRun?.isBest).toBe(true);
expect(a?.lastRun?.isCritical).toBe(false);
expect(b?.lastRun?.isBest).toBe(false);
expect(b?.lastRun?.isCritical).toBe(true);
});
it('includes a current-draft row with no last run', async () => {
workflowHistoryRepo.find.mockResolvedValueOnce([]);
const result = await service.getEvalVersions('wf-1', 'cfg-1');
expect(result.versions[0]).toEqual(
expect.objectContaining({
workflowVersionId: null,
isCurrent: true,
lastRun: null,
}),
);
});
it('current-draft entry ignores runs without a workflowVersionId (legacy one-offs)', async () => {
// Legacy runs that pre-date eval-collections have no
// `workflowVersionId`. The wizard surfaces them as "Ungrouped
// runs" elsewhere, not in the versions picker — the current-draft
// row stays `lastRun: null` regardless.
workflowHistoryRepo.find.mockResolvedValueOnce([]);
testRunRepo.find.mockResolvedValueOnce([
makeTestRun({ id: 'tr-legacy', workflowVersionId: null, metrics: { acc: 0.95 } }),
]);
const result = await service.getEvalVersions('wf-1', 'cfg-1');
const draft = result.versions.find((v) => v.workflowVersionId === null);
expect(draft?.lastRun).toBeNull();
});
it('issues exactly one bulk run lookup regardless of history size', async () => {
const history = Array.from({ length: 10 }, (_, i) =>
mock<WorkflowHistory>({
versionId: `wfv-${i}`,
name: `v${i}`,
autosaved: false,
createdAt: new Date(`2026-04-${i + 1 < 10 ? '0' + (i + 1) : i + 1}`),
}),
);
workflowHistoryRepo.find.mockResolvedValueOnce(history);
testRunRepo.find.mockResolvedValueOnce([]);
await service.getEvalVersions('wf-1', 'cfg-1');
expect(testRunRepo.find).toHaveBeenCalledTimes(1);
});
it('loads history with a metadata-only column selection (skips nodes/connections JSON)', async () => {
workflowHistoryRepo.find.mockResolvedValueOnce([]);
testRunRepo.find.mockResolvedValueOnce([]);
await service.getEvalVersions('wf-1', 'cfg-1');
expect(workflowHistoryRepo.find).toHaveBeenCalledWith(
expect.objectContaining({
select: expect.arrayContaining(['versionId', 'name', 'autosaved', 'createdAt']),
}),
);
// Sanity: nodes/connections should not be in the selection (those are
// the fat columns we're deliberately excluding).
const call = workflowHistoryRepo.find.mock.calls[0][0] as { select: string[] };
expect(call.select).not.toContain('nodes');
expect(call.select).not.toContain('connections');
});
it('labels the currently-published version as "Published" rather than "Named"', async () => {
workflowHistoryRepo.find.mockResolvedValueOnce([
mock<WorkflowHistory>({
versionId: 'wfv-pub',
name: 'v3',
autosaved: false,
createdAt: new Date('2026-04-01'),
}),
mock<WorkflowHistory>({
versionId: 'wfv-named',
name: 'experiment',
autosaved: false,
createdAt: new Date('2026-04-02'),
}),
mock<WorkflowHistory>({
versionId: 'wfv-auto',
name: null,
autosaved: true,
createdAt: new Date('2026-04-03'),
}),
]);
testRunRepo.find.mockResolvedValueOnce([]);
publishedVersionRepo.findOneBy.mockResolvedValueOnce(
mock<WorkflowPublishedVersion>({ publishedVersionId: 'wfv-pub' }),
);
const result = await service.getEvalVersions('wf-1', 'cfg-1');
const pub = result.versions.find((v) => v.workflowVersionId === 'wfv-pub');
const named = result.versions.find((v) => v.workflowVersionId === 'wfv-named');
const auto = result.versions.find((v) => v.workflowVersionId === 'wfv-auto');
expect(pub?.sourceLabel).toMatch(/^Published/);
expect(named?.sourceLabel).toMatch(/^Named/);
expect(auto?.sourceLabel).toMatch(/^Autosaved/);
});
});
});

View File

@ -0,0 +1,189 @@
import { EVAL_COLLECTIONS_FLAG } from '@n8n/api-types';
import type { Logger } from '@n8n/backend-common';
import type { AuthenticatedRequest, User } from '@n8n/db';
import { ControllerRegistryMetadata } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type { PostHogClient } from '@/posthog';
import { EvaluationCollectionsController } from '../evaluation-collections.controller.ee';
import type { EvaluationCollectionService } from '../evaluation-collection.service';
describe('EvaluationCollectionsController', () => {
let controller: EvaluationCollectionsController;
let service: jest.Mocked<EvaluationCollectionService>;
let postHogClient: jest.Mocked<PostHogClient>;
let logger: jest.Mocked<Logger>;
const user = mock<User>({ id: 'user-1' });
beforeEach(() => {
service = mock<EvaluationCollectionService>();
postHogClient = mock<PostHogClient>();
logger = mock<Logger>();
// Default: flag ON for the user. Each test that needs flag-off semantics
// overrides this with `mockResolvedValueOnce({})` (no flag entry).
postHogClient.getFeatureFlags.mockResolvedValue({
eval_collections: true,
} as Record<string, boolean>);
controller = new EvaluationCollectionsController(service, postHogClient, logger);
});
type WithBody<P, B> = AuthenticatedRequest<P> & { body: B };
function makeReq<P extends Record<string, unknown>, B = unknown>(
params: P,
body: B = {} as B,
query: Record<string, unknown> = {},
): WithBody<P, B> {
return { user, params, body, query } as unknown as WithBody<P, B>;
}
describe('flag gating', () => {
it.each([
['list', async () => await controller.list(makeReq({ workflowId: 'wf-1' }))],
[
'get',
async () => await controller.get(makeReq({ workflowId: 'wf-1', collectionId: 'col-1' })),
],
[
'create',
async () =>
await controller.create(makeReq({ workflowId: 'wf-1' }), undefined, {} as never),
],
[
'update',
async () =>
await controller.update(
makeReq({ workflowId: 'wf-1', collectionId: 'col-1' }),
undefined,
{} as never,
),
],
[
'delete',
async () => await controller.delete(makeReq({ workflowId: 'wf-1', collectionId: 'col-1' })),
],
[
'addRun',
async () =>
await controller.addRun(
makeReq({ workflowId: 'wf-1', collectionId: 'col-1' }),
undefined,
{ testRunId: 'tr-x' },
),
],
[
'removeRun',
async () =>
await controller.removeRun(
makeReq({ workflowId: 'wf-1', collectionId: 'col-1', runId: 'tr-x' }),
),
],
[
'listVersions',
async () =>
await controller.listVersions(
makeReq({ workflowId: 'wf-1' }, {}, { evaluationConfigId: 'cfg-1' }),
),
],
])('rejects %s with 404 when flag is off', async (_name, call) => {
postHogClient.getFeatureFlags.mockResolvedValueOnce({});
await expect(call()).rejects.toThrow(NotFoundError);
});
it('treats a PostHog outage as flag-off (fail-open to 404, not 500)', async () => {
postHogClient.getFeatureFlags.mockRejectedValueOnce(new Error('posthog timeout'));
await expect(controller.list(makeReq({ workflowId: 'wf-1' }))).rejects.toThrow(NotFoundError);
});
it('uses the spec-declared flag id', () => {
// Future-proofs the rollout: if anyone renames the flag without
// also updating the spec/PostHog, this test fails immediately.
expect(EVAL_COLLECTIONS_FLAG).toBe('eval_collections');
});
});
describe('list', () => {
it('delegates to service when flag on', async () => {
service.listCollections.mockResolvedValueOnce([]);
await controller.list(makeReq({ workflowId: 'wf-1' }));
expect(service.listCollections).toHaveBeenCalledWith('wf-1');
});
});
describe('create', () => {
it('delegates to service with payload + user', async () => {
service.createCollection.mockResolvedValueOnce({
record: { id: 'col-1' } as never,
runsStartedIds: ['tr-1'],
});
const payload = {
name: 'c',
evaluationConfigId: 'cfg-1',
versions: [{ workflowVersionId: 'wfv-1' }],
} as never;
await controller.create(makeReq({ workflowId: 'wf-1' }), undefined, payload);
expect(service.createCollection).toHaveBeenCalledWith(user, 'wf-1', payload);
});
});
describe('listVersions', () => {
it('rejects requests missing evaluationConfigId with 400', async () => {
await expect(
controller.listVersions(makeReq({ workflowId: 'wf-1' }, {}, {})),
).rejects.toThrow(BadRequestError);
});
});
// Route-access regression: every authenticated route on this controller
// must carry a `@ProjectScope('workflow:*')` decorator. The skill at
// `.claude/skills/protect-endpoints` calls this out as a hard rule —
// adding a route without a scope is an IDOR/permission-bypass risk that
// this test catches at compile time of the test suite.
describe('route access scopes', () => {
const metadata = Container.get(ControllerRegistryMetadata).getControllerMetadata(
EvaluationCollectionsController as never,
);
const routeCases = Array.from(metadata.routes.entries()).map(([handlerName, route]) => ({
handlerName,
route,
}));
// Map each handler to the exact scope we expect. `create` schedules
// executions → `workflow:execute`. Mutations → `workflow:update`.
// Reads → `workflow:read`. Anything else would be a regression.
const expectedScopes: Record<string, string> = {
list: 'workflow:read',
get: 'workflow:read',
create: 'workflow:execute',
update: 'workflow:update',
delete: 'workflow:update',
addRun: 'workflow:update',
removeRun: 'workflow:update',
listVersions: 'workflow:read',
};
it.each(routeCases)(
'$handlerName carries the expected ProjectScope',
({ handlerName, route }) => {
expect(route.accessScope).toBeDefined();
expect(route.accessScope?.globalOnly).toBe(false);
expect(route.accessScope?.scope).toBe(expectedScopes[handlerName]);
},
);
it('covers every public handler on the controller', () => {
// Sanity check: if a new route is added without updating
// `expectedScopes` this catches it (else the `.each` above would
// silently pass with `undefined`).
const handlerNames = routeCases.map((r) => r.handlerName).sort();
expect(handlerNames).toEqual(Object.keys(expectedScopes).sort());
});
});
});

View File

@ -0,0 +1,525 @@
import type {
CreateEvaluationCollectionPayload,
EvalVersionEntry,
EvalVersionsResponse,
EvaluationCollectionDetail,
EvaluationCollectionRecord,
EvaluationCollectionRunSummary,
UpdateEvaluationCollectionPayload,
} from '@n8n/api-types';
import type { TestRun, User } from '@n8n/db';
import {
EvaluationCollectionRepository,
EvaluationConfigRepository,
TestRunRepository,
WorkflowHistoryRepository,
WorkflowPublishedVersionRepository,
} from '@n8n/db';
import { Service } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import { nanoid } from 'nanoid';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { TestRunnerService } from '@/evaluation.ee/test-runner/test-runner.service.ee';
import { Telemetry } from '@/telemetry';
import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
/**
* Bare shape of the {@link EvaluationCollection} fields the service uses for
* building API records kept structural (not the full entity class with
* methods + relations) so callers can pass plain-object spreads from
* `EvaluationCollectionListItem`.
*/
type CollectionFields = {
id: string;
name: string;
description: string | null;
workflowId: string;
evaluationConfigId: string;
createdById: string | null;
createdAt: Date;
updatedAt: Date;
};
/**
* Orchestrates eval-collection lifecycle: validating + creating collections,
* kicking off the per-version test runs that make up a collection,
* curating membership, and broadcasting collection-level cancellation. The
* runner itself stays unaware of collections collection metadata is
* passed through `TestRunnerService.startTestRun()` as opaque options.
*/
@Service()
export class EvaluationCollectionService {
constructor(
private readonly collectionRepo: EvaluationCollectionRepository,
private readonly testRunRepo: TestRunRepository,
private readonly evalConfigRepo: EvaluationConfigRepository,
private readonly workflowHistoryRepo: WorkflowHistoryRepository,
private readonly publishedVersionRepo: WorkflowPublishedVersionRepository,
private readonly workflowHistoryService: WorkflowHistoryService,
private readonly testRunnerService: TestRunnerService,
private readonly telemetry: Telemetry,
) {}
async createCollection(
user: User,
workflowId: string,
input: CreateEvaluationCollectionPayload,
): Promise<{ record: EvaluationCollectionRecord; runsStartedIds: string[] }> {
// 1. Validate evaluation config belongs to this workflow. Without this
// check a caller with access to two workflows could attach config A
// to workflow B's collection — the comparison would silently break.
const config = await this.evalConfigRepo.findByIdAndWorkflowId(
input.evaluationConfigId,
workflowId,
);
if (!config) {
throw new NotFoundError('EvaluationConfig not found for this workflow');
}
// 2. Validate version IDs + reused-run IDs. Done up front so we either
// commit the whole collection cleanly or reject before any side
// effects (collection row, snapshot, runner kickoff).
for (const [index, v] of input.versions.entries()) {
if (v.workflowVersionId) {
const exists = await this.workflowHistoryService.findVersion(
workflowId,
v.workflowVersionId,
);
if (!exists) {
throw new BadRequestError(
`versions[${index}]: workflow version "${v.workflowVersionId}" not found`,
);
}
}
if (v.existingTestRunId) {
const run = await this.testRunRepo.findOneBy({ id: v.existingTestRunId });
if (
!run ||
run.workflowId !== workflowId ||
run.evaluationConfigId !== input.evaluationConfigId
) {
throw new BadRequestError(
`versions[${index}]: test run "${v.existingTestRunId}" is not compatible with this collection`,
);
}
// If the caller asks for version A but supplies a run from
// version B we'd silently attach B and never schedule A — the
// resulting "collection on version A" would actually be
// "collection on version B". Reject up front so the caller
// can decide whether to omit `workflowVersionId` (use the run
// as-is) or change `existingTestRunId` (run a fresh A).
if (v.workflowVersionId && run.workflowVersionId !== v.workflowVersionId) {
throw new BadRequestError(
`versions[${index}]: test run "${v.existingTestRunId}" was executed against version "${
run.workflowVersionId ?? '(unpinned)'
}", not the requested "${v.workflowVersionId}"`,
);
}
// Unpinned legacy runs (no `workflowVersionId`) cannot be
// reused in collections — the whole point is comparability
// across pinned versions, and an unpinned run could have
// executed against any historical workflow state.
if (!run.workflowVersionId) {
throw new BadRequestError(
`versions[${index}]: test run "${v.existingTestRunId}" has no pinned workflow version and cannot be reused in a collection`,
);
}
}
}
// 3. Persist the collection. We do this *before* kicking off any new
// runs so the runs can be tagged with `collectionId` at creation time.
const collection = await this.collectionRepo.createCollection({
id: nanoid(),
name: input.name,
description: input.description ?? null,
workflowId,
evaluationConfigId: input.evaluationConfigId,
createdById: user.id,
});
// 4. Wire up runs the user wants to reuse.
const existingRunIds = input.versions
.filter((v) => v.existingTestRunId)
.map((v) => v.existingTestRunId!);
if (existingRunIds.length > 0) {
await this.collectionRepo.addRunsToCollection(collection.id, existingRunIds);
}
// 5. Kick off new runs for versions that weren't matched to an existing
// run. For "current draft" entries we snapshot a `WorkflowHistory` row
// first so the new run is pinned to an immutable version — otherwise
// the next edit to the live workflow would invalidate the comparison.
// Freeze the evaluation config in `evaluationConfigSnapshot` for the
// same reason: future config edits must not retroactively change what
// a historical run was evaluating.
const configSnapshot = {
...config,
createdAt: config.createdAt.toISOString(),
updatedAt: config.updatedAt.toISOString(),
};
const runsStartedIds: string[] = [];
for (const v of input.versions) {
if (v.existingTestRunId) continue;
const versionId =
v.workflowVersionId ??
(await this.workflowHistoryService.snapshotCurrent(workflowId)).versionId;
const { testRun } = await this.testRunnerService.startTestRun(
user,
workflowId,
input.concurrency ?? 1,
(input.concurrency ?? 1) > 1,
{
collectionId: collection.id,
workflowVersionId: versionId,
evaluationConfigId: input.evaluationConfigId,
evaluationConfigSnapshot: configSnapshot,
},
);
runsStartedIds.push(testRun.id);
}
this.telemetry.track('Eval collection created', {
user_id: user.id,
workflow_id: workflowId,
collection_id: collection.id,
evaluation_config_id: input.evaluationConfigId,
version_count: input.versions.length,
existing_run_count: existingRunIds.length,
new_run_count: runsStartedIds.length,
dataset_id: this.extractDatasetId(config),
});
const record = this.toRecord(collection, existingRunIds.length + runsStartedIds.length);
return { record, runsStartedIds };
}
async listCollections(workflowId: string): Promise<EvaluationCollectionRecord[]> {
const items = await this.collectionRepo.listByWorkflowId(workflowId);
return items.map(({ runCount, ...c }) => this.toRecord(c, runCount));
}
async getCollectionDetail(
workflowId: string,
collectionId: string,
): Promise<EvaluationCollectionDetail> {
const detail = await this.collectionRepo.getDetailByIdAndWorkflowId(collectionId, workflowId);
if (!detail) throw new NotFoundError('Collection not found');
const runs = detail.runs.map((r) => this.toRunSummary(r));
return { ...this.toRecord(detail.collection, runs.length), runs };
}
async updateCollectionMeta(
workflowId: string,
collectionId: string,
input: UpdateEvaluationCollectionPayload,
): Promise<EvaluationCollectionRecord> {
const updated = await this.collectionRepo.updateMeta(collectionId, workflowId, input);
if (!updated) throw new NotFoundError('Collection not found');
const runCount = await this.testRunRepo.count({ where: { collectionId } });
return this.toRecord(updated, runCount);
}
async deleteCollection(
user: User,
workflowId: string,
collectionId: string,
): Promise<{ runsUnlinked: number }> {
// Verify the collection belongs to this workflow *before* any
// cancellation side effects. Otherwise a caller with `workflow:update`
// on workflow A could pass a known collection id from workflow B and
// trigger cancellation (abort + pubsub fan-out + DB writes) before
// receiving the eventual 404 from `deleteByIdAndWorkflowId`.
const owned = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId);
if (!owned) throw new NotFoundError('Collection not found');
// If any runs in this collection are still active, broadcast a
// collection-level cancel first so workers stop touching rows we're
// about to unlink. The FK is SET NULL anyway, but cancelling avoids
// post-delete writes flipping a deleted collection's runs back into
// limbo on a foreign main.
const active = await this.testRunRepo.find({
where: [
{ collectionId, status: 'running' },
{ collectionId, status: 'new' },
],
select: ['id'],
});
if (active.length > 0) {
await this.testRunnerService.cancelCollection(collectionId);
}
const { deleted, runsUnlinked } = await this.collectionRepo.deleteByIdAndWorkflowId(
collectionId,
workflowId,
);
if (!deleted) throw new NotFoundError('Collection not found');
this.telemetry.track('Eval collection deleted', {
user_id: user.id,
workflow_id: workflowId,
collection_id: collectionId,
runs_unlinked: runsUnlinked,
});
return { runsUnlinked };
}
async addRunToCollection(
workflowId: string,
collectionId: string,
testRunId: string,
): Promise<EvaluationCollectionDetail> {
const collection = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId);
if (!collection) throw new NotFoundError('Collection not found');
const run = await this.testRunRepo.findOneBy({ id: testRunId });
if (!run || run.workflowId !== workflowId) {
throw new NotFoundError('Test run not found for this workflow');
}
if (run.evaluationConfigId !== collection.evaluationConfigId) {
throw new BadRequestError(
'Test run is not compatible with this collection (different evaluation config)',
);
}
// Mirror the create-path invariant: an unpinned legacy run could have
// executed against any historical workflow state, so it breaks the
// comparability promise the collection exists to provide.
if (!run.workflowVersionId) {
throw new BadRequestError(
'Test run has no pinned workflow version and cannot be added to a collection',
);
}
await this.collectionRepo.addRunsToCollection(collectionId, [testRunId]);
return await this.getCollectionDetail(workflowId, collectionId);
}
async removeRunFromCollection(
workflowId: string,
collectionId: string,
testRunId: string,
): Promise<EvaluationCollectionDetail> {
const collection = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId);
if (!collection) throw new NotFoundError('Collection not found');
const affected = await this.collectionRepo.removeRunFromCollection(collectionId, testRunId);
if (affected === 0) {
throw new NotFoundError('Test run is not part of this collection');
}
return await this.getCollectionDetail(workflowId, collectionId);
}
/**
* Powers the setup wizard's versions table. Lists every named/auto-saved
* workflow history row for the workflow, joined with the last test run
* (if any) executed against the given `evaluationConfigId` on that
* version. The "current draft" row is synthesised on top of that its
* `workflowVersionId` is null until the user commits.
*
* `★ best` / `⚠ low` annotations are computed in-memory from each
* version's `avgScore`: the highest is `isBest`, anything below 0.6 is
* `isCritical`. The thresholds match the spec mock and stay co-located
* with the data so the FE doesn't have to re-derive them.
*/
async getEvalVersions(
workflowId: string,
evaluationConfigId: string,
): Promise<EvalVersionsResponse> {
const config = await this.evalConfigRepo.findByIdAndWorkflowId(evaluationConfigId, workflowId);
if (!config) {
throw new NotFoundError('EvaluationConfig not found for this workflow');
}
// Cheap metadata-only load: `WorkflowHistory.nodes` / `connections` are
// fat JSON columns we never reference from the wizard's versions
// table. Excluding them keeps response payloads small and avoids
// streaming entire workflow canvases just to render labels.
const history = await this.workflowHistoryRepo.find({
where: { workflowId },
order: { createdAt: 'DESC' },
select: ['versionId', 'name', 'autosaved', 'createdAt'],
});
// One bulk query for the latest run per version against this config
// instead of N+1 individual lookups. Sorted descending so the first
// run seen per `workflowVersionId` is the latest — picked into the
// per-version map below.
const lastRuns =
history.length === 0
? []
: await this.testRunRepo.find({
where: {
evaluationConfigId,
workflowVersionId: In(history.map((h) => h.versionId)),
},
order: { createdAt: 'DESC' },
});
const latestRunByVersion = new Map<string, TestRun>();
for (const run of lastRuns) {
if (run.workflowVersionId && !latestRunByVersion.has(run.workflowVersionId)) {
latestRunByVersion.set(run.workflowVersionId, run);
}
}
// One lookup for which version is currently published, so the
// versions table can show "Published" as the source label instead of
// the generic "Named" / "Autosaved" — matches spec §4's three-way
// taxonomy (current draft / published / named snapshot).
const publishedRow = await this.publishedVersionRepo.findOneBy({ workflowId });
const publishedVersionId = publishedRow?.publishedVersionId ?? null;
const versions: EvalVersionEntry[] = [];
// Current draft row — `workflowVersionId: null` signals "snapshot at
// run start". Always first in the list so the wizard surfaces it on
// top of the table.
//
// `lastRun: null` here is intentional: runs without a `workflowVersionId`
// (legacy one-offs) are not surfaced as draft history — they're
// "Ungrouped runs" in the FE list view. Showing them here would
// blur the boundary between draft + pinned-snapshot runs and lead
// to off-by-one comparability bugs in the wizard.
versions.push({
workflowVersionId: null,
label: 'Current draft',
sourceLabel: 'Live workflow',
isCurrent: true,
lastRun: null,
});
for (const h of history) {
const lastRun = latestRunByVersion.get(h.versionId) ?? null;
versions.push({
workflowVersionId: h.versionId,
label: h.name ?? this.shortVersionLabel(h.versionId),
sourceLabel: this.formatSourceLabel(h, h.versionId === publishedVersionId),
isCurrent: false,
lastRun: lastRun
? {
testRunId: lastRun.id,
runAt: (lastRun.runAt ?? lastRun.createdAt).toISOString(),
status: lastRun.status,
avgScore: this.computeAvgScore(lastRun),
isBest: false, // overwritten below
isCritical: false, // overwritten below
}
: null,
});
}
// Pass 2: annotate `isBest` / `isCritical` over the scored entries
// only. Skipped if no version has a scored run yet — the wizard then
// shows every row as "No run yet".
const scored = versions.filter((v) => v.lastRun?.avgScore !== null && v.lastRun !== null);
if (scored.length > 0) {
const best = scored.reduce((acc, v) =>
(v.lastRun!.avgScore ?? -Infinity) > (acc.lastRun!.avgScore ?? -Infinity) ? v : acc,
);
if (best.lastRun) best.lastRun.isBest = true;
for (const v of scored) {
if (v.lastRun && v.lastRun.avgScore !== null && v.lastRun.avgScore < 0.6) {
v.lastRun.isCritical = true;
}
}
}
return { evaluationConfigId, versions };
}
// ---- internals ----
private toRecord(collection: CollectionFields, runCount: number): EvaluationCollectionRecord {
return {
id: collection.id,
name: collection.name,
description: collection.description,
workflowId: collection.workflowId,
evaluationConfigId: collection.evaluationConfigId,
createdById: collection.createdById,
createdAt: collection.createdAt.toISOString(),
updatedAt: collection.updatedAt.toISOString(),
runCount,
};
}
private toRunSummary(run: TestRun): EvaluationCollectionRunSummary {
return {
testRunId: run.id,
workflowVersionId: run.workflowVersionId,
status: run.status,
runAt: run.runAt ? run.runAt.toISOString() : null,
completedAt: run.completedAt ? run.completedAt.toISOString() : null,
avgScore: this.computeAvgScore(run),
metrics: this.coerceMetrics(run.metrics),
};
}
private computeAvgScore(run: TestRun): number | null {
const coerced = this.coerceMetrics(run.metrics);
if (!coerced) return null;
const values = Object.values(coerced);
if (values.length === 0) return null;
const sum = values.reduce((acc, v) => acc + v, 0);
return sum / values.length;
}
private coerceMetrics(
metrics: Record<string, number | boolean> | null | undefined,
): Record<string, number> | null {
if (!metrics) return null;
const out: Record<string, number> = {};
for (const [k, v] of Object.entries(metrics)) {
if (typeof v === 'number') out[k] = v;
else if (typeof v === 'boolean') out[k] = v ? 1 : 0;
}
return Object.keys(out).length > 0 ? out : null;
}
private extractDatasetId(config: { datasetSource: string; datasetRef: unknown }): string | null {
if (
config.datasetSource === 'data_table' &&
typeof config.datasetRef === 'object' &&
config.datasetRef !== null &&
'dataTableId' in config.datasetRef
) {
return String((config.datasetRef as { dataTableId: string }).dataTableId);
}
return null;
}
private shortVersionLabel(versionId: string): string {
return `v ${versionId.slice(0, 8)}`;
}
private formatDateLabel(date: Date): string {
// Compact relative-ish label for the source column. Frontend can format
// further if it wants; this is a fallback for raw API consumers.
return date.toISOString().slice(0, 10);
}
/**
* Three-way source-label taxonomy from spec §4. Published wins over
* named/autosaved when a version is both (a named snapshot can later be
* published; the wizard should surface it as "Published" then).
*/
private formatSourceLabel(
h: { name: string | null; autosaved: boolean; createdAt: Date },
isPublished: boolean,
): string {
const date = this.formatDateLabel(h.createdAt);
if (isPublished) return `Published · ${date}`;
if (h.name) return `Named · ${date}`;
if (h.autosaved) return `Autosaved · ${date}`;
return `Snapshot · ${date}`;
}
}

View File

@ -0,0 +1,168 @@
import {
AddRunToCollectionDto,
CreateEvaluationCollectionDto,
EVAL_COLLECTIONS_FLAG,
UpdateEvaluationCollectionDto,
} from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import type { AuthenticatedRequest, User } from '@n8n/db';
import { Body, Delete, Get, Patch, Post, ProjectScope, RestController } from '@n8n/decorators';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { PostHogClient } from '@/posthog';
import { EvaluationCollectionService } from './evaluation-collection.service';
type WorkflowParam = { workflowId: string };
type CollectionParam = { workflowId: string; collectionId: string };
type CollectionRunParam = { workflowId: string; collectionId: string; runId: string };
type EvalVersionsQuery = { evaluationConfigId?: string };
/**
* Scope choice notes:
* - List / get / versions `workflow:read`. Collection metadata is workflow-
* adjacent and the body never includes execution state.
* - Create `workflow:execute`. Creating a collection schedules new test
* runs for any version without a reusable existing run, so it's not just
* a metadata operation it kicks off workflow executions.
* - Update / delete / curate runs `workflow:update`. Renaming a collection,
* deleting it (which broadcasts cancel-collection for active runs but
* does not start new ones), and adding/removing run membership are all
* metadata mutations on a workflow-owned resource.
*
* Workflow project resolution is done by `@ProjectScope` itself via the
* `:workflowId` URL param (see `check-access.ts:93`), so we don't need a
* separate `findWorkflowForUser` call: the middleware throws 404 before the
* handler runs if the user lacks the scope on the workflow's project. The
* service layer still filters by `workflowId` for defense-in-depth.
*/
@RestController('/workflows')
export class EvaluationCollectionsController {
constructor(
private readonly service: EvaluationCollectionService,
private readonly postHogClient: PostHogClient,
private readonly logger: Logger,
) {}
/**
* PostHog rollout gate for the eval-collections feature surface. Returns
* 404 when off so the route looks exactly like an unknown endpoint no
* flag id leaks into responses, no 403 200 transition visible to a
* stale tab when the rollout flips. Fail-open semantics mirror
* {@link TestRunsController.isParallelExecutionFlagEnabled}: PostHog
* outage degrades to flag-off rather than 500ing.
*
* Runs *inside* the handler (after the `@ProjectScope` middleware) so a
* user without workflow access still gets the standard 404 from the
* scope check, not a flag-off 404 that might leak rollout state.
*/
private async assertFlagEnabled(user: User): Promise<void> {
let enabled = false;
try {
const flags = await this.postHogClient.getFeatureFlags(user);
enabled = flags?.[EVAL_COLLECTIONS_FLAG] === true;
} catch (error) {
this.logger.warn('Failed to resolve eval-collections flag', {
error: error instanceof Error ? error.message : String(error),
});
}
if (!enabled) throw new NotFoundError('Not found');
}
@Get('/:workflowId/eval-collections')
@ProjectScope('workflow:read')
async list(req: AuthenticatedRequest<WorkflowParam>) {
await this.assertFlagEnabled(req.user);
return await this.service.listCollections(req.params.workflowId);
}
@Get('/:workflowId/eval-collections/:collectionId')
@ProjectScope('workflow:read')
async get(req: AuthenticatedRequest<CollectionParam>) {
await this.assertFlagEnabled(req.user);
return await this.service.getCollectionDetail(req.params.workflowId, req.params.collectionId);
}
@Post('/:workflowId/eval-collections')
@ProjectScope('workflow:execute')
async create(
req: AuthenticatedRequest<WorkflowParam>,
_res: unknown,
@Body payload: CreateEvaluationCollectionDto,
) {
await this.assertFlagEnabled(req.user);
const { record, runsStartedIds } = await this.service.createCollection(
req.user,
req.params.workflowId,
payload,
);
return { ...record, runsStartedIds };
}
@Patch('/:workflowId/eval-collections/:collectionId')
@ProjectScope('workflow:update')
async update(
req: AuthenticatedRequest<CollectionParam>,
_res: unknown,
@Body payload: UpdateEvaluationCollectionDto,
) {
await this.assertFlagEnabled(req.user);
return await this.service.updateCollectionMeta(
req.params.workflowId,
req.params.collectionId,
payload,
);
}
@Delete('/:workflowId/eval-collections/:collectionId')
@ProjectScope('workflow:update')
async delete(req: AuthenticatedRequest<CollectionParam>) {
await this.assertFlagEnabled(req.user);
const { runsUnlinked } = await this.service.deleteCollection(
req.user,
req.params.workflowId,
req.params.collectionId,
);
return { success: true, runsUnlinked };
}
@Post('/:workflowId/eval-collections/:collectionId/runs')
@ProjectScope('workflow:update')
async addRun(
req: AuthenticatedRequest<CollectionParam>,
_res: unknown,
@Body payload: AddRunToCollectionDto,
) {
await this.assertFlagEnabled(req.user);
return await this.service.addRunToCollection(
req.params.workflowId,
req.params.collectionId,
payload.testRunId,
);
}
@Delete('/:workflowId/eval-collections/:collectionId/runs/:runId')
@ProjectScope('workflow:update')
async removeRun(req: AuthenticatedRequest<CollectionRunParam>) {
await this.assertFlagEnabled(req.user);
return await this.service.removeRunFromCollection(
req.params.workflowId,
req.params.collectionId,
req.params.runId,
);
}
@Get('/:workflowId/eval-versions')
@ProjectScope('workflow:read')
async listVersions(
req: AuthenticatedRequest<WorkflowParam, unknown, unknown, EvalVersionsQuery>,
) {
await this.assertFlagEnabled(req.user);
const { evaluationConfigId } = req.query;
if (!evaluationConfigId) {
throw new BadRequestError('Missing required query parameter: evaluationConfigId');
}
return await this.service.getEvalVersions(req.params.workflowId, evaluationConfigId);
}
}

View File

@ -28,6 +28,7 @@ import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import type { Publisher } from '@/scaling/pubsub/publisher.service';
import type { Telemetry } from '@/telemetry';
import type { WorkflowRunner } from '@/workflow-runner';
import type { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
const wfUnderTestJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }),
@ -47,6 +48,7 @@ describe('TestRunnerService', () => {
const publisher = mock<Publisher>();
const instanceSettings = mock<InstanceSettings>({ hostId: 'test-host-id', isMultiMain: false });
const concurrencyControlService = mock<ConcurrencyControlService>();
const workflowHistoryService = mock<WorkflowHistoryService>();
let testRunnerService: TestRunnerService;
mockInstance(LoadNodesAndCredentials, {
@ -68,6 +70,7 @@ describe('TestRunnerService', () => {
publisher,
instanceSettings,
concurrencyControlService,
workflowHistoryService,
);
testRunRepository.createTestRun.mockResolvedValue(mock<TestRun>({ id: 'test-run-id' }));
@ -511,6 +514,7 @@ describe('TestRunnerService', () => {
publisher,
instanceSettings,
concurrencyControlService,
workflowHistoryService,
);
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true';
@ -829,6 +833,7 @@ describe('TestRunnerService', () => {
publisher,
instanceSettings,
concurrencyControlService,
workflowHistoryService,
);
});
@ -2316,6 +2321,7 @@ describe('TestRunnerService', () => {
publisher,
instanceSettings,
concurrencyControlService,
workflowHistoryService,
);
const { inFlightTracker } = setupHappyPathMocks(6);
@ -2400,6 +2406,7 @@ describe('TestRunnerService', () => {
publisher,
multiMainInstance,
concurrencyControlService,
workflowHistoryService,
);
setupHappyPathMocks(4);
@ -2418,4 +2425,317 @@ describe('TestRunnerService', () => {
expect(testRunRepository.markAsCompleted).not.toHaveBeenCalled();
});
});
describe('startTestRun - collection context (TRUST-72)', () => {
const USER = mock<{ id: string }>({ id: 'user-1' });
test('loads workflow JSON from WorkflowHistory when workflowVersionId is set', async () => {
workflowRepository.findById.mockResolvedValueOnce({
id: 'wf-1',
name: 'Live',
nodes: [{ name: 'LiveNode' }],
connections: {},
settings: {},
} as never);
workflowHistoryService.findVersion.mockResolvedValueOnce({
versionId: 'wfv-pinned',
nodes: [{ name: 'SnapshotNode' } as never],
connections: { SnapshotNode: {} } as never,
} as never);
testRunRepository.createTestRun.mockResolvedValueOnce(mock<TestRun>({ id: 'tr-pin' }));
// Short-circuit the execution loop so we only assert the lookup side
// effects — the runner's own logic is exercised by other tests.
workflowRepository.findById.mockClear();
const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false, {
collectionId: 'col-1',
workflowVersionId: 'wfv-pinned',
evaluationConfigId: 'cfg-1',
});
// Drain the detached execution promise so we don't leak a microtask
// into the next test.
await finished.catch(() => undefined);
expect(workflowHistoryService.findVersion).toHaveBeenCalledWith('wf-1', 'wfv-pinned');
expect(testRunRepository.createTestRun).toHaveBeenCalledWith(
'wf-1',
expect.objectContaining({
collectionId: 'col-1',
workflowVersionId: 'wfv-pinned',
evaluationConfigId: 'cfg-1',
}),
);
});
test('overwrites workflowData.versionId with the pinned history versionId', async () => {
// `ExecutionPersistence` reads `workflowData.versionId` and stores
// it on the execution row. If the live workflow's `versionId` leaks
// through the spread, every pinned-collection execution would be
// recorded under the wrong version and break compare-view fidelity.
// We capture the workflow object handed to `executeTestRun`'s first
// consumer (`validateWorkflowConfiguration`) to assert the pinned
// versionId rather than the live one is what flows downstream.
workflowRepository.findById.mockResolvedValueOnce({
id: 'wf-1',
name: 'Live',
versionId: 'wfv-live-current',
nodes: [{ name: 'LiveNode' }],
connections: {},
settings: {},
} as never);
workflowHistoryService.findVersion.mockResolvedValueOnce({
versionId: 'wfv-pinned',
nodes: [{ name: 'SnapshotNode' } as never],
connections: { SnapshotNode: {} } as never,
} as never);
testRunRepository.createTestRun.mockResolvedValueOnce(mock<TestRun>({ id: 'tr-pin-v' }));
let capturedWorkflow: { versionId?: string } | undefined;
const validateSpy = jest
.spyOn(
testRunnerService as unknown as {
validateWorkflowConfiguration: (wf: { versionId?: string }) => void;
},
'validateWorkflowConfiguration',
)
.mockImplementation((wf) => {
capturedWorkflow = wf;
// Short-circuit the rest of executeTestRun via a `TestRunError`
// path so the detached promise settles cleanly (markAsError)
// instead of running the dataset trigger against bogus JSON.
throw new TestRunError('EVALUATION_TRIGGER_NOT_FOUND');
});
const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false, {
collectionId: 'col-1',
workflowVersionId: 'wfv-pinned',
evaluationConfigId: 'cfg-1',
});
await finished.catch(() => undefined);
expect(validateSpy).toHaveBeenCalledTimes(1);
expect(capturedWorkflow?.versionId).toBe('wfv-pinned');
expect(capturedWorkflow?.versionId).not.toBe('wfv-live-current');
validateSpy.mockRestore();
});
test('does not call workflowHistoryService.findVersion when workflowVersionId is omitted', async () => {
workflowHistoryService.findVersion.mockClear();
workflowRepository.findById.mockResolvedValueOnce({
id: 'wf-1',
nodes: [],
connections: {},
settings: {},
} as never);
testRunRepository.createTestRun.mockResolvedValueOnce(mock<TestRun>({ id: 'tr-no-pin' }));
const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false);
await finished.catch(() => undefined);
expect(workflowHistoryService.findVersion).not.toHaveBeenCalled();
expect(testRunRepository.createTestRun).toHaveBeenCalledWith(
'wf-1',
expect.objectContaining({ workflowVersionId: null, collectionId: null }),
);
});
});
describe('cancelCollection (TRUST-72)', () => {
test('aborts local running runs and broadcasts cancel-collection in multi-main', async () => {
// USER intentionally unused here — `cancelCollection` does not take a user.
const multiMain = mock<InstanceSettings>({ hostId: 'main-a', isMultiMain: true });
const service = new TestRunnerService(
logger,
telemetry,
workflowRepository,
workflowRunner,
activeExecutions,
testRunRepository,
testCaseExecutionRepository,
errorReporter,
executionsConfig,
mock(),
publisher,
multiMain,
concurrencyControlService,
workflowHistoryService,
);
testRunRepository.find.mockResolvedValue([{ id: 'tr-running' } as never]);
// Seed an abort controller for the running run so the local cancel
// path actually fires.
(
service as unknown as { abortControllers: Map<string, AbortController> }
).abortControllers.set('tr-running', new AbortController());
await service.cancelCollection('col-1');
expect(testRunRepository.requestCancellation).toHaveBeenCalledWith('tr-running');
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'cancel-collection',
payload: { collectionId: 'col-1' },
});
});
test('aborts a locally-held `new`-status run (pre-`markAsRunning` window)', async () => {
// `executeTestRun` registers the abort controller in
// `abortControllers` *before* it flips status from `new` to
// `running`. A `cancel-collection` arriving in that window must
// still abort the local controller — querying only `running`
// runs would silently skip these on every main, leaving the
// freshly-kicked run to drain via DB-poll instead of stopping
// immediately.
//
// Critically: drive the mock from the `where` clause so we
// actually exercise the status filter. With a plain
// `mockResolvedValue([...])` the mock would return the seeded
// row regardless of which statuses the caller queried, and the
// test would pass against both pre-fix (`status:'running'`-only)
// and post-fix code — proving nothing.
const seeded = [{ id: 'tr-new-local', status: 'new' as const }];
testRunRepository.find.mockImplementation(async (opts: unknown) => {
const where = (opts as { where: unknown }).where;
const clauses = (Array.isArray(where) ? where : [where]) as Array<{
status?: string;
}>;
const statuses = new Set(clauses.map((c) => c.status));
return seeded.filter((r) => statuses.has(r.status)) as never;
});
const newRunAbort = new AbortController();
(
testRunnerService as unknown as { abortControllers: Map<string, AbortController> }
).abortControllers.set('tr-new-local', newRunAbort);
// Provide a no-op dbManager so the DB-fallback path doesn't throw
// on the pre-fix code path that *would* take it for this run.
// Without this, the test would fail on a transaction TypeError
// before reaching the abort assertions, hiding the actual bug.
const dbManager = mock<{ transaction: jest.Mock }>();
dbManager.transaction.mockImplementation(async (cb: (trx: unknown) => Promise<void>) => {
await cb({});
});
(testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager;
await testRunnerService.cancelCollection('col-new-window');
expect(newRunAbort.signal.aborted).toBe(true);
// Local abort wins — no DB-cancel fallback needed for this run.
expect(testRunRepository.markAsCancelled).not.toHaveBeenCalledWith(
'tr-new-local',
expect.anything(),
);
});
test('falls back to DB cancel for runs not held locally', async () => {
testRunRepository.find.mockResolvedValue([
{ id: 'tr-foreign' } as never,
{ id: 'tr-mine' } as never,
]);
(
testRunnerService as unknown as { abortControllers: Map<string, AbortController> }
).abortControllers.set('tr-mine', new AbortController());
const trxUpdate = jest.fn().mockResolvedValue({ affected: 1 });
const dbManager = mock<{ transaction: jest.Mock }>();
dbManager.transaction.mockImplementation(
async (cb: (trx: { update: jest.Mock }) => Promise<void>) => {
await cb({ update: trxUpdate });
},
);
(testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager;
await testRunnerService.cancelCollection('col-2');
// `tr-foreign` is the run we don't hold locally → fallback fires.
// Assert the update is scoped to `id` AND `status: In([...])` so a
// run that completed between the initial find and this update is
// not clobbered. We don't lock down the exact entity ref — the
// shape of the where clause is the contract.
expect(trxUpdate).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
id: 'tr-foreign',
status: expect.anything(),
}),
expect.objectContaining({ status: 'cancelled' }),
);
// `tr-mine` was held locally → no fallback update should fire.
expect(trxUpdate).not.toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ id: 'tr-mine' }),
expect.anything(),
);
});
test('fallback update does not clobber a run that completed between find and update', async () => {
// `activeRuns` is sampled before the requestCancellation loop and
// pubsub broadcast. By the time the fallback transaction runs, a
// foreign main may have completed (or errored) the run naturally.
// The status-scoped update should affect 0 rows in that case, and
// the test-case sweep must be skipped — otherwise we'd silently
// re-mark a `completed` run as `cancelled` and corrupt the record.
testRunRepository.find.mockResolvedValue([{ id: 'tr-just-finished' } as never]);
const trxUpdate = jest.fn().mockResolvedValue({ affected: 0 }); // race: row no longer 'new'/'running'
const dbManager = mock<{ transaction: jest.Mock }>();
dbManager.transaction.mockImplementation(
async (cb: (trx: { update: jest.Mock }) => Promise<void>) => {
await cb({ update: trxUpdate });
},
);
(testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager;
await testRunnerService.cancelCollection('col-race');
// Update was attempted with status filter — the filter is what
// makes the in-DB WHERE narrow so the row stays untouched.
expect(trxUpdate).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ id: 'tr-just-finished', status: expect.anything() }),
expect.objectContaining({ status: 'cancelled' }),
);
// Update affected 0 → don't sweep cases. (The sweep has its own
// status filter so this is also a redundant check, but it makes
// the "winner takes all" intent explicit at the run level.)
expect(testCaseExecutionRepository.markAllPendingAsCancelled).not.toHaveBeenCalledWith(
'tr-just-finished',
expect.anything(),
);
});
});
describe('cancelTestRun', () => {
test('fallback update does not clobber a run that completed between requestCancellation and update', async () => {
// Mirrors the collection-level race: between `requestCancellation`
// (the DB-flag pass) and this fallback update, a foreign main can
// finish the run naturally. The update must be scoped by status
// so the terminal state wins.
// No abort controller registered → `cancelTestRunLocally` returns
// false → fallback path fires.
const trxUpdate = jest.fn().mockResolvedValue({ affected: 0 });
const dbManager = mock<{ transaction: jest.Mock }>();
dbManager.transaction.mockImplementation(
async (cb: (trx: { update: jest.Mock }) => Promise<void>) => {
await cb({ update: trxUpdate });
},
);
(testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager;
await testRunnerService.cancelTestRun('tr-just-finished');
expect(trxUpdate).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
id: 'tr-just-finished',
status: expect.anything(),
}),
expect.objectContaining({ status: 'cancelled' }),
);
expect(testCaseExecutionRepository.markAllPendingAsCancelled).not.toHaveBeenCalledWith(
'tr-just-finished',
expect.anything(),
);
});
});
});

View File

@ -1,13 +1,21 @@
import { Logger } from '@n8n/backend-common';
import { ExecutionsConfig } from '@n8n/config';
import type { User, TestRun } from '@n8n/db';
import { TestCaseExecutionRepository, TestRunRepository, WorkflowRepository } from '@n8n/db';
import type { User } from '@n8n/db';
import {
TestCaseExecutionRepository,
TestRun,
TestRunRepository,
WorkflowRepository,
} from '@n8n/db';
import { OnPubSubEvent } from '@n8n/decorators';
import { Service } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import { ErrorReporter, InstanceSettings } from 'n8n-core';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
import {
EVALUATION_NODE_TYPE,
EVALUATION_TRIGGER_NODE_TYPE,
@ -80,6 +88,7 @@ export class TestRunnerService {
private readonly publisher: Publisher,
private readonly instanceSettings: InstanceSettings,
private readonly concurrencyControlService: ConcurrencyControlService,
private readonly workflowHistoryService: WorkflowHistoryService,
) {}
/**
@ -539,12 +548,24 @@ export class TestRunnerService {
* The execution loop is detached so callers can return the new
* `testRun.id` without waiting for the run to complete; tests that need
* to observe completion await `finished` directly.
*
* `options` carries the eval-collections context (TRUST-72): when set,
* the new run is pinned to a workflow version (loaded from
* `WorkflowHistory` instead of the live workflow so future edits don't
* disturb cross-run comparability) and tagged with its parent collection
* + the `EvaluationConfig` snapshot captured at run-start.
*/
async startTestRun(
user: User,
workflowId: string,
concurrency: number = 1,
flagEnabledForUser: boolean = false,
options?: {
collectionId?: string;
workflowVersionId?: string;
evaluationConfigId?: string;
evaluationConfigSnapshot?: IDataObject;
},
): Promise<{ testRun: TestRun; finished: Promise<void> }> {
const requestedConcurrency = Math.max(1, Math.min(10, Math.floor(concurrency)));
const evaluationLimit = this.executionsConfig.concurrency.evaluationLimit;
@ -556,14 +577,47 @@ export class TestRunnerService {
this.logger.debug(
`[Eval] runTest called: requestedConcurrency=${requestedConcurrency} effectiveConcurrency=${effectiveConcurrency} evaluationLimit=${evaluationLimit} flagEnabledForUser=${flagEnabledForUser}`,
{ workflowId },
{
workflowId,
collectionId: options?.collectionId,
workflowVersionId: options?.workflowVersionId,
},
);
const workflow = await this.workflowRepository.findById(workflowId);
assert(workflow, 'Workflow not found');
// When pinned to a workflow version (collection runs), load nodes +
// connections from `WorkflowHistory` so the run executes against the
// snapshotted JSON, not whatever the live workflow looks like right
// now. The base workflow row still supplies `id`, `name`, `settings`
// — everything outside the canvas geometry.
const baseWorkflow = await this.workflowRepository.findById(workflowId);
assert(baseWorkflow, 'Workflow not found');
let workflow = baseWorkflow;
if (options?.workflowVersionId) {
const history = await this.workflowHistoryService.findVersion(
workflowId,
options.workflowVersionId,
);
assert(history, `Workflow version ${options.workflowVersionId} not found`);
// `ExecutionPersistence` records `workflowData.versionId` on the
// execution row; keeping `baseWorkflow.versionId` here would tag
// historical-canvas executions with the *current* live version
// and break the comparability promise of the pinned run.
workflow = {
...baseWorkflow,
versionId: history.versionId,
nodes: history.nodes,
connections: history.connections,
} as typeof baseWorkflow;
}
// 0. Create new Test Run
const testRun = await this.testRunRepository.createTestRun(workflowId);
const testRun = await this.testRunRepository.createTestRun(workflowId, {
collectionId: options?.collectionId ?? null,
workflowVersionId: options?.workflowVersionId ?? null,
evaluationConfigId: options?.evaluationConfigId ?? null,
evaluationConfigSnapshot: options?.evaluationConfigSnapshot ?? null,
});
assert(testRun, 'Unable to create a test run');
// Detach the long-running execution from the awaited setup so callers
@ -1056,6 +1110,10 @@ export class TestRunnerService {
// Send telemetry event with complete metadata
const telemetryPayload: Record<string, GenericValue> = {
...telemetryMeta,
// Collection context (TRUST-72). `collection_id` is null for legacy
// one-off runs so analytics can split flag-on/off cohorts cleanly.
collection_id: testRun.collectionId ?? null,
is_part_of_collection: testRun.collectionId !== null,
};
// Add success-specific fields
@ -1105,6 +1163,109 @@ export class TestRunnerService {
this.cancelTestRunLocally(testRunId);
}
/**
* Handle cancel-collection pub/sub command from other main instances.
* Each main aborts the in-flight runs it owns that belong to the
* collection runs not held locally are no-ops, mirroring the per-run
* cancel path's "if not running locally, fall back to DB" semantics.
*/
@OnPubSubEvent('cancel-collection', { instanceType: 'main' })
async handleCancelCollectionCommand({ collectionId }: { collectionId: string }) {
this.logger.debug('Received cancel-collection command via pub/sub', { collectionId });
await this.cancelCollectionLocally(collectionId);
}
private async cancelCollectionLocally(collectionId: string): Promise<string[]> {
// Match the active-status filter `cancelCollection` uses for DB-flag
// cancellation. `executeTestRun` registers the abort controller in
// `abortControllers` *before* `markAsRunning` flips status from `new`
// to `running`, so a freshly-kicked-off run is locally abortable
// while still showing as `new` in DB. Querying only `running` here
// would silently skip that window on every main — both the
// initiator (called from `cancelCollection`) and foreign mains
// (called from `handleCancelCollectionCommand`). The result set is
// still bounded by the collection size (≤ a handful in practice).
const runs = await this.testRunRepository.find({
where: [
{ collectionId, status: 'running' },
{ collectionId, status: 'new' },
],
select: ['id'],
});
const cancelledLocally: string[] = [];
for (const { id } of runs) {
if (this.cancelTestRunLocally(id)) cancelledLocally.push(id);
}
return cancelledLocally;
}
/**
* Cancels every in-flight run in an evaluation collection. Mirrors
* {@link cancelTestRun}'s multi-main fan-out: each main aborts what it
* holds locally, the DB cancelRequested flag is set per run as a fallback,
* and pubsub broadcasts so foreign mains pick up the signal too.
*/
async cancelCollection(collectionId: string): Promise<{ cancelledRunIds: string[] }> {
// 1. Mark every run in the collection that's still active for
// fallback DB-poll cancellation. Runs already terminal stay terminal.
const activeRuns = await this.testRunRepository.find({
where: [
{ collectionId, status: 'running' },
{ collectionId, status: 'new' },
],
select: ['id'],
});
for (const { id } of activeRuns) {
await this.testRunRepository.requestCancellation(id);
}
// 2. Try local cancellation for whatever this main owns.
const cancelledLocally = await this.cancelCollectionLocally(collectionId);
// 3. In multi-main or queue mode, broadcast so foreign mains can
// cancel their share.
if (this.instanceSettings.isMultiMain || this.executionsConfig.mode === 'queue') {
this.logger.debug('Broadcasting cancel-collection command via pub/sub', { collectionId });
await this.publisher.publishCommand({
command: 'cancel-collection',
payload: { collectionId },
});
}
// 4. Anything we didn't own locally and that's still flagged as
// pending: mark it cancelled in DB as a fallback (matches
// `cancelTestRun`'s contract for unreachable instances).
//
// Race protection: `activeRuns` was sampled before the requestCancellation
// loop and before the pubsub broadcast — by now a foreign main may have
// completed a run naturally (status → `completed`/`error`). A plain
// `markAsCancelled(id)` would clobber that terminal status with
// `cancelled` and corrupt the run record. Scope the update by status
// so it only fires on rows that are still active; if 0 rows were
// affected, the natural completion wins and we skip the test-case
// sweep too (its own status filter would no-op anyway, but skipping
// makes the "winner takes all" intent explicit).
const localSet = new Set(cancelledLocally);
const fallbackRunIds = activeRuns.map((r) => r.id).filter((id) => !localSet.has(id));
if (fallbackRunIds.length > 0) {
const { manager: dbManager } = this.testRunRepository;
await dbManager.transaction(async (trx) => {
for (const runId of fallbackRunIds) {
const result = await trx.update(
TestRun,
{ id: runId, status: In(['new', 'running']) },
{ status: 'cancelled', completedAt: new Date() },
);
if (result.affected) {
await this.testCaseExecutionRepository.markAllPendingAsCancelled(runId, trx);
}
}
});
}
return { cancelledRunIds: activeRuns.map((r) => r.id) };
}
/**
* Cancels the test run with the given ID.
* In multi-main mode, this broadcasts the cancellation to all instances via pub/sub
@ -1126,14 +1287,24 @@ export class TestRunnerService {
});
}
// 4. If not running locally, mark as cancelled in DB as a fallback
// This handles both single-main (where this is the only instance) and multi-main
// (where the running instance may be dead or unreachable via pub/sub)
// 4. If not running locally, mark as cancelled in DB as a fallback.
// This handles both single-main (where this is the only instance) and
// multi-main (where the running instance may be dead or unreachable
// via pub/sub). Same race-protection as `cancelCollection`: between
// the requestCancellation flag and this update, a foreign main may
// have completed the run naturally — scope by status so the
// terminal state wins.
if (!cancelledLocally) {
const { manager: dbManager } = this.testRunRepository;
await dbManager.transaction(async (trx) => {
await this.testRunRepository.markAsCancelled(testRunId, trx);
await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRunId, trx);
const result = await trx.update(
TestRun,
{ id: testRunId, status: In(['new', 'running']) },
{ status: 'cancelled', completedAt: new Date() },
);
if (result.affected) {
await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRunId, trx);
}
});
}
}

View File

@ -174,6 +174,15 @@ export type PubSubCommandMap = {
testRunId: string;
};
/**
* Cancel every running test run inside an evaluation collection across all
* main instances. Used when a user cancels a collection each main checks
* its in-flight runs and aborts those that belong to the collection.
*/
'cancel-collection': {
collectionId: string;
};
// #endregion
// #region Agents

View File

@ -64,6 +64,7 @@ export namespace PubSub {
ToCommand<'reload-sso-provisioning-configuration'>;
export type ReloadSourceControlConfiguration = ToCommand<'reload-source-control-config'>;
export type CancelTestRun = ToCommand<'cancel-test-run'>;
export type CancelCollection = ToCommand<'cancel-collection'>;
export type AgentChatIntegrationChanged = ToCommand<'agent-chat-integration-changed'>;
export type AgentConfigChanged = ToCommand<'agent-config-changed'>;
}
@ -94,6 +95,7 @@ export namespace PubSub {
| Commands.ReloadSsoProvisioningConfiguration
| Commands.ReloadSourceControlConfiguration
| Commands.CancelTestRun
| Commands.CancelCollection
| Commands.AgentChatIntegrationChanged
| Commands.AgentConfigChanged;

View File

@ -61,6 +61,7 @@ import '@/node-execution/ephemeral-node-executor';
import '@/license/license.controller';
import '@/evaluation.ee/test-runs.controller.ee';
import '@/evaluation.ee/evaluation-config.controller';
import '@/evaluation.ee/evaluation-collections.controller.ee';
import '@/workflows/workflow-history/workflow-history.controller';
import '@/workflows/workflows.controller';
import '@/modules/workflow-index/workflow-dependency.controller';

View File

@ -1,6 +1,11 @@
import { mockLogger, mockInstance } from '@n8n/backend-test-utils';
import type { WorkflowHistory } from '@n8n/db';
import { User, WorkflowHistoryRepository, WorkflowPublishHistoryRepository } from '@n8n/db';
import {
User,
WorkflowHistoryRepository,
WorkflowPublishHistoryRepository,
WorkflowRepository,
} from '@n8n/db';
import type { UpdateResult } from '@n8n/typeorm';
import { getWorkflow, getWorkflowHistory } from '@test-integration/workflow';
import { mockClear } from 'jest-mock-extended';
@ -13,6 +18,7 @@ import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-hi
const workflowHistoryRepository = mockInstance(WorkflowHistoryRepository);
const workflowPublishHistoryRepository = mockInstance(WorkflowPublishHistoryRepository);
const workflowRepository = mockInstance(WorkflowRepository);
const logger = mockLogger();
const workflowFinderService = mockInstance(WorkflowFinderService);
const eventService = mockInstance(EventService);
@ -20,6 +26,7 @@ const workflowHistoryService = new WorkflowHistoryService(
logger,
workflowHistoryRepository,
workflowPublishHistoryRepository,
workflowRepository,
workflowFinderService,
eventService,
);
@ -447,4 +454,80 @@ describe('WorkflowHistoryService', () => {
);
});
});
describe('snapshotCurrent', () => {
it('returns the existing versionId without inserting when a row already exists for the workflow draft', async () => {
const workflow = getWorkflow({ addNodeWithoutCreds: true });
workflow.id = 'wf-1';
workflow.versionId = 'wfv-current';
workflowRepository.findOneBy.mockResolvedValueOnce(workflow);
workflowHistoryRepository.findOne.mockResolvedValueOnce(
getWorkflowHistory(workflow, { versionId: 'wfv-current' }),
);
const result = await workflowHistoryService.snapshotCurrent('wf-1');
expect(result).toEqual({ versionId: 'wfv-current' });
expect(workflowHistoryRepository.insert).not.toHaveBeenCalled();
});
it('inserts a snapshot when no history row exists for the workflow draft', async () => {
const workflow = getWorkflow({ addNodeWithoutCreds: true });
workflow.id = 'wf-1';
workflow.versionId = 'wfv-fresh';
workflow.connections = {};
workflowRepository.findOneBy.mockResolvedValueOnce(workflow);
// First findOne: no existing history row → insert path.
// Second findOne: post-save verification that the row now exists.
workflowHistoryRepository.findOne.mockResolvedValueOnce(null);
workflowHistoryRepository.findOne.mockResolvedValueOnce(
getWorkflowHistory(workflow, { versionId: 'wfv-fresh' }),
);
const result = await workflowHistoryService.snapshotCurrent('wf-1');
expect(result).toEqual({ versionId: 'wfv-fresh' });
expect(workflowHistoryRepository.insert).toHaveBeenCalledWith(
expect.objectContaining({
authors: 'eval-snapshot',
versionId: 'wfv-fresh',
workflowId: 'wf-1',
nodes: workflow.nodes,
connections: workflow.connections,
}),
);
});
it('throws when the workflow does not exist', async () => {
workflowRepository.findOneBy.mockResolvedValueOnce(null);
await expect(workflowHistoryService.snapshotCurrent('wf-missing')).rejects.toThrow(
'Workflow wf-missing not found',
);
});
it('throws when the snapshot insert silently fails (saveVersion swallowed the error)', async () => {
// `saveVersion` logs+swallows insert errors so the regular save
// flow can never be blocked by a history-row failure. For the
// snapshot path that's a footgun: the caller would get back a
// `versionId` with no matching row and hit a generic
// `findVersion` assert deep inside the test runner. Verify that
// `snapshotCurrent` re-fetches and fails loudly when the row did
// not materialise.
const workflow = getWorkflow({ addNodeWithoutCreds: true });
workflow.id = 'wf-1';
workflow.versionId = 'wfv-fresh';
workflow.connections = {};
workflowRepository.findOneBy.mockResolvedValueOnce(workflow);
workflowHistoryRepository.findOne.mockResolvedValueOnce(null);
// Simulate insert failure: saveVersion's try/catch logs the
// underlying error and returns normally. The post-save verify
// findOne still sees no row.
workflowHistoryRepository.insert.mockRejectedValueOnce(new Error('insert blew up'));
workflowHistoryRepository.findOne.mockResolvedValueOnce(null);
await expect(workflowHistoryService.snapshotCurrent('wf-1')).rejects.toThrow(
'Failed to persist workflow history snapshot for workflow wf-1',
);
});
});
});

View File

@ -5,6 +5,7 @@ import {
WorkflowHistory,
WorkflowHistoryRepository,
WorkflowPublishHistoryRepository,
WorkflowRepository,
} from '@n8n/db';
import { Service } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
@ -26,6 +27,7 @@ export class WorkflowHistoryService {
private readonly logger: Logger,
private readonly workflowHistoryRepository: WorkflowHistoryRepository,
private readonly workflowPublishHistoryRepository: WorkflowPublishHistoryRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly workflowFinderService: WorkflowFinderService,
private readonly eventService: EventService,
) {}
@ -107,6 +109,61 @@ export class WorkflowHistoryService {
});
}
/**
* Ensure a {@link WorkflowHistory} row exists for the workflow's *current*
* draft state and return its `versionId`. Used by the eval-collections
* setup wizard when the user picks "current draft" the run is pinned to
* an immutable snapshot so future edits don't break comparability with
* sibling runs in the collection.
*
* Idempotent: if a history row already exists for the workflow's current
* `versionId`, we return it unchanged (no duplicate snapshot, no churn in
* the version history list). License-on instances will have a history row
* from the regular save flow; license-off instances get the snapshot
* lazily here for eval comparability without otherwise enabling history.
*/
async snapshotCurrent(workflowId: string): Promise<{ versionId: string }> {
const workflow = await this.workflowRepository.findOneBy({ id: workflowId });
if (!workflow) {
throw new UnexpectedError(`Workflow ${workflowId} not found`);
}
const existing = await this.workflowHistoryRepository.findOne({
where: { workflowId, versionId: workflow.versionId },
select: ['versionId'],
});
if (existing) return { versionId: existing.versionId };
await this.saveVersion(
'eval-snapshot',
{
versionId: workflow.versionId,
nodes: workflow.nodes,
connections: workflow.connections,
},
workflowId,
);
// `saveVersion` deliberately swallows insert errors (it only logs them)
// so the regular workflow-save flow can never be blocked by a history
// write failure. The snapshot use case is different: callers will
// hand this `versionId` to `findVersion()` moments later and assert
// the row is non-null. Verify persistence here and fail loudly while
// we still have the caller's stack — otherwise the next reader hits
// a generic "version not found" deep inside the test runner.
const persisted = await this.workflowHistoryRepository.findOne({
where: { workflowId, versionId: workflow.versionId },
select: ['versionId'],
});
if (!persisted) {
throw new UnexpectedError(
`Failed to persist workflow history snapshot for workflow ${workflowId}`,
);
}
return { versionId: persisted.versionId };
}
async saveVersion(
user: User | string,
workflow: {