diff --git a/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts b/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts
index a98a989bb6a..ca69f0b8d0d 100644
--- a/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts
+++ b/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts
@@ -28,6 +28,7 @@ export type PubSubEventName =
 	| 'reload-sso-provisioning-configuration'
 	| 'reload-source-control-config'
 	| 'cancel-test-run'
+	| 'cancel-collection'
 	| 'agent-chat-integration-changed'
 	| 'agent-config-changed';
 
diff --git a/packages/cli/src/evaluation.ee/__tests__/evaluation-collection.service.test.ts b/packages/cli/src/evaluation.ee/__tests__/evaluation-collection.service.test.ts
new file mode 100644
index 00000000000..263b1f4d50e
--- /dev/null
+++ b/packages/cli/src/evaluation.ee/__tests__/evaluation-collection.service.test.ts
@@ -0,0 +1,507 @@
+import type { CreateEvaluationCollectionPayload } from '@n8n/api-types';
+import type {
+	EvaluationCollection,
+	EvaluationCollectionRepository,
+	EvaluationConfig,
+	EvaluationConfigRepository,
+	TestRun,
+	TestRunRepository,
+	User,
+	WorkflowHistory,
+	WorkflowHistoryRepository,
+	WorkflowPublishedVersion,
+	WorkflowPublishedVersionRepository,
+} from '@n8n/db';
+import { mock } from 'jest-mock-extended';
+
+import { BadRequestError } from '@/errors/response-errors/bad-request.error';
+import { NotFoundError } from '@/errors/response-errors/not-found.error';
+import type { Telemetry } from '@/telemetry';
+import type { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
+
+import { EvaluationCollectionService } from '../evaluation-collection.service';
+import type { TestRunnerService } from '../test-runner/test-runner.service.ee';
+
+/** Builds an `EvaluationConfig` fixture for workflow `wf-1`; `over` replaces individual fields. */
+function makeConfig(over: Partial<EvaluationConfig> = {}): EvaluationConfig {
+	return {
+		id: 'cfg-1',
+		workflowId: 'wf-1',
+		name: 'cfg',
+		status: 'valid',
+		invalidReason: null,
+		datasetSource: 'data_table',
+		datasetRef: { dataTableId: 'dt-1' },
+		startNodeName: 'Start',
+		endNodeName: 'End',
+		metrics: [],
+		createdAt: new Date('2026-01-01T00:00:00Z'),
+		updatedAt: new Date('2026-01-02T00:00:00Z'),
+		workflow: undefined as never,
+		...over,
+	} as EvaluationConfig;
+}
+
+/** Builds an `EvaluationCollection` fixture tied to `wf-1` / `cfg-1`; `over` replaces individual fields. */
+function makeCollection(over: Partial<EvaluationCollection> = {}): EvaluationCollection {
+	return {
+		id: 'col-1',
+		name: 'My collection',
+		description: null,
+		workflowId: 'wf-1',
+		evaluationConfigId: 'cfg-1',
+		createdById: 'user-1',
+		insightsCache: null,
+		createdAt: new Date('2026-02-01T00:00:00Z'),
+		updatedAt: new Date('2026-02-01T00:00:00Z'),
+		workflow: undefined as never,
+		evaluationConfig: undefined as never,
+		...over,
+	} as EvaluationCollection;
+}
+
+/** Builds a completed `TestRun` fixture pinned to version `wfv-1`; `over` replaces individual fields. */
+function makeTestRun(over: Partial<TestRun> = {}): TestRun {
+	return {
+		id: 'tr-1',
+		status: 'completed',
+		workflowId: 'wf-1',
+		evaluationConfigId: 'cfg-1',
+		workflowVersionId: 'wfv-1',
+		collectionId: null,
+		evaluationConfigSnapshot: null,
+		metrics: { accuracy: 0.9 },
+		runAt: new Date('2026-03-01T00:00:00Z'),
+		completedAt: new Date('2026-03-01T00:01:00Z'),
+		createdAt: new Date('2026-03-01T00:00:00Z'),
+		updatedAt: new Date('2026-03-01T00:01:00Z'),
+		errorCode: null,
+		errorDetails: null,
+		cancelRequested: false,
+		runningInstanceId: null,
+		workflow: undefined as never,
+		testCaseExecutions: [],
+		evaluationConfig: null,
+		collection: null,
+		...over,
+	} as TestRun;
+}
+
+const user = mock<User>({ id: 'user-1' });
+
+/** Builds a create-collection payload with a single pinned version; `over` replaces individual fields. */
+function makePayload(
+	over: Partial<CreateEvaluationCollectionPayload> = {},
+): CreateEvaluationCollectionPayload {
+	return {
+		name: 'My collection',
+		evaluationConfigId: 'cfg-1',
+		versions: [{ workflowVersionId: 'wfv-1' }],
+		...over,
+	};
+}
+
+describe('EvaluationCollectionService', () => {
+	let service: EvaluationCollectionService;
+	let collectionRepo: jest.Mocked<EvaluationCollectionRepository>;
+	let testRunRepo: jest.Mocked<TestRunRepository>;
+	let evalConfigRepo: jest.Mocked<EvaluationConfigRepository>;
+	let workflowHistoryRepo: jest.Mocked<WorkflowHistoryRepository>;
+	let publishedVersionRepo: jest.Mocked<WorkflowPublishedVersionRepository>;
+	let workflowHistoryService: jest.Mocked<WorkflowHistoryService>;
+	let testRunnerService: jest.Mocked<TestRunnerService>;
+	let telemetry: jest.Mocked<Telemetry>;
+
+	// Fresh mocks per test. The defaults describe the happy path (config found,
+	// version found, nothing published, reusable run compatible); individual
+	// tests override single calls with `mock…Once`.
+	beforeEach(() => {
+		collectionRepo = mock<EvaluationCollectionRepository>();
+		testRunRepo = mock<TestRunRepository>();
+		evalConfigRepo = mock<EvaluationConfigRepository>();
+		workflowHistoryRepo = mock<WorkflowHistoryRepository>();
+		publishedVersionRepo = mock<WorkflowPublishedVersionRepository>();
+		workflowHistoryService = mock<WorkflowHistoryService>();
+		testRunnerService = mock<TestRunnerService>();
+		telemetry = mock<Telemetry>();
+
+		service = new EvaluationCollectionService(
+			collectionRepo,
+			testRunRepo,
+			evalConfigRepo,
+			workflowHistoryRepo,
+			publishedVersionRepo,
+			workflowHistoryService,
+			testRunnerService,
+			telemetry,
+		);
+
+		evalConfigRepo.findByIdAndWorkflowId.mockResolvedValue(makeConfig());
+		workflowHistoryService.findVersion.mockResolvedValue(
+			mock<WorkflowHistory>({ versionId: 'wfv-1' }),
+		);
+		publishedVersionRepo.findOneBy.mockResolvedValue(null);
+		workflowHistoryService.snapshotCurrent.mockResolvedValue({ versionId: 'wfv-snap' });
+		testRunRepo.findOneBy.mockResolvedValue(makeTestRun());
+		testRunRepo.find.mockResolvedValue([]);
+		collectionRepo.createCollection.mockImplementation(async (input) =>
+			makeCollection({ id: input.id, name: input.name, description: input.description }),
+		);
+		collectionRepo.getDetailByIdAndWorkflowId.mockResolvedValue({
+			collection: makeCollection(),
+			runs: [],
+		});
+		testRunnerService.startTestRun.mockResolvedValue({
+			testRun: makeTestRun({ id: 'tr-new', status: 'new' }),
+			finished: Promise.resolve(),
+		});
+	});
+
+	describe('createCollection', () => {
+		it('rejects when the evaluation config does not belong to the workflow', async () => {
+			evalConfigRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
+			await expect(service.createCollection(user, 'wf-1', makePayload())).rejects.toThrow(
+				NotFoundError,
+			);
+			expect(collectionRepo.createCollection).not.toHaveBeenCalled();
+		});
+
+		it('rejects when an existingTestRunId belongs to a different workflow', async () => {
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowId: 'other-wf' }));
+			await expect(
+				service.createCollection(
+					user,
+					'wf-1',
+					makePayload({
+						versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-other' }],
+					}),
+				),
+			).rejects.toThrow(BadRequestError);
+			expect(collectionRepo.createCollection).not.toHaveBeenCalled();
+		});
+
+		it('rejects when an existingTestRunId belongs to a different evaluation config', async () => {
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ evaluationConfigId: 'cfg-other' }));
+			await expect(
+				service.createCollection(
+					user,
+					'wf-1',
+					makePayload({
+						versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-x' }],
+					}),
+				),
+			).rejects.toThrow(BadRequestError);
+		});
+
+		it('rejects when existingTestRunId was executed against a different workflow version than requested', async () => {
+			// User asks "compare on wfv-A" but supplies a run that executed
+			// on wfv-B. Without this guard the service would silently attach
+			// the wfv-B run and never schedule wfv-A — the "collection on
+			// wfv-A" would actually be "collection on wfv-B".
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: 'wfv-B' }));
+			await expect(
+				service.createCollection(
+					user,
+					'wf-1',
+					makePayload({
+						versions: [{ workflowVersionId: 'wfv-A', existingTestRunId: 'tr-x' }],
+					}),
+				),
+			).rejects.toThrow(BadRequestError);
+			expect(collectionRepo.createCollection).not.toHaveBeenCalled();
+		});
+
+		it('rejects when existingTestRunId is unpinned (legacy run with no workflowVersionId)', async () => {
+			// Unpinned runs can have executed against any historical workflow
+			// state, so they break collection comparability by construction.
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: null }));
+			await expect(
+				service.createCollection(
+					user,
+					'wf-1',
+					makePayload({
+						versions: [{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-legacy' }],
+					}),
+				),
+			).rejects.toThrow(BadRequestError);
+		});
+
+		it('attaches existing runs and schedules new runs for missing versions', async () => {
+			await service.createCollection(
+				user,
+				'wf-1',
+				makePayload({
+					versions: [
+						{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-existing' },
+						{ workflowVersionId: 'wfv-2' },
+					],
+				}),
+			);
+
+			expect(collectionRepo.createCollection).toHaveBeenCalledTimes(1);
+			expect(collectionRepo.addRunsToCollection).toHaveBeenCalledWith(expect.any(String), [
+				'tr-existing',
+			]);
+			// Only the version without a reusable run gets a freshly scheduled run.
+			expect(testRunnerService.startTestRun).toHaveBeenCalledTimes(1);
+			expect(testRunnerService.startTestRun).toHaveBeenCalledWith(
+				user,
+				'wf-1',
+				expect.any(Number),
+				expect.any(Boolean),
+				expect.objectContaining({
+					workflowVersionId: 'wfv-2',
+					evaluationConfigId: 'cfg-1',
+					collectionId: expect.any(String),
+					evaluationConfigSnapshot: expect.objectContaining({ id: 'cfg-1' }),
+				}),
+			);
+		});
+
+		it('snapshots a new workflow version for "current draft" entries before scheduling', async () => {
+			await service.createCollection(
+				user,
+				'wf-1',
+				makePayload({
+					versions: [{ workflowVersionId: null }],
+				}),
+			);
+
+			expect(workflowHistoryService.snapshotCurrent).toHaveBeenCalledTimes(1);
+			expect(workflowHistoryService.snapshotCurrent).toHaveBeenCalledWith('wf-1');
+			expect(testRunnerService.startTestRun).toHaveBeenCalledWith(
+				user,
+				'wf-1',
+				expect.any(Number),
+				expect.any(Boolean),
+				expect.objectContaining({ workflowVersionId: 'wfv-snap' }),
+			);
+		});
+
+		it('emits Eval collection created telemetry with expected counts', async () => {
+			await service.createCollection(
+				user,
+				'wf-1',
+				makePayload({
+					versions: [
+						{ workflowVersionId: 'wfv-1', existingTestRunId: 'tr-existing' },
+						{ workflowVersionId: 'wfv-2' },
+						{ workflowVersionId: null },
+					],
+				}),
+			);
+
+			expect(telemetry.track).toHaveBeenCalledWith(
+				'Eval collection created',
+				expect.objectContaining({
+					version_count: 3,
+					existing_run_count: 1,
+					new_run_count: 2,
+					evaluation_config_id: 'cfg-1',
+					dataset_id: 'dt-1',
+				}),
+			);
+		});
+	});
+
+	describe('deleteCollection', () => {
+		it('emits Eval collection deleted telemetry with runs_unlinked', async () => {
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
+			collectionRepo.deleteByIdAndWorkflowId.mockResolvedValueOnce({
+				deleted: true,
+				runsUnlinked: 3,
+			});
+
+			await service.deleteCollection(user, 'wf-1', 'col-1');
+
+			expect(telemetry.track).toHaveBeenCalledWith(
+				'Eval collection deleted',
+				expect.objectContaining({ collection_id: 'col-1', runs_unlinked: 3 }),
+			);
+		});
+
+		it('broadcasts cancel-collection when any run is still active', async () => {
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
+			testRunRepo.find.mockResolvedValueOnce([{ id: 'tr-active' } as TestRun]);
+			collectionRepo.deleteByIdAndWorkflowId.mockResolvedValueOnce({
+				deleted: true,
+				runsUnlinked: 1,
+			});
+
+			await service.deleteCollection(user, 'wf-1', 'col-1');
+
+			expect(testRunnerService.cancelCollection).toHaveBeenCalledWith('col-1');
+		});
+
+		it('throws NotFoundError when collection does not exist', async () => {
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
+			await expect(service.deleteCollection(user, 'wf-1', 'col-x')).rejects.toThrow(NotFoundError);
+		});
+
+		it('does not cancel runs when the collection belongs to a different workflow', async () => {
+			// A caller with `workflow:update` on wf-1 must not be able to
+			// trigger cancellation side effects on a collection owned by
+			// wf-other just by guessing its id — ownership has to be verified
+			// before the active-runs query reaches the cancel path.
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(null);
+
+			await expect(
+				service.deleteCollection(user, 'wf-1', 'col-from-other-workflow'),
+			).rejects.toThrow(NotFoundError);
+
+			// None of the side-effecting collaborators may be reached.
+			expect(testRunRepo.find).not.toHaveBeenCalled();
+			expect(testRunnerService.cancelCollection).not.toHaveBeenCalled();
+			expect(collectionRepo.deleteByIdAndWorkflowId).not.toHaveBeenCalled();
+		});
+	});
+
+	describe('addRunToCollection', () => {
+		it('rejects when the run belongs to a different evaluation config', async () => {
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ evaluationConfigId: 'cfg-other' }));
+			await expect(service.addRunToCollection('wf-1', 'col-1', 'tr-bad')).rejects.toThrow(
+				BadRequestError,
+			);
+		});
+
+		it('rejects when the run is unpinned (legacy run with no workflowVersionId)', async () => {
+			// Mirrors the create-path invariant: an unpinned run could have
+			// executed against any historical workflow state and therefore
+			// cannot satisfy the collection's comparability promise.
+			collectionRepo.findByIdAndWorkflowId.mockResolvedValueOnce(makeCollection());
+			testRunRepo.findOneBy.mockResolvedValueOnce(makeTestRun({ workflowVersionId: null }));
+
+			await expect(service.addRunToCollection('wf-1', 'col-1', 'tr-legacy')).rejects.toThrow(
+				BadRequestError,
+			);
+
+			expect(collectionRepo.addRunsToCollection).not.toHaveBeenCalled();
+		});
+	});
+
+	describe('getEvalVersions', () => {
+		it('annotates the highest-scoring version as best and runs below 0.6 as critical', async () => {
+			const versions: WorkflowHistory[] = [
+				mock<WorkflowHistory>({
+					versionId: 'wfv-a',
+					name: 'A',
+					autosaved: false,
+					createdAt: new Date('2026-04-01'),
+				}),
+				mock<WorkflowHistory>({
+					versionId: 'wfv-b',
+					name: 'B',
+					autosaved: false,
+					createdAt: new Date('2026-04-02'),
+				}),
+			];
+			workflowHistoryRepo.find.mockResolvedValueOnce(versions);
+			// Single bulk lookup, descending by createdAt — service picks the
+			// first row per `workflowVersionId` it sees, which is the latest.
+			testRunRepo.find.mockResolvedValueOnce([
+				makeTestRun({ id: 'tr-a', workflowVersionId: 'wfv-a', metrics: { acc: 0.9 } }),
+				makeTestRun({ id: 'tr-b', workflowVersionId: 'wfv-b', metrics: { acc: 0.5 } }),
+			]);
+
+			const result = await service.getEvalVersions('wf-1', 'cfg-1');
+
+			const a = result.versions.find((v) => v.workflowVersionId === 'wfv-a');
+			const b = result.versions.find((v) => v.workflowVersionId === 'wfv-b');
+			expect(a?.lastRun?.isBest).toBe(true);
+			expect(a?.lastRun?.isCritical).toBe(false);
+			expect(b?.lastRun?.isBest).toBe(false);
+			expect(b?.lastRun?.isCritical).toBe(true);
+		});
+
+		it('includes a current-draft row with no last run', async () => {
+			workflowHistoryRepo.find.mockResolvedValueOnce([]);
+			const result = await service.getEvalVersions('wf-1', 'cfg-1');
+			expect(result.versions[0]).toEqual(
+				expect.objectContaining({
+					workflowVersionId: null,
+					isCurrent: true,
+					lastRun: null,
+				}),
+			);
+		});
+
+		it('current-draft entry ignores runs without a workflowVersionId (legacy one-offs)', async () => {
+			// Legacy runs that pre-date eval-collections have no
+			// `workflowVersionId`. The wizard surfaces them as "Ungrouped
+			// runs" elsewhere, not in the versions picker — the current-draft
+			// row stays `lastRun: null` regardless.
+			workflowHistoryRepo.find.mockResolvedValueOnce([]);
+			testRunRepo.find.mockResolvedValueOnce([
+				makeTestRun({ id: 'tr-legacy', workflowVersionId: null, metrics: { acc: 0.95 } }),
+			]);
+
+			const result = await service.getEvalVersions('wf-1', 'cfg-1');
+
+			const draft = result.versions.find((v) => v.workflowVersionId === null);
+			expect(draft?.lastRun).toBeNull();
+		});
+
+		it('issues exactly one bulk run lookup regardless of history size', async () => {
+			const history = Array.from({ length: 10 }, (_, i) =>
+				mock<WorkflowHistory>({
+					versionId: `wfv-${i}`,
+					name: `v${i}`,
+					autosaved: false,
+					// Zero-pad the day so the string is a valid ISO date (2026-04-01 … 2026-04-10).
+					createdAt: new Date(`2026-04-${String(i + 1).padStart(2, '0')}`),
+				}),
+			);
+			workflowHistoryRepo.find.mockResolvedValueOnce(history);
+			testRunRepo.find.mockResolvedValueOnce([]);
+
+			await service.getEvalVersions('wf-1', 'cfg-1');
+
+			expect(testRunRepo.find).toHaveBeenCalledTimes(1);
+		});
+
+		it('loads history with a metadata-only column selection (skips nodes/connections JSON)', async () => {
+			workflowHistoryRepo.find.mockResolvedValueOnce([]);
+			testRunRepo.find.mockResolvedValueOnce([]);
+
+			await service.getEvalVersions('wf-1', 'cfg-1');
+
+			expect(workflowHistoryRepo.find).toHaveBeenCalledWith(
+				expect.objectContaining({
+					select: expect.arrayContaining(['versionId', 'name', 'autosaved', 'createdAt']),
+				}),
+			);
+			// Sanity: nodes/connections should not be in the selection (those are
+			// the fat columns we're deliberately excluding).
+			const call = workflowHistoryRepo.find.mock.calls[0][0] as { select: string[] };
+			expect(call.select).not.toContain('nodes');
+			expect(call.select).not.toContain('connections');
+		});
+
+		it('labels the currently-published version as "Published" rather than "Named"', async () => {
+			workflowHistoryRepo.find.mockResolvedValueOnce([
+				mock<WorkflowHistory>({
+					versionId: 'wfv-pub',
+					name: 'v3',
+					autosaved: false,
+					createdAt: new Date('2026-04-01'),
+				}),
+				mock<WorkflowHistory>({
+					versionId: 'wfv-named',
+					name: 'experiment',
+					autosaved: false,
+					createdAt: new Date('2026-04-02'),
+				}),
+				mock<WorkflowHistory>({
+					versionId: 'wfv-auto',
+					name: null,
+					autosaved: true,
+					createdAt: new Date('2026-04-03'),
+				}),
+			]);
+			testRunRepo.find.mockResolvedValueOnce([]);
+			publishedVersionRepo.findOneBy.mockResolvedValueOnce(
+				mock<WorkflowPublishedVersion>({ publishedVersionId: 'wfv-pub' }),
+			);
+
+			const result = await service.getEvalVersions('wf-1', 'cfg-1');
+
+			const pub = result.versions.find((v) => v.workflowVersionId === 'wfv-pub');
+			const named = result.versions.find((v) => v.workflowVersionId === 'wfv-named');
+			const auto = result.versions.find((v) => v.workflowVersionId === 'wfv-auto');
+			expect(pub?.sourceLabel).toMatch(/^Published/);
+			expect(named?.sourceLabel).toMatch(/^Named/);
+			expect(auto?.sourceLabel).toMatch(/^Autosaved/);
+		});
+	});
+});
diff --git a/packages/cli/src/evaluation.ee/__tests__/evaluation-collections.controller.test.ts b/packages/cli/src/evaluation.ee/__tests__/evaluation-collections.controller.test.ts
new file mode 100644
index 00000000000..b1036d48aa4
--- /dev/null
+++ b/packages/cli/src/evaluation.ee/__tests__/evaluation-collections.controller.test.ts
@@ -0,0 +1,189 @@
+import { EVAL_COLLECTIONS_FLAG } from '@n8n/api-types';
+import type { Logger } from '@n8n/backend-common';
+import type { AuthenticatedRequest, User } from '@n8n/db';
+import { ControllerRegistryMetadata } from '@n8n/decorators';
+import { Container } from '@n8n/di';
+import { mock } from 'jest-mock-extended';
+
+import { BadRequestError } from '@/errors/response-errors/bad-request.error';
+import { NotFoundError } from '@/errors/response-errors/not-found.error';
+import type { PostHogClient } from '@/posthog';
+
+import { EvaluationCollectionsController } from '../evaluation-collections.controller.ee';
+import type { EvaluationCollectionService } from '../evaluation-collection.service';
+
+describe('EvaluationCollectionsController', () => {
+	let controller: EvaluationCollectionsController;
+	let service: jest.Mocked<EvaluationCollectionService>;
+	let postHogClient: jest.Mocked<PostHogClient>;
+	let logger: jest.Mocked<Logger>;
+
+	const user = mock<User>({ id: 'user-1' });
+
+	beforeEach(() => {
+		service = mock<EvaluationCollectionService>();
+		postHogClient = mock<PostHogClient>();
+		logger = mock<Logger>();
+		// Default: flag ON for the user. Each test that needs flag-off semantics
+		// overrides this with `mockResolvedValueOnce({})` (no flag entry).
+		postHogClient.getFeatureFlags.mockResolvedValue({
+			eval_collections: true,
+		} as Record<string, boolean>);
+		controller = new EvaluationCollectionsController(service, postHogClient, logger);
+	});
+
+	// NOTE(review): the type arguments below were reconstructed from how the
+	// helper is called in this file — confirm against `AuthenticatedRequest`.
+	type WithBody<P, B> = AuthenticatedRequest<P> & { body: B };
+
+	/**
+	 * Builds a minimal authenticated request carrying only the fields the
+	 * controller handlers read: `user`, route `params`, `body` and `query`.
+	 */
+	function makeReq<P extends Record<string, string>, B = unknown>(
+		params: P,
+		body: B = {} as B,
+		query: Record<string, string> = {},
+	): WithBody<P, B> {
+		return { user, params, body, query } as unknown as WithBody<P, B>;
+	}
+
+	describe('flag gating', () => {
+		it.each([
+			['list', async () => await controller.list(makeReq({ workflowId: 'wf-1' }))],
+			[
+				'get',
+				async () => await controller.get(makeReq({ workflowId: 'wf-1', collectionId: 'col-1' })),
+			],
+			[
+				'create',
+				async () =>
+					await controller.create(makeReq({ workflowId: 'wf-1' }), undefined, {} as never),
+			],
+			[
+				'update',
+				async () =>
+					await controller.update(
+						makeReq({ workflowId: 'wf-1', collectionId: 'col-1' }),
+						undefined,
+						{} as never,
+					),
+			],
+			[
+				'delete',
+				async () => await controller.delete(makeReq({ workflowId: 'wf-1', collectionId: 'col-1' })),
+			],
+			[
+				'addRun',
+				async () =>
+					await controller.addRun(
+						makeReq({ workflowId: 'wf-1', collectionId: 'col-1' }),
+						undefined,
+						{ testRunId: 'tr-x' },
+					),
+			],
+			[
+				'removeRun',
+				async () =>
+					await controller.removeRun(
+						makeReq({ workflowId: 'wf-1', collectionId: 'col-1', runId: 'tr-x' }),
+					),
+			],
+			[
+				'listVersions',
+				async () =>
+					await controller.listVersions(
+						makeReq({ workflowId: 'wf-1' }, {}, { evaluationConfigId: 'cfg-1' }),
+					),
+			],
+		])('rejects %s with 404 when flag is off', async (_name, call) => {
+			postHogClient.getFeatureFlags.mockResolvedValueOnce({});
+			await expect(call()).rejects.toThrow(NotFoundError);
+		});
+
+		// An outage must deny access (feature hidden → 404), i.e. fail closed,
+		// rather than surface the PostHog error as a 500.
+		it('treats a PostHog outage as flag-off (fail-closed to 404, not 500)', async () => {
+			postHogClient.getFeatureFlags.mockRejectedValueOnce(new Error('posthog timeout'));
+			await expect(controller.list(makeReq({ workflowId: 'wf-1' }))).rejects.toThrow(NotFoundError);
+		});
+
+		it('uses the spec-declared flag id', () => {
+			// Future-proofs the rollout: if anyone renames the flag without
+			// also updating the spec/PostHog, this test fails immediately.
+			expect(EVAL_COLLECTIONS_FLAG).toBe('eval_collections');
+		});
+	});
+
+	describe('list', () => {
+		it('delegates to service when flag on', async () => {
+			service.listCollections.mockResolvedValueOnce([]);
+			await controller.list(makeReq({ workflowId: 'wf-1' }));
+			expect(service.listCollections).toHaveBeenCalledWith('wf-1');
+		});
+	});
+
+	describe('create', () => {
+		it('delegates to service with payload + user', async () => {
+			service.createCollection.mockResolvedValueOnce({
+				record: { id: 'col-1' } as never,
+				runsStartedIds: ['tr-1'],
+			});
+
+			const payload = {
+				name: 'c',
+				evaluationConfigId: 'cfg-1',
+				versions: [{ workflowVersionId: 'wfv-1' }],
+			} as never;
+			await controller.create(makeReq({ workflowId: 'wf-1' }), undefined, payload);
+
+			expect(service.createCollection).toHaveBeenCalledWith(user, 'wf-1', payload);
+		});
+	});
+
+	describe('listVersions', () => {
+		it('rejects requests missing evaluationConfigId with 400', async () => {
+			await expect(
+				controller.listVersions(makeReq({ workflowId: 'wf-1' }, {}, {})),
+			).rejects.toThrow(BadRequestError);
+		});
+	});
+
+	// Route-access regression: every authenticated route on this controller
+	// must carry a `@ProjectScope('workflow:*')` decorator. The skill at
+	// `.claude/skills/protect-endpoints` calls this out as a hard rule —
+	// adding a route without a scope is an IDOR/permission-bypass risk that
+	// this test catches whenever the suite runs (the route metadata is read
+	// from the controller registry at runtime, not checked by the compiler).
+	describe('route access scopes', () => {
+		const metadata = Container.get(ControllerRegistryMetadata).getControllerMetadata(
+			EvaluationCollectionsController as never,
+		);
+		const routeCases = Array.from(metadata.routes.entries()).map(([handlerName, route]) => ({
+			handlerName,
+			route,
+		}));
+
+		// Map each handler to the exact scope we expect. `create` schedules
+		// executions → `workflow:execute`. Mutations → `workflow:update`.
+		// Reads → `workflow:read`. Anything else would be a regression.
+		const expectedScopes: Record<string, string> = {
+			list: 'workflow:read',
+			get: 'workflow:read',
+			create: 'workflow:execute',
+			update: 'workflow:update',
+			delete: 'workflow:update',
+			addRun: 'workflow:update',
+			removeRun: 'workflow:update',
+			listVersions: 'workflow:read',
+		};
+
+		it.each(routeCases)(
+			'$handlerName carries the expected ProjectScope',
+			({ handlerName, route }) => {
+				expect(route.accessScope).toBeDefined();
+				expect(route.accessScope?.globalOnly).toBe(false);
+				expect(route.accessScope?.scope).toBe(expectedScopes[handlerName]);
+			},
+		);
+
+		it('covers every public handler on the controller', () => {
+			// Sanity check: if a new route is added without updating
+			// `expectedScopes` this catches it (else the `.each` above would
+			// silently pass with `undefined`).
+			const handlerNames = routeCases.map((r) => r.handlerName).sort();
+			expect(handlerNames).toEqual(Object.keys(expectedScopes).sort());
+		});
+	});
+});
diff --git a/packages/cli/src/evaluation.ee/evaluation-collection.service.ts b/packages/cli/src/evaluation.ee/evaluation-collection.service.ts
new file mode 100644
index 00000000000..812584a12cd
--- /dev/null
+++ b/packages/cli/src/evaluation.ee/evaluation-collection.service.ts
@@ -0,0 +1,525 @@
+import type {
+	CreateEvaluationCollectionPayload,
+	EvalVersionEntry,
+	EvalVersionsResponse,
+	EvaluationCollectionDetail,
+	EvaluationCollectionRecord,
+	EvaluationCollectionRunSummary,
+	UpdateEvaluationCollectionPayload,
+} from '@n8n/api-types';
+import type { TestRun, User } from '@n8n/db';
+import {
+	EvaluationCollectionRepository,
+	EvaluationConfigRepository,
+	TestRunRepository,
+	WorkflowHistoryRepository,
+	WorkflowPublishedVersionRepository,
+} from '@n8n/db';
+import { Service } from '@n8n/di';
+// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
+import { In } from '@n8n/typeorm';
+import { nanoid } from 'nanoid';
+
+import { BadRequestError } from '@/errors/response-errors/bad-request.error';
+import { NotFoundError } from '@/errors/response-errors/not-found.error';
+import { TestRunnerService } from '@/evaluation.ee/test-runner/test-runner.service.ee';
+import { Telemetry } from '@/telemetry';
+import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service';
+
+/**
+ * Bare shape of the {@link EvaluationCollection} fields the service uses for
+ * building API records — kept structural (not the full entity class with
+ * methods + relations) so callers can pass plain-object spreads from
+ * `EvaluationCollectionListItem`.
+ */
+type CollectionFields = {
+	id: string;
+	name: string;
+	description: string | null;
+	workflowId: string;
+	evaluationConfigId: string;
+	createdById: string | null;
+	createdAt: Date;
+	updatedAt: Date;
+};
+
+/**
+ * Orchestrates eval-collection lifecycle: validating + creating collections,
+ * kicking off the per-version test runs that make up a collection,
+ * curating membership, and broadcasting collection-level cancellation. The
+ * runner itself stays unaware of collections — collection metadata is
+ * passed through `TestRunnerService.startTestRun()` as opaque options.
+ */
+@Service()
+export class EvaluationCollectionService {
+	constructor(
+		private readonly collectionRepo: EvaluationCollectionRepository,
+		private readonly testRunRepo: TestRunRepository,
+		private readonly evalConfigRepo: EvaluationConfigRepository,
+		private readonly workflowHistoryRepo: WorkflowHistoryRepository,
+		private readonly publishedVersionRepo: WorkflowPublishedVersionRepository,
+		private readonly workflowHistoryService: WorkflowHistoryService,
+		private readonly testRunnerService: TestRunnerService,
+		private readonly telemetry: Telemetry,
+	) {}
+
+	/**
+	 * Creates a collection for `workflowId`, attaches the reusable runs named in
+	 * `input.versions` and schedules a new run for every other entry.
+	 *
+	 * @returns the API record of the new collection plus the ids of the runs
+	 *   that were started by this call.
+	 * @throws NotFoundError when the evaluation config is not found for the workflow.
+	 * @throws BadRequestError when a listed version does not exist or a reused
+	 *   run is incompatible (other workflow/config, other version, or unpinned).
+	 */
+	async createCollection(
+		user: User,
+		workflowId: string,
+		input: CreateEvaluationCollectionPayload,
+	): Promise<{ record: EvaluationCollectionRecord; runsStartedIds: string[] }> {
+		// 1. Validate evaluation config belongs to this workflow. Without this
+		// check a caller with access to two workflows could attach config A
+		// to workflow B's collection — the comparison would silently break.
+		const config = await this.evalConfigRepo.findByIdAndWorkflowId(
+			input.evaluationConfigId,
+			workflowId,
+		);
+		if (!config) {
+			throw new NotFoundError('EvaluationConfig not found for this workflow');
+		}
+
+		// 2. Validate version IDs + reused-run IDs. Done up front so we either
+		// commit the whole collection cleanly or reject before any side
+		// effects (collection row, snapshot, runner kickoff).
+		for (const [index, v] of input.versions.entries()) {
+			if (v.workflowVersionId) {
+				const exists = await this.workflowHistoryService.findVersion(
+					workflowId,
+					v.workflowVersionId,
+				);
+				if (!exists) {
+					throw new BadRequestError(
+						`versions[${index}]: workflow version "${v.workflowVersionId}" not found`,
+					);
+				}
+			}
+			if (v.existingTestRunId) {
+				const run = await this.testRunRepo.findOneBy({ id: v.existingTestRunId });
+				if (
+					!run ||
+					run.workflowId !== workflowId ||
+					run.evaluationConfigId !== input.evaluationConfigId
+				) {
+					throw new BadRequestError(
+						`versions[${index}]: test run "${v.existingTestRunId}" is not compatible with this collection`,
+					);
+				}
+				// If the caller asks for version A but supplies a run from
+				// version B we'd silently attach B and never schedule A — the
+				// resulting "collection on version A" would actually be
+				// "collection on version B". Reject up front so the caller
+				// can decide whether to omit `workflowVersionId` (use the run
+				// as-is) or change `existingTestRunId` (run a fresh A).
+				if (v.workflowVersionId && run.workflowVersionId !== v.workflowVersionId) {
+					throw new BadRequestError(
+						`versions[${index}]: test run "${v.existingTestRunId}" was executed against version "${
+							run.workflowVersionId ?? '(unpinned)'
+						}", not the requested "${v.workflowVersionId}"`,
+					);
+				}
+				// Unpinned legacy runs (no `workflowVersionId`) cannot be
+				// reused in collections — the whole point is comparability
+				// across pinned versions, and an unpinned run could have
+				// executed against any historical workflow state.
+				if (!run.workflowVersionId) {
+					throw new BadRequestError(
+						`versions[${index}]: test run "${v.existingTestRunId}" has no pinned workflow version and cannot be reused in a collection`,
+					);
+				}
+			}
+		}
+
+		// 3. Persist the collection. We do this *before* kicking off any new
+		// runs so the runs can be tagged with `collectionId` at creation time.
+		const collection = await this.collectionRepo.createCollection({
+			id: nanoid(),
+			name: input.name,
+			description: input.description ?? null,
+			workflowId,
+			evaluationConfigId: input.evaluationConfigId,
+			createdById: user.id,
+		});
+
+		// 4. Wire up runs the user wants to reuse. Ids are de-duplicated so a
+		// run listed twice is linked once and counted once in `runCount` and
+		// telemetry.
+		const existingRunIds = [
+			...new Set(
+				input.versions.flatMap((v) => (v.existingTestRunId ? [v.existingTestRunId] : [])),
+			),
+		];
+		if (existingRunIds.length > 0) {
+			await this.collectionRepo.addRunsToCollection(collection.id, existingRunIds);
+		}
+
+		// 5. Kick off new runs for versions that weren't matched to an existing
+		// run. For "current draft" entries we snapshot a `WorkflowHistory` row
+		// first so the new run is pinned to an immutable version — otherwise
+		// the next edit to the live workflow would invalidate the comparison.
+		// Freeze the evaluation config in `evaluationConfigSnapshot` for the
+		// same reason: future config edits must not retroactively change what
+		// a historical run was evaluating.
+		const configSnapshot = {
+			...config,
+			createdAt: config.createdAt.toISOString(),
+			updatedAt: config.updatedAt.toISOString(),
+		};
+
+		// Resolved once: the value and the derived boolean are identical for
+		// every run scheduled by this call.
+		const concurrency = input.concurrency ?? 1;
+
+		const runsStartedIds: string[] = [];
+		for (const v of input.versions) {
+			if (v.existingTestRunId) continue;
+
+			const versionId =
+				v.workflowVersionId ??
+				(await this.workflowHistoryService.snapshotCurrent(workflowId)).versionId;
+
+			const { testRun } = await this.testRunnerService.startTestRun(
+				user,
+				workflowId,
+				concurrency,
+				concurrency > 1,
+				{
+					collectionId: collection.id,
+					workflowVersionId: versionId,
+					evaluationConfigId: input.evaluationConfigId,
+					evaluationConfigSnapshot: configSnapshot,
+				},
+			);
+			runsStartedIds.push(testRun.id);
+		}
+
+		this.telemetry.track('Eval collection created', {
+			user_id: user.id,
+			workflow_id: workflowId,
+			collection_id: collection.id,
+			evaluation_config_id: input.evaluationConfigId,
+			version_count: input.versions.length,
+			existing_run_count: existingRunIds.length,
+			new_run_count: runsStartedIds.length,
+			dataset_id: this.extractDatasetId(config),
+		});
+
+		const record = this.toRecord(collection, existingRunIds.length + runsStartedIds.length);
+		return { record, runsStartedIds };
+	}
+
+	/** Lists the collections of `workflowId` as API records, each with its run count. */
+	async listCollections(workflowId: string): Promise<EvaluationCollectionRecord[]> {
+		const items = await this.collectionRepo.listByWorkflowId(workflowId);
+		return items.map(({ runCount, ...c }) => this.toRecord(c, runCount));
+	}
+
+	/**
+	 * Returns one collection together with summaries of its runs.
+	 *
+	 * @throws NotFoundError when the collection is not found for this workflow.
+	 */
+	async getCollectionDetail(
+		workflowId: string,
+		collectionId: string,
+	): Promise<EvaluationCollectionDetail> {
+		const detail = await this.collectionRepo.getDetailByIdAndWorkflowId(collectionId, workflowId);
+		if (!detail) throw new NotFoundError('Collection not found');
+
+		const runs = detail.runs.map((r) => this.toRunSummary(r));
+		return { ...this.toRecord(detail.collection, runs.length), runs };
+	}
+
+	/**
+	 * Applies `input` to the collection's metadata and returns the updated record.
+	 *
+	 * @throws NotFoundError when the collection is not found for this workflow.
+	 */
+	async updateCollectionMeta(
+		workflowId: string,
+		collectionId: string,
+		input: UpdateEvaluationCollectionPayload,
+	): Promise<EvaluationCollectionRecord> {
+		const updated = await this.collectionRepo.updateMeta(collectionId, workflowId, input);
+		if (!updated) throw new NotFoundError('Collection not found');
+		const runCount = await this.testRunRepo.count({ where: { collectionId } });
+		return this.toRecord(updated, runCount);
+	}
+
+	async deleteCollection(
+		user: User,
+		workflowId: string,
+ collectionId: string, + ): Promise<{ runsUnlinked: number }> { + // Verify the collection belongs to this workflow *before* any + // cancellation side effects. Otherwise a caller with `workflow:update` + // on workflow A could pass a known collection id from workflow B and + // trigger cancellation (abort + pubsub fan-out + DB writes) before + // receiving the eventual 404 from `deleteByIdAndWorkflowId`. + const owned = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId); + if (!owned) throw new NotFoundError('Collection not found'); + + // If any runs in this collection are still active, broadcast a + // collection-level cancel first so workers stop touching rows we're + // about to unlink. The FK is SET NULL anyway, but cancelling avoids + // post-delete writes flipping a deleted collection's runs back into + // limbo on a foreign main. + const active = await this.testRunRepo.find({ + where: [ + { collectionId, status: 'running' }, + { collectionId, status: 'new' }, + ], + select: ['id'], + }); + if (active.length > 0) { + await this.testRunnerService.cancelCollection(collectionId); + } + + const { deleted, runsUnlinked } = await this.collectionRepo.deleteByIdAndWorkflowId( + collectionId, + workflowId, + ); + if (!deleted) throw new NotFoundError('Collection not found'); + + this.telemetry.track('Eval collection deleted', { + user_id: user.id, + workflow_id: workflowId, + collection_id: collectionId, + runs_unlinked: runsUnlinked, + }); + + return { runsUnlinked }; + } + + async addRunToCollection( + workflowId: string, + collectionId: string, + testRunId: string, + ): Promise { + const collection = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId); + if (!collection) throw new NotFoundError('Collection not found'); + + const run = await this.testRunRepo.findOneBy({ id: testRunId }); + if (!run || run.workflowId !== workflowId) { + throw new NotFoundError('Test run not found for this workflow'); + } + if 
(run.evaluationConfigId !== collection.evaluationConfigId) { + throw new BadRequestError( + 'Test run is not compatible with this collection (different evaluation config)', + ); + } + // Mirror the create-path invariant: an unpinned legacy run could have + // executed against any historical workflow state, so it breaks the + // comparability promise the collection exists to provide. + if (!run.workflowVersionId) { + throw new BadRequestError( + 'Test run has no pinned workflow version and cannot be added to a collection', + ); + } + + await this.collectionRepo.addRunsToCollection(collectionId, [testRunId]); + return await this.getCollectionDetail(workflowId, collectionId); + } + + async removeRunFromCollection( + workflowId: string, + collectionId: string, + testRunId: string, + ): Promise { + const collection = await this.collectionRepo.findByIdAndWorkflowId(collectionId, workflowId); + if (!collection) throw new NotFoundError('Collection not found'); + + const affected = await this.collectionRepo.removeRunFromCollection(collectionId, testRunId); + if (affected === 0) { + throw new NotFoundError('Test run is not part of this collection'); + } + + return await this.getCollectionDetail(workflowId, collectionId); + } + + /** + * Powers the setup wizard's versions table. Lists every named/auto-saved + * workflow history row for the workflow, joined with the last test run + * (if any) executed against the given `evaluationConfigId` on that + * version. The "current draft" row is synthesised on top of that — its + * `workflowVersionId` is null until the user commits. + * + * `★ best` / `⚠ low` annotations are computed in-memory from each + * version's `avgScore`: the highest is `isBest`, anything below 0.6 is + * `isCritical`. The thresholds match the spec mock and stay co-located + * with the data so the FE doesn't have to re-derive them. 
+ */ + async getEvalVersions( + workflowId: string, + evaluationConfigId: string, + ): Promise { + const config = await this.evalConfigRepo.findByIdAndWorkflowId(evaluationConfigId, workflowId); + if (!config) { + throw new NotFoundError('EvaluationConfig not found for this workflow'); + } + + // Cheap metadata-only load: `WorkflowHistory.nodes` / `connections` are + // fat JSON columns we never reference from the wizard's versions + // table. Excluding them keeps response payloads small and avoids + // streaming entire workflow canvases just to render labels. + const history = await this.workflowHistoryRepo.find({ + where: { workflowId }, + order: { createdAt: 'DESC' }, + select: ['versionId', 'name', 'autosaved', 'createdAt'], + }); + + // One bulk query for the latest run per version against this config + // instead of N+1 individual lookups. Sorted descending so the first + // run seen per `workflowVersionId` is the latest — picked into the + // per-version map below. + const lastRuns = + history.length === 0 + ? [] + : await this.testRunRepo.find({ + where: { + evaluationConfigId, + workflowVersionId: In(history.map((h) => h.versionId)), + }, + order: { createdAt: 'DESC' }, + }); + const latestRunByVersion = new Map(); + for (const run of lastRuns) { + if (run.workflowVersionId && !latestRunByVersion.has(run.workflowVersionId)) { + latestRunByVersion.set(run.workflowVersionId, run); + } + } + + // One lookup for which version is currently published, so the + // versions table can show "Published" as the source label instead of + // the generic "Named" / "Autosaved" — matches spec §4's three-way + // taxonomy (current draft / published / named snapshot). + const publishedRow = await this.publishedVersionRepo.findOneBy({ workflowId }); + const publishedVersionId = publishedRow?.publishedVersionId ?? null; + + const versions: EvalVersionEntry[] = []; + + // Current draft row — `workflowVersionId: null` signals "snapshot at + // run start". 
Always first in the list so the wizard surfaces it on + // top of the table. + // + // `lastRun: null` here is intentional: runs without a `workflowVersionId` + // (legacy one-offs) are not surfaced as draft history — they're + // "Ungrouped runs" in the FE list view. Showing them here would + // blur the boundary between draft + pinned-snapshot runs and lead + // to off-by-one comparability bugs in the wizard. + versions.push({ + workflowVersionId: null, + label: 'Current draft', + sourceLabel: 'Live workflow', + isCurrent: true, + lastRun: null, + }); + + for (const h of history) { + const lastRun = latestRunByVersion.get(h.versionId) ?? null; + + versions.push({ + workflowVersionId: h.versionId, + label: h.name ?? this.shortVersionLabel(h.versionId), + sourceLabel: this.formatSourceLabel(h, h.versionId === publishedVersionId), + isCurrent: false, + lastRun: lastRun + ? { + testRunId: lastRun.id, + runAt: (lastRun.runAt ?? lastRun.createdAt).toISOString(), + status: lastRun.status, + avgScore: this.computeAvgScore(lastRun), + isBest: false, // overwritten below + isCritical: false, // overwritten below + } + : null, + }); + } + + // Pass 2: annotate `isBest` / `isCritical` over the scored entries + // only. Skipped if no version has a scored run yet — the wizard then + // shows every row as "No run yet". + const scored = versions.filter((v) => v.lastRun?.avgScore !== null && v.lastRun !== null); + if (scored.length > 0) { + const best = scored.reduce((acc, v) => + (v.lastRun!.avgScore ?? -Infinity) > (acc.lastRun!.avgScore ?? -Infinity) ? 
v : acc, + ); + if (best.lastRun) best.lastRun.isBest = true; + for (const v of scored) { + if (v.lastRun && v.lastRun.avgScore !== null && v.lastRun.avgScore < 0.6) { + v.lastRun.isCritical = true; + } + } + } + + return { evaluationConfigId, versions }; + } + + // ---- internals ---- + + private toRecord(collection: CollectionFields, runCount: number): EvaluationCollectionRecord { + return { + id: collection.id, + name: collection.name, + description: collection.description, + workflowId: collection.workflowId, + evaluationConfigId: collection.evaluationConfigId, + createdById: collection.createdById, + createdAt: collection.createdAt.toISOString(), + updatedAt: collection.updatedAt.toISOString(), + runCount, + }; + } + + private toRunSummary(run: TestRun): EvaluationCollectionRunSummary { + return { + testRunId: run.id, + workflowVersionId: run.workflowVersionId, + status: run.status, + runAt: run.runAt ? run.runAt.toISOString() : null, + completedAt: run.completedAt ? run.completedAt.toISOString() : null, + avgScore: this.computeAvgScore(run), + metrics: this.coerceMetrics(run.metrics), + }; + } + + private computeAvgScore(run: TestRun): number | null { + const coerced = this.coerceMetrics(run.metrics); + if (!coerced) return null; + const values = Object.values(coerced); + if (values.length === 0) return null; + const sum = values.reduce((acc, v) => acc + v, 0); + return sum / values.length; + } + + private coerceMetrics( + metrics: Record | null | undefined, + ): Record | null { + if (!metrics) return null; + const out: Record = {}; + for (const [k, v] of Object.entries(metrics)) { + if (typeof v === 'number') out[k] = v; + else if (typeof v === 'boolean') out[k] = v ? 1 : 0; + } + return Object.keys(out).length > 0 ? 
out : null; + } + + private extractDatasetId(config: { datasetSource: string; datasetRef: unknown }): string | null { + if ( + config.datasetSource === 'data_table' && + typeof config.datasetRef === 'object' && + config.datasetRef !== null && + 'dataTableId' in config.datasetRef + ) { + return String((config.datasetRef as { dataTableId: string }).dataTableId); + } + return null; + } + + private shortVersionLabel(versionId: string): string { + return `v ${versionId.slice(0, 8)}`; + } + + private formatDateLabel(date: Date): string { + // Compact UTC date label (YYYY-MM-DD) for the source column. Frontend can format + // further if it wants; this is a fallback for raw API consumers. + return date.toISOString().slice(0, 10); + } + + /** + * Three-way source-label taxonomy from spec §4. Published wins over + * named/autosaved when a version is both (a named snapshot can later be + * published; the wizard should surface it as "Published" then). + */ + private formatSourceLabel( + h: { name: string | null; autosaved: boolean; createdAt: Date }, + isPublished: boolean, + ): string { + const date = this.formatDateLabel(h.createdAt); + if (isPublished) return `Published · ${date}`; + if (h.name) return `Named · ${date}`; + if (h.autosaved) return `Autosaved · ${date}`; + return `Snapshot · ${date}`; + } +} diff --git a/packages/cli/src/evaluation.ee/evaluation-collections.controller.ee.ts b/packages/cli/src/evaluation.ee/evaluation-collections.controller.ee.ts new file mode 100644 index 00000000000..d556dd03e01 --- /dev/null +++ b/packages/cli/src/evaluation.ee/evaluation-collections.controller.ee.ts @@ -0,0 +1,168 @@ +import { + AddRunToCollectionDto, + CreateEvaluationCollectionDto, + EVAL_COLLECTIONS_FLAG, + UpdateEvaluationCollectionDto, +} from '@n8n/api-types'; +import { Logger } from '@n8n/backend-common'; +import type { AuthenticatedRequest, User } from '@n8n/db'; +import { Body, Delete, Get, Patch, Post, ProjectScope, RestController } from '@n8n/decorators'; + +import {
BadRequestError } from '@/errors/response-errors/bad-request.error'; +import { NotFoundError } from '@/errors/response-errors/not-found.error'; +import { PostHogClient } from '@/posthog'; + +import { EvaluationCollectionService } from './evaluation-collection.service'; + +type WorkflowParam = { workflowId: string }; +type CollectionParam = { workflowId: string; collectionId: string }; +type CollectionRunParam = { workflowId: string; collectionId: string; runId: string }; +type EvalVersionsQuery = { evaluationConfigId?: string }; + +/** + * Scope choice notes: + * - List / get / versions → `workflow:read`. Collection metadata is workflow- + * adjacent and the body never includes execution state. + * - Create → `workflow:execute`. Creating a collection schedules new test + * runs for any version without a reusable existing run, so it's not just + * a metadata operation — it kicks off workflow executions. + * - Update / delete / curate runs → `workflow:update`. Renaming a collection, + * deleting it (which broadcasts cancel-collection for active runs but + * does not start new ones), and adding/removing run membership are all + * metadata mutations on a workflow-owned resource. + * + * Workflow → project resolution is done by `@ProjectScope` itself via the + * `:workflowId` URL param (see `check-access.ts:93`), so we don't need a + * separate `findWorkflowForUser` call: the middleware throws 404 before the + * handler runs if the user lacks the scope on the workflow's project. The + * service layer still filters by `workflowId` for defense-in-depth. + */ +@RestController('/workflows') +export class EvaluationCollectionsController { + constructor( + private readonly service: EvaluationCollectionService, + private readonly postHogClient: PostHogClient, + private readonly logger: Logger, + ) {} + + /** + * PostHog rollout gate for the eval-collections feature surface. 
Returns + * 404 when off so the route looks exactly like an unknown endpoint — no + * flag id leaks into responses, no 403 → 200 transition visible to a + * stale tab when the rollout flips. Fail-closed semantics mirror + * {@link TestRunsController.isParallelExecutionFlagEnabled}: PostHog + * outage degrades to flag-off rather than 500ing. + * + * Runs *inside* the handler (after the `@ProjectScope` middleware) so a + * user without workflow access still gets the standard 404 from the + * scope check, not a flag-off 404 that might leak rollout state. + */ + private async assertFlagEnabled(user: User): Promise { + let enabled = false; + try { + const flags = await this.postHogClient.getFeatureFlags(user); + enabled = flags?.[EVAL_COLLECTIONS_FLAG] === true; + } catch (error) { + this.logger.warn('Failed to resolve eval-collections flag', { + error: error instanceof Error ? error.message : String(error), + }); + } + if (!enabled) throw new NotFoundError('Not found'); + } + + @Get('/:workflowId/eval-collections') + @ProjectScope('workflow:read') + async list(req: AuthenticatedRequest) { + await this.assertFlagEnabled(req.user); + return await this.service.listCollections(req.params.workflowId); + } + + @Get('/:workflowId/eval-collections/:collectionId') + @ProjectScope('workflow:read') + async get(req: AuthenticatedRequest) { + await this.assertFlagEnabled(req.user); + return await this.service.getCollectionDetail(req.params.workflowId, req.params.collectionId); + } + + @Post('/:workflowId/eval-collections') + @ProjectScope('workflow:execute') + async create( + req: AuthenticatedRequest, + _res: unknown, + @Body payload: CreateEvaluationCollectionDto, + ) { + await this.assertFlagEnabled(req.user); + const { record, runsStartedIds } = await this.service.createCollection( + req.user, + req.params.workflowId, + payload, + ); + return { ...record, runsStartedIds }; + } + + @Patch('/:workflowId/eval-collections/:collectionId') + @ProjectScope('workflow:update') + async
update( + req: AuthenticatedRequest, + _res: unknown, + @Body payload: UpdateEvaluationCollectionDto, + ) { + await this.assertFlagEnabled(req.user); + return await this.service.updateCollectionMeta( + req.params.workflowId, + req.params.collectionId, + payload, + ); + } + + @Delete('/:workflowId/eval-collections/:collectionId') + @ProjectScope('workflow:update') + async delete(req: AuthenticatedRequest) { + await this.assertFlagEnabled(req.user); + const { runsUnlinked } = await this.service.deleteCollection( + req.user, + req.params.workflowId, + req.params.collectionId, + ); + return { success: true, runsUnlinked }; + } + + @Post('/:workflowId/eval-collections/:collectionId/runs') + @ProjectScope('workflow:update') + async addRun( + req: AuthenticatedRequest, + _res: unknown, + @Body payload: AddRunToCollectionDto, + ) { + await this.assertFlagEnabled(req.user); + return await this.service.addRunToCollection( + req.params.workflowId, + req.params.collectionId, + payload.testRunId, + ); + } + + @Delete('/:workflowId/eval-collections/:collectionId/runs/:runId') + @ProjectScope('workflow:update') + async removeRun(req: AuthenticatedRequest) { + await this.assertFlagEnabled(req.user); + return await this.service.removeRunFromCollection( + req.params.workflowId, + req.params.collectionId, + req.params.runId, + ); + } + + @Get('/:workflowId/eval-versions') + @ProjectScope('workflow:read') + async listVersions( + req: AuthenticatedRequest, + ) { + await this.assertFlagEnabled(req.user); + const { evaluationConfigId } = req.query; + if (!evaluationConfigId) { + throw new BadRequestError('Missing required query parameter: evaluationConfigId'); + } + return await this.service.getEvalVersions(req.params.workflowId, evaluationConfigId); + } +} diff --git a/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts b/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts index 3139500c4e5..84b3c35930b 100644 --- 
a/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts +++ b/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts @@ -28,6 +28,7 @@ import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Telemetry } from '@/telemetry'; import type { WorkflowRunner } from '@/workflow-runner'; +import type { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-history.service'; const wfUnderTestJson = JSON.parse( readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }), @@ -47,6 +48,7 @@ describe('TestRunnerService', () => { const publisher = mock(); const instanceSettings = mock({ hostId: 'test-host-id', isMultiMain: false }); const concurrencyControlService = mock(); + const workflowHistoryService = mock(); let testRunnerService: TestRunnerService; mockInstance(LoadNodesAndCredentials, { @@ -68,6 +70,7 @@ describe('TestRunnerService', () => { publisher, instanceSettings, concurrencyControlService, + workflowHistoryService, ); testRunRepository.createTestRun.mockResolvedValue(mock({ id: 'test-run-id' })); @@ -511,6 +514,7 @@ describe('TestRunnerService', () => { publisher, instanceSettings, concurrencyControlService, + workflowHistoryService, ); process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true'; @@ -829,6 +833,7 @@ describe('TestRunnerService', () => { publisher, instanceSettings, concurrencyControlService, + workflowHistoryService, ); }); @@ -2316,6 +2321,7 @@ describe('TestRunnerService', () => { publisher, instanceSettings, concurrencyControlService, + workflowHistoryService, ); const { inFlightTracker } = setupHappyPathMocks(6); @@ -2400,6 +2406,7 @@ describe('TestRunnerService', () => { publisher, multiMainInstance, concurrencyControlService, + workflowHistoryService, ); setupHappyPathMocks(4); @@ -2418,4 +2425,317 @@ describe('TestRunnerService', () => { 
expect(testRunRepository.markAsCompleted).not.toHaveBeenCalled(); }); }); + + describe('startTestRun - collection context (TRUST-72)', () => { + const USER = mock<{ id: string }>({ id: 'user-1' }); + + test('loads workflow JSON from WorkflowHistory when workflowVersionId is set', async () => { + workflowRepository.findById.mockResolvedValueOnce({ + id: 'wf-1', + name: 'Live', + nodes: [{ name: 'LiveNode' }], + connections: {}, + settings: {}, + } as never); + workflowHistoryService.findVersion.mockResolvedValueOnce({ + versionId: 'wfv-pinned', + nodes: [{ name: 'SnapshotNode' } as never], + connections: { SnapshotNode: {} } as never, + } as never); + testRunRepository.createTestRun.mockResolvedValueOnce(mock({ id: 'tr-pin' })); + // Short-circuit the execution loop so we only assert the lookup side + // effects — the runner's own logic is exercised by other tests. + workflowRepository.findById.mockClear(); + const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false, { + collectionId: 'col-1', + workflowVersionId: 'wfv-pinned', + evaluationConfigId: 'cfg-1', + }); + // Drain the detached execution promise so we don't leak a microtask + // into the next test. + await finished.catch(() => undefined); + + expect(workflowHistoryService.findVersion).toHaveBeenCalledWith('wf-1', 'wfv-pinned'); + expect(testRunRepository.createTestRun).toHaveBeenCalledWith( + 'wf-1', + expect.objectContaining({ + collectionId: 'col-1', + workflowVersionId: 'wfv-pinned', + evaluationConfigId: 'cfg-1', + }), + ); + }); + + test('overwrites workflowData.versionId with the pinned history versionId', async () => { + // `ExecutionPersistence` reads `workflowData.versionId` and stores + // it on the execution row. If the live workflow's `versionId` leaks + // through the spread, every pinned-collection execution would be + // recorded under the wrong version and break compare-view fidelity. 
+ // We capture the workflow object handed to `executeTestRun`'s first + // consumer (`validateWorkflowConfiguration`) to assert the pinned + // versionId rather than the live one is what flows downstream. + workflowRepository.findById.mockResolvedValueOnce({ + id: 'wf-1', + name: 'Live', + versionId: 'wfv-live-current', + nodes: [{ name: 'LiveNode' }], + connections: {}, + settings: {}, + } as never); + workflowHistoryService.findVersion.mockResolvedValueOnce({ + versionId: 'wfv-pinned', + nodes: [{ name: 'SnapshotNode' } as never], + connections: { SnapshotNode: {} } as never, + } as never); + testRunRepository.createTestRun.mockResolvedValueOnce(mock({ id: 'tr-pin-v' })); + + let capturedWorkflow: { versionId?: string } | undefined; + const validateSpy = jest + .spyOn( + testRunnerService as unknown as { + validateWorkflowConfiguration: (wf: { versionId?: string }) => void; + }, + 'validateWorkflowConfiguration', + ) + .mockImplementation((wf) => { + capturedWorkflow = wf; + // Short-circuit the rest of executeTestRun via a `TestRunError` + // path so the detached promise settles cleanly (markAsError) + // instead of running the dataset trigger against bogus JSON. 
+ throw new TestRunError('EVALUATION_TRIGGER_NOT_FOUND'); + }); + + const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false, { + collectionId: 'col-1', + workflowVersionId: 'wfv-pinned', + evaluationConfigId: 'cfg-1', + }); + await finished.catch(() => undefined); + + expect(validateSpy).toHaveBeenCalledTimes(1); + expect(capturedWorkflow?.versionId).toBe('wfv-pinned'); + expect(capturedWorkflow?.versionId).not.toBe('wfv-live-current'); + + validateSpy.mockRestore(); + }); + + test('does not call workflowHistoryService.findVersion when workflowVersionId is omitted', async () => { + workflowHistoryService.findVersion.mockClear(); + workflowRepository.findById.mockResolvedValueOnce({ + id: 'wf-1', + nodes: [], + connections: {}, + settings: {}, + } as never); + testRunRepository.createTestRun.mockResolvedValueOnce(mock({ id: 'tr-no-pin' })); + + const { finished } = await testRunnerService.startTestRun(USER as never, 'wf-1', 1, false); + await finished.catch(() => undefined); + + expect(workflowHistoryService.findVersion).not.toHaveBeenCalled(); + expect(testRunRepository.createTestRun).toHaveBeenCalledWith( + 'wf-1', + expect.objectContaining({ workflowVersionId: null, collectionId: null }), + ); + }); + }); + + describe('cancelCollection (TRUST-72)', () => { + test('aborts local running runs and broadcasts cancel-collection in multi-main', async () => { + // USER intentionally unused here — `cancelCollection` does not take a user. 
+ const multiMain = mock({ hostId: 'main-a', isMultiMain: true }); + const service = new TestRunnerService( + logger, + telemetry, + workflowRepository, + workflowRunner, + activeExecutions, + testRunRepository, + testCaseExecutionRepository, + errorReporter, + executionsConfig, + mock(), + publisher, + multiMain, + concurrencyControlService, + workflowHistoryService, + ); + + testRunRepository.find.mockResolvedValue([{ id: 'tr-running' } as never]); + // Seed an abort controller for the running run so the local cancel + // path actually fires. + ( + service as unknown as { abortControllers: Map } + ).abortControllers.set('tr-running', new AbortController()); + + await service.cancelCollection('col-1'); + + expect(testRunRepository.requestCancellation).toHaveBeenCalledWith('tr-running'); + expect(publisher.publishCommand).toHaveBeenCalledWith({ + command: 'cancel-collection', + payload: { collectionId: 'col-1' }, + }); + }); + + test('aborts a locally-held `new`-status run (pre-`markAsRunning` window)', async () => { + // `executeTestRun` registers the abort controller in + // `abortControllers` *before* it flips status from `new` to + // `running`. A `cancel-collection` arriving in that window must + // still abort the local controller — querying only `running` + // runs would silently skip these on every main, leaving the + // freshly-kicked run to drain via DB-poll instead of stopping + // immediately. + // + // Critically: drive the mock from the `where` clause so we + // actually exercise the status filter. With a plain + // `mockResolvedValue([...])` the mock would return the seeded + // row regardless of which statuses the caller queried, and the + // test would pass against both pre-fix (`status:'running'`-only) + // and post-fix code — proving nothing. 
+ const seeded = [{ id: 'tr-new-local', status: 'new' as const }]; + testRunRepository.find.mockImplementation(async (opts: unknown) => { + const where = (opts as { where: unknown }).where; + const clauses = (Array.isArray(where) ? where : [where]) as Array<{ + status?: string; + }>; + const statuses = new Set(clauses.map((c) => c.status)); + return seeded.filter((r) => statuses.has(r.status)) as never; + }); + + const newRunAbort = new AbortController(); + ( + testRunnerService as unknown as { abortControllers: Map } + ).abortControllers.set('tr-new-local', newRunAbort); + + // Provide a no-op dbManager so the DB-fallback path doesn't throw + // on the pre-fix code path that *would* take it for this run. + // Without this, the test would fail on a transaction TypeError + // before reaching the abort assertions, hiding the actual bug. + const dbManager = mock<{ transaction: jest.Mock }>(); + dbManager.transaction.mockImplementation(async (cb: (trx: unknown) => Promise) => { + await cb({}); + }); + (testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager; + + await testRunnerService.cancelCollection('col-new-window'); + + expect(newRunAbort.signal.aborted).toBe(true); + // Local abort wins — no DB-cancel fallback needed for this run. 
+ expect(testRunRepository.markAsCancelled).not.toHaveBeenCalledWith( + 'tr-new-local', + expect.anything(), + ); + }); + + test('falls back to DB cancel for runs not held locally', async () => { + testRunRepository.find.mockResolvedValue([ + { id: 'tr-foreign' } as never, + { id: 'tr-mine' } as never, + ]); + ( + testRunnerService as unknown as { abortControllers: Map } + ).abortControllers.set('tr-mine', new AbortController()); + + const trxUpdate = jest.fn().mockResolvedValue({ affected: 1 }); + const dbManager = mock<{ transaction: jest.Mock }>(); + dbManager.transaction.mockImplementation( + async (cb: (trx: { update: jest.Mock }) => Promise) => { + await cb({ update: trxUpdate }); + }, + ); + (testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager; + + await testRunnerService.cancelCollection('col-2'); + + // `tr-foreign` is the run we don't hold locally → fallback fires. + // Assert the update is scoped to `id` AND `status: In([...])` so a + // run that completed between the initial find and this update is + // not clobbered. We don't lock down the exact entity ref — the + // shape of the where clause is the contract. + expect(trxUpdate).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + id: 'tr-foreign', + status: expect.anything(), + }), + expect.objectContaining({ status: 'cancelled' }), + ); + // `tr-mine` was held locally → no fallback update should fire. + expect(trxUpdate).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ id: 'tr-mine' }), + expect.anything(), + ); + }); + + test('fallback update does not clobber a run that completed between find and update', async () => { + // `activeRuns` is sampled before the requestCancellation loop and + // pubsub broadcast. By the time the fallback transaction runs, a + // foreign main may have completed (or errored) the run naturally. 
+ // The status-scoped update should affect 0 rows in that case, and + // the test-case sweep must be skipped — otherwise we'd silently + // re-mark a `completed` run as `cancelled` and corrupt the record. + testRunRepository.find.mockResolvedValue([{ id: 'tr-just-finished' } as never]); + + const trxUpdate = jest.fn().mockResolvedValue({ affected: 0 }); // race: row no longer 'new'/'running' + const dbManager = mock<{ transaction: jest.Mock }>(); + dbManager.transaction.mockImplementation( + async (cb: (trx: { update: jest.Mock }) => Promise) => { + await cb({ update: trxUpdate }); + }, + ); + (testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager; + + await testRunnerService.cancelCollection('col-race'); + + // Update was attempted with status filter — the filter is what + // makes the in-DB WHERE narrow so the row stays untouched. + expect(trxUpdate).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ id: 'tr-just-finished', status: expect.anything() }), + expect.objectContaining({ status: 'cancelled' }), + ); + // Update affected 0 → don't sweep cases. (The sweep has its own + // status filter so this is also a redundant check, but it makes + // the "winner takes all" intent explicit at the run level.) + expect(testCaseExecutionRepository.markAllPendingAsCancelled).not.toHaveBeenCalledWith( + 'tr-just-finished', + expect.anything(), + ); + }); + }); + + describe('cancelTestRun', () => { + test('fallback update does not clobber a run that completed between requestCancellation and update', async () => { + // Mirrors the collection-level race: between `requestCancellation` + // (the DB-flag pass) and this fallback update, a foreign main can + // finish the run naturally. The update must be scoped by status + // so the terminal state wins. + // No abort controller registered → `cancelTestRunLocally` returns + // false → fallback path fires. 
+ const trxUpdate = jest.fn().mockResolvedValue({ affected: 0 }); + const dbManager = mock<{ transaction: jest.Mock }>(); + dbManager.transaction.mockImplementation( + async (cb: (trx: { update: jest.Mock }) => Promise) => { + await cb({ update: trxUpdate }); + }, + ); + (testRunRepository as unknown as { manager: typeof dbManager }).manager = dbManager; + + await testRunnerService.cancelTestRun('tr-just-finished'); + + expect(trxUpdate).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + id: 'tr-just-finished', + status: expect.anything(), + }), + expect.objectContaining({ status: 'cancelled' }), + ); + expect(testCaseExecutionRepository.markAllPendingAsCancelled).not.toHaveBeenCalledWith( + 'tr-just-finished', + expect.anything(), + ); + }); + }); }); diff --git a/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts b/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts index 8db2b6219b4..e4b87a554fd 100644 --- a/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts +++ b/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts @@ -1,13 +1,21 @@ import { Logger } from '@n8n/backend-common'; import { ExecutionsConfig } from '@n8n/config'; -import type { User, TestRun } from '@n8n/db'; -import { TestCaseExecutionRepository, TestRunRepository, WorkflowRepository } from '@n8n/db'; +import type { User } from '@n8n/db'; +import { + TestCaseExecutionRepository, + TestRun, + TestRunRepository, + WorkflowRepository, +} from '@n8n/db'; import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; +// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import +import { In } from '@n8n/typeorm'; import { ErrorReporter, InstanceSettings } from 'n8n-core'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { WorkflowHistoryService } from 
'@/workflows/workflow-history/workflow-history.service'; import { EVALUATION_NODE_TYPE, EVALUATION_TRIGGER_NODE_TYPE, @@ -80,6 +88,7 @@ export class TestRunnerService { private readonly publisher: Publisher, private readonly instanceSettings: InstanceSettings, private readonly concurrencyControlService: ConcurrencyControlService, + private readonly workflowHistoryService: WorkflowHistoryService, ) {} /** @@ -539,12 +548,24 @@ export class TestRunnerService { * The execution loop is detached so callers can return the new * `testRun.id` without waiting for the run to complete; tests that need * to observe completion await `finished` directly. + * + * `options` carries the eval-collections context (TRUST-72): when set, + * the new run is pinned to a workflow version (loaded from + * `WorkflowHistory` instead of the live workflow so future edits don't + * disturb cross-run comparability) and tagged with its parent collection + * + the `EvaluationConfig` snapshot captured at run-start. */ async startTestRun( user: User, workflowId: string, concurrency: number = 1, flagEnabledForUser: boolean = false, + options?: { + collectionId?: string; + workflowVersionId?: string; + evaluationConfigId?: string; + evaluationConfigSnapshot?: IDataObject; + }, ): Promise<{ testRun: TestRun; finished: Promise }> { const requestedConcurrency = Math.max(1, Math.min(10, Math.floor(concurrency))); const evaluationLimit = this.executionsConfig.concurrency.evaluationLimit; @@ -556,14 +577,47 @@ export class TestRunnerService { this.logger.debug( `[Eval] runTest called: requestedConcurrency=${requestedConcurrency} effectiveConcurrency=${effectiveConcurrency} evaluationLimit=${evaluationLimit} flagEnabledForUser=${flagEnabledForUser}`, - { workflowId }, + { + workflowId, + collectionId: options?.collectionId, + workflowVersionId: options?.workflowVersionId, + }, ); - const workflow = await this.workflowRepository.findById(workflowId); - assert(workflow, 'Workflow not found'); + // When pinned 
to a workflow version (collection runs), load nodes + + // connections from `WorkflowHistory` so the run executes against the + // snapshotted JSON, not whatever the live workflow looks like right + // now. The base workflow row still supplies `id`, `name`, `settings` + // — everything outside the canvas geometry. + const baseWorkflow = await this.workflowRepository.findById(workflowId); + assert(baseWorkflow, 'Workflow not found'); + + let workflow = baseWorkflow; + if (options?.workflowVersionId) { + const history = await this.workflowHistoryService.findVersion( + workflowId, + options.workflowVersionId, + ); + assert(history, `Workflow version ${options.workflowVersionId} not found`); + // `ExecutionPersistence` records `workflowData.versionId` on the + // execution row; keeping `baseWorkflow.versionId` here would tag + // historical-canvas executions with the *current* live version + // and break the comparability promise of the pinned run. + workflow = { + ...baseWorkflow, + versionId: history.versionId, + nodes: history.nodes, + connections: history.connections, + } as typeof baseWorkflow; + } // 0. Create new Test Run - const testRun = await this.testRunRepository.createTestRun(workflowId); + const testRun = await this.testRunRepository.createTestRun(workflowId, { + collectionId: options?.collectionId ?? null, + workflowVersionId: options?.workflowVersionId ?? null, + evaluationConfigId: options?.evaluationConfigId ?? null, + evaluationConfigSnapshot: options?.evaluationConfigSnapshot ?? null, + }); assert(testRun, 'Unable to create a test run'); // Detach the long-running execution from the awaited setup so callers @@ -1056,6 +1110,10 @@ export class TestRunnerService { // Send telemetry event with complete metadata const telemetryPayload: Record = { ...telemetryMeta, + // Collection context (TRUST-72). `collection_id` is null for legacy + // one-off runs so analytics can split flag-on/off cohorts cleanly. + collection_id: testRun.collectionId ?? 
null, + is_part_of_collection: testRun.collectionId !== null, }; // Add success-specific fields @@ -1105,6 +1163,109 @@ export class TestRunnerService { this.cancelTestRunLocally(testRunId); } + /** + * Handle cancel-collection pub/sub command from other main instances. + * Each main aborts the in-flight runs it owns that belong to the + * collection — runs not held locally are no-ops, mirroring the per-run + * cancel path's "if not running locally, fall back to DB" semantics. + */ + @OnPubSubEvent('cancel-collection', { instanceType: 'main' }) + async handleCancelCollectionCommand({ collectionId }: { collectionId: string }) { + this.logger.debug('Received cancel-collection command via pub/sub', { collectionId }); + await this.cancelCollectionLocally(collectionId); + } + + private async cancelCollectionLocally(collectionId: string): Promise { + // Match the active-status filter `cancelCollection` uses for DB-flag + // cancellation. `executeTestRun` registers the abort controller in + // `abortControllers` *before* `markAsRunning` flips status from `new` + // to `running`, so a freshly-kicked-off run is locally abortable + // while still showing as `new` in DB. Querying only `running` here + // would silently skip that window on every main — both the + // initiator (called from `cancelCollection`) and foreign mains + // (called from `handleCancelCollectionCommand`). The result set is + // still bounded by the collection size (≤ a handful in practice). + const runs = await this.testRunRepository.find({ + where: [ + { collectionId, status: 'running' }, + { collectionId, status: 'new' }, + ], + select: ['id'], + }); + const cancelledLocally: string[] = []; + for (const { id } of runs) { + if (this.cancelTestRunLocally(id)) cancelledLocally.push(id); + } + return cancelledLocally; + } + + /** + * Cancels every in-flight run in an evaluation collection. 
Mirrors + * {@link cancelTestRun}'s multi-main fan-out: each main aborts what it + * holds locally, the DB cancelRequested flag is set per run as a fallback, + * and pubsub broadcasts so foreign mains pick up the signal too. + */ + async cancelCollection(collectionId: string): Promise<{ cancelledRunIds: string[] }> { + // 1. Mark every run in the collection that's still active for + // fallback DB-poll cancellation. Runs already terminal stay terminal. + const activeRuns = await this.testRunRepository.find({ + where: [ + { collectionId, status: 'running' }, + { collectionId, status: 'new' }, + ], + select: ['id'], + }); + for (const { id } of activeRuns) { + await this.testRunRepository.requestCancellation(id); + } + + // 2. Try local cancellation for whatever this main owns. + const cancelledLocally = await this.cancelCollectionLocally(collectionId); + + // 3. In multi-main or queue mode, broadcast so foreign mains can + // cancel their share. + if (this.instanceSettings.isMultiMain || this.executionsConfig.mode === 'queue') { + this.logger.debug('Broadcasting cancel-collection command via pub/sub', { collectionId }); + await this.publisher.publishCommand({ + command: 'cancel-collection', + payload: { collectionId }, + }); + } + + // 4. Anything we didn't own locally and that's still flagged as + // pending: mark it cancelled in DB as a fallback (matches + // `cancelTestRun`'s contract for unreachable instances). + // + // Race protection: `activeRuns` was sampled before the requestCancellation + // loop and before the pubsub broadcast — by now a foreign main may have + // completed a run naturally (status → `completed`/`error`). A plain + // `markAsCancelled(id)` would clobber that terminal status with + // `cancelled` and corrupt the run record. 
Scope the update by status + // so it only fires on rows that are still active; if 0 rows were + // affected, the natural completion wins and we skip the test-case + // sweep too (its own status filter would no-op anyway, but skipping + // makes the "winner takes all" intent explicit). + const localSet = new Set(cancelledLocally); + const fallbackRunIds = activeRuns.map((r) => r.id).filter((id) => !localSet.has(id)); + if (fallbackRunIds.length > 0) { + const { manager: dbManager } = this.testRunRepository; + await dbManager.transaction(async (trx) => { + for (const runId of fallbackRunIds) { + const result = await trx.update( + TestRun, + { id: runId, status: In(['new', 'running']) }, + { status: 'cancelled', completedAt: new Date() }, + ); + if (result.affected) { + await this.testCaseExecutionRepository.markAllPendingAsCancelled(runId, trx); + } + } + }); + } + + return { cancelledRunIds: activeRuns.map((r) => r.id) }; + } + /** * Cancels the test run with the given ID. * In multi-main mode, this broadcasts the cancellation to all instances via pub/sub @@ -1126,14 +1287,24 @@ export class TestRunnerService { }); } - // 4. If not running locally, mark as cancelled in DB as a fallback - // This handles both single-main (where this is the only instance) and multi-main - // (where the running instance may be dead or unreachable via pub/sub) + // 4. If not running locally, mark as cancelled in DB as a fallback. + // This handles both single-main (where this is the only instance) and + // multi-main (where the running instance may be dead or unreachable + // via pub/sub). Same race-protection as `cancelCollection`: between + // the requestCancellation flag and this update, a foreign main may + // have completed the run naturally — scope by status so the + // terminal state wins. 
if (!cancelledLocally) { const { manager: dbManager } = this.testRunRepository; await dbManager.transaction(async (trx) => { - await this.testRunRepository.markAsCancelled(testRunId, trx); - await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRunId, trx); + const result = await trx.update( + TestRun, + { id: testRunId, status: In(['new', 'running']) }, + { status: 'cancelled', completedAt: new Date() }, + ); + if (result.affected) { + await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRunId, trx); + } }); } } diff --git a/packages/cli/src/scaling/pubsub/pubsub.event-map.ts b/packages/cli/src/scaling/pubsub/pubsub.event-map.ts index af01c59d990..9bfe307789c 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.event-map.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.event-map.ts @@ -174,6 +174,15 @@ export type PubSubCommandMap = { testRunId: string; }; + /** + * Cancel every running test run inside an evaluation collection across all + * main instances. Used when a user cancels a collection — each main checks + * its in-flight runs and aborts those that belong to the collection. 
+ */ + 'cancel-collection': { + collectionId: string; + }; + // #endregion // #region Agents diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index f4a47d37b96..96a8e1bfb4e 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -64,6 +64,7 @@ export namespace PubSub { ToCommand<'reload-sso-provisioning-configuration'>; export type ReloadSourceControlConfiguration = ToCommand<'reload-source-control-config'>; export type CancelTestRun = ToCommand<'cancel-test-run'>; + export type CancelCollection = ToCommand<'cancel-collection'>; export type AgentChatIntegrationChanged = ToCommand<'agent-chat-integration-changed'>; export type AgentConfigChanged = ToCommand<'agent-config-changed'>; } @@ -94,6 +95,7 @@ export namespace PubSub { | Commands.ReloadSsoProvisioningConfiguration | Commands.ReloadSourceControlConfiguration | Commands.CancelTestRun + | Commands.CancelCollection | Commands.AgentChatIntegrationChanged | Commands.AgentConfigChanged; diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index be52c2e85f0..2bf4e34ce0d 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -61,6 +61,7 @@ import '@/node-execution/ephemeral-node-executor'; import '@/license/license.controller'; import '@/evaluation.ee/test-runs.controller.ee'; import '@/evaluation.ee/evaluation-config.controller'; +import '@/evaluation.ee/evaluation-collections.controller.ee'; import '@/workflows/workflow-history/workflow-history.controller'; import '@/workflows/workflows.controller'; import '@/modules/workflow-index/workflow-dependency.controller'; diff --git a/packages/cli/src/workflows/workflow-history/__tests__/workflow-history.service.test.ts b/packages/cli/src/workflows/workflow-history/__tests__/workflow-history.service.test.ts index a583fcc1809..9e2f78009a6 100644 --- 
a/packages/cli/src/workflows/workflow-history/__tests__/workflow-history.service.test.ts +++ b/packages/cli/src/workflows/workflow-history/__tests__/workflow-history.service.test.ts @@ -1,6 +1,11 @@ import { mockLogger, mockInstance } from '@n8n/backend-test-utils'; import type { WorkflowHistory } from '@n8n/db'; -import { User, WorkflowHistoryRepository, WorkflowPublishHistoryRepository } from '@n8n/db'; +import { + User, + WorkflowHistoryRepository, + WorkflowPublishHistoryRepository, + WorkflowRepository, +} from '@n8n/db'; import type { UpdateResult } from '@n8n/typeorm'; import { getWorkflow, getWorkflowHistory } from '@test-integration/workflow'; import { mockClear } from 'jest-mock-extended'; @@ -13,6 +18,7 @@ import { WorkflowHistoryService } from '@/workflows/workflow-history/workflow-hi const workflowHistoryRepository = mockInstance(WorkflowHistoryRepository); const workflowPublishHistoryRepository = mockInstance(WorkflowPublishHistoryRepository); +const workflowRepository = mockInstance(WorkflowRepository); const logger = mockLogger(); const workflowFinderService = mockInstance(WorkflowFinderService); const eventService = mockInstance(EventService); @@ -20,6 +26,7 @@ const workflowHistoryService = new WorkflowHistoryService( logger, workflowHistoryRepository, workflowPublishHistoryRepository, + workflowRepository, workflowFinderService, eventService, ); @@ -447,4 +454,80 @@ describe('WorkflowHistoryService', () => { ); }); }); + + describe('snapshotCurrent', () => { + it('returns the existing versionId without inserting when a row already exists for the workflow draft', async () => { + const workflow = getWorkflow({ addNodeWithoutCreds: true }); + workflow.id = 'wf-1'; + workflow.versionId = 'wfv-current'; + workflowRepository.findOneBy.mockResolvedValueOnce(workflow); + workflowHistoryRepository.findOne.mockResolvedValueOnce( + getWorkflowHistory(workflow, { versionId: 'wfv-current' }), + ); + + const result = await 
workflowHistoryService.snapshotCurrent('wf-1'); + + expect(result).toEqual({ versionId: 'wfv-current' }); + expect(workflowHistoryRepository.insert).not.toHaveBeenCalled(); + }); + + it('inserts a snapshot when no history row exists for the workflow draft', async () => { + const workflow = getWorkflow({ addNodeWithoutCreds: true }); + workflow.id = 'wf-1'; + workflow.versionId = 'wfv-fresh'; + workflow.connections = {}; + workflowRepository.findOneBy.mockResolvedValueOnce(workflow); + // First findOne: no existing history row → insert path. + // Second findOne: post-save verification that the row now exists. + workflowHistoryRepository.findOne.mockResolvedValueOnce(null); + workflowHistoryRepository.findOne.mockResolvedValueOnce( + getWorkflowHistory(workflow, { versionId: 'wfv-fresh' }), + ); + + const result = await workflowHistoryService.snapshotCurrent('wf-1'); + + expect(result).toEqual({ versionId: 'wfv-fresh' }); + expect(workflowHistoryRepository.insert).toHaveBeenCalledWith( + expect.objectContaining({ + authors: 'eval-snapshot', + versionId: 'wfv-fresh', + workflowId: 'wf-1', + nodes: workflow.nodes, + connections: workflow.connections, + }), + ); + }); + + it('throws when the workflow does not exist', async () => { + workflowRepository.findOneBy.mockResolvedValueOnce(null); + await expect(workflowHistoryService.snapshotCurrent('wf-missing')).rejects.toThrow( + 'Workflow wf-missing not found', + ); + }); + + it('throws when the snapshot insert silently fails (saveVersion swallowed the error)', async () => { + // `saveVersion` logs+swallows insert errors so the regular save + // flow can never be blocked by a history-row failure. For the + // snapshot path that's a footgun: the caller would get back a + // `versionId` with no matching row and hit a generic + // `findVersion` assert deep inside the test runner. Verify that + // `snapshotCurrent` re-fetches and fails loudly when the row did + // not materialise. 
+ const workflow = getWorkflow({ addNodeWithoutCreds: true }); + workflow.id = 'wf-1'; + workflow.versionId = 'wfv-fresh'; + workflow.connections = {}; + workflowRepository.findOneBy.mockResolvedValueOnce(workflow); + workflowHistoryRepository.findOne.mockResolvedValueOnce(null); + // Simulate insert failure: saveVersion's try/catch logs the + // underlying error and returns normally. The post-save verify + // findOne still sees no row. + workflowHistoryRepository.insert.mockRejectedValueOnce(new Error('insert blew up')); + workflowHistoryRepository.findOne.mockResolvedValueOnce(null); + + await expect(workflowHistoryService.snapshotCurrent('wf-1')).rejects.toThrow( + 'Failed to persist workflow history snapshot for workflow wf-1', + ); + }); + }); }); diff --git a/packages/cli/src/workflows/workflow-history/workflow-history.service.ts b/packages/cli/src/workflows/workflow-history/workflow-history.service.ts index cbbfde03ac8..95284d06a7e 100644 --- a/packages/cli/src/workflows/workflow-history/workflow-history.service.ts +++ b/packages/cli/src/workflows/workflow-history/workflow-history.service.ts @@ -5,6 +5,7 @@ import { WorkflowHistory, WorkflowHistoryRepository, WorkflowPublishHistoryRepository, + WorkflowRepository, } from '@n8n/db'; import { Service } from '@n8n/di'; // eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import @@ -26,6 +27,7 @@ export class WorkflowHistoryService { private readonly logger: Logger, private readonly workflowHistoryRepository: WorkflowHistoryRepository, private readonly workflowPublishHistoryRepository: WorkflowPublishHistoryRepository, + private readonly workflowRepository: WorkflowRepository, private readonly workflowFinderService: WorkflowFinderService, private readonly eventService: EventService, ) {} @@ -107,6 +109,61 @@ export class WorkflowHistoryService { }); } + /** + * Ensure a {@link WorkflowHistory} row exists for the workflow's *current* + * draft state and return its `versionId`. 
Used by the eval-collections + * setup wizard when the user picks "current draft" — the run is pinned to + * an immutable snapshot so future edits don't break comparability with + * sibling runs in the collection. + * + * Idempotent: if a history row already exists for the workflow's current + * `versionId`, we return it unchanged (no duplicate snapshot, no churn in + * the version history list). License-on instances will have a history row + * from the regular save flow; license-off instances get the snapshot + * lazily here for eval comparability without otherwise enabling history. + */ + async snapshotCurrent(workflowId: string): Promise<{ versionId: string }> { + const workflow = await this.workflowRepository.findOneBy({ id: workflowId }); + if (!workflow) { + throw new UnexpectedError(`Workflow ${workflowId} not found`); + } + + const existing = await this.workflowHistoryRepository.findOne({ + where: { workflowId, versionId: workflow.versionId }, + select: ['versionId'], + }); + if (existing) return { versionId: existing.versionId }; + + await this.saveVersion( + 'eval-snapshot', + { + versionId: workflow.versionId, + nodes: workflow.nodes, + connections: workflow.connections, + }, + workflowId, + ); + + // `saveVersion` deliberately swallows insert errors (it only logs them) + // so the regular workflow-save flow can never be blocked by a history + // write failure. The snapshot use case is different: callers will + // hand this `versionId` to `findVersion()` moments later and assert + // the row is non-null. Verify persistence here and fail loudly while + // we still have the caller's stack — otherwise the next reader hits + // a generic "version not found" deep inside the test runner. 
+ const persisted = await this.workflowHistoryRepository.findOne({ + where: { workflowId, versionId: workflow.versionId }, + select: ['versionId'], + }); + if (!persisted) { + throw new UnexpectedError( + `Failed to persist workflow history snapshot for workflow ${workflowId}`, + ); + } + + return { versionId: persisted.versionId }; + } + async saveVersion( user: User | string, workflow: {