From 13a1a993f58e449447f0568677cbf6471d446420 Mon Sep 17 00:00:00 2001 From: Jaakko Husso Date: Thu, 4 Jun 2026 12:39:22 +0300 Subject: [PATCH] fix(core): Make AI assistant conversation pruning happen regularly (#31707) --- .../config/src/configs/instance-ai.config.ts | 6 +- packages/@n8n/config/test/config.test.ts | 2 +- .../@n8n/instance-ai/docs/configuration.md | 2 +- .../__tests__/instance-ai.service.test.ts | 61 +++++++++++++++++-- .../instance-ai/instance-ai-memory.service.ts | 5 +- .../modules/instance-ai/instance-ai.module.ts | 8 --- .../instance-ai/instance-ai.service.ts | 36 +++++++++-- 7 files changed, 95 insertions(+), 25 deletions(-) diff --git a/packages/@n8n/config/src/configs/instance-ai.config.ts b/packages/@n8n/config/src/configs/instance-ai.config.ts index abdd223c2b6..b78385a4b48 100644 --- a/packages/@n8n/config/src/configs/instance-ai.config.ts +++ b/packages/@n8n/config/src/configs/instance-ai.config.ts @@ -100,9 +100,9 @@ export class InstanceAiConfig { @Env('N8N_INSTANCE_AI_THREAD_TTL_DAYS') threadTtlDays: number = 90; - /** Interval in milliseconds between native persistence pruning runs. 0 = disabled. */ - @Env('N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL') - snapshotPruneInterval: number = 1 * Time.hours.toMilliseconds; + /** Interval in milliseconds between scheduled pruning runs on the leader. 0 = disabled. */ + @Env('N8N_INSTANCE_AI_PRUNE_INTERVAL') + pruneInterval: number = 1 * Time.hours.toMilliseconds; /** Retention period in milliseconds for stale native persistence checkpoints before pruning. */ @Env('N8N_INSTANCE_AI_SNAPSHOT_RETENTION') diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index ad6fb6c6a15..934c0e78da4 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -302,7 +302,7 @@ describe('GlobalConfig', () => { searxngUrl: '', gatewayApiKey: '', threadTtlDays: 90, - snapshotPruneInterval: 3_600_000, + pruneInterval: 3_600_000, snapshotRetention: 86_400_000, confirmationTimeout: 86_400_000, }, diff --git a/packages/@n8n/instance-ai/docs/configuration.md b/packages/@n8n/instance-ai/docs/configuration.md index f6abf3544e8..d54c05da0e9 100644 --- a/packages/@n8n/instance-ai/docs/configuration.md +++ b/packages/@n8n/instance-ai/docs/configuration.md @@ -89,7 +89,7 @@ Observer and Reflector use the same model as the orchestrator agent (see `@n8n/a | Variable | Type | Default | Description | |----------|------|---------|-------------| | `N8N_INSTANCE_AI_THREAD_TTL_DAYS` | number | `90` | Conversation thread TTL in days. Threads older than this are auto-expired. 0 = no expiration. | -| `N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL` | number | `3600000` | Interval in ms between snapshot pruning runs. 0 = disabled. | +| `N8N_INSTANCE_AI_PRUNE_INTERVAL` | number | `3600000` | Interval in ms between scheduled pruning runs on the leader. Prunes stale checkpoints, expired pending confirmations, and expired conversation threads. 0 = disabled. | | `N8N_INSTANCE_AI_SNAPSHOT_RETENTION` | number | `86400000` | Retention period in ms for orphaned workflow snapshots before pruning. | | `N8N_INSTANCE_AI_CONFIRMATION_TIMEOUT` | number | `86400000` | Timeout in ms for HITL confirmation requests. 0 = no timeout. | diff --git a/packages/cli/src/modules/instance-ai/__tests__/instance-ai.service.test.ts b/packages/cli/src/modules/instance-ai/__tests__/instance-ai.service.test.ts index 25d05e53e1d..0c9b777c241 100644 --- a/packages/cli/src/modules/instance-ai/__tests__/instance-ai.service.test.ts +++ b/packages/cli/src/modules/instance-ai/__tests__/instance-ai.service.test.ts @@ -430,8 +430,9 @@ function createCheckpointService(): ServiceInternals { type CheckpointPruneServiceInternals = { startCheckpointPruning: () => void; stopCheckpointPruning: () => void; - pruneStaleCheckpoints: (now?: number) => Promise; + runScheduledPrune: (now?: number) => Promise; pruneStalePendingConfirmations: jest.MockedFunction<(now: number) => Promise>; + pruneExpiredThreads: jest.MockedFunction<() => Promise>; scheduleCheckpointPrune: jest.MockedFunction<(delayMs?: number) => void>; checkpointStore: { markExpiredOlderThan: jest.MockedFunction<(olderThan: Date) => Promise>; @@ -439,7 +440,7 @@ type CheckpointPruneServiceInternals = { checkpointPruneTimer?: NodeJS.Timeout; checkpointPruningStopped: boolean; instanceAiConfig: { - snapshotPruneInterval: number; + pruneInterval: number; snapshotRetention: number; }; logger: { info: jest.Mock; debug: jest.Mock; warn: jest.Mock }; @@ -451,12 +452,13 @@ function createCheckpointPruneService(): CheckpointPruneServiceInternals { ) as unknown as CheckpointPruneServiceInternals; service.scheduleCheckpointPrune = jest.fn(); service.pruneStalePendingConfirmations = jest.fn(async (_now: number) => undefined); + service.pruneExpiredThreads = jest.fn(async () => undefined); service.checkpointStore = { markExpiredOlderThan: jest.fn(async (_olderThan: Date) => 0), }; service.checkpointPruningStopped = true; service.instanceAiConfig = { - snapshotPruneInterval: 60 * 60 * 1000, + pruneInterval: 60 * 60 * 1000, snapshotRetention: 7 * 24 * 60 * 60 * 1000, }; service.logger = { @@ -1425,17 +1427,18 @@ describe('InstanceAiService — pending checkpoint re-entry', () => { }); }); -describe('InstanceAiService — checkpoint pruning', () => { +describe('InstanceAiService — scheduled pruning', () => { it('marks checkpoints expired older than the retention window', async () => { const service = createCheckpointPruneService(); const now = new Date('2026-05-13T12:00:00.000Z').getTime(); - await service.pruneStaleCheckpoints(now); + await service.runScheduledPrune(now); expect(service.checkpointStore.markExpiredOlderThan).toHaveBeenCalledWith( new Date('2026-05-06T12:00:00.000Z'), ); expect(service.pruneStalePendingConfirmations).toHaveBeenCalledWith(now); + expect(service.pruneExpiredThreads).toHaveBeenCalled(); expect(service.scheduleCheckpointPrune).toHaveBeenCalledWith(); }); @@ -1450,7 +1453,7 @@ describe('InstanceAiService — checkpoint pruning', () => { it('does not start checkpoint pruning when disabled', () => { const service = createCheckpointPruneService(); - service.instanceAiConfig.snapshotPruneInterval = 0; + service.instanceAiConfig.pruneInterval = 0; service.startCheckpointPruning(); @@ -1458,6 +1461,52 @@ describe('InstanceAiService — checkpoint pruning', () => { }); }); +type ExpiredThreadPruneServiceInternals = { + pruneExpiredThreads: () => Promise; + clearThreadState: jest.MockedFunction<(threadId: string) => Promise>; + memoryService: { + cleanupExpiredThreads: jest.MockedFunction< + (onThreadDeleted?: (threadId: string) => Promise) => Promise + >; + }; + logger: { warn: jest.Mock }; +}; + +function createExpiredThreadPruneService(): ExpiredThreadPruneServiceInternals { + const service = Object.create( + InstanceAiService.prototype, + ) as unknown as ExpiredThreadPruneServiceInternals; + service.clearThreadState = jest.fn(async (_threadId: string) => undefined); + service.memoryService = { + cleanupExpiredThreads: jest.fn(async (_onThreadDeleted) => 0), + }; + service.logger = { warn: jest.fn() }; + return service; +} + +describe('InstanceAiService — expired thread pruning', () => { + it('delegates to the memory service and clears state for deleted threads', async () => { + const service = createExpiredThreadPruneService(); + service.memoryService.cleanupExpiredThreads.mockImplementation(async (onThreadDeleted) => { + await onThreadDeleted?.('thread-1'); + return 1; + }); + + await service.pruneExpiredThreads(); + + expect(service.memoryService.cleanupExpiredThreads).toHaveBeenCalledTimes(1); + expect(service.clearThreadState).toHaveBeenCalledWith('thread-1'); + }); + + it('swallows errors so the recurring prune is not disrupted', async () => { + const service = createExpiredThreadPruneService(); + service.memoryService.cleanupExpiredThreads.mockRejectedValueOnce(new Error('db down')); + + await expect(service.pruneExpiredThreads()).resolves.toBeUndefined(); + expect(service.logger.warn).toHaveBeenCalled(); + }); +}); + type RevalidationServiceInternals = { revalidateActiveUser: (userId: string) => Promise; userRepository: { findOne: jest.Mock }; diff --git a/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts b/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts index 9b96b5dbbb3..da1a4bfb367 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts @@ -295,8 +295,9 @@ export class InstanceAiMemoryService { } /** - * Delete conversation threads older than the configured TTL. - * Safe to call on startup — no-op if threadTtlDays is 0 (disabled). + * Delete conversation threads older than the configured TTL. Invoked on a + * recurring schedule by the leader instance's prune job. Idempotent and + * safe to call repeatedly — no-op if threadTtlDays is 0 (disabled). */ async cleanupExpiredThreads( onThreadDeleted?: (threadId: string) => Promise, diff --git a/packages/cli/src/modules/instance-ai/instance-ai.module.ts b/packages/cli/src/modules/instance-ai/instance-ai.module.ts index 4f4317e559a..39e0cf3a6b5 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.module.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.module.ts @@ -24,14 +24,6 @@ export class InstanceAiModule implements ModuleInterface { if (process.env.E2E_TESTS === 'true' && process.env.NODE_ENV !== 'production') { await import('./instance-ai-test.controller'); } - - // Fire-and-forget: clean up expired conversation threads on startup - const { InstanceAiMemoryService } = await import('./instance-ai-memory.service'); - const { InstanceAiService } = await import('./instance-ai.service'); - const aiService = Container.get(InstanceAiService); - void Container.get(InstanceAiMemoryService) - .cleanupExpiredThreads(async (threadId) => await aiService.clearThreadState(threadId)) - .catch(() => undefined); } async settings() { diff --git a/packages/cli/src/modules/instance-ai/instance-ai.service.ts b/packages/cli/src/modules/instance-ai/instance-ai.service.ts index 7cab1321070..2e9e57b369a 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.service.ts @@ -105,6 +105,7 @@ import { Telemetry } from '@/telemetry'; import { InProcessEventBus } from './event-bus/in-process-event-bus'; import type { LocalGateway } from './filesystem'; import { LocalGatewayRegistry } from './filesystem'; +import { InstanceAiMemoryService } from './instance-ai-memory.service'; import { InstanceAiSettingsService } from './instance-ai-settings.service'; import { InstanceAiAdapterService } from './instance-ai.adapter.service'; import { AUTO_FOLLOW_UP_MESSAGE } from './internal-messages'; @@ -640,6 +641,7 @@ export class InstanceAiService { private readonly adapterService: InstanceAiAdapterService, private readonly eventBus: InProcessEventBus, private readonly settingsService: InstanceAiSettingsService, + private readonly memoryService: InstanceAiMemoryService, private readonly agentMemory: TypeORMAgentMemory, private readonly checkpointStore: TypeORMAgentCheckpointStore, private readonly aiService: AiService, @@ -2129,7 +2131,7 @@ export class InstanceAiService { @OnLeaderTakeover() startCheckpointPruning(): void { - if (this.checkpointPruneTimer || this.instanceAiConfig.snapshotPruneInterval <= 0) return; + if (this.checkpointPruneTimer || this.instanceAiConfig.pruneInterval <= 0) return; this.checkpointPruningStopped = false; this.scheduleCheckpointPrune(0); } @@ -2141,10 +2143,10 @@ export class InstanceAiService { this.checkpointPruneTimer = undefined; } - private scheduleCheckpointPrune(delayMs = this.instanceAiConfig.snapshotPruneInterval): void { + private scheduleCheckpointPrune(delayMs = this.instanceAiConfig.pruneInterval): void { if (this.checkpointPruningStopped) return; this.checkpointPruneTimer = setTimeout(() => { - void this.pruneStaleCheckpoints(); + void this.runScheduledPrune(); }, delayMs); this.checkpointPruneTimer.unref(); } @@ -2224,7 +2226,14 @@ export class InstanceAiService { } } - private async pruneStaleCheckpoints(now = Date.now()): Promise { + /** + * One tick of the recurring leader prune cycle: expire stale checkpoints, + * drop expired pending confirmations, and delete expired conversation + * threads, then schedule the next run. A checkpoint failure reschedules + * with a shorter retry delay; the confirmation and thread steps swallow + * their own errors so they never disrupt the cycle. + */ + private async runScheduledPrune(now = Date.now()): Promise { const olderThan = new Date(now - this.instanceAiConfig.snapshotRetention); try { @@ -2235,6 +2244,7 @@ export class InstanceAiService { this.logger.debug('No stale Instance AI checkpoints to expire'); } await this.pruneStalePendingConfirmations(now); + await this.pruneExpiredThreads(); this.scheduleCheckpointPrune(); } catch (error: unknown) { this.logger.warn('Failed to expire stale Instance AI checkpoints', { @@ -2244,6 +2254,24 @@ export class InstanceAiService { } } + /** + * Delete conversation threads older than the configured TTL as part of the + * recurring leader prune. Has its own try/catch so a failure here never + * disrupts checkpoint pruning or the next scheduled run. No-op when + * `threadTtlDays` is 0 (handled inside `cleanupExpiredThreads`). + */ + private async pruneExpiredThreads(): Promise { + try { + await this.memoryService.cleanupExpiredThreads( + async (threadId) => await this.clearThreadState(threadId), + ); + } catch (error: unknown) { + this.logger.warn('Failed to clean up expired Instance AI conversation threads', { + error: getErrorMessage(error), + }); + } + } + private async pruneStalePendingConfirmations(now: number): Promise { try { const count = await this.pendingConfirmationRepo.deleteExpired(new Date(now));