diff --git a/packages/@n8n/db/src/migrations/common/1784000000004-AddInsightsRawTimestampIdIndex.ts b/packages/@n8n/db/src/migrations/common/1784000000004-AddInsightsRawTimestampIdIndex.ts new file mode 100644 index 00000000000..16a09c6e61d --- /dev/null +++ b/packages/@n8n/db/src/migrations/common/1784000000004-AddInsightsRawTimestampIdIndex.ts @@ -0,0 +1,14 @@ +import type { MigrationContext, ReversibleMigration } from '../migration-types'; + +const tableName = 'insights_raw'; +const columns = ['timestamp', 'id']; + +export class AddInsightsRawTimestampIdIndex1784000000004 implements ReversibleMigration { + async up(context: MigrationContext) { + await context.schemaBuilder.createIndex(tableName, columns); + } + + async down(context: MigrationContext) { + await context.schemaBuilder.dropIndex(tableName, columns, { skipIfMissing: true }); + } +} diff --git a/packages/@n8n/db/src/migrations/mysqldb/index.ts b/packages/@n8n/db/src/migrations/mysqldb/index.ts index aca6468bd5b..a4423e34927 100644 --- a/packages/@n8n/db/src/migrations/mysqldb/index.ts +++ b/packages/@n8n/db/src/migrations/mysqldb/index.ts @@ -120,6 +120,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable'; import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable'; import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords'; +import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex'; import type { Migration } from '../migration-types'; export const mysqlMigrations: Migration[] = [ @@ -245,4 +246,5 @@ export const mysqlMigrations: Migration[] = [ CreateBinaryDataTable1763716655000, CreateWorkflowPublishHistoryTable1764167920585, BackfillMissingWorkflowHistoryRecords1765448186933, + AddInsightsRawTimestampIdIndex1784000000004, ]; diff --git a/packages/@n8n/db/src/migrations/postgresdb/index.ts b/packages/@n8n/db/src/migrations/postgresdb/index.ts index a6c06680069..43ebeaa5ef4 100644 --- a/packages/@n8n/db/src/migrations/postgresdb/index.ts +++ b/packages/@n8n/db/src/migrations/postgresdb/index.ts @@ -120,6 +120,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable'; import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable'; import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords'; +import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex'; import type { Migration } from '../migration-types'; export const postgresMigrations: Migration[] = [ @@ -245,4 +246,5 @@ export const postgresMigrations: Migration[] = [ CreateBinaryDataTable1763716655000, CreateWorkflowPublishHistoryTable1764167920585, BackfillMissingWorkflowHistoryRecords1765448186933, + AddInsightsRawTimestampIdIndex1784000000004, ]; diff --git a/packages/@n8n/db/src/migrations/sqlite/index.ts b/packages/@n8n/db/src/migrations/sqlite/index.ts index 1f15ef36939..8bae7f0f796 100644 --- a/packages/@n8n/db/src/migrations/sqlite/index.ts +++ b/packages/@n8n/db/src/migrations/sqlite/index.ts @@ -116,6 +116,7 @@ import { ChangeOAuthStateColumnToUnboundedVarchar1763572724000 } from '../common import { CreateBinaryDataTable1763716655000 } from '../common/1763716655000-CreateBinaryDataTable'; import { CreateWorkflowPublishHistoryTable1764167920585 } from '../common/1764167920585-CreateWorkflowPublishHistoryTable'; import { BackfillMissingWorkflowHistoryRecords1765448186933 } from '../common/1765448186933-BackfillMissingWorkflowHistoryRecords'; +import { AddInsightsRawTimestampIdIndex1784000000004 } from '../common/1784000000004-AddInsightsRawTimestampIdIndex'; import type { Migration } from '../migration-types'; const sqliteMigrations: Migration[] = [ @@ -237,6 +238,7 @@ const sqliteMigrations: Migration[] = [ CreateBinaryDataTable1763716655000, CreateWorkflowPublishHistoryTable1764167920585, BackfillMissingWorkflowHistoryRecords1765448186933, + AddInsightsRawTimestampIdIndex1784000000004, ]; export { sqliteMigrations }; diff --git a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.integration.test.ts b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.integration.test.ts index c2075a69845..ee08f434dee 100644 --- a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.integration.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.integration.test.ts @@ -5,6 +5,8 @@ import { testDb, testModules, } from '@n8n/backend-test-utils'; +import type { Logger } from '@n8n/backend-common'; +import type { WorkflowEntity } from '@n8n/db'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; @@ -17,13 +19,32 @@ import { createCompactedInsightsEvent, createRawInsightsEvents, } from '../database/entities/__tests__/db-utils'; +import type { PeriodUnit } from '../database/entities/insights-shared'; import { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository'; import { InsightsCompactionService } from '../insights-compaction.service'; import { InsightsConfig } from '../insights.config'; +type CompactionConfig = Pick< + InsightsConfig, + | 'compactionBatchSize' + | 'compactionMaxBatchesPerRun' + | 'compactionMaxRuntimeSeconds' + | 'compactionBatchDelayMilliseconds' +>; + +let defaultCompactionConfig: CompactionConfig; + beforeAll(async () => { await testModules.loadModules(['insights']); await testDb.init(); + + const config = Container.get(InsightsConfig); + defaultCompactionConfig = { + compactionBatchSize: config.compactionBatchSize, + compactionMaxBatchesPerRun: config.compactionMaxBatchesPerRun, + compactionMaxRuntimeSeconds: config.compactionMaxRuntimeSeconds, + compactionBatchDelayMilliseconds: config.compactionBatchDelayMilliseconds, + }; }); beforeEach(async () => { @@ -36,11 +57,54 @@ beforeEach(async () => { ]); }); +afterEach(() => { + Object.assign(Container.get(InsightsConfig), defaultCompactionConfig); + jest.useRealTimers(); + jest.restoreAllMocks(); +}); + // Terminate DB once after all tests complete afterAll(async () => { await testDb.terminate(); }); +function overrideCompactionConfig(overrides: Partial) { + Object.assign(Container.get(InsightsConfig), overrides); +} + +async function createRawSuccessEvents( + workflow: WorkflowEntity, + count: number, + { + start = DateTime.utc().startOf('hour'), + minutesBetweenEvents = 1, + value = 1, + }: { start?: DateTime; minutesBetweenEvents?: number; value?: number } = {}, +) { + const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); + let timestamp = start; + + for (let i = 0; i < count; i++) { + events.push({ type: 'success', value, timestamp }); + timestamp = timestamp.plus({ minute: minutesBetweenEvents }); + } + + await createRawInsightsEvents(workflow, events); +} + +async function getCompactedTotal(periodUnit: PeriodUnit) { + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + const insights = await insightsByPeriodRepository.find(); + + return insights + .filter((insight) => insight.periodUnit === periodUnit) + .reduce((total, insight) => total + insight.value, 0); +} + +async function expectRawCount(expected: number) { + await expect(Container.get(InsightsRawRepository).count()).resolves.toBe(expected); +} + describe('compaction', () => { describe('compactRawToHour', () => { type TestData = { @@ -278,9 +342,6 @@ describe('compaction', () => { const insightsRawRepository = Container.get(InsightsRawRepository); const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); - // spy on the compactRawToHour method to check if it's called multiple times - const rawToHourSpy = jest.spyOn(insightsCompactionService, 'compactRawToHour'); - const project = await createTeamProject(); const workflow = await createWorkflow({}, project); @@ -299,9 +360,6 @@ describe('compaction', () => { await insightsCompactionService.compactInsights(); // ASSERT - // compaction batch size is 500, so rawToHour should be called 2 times: - // 1st call: 500 events, 2nd call: 100 events - expect(rawToHourSpy).toHaveBeenCalledTimes(2); await expect(insightsRawRepository.count()).resolves.toBe(0); const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); @@ -340,6 +398,244 @@ describe('compaction', () => { }); }); + describe('compactInsights in-memory run guard', () => { + test('skips a concurrent run and accepts another run after the active one finishes', async () => { + // ARRANGE + const logger = mock({ scoped: jest.fn().mockReturnThis() }); + const insightsCompactionService = new InsightsCompactionService( + mock(), + mock(), + mock({ + compactionBatchSize: 2, + compactionBatchDelayMilliseconds: 0, + compactionMaxBatchesPerRun: 0, + compactionMaxRuntimeSeconds: 0, + }), + logger, + ); + + let resolveRawToHour!: (numberOfCompactedRawData: number) => void; + const firstRawToHourPromise = new Promise((resolve) => { + resolveRawToHour = resolve; + }); + const rawToHourSpy = jest + .spyOn(insightsCompactionService, 'compactRawToHour') + .mockReturnValueOnce(firstRawToHourPromise) + .mockResolvedValue(0); + jest.spyOn(insightsCompactionService, 'compactHourToDay').mockResolvedValue(0); + jest.spyOn(insightsCompactionService, 'compactDayToWeek').mockResolvedValue(0); + + // ACT + const firstCompactionPromise = insightsCompactionService.compactInsights(); + await Promise.resolve(); + + await insightsCompactionService.compactInsights(); + + // ASSERT + expect(logger.debug).toHaveBeenCalledWith( + 'Skipping insights compaction because another compaction run is active', + ); + + resolveRawToHour(0); + await firstCompactionPromise; + + await insightsCompactionService.compactInsights(); + expect(rawToHourSpy).toHaveBeenCalledTimes(2); + }); + + test('accepts another run after the active run fails', async () => { + // ARRANGE + const logger = mock({ scoped: jest.fn().mockReturnThis() }); + const insightsCompactionService = new InsightsCompactionService( + mock(), + mock(), + mock({ + compactionBatchSize: 2, + compactionBatchDelayMilliseconds: 0, + compactionMaxBatchesPerRun: 0, + compactionMaxRuntimeSeconds: 0, + }), + logger, + ); + const rawToHourSpy = jest + .spyOn(insightsCompactionService, 'compactRawToHour') + .mockRejectedValueOnce(new Error('compaction failed')) + .mockResolvedValue(0); + const hourToDaySpy = jest + .spyOn(insightsCompactionService, 'compactHourToDay') + .mockResolvedValue(0); + const dayToWeekSpy = jest + .spyOn(insightsCompactionService, 'compactDayToWeek') + .mockResolvedValue(0); + + // ACT + ASSERT + await expect(insightsCompactionService.compactInsights()).rejects.toThrow( + 'compaction failed', + ); + + await insightsCompactionService.compactInsights(); + expect(rawToHourSpy).toHaveBeenCalledTimes(2); + expect(hourToDaySpy).toHaveBeenCalledTimes(1); + expect(dayToWeekSpy).toHaveBeenCalledTimes(1); + }); + }); + + describe('compactInsights run limits', () => { + test('stops raw compaction after compactionMaxBatchesPerRun and resumes later', async () => { + // ARRANGE + overrideCompactionConfig({ + compactionBatchSize: 2, + compactionMaxBatchesPerRun: 2, + compactionMaxRuntimeSeconds: 0, + compactionBatchDelayMilliseconds: 0, + }); + const insightsCompactionService = Container.get(InsightsCompactionService); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + await createRawSuccessEvents(workflow, 5); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(1); + await expect(getCompactedTotal('hour')).resolves.toBe(4); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(0); + await expect(getCompactedTotal('hour')).resolves.toBe(5); + }); + + test('applies compactionMaxBatchesPerRun across compaction stages', async () => { + // ARRANGE + const config = Container.get(InsightsConfig); + overrideCompactionConfig({ + compactionBatchSize: 2, + compactionMaxBatchesPerRun: 2, + compactionMaxRuntimeSeconds: 0, + compactionBatchDelayMilliseconds: 0, + }); + const insightsCompactionService = Container.get(InsightsCompactionService); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + await createRawSuccessEvents(workflow, 1); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: DateTime.utc() + .minus({ days: config.compactionHourlyToDailyThresholdDays + 1 }) + .startOf('hour'), + }); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: DateTime.utc() + .minus({ days: config.compactionHourlyToDailyThresholdDays + 1, hours: 1 }) + .startOf('hour'), + }); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart: DateTime.utc() + .minus({ days: config.compactionDailyToWeeklyThresholdDays + 1 }) + .startOf('day'), + }); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(0); + await expect(getCompactedTotal('hour')).resolves.toBe(1); + await expect(getCompactedTotal('day')).resolves.toBe(3); + await expect(getCompactedTotal('week')).resolves.toBe(0); + }); + + test('stops after compactionMaxRuntimeSeconds and resumes later', async () => { + // ARRANGE + overrideCompactionConfig({ + compactionBatchSize: 2, + compactionMaxBatchesPerRun: 0, + compactionMaxRuntimeSeconds: 1, + compactionBatchDelayMilliseconds: 0, + }); + const insightsCompactionService = Container.get(InsightsCompactionService); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + await createRawSuccessEvents(workflow, 5); + const dateNowSpy = jest + .spyOn(Date, 'now') + .mockReturnValueOnce(0) + .mockReturnValueOnce(0) + .mockReturnValue(1000); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(3); + await expect(getCompactedTotal('hour')).resolves.toBe(2); + + dateNowSpy.mockRestore(); + overrideCompactionConfig({ compactionMaxRuntimeSeconds: 0 }); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(0); + await expect(getCompactedTotal('hour')).resolves.toBe(5); + }); + }); + + describe('raw compaction batch ordering', () => { + test('processes rows with identical timestamps in id order across limited runs', async () => { + // ARRANGE + overrideCompactionConfig({ + compactionBatchSize: 2, + compactionMaxBatchesPerRun: 1, + compactionMaxRuntimeSeconds: 0, + compactionBatchDelayMilliseconds: 0, + }); + const insightsCompactionService = Container.get(InsightsCompactionService); + const insightsRawRepository = Container.get(InsightsRawRepository); + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + const timestamp = DateTime.utc(2000, 1, 1, 0, 0); + + const createdEvents: Array>> = []; + for (let i = 0; i < 4; i++) { + createdEvents.push( + await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp }), + ); + } + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + const remainingAfterFirstRun = await insightsRawRepository.find({ order: { id: 'ASC' } }); + expect(remainingAfterFirstRun.map((event) => event.id)).toEqual( + createdEvents.slice(2).map((event) => event.id), + ); + await expect(getCompactedTotal('hour')).resolves.toBe(2); + + // ACT + await insightsCompactionService.compactInsights(); + + // ASSERT + await expectRawCount(0); + await expect(getCompactedTotal('hour')).resolves.toBe(4); + }); + }); + describe('compactHourToDay', () => { type TestData = { name: string; diff --git a/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts b/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts index 22e114cb8d3..2f6e35c491f 100644 --- a/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts +++ b/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts @@ -26,6 +26,7 @@ export class InsightsRawRepository extends Repository { ) .addSelect('timestamp', 'periodStart') .orderBy('timestamp', 'ASC') + .addOrderBy('id', 'ASC') .limit(compactionBatchSize); return batchQuery; diff --git a/packages/cli/src/modules/insights/insights-compaction.service.ts b/packages/cli/src/modules/insights/insights-compaction.service.ts index 6567201d891..4202b7874a1 100644 --- a/packages/cli/src/modules/insights/insights-compaction.service.ts +++ b/packages/cli/src/modules/insights/insights-compaction.service.ts @@ -1,10 +1,20 @@ import { Logger } from '@n8n/backend-common'; +import { Time } from '@n8n/constants'; import { Service } from '@n8n/di'; +import { sleep } from 'n8n-workflow'; import { InsightsByPeriodRepository } from './database/repositories/insights-by-period.repository'; import { InsightsRawRepository } from './database/repositories/insights-raw.repository'; import { InsightsConfig } from './insights.config'; +type CompactionRunState = { + startedAt: number; + batchesProcessed: number; + rowsCompacted: number; +}; + +type CompactionStopReason = 'max-batches' | 'max-runtime'; + /** * This service is responsible for compacting lower granularity insights data * into higher granularity to control the size of the insights data. @@ -13,6 +23,8 @@ import { InsightsConfig } from './insights.config'; export class InsightsCompactionService { private compactInsightsTimer: NodeJS.Timeout | undefined; + private isCompactionRunning = false; + constructor( private readonly insightsByPeriodRepository: InsightsByPeriodRepository, private readonly insightsRawRepository: InsightsRawRepository, @@ -40,31 +52,139 @@ export class InsightsCompactionService { } async compactInsights() { - let numberOfCompactedRawData: number; + if (this.isCompactionRunning) { + this.logger.debug('Skipping insights compaction because another compaction run is active'); + return; + } + + this.isCompactionRunning = true; + + try { + const runState: CompactionRunState = { + startedAt: Date.now(), + batchesProcessed: 0, + rowsCompacted: 0, + }; + + const stoppedAfterRawToHour = await this.compactStage({ + stageName: 'raw-to-hour', + beforeBatchMessage: 'Compacting raw data to hourly aggregates', + afterBatchMessage: (rowsCompacted) => + `Compacted ${rowsCompacted} raw data to hourly aggregates`, + compactBatch: this.compactRawToHour.bind(this), + runState, + }); + if (stoppedAfterRawToHour) return; + + const stoppedAfterHourToDay = await this.compactStage({ + stageName: 'hour-to-day', + beforeBatchMessage: 'Compacting hourly data to daily aggregates', + afterBatchMessage: (rowsCompacted) => + `Compacted ${rowsCompacted} hourly data to daily aggregates`, + compactBatch: this.compactHourToDay.bind(this), + runState, + }); + if (stoppedAfterHourToDay) return; + + await this.compactStage({ + stageName: 'day-to-week', + beforeBatchMessage: 'Compacting daily data to weekly aggregates', + afterBatchMessage: (rowsCompacted) => + `Compacted ${rowsCompacted} daily data to weekly aggregates`, + compactBatch: this.compactDayToWeek.bind(this), + runState, + }); + } finally { + this.isCompactionRunning = false; + } + } + + private async compactStage({ + stageName, + beforeBatchMessage, + afterBatchMessage, + compactBatch, + runState, + }: { + stageName: string; + beforeBatchMessage: string; + afterBatchMessage: (rowsCompacted: number) => string; + compactBatch: () => Promise; + runState: CompactionRunState; + }) { + let numberOfCompactedData: number; - // Compact raw data to hourly aggregates do { - this.logger.debug('Compacting raw data to hourly aggregates'); - numberOfCompactedRawData = await this.compactRawToHour(); - this.logger.debug(`Compacted ${numberOfCompactedRawData} raw data to hourly aggregates`); - } while (numberOfCompactedRawData === this.insightsConfig.compactionBatchSize); + const stopReason = this.getCompactionRunStopReason(runState); + if (stopReason !== undefined) { + this.logCompactionRunLimitReached(stopReason, stageName, runState); + return true; + } - let numberOfCompactedHourData: number; + this.logger.debug(beforeBatchMessage); + numberOfCompactedData = await compactBatch(); + this.logger.debug(afterBatchMessage(numberOfCompactedData)); - // Compact hourly data to daily aggregates - do { - this.logger.debug('Compacting hourly data to daily aggregates'); - numberOfCompactedHourData = await this.compactHourToDay(); - this.logger.debug(`Compacted ${numberOfCompactedHourData} hourly data to daily aggregates`); - } while (numberOfCompactedHourData === this.insightsConfig.compactionBatchSize); + runState.batchesProcessed++; + runState.rowsCompacted += numberOfCompactedData; - let numberOfCompactedDayData: number; - // Compact daily data to weekly aggregates - do { - this.logger.debug('Compacting daily data to weekly aggregates'); - numberOfCompactedDayData = await this.compactDayToWeek(); - this.logger.debug(`Compacted ${numberOfCompactedDayData} daily data to weekly aggregates`); - } while (numberOfCompactedDayData === this.insightsConfig.compactionBatchSize); + const stopReasonAfterBatch = this.getCompactionRunStopReason(runState); + if (stopReasonAfterBatch !== undefined) { + this.logCompactionRunLimitReached(stopReasonAfterBatch, stageName, runState); + return true; + } + + await this.waitBeforeNextBatchIfFull(numberOfCompactedData); + } while (numberOfCompactedData === this.insightsConfig.compactionBatchSize); + + return false; + } + + private getCompactionRunStopReason( + runState: CompactionRunState, + ): CompactionStopReason | undefined { + if ( + this.insightsConfig.compactionMaxBatchesPerRun > 0 && + runState.batchesProcessed >= this.insightsConfig.compactionMaxBatchesPerRun + ) { + return 'max-batches'; + } + + if ( + this.insightsConfig.compactionMaxRuntimeSeconds > 0 && + Date.now() - runState.startedAt >= + this.insightsConfig.compactionMaxRuntimeSeconds * Time.seconds.toMilliseconds + ) { + return 'max-runtime'; + } + + return undefined; + } + + private logCompactionRunLimitReached( + reason: CompactionStopReason, + stageName: string, + runState: CompactionRunState, + ) { + this.logger.warn('Stopping insights compaction because a per-run limit was reached', { + reason, + stageName, + batchesProcessed: runState.batchesProcessed, + rowsCompacted: runState.rowsCompacted, + compactionMaxBatchesPerRun: this.insightsConfig.compactionMaxBatchesPerRun, + compactionMaxRuntimeSeconds: this.insightsConfig.compactionMaxRuntimeSeconds, + }); + } + + private async waitBeforeNextBatchIfFull(numberOfCompactedData: number) { + if ( + numberOfCompactedData !== this.insightsConfig.compactionBatchSize || + this.insightsConfig.compactionBatchDelayMilliseconds <= 0 + ) { + return; + } + + await sleep(this.insightsConfig.compactionBatchDelayMilliseconds); } /** diff --git a/packages/cli/src/modules/insights/insights.config.ts b/packages/cli/src/modules/insights/insights.config.ts index a28642202e9..7f110bd8f8b 100644 --- a/packages/cli/src/modules/insights/insights.config.ts +++ b/packages/cli/src/modules/insights/insights.config.ts @@ -57,4 +57,24 @@ export class InsightsConfig { */ @Env('N8N_INSIGHTS_PRUNE_CHECK_INTERVAL_HOURS') pruneCheckIntervalHours: number = 24; + + /** + * The maximum number of compaction batches to process in a single compaction run. + * Set to 0 to disable this limit. + */ + @Env('N8N_INSIGHTS_COMPACTION_MAX_BATCHES_PER_RUN') + compactionMaxBatchesPerRun: number = 1000; + + /** + * The maximum runtime in seconds for a single compaction run. + * Set to 0 to disable this limit. + */ + @Env('N8N_INSIGHTS_COMPACTION_MAX_RUNTIME_SECONDS') + compactionMaxRuntimeSeconds: number = 300; + + /** + * The delay in milliseconds between full compaction batches. + */ + @Env('N8N_INSIGHTS_COMPACTION_BATCH_DELAY_MILLISECONDS') + compactionBatchDelayMilliseconds: number = 100; }