refactor(instance-ai): reset obsolete native persistence

This commit is contained in:
Oleg Ivaniv 2026-05-05 13:06:17 +02:00
parent 321af6d7a0
commit cc7d3c536d
No known key found for this signature in database
19 changed files with 104 additions and 345 deletions

View File

@ -102,14 +102,6 @@ export class InstanceAiConfig {
@Env('N8N_INSTANCE_AI_THREAD_TTL_DAYS')
threadTtlDays: number = 90;
/** Interval in milliseconds between snapshot pruning runs. 0 = disabled. */
@Env('N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL')
snapshotPruneInterval: number = 60 * 60 * 1000; // 1 hour
/** Retention period in milliseconds for orphaned workflow snapshots before pruning. */
@Env('N8N_INSTANCE_AI_SNAPSHOT_RETENTION')
snapshotRetention: number = 24 * 60 * 60 * 1000; // 24 hours
/** Timeout in milliseconds for HITL confirmation requests. 0 = no timeout. */
@Env('N8N_INSTANCE_AI_CONFIRMATION_TIMEOUT')
confirmationTimeout: number = 10 * 60 * 1000; // 10 minutes

View File

@ -291,8 +291,6 @@ describe('GlobalConfig', () => {
searxngUrl: '',
gatewayApiKey: '',
threadTtlDays: 90,
snapshotPruneInterval: 3_600_000,
snapshotRetention: 86_400_000,
confirmationTimeout: 600_000,
},
queue: {

View File

@ -0,0 +1,28 @@
import type { IrreversibleMigration, MigrationContext } from '../migration-types';
const persistedRuntimeTables = [
'ai_builder_temporary_workflow',
'instance_ai_checkpoints',
'instance_ai_iteration_logs',
'instance_ai_run_snapshots',
'instance_ai_messages',
'instance_ai_resources',
'instance_ai_threads',
] as const;
const obsoleteTables = [
'instance_ai_workflow_snapshots',
'instance_ai_observational_memory',
] as const;
export class ResetInstanceAiNativePersistence1778060000000 implements IrreversibleMigration {
async up({ queryRunner, escape, schemaBuilder: { dropTable } }: MigrationContext) {
for (const table of persistedRuntimeTables) {
await queryRunner.query(`DELETE FROM ${escape.tableName(table)}`);
}
for (const table of obsoleteTables) {
await dropTable(table);
}
}
}

View File

@ -169,6 +169,7 @@ import { AddLangsmithIdsToInstanceAiRunSnapshots1777100000000 } from '../common/
import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/1777281990043-CreateAiBuilderTemporaryWorkflowTable';
import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey';
import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable';
import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@ -343,4 +344,5 @@ export const postgresMigrations: Migration[] = [
AddTracingContextToExecution1777045000000,
CreateAiBuilderTemporaryWorkflowTable1777281990043,
ExpandVariablesValueColumnToText1777420800000,
ResetInstanceAiNativePersistence1778060000000,
];

View File

@ -162,6 +162,7 @@ import { AddLangsmithIdsToInstanceAiRunSnapshots1777100000000 } from '../common/
import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/1777281990043-CreateAiBuilderTemporaryWorkflowTable';
import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey';
import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable';
import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence';
import type { Migration } from '../migration-types';
const sqliteMigrations: Migration[] = [
@ -329,6 +330,7 @@ const sqliteMigrations: Migration[] = [
CreateInstanceAiCheckpointTable1778050000000,
AddTracingContextToExecution1777045000000,
CreateAiBuilderTemporaryWorkflowTable1777281990043,
ResetInstanceAiNativePersistence1778060000000,
];
export { sqliteMigrations };

View File

@ -68,4 +68,4 @@ See `docs/e2e-tests.md` for the full recording/replay architecture.
- **Run lifecycle**: `run-start` (first) → events → `run-finish` (last, carries status)
- **Planned tasks**: `plan` tool for multi-step work; tasks run detached as background agents
- **Sub-agents**: stateless, native domain tools only, no MCP, no recursive delegation
- **Memory**: observational memory = thread-scoped, working memory is disabled
- **Memory**: native thread messages plus rolling compaction; sub-agents are stateless

View File

@ -460,7 +460,7 @@ pnpm typecheck
- [x] Implement native chunk to Instance AI event mapper.
- [x] Implement TypeORM `BuiltMemory`.
- [x] Implement TypeORM `CheckpointStore`.
- [ ] Add fresh DB migration for native Instance AI persistence.
- [x] Add fresh DB migration for native Instance AI persistence.
- [x] Remove `TypeORMCompositeStore` from the active runtime path.
- [x] Remove `TypeORMWorkflowsStorage` from the active runtime path.
- [x] Remove observational memory runtime usage.
@ -474,7 +474,6 @@ pnpm typecheck
Current remaining implementation gaps:
- Add the destructive/fresh native persistence migration.
- Decide whether LangSmith should move further onto native telemetry/events or
keep the current product-level spans.

View File

@ -241,12 +241,12 @@ export function createListWorkflowsTool(context: InstanceAiContext) {
### Memory usage
The memory system is thread-scoped. Writing observations from a sub-agent
corrupts the orchestrator's context, and manually summarizing tool results
fights with the Observer doing the same thing.
The memory system is thread-scoped. Writing conversation state from a sub-agent
corrupts the orchestrator's context, and ad hoc summaries fight the rolling
compaction service.
- Never read/write memory from sub-agents — they're stateless by design
- Let observational memory handle compression — don't manually summarize
- Let the compaction service handle compression — don't manually summarize
### Agent creation

View File

@ -7,7 +7,7 @@ natural language interface to workflows, executions, credentials, and nodes —
the goal that most users never need to interact with workflows directly.
The system follows the **deep agent architecture** — an orchestrator with explicit
planning, dynamic sub-agent delegation, observational memory, and structured
planning, dynamic sub-agent delegation, rolling context compaction, and structured
prompts. The LLM controls the execution loop; the architecture provides the
primitives.

View File

@ -66,21 +66,11 @@ When no search provider is available, `web-search` and `research-with-agent` too
Sandbox workspaces persist per thread — the same container is reused across messages in a conversation. Workspaces are destroyed on server shutdown.
### Observational Memory
| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `N8N_INSTANCE_AI_OBSERVER_MODEL` | string | `google/gemini-2.5-flash` | LLM for Observer/Reflector compression agents |
| `N8N_INSTANCE_AI_OBSERVER_MESSAGE_TOKENS` | number | `30000` | Token threshold for Observer to trigger compression |
| `N8N_INSTANCE_AI_REFLECTOR_OBSERVATION_TOKENS` | number | `40000` | Token threshold for Reflector to condense observations |
### Lifecycle & Housekeeping
| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `N8N_INSTANCE_AI_THREAD_TTL_DAYS` | number | `90` | Conversation thread TTL in days. Threads older than this are auto-expired. 0 = no expiration. |
| `N8N_INSTANCE_AI_SNAPSHOT_PRUNE_INTERVAL` | number | `3600000` | Interval in ms between snapshot pruning runs. 0 = disabled. |
| `N8N_INSTANCE_AI_SNAPSHOT_RETENTION` | number | `86400000` | Retention period in ms for orphaned workflow snapshots before pruning. |
| `N8N_INSTANCE_AI_CONFIRMATION_TIMEOUT` | number | `600000` | Timeout in ms for HITL confirmation requests. 0 = no timeout. |
## Enabling / Disabling
@ -193,14 +183,14 @@ N8N_INSTANCE_AI_GATEWAY_API_KEY=my-secret-key
N8N_INSTANCE_AI_MODEL=custom/llama-3.1-70b
N8N_INSTANCE_AI_MODEL_URL=http://localhost:1234/v1
# Full configuration with observational memory tuning
# Full configuration
N8N_INSTANCE_AI_MODEL=anthropic/claude-sonnet-4-6
N8N_INSTANCE_AI_MCP_SERVERS="github=https://mcp.github.com/sse"
N8N_INSTANCE_AI_LAST_MESSAGES=20
N8N_INSTANCE_AI_EMBEDDER_MODEL=openai/text-embedding-3-small
N8N_INSTANCE_AI_MAX_STEPS=50
N8N_INSTANCE_AI_MAX_LOOP_ITERATIONS=10
N8N_INSTANCE_AI_OBSERVER_MODEL=google/gemini-2.5-flash
N8N_INSTANCE_AI_OBSERVER_MESSAGE_TOKENS=30000
N8N_INSTANCE_AI_SEMANTIC_RECALL_TOP_K=5
N8N_INSTANCE_AI_SUB_AGENT_MAX_STEPS=100
N8N_INSTANCE_AI_THREAD_TTL_DAYS=90
```
## SearXNG Setup (Docker Compose)

View File

@ -2,121 +2,98 @@
## Overview
The memory system serves two purposes:
The memory system serves three purposes:
- **Operational context management** — observational memory that compresses
the agent's operational history during long autonomous loops to prevent
context degradation (thread-scoped)
- **Conversation history** — recent messages and semantic recall for the
current thread (thread-scoped)
- **Native agent persistence** — thread and message storage through the
`@n8n/agents` `BuiltMemory` interface.
- **Operational context management** — rolling compaction of older messages into
a thread metadata summary when the conversation approaches the model context
window.
- **Conversation continuity** — recent messages, optional semantic recall, plan
state, retry history, checkpoints, and UI run snapshots for the current
thread.
Sub-agents are stateless — context is passed via the briefing only.
Sub-agents are stateless. Context is passed through the briefing, and retry
history is appended from thread-scoped iteration logs.
## Tiers
### Tier 1: Storage Backend
### Tier 1: Native Storage
The persistence layer. Stores all messages, observational memory, plan state,
event history, and vector embeddings.
Instance AI uses n8n's application database for native agent storage.
| Backend | When Used | Connection |
|---------|-----------|------------|
| PostgreSQL | n8n is configured with `postgresdb` | Built from n8n's DB config |
| LibSQL/SQLite | All other cases (default) | `file:instance-ai-memory.db` |
| Store | Tables |
|-------|--------|
| `TypeORMAgentMemory` | `instance_ai_threads`, `instance_ai_messages`, `instance_ai_resources` |
| `TypeORMAgentCheckpointStore` | `instance_ai_checkpoints` |
| UI run snapshots | `instance_ai_run_snapshots` |
| Iteration logs | `instance_ai_iteration_logs` |
| Temporary workflow mapping | `ai_builder_temporary_workflow` |
The storage backend is selected automatically based on n8n's database
configuration — no separate config needed.
The obsolete workflow snapshot and observational memory tables are dropped by
the native agents reset migration. Existing Instance AI runtime data may be
cleared during that migration.
### Tier 2: Recent Messages
A sliding window of the most recent N messages in the conversation, sent as
context to the LLM on every request.
A sliding window of the most recent N messages is sent as context to the LLM on
every request.
- **Default**: 20 messages
- **Config**: `N8N_INSTANCE_AI_LAST_MESSAGES`
### Tier 3: Observational Memory
### Tier 3: Rolling Compaction
Automatic context compression for long-running autonomous loops. Two background
agents manage the orchestrator's context size:
`InstanceAiCompactionService` estimates thread token usage. When a conversation
exceeds the configured context threshold, older messages outside the recent
tail are summarized by a native compaction agent.
- **Observer** — when message tokens exceed a threshold (default: 30K), compresses
old messages into dense observations
- **Reflector** — when observations exceed their threshold (default: 40K),
condenses observations into higher-level patterns
```
Context window layout during autonomous loop:
┌──────────────────────────────────────────┐
│ Observation Block (≤40K tokens) │ ← compressed history
│ "Built wf-123 with Schedule→HTTP→Slack. │ (append-only, cacheable)
│ Exec failed: 401 on HTTP node. │
│ Debugger identified missing API key. │
│ Rebuilt workflow, re-executed, passed." │
├──────────────────────────────────────────┤
│ Raw Message Block (≤30K tokens) │ ← recent tool calls & results
│ [current step's tool calls and results] │ (rotated as new messages arrive)
└──────────────────────────────────────────┘
```
**Why this matters for the autonomous loop**:
- Tool-heavy workloads (workflow definitions, execution results, node
descriptions) get **540x compression** — a 50-step loop that would blow
out the context window stays manageable
- The observation block is **append-only** until reflection runs, enabling
high prompt cache hit rates (410x cost reduction)
- **Async buffering** pre-computes observations in the background — no
user-visible pause when the threshold is hit
- Uses a secondary LLM (default: `google/gemini-2.5-flash`) for compression —
cheap and has a 1M token context window for the Reflector
Observational memory is **thread-scoped** — it tracks the operational history
of the current task.
Compaction state is stored in thread metadata under
`instanceAiConversationSummary`. Raw messages remain in the database for UI and
debugging; compaction only changes the model input.
### Tier 4: Semantic Recall (Optional)
Vector-based retrieval of relevant past messages. When enabled, the system
embeds each message and retrieves semantically similar past messages to include
as context.
When configured and supported by the active memory backend, the native agents
runtime can retrieve semantically related past messages and inject them into
context.
- **Requires**: `N8N_INSTANCE_AI_EMBEDDER_MODEL` to be set
- **Config**: `N8N_INSTANCE_AI_EMBEDDER_MODEL`
- **Config**: `N8N_INSTANCE_AI_SEMANTIC_RECALL_TOP_K` (default: 5)
- **Message range**: 2 messages before and 1 after each match
Disabled by default. When the embedder model is not set, only tiers 13 are
active.
Disabled by default.
### Tier 5: Plan Storage
### Tier 5: Plan And Retry State
The `plan` tool stores execution plans in thread-scoped storage. Plans are
structured data (goal, current phase, iteration count, step statuses) that
persist across reconnects within a conversation. See the [tools](./tools.md)
documentation for the plan tool schema.
The `plan` tool stores execution plans in thread metadata. Workflow loop
attempts are stored in `instance_ai_iteration_logs` and appended to sub-agent
briefings on retry.
### Tier 6: Checkpoints And Run Snapshots
Native checkpoints persist suspended agent state for human-in-the-loop resume.
Run snapshots persist the UI agent tree used to reconstruct visible progress
after reconnects.
## Scoping Model
All memory is thread-scoped (isolated per conversation):
All memory is thread-scoped unless a native memory call explicitly requests a
resource-scoped working-memory key.
- **Recent messages**the sliding window of N messages
- **Observational memory** — compressed operational history
- **Semantic recall** — vector retrieval of relevant past messages
- **Plan** — the current execution plan
- **Recent messages** — current conversation history.
- **Compaction summary** — older context summarized for the same thread.
- **Plan and iteration logs** — current task state and retry history.
- **Checkpoints** — suspended native agent state keyed by run.
### Sub-agent memory
### Sub-Agent Memory
Sub-agents are fully stateless — context is passed via the briefing and
`conversationContext` fields in the `delegate` and `build-workflow-with-agent`
tools.
Sub-agents do not read or write persistent memory directly. The orchestrator
builds their briefing from the current request, relevant task state, and retry
history.
Past failed attempts are tracked via the `IterationLog` (stored in thread
metadata) and appended to sub-agent briefings on retry, providing cross-attempt
context without persistent memory.
### Cross-User Isolation
### Cross-user isolation
Each user's memory is fully independent. The agent cannot see other users'
Each user's memory is independent. The agent cannot see other users'
conversations or semantic history.
## Configuration
@ -124,8 +101,5 @@ conversations or semantic history.
| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `N8N_INSTANCE_AI_LAST_MESSAGES` | number | 20 | Recent message window |
| `N8N_INSTANCE_AI_EMBEDDER_MODEL` | string | `''` | Embedder model (empty = disabled) |
| `N8N_INSTANCE_AI_EMBEDDER_MODEL` | string | `''` | Embedder model for semantic recall (empty = disabled) |
| `N8N_INSTANCE_AI_SEMANTIC_RECALL_TOP_K` | number | 5 | Number of semantic matches |
| `N8N_INSTANCE_AI_OBSERVER_MODEL` | string | `google/gemini-2.5-flash` | LLM for Observer/Reflector |
| `N8N_INSTANCE_AI_OBSERVER_MESSAGE_TOKENS` | number | 30000 | Observer trigger threshold |
| `N8N_INSTANCE_AI_REFLECTOR_OBSERVATION_TOKENS` | number | 40000 | Reflector trigger threshold |

View File

@ -1,8 +1,6 @@
export { InstanceAiThread } from './instance-ai-thread.entity';
export { InstanceAiMessage } from './instance-ai-message.entity';
export { InstanceAiResource } from './instance-ai-resource.entity';
export { InstanceAiObservationalMemory } from './instance-ai-observational-memory.entity';
export { InstanceAiWorkflowSnapshot } from './instance-ai-workflow-snapshot.entity';
export { InstanceAiRunSnapshot } from './instance-ai-run-snapshot.entity';
export { InstanceAiIterationLog } from './instance-ai-iteration-log.entity';
export { InstanceAiCheckpoint } from './instance-ai-checkpoint.entity';

View File

@ -1,99 +0,0 @@
import { WithTimestamps, DateTimeColumn, JsonColumn } from '@n8n/db';
import { Column, Entity, Index, PrimaryColumn } from '@n8n/typeorm';
@Entity({ name: 'instance_ai_observational_memory' })
@Index('IDX_instance_ai_om_scope_thread_resource', ['scope', 'threadId', 'resourceId'], {
unique: true,
})
export class InstanceAiObservationalMemory extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 36 })
id: string;
@Index()
@Column({ type: 'varchar', length: 255 })
lookupKey: string;
@Column({ type: 'varchar', length: 16 })
scope: string;
@Column({ type: 'uuid', nullable: true })
threadId: string | null;
@Column({ type: 'varchar', length: 255 })
resourceId: string;
@Column({ type: 'text', default: '' })
activeObservations: string;
@Column({ type: 'varchar', length: 32 })
originType: string;
@Column({ type: 'text' })
config: string;
@Column({ type: 'int', default: 0 })
generationCount: number;
@DateTimeColumn({ nullable: true })
lastObservedAt: Date | null;
@Column({ type: 'int', default: 0 })
pendingMessageTokens: number;
@Column({ type: 'int', default: 0 })
totalTokensObserved: number;
@Column({ type: 'int', default: 0 })
observationTokenCount: number;
@Column({ type: 'boolean', default: false })
isObserving: boolean;
@Column({ type: 'boolean', default: false })
isReflecting: boolean;
@JsonColumn({ nullable: true })
observedMessageIds: string[] | null;
@Column({ type: 'varchar', nullable: true })
observedTimezone: string | null;
@Column({ type: 'text', nullable: true })
bufferedObservations: string | null;
@Column({ type: 'int', nullable: true })
bufferedObservationTokens: number | null;
@JsonColumn({ nullable: true })
bufferedMessageIds: string[] | null;
@Column({ type: 'text', nullable: true })
bufferedReflection: string | null;
@Column({ type: 'int', nullable: true })
bufferedReflectionTokens: number | null;
@Column({ type: 'int', nullable: true })
bufferedReflectionInputTokens: number | null;
@Column({ type: 'int', nullable: true })
reflectedObservationLineCount: number | null;
@JsonColumn({ nullable: true })
bufferedObservationChunks: unknown[] | null;
@Column({ type: 'boolean', default: false })
isBufferingObservation: boolean;
@Column({ type: 'boolean', default: false })
isBufferingReflection: boolean;
@Column({ type: 'int', default: 0 })
lastBufferedAtTokens: number;
@DateTimeColumn({ nullable: true })
lastBufferedAtTime: Date | null;
@JsonColumn({ nullable: true })
metadata: Record<string, unknown> | null;
}

View File

@ -1,20 +0,0 @@
import { WithTimestamps } from '@n8n/db';
import { Column, Entity, PrimaryColumn } from '@n8n/typeorm';
@Entity({ name: 'instance_ai_workflow_snapshots' })
export class InstanceAiWorkflowSnapshot extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 36 })
runId: string;
@PrimaryColumn({ type: 'varchar', length: 255 })
workflowName: string;
@Column({ type: 'varchar', length: 255, nullable: true })
resourceId: string | null;
@Column({ type: 'varchar', nullable: true })
status: string | null;
@Column({ type: 'text' })
snapshot: string;
}

View File

@ -31,10 +31,6 @@ export class InstanceAiModule implements ModuleInterface {
void Container.get(InstanceAiMemoryService)
.cleanupExpiredThreads(async (threadId) => await aiService.clearThreadState(threadId))
.catch(() => undefined);
// Initialize snapshot pruning — lifecycle decorators handle multi-main start/stop
const { SnapshotPruningService } = await import('./snapshot-pruning.service');
Container.get(SnapshotPruningService).init();
}
async settings() {
@ -60,12 +56,6 @@ export class InstanceAiModule implements ModuleInterface {
const { InstanceAiThread } = await import('./entities/instance-ai-thread.entity');
const { InstanceAiMessage } = await import('./entities/instance-ai-message.entity');
const { InstanceAiResource } = await import('./entities/instance-ai-resource.entity');
const { InstanceAiObservationalMemory } = await import(
'./entities/instance-ai-observational-memory.entity'
);
const { InstanceAiWorkflowSnapshot } = await import(
'./entities/instance-ai-workflow-snapshot.entity'
);
const { InstanceAiRunSnapshot } = await import('./entities/instance-ai-run-snapshot.entity');
const { InstanceAiIterationLog } = await import('./entities/instance-ai-iteration-log.entity');
const { InstanceAiCheckpoint } = await import('./entities/instance-ai-checkpoint.entity');
@ -74,8 +64,6 @@ export class InstanceAiModule implements ModuleInterface {
InstanceAiThread,
InstanceAiMessage,
InstanceAiResource,
InstanceAiObservationalMemory,
InstanceAiWorkflowSnapshot,
InstanceAiRunSnapshot,
InstanceAiIterationLog,
InstanceAiCheckpoint,

View File

@ -1,8 +1,6 @@
export { InstanceAiThreadRepository } from './instance-ai-thread.repository';
export { InstanceAiMessageRepository } from './instance-ai-message.repository';
export { InstanceAiResourceRepository } from './instance-ai-resource.repository';
export { InstanceAiObservationalMemoryRepository } from './instance-ai-observational-memory.repository';
export { InstanceAiWorkflowSnapshotRepository } from './instance-ai-workflow-snapshot.repository';
export { InstanceAiRunSnapshotRepository } from './instance-ai-run-snapshot.repository';
export { InstanceAiIterationLogRepository } from './instance-ai-iteration-log.repository';
export { InstanceAiCheckpointRepository } from './instance-ai-checkpoint.repository';

View File

@ -1,11 +0,0 @@
import { Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { InstanceAiObservationalMemory } from '../entities/instance-ai-observational-memory.entity';
@Service()
export class InstanceAiObservationalMemoryRepository extends Repository<InstanceAiObservationalMemory> {
constructor(dataSource: DataSource) {
super(InstanceAiObservationalMemory, dataSource.manager);
}
}

View File

@ -1,11 +0,0 @@
import { Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { InstanceAiWorkflowSnapshot } from '../entities/instance-ai-workflow-snapshot.entity';
@Service()
export class InstanceAiWorkflowSnapshotRepository extends Repository<InstanceAiWorkflowSnapshot> {
constructor(dataSource: DataSource) {
super(InstanceAiWorkflowSnapshot, dataSource.manager);
}
}

View File

@ -1,69 +0,0 @@
import { Logger } from '@n8n/backend-common';
import { InstanceAiConfig } from '@n8n/config';
import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { LessThan } from '@n8n/typeorm';
import { InstanceSettings } from 'n8n-core';
import { InstanceAiWorkflowSnapshotRepository } from './repositories/instance-ai-workflow-snapshot.repository';
@Service()
export class SnapshotPruningService {
private pruningInterval: NodeJS.Timeout | undefined;
constructor(
private readonly logger: Logger,
private readonly config: InstanceAiConfig,
private readonly snapshotRepo: InstanceAiWorkflowSnapshotRepository,
private readonly instanceSettings: InstanceSettings,
) {
this.logger = this.logger.scoped('instance-ai');
}
init() {
if (this.instanceSettings.isLeader) this.startPruning();
}
@OnLeaderTakeover()
startPruning() {
if (this.config.snapshotPruneInterval <= 0) return;
this.pruningInterval = setInterval(
async () => await this.prune(),
this.config.snapshotPruneInterval,
);
this.logger.debug('Started snapshot pruning timer', {
pruneIntervalMs: this.config.snapshotPruneInterval,
retentionMs: this.config.snapshotRetention,
});
}
@OnLeaderStepdown()
stopPruning() {
if (this.pruningInterval) {
clearInterval(this.pruningInterval);
this.pruningInterval = undefined;
this.logger.debug('Stopped snapshot pruning timer');
}
}
@OnShutdown()
shutdown() {
this.stopPruning();
}
async prune() {
const cutoff = new Date(Date.now() - this.config.snapshotRetention);
const { affected } = await this.snapshotRepo.delete({
updatedAt: LessThan(cutoff),
});
if (affected === 0) {
this.logger.debug('Found no workflow snapshots to prune');
return;
}
this.logger.debug('Pruned stale workflow snapshots', { count: affected });
}
}