fix(core): Make AI assistant conversation pruning happen regularly (#31707)

This commit is contained in:
Jaakko Husso 2026-06-04 12:39:22 +03:00 committed by GitHub
parent 780e86c838
commit 13a1a993f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 95 additions and 25 deletions

View File

@ -100,9 +100,9 @@ export class InstanceAiConfig {
@Env('N8N_INSTANCE_AI_THREAD_TTL_DAYS')
threadTtlDays: number = 90;
/** Interval in milliseconds between native persistence pruning runs. 0 = disabled. */
@Env('N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL')
snapshotPruneInterval: number = 1 * Time.hours.toMilliseconds;
/** Interval in milliseconds between scheduled pruning runs on the leader. 0 = disabled. */
@Env('N8N_INSTANCE_AI_PRUNE_INTERVAL')
pruneInterval: number = 1 * Time.hours.toMilliseconds;
/** Retention period in milliseconds for stale native persistence checkpoints before pruning. */
@Env('N8N_INSTANCE_AI_SNAPSHOT_RETENTION')

View File

@ -302,7 +302,7 @@ describe('GlobalConfig', () => {
searxngUrl: '',
gatewayApiKey: '',
threadTtlDays: 90,
snapshotPruneInterval: 3_600_000,
pruneInterval: 3_600_000,
snapshotRetention: 86_400_000,
confirmationTimeout: 86_400_000,
},

View File

@ -89,7 +89,7 @@ Observer and Reflector use the same model as the orchestrator agent (see `@n8n/a
| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `N8N_INSTANCE_AI_THREAD_TTL_DAYS` | number | `90` | Conversation thread TTL in days. Threads older than this are auto-expired. 0 = no expiration. |
| `N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL` | number | `3600000` | Interval in ms between snapshot pruning runs. 0 = disabled. |
| `N8N_INSTANCE_AI_PRUNE_INTERVAL` | number | `3600000` | Interval in ms between scheduled pruning runs on the leader. Prunes stale checkpoints, expired pending confirmations, and expired conversation threads. 0 = disabled. |
| `N8N_INSTANCE_AI_SNAPSHOT_RETENTION` | number | `86400000` | Retention period in ms for orphaned workflow snapshots before pruning. |
| `N8N_INSTANCE_AI_CONFIRMATION_TIMEOUT` | number | `86400000` | Timeout in ms for HITL confirmation requests. 0 = no timeout. |

View File

@ -430,8 +430,9 @@ function createCheckpointService(): ServiceInternals {
type CheckpointPruneServiceInternals = {
startCheckpointPruning: () => void;
stopCheckpointPruning: () => void;
pruneStaleCheckpoints: (now?: number) => Promise<void>;
runScheduledPrune: (now?: number) => Promise<void>;
pruneStalePendingConfirmations: jest.MockedFunction<(now: number) => Promise<void>>;
pruneExpiredThreads: jest.MockedFunction<() => Promise<void>>;
scheduleCheckpointPrune: jest.MockedFunction<(delayMs?: number) => void>;
checkpointStore: {
markExpiredOlderThan: jest.MockedFunction<(olderThan: Date) => Promise<number>>;
@ -439,7 +440,7 @@ type CheckpointPruneServiceInternals = {
checkpointPruneTimer?: NodeJS.Timeout;
checkpointPruningStopped: boolean;
instanceAiConfig: {
snapshotPruneInterval: number;
pruneInterval: number;
snapshotRetention: number;
};
logger: { info: jest.Mock; debug: jest.Mock; warn: jest.Mock };
@ -451,12 +452,13 @@ function createCheckpointPruneService(): CheckpointPruneServiceInternals {
) as unknown as CheckpointPruneServiceInternals;
service.scheduleCheckpointPrune = jest.fn();
service.pruneStalePendingConfirmations = jest.fn(async (_now: number) => undefined);
service.pruneExpiredThreads = jest.fn(async () => undefined);
service.checkpointStore = {
markExpiredOlderThan: jest.fn(async (_olderThan: Date) => 0),
};
service.checkpointPruningStopped = true;
service.instanceAiConfig = {
snapshotPruneInterval: 60 * 60 * 1000,
pruneInterval: 60 * 60 * 1000,
snapshotRetention: 7 * 24 * 60 * 60 * 1000,
};
service.logger = {
@ -1425,17 +1427,18 @@ describe('InstanceAiService — pending checkpoint re-entry', () => {
});
});
describe('InstanceAiService — checkpoint pruning', () => {
describe('InstanceAiService — scheduled pruning', () => {
it('marks checkpoints expired older than the retention window', async () => {
const service = createCheckpointPruneService();
const now = new Date('2026-05-13T12:00:00.000Z').getTime();
await service.pruneStaleCheckpoints(now);
await service.runScheduledPrune(now);
expect(service.checkpointStore.markExpiredOlderThan).toHaveBeenCalledWith(
new Date('2026-05-06T12:00:00.000Z'),
);
expect(service.pruneStalePendingConfirmations).toHaveBeenCalledWith(now);
expect(service.pruneExpiredThreads).toHaveBeenCalled();
expect(service.scheduleCheckpointPrune).toHaveBeenCalledWith();
});
@ -1450,7 +1453,7 @@ describe('InstanceAiService — checkpoint pruning', () => {
it('does not start checkpoint pruning when disabled', () => {
const service = createCheckpointPruneService();
service.instanceAiConfig.snapshotPruneInterval = 0;
service.instanceAiConfig.pruneInterval = 0;
service.startCheckpointPruning();
@ -1458,6 +1461,52 @@ describe('InstanceAiService — checkpoint pruning', () => {
});
});
type ExpiredThreadPruneServiceInternals = {
pruneExpiredThreads: () => Promise<void>;
clearThreadState: jest.MockedFunction<(threadId: string) => Promise<void>>;
memoryService: {
cleanupExpiredThreads: jest.MockedFunction<
(onThreadDeleted?: (threadId: string) => Promise<void>) => Promise<number>
>;
};
logger: { warn: jest.Mock };
};
function createExpiredThreadPruneService(): ExpiredThreadPruneServiceInternals {
const service = Object.create(
InstanceAiService.prototype,
) as unknown as ExpiredThreadPruneServiceInternals;
service.clearThreadState = jest.fn(async (_threadId: string) => undefined);
service.memoryService = {
cleanupExpiredThreads: jest.fn(async (_onThreadDeleted) => 0),
};
service.logger = { warn: jest.fn() };
return service;
}
describe('InstanceAiService — expired thread pruning', () => {
it('delegates to the memory service and clears state for deleted threads', async () => {
const service = createExpiredThreadPruneService();
service.memoryService.cleanupExpiredThreads.mockImplementation(async (onThreadDeleted) => {
await onThreadDeleted?.('thread-1');
return 1;
});
await service.pruneExpiredThreads();
expect(service.memoryService.cleanupExpiredThreads).toHaveBeenCalledTimes(1);
expect(service.clearThreadState).toHaveBeenCalledWith('thread-1');
});
it('swallows errors so the recurring prune is not disrupted', async () => {
const service = createExpiredThreadPruneService();
service.memoryService.cleanupExpiredThreads.mockRejectedValueOnce(new Error('db down'));
await expect(service.pruneExpiredThreads()).resolves.toBeUndefined();
expect(service.logger.warn).toHaveBeenCalled();
});
});
type RevalidationServiceInternals = {
revalidateActiveUser: (userId: string) => Promise<User | null>;
userRepository: { findOne: jest.Mock };

View File

@ -295,8 +295,9 @@ export class InstanceAiMemoryService {
}
/**
* Delete conversation threads older than the configured TTL.
* Safe to call on startup no-op if threadTtlDays is 0 (disabled).
* Delete conversation threads older than the configured TTL. Invoked on a
* recurring schedule by the leader instance's prune job. Idempotent and
* safe to call repeatedly no-op if threadTtlDays is 0 (disabled).
*/
async cleanupExpiredThreads(
onThreadDeleted?: (threadId: string) => Promise<void>,

View File

@ -24,14 +24,6 @@ export class InstanceAiModule implements ModuleInterface {
if (process.env.E2E_TESTS === 'true' && process.env.NODE_ENV !== 'production') {
await import('./instance-ai-test.controller');
}
// Fire-and-forget: clean up expired conversation threads on startup
const { InstanceAiMemoryService } = await import('./instance-ai-memory.service');
const { InstanceAiService } = await import('./instance-ai.service');
const aiService = Container.get(InstanceAiService);
void Container.get(InstanceAiMemoryService)
.cleanupExpiredThreads(async (threadId) => await aiService.clearThreadState(threadId))
.catch(() => undefined);
}
async settings() {

View File

@ -105,6 +105,7 @@ import { Telemetry } from '@/telemetry';
import { InProcessEventBus } from './event-bus/in-process-event-bus';
import type { LocalGateway } from './filesystem';
import { LocalGatewayRegistry } from './filesystem';
import { InstanceAiMemoryService } from './instance-ai-memory.service';
import { InstanceAiSettingsService } from './instance-ai-settings.service';
import { InstanceAiAdapterService } from './instance-ai.adapter.service';
import { AUTO_FOLLOW_UP_MESSAGE } from './internal-messages';
@ -640,6 +641,7 @@ export class InstanceAiService {
private readonly adapterService: InstanceAiAdapterService,
private readonly eventBus: InProcessEventBus,
private readonly settingsService: InstanceAiSettingsService,
private readonly memoryService: InstanceAiMemoryService,
private readonly agentMemory: TypeORMAgentMemory,
private readonly checkpointStore: TypeORMAgentCheckpointStore,
private readonly aiService: AiService,
@ -2129,7 +2131,7 @@ export class InstanceAiService {
@OnLeaderTakeover()
startCheckpointPruning(): void {
if (this.checkpointPruneTimer || this.instanceAiConfig.snapshotPruneInterval <= 0) return;
if (this.checkpointPruneTimer || this.instanceAiConfig.pruneInterval <= 0) return;
this.checkpointPruningStopped = false;
this.scheduleCheckpointPrune(0);
}
@ -2141,10 +2143,10 @@ export class InstanceAiService {
this.checkpointPruneTimer = undefined;
}
private scheduleCheckpointPrune(delayMs = this.instanceAiConfig.snapshotPruneInterval): void {
private scheduleCheckpointPrune(delayMs = this.instanceAiConfig.pruneInterval): void {
if (this.checkpointPruningStopped) return;
this.checkpointPruneTimer = setTimeout(() => {
void this.pruneStaleCheckpoints();
void this.runScheduledPrune();
}, delayMs);
this.checkpointPruneTimer.unref();
}
@ -2224,7 +2226,14 @@ export class InstanceAiService {
}
}
private async pruneStaleCheckpoints(now = Date.now()): Promise<void> {
/**
* One tick of the recurring leader prune cycle: expire stale checkpoints,
* drop expired pending confirmations, and delete expired conversation
* threads, then schedule the next run. A checkpoint failure reschedules
* with a shorter retry delay; the confirmation and thread steps swallow
* their own errors so they never disrupt the cycle.
*/
private async runScheduledPrune(now = Date.now()): Promise<void> {
const olderThan = new Date(now - this.instanceAiConfig.snapshotRetention);
try {
@ -2235,6 +2244,7 @@ export class InstanceAiService {
this.logger.debug('No stale Instance AI checkpoints to expire');
}
await this.pruneStalePendingConfirmations(now);
await this.pruneExpiredThreads();
this.scheduleCheckpointPrune();
} catch (error: unknown) {
this.logger.warn('Failed to expire stale Instance AI checkpoints', {
@ -2244,6 +2254,24 @@ export class InstanceAiService {
}
}
/**
* Delete conversation threads older than the configured TTL as part of the
* recurring leader prune. Has its own try/catch so a failure here never
* disrupts checkpoint pruning or the next scheduled run. No-op when
* `threadTtlDays` is 0 (handled inside `cleanupExpiredThreads`).
*/
private async pruneExpiredThreads(): Promise<void> {
try {
await this.memoryService.cleanupExpiredThreads(
async (threadId) => await this.clearThreadState(threadId),
);
} catch (error: unknown) {
this.logger.warn('Failed to clean up expired Instance AI conversation threads', {
error: getErrorMessage(error),
});
}
}
private async pruneStalePendingConfirmations(now: number): Promise<void> {
try {
const count = await this.pendingConfirmationRepo.deleteExpired(new Date(now));