fix(core): Filter WaitTracker to only poll waiting executions (#29898)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mike Repeć 2026-05-07 12:10:05 +02:00 committed by GitHub
parent 15105610f6
commit 5c7921f71c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 160 additions and 124 deletions

View File

@ -1,10 +1,18 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { GlobalConfig } from '@n8n/config';
import type { SqliteConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { In, LessThan, And, Not } from '@n8n/typeorm';
import type { SelectQueryBuilder } from '@n8n/typeorm';
import { In, LessThan, LessThanOrEqual, And, Not } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { BinaryDataService } from 'n8n-core';
import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { ExecutionEntity } from '../../entities';
import type { IExecutionResponse } from '../../entities/types-db';
import { mockEntityManager } from '../../utils/test-utils/mock-entity-manager';
import { mockInstance } from '../../utils/test-utils/mock-instance';
import { ExecutionRepository } from '../execution.repository';
const GREATER_THAN_MAX_UPDATE_THRESHOLD = 901;
@ -14,6 +22,10 @@ const GREATER_THAN_MAX_UPDATE_THRESHOLD = 901;
*/
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const globalConfig = mockInstance(GlobalConfig, {
logging: { outputs: ['console'], scopes: [] },
});
mockInstance(BinaryDataService);
const executionRepository = Container.get(ExecutionRepository);
beforeEach(() => {
@ -366,6 +378,18 @@ describe('ExecutionRepository', () => {
await executionRepository.markAsCrashed(manyExecutionsToMarkAsCrashed);
expect(entityManager.update).toBeCalledTimes(2);
});
test('should clear waitTill when marking executions as crashed', async () => {
const executionIds = ['1', '2'];
await executionRepository.markAsCrashed(executionIds);
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: In(executionIds) },
expect.objectContaining({ status: 'crashed', waitTill: null }),
);
});
});
describe('stopDuringRun', () => {
@ -427,7 +451,7 @@ describe('ExecutionRepository', () => {
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
{ status: 'running', startedAt: expect.any(Date) },
{ status: 'running', startedAt: expect.any(Date), waitTill: null },
);
expect(result).toBeInstanceOf(Date);
});
@ -444,9 +468,133 @@ describe('ExecutionRepository', () => {
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
{ status: 'running', startedAt: existingStartedAt },
{ status: 'running', startedAt: existingStartedAt, waitTill: null },
);
expect(result).toBe(existingStartedAt);
});
});
describe('cancelMany', () => {
test('should clear waitTill when canceling executions', async () => {
const executionIds = ['1', '2', '3'];
await executionRepository.cancelMany(executionIds);
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: In(executionIds) },
expect.objectContaining({ status: 'canceled', waitTill: null }),
);
});
});
describe('stopBeforeRun', () => {
test('should clear waitTill when stopping execution before run', async () => {
const execution = mock<IExecutionResponse>({
id: '1',
status: 'waiting',
waitTill: new Date('2025-01-01T00:00:00.000Z'),
});
await executionRepository.stopBeforeRun(execution);
expect(execution.waitTill).toBeNull();
expect(execution.status).toBe('canceled');
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: '1' },
expect.objectContaining({ status: 'canceled', waitTill: null }),
);
});
});
describe('getWaitingExecutions', () => {
const mockDate = new Date('2023-12-28 12:34:56.789Z');
beforeAll(() => jest.useFakeTimers().setSystemTime(mockDate));
afterAll(() => jest.useRealTimers());
test.each(['sqlite', 'postgresdb'] as const)(
'on %s, should only return executions with status=waiting',
async (dbType) => {
globalConfig.database.type = dbType;
entityManager.find.mockResolvedValueOnce([]);
await executionRepository.getWaitingExecutions();
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
order: { waitTill: 'ASC' },
select: ['id', 'waitTill'],
where: {
status: 'waiting',
waitTill: LessThanOrEqual(
dbType === 'sqlite'
? '2023-12-28 12:36:06.789'
: new Date('2023-12-28T12:36:06.789Z'),
),
},
});
},
);
});
describe('deleteExecutionsByFilter', () => {
test('should delete binary data', async () => {
const workflowId = nanoid();
const binaryDataService = Container.get(BinaryDataService);
jest.spyOn(executionRepository, 'createQueryBuilder').mockReturnValue(
mock<SelectQueryBuilder<ExecutionEntity>>({
select: jest.fn().mockReturnThis(),
andWhere: jest.fn().mockReturnThis(),
getMany: jest.fn().mockResolvedValue([{ id: '1', workflowId }]),
}),
);
await executionRepository.deleteExecutionsByFilter({
filters: { id: '1' },
accessibleWorkflowIds: ['1'],
deleteConditions: { ids: ['1'] },
});
expect(binaryDataService.deleteMany).toHaveBeenCalledWith([
{ type: 'execution', executionId: '1', workflowId },
]);
});
});
describe('updateExistingExecution', () => {
test.each(['sqlite', 'postgresdb'] as const)(
'should update execution and data in transaction on %s',
async (dbType) => {
globalConfig.database.type = dbType;
globalConfig.database.sqlite = mock<SqliteConfig>({ poolSize: 1 });
const executionId = '1';
const execution = mock<IExecutionResponse>({
id: executionId,
data: mock<IRunExecutionData>(),
workflowData: mock<IWorkflowBase>(),
status: 'success',
});
const txCallback = jest.fn();
entityManager.transaction.mockImplementation(async (fn: unknown) => {
await (fn as (em: typeof entityManager) => Promise<unknown>)(entityManager);
txCallback();
});
entityManager.update.mockResolvedValue({ affected: 1, raw: [], generatedMaps: [] });
await executionRepository.updateExistingExecution(executionId, execution);
expect(entityManager.transaction).toHaveBeenCalled();
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
expect.objectContaining({ status: 'success' }),
);
expect(txCallback).toHaveBeenCalledTimes(1);
},
);
});
});

View File

@ -363,6 +363,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
{
status: 'crashed',
stoppedAt: new Date(),
waitTill: null,
},
);
this.logger.info('Marked executions as `crashed`', { executionIds });
@ -382,7 +383,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await manager.update(
ExecutionEntity,
{ id: executionId },
{ status: 'running', startedAt: effectiveStartedAt },
{ status: 'running', startedAt: effectiveStartedAt, waitTill: null },
);
return effectiveStartedAt;
@ -608,7 +609,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
const waitTill = new Date(Date.now() + 70000);
const where: FindOptionsWhere<ExecutionEntity> = {
waitTill: LessThanOrEqual(waitTill),
status: Not('crashed'),
status: 'waiting',
};
const dbType = this.globalConfig.database.type;
@ -783,10 +784,11 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
async stopBeforeRun(execution: IExecutionResponse) {
execution.status = 'canceled';
execution.stoppedAt = new Date();
execution.waitTill = null;
await this.update(
{ id: execution.id },
{ status: execution.status, stoppedAt: execution.stoppedAt },
{ status: execution.status, stoppedAt: execution.stoppedAt, waitTill: execution.waitTill },
);
return execution;
@ -813,7 +815,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}
async cancelMany(executionIds: string[]) {
await this.update({ id: In(executionIds) }, { status: 'canceled', stoppedAt: new Date() });
await this.update(
{ id: In(executionIds) },
{ status: 'canceled', stoppedAt: new Date(), waitTill: null },
);
}
// ----------------------------------

View File

@ -1,117 +0,0 @@
import { mockInstance } from '@n8n/backend-test-utils';
import { GlobalConfig } from '@n8n/config';
import type { SqliteConfig } from '@n8n/config';
import type { IExecutionResponse } from '@n8n/db';
import { ExecutionEntity, ExecutionRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import type { SelectQueryBuilder } from '@n8n/typeorm';
import { Not, LessThanOrEqual } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { BinaryDataService } from 'n8n-core';
import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { mockEntityManager } from '@test/mocking';
describe('ExecutionRepository', () => {
const entityManager = mockEntityManager(ExecutionEntity);
const globalConfig = mockInstance(GlobalConfig, {
logging: { outputs: ['console'], scopes: [] },
});
const binaryDataService = mockInstance(BinaryDataService);
const executionRepository = Container.get(ExecutionRepository);
const mockDate = new Date('2023-12-28 12:34:56.789Z');
beforeAll(() => {
jest.clearAllMocks();
jest.useFakeTimers().setSystemTime(mockDate);
});
afterAll(() => jest.useRealTimers());
describe('getWaitingExecutions()', () => {
test.each(['sqlite', 'postgresdb'] as const)(
'on %s, should be called with expected args',
async (dbType) => {
globalConfig.database.type = dbType;
entityManager.find.mockResolvedValueOnce([]);
await executionRepository.getWaitingExecutions();
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
order: { waitTill: 'ASC' },
select: ['id', 'waitTill'],
where: {
status: Not('crashed'),
waitTill: LessThanOrEqual(
dbType === 'sqlite'
? '2023-12-28 12:36:06.789'
: new Date('2023-12-28T12:36:06.789Z'),
),
},
});
},
);
});
describe('deleteExecutionsByFilter', () => {
test('should delete binary data', async () => {
const workflowId = nanoid();
jest.spyOn(executionRepository, 'createQueryBuilder').mockReturnValue(
mock<SelectQueryBuilder<ExecutionEntity>>({
select: jest.fn().mockReturnThis(),
andWhere: jest.fn().mockReturnThis(),
getMany: jest.fn().mockResolvedValue([{ id: '1', workflowId }]),
}),
);
await executionRepository.deleteExecutionsByFilter({
filters: { id: '1' },
accessibleWorkflowIds: ['1'],
deleteConditions: { ids: ['1'] },
});
expect(binaryDataService.deleteMany).toHaveBeenCalledWith([
{ type: 'execution', executionId: '1', workflowId },
]);
});
});
describe('updateExistingExecution', () => {
test.each(['sqlite', 'postgresdb'] as const)(
'should update execution and data in transaction on %s',
async (dbType) => {
globalConfig.database.type = dbType;
globalConfig.database.sqlite = mock<SqliteConfig>({ poolSize: 1 });
const executionId = '1';
const execution = mock<IExecutionResponse>({
id: executionId,
data: mock<IRunExecutionData>(),
workflowData: mock<IWorkflowBase>(),
status: 'success',
});
const txCallback = jest.fn();
entityManager.transaction.mockImplementation(async (cb) => {
// @ts-expect-error Mock
await cb(entityManager);
txCallback();
});
// Mock update to return affected count
entityManager.update.mockResolvedValue({ affected: 1, raw: [], generatedMaps: [] });
await executionRepository.updateExistingExecution(executionId, execution);
expect(entityManager.transaction).toHaveBeenCalled();
expect(entityManager.update).toHaveBeenCalledWith(
ExecutionEntity,
{ id: executionId },
expect.objectContaining({ status: 'success' }),
);
expect(txCallback).toHaveBeenCalledTimes(1);
},
);
});
});