fix(core): Limit Insights compaction runs (backport to 1.x) (#30629)
Some checks are pending
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.13.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions

Co-authored-by: Irénée <irenee.ajeneza@n8n.io>
This commit is contained in:
n8n-assistant[bot] 2026-05-19 12:02:59 +01:00 committed by GitHub
parent e1c9c2123f
commit 5ce4ce36cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 483 additions and 26 deletions

View File

@ -0,0 +1,14 @@
import type { MigrationContext, ReversibleMigration } from '../migration-types';
const tableName = 'insights_raw';
const columns = ['timestamp', 'id'];
export class AddInsightsRawTimestampIdIndex1784000000004 implements ReversibleMigration {
async up(context: MigrationContext) {
await context.schemaBuilder.createIndex(tableName, columns);
}
async down(context: MigrationContext) {
await context.schemaBuilder.dropIndex(tableName, columns, { skipIfMissing: true });
}
}

View File

@ -120,6 +120,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common
import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable';
import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable';
import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords';
import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex';
import type { Migration } from '../migration-types';
export const mysqlMigrations: Migration[] = [
@ -245,4 +246,5 @@ export const mysqlMigrations: Migration[] = [
CreateBinaryDataTable1763716655000,
CreateWorkflowPublishHistoryTable1764167920585,
BackfillMissingWorkflowHistoryRecords1765448186933,
AddInsightsRawTimestampIdIndex1784000000004,
];

View File

@ -120,6 +120,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common
import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable';
import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable';
import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords';
import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@ -245,4 +246,5 @@ export const postgresMigrations: Migration[] = [
CreateBinaryDataTable1763716655000,
CreateWorkflowPublishHistoryTable1764167920585,
BackfillMissingWorkflowHistoryRecords1765448186933,
AddInsightsRawTimestampIdIndex1784000000004,
];

View File

@ -116,6 +116,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common
import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable';
import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable';
import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords';
import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex';
import type { Migration } from '../migration-types';
const sqliteMigrations: Migration[] = [
@ -237,6 +238,7 @@ const sqliteMigrations: Migration[] = [
CreateBinaryDataTable1763716655000,
CreateWorkflowPublishHistoryTable1764167920585,
BackfillMissingWorkflowHistoryRecords1765448186933,
AddInsightsRawTimestampIdIndex1784000000004,
];
export { sqliteMigrations };

View File

@ -5,6 +5,8 @@ import {
testDb,
testModules,
} from '@n8n/backend-test-utils';
import type { Logger } from '@n8n/backend-common';
import type { WorkflowEntity } from '@n8n/db';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
@ -17,13 +19,32 @@ import {
createCompactedInsightsEvent,
createRawInsightsEvents,
} from '../database/entities/__tests__/db-utils';
import type { PeriodUnit } from '../database/entities/insights-shared';
import { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository';
import { InsightsCompactionService } from '../insights-compaction.service';
import { InsightsConfig } from '../insights.config';
type CompactionConfig = Pick<
InsightsConfig,
| 'compactionBatchSize'
| 'compactionMaxBatchesPerRun'
| 'compactionMaxRuntimeSeconds'
| 'compactionBatchDelayMilliseconds'
>;
let defaultCompactionConfig: CompactionConfig;
beforeAll(async () => {
await testModules.loadModules(['insights']);
await testDb.init();
const config = Container.get(InsightsConfig);
defaultCompactionConfig = {
compactionBatchSize: config.compactionBatchSize,
compactionMaxBatchesPerRun: config.compactionMaxBatchesPerRun,
compactionMaxRuntimeSeconds: config.compactionMaxRuntimeSeconds,
compactionBatchDelayMilliseconds: config.compactionBatchDelayMilliseconds,
};
});
beforeEach(async () => {
@ -36,11 +57,54 @@ beforeEach(async () => {
]);
});
afterEach(() => {
Object.assign(Container.get(InsightsConfig), defaultCompactionConfig);
jest.useRealTimers();
jest.restoreAllMocks();
});
// Terminate DB once after all tests complete
afterAll(async () => {
await testDb.terminate();
});
function overrideCompactionConfig(overrides: Partial<CompactionConfig>) {
Object.assign(Container.get(InsightsConfig), overrides);
}
async function createRawSuccessEvents(
workflow: WorkflowEntity,
count: number,
{
start = DateTime.utc().startOf('hour'),
minutesBetweenEvents = 1,
value = 1,
}: { start?: DateTime; minutesBetweenEvents?: number; value?: number } = {},
) {
const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>();
let timestamp = start;
for (let i = 0; i < count; i++) {
events.push({ type: 'success', value, timestamp });
timestamp = timestamp.plus({ minute: minutesBetweenEvents });
}
await createRawInsightsEvents(workflow, events);
}
async function getCompactedTotal(periodUnit: PeriodUnit) {
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const insights = await insightsByPeriodRepository.find();
return insights
.filter((insight) => insight.periodUnit === periodUnit)
.reduce((total, insight) => total + insight.value, 0);
}
async function expectRawCount(expected: number) {
await expect(Container.get(InsightsRawRepository).count()).resolves.toBe(expected);
}
describe('compaction', () => {
describe('compactRawToHour', () => {
type TestData = {
@ -278,9 +342,6 @@ describe('compaction', () => {
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
// spy on the compactRawToHour method to check if it's called multiple times
const rawToHourSpy = jest.spyOn(insightsCompactionService, 'compactRawToHour');
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
@ -299,9 +360,6 @@ describe('compaction', () => {
await insightsCompactionService.compactInsights();
// ASSERT
// compaction batch size is 500, so rawToHour should be called 2 times:
// 1st call: 500 events, 2nd call: 100 events
expect(rawToHourSpy).toHaveBeenCalledTimes(2);
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0);
@ -340,6 +398,244 @@ describe('compaction', () => {
});
});
describe('compactInsights in-memory run guard', () => {
test('skips a concurrent run and accepts another run after the active one finishes', async () => {
// ARRANGE
const logger = mock<Logger>({ scoped: jest.fn().mockReturnThis() });
const insightsCompactionService = new InsightsCompactionService(
mock<InsightsByPeriodRepository>(),
mock<InsightsRawRepository>(),
mock<InsightsConfig>({
compactionBatchSize: 2,
compactionBatchDelayMilliseconds: 0,
compactionMaxBatchesPerRun: 0,
compactionMaxRuntimeSeconds: 0,
}),
logger,
);
let resolveRawToHour!: (numberOfCompactedRawData: number) => void;
const firstRawToHourPromise = new Promise<number>((resolve) => {
resolveRawToHour = resolve;
});
const rawToHourSpy = jest
.spyOn(insightsCompactionService, 'compactRawToHour')
.mockReturnValueOnce(firstRawToHourPromise)
.mockResolvedValue(0);
jest.spyOn(insightsCompactionService, 'compactHourToDay').mockResolvedValue(0);
jest.spyOn(insightsCompactionService, 'compactDayToWeek').mockResolvedValue(0);
// ACT
const firstCompactionPromise = insightsCompactionService.compactInsights();
await Promise.resolve();
await insightsCompactionService.compactInsights();
// ASSERT
expect(logger.debug).toHaveBeenCalledWith(
'Skipping insights compaction because another compaction run is active',
);
resolveRawToHour(0);
await firstCompactionPromise;
await insightsCompactionService.compactInsights();
expect(rawToHourSpy).toHaveBeenCalledTimes(2);
});
test('accepts another run after the active run fails', async () => {
// ARRANGE
const logger = mock<Logger>({ scoped: jest.fn().mockReturnThis() });
const insightsCompactionService = new InsightsCompactionService(
mock<InsightsByPeriodRepository>(),
mock<InsightsRawRepository>(),
mock<InsightsConfig>({
compactionBatchSize: 2,
compactionBatchDelayMilliseconds: 0,
compactionMaxBatchesPerRun: 0,
compactionMaxRuntimeSeconds: 0,
}),
logger,
);
const rawToHourSpy = jest
.spyOn(insightsCompactionService, 'compactRawToHour')
.mockRejectedValueOnce(new Error('compaction failed'))
.mockResolvedValue(0);
const hourToDaySpy = jest
.spyOn(insightsCompactionService, 'compactHourToDay')
.mockResolvedValue(0);
const dayToWeekSpy = jest
.spyOn(insightsCompactionService, 'compactDayToWeek')
.mockResolvedValue(0);
// ACT + ASSERT
await expect(insightsCompactionService.compactInsights()).rejects.toThrow(
'compaction failed',
);
await insightsCompactionService.compactInsights();
expect(rawToHourSpy).toHaveBeenCalledTimes(2);
expect(hourToDaySpy).toHaveBeenCalledTimes(1);
expect(dayToWeekSpy).toHaveBeenCalledTimes(1);
});
});
describe('compactInsights run limits', () => {
test('stops raw compaction after compactionMaxBatchesPerRun and resumes later', async () => {
// ARRANGE
overrideCompactionConfig({
compactionBatchSize: 2,
compactionMaxBatchesPerRun: 2,
compactionMaxRuntimeSeconds: 0,
compactionBatchDelayMilliseconds: 0,
});
const insightsCompactionService = Container.get(InsightsCompactionService);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
await createRawSuccessEvents(workflow, 5);
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(1);
await expect(getCompactedTotal('hour')).resolves.toBe(4);
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(0);
await expect(getCompactedTotal('hour')).resolves.toBe(5);
});
test('applies compactionMaxBatchesPerRun across compaction stages', async () => {
// ARRANGE
const config = Container.get(InsightsConfig);
overrideCompactionConfig({
compactionBatchSize: 2,
compactionMaxBatchesPerRun: 2,
compactionMaxRuntimeSeconds: 0,
compactionBatchDelayMilliseconds: 0,
});
const insightsCompactionService = Container.get(InsightsCompactionService);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
await createRawSuccessEvents(workflow, 1);
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 1,
periodUnit: 'hour',
periodStart: DateTime.utc()
.minus({ days: config.compactionHourlyToDailyThresholdDays + 1 })
.startOf('hour'),
});
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 1,
periodUnit: 'hour',
periodStart: DateTime.utc()
.minus({ days: config.compactionHourlyToDailyThresholdDays + 1, hours: 1 })
.startOf('hour'),
});
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 1,
periodUnit: 'day',
periodStart: DateTime.utc()
.minus({ days: config.compactionDailyToWeeklyThresholdDays + 1 })
.startOf('day'),
});
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(0);
await expect(getCompactedTotal('hour')).resolves.toBe(1);
await expect(getCompactedTotal('day')).resolves.toBe(3);
await expect(getCompactedTotal('week')).resolves.toBe(0);
});
test('stops after compactionMaxRuntimeSeconds and resumes later', async () => {
// ARRANGE
overrideCompactionConfig({
compactionBatchSize: 2,
compactionMaxBatchesPerRun: 0,
compactionMaxRuntimeSeconds: 1,
compactionBatchDelayMilliseconds: 0,
});
const insightsCompactionService = Container.get(InsightsCompactionService);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
await createRawSuccessEvents(workflow, 5);
const dateNowSpy = jest
.spyOn(Date, 'now')
.mockReturnValueOnce(0)
.mockReturnValueOnce(0)
.mockReturnValue(1000);
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(3);
await expect(getCompactedTotal('hour')).resolves.toBe(2);
dateNowSpy.mockRestore();
overrideCompactionConfig({ compactionMaxRuntimeSeconds: 0 });
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(0);
await expect(getCompactedTotal('hour')).resolves.toBe(5);
});
});
describe('raw compaction batch ordering', () => {
test('processes rows with identical timestamps in id order across limited runs', async () => {
// ARRANGE
overrideCompactionConfig({
compactionBatchSize: 2,
compactionMaxBatchesPerRun: 1,
compactionMaxRuntimeSeconds: 0,
compactionBatchDelayMilliseconds: 0,
});
const insightsCompactionService = Container.get(InsightsCompactionService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
const timestamp = DateTime.utc(2000, 1, 1, 0, 0);
const createdEvents: Array<Awaited<ReturnType<typeof createRawInsightsEvent>>> = [];
for (let i = 0; i < 4; i++) {
createdEvents.push(
await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }),
);
}
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
const remainingAfterFirstRun = await insightsRawRepository.find({ order: { id: 'ASC' } });
expect(remainingAfterFirstRun.map((event) => event.id)).toEqual(
createdEvents.slice(2).map((event) => event.id),
);
await expect(getCompactedTotal('hour')).resolves.toBe(2);
// ACT
await insightsCompactionService.compactInsights();
// ASSERT
await expectRawCount(0);
await expect(getCompactedTotal('hour')).resolves.toBe(4);
});
});
describe('compactHourToDay', () => {
type TestData = {
name: string;

View File

@ -26,6 +26,7 @@ export class InsightsRawRepository extends Repository<InsightsRaw> {
)
.addSelect('timestamp', 'periodStart')
.orderBy('timestamp', 'ASC')
.addOrderBy('id', 'ASC')
.limit(compactionBatchSize);
return batchQuery;

View File

@ -1,10 +1,20 @@
import { Logger } from '@n8n/backend-common';
import { Time } from '@n8n/constants';
import { Service } from '@n8n/di';
import { sleep } from 'n8n-workflow';
import { InsightsByPeriodRepository } from './database/repositories/insights-by-period.repository';
import { InsightsRawRepository } from './database/repositories/insights-raw.repository';
import { InsightsConfig } from './insights.config';
type CompactionRunState = {
startedAt: number;
batchesProcessed: number;
rowsCompacted: number;
};
type CompactionStopReason = 'max-batches' | 'max-runtime';
/**
* This service is responsible for compacting lower granularity insights data
* into higher granularity to control the size of the insights data.
@ -13,6 +23,8 @@ import { InsightsConfig } from './insights.config';
export class InsightsCompactionService {
private compactInsightsTimer: NodeJS.Timeout | undefined;
private isCompactionRunning = false;
constructor(
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
private readonly insightsRawRepository: InsightsRawRepository,
@ -40,31 +52,139 @@ export class InsightsCompactionService {
}
async compactInsights() {
let numberOfCompactedRawData: number;
if (this.isCompactionRunning) {
this.logger.debug('Skipping insights compaction because another compaction run is active');
return;
}
this.isCompactionRunning = true;
try {
const runState: CompactionRunState = {
startedAt: Date.now(),
batchesProcessed: 0,
rowsCompacted: 0,
};
const stoppedAfterRawToHour = await this.compactStage({
stageName: 'raw-to-hour',
beforeBatchMessage: 'Compacting raw data to hourly aggregates',
afterBatchMessage: (rowsCompacted) =>
`Compacted ${rowsCompacted} raw data to hourly aggregates`,
compactBatch: this.compactRawToHour.bind(this),
runState,
});
if (stoppedAfterRawToHour) return;
const stoppedAfterHourToDay = await this.compactStage({
stageName: 'hour-to-day',
beforeBatchMessage: 'Compacting hourly data to daily aggregates',
afterBatchMessage: (rowsCompacted) =>
`Compacted ${rowsCompacted} hourly data to daily aggregates`,
compactBatch: this.compactHourToDay.bind(this),
runState,
});
if (stoppedAfterHourToDay) return;
await this.compactStage({
stageName: 'day-to-week',
beforeBatchMessage: 'Compacting daily data to weekly aggregates',
afterBatchMessage: (rowsCompacted) =>
`Compacted ${rowsCompacted} daily data to weekly aggregates`,
compactBatch: this.compactDayToWeek.bind(this),
runState,
});
} finally {
this.isCompactionRunning = false;
}
}
private async compactStage({
stageName,
beforeBatchMessage,
afterBatchMessage,
compactBatch,
runState,
}: {
stageName: string;
beforeBatchMessage: string;
afterBatchMessage: (rowsCompacted: number) => string;
compactBatch: () => Promise<number>;
runState: CompactionRunState;
}) {
let numberOfCompactedData: number;
// Compact raw data to hourly aggregates
do {
this.logger.debug('Compacting raw data to hourly aggregates');
numberOfCompactedRawData = await this.compactRawToHour();
this.logger.debug(`Compacted ${numberOfCompactedRawData} raw data to hourly aggregates`);
} while (numberOfCompactedRawData === this.insightsConfig.compactionBatchSize);
const stopReason = this.getCompactionRunStopReason(runState);
if (stopReason !== undefined) {
this.logCompactionRunLimitReached(stopReason, stageName, runState);
return true;
}
let numberOfCompactedHourData: number;
this.logger.debug(beforeBatchMessage);
numberOfCompactedData = await compactBatch();
this.logger.debug(afterBatchMessage(numberOfCompactedData));
// Compact hourly data to daily aggregates
do {
this.logger.debug('Compacting hourly data to daily aggregates');
numberOfCompactedHourData = await this.compactHourToDay();
this.logger.debug(`Compacted ${numberOfCompactedHourData} hourly data to daily aggregates`);
} while (numberOfCompactedHourData === this.insightsConfig.compactionBatchSize);
runState.batchesProcessed++;
runState.rowsCompacted += numberOfCompactedData;
let numberOfCompactedDayData: number;
// Compact daily data to weekly aggregates
do {
this.logger.debug('Compacting daily data to weekly aggregates');
numberOfCompactedDayData = await this.compactDayToWeek();
this.logger.debug(`Compacted ${numberOfCompactedDayData} daily data to weekly aggregates`);
} while (numberOfCompactedDayData === this.insightsConfig.compactionBatchSize);
const stopReasonAfterBatch = this.getCompactionRunStopReason(runState);
if (stopReasonAfterBatch !== undefined) {
this.logCompactionRunLimitReached(stopReasonAfterBatch, stageName, runState);
return true;
}
await this.waitBeforeNextBatchIfFull(numberOfCompactedData);
} while (numberOfCompactedData === this.insightsConfig.compactionBatchSize);
return false;
}
private getCompactionRunStopReason(
runState: CompactionRunState,
): CompactionStopReason | undefined {
if (
this.insightsConfig.compactionMaxBatchesPerRun > 0 &&
runState.batchesProcessed >= this.insightsConfig.compactionMaxBatchesPerRun
) {
return 'max-batches';
}
if (
this.insightsConfig.compactionMaxRuntimeSeconds > 0 &&
Date.now() - runState.startedAt >=
this.insightsConfig.compactionMaxRuntimeSeconds * Time.seconds.toMilliseconds
) {
return 'max-runtime';
}
return undefined;
}
private logCompactionRunLimitReached(
reason: CompactionStopReason,
stageName: string,
runState: CompactionRunState,
) {
this.logger.warn('Stopping insights compaction because a per-run limit was reached', {
reason,
stageName,
batchesProcessed: runState.batchesProcessed,
rowsCompacted: runState.rowsCompacted,
compactionMaxBatchesPerRun: this.insightsConfig.compactionMaxBatchesPerRun,
compactionMaxRuntimeSeconds: this.insightsConfig.compactionMaxRuntimeSeconds,
});
}
private async waitBeforeNextBatchIfFull(numberOfCompactedData: number) {
if (
numberOfCompactedData !== this.insightsConfig.compactionBatchSize ||
this.insightsConfig.compactionBatchDelayMilliseconds <= 0
) {
return;
}
await sleep(this.insightsConfig.compactionBatchDelayMilliseconds);
}
/**

View File

@ -57,4 +57,24 @@ export class InsightsConfig {
*/
@Env('N8N_INSIGHTS_PRUNE_CHECK_INTERVAL_HOURS')
pruneCheckIntervalHours: number = 24;
/**
* The maximum number of compaction batches to process in a single compaction run.
* Set to 0 to disable this limit.
*/
@Env('N8N_INSIGHTS_COMPACTION_MAX_BATCHES_PER_RUN')
compactionMaxBatchesPerRun: number = 1000;
/**
* The maximum runtime in seconds for a single compaction run.
* Set to 0 to disable this limit.
*/
@Env('N8N_INSIGHTS_COMPACTION_MAX_RUNTIME_SECONDS')
compactionMaxRuntimeSeconds: number = 300;
/**
* The delay in milliseconds between full compaction batches.
*/
@Env('N8N_INSIGHTS_COMPACTION_BATCH_DELAY_MILLISECONDS')
compactionBatchDelayMilliseconds: number = 100;
}