refactor(agents,cli,db): hard-delete observations on compact
Some checks failed
CI: Python / Checks (push) Has been cancelled

Replace the soft-flag compaction model (`compactedAt` column +
`markObservationsCompacted` + `onlyUncompacted` filter) with hard-delete
via `deleteObservations`. Compaction folds queued rows into a rolling
summary on the cursor; the queue table no longer needs to retain them.

Adds `getSummaryHistory` to `CompactFn` ctx so the compactor can read
prior summary texts when assembling a new one (used in PR2).

Drops `compactionMinObservations` from the config — the trigger now
runs on idle-window + burst-threshold alone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Robin Braumann 2026-05-07 12:07:27 +02:00
parent 804252028e
commit 51a8c690cc
7 changed files with 55 additions and 86 deletions

View File

@ -14,7 +14,6 @@ function makeRow(overrides: Partial<NewObservation> = {}): NewObservation {
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
createdAt: new Date(),
compactedAt: null,
...overrides,
};
}
@ -41,7 +40,7 @@ describe('InMemoryMemory — observations', () => {
expect(rows.map((r) => r.payload)).toEqual(['first', 'second']);
});
it('filters by since (keyset), kindIs, onlyUncompacted, schemaVersionAtMost, limit', async () => {
it('filters by since (keyset), kindIs, schemaVersionAtMost, limit', async () => {
const mem = new InMemoryMemory();
const t = Date.now();
const [r1, r2, r3, r4] = await mem.appendObservations([
@ -88,17 +87,7 @@ describe('InMemoryMemory — observations', () => {
),
).toEqual(['one', 'mid']);
await mem.markObservationsCompacted([r1.id, r2.id], new Date());
expect(
(
await mem.getObservations({
scopeKind: 'thread',
scopeId: 't-1',
onlyUncompacted: true,
})
).map((r) => r.payload),
).toEqual(['two', 'three']);
expect(r2.id).toBeDefined();
expect(r3.id).toBeDefined();
expect(r4.id).toBeDefined();
});
@ -121,16 +110,23 @@ describe('InMemoryMemory — observations', () => {
expect(rows[0].id).toBe(high.id);
});
it('markObservationsCompacted is idempotent and ignores unknown ids', async () => {
it('deleteObservations removes the named rows and is idempotent', async () => {
const mem = new InMemoryMemory();
const [r1, r2] = await mem.appendObservations([makeRow(), makeRow()]);
await mem.deleteObservations([r1.id, 'unknown-id']);
await mem.deleteObservations([r1.id]);
const remaining = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(remaining.map((r) => r.id)).toEqual([r2.id]);
});
it('deleteObservations is a no-op for an empty id list', async () => {
const mem = new InMemoryMemory();
const [r1] = await mem.appendObservations([makeRow()]);
const at = new Date();
await mem.markObservationsCompacted([r1.id, 'unknown-id'], at);
await mem.markObservationsCompacted([r1.id], at);
const [reread] = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(reread.compactedAt?.getTime()).toBe(at.getTime());
await mem.deleteObservations([]);
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows.map((r) => r.id)).toEqual([r1.id]);
});
});

View File

@ -197,7 +197,6 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
onlyUncompacted?: boolean;
}): Promise<Observation[]> {
const bucket = this.observationsByScope.get(scopeKey(opts.scopeKind, opts.scopeId)) ?? [];
let rows = [...bucket].sort((a, b) =>
@ -217,9 +216,6 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
const kind = opts.kindIs;
rows = rows.filter((r) => r.kind === kind);
}
if (opts.onlyUncompacted) {
rows = rows.filter((r) => r.compactedAt === null);
}
if (opts.schemaVersionAtMost !== undefined) {
const max = opts.schemaVersionAtMost;
rows = rows.filter((r) => r.schemaVersion <= max);
@ -270,14 +266,14 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
}
// eslint-disable-next-line @typescript-eslint/require-await
async markObservationsCompacted(ids: string[], compactedAt: Date): Promise<void> {
async deleteObservations(ids: string[]): Promise<void> {
if (ids.length === 0) return;
const idSet = new Set(ids);
for (const bucket of this.observationsByScope.values()) {
for (const row of bucket) {
if (idSet.has(row.id) && row.compactedAt === null) {
row.compactedAt = compactedAt;
}
}
for (const [key, bucket] of this.observationsByScope.entries()) {
this.observationsByScope.set(
key,
bucket.filter((row) => !idSet.has(row.id)),
);
}
}

View File

@ -28,8 +28,6 @@ export interface Observation {
durationMs: number | null;
schemaVersion: number;
createdAt: Date;
/** Soft-flag set by the compactor; `null` until then. */
compactedAt: Date | null;
}
/** Shape passed to `appendObservations`. `id` is backend-assigned. */
@ -89,6 +87,12 @@ export type ObserveFn = (ctx: {
export type CompactFn = (ctx: {
uncompactedRows: Observation[];
previousSummary: string | null;
/**
* Returns the most-recent N summary texts for this scope, oldest-first
* and excluding the previous-summary already passed in `previousSummary`.
* Empty array if no prior summaries exist. Default N = 5.
*/
getSummaryHistory: () => Promise<string[]>;
telemetry: BuiltTelemetry | undefined;
}) => Promise<{ summary: NewObservation }>;
@ -117,18 +121,16 @@ export type FormatContextFn = (ctx: {
export interface BuiltObservationStore {
/**
* Append observation rows for a scope. Backends assign `id` and return the
* persisted shape. Append-only; rows are not mutated after insert except
* via {@link BuiltObservationStore.markObservationsCompacted}.
* persisted shape.
*/
appendObservations(rows: NewObservation[]): Promise<Observation[]>;
/**
* Query observations for a scope. Filters compose: `since`, when supplied,
* returns only rows strictly after the keyset `(createdAt, id) >
* (since.sinceCreatedAt, since.sinceObservationId)`; `kindIs` matches
* `kind` exactly; `onlyUncompacted` excludes rows with `compactedAt` set;
* `schemaVersionAtMost` excludes rows whose `schemaVersion` exceeds the
* caller's supported version. Results are ordered by `(createdAt, id)`
* ascending.
* `kind` exactly; `schemaVersionAtMost` excludes rows whose `schemaVersion`
* exceeds the caller's supported version. Results are ordered by
* `(createdAt, id)` ascending.
*/
getObservations(opts: {
scopeKind: ScopeKind;
@ -137,7 +139,6 @@ export interface BuiltObservationStore {
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
onlyUncompacted?: boolean;
}): Promise<Observation[]>;
/**
* Read the message delta the observer needs to process for a given scope.
@ -157,8 +158,8 @@ export interface BuiltObservationStore {
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]>;
/** Soft-flag the given rows as compacted; idempotent. */
markObservationsCompacted(ids: string[], compactedAt: Date): Promise<void>;
/** Hard-delete the given rows. Idempotent: missing ids are ignored. */
deleteObservations(ids: string[]): Promise<void>;
/** Read the cursor for a scope; `null` if none has been written yet. */
getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null>;
/**
@ -222,20 +223,15 @@ export interface ObservationalMemoryConfig {
*/
getScope?: ResolveObservationalScope;
/**
* Minimum number of queued (uncompacted) observations required before
* the compactor can fire. When unset, the count gate is disabled.
*/
compactionMinObservations?: number;
/**
* Minimum elapsed time (ms) since the last compaction before another
* one can fire. When unset, the idle gate is disabled. The first
* compaction always fires regardless (no prior `summaryUpdatedAt`).
* Minimum elapsed time (ms) since the last compaction before another one
* fires. When unset, the idle gate is disabled. The first compaction
* always fires regardless (no prior `summaryUpdatedAt`).
*/
compactionIdleMs?: number;
/**
* Burst override: when the queue grows to at least this many uncompacted
* observations, fire compaction even if the idle window has not elapsed.
* When unset, no burst override applies and the idle window is strict.
* Queue cap: when uncompacted observations reach this count, fire
* compaction even if the idle window has not elapsed. When unset, only
* the idle gate controls cadence.
*/
compactionBurstThreshold?: number;
/**

View File

@ -5,7 +5,8 @@ import type { MigrationContext, ReversibleMigration } from '../migration-types';
*
* - `agents_observations`: append-only observation log keyed by `(scopeKind,
* scopeId)` and ordered by `(createdAt, id)`. Consumers define `kind`;
* payload is JSON. `compactedAt` is a soft-flag set by the compactor.
* payload is JSON. The compactor hard-deletes rows it folds into the
* rolling summary; `summary`-kind rows persist on the cursor.
* - `agents_observation_cursors`: per-scope mutable state. Two distinct
* responsibilities live on this row:
* - `(lastObservedAt, lastObservedMessageId)` the keyset cursor that
@ -32,7 +33,6 @@ export class CreateAgentObservationTables1784000000000 implements ReversibleMigr
column('payload').json.notNull,
column('durationMs').bigint,
column('schemaVersion').int.notNull,
column('compactedAt').timestamp(3),
)
.withIndexOn(['scopeKind', 'scopeId', 'kind', 'createdAt']).withTimestamps;

View File

@ -1,4 +1,4 @@
import { DateTimeColumn, JsonColumn, WithTimestampsAndStringId } from '@n8n/db';
import { JsonColumn, WithTimestampsAndStringId } from '@n8n/db';
import { Column, Entity, Index } from '@n8n/typeorm';
export type ObservationScopeKind = 'thread' | 'resource' | 'agent';
@ -23,7 +23,4 @@ export class AgentObservationEntity extends WithTimestampsAndStringId {
@Column({ type: 'int' })
schemaVersion: number;
@DateTimeColumn({ nullable: true })
compactedAt: Date | null;
}

View File

@ -1,5 +1,5 @@
import { OBSERVATION_SCHEMA_VERSION, type NewObservation } from '@n8n/agents';
import { Equal, In, IsNull, LessThan, LessThanOrEqual, MoreThan } from '@n8n/typeorm';
import { Equal, In, LessThan, LessThanOrEqual, MoreThan } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { AgentMessageEntity } from '../../entities/agent-message.entity';
@ -337,7 +337,6 @@ describe('N8nMemory', () => {
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
createdAt: new Date('2026-05-05T00:00:00Z'),
compactedAt: null,
...overrides,
};
}
@ -384,7 +383,6 @@ describe('N8nMemory', () => {
scopeId: 't-1',
since: { sinceCreatedAt, sinceObservationId: 'obs-anchor' },
kindIs: 'summary',
onlyUncompacted: true,
schemaVersionAtMost: 1,
limit: 10,
});
@ -395,7 +393,6 @@ describe('N8nMemory', () => {
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
compactedAt: IsNull(),
schemaVersion: LessThanOrEqual(1),
createdAt: MoreThan(sinceCreatedAt),
},
@ -403,7 +400,6 @@ describe('N8nMemory', () => {
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
compactedAt: IsNull(),
schemaVersion: LessThanOrEqual(1),
createdAt: Equal(sinceCreatedAt),
id: MoreThan('obs-anchor'),
@ -435,7 +431,6 @@ describe('N8nMemory', () => {
schemaVersion: '1' as unknown as number,
createdAt: new Date('2026-05-05T00:00:00Z'),
updatedAt: new Date('2026-05-05T00:00:00Z'),
compactedAt: null,
} as AgentObservationEntity,
]);
@ -445,20 +440,16 @@ describe('N8nMemory', () => {
});
});
describe('markObservationsCompacted', () => {
it('issues a single update only over uncompacted ids', async () => {
const at = new Date('2026-05-05T01:00:00Z');
await memory.markObservationsCompacted(['a', 'b'], at);
describe('deleteObservations', () => {
it('issues a single delete with the given ids', async () => {
await memory.deleteObservations(['a', 'b']);
expect(observationRepository.update).toHaveBeenCalledWith(
{ id: In(['a', 'b']), compactedAt: IsNull() },
{ compactedAt: at },
);
expect(observationRepository.delete).toHaveBeenCalledWith({ id: In(['a', 'b']) });
});
it('no-ops on empty input', async () => {
await memory.markObservationsCompacted([], new Date());
expect(observationRepository.update).not.toHaveBeenCalled();
await memory.deleteObservations([]);
expect(observationRepository.delete).not.toHaveBeenCalled();
});
});

View File

@ -13,7 +13,7 @@ import type {
} from '@n8n/agents';
import { Service } from '@n8n/di';
import type { FindOptionsWhere } from '@n8n/typeorm';
import { Equal, In, IsNull, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { UnexpectedError } from 'n8n-workflow';
@ -211,7 +211,6 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
payload: row.payload,
durationMs: row.durationMs,
schemaVersion: row.schemaVersion,
compactedAt: row.compactedAt,
createdAt: row.createdAt,
}),
);
@ -227,13 +226,11 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
onlyUncompacted?: boolean;
}): Promise<Observation[]> {
const baseWhere: FindOptionsWhere<AgentObservationEntity> = {
scopeKind: opts.scopeKind,
scopeId: opts.scopeId,
...(opts.kindIs !== undefined && { kind: opts.kindIs }),
...(opts.onlyUncompacted && { compactedAt: IsNull() }),
...(opts.schemaVersionAtMost !== undefined && {
schemaVersion: LessThanOrEqual(opts.schemaVersionAtMost),
}),
@ -293,12 +290,9 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
});
}
async markObservationsCompacted(ids: string[], compactedAt: Date): Promise<void> {
async deleteObservations(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await this.observationRepository.update(
{ id: In(ids), compactedAt: IsNull() },
{ compactedAt },
);
await this.observationRepository.delete({ id: In(ids) });
}
// ── Observational memory: cursors ────────────────────────────────────
@ -415,7 +409,6 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
durationMs: entity.durationMs === null ? null : Number(entity.durationMs),
schemaVersion: Number(entity.schemaVersion),
createdAt: entity.createdAt,
compactedAt: entity.compactedAt,
};
}