diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index e3599c047de..1dfac7f286f 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -1,6 +1,7 @@ import { Logger } from '@n8n/backend-common'; -import { mockInstance } from '@n8n/backend-test-utils'; +import { mockInstance, mockLogger } from '@n8n/backend-test-utils'; import { ExecutionsConfig } from '@n8n/config'; +import type { GlobalConfig } from '@n8n/config'; import type { ExecutionRepository } from '@n8n/db'; import type { Response } from 'express'; import { captor, mock } from 'jest-mock-extended'; @@ -22,7 +23,10 @@ import { v4 as uuid } from 'uuid'; import { ActiveExecutions } from '@/active-executions'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import type { EventService } from '@/events/event.service'; import type { ExecutionPersistence } from '@/executions/execution-persistence'; +import type { License } from '@/license'; +import type { Telemetry } from '@/telemetry'; jest.mock('n8n-workflow', () => ({ ...jest.requireActual('n8n-workflow'), @@ -183,6 +187,91 @@ describe('ActiveExecutions', () => { }); }); + // TRUST-144: evaluation executions stayed in status 'new' (startedAt + // null) and never got picked up. The test-runner fan-out already throttles + // the shared evaluation concurrency queue before launching each case, so a + // second reservation here consumed a second slot from the same queue for + // the same case. Once the fan-out filled the queue up to its cap, this + // nested reservation blocked forever — before `setRunning` ran — leaving + // the execution stuck at status 'new'. + describe('evaluation executions do not double-reserve concurrency (TRUST-144)', () => { + // A real service whose evaluation queue has a single slot, already + // taken by the test-runner fan-out for this case. + const buildFullEvalConcurrencyControl = () => { + const service = new ConcurrencyControlService( + mockLogger(), + executionRepository, + mock(), + mock(), + mock({ + executions: { + mode: 'regular', + concurrency: { productionLimit: -1, evaluationLimit: 1 }, + }, + deployment: { type: 'default' }, + }), + mock(), + ); + return service; + }; + + const evalExecutionData: IWorkflowExecutionDataProcess = { + ...executionData, + executionMode: 'evaluation', + }; + + test('reaches setRunning even when the evaluation queue is full', async () => { + const realConcurrencyControl = buildFullEvalConcurrencyControl(); + + // The fan-out has taken the only evaluation slot for this case. + await realConcurrencyControl.throttle({ + mode: 'evaluation', + executionId: 'run-1-case-0', + }); + + const evalActiveExecutions = new ActiveExecutions( + mock(), + executionRepository, + executionPersistence, + realConcurrencyControl, + mock(), + executionsConfig, + ); + + let resolvedId: string | undefined; + const addPromise = evalActiveExecutions.add(evalExecutionData).then((id) => { + resolvedId = id; + return id; + }); + + // If `add` re-reserved a slot it would block here (queue is full). + await new Promise((resolve) => setImmediate(resolve)); + + expect(resolvedId).toBe(FAKE_EXECUTION_ID); + expect(executionRepository.setRunning).toHaveBeenCalledWith(FAKE_EXECUTION_ID); + + await addPromise; + }); + + test('does not throttle the evaluation queue', async () => { + const realConcurrencyControl = buildFullEvalConcurrencyControl(); + const throttleSpy = jest.spyOn(realConcurrencyControl, 'throttle'); + + const evalActiveExecutions = new ActiveExecutions( + mock(), + executionRepository, + executionPersistence, + realConcurrencyControl, + mock(), + executionsConfig, + ); + + await evalActiveExecutions.add(evalExecutionData); + + expect(throttleSpy).not.toHaveBeenCalled(); + }); + }); + describe('attachWorkflowExecution', () => { test('Should fail attaching execution to invalid executionId', async () => { expect(() => { diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index a12ba702313..b014a836e9e 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -67,6 +67,18 @@ export class ActiveExecutions { const mode = executionData.executionMode; const capacityReservation = new ConcurrencyCapacityReservation(this.concurrencyControl); + // Evaluation executions are already gated instance-wide by the + // test-runner fan-out, which throttles the shared evaluation + // concurrency queue before launching each case (see + // `test-runner.service.ee.ts`). Reserving capacity again here would + // consume a second slot from the same queue for the same case; once + // the fan-out fills the queue up to its cap, this nested reservation + // blocks forever — before `setRunning` runs — leaving the execution + // stuck at status 'new' with `startedAt` null (TRUST-144). Skip the + // reservation for evaluation mode; `release()` below is a no-op when + // nothing was reserved. + const shouldReserveCapacity = mode !== 'evaluation'; + try { if (maybeExecutionId === undefined) { const fullExecutionData: CreateExecutionPayload = { @@ -89,7 +101,9 @@ export class ActiveExecutions { maybeExecutionId = await this.executionPersistence.create(fullExecutionData); assert(maybeExecutionId); - await capacityReservation.reserve({ mode, executionId: maybeExecutionId }); + if (shouldReserveCapacity) { + await capacityReservation.reserve({ mode, executionId: maybeExecutionId }); + } if (this.executionsConfig.mode === 'regular') { await this.executionRepository.setRunning(maybeExecutionId); @@ -98,7 +112,9 @@ export class ActiveExecutions { } else { // Is an existing execution we want to finish so update in DB - await capacityReservation.reserve({ mode, executionId: maybeExecutionId }); + if (shouldReserveCapacity) { + await capacityReservation.reserve({ mode, executionId: maybeExecutionId }); + } const execution: Pick = { id: maybeExecutionId,