mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 18:49:20 +02:00
fix(core): Prevent evaluation executions from stalling in status new (#31619)
This commit is contained in:
parent
fe87a5da57
commit
ea800f715d
|
|
@ -1,6 +1,7 @@
|
|||
import { Logger } from '@n8n/backend-common';
|
||||
import { mockInstance } from '@n8n/backend-test-utils';
|
||||
import { mockInstance, mockLogger } from '@n8n/backend-test-utils';
|
||||
import { ExecutionsConfig } from '@n8n/config';
|
||||
import type { GlobalConfig } from '@n8n/config';
|
||||
import type { ExecutionRepository } from '@n8n/db';
|
||||
import type { Response } from 'express';
|
||||
import { captor, mock } from 'jest-mock-extended';
|
||||
|
|
@ -22,7 +23,10 @@ import { v4 as uuid } from 'uuid';
|
|||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
||||
import type { EventService } from '@/events/event.service';
|
||||
import type { ExecutionPersistence } from '@/executions/execution-persistence';
|
||||
import type { License } from '@/license';
|
||||
import type { Telemetry } from '@/telemetry';
|
||||
|
||||
jest.mock('n8n-workflow', () => ({
|
||||
...jest.requireActual('n8n-workflow'),
|
||||
|
|
@ -183,6 +187,91 @@ describe('ActiveExecutions', () => {
|
|||
});
|
||||
});
|
||||
|
||||
// TRUST-144: evaluation executions stayed in status 'new' (startedAt
|
||||
// null) and never got picked up. The test-runner fan-out already throttles
|
||||
// the shared evaluation concurrency queue before launching each case, so a
|
||||
// second reservation here consumed a second slot from the same queue for
|
||||
// the same case. Once the fan-out filled the queue up to its cap, this
|
||||
// nested reservation blocked forever — before `setRunning` ran — leaving
|
||||
// the execution stuck at status 'new'.
|
||||
describe('evaluation executions do not double-reserve concurrency (TRUST-144)', () => {
|
||||
// A real service whose evaluation queue has a single slot, already
|
||||
// taken by the test-runner fan-out for this case.
|
||||
const buildFullEvalConcurrencyControl = () => {
|
||||
const service = new ConcurrencyControlService(
|
||||
mockLogger(),
|
||||
executionRepository,
|
||||
mock<Telemetry>(),
|
||||
mock<EventService>(),
|
||||
mock<GlobalConfig>({
|
||||
executions: {
|
||||
mode: 'regular',
|
||||
concurrency: { productionLimit: -1, evaluationLimit: 1 },
|
||||
},
|
||||
deployment: { type: 'default' },
|
||||
}),
|
||||
mock<License>(),
|
||||
);
|
||||
return service;
|
||||
};
|
||||
|
||||
const evalExecutionData: IWorkflowExecutionDataProcess = {
|
||||
...executionData,
|
||||
executionMode: 'evaluation',
|
||||
};
|
||||
|
||||
test('reaches setRunning even when the evaluation queue is full', async () => {
|
||||
const realConcurrencyControl = buildFullEvalConcurrencyControl();
|
||||
|
||||
// The fan-out has taken the only evaluation slot for this case.
|
||||
await realConcurrencyControl.throttle({
|
||||
mode: 'evaluation',
|
||||
executionId: 'run-1-case-0',
|
||||
});
|
||||
|
||||
const evalActiveExecutions = new ActiveExecutions(
|
||||
mock(),
|
||||
executionRepository,
|
||||
executionPersistence,
|
||||
realConcurrencyControl,
|
||||
mock(),
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
let resolvedId: string | undefined;
|
||||
const addPromise = evalActiveExecutions.add(evalExecutionData).then((id) => {
|
||||
resolvedId = id;
|
||||
return id;
|
||||
});
|
||||
|
||||
// If `add` re-reserved a slot it would block here (queue is full).
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(resolvedId).toBe(FAKE_EXECUTION_ID);
|
||||
expect(executionRepository.setRunning).toHaveBeenCalledWith(FAKE_EXECUTION_ID);
|
||||
|
||||
await addPromise;
|
||||
});
|
||||
|
||||
test('does not throttle the evaluation queue', async () => {
|
||||
const realConcurrencyControl = buildFullEvalConcurrencyControl();
|
||||
const throttleSpy = jest.spyOn(realConcurrencyControl, 'throttle');
|
||||
|
||||
const evalActiveExecutions = new ActiveExecutions(
|
||||
mock(),
|
||||
executionRepository,
|
||||
executionPersistence,
|
||||
realConcurrencyControl,
|
||||
mock(),
|
||||
executionsConfig,
|
||||
);
|
||||
|
||||
await evalActiveExecutions.add(evalExecutionData);
|
||||
|
||||
expect(throttleSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('attachWorkflowExecution', () => {
|
||||
test('Should fail attaching execution to invalid executionId', async () => {
|
||||
expect(() => {
|
||||
|
|
|
|||
|
|
@ -67,6 +67,18 @@ export class ActiveExecutions {
|
|||
const mode = executionData.executionMode;
|
||||
const capacityReservation = new ConcurrencyCapacityReservation(this.concurrencyControl);
|
||||
|
||||
// Evaluation executions are already gated instance-wide by the
|
||||
// test-runner fan-out, which throttles the shared evaluation
|
||||
// concurrency queue before launching each case (see
|
||||
// `test-runner.service.ee.ts`). Reserving capacity again here would
|
||||
// consume a second slot from the same queue for the same case; once
|
||||
// the fan-out fills the queue up to its cap, this nested reservation
|
||||
// blocks forever — before `setRunning` runs — leaving the execution
|
||||
// stuck at status 'new' with `startedAt` null (TRUST-144). Skip the
|
||||
// reservation for evaluation mode; `release()` below is a no-op when
|
||||
// nothing was reserved.
|
||||
const shouldReserveCapacity = mode !== 'evaluation';
|
||||
|
||||
try {
|
||||
if (maybeExecutionId === undefined) {
|
||||
const fullExecutionData: CreateExecutionPayload = {
|
||||
|
|
@ -89,7 +101,9 @@ export class ActiveExecutions {
|
|||
maybeExecutionId = await this.executionPersistence.create(fullExecutionData);
|
||||
assert(maybeExecutionId);
|
||||
|
||||
await capacityReservation.reserve({ mode, executionId: maybeExecutionId });
|
||||
if (shouldReserveCapacity) {
|
||||
await capacityReservation.reserve({ mode, executionId: maybeExecutionId });
|
||||
}
|
||||
|
||||
if (this.executionsConfig.mode === 'regular') {
|
||||
await this.executionRepository.setRunning(maybeExecutionId);
|
||||
|
|
@ -98,7 +112,9 @@ export class ActiveExecutions {
|
|||
} else {
|
||||
// Is an existing execution we want to finish so update in DB
|
||||
|
||||
await capacityReservation.reserve({ mode, executionId: maybeExecutionId });
|
||||
if (shouldReserveCapacity) {
|
||||
await capacityReservation.reserve({ mode, executionId: maybeExecutionId });
|
||||
}
|
||||
|
||||
const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
|
||||
id: maybeExecutionId,
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user