From 28af69a57ca5add669f6ba54f4aefa35becb482a Mon Sep 17 00:00:00 2001 From: bjorger <50590409+bjorger@users.noreply.github.com> Date: Mon, 18 May 2026 10:16:10 +0200 Subject: [PATCH] feat(core): Add observation log storage (#30339) Co-authored-by: Michael Drury --- packages/@n8n/agents/src/index.ts | 17 + .../__tests__/observation-log-store.test.ts | 89 ++++++ .../@n8n/agents/src/runtime/memory-store.ts | 132 +++++++- .../src/runtime/observation-log-store.ts | 23 ++ packages/@n8n/agents/src/types/index.ts | 20 ++ .../agents/src/types/sdk/observation-log.ts | 78 +++++ ...000000001-ReplaceAgentObservationTables.ts | 115 +++++++ ...0000002-DropAgentExecutionWorkingMemory.ts | 11 + .../db/src/migrations/postgresdb/index.ts | 4 + .../@n8n/db/src/migrations/sqlite/index.ts | 4 + .../__tests__/agent-execution.service.test.ts | 46 +++ .../__tests__/execution-recorder.test.ts | 22 -- .../modules/agents/agent-execution.service.ts | 1 - .../agents/entities/agent-execution.entity.ts | 3 - .../entities/agent-observation-lock.entity.ts | 5 + .../entities/agent-observation.entity.ts | 32 +- .../src/modules/agents/execution-recorder.ts | 4 - .../integrations/__tests__/n8n-memory.test.ts | 296 ++++++++++-------- .../modules/agents/integrations/n8n-memory.ts | 281 +++++++++-------- .../frontend/@n8n/i18n/src/locales/en.json | 1 - .../__tests__/session-timeline.utils.spec.ts | 1 - .../agents/composables/useAgentThreadsApi.ts | 1 - 22 files changed, 864 insertions(+), 322 deletions(-) create mode 100644 packages/@n8n/agents/src/runtime/__tests__/observation-log-store.test.ts create mode 100644 packages/@n8n/agents/src/runtime/observation-log-store.ts create mode 100644 packages/@n8n/agents/src/types/sdk/observation-log.ts create mode 100644 packages/@n8n/db/src/migrations/common/1784000000001-ReplaceAgentObservationTables.ts create mode 100644 packages/@n8n/db/src/migrations/common/1784000000002-DropAgentExecutionWorkingMemory.ts diff --git a/packages/@n8n/agents/src/index.ts b/packages/@n8n/agents/src/index.ts index f9a398fc436..61a12bdc483 100644 --- a/packages/@n8n/agents/src/index.ts +++ b/packages/@n8n/agents/src/index.ts @@ -54,6 +54,18 @@ export type { ObservationalMemoryTrigger, ObserveFn, ScopeKind, + BuiltObservationLogStore, + NewObservationLogEntry, + ObservationLogEntry, + ObservationLogMarker, + ObservationLogMerge, + ObservationLogReadOptions, + ObservationLogReflection, + ObservationLogReflectionResult, + ObservationLogScope, + ObservationLogScopeKind, + ObservationLogStatus, + TokenCounter, } from './types'; export type { ProviderOptions } from '@ai-sdk/provider-utils'; export { AgentEvent } from './types'; @@ -63,6 +75,11 @@ export { OBSERVATION_CATEGORIES, OBSERVATION_SCHEMA_VERSION, } from './types'; +export { + estimateObservationTokens, + OBSERVATION_LOG_MARKERS, + OBSERVATION_LOG_STATUSES, +} from './types'; export { Tool, wrapToolForApproval } from './sdk/tool'; export { Memory } from './sdk/memory'; diff --git a/packages/@n8n/agents/src/runtime/__tests__/observation-log-store.test.ts b/packages/@n8n/agents/src/runtime/__tests__/observation-log-store.test.ts new file mode 100644 index 00000000000..364b96253f3 --- /dev/null +++ b/packages/@n8n/agents/src/runtime/__tests__/observation-log-store.test.ts @@ -0,0 +1,89 @@ +import { estimateObservationTokens } from '../../types/sdk/observation-log'; +import { InMemoryMemory } from '../memory-store'; + +describe('observation log store', () => { + it('persists marker-based active observations with default mechanics', async () => { + const store = new InMemoryMemory(); + const createdAt = new Date('2026-05-12T10:00:00Z'); + + const [entry] = await store.appendObservationLogEntries([ + { + scopeKind: 'thread', + scopeId: 'thread-1', + marker: 'critical', + text: 'User chose the observation log model.', + createdAt, + }, + ]); + + expect(entry).toMatchObject({ + scopeKind: 'thread', + scopeId: 'thread-1', + marker: 'critical', + text: 'User chose the observation log model.', + parentId: null, + status: 'active', + supersededBy: null, + tokenCount: estimateObservationTokens('User chose the observation log model.'), + createdAt, + }); + + await expect( + store.getActiveObservationLog({ scopeKind: 'thread', scopeId: 'thread-1' }), + ).resolves.toEqual([entry]); + }); + + it('keeps dropped and superseded observations out of the active read path', async () => { + const store = new InMemoryMemory(); + const [dropped, superseded, replacement] = await store.appendObservationLogEntries([ + { scopeKind: 'thread', scopeId: 'thread-1', marker: 'info', text: 'Small detail' }, + { scopeKind: 'thread', scopeId: 'thread-1', marker: 'important', text: 'Old plan' }, + { scopeKind: 'thread', scopeId: 'thread-1', marker: 'important', text: 'New plan' }, + ]); + + await store.dropObservationLogEntries([dropped.id]); + await store.supersedeObservationLogEntries([superseded.id], replacement.id); + + await expect( + store.getActiveObservationLog({ scopeKind: 'thread', scopeId: 'thread-1' }), + ).resolves.toEqual([replacement]); + await expect( + store.getObservationLog({ scopeKind: 'thread', scopeId: 'thread-1', status: 'dropped' }), + ).resolves.toMatchObject([{ id: dropped.id, status: 'dropped', supersededBy: null }]); + await expect( + store.getObservationLog({ scopeKind: 'thread', scopeId: 'thread-1', status: 'superseded' }), + ).resolves.toMatchObject([ + { id: superseded.id, status: 'superseded', supersededBy: replacement.id }, + ]); + }); + + it('applies reflection as drops plus merged replacements', async () => { + const store = new InMemoryMemory(); + const [stale, oldA, oldB] = await store.appendObservationLogEntries([ + { scopeKind: 'resource', scopeId: 'user-1', marker: 'info', text: 'Tiny aside' }, + { scopeKind: 'resource', scopeId: 'user-1', marker: 'important', text: 'Plan A' }, + { scopeKind: 'resource', scopeId: 'user-1', marker: 'important', text: 'Plan B' }, + ]); + + const result = await store.applyObservationLogReflection( + { scopeKind: 'resource', scopeId: 'user-1' }, + { + drop: [stale.id], + merge: [ + { + supersedes: [oldA.id, oldB.id], + marker: 'important', + text: 'User compared Plan A and Plan B.', + }, + ], + }, + ); + + expect(result.droppedIds).toEqual([stale.id]); + expect(result.supersededIds).toEqual([oldA.id, oldB.id]); + expect(result.inserted).toHaveLength(1); + await expect( + store.getActiveObservationLog({ scopeKind: 'resource', scopeId: 'user-1' }), + ).resolves.toEqual(result.inserted); + }); +}); diff --git a/packages/@n8n/agents/src/runtime/memory-store.ts b/packages/@n8n/agents/src/runtime/memory-store.ts index 793ad447f0b..30b1cab835d 100644 --- a/packages/@n8n/agents/src/runtime/memory-store.ts +++ b/packages/@n8n/agents/src/runtime/memory-store.ts @@ -8,6 +8,16 @@ import type { ObservationLockHandle, ScopeKind, } from '../types/sdk/observation'; +import type { + BuiltObservationLogStore, + NewObservationLogEntry, + ObservationLogEntry, + ObservationLogReadOptions, + ObservationLogReflection, + ObservationLogReflectionResult, + ObservationLogScope, +} from '../types/sdk/observation-log'; +import { estimateObservationTokens } from '../types/sdk/observation-log'; interface StoredMessage { message: AgentDbMessage; @@ -27,6 +37,10 @@ function cloneCursor(cursor: ObservationCursor): ObservationCursor { }; } +function cloneObservationLogEntry(entry: ObservationLogEntry): ObservationLogEntry { + return { ...entry, createdAt: new Date(entry.createdAt) }; +} + function compareKeyset( a: { createdAt: Date; id: string }, b: { createdAt: Date; id: string }, @@ -43,7 +57,9 @@ function compareKeyset( * Thread context for `saveMessages` is established by calling `saveThread` first. * The most recently saved thread is used when `saveMessages` is called. */ -export class InMemoryMemory implements BuiltMemory, BuiltObservationStore { +export class InMemoryMemory + implements BuiltMemory, BuiltObservationStore, BuiltObservationLogStore +{ private threads = new Map(); private messagesByThread = new Map(); @@ -52,6 +68,8 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore { private observationsByScope = new Map(); + private observationLogByScope = new Map(); + private cursorsByScope = new Map(); private locksByScope = new Map(); @@ -108,6 +126,7 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore { this.messagesByThread.delete(threadId); const key = scopeKey('thread', threadId); this.observationsByScope.delete(key); + this.observationLogByScope.delete(key); this.cursorsByScope.delete(key); this.locksByScope.delete(key); } @@ -195,6 +214,117 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore { // ── Observational memory ───────────────────────────────────────────── + // eslint-disable-next-line @typescript-eslint/require-await + async appendObservationLogEntries( + rows: NewObservationLogEntry[], + ): Promise { + const persisted: ObservationLogEntry[] = []; + for (const row of rows) { + const key = scopeKey(row.scopeKind, row.scopeId); + const bucket = this.observationLogByScope.get(key) ?? []; + const entry: ObservationLogEntry = { + id: crypto.randomUUID(), + scopeKind: row.scopeKind, + scopeId: row.scopeId, + marker: row.marker, + text: row.text, + parentId: row.parentId ?? null, + tokenCount: row.tokenCount ?? estimateObservationTokens(row.text), + status: 'active', + supersededBy: null, + createdAt: row.createdAt ?? new Date(), + }; + bucket.push(entry); + this.observationLogByScope.set(key, bucket); + persisted.push(cloneObservationLogEntry(entry)); + } + return persisted; + } + + async getActiveObservationLog( + scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' }, + ): Promise { + return await this.getObservationLog({ ...scope, status: 'active' }); + } + + // eslint-disable-next-line @typescript-eslint/require-await + async getObservationLog(opts: ObservationLogReadOptions): Promise { + const bucket = this.observationLogByScope.get(scopeKey(opts.scopeKind, opts.scopeId)) ?? []; + let rows = [...bucket].sort((a, b) => + compareKeyset({ createdAt: a.createdAt, id: a.id }, { createdAt: b.createdAt, id: b.id }), + ); + if (opts.order === 'desc') rows.reverse(); + if (opts.status !== undefined) { + rows = rows.filter((entry) => entry.status === opts.status); + } + if (opts.parentId !== undefined) { + rows = rows.filter((entry) => entry.parentId === opts.parentId); + } + if (opts.limit !== undefined) { + rows = rows.slice(0, opts.limit); + } + return rows.map(cloneObservationLogEntry); + } + + // eslint-disable-next-line @typescript-eslint/require-await + async dropObservationLogEntries(ids: string[]): Promise { + if (ids.length === 0) return; + const idSet = new Set(ids); + for (const bucket of this.observationLogByScope.values()) { + for (const entry of bucket) { + if (idSet.has(entry.id)) { + entry.status = 'dropped'; + entry.supersededBy = null; + } + } + } + } + + // eslint-disable-next-line @typescript-eslint/require-await + async supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise { + if (ids.length === 0) return; + const idSet = new Set(ids); + for (const bucket of this.observationLogByScope.values()) { + for (const entry of bucket) { + if (idSet.has(entry.id)) { + entry.status = 'superseded'; + entry.supersededBy = supersededBy; + } + } + } + } + + async applyObservationLogReflection( + scope: ObservationLogScope, + reflection: ObservationLogReflection, + ): Promise { + const inserted = await this.appendObservationLogEntries( + reflection.merge.map((entry) => ({ + scopeKind: scope.scopeKind, + scopeId: scope.scopeId, + marker: entry.marker, + text: entry.text, + parentId: entry.parentId, + tokenCount: entry.tokenCount, + createdAt: entry.createdAt, + })), + ); + + await this.dropObservationLogEntries(reflection.drop); + for (const [index, merge] of reflection.merge.entries()) { + const replacement = inserted[index]; + if (replacement) { + await this.supersedeObservationLogEntries(merge.supersedes, replacement.id); + } + } + + return { + droppedIds: [...reflection.drop], + supersededIds: reflection.merge.flatMap((entry) => entry.supersedes), + inserted, + }; + } + // eslint-disable-next-line @typescript-eslint/require-await async appendObservations(rows: NewObservation[]): Promise { const persisted: Observation[] = []; diff --git a/packages/@n8n/agents/src/runtime/observation-log-store.ts b/packages/@n8n/agents/src/runtime/observation-log-store.ts new file mode 100644 index 00000000000..d563e816186 --- /dev/null +++ b/packages/@n8n/agents/src/runtime/observation-log-store.ts @@ -0,0 +1,23 @@ +import type { BuiltMemory, BuiltObservationLogStore } from '../types'; + +const OBSERVATION_LOG_STORE_METHODS = [ + 'appendObservationLogEntries', + 'getActiveObservationLog', + 'getObservationLog', + 'dropObservationLogEntries', + 'supersedeObservationLogEntries', + 'applyObservationLogReflection', +] as const satisfies ReadonlyArray; + +function hasFunctionProperty( + value: object, + property: K, +): value is Record unknown> { + return property in value && typeof Reflect.get(value, property) === 'function'; +} + +export function hasObservationLogStore( + memory: BuiltMemory, +): memory is BuiltMemory & BuiltObservationLogStore { + return OBSERVATION_LOG_STORE_METHODS.every((method) => hasFunctionProperty(memory, method)); +} diff --git a/packages/@n8n/agents/src/types/index.ts b/packages/@n8n/agents/src/types/index.ts index 45fa288bcb4..2f45d332e29 100644 --- a/packages/@n8n/agents/src/types/index.ts +++ b/packages/@n8n/agents/src/types/index.ts @@ -89,6 +89,26 @@ export { OBSERVATION_SCHEMA_VERSION, } from './sdk/observation'; +export type { + BuiltObservationLogStore, + NewObservationLogEntry, + ObservationLogEntry, + ObservationLogMarker, + ObservationLogMerge, + ObservationLogReadOptions, + ObservationLogReflection, + ObservationLogReflectionResult, + ObservationLogScope, + ObservationLogScopeKind, + ObservationLogStatus, + TokenCounter, +} from './sdk/observation-log'; +export { + estimateObservationTokens, + OBSERVATION_LOG_MARKERS, + OBSERVATION_LOG_STATUSES, +} from './sdk/observation-log'; + export type { EvalInput, EvalScore, diff --git a/packages/@n8n/agents/src/types/sdk/observation-log.ts b/packages/@n8n/agents/src/types/sdk/observation-log.ts new file mode 100644 index 00000000000..27fd1b9901d --- /dev/null +++ b/packages/@n8n/agents/src/types/sdk/observation-log.ts @@ -0,0 +1,78 @@ +export const OBSERVATION_LOG_MARKERS = ['critical', 'important', 'info', 'completion'] as const; + +export type ObservationLogMarker = (typeof OBSERVATION_LOG_MARKERS)[number]; + +export const OBSERVATION_LOG_STATUSES = ['active', 'superseded', 'dropped'] as const; + +export type ObservationLogStatus = (typeof OBSERVATION_LOG_STATUSES)[number]; + +export type ObservationLogScopeKind = 'thread' | 'resource'; + +export interface ObservationLogScope { + scopeKind: ObservationLogScopeKind; + scopeId: string; +} + +export interface ObservationLogEntry extends ObservationLogScope { + id: string; + marker: ObservationLogMarker; + text: string; + parentId: string | null; + tokenCount: number; + status: ObservationLogStatus; + supersededBy: string | null; + createdAt: Date; +} + +export interface NewObservationLogEntry extends ObservationLogScope { + marker: ObservationLogMarker; + text: string; + parentId?: string | null; + tokenCount?: number; + createdAt?: Date; +} + +export interface ObservationLogReadOptions extends ObservationLogScope { + status?: ObservationLogStatus; + parentId?: string | null; + limit?: number; + order?: 'asc' | 'desc'; +} + +export interface ObservationLogMerge { + supersedes: string[]; + marker: ObservationLogMarker; + text: string; + parentId?: string | null; + tokenCount?: number; + createdAt?: Date; +} + +export interface ObservationLogReflection { + drop: string[]; + merge: ObservationLogMerge[]; +} + +export interface ObservationLogReflectionResult { + droppedIds: string[]; + supersededIds: string[]; + inserted: ObservationLogEntry[]; +} + +export type TokenCounter = (text: string) => number; + +export const estimateObservationTokens: TokenCounter = (text) => Math.ceil(text.length / 4); + +export interface BuiltObservationLogStore { + appendObservationLogEntries(rows: NewObservationLogEntry[]): Promise; + getActiveObservationLog( + scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' }, + ): Promise; + getObservationLog(opts: ObservationLogReadOptions): Promise; + dropObservationLogEntries(ids: string[]): Promise; + supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise; + applyObservationLogReflection( + scope: ObservationLogScope, + reflection: ObservationLogReflection, + ): Promise; +} diff --git a/packages/@n8n/db/src/migrations/common/1784000000001-ReplaceAgentObservationTables.ts b/packages/@n8n/db/src/migrations/common/1784000000001-ReplaceAgentObservationTables.ts new file mode 100644 index 00000000000..190456ec1c6 --- /dev/null +++ b/packages/@n8n/db/src/migrations/common/1784000000001-ReplaceAgentObservationTables.ts @@ -0,0 +1,115 @@ +import type { MigrationContext, ReversibleMigration } from '../migration-types'; + +const OBSERVATION_SCOPE_KINDS = ['thread', 'resource']; +const OBSERVATION_MARKERS = ['critical', 'important', 'info', 'completion']; +const OBSERVATION_STATUSES = ['active', 'superseded', 'dropped']; +const OBSERVATION_TASK_KINDS = ['observer', 'reflector']; + +/** + * Replaces the first observational-memory schema with the observation-log + * schema. The earlier schema has already shipped on master, so this migration + * intentionally drops its queued observation rows instead of editing the + * original migration. + */ +export class ReplaceAgentObservationTables1784000000001 implements ReversibleMigration { + async up({ schemaBuilder: { createTable, dropTable, column } }: MigrationContext) { + await this.dropObservationTables(dropTable); + await this.createObservationLogTables(createTable, column); + } + + async down({ schemaBuilder: { createTable, dropTable, column } }: MigrationContext) { + await this.dropObservationTables(dropTable); + await this.createLegacyObservationTables(createTable, column); + } + + private async dropObservationTables(dropTable: MigrationContext['schemaBuilder']['dropTable']) { + await dropTable('agents_observation_locks'); + await dropTable('agents_observation_cursors'); + await dropTable('agents_observations'); + } + + private async createObservationLogTables( + createTable: MigrationContext['schemaBuilder']['createTable'], + column: MigrationContext['schemaBuilder']['column'], + ) { + await createTable('agents_observations') + .withColumns( + column('id') + .varchar(36) + .primary.notNull.comment('Application-generated n8n string ID, not a database UUID'), + column('scopeKind').varchar(20).notNull.withEnumCheck(OBSERVATION_SCOPE_KINDS), + column('scopeId').varchar(255).notNull, + column('marker').varchar(16).notNull.withEnumCheck(OBSERVATION_MARKERS), + column('text').text.notNull, + column('parentId').varchar(36), + column('tokenCount').int.notNull.default(0), + column('status').varchar(16).notNull.withEnumCheck(OBSERVATION_STATUSES), + column('supersededBy').varchar(36), + ) + .withIndexOn(['scopeKind', 'scopeId', 'status', 'createdAt', 'id']) + .withIndexOn(['scopeKind', 'scopeId', 'createdAt', 'id']) + .withIndexOn('parentId') + .withIndexOn('supersededBy') + .withForeignKey('parentId', { + tableName: 'agents_observations', + columnName: 'id', + }) + .withForeignKey('supersededBy', { + tableName: 'agents_observations', + columnName: 'id', + }).withTimestamps; + + await createTable('agents_observation_cursors').withColumns( + column('scopeKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_SCOPE_KINDS), + column('scopeId').varchar(255).notNull.primary, + column('lastObservedMessageId').varchar(36).notNull, + column('lastObservedAt').timestampTimezone(3).notNull, + ).withTimestamps; + + await createTable('agents_observation_locks').withColumns( + column('scopeKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_SCOPE_KINDS), + column('scopeId').varchar(255).notNull.primary, + column('taskKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_TASK_KINDS), + column('holderId') + .varchar(64) + .notNull.comment('Ephemeral background-task lock owner token, not a user ID'), + column('heldUntil').timestampTimezone(3).notNull, + ).withTimestamps; + } + + private async createLegacyObservationTables( + createTable: MigrationContext['schemaBuilder']['createTable'], + column: MigrationContext['schemaBuilder']['column'], + ) { + await createTable('agents_observations') + .withColumns( + column('id').varchar(36).primary.notNull, + column('scopeKind').varchar(20).notNull.withEnumCheck(['thread', 'resource', 'agent']), + column('scopeId').varchar(255).notNull, + column('kind').varchar(64).notNull, + column('payload').json.notNull, + column('durationMs').bigint, + column('schemaVersion').int.notNull, + ) + .withIndexOn(['scopeKind', 'scopeId', 'kind', 'createdAt']) + .withIndexOn(['scopeKind', 'scopeId', 'createdAt', 'id']).withTimestamps; + + await createTable('agents_observation_cursors').withColumns( + column('scopeKind') + .varchar(20) + .notNull.primary.withEnumCheck(['thread', 'resource', 'agent']), + column('scopeId').varchar(255).notNull.primary, + column('lastObservedMessageId').varchar(36).notNull, + column('lastObservedAt').timestampTimezone(3).notNull, + ).withTimestamps; + + await createTable('agents_observation_locks').withColumns( + column('scopeKind') + .varchar(20) + .notNull.primary.withEnumCheck(['thread', 'resource', 'agent']), + column('scopeId').varchar(255).notNull.primary, + column('holderId').varchar(64).notNull, + column('heldUntil').timestampTimezone(3).notNull, + ).withTimestamps; + } +} diff --git a/packages/@n8n/db/src/migrations/common/1784000000002-DropAgentExecutionWorkingMemory.ts b/packages/@n8n/db/src/migrations/common/1784000000002-DropAgentExecutionWorkingMemory.ts new file mode 100644 index 00000000000..ac8aa4d575d --- /dev/null +++ b/packages/@n8n/db/src/migrations/common/1784000000002-DropAgentExecutionWorkingMemory.ts @@ -0,0 +1,11 @@ +import type { MigrationContext, ReversibleMigration } from '../migration-types'; + +export class DropAgentExecutionWorkingMemory1784000000002 implements ReversibleMigration { + async up({ schemaBuilder: { dropColumns } }: MigrationContext) { + await dropColumns('agent_execution', ['workingMemory']); + } + + async down({ schemaBuilder: { addColumns, column } }: MigrationContext) { + await addColumns('agent_execution', [column('workingMemory').text]); + } +} diff --git a/packages/@n8n/db/src/migrations/postgresdb/index.ts b/packages/@n8n/db/src/migrations/postgresdb/index.ts index b72e950247e..01bf12da102 100644 --- a/packages/@n8n/db/src/migrations/postgresdb/index.ts +++ b/packages/@n8n/db/src/migrations/postgresdb/index.ts @@ -176,6 +176,8 @@ import { CreateEvaluationCollection1778496086558 } from '../common/1778496086558 import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables'; import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables'; import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables'; +import { ReplaceAgentObservationTables1784000000001 } from '../common/1784000000001-ReplaceAgentObservationTables'; +import { DropAgentExecutionWorkingMemory1784000000002 } from '../common/1784000000002-DropAgentExecutionWorkingMemory'; import type { Migration } from '../migration-types'; export const postgresMigrations: Migration[] = [ @@ -357,4 +359,6 @@ export const postgresMigrations: Migration[] = [ CreateAgentTables1783000000000, CreateAgentExecutionTables1783000000001, CreateAgentObservationTables1784000000000, + ReplaceAgentObservationTables1784000000001, + DropAgentExecutionWorkingMemory1784000000002, ]; diff --git a/packages/@n8n/db/src/migrations/sqlite/index.ts b/packages/@n8n/db/src/migrations/sqlite/index.ts index 26347676a6a..54ce2becd77 100644 --- a/packages/@n8n/db/src/migrations/sqlite/index.ts +++ b/packages/@n8n/db/src/migrations/sqlite/index.ts @@ -169,6 +169,8 @@ import { CreateEvaluationCollection1778496086558 } from '../common/1778496086558 import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables'; import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables'; import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables'; +import { ReplaceAgentObservationTables1784000000001 } from '../common/1784000000001-ReplaceAgentObservationTables'; +import { DropAgentExecutionWorkingMemory1784000000002 } from '../common/1784000000002-DropAgentExecutionWorkingMemory'; import type { Migration } from '../migration-types'; const sqliteMigrations: Migration[] = [ @@ -343,6 +345,8 @@ const sqliteMigrations: Migration[] = [ CreateAgentTables1783000000000, CreateAgentExecutionTables1783000000001, CreateAgentObservationTables1784000000000, + ReplaceAgentObservationTables1784000000001, + DropAgentExecutionWorkingMemory1784000000002, ]; export { sqliteMigrations }; diff --git a/packages/cli/src/modules/agents/__tests__/agent-execution.service.test.ts b/packages/cli/src/modules/agents/__tests__/agent-execution.service.test.ts index d1df7ecf834..0bd8c8f2297 100644 --- a/packages/cli/src/modules/agents/__tests__/agent-execution.service.test.ts +++ b/packages/cli/src/modules/agents/__tests__/agent-execution.service.test.ts @@ -2,11 +2,31 @@ import { mockLogger } from '@n8n/backend-test-utils'; import { mock } from 'jest-mock-extended'; import { AgentExecutionService } from '../agent-execution.service'; +import type { AgentExecution } from '../entities/agent-execution.entity'; import type { AgentExecutionThread } from '../entities/agent-execution-thread.entity'; import type { N8nMemory } from '../integrations/n8n-memory'; import type { AgentExecutionRepository } from '../repositories/agent-execution.repository'; import type { AgentExecutionThreadRepository } from '../repositories/agent-execution-thread.repository'; +function makeThread(overrides: Partial = {}): AgentExecutionThread { + return { + id: 'thread-1', + agentId: 'agent-1', + agentName: 'Agent', + projectId: 'project-1', + title: null, + emoji: null, + sessionNumber: 1, + totalPromptTokens: 0, + totalCompletionTokens: 0, + totalCost: 0, + totalDuration: 0, + createdAt: new Date('2026-05-07T10:00:00Z'), + updatedAt: new Date('2026-05-07T10:00:00Z'), + ...overrides, + } as AgentExecutionThread; +} + describe('AgentExecutionService', () => { let service: AgentExecutionService; let agentExecutionRepository: jest.Mocked; @@ -28,6 +48,32 @@ describe('AgentExecutionService', () => { ); }); + describe('getThreadDetail', () => { + it('returns thread executions after ownership validation', async () => { + const thread = makeThread(); + const executions = [{ id: 'execution-1' }] as AgentExecution[]; + agentExecutionThreadRepository.findOneBy.mockResolvedValue(thread); + agentExecutionRepository.findByThreadIdOrdered.mockResolvedValue(executions); + + const result = await service.getThreadDetail('thread-1', 'project-1', 'agent-1'); + + expect(result).toEqual({ thread, executions }); + expect(n8nMemory.getWorkingMemory).not.toHaveBeenCalled(); + }); + + it('does not read working memory for a thread outside the requested scope', async () => { + agentExecutionThreadRepository.findOneBy.mockResolvedValue( + makeThread({ projectId: 'other-project' }), + ); + + const result = await service.getThreadDetail('thread-1', 'project-1', 'agent-1'); + + expect(result).toBeNull(); + expect(n8nMemory.getWorkingMemory).not.toHaveBeenCalled(); + expect(agentExecutionRepository.findByThreadIdOrdered).not.toHaveBeenCalled(); + }); + }); + describe('deleteThread', () => { it('cleans SDK memory before deleting the execution thread', async () => { agentExecutionThreadRepository.findOneBy.mockResolvedValue({ diff --git a/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts b/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts index 3bc36a18f67..f23279ae9fb 100644 --- a/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts +++ b/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts @@ -101,7 +101,6 @@ describe('ExecutionRecorder', () => { const record = recorder.getMessageRecord(); - expect(record.workingMemory).toBeNull(); expect(record.toolCalls).toEqual([ { name: 'update_working_memory', @@ -111,27 +110,6 @@ describe('ExecutionRecorder', () => { ]); expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(false); }); - - it('does not derive execution working memory from update_working_memory calls', () => { - const recorder = new ExecutionRecorder(); - - recorder.record({ - type: 'tool-call', - toolCallId: 'wm-1', - toolName: 'update_working_memory', - input: { memory: 'first' }, - } as StreamChunk); - recorder.record({ - type: 'tool-call', - toolCallId: 'wm-2', - toolName: 'update_working_memory', - input: { memory: 'second' }, - } as StreamChunk); - recorder.record({ type: 'finish', finishReason: 'stop' } as StreamChunk); - - const record = recorder.getMessageRecord(); - expect(record.workingMemory).toBeNull(); - }); }); describe('suspension', () => { diff --git a/packages/cli/src/modules/agents/agent-execution.service.ts b/packages/cli/src/modules/agents/agent-execution.service.ts index 7a87d4eaec7..48044f35cc3 100644 --- a/packages/cli/src/modules/agents/agent-execution.service.ts +++ b/packages/cli/src/modules/agents/agent-execution.service.ts @@ -89,7 +89,6 @@ export class AgentExecutionService { timeline: record.timeline.length > 0 ? record.timeline : null, error: record.error, hitlStatus: hitlStatus ?? null, - workingMemory: record.workingMemory, source: source ?? null, }), ); diff --git a/packages/cli/src/modules/agents/entities/agent-execution.entity.ts b/packages/cli/src/modules/agents/entities/agent-execution.entity.ts index 37ec0d95a36..1ef02864171 100644 --- a/packages/cli/src/modules/agents/entities/agent-execution.entity.ts +++ b/packages/cli/src/modules/agents/entities/agent-execution.entity.ts @@ -78,9 +78,6 @@ export class AgentExecution extends WithTimestampsAndStringId { @Column({ type: 'varchar', length: 16, nullable: true }) hitlStatus: AgentExecutionHitlStatus | null; - @Column({ type: 'text', nullable: true }) - workingMemory: string | null; - /** Where the run originated, e.g. 'chat', 'slack'. */ @Column({ type: 'varchar', length: 32, nullable: true }) source: string | null; diff --git a/packages/cli/src/modules/agents/entities/agent-observation-lock.entity.ts b/packages/cli/src/modules/agents/entities/agent-observation-lock.entity.ts index 503930ab9ca..239e85ab97b 100644 --- a/packages/cli/src/modules/agents/entities/agent-observation-lock.entity.ts +++ b/packages/cli/src/modules/agents/entities/agent-observation-lock.entity.ts @@ -3,6 +3,8 @@ import { Column, Entity, PrimaryColumn } from '@n8n/typeorm'; import type { ObservationScopeKind } from './agent-observation.entity'; +export type ObservationTaskKind = 'observer' | 'reflector'; + @Entity({ name: 'agents_observation_locks' }) export class AgentObservationLockEntity extends WithTimestamps { @PrimaryColumn({ type: 'varchar', length: 20 }) @@ -11,6 +13,9 @@ export class AgentObservationLockEntity extends WithTimestamps { @PrimaryColumn({ type: 'varchar', length: 255 }) scopeId: string; + @PrimaryColumn({ type: 'varchar', length: 20 }) + taskKind: ObservationTaskKind; + @Column({ type: 'varchar', length: 64 }) holderId: string; diff --git a/packages/cli/src/modules/agents/entities/agent-observation.entity.ts b/packages/cli/src/modules/agents/entities/agent-observation.entity.ts index ff5a3490716..f15f17391a9 100644 --- a/packages/cli/src/modules/agents/entities/agent-observation.entity.ts +++ b/packages/cli/src/modules/agents/entities/agent-observation.entity.ts @@ -1,11 +1,15 @@ -import { JsonColumn, WithTimestampsAndStringId } from '@n8n/db'; +import { WithTimestampsAndStringId } from '@n8n/db'; import { Column, Entity, Index } from '@n8n/typeorm'; -export type ObservationScopeKind = 'thread' | 'resource' | 'agent'; +export type ObservationScopeKind = 'thread' | 'resource'; +export type ObservationMarker = 'critical' | 'important' | 'info' | 'completion'; +export type ObservationStatus = 'active' | 'superseded' | 'dropped'; @Entity({ name: 'agents_observations' }) -@Index(['scopeKind', 'scopeId', 'kind', 'createdAt']) +@Index(['scopeKind', 'scopeId', 'status', 'createdAt', 'id']) @Index(['scopeKind', 'scopeId', 'createdAt', 'id']) +@Index(['parentId']) +@Index(['supersededBy']) export class AgentObservationEntity extends WithTimestampsAndStringId { @Column({ type: 'varchar', length: 20 }) scopeKind: ObservationScopeKind; @@ -13,15 +17,21 @@ export class AgentObservationEntity extends WithTimestampsAndStringId { @Column({ type: 'varchar', length: 255 }) scopeId: string; - @Column({ type: 'varchar', length: 64 }) - kind: string; + @Column({ type: 'varchar', length: 16 }) + marker: ObservationMarker; - @JsonColumn() - payload: unknown; + @Column({ type: 'text' }) + text: string; - @Column({ type: 'bigint', nullable: true }) - durationMs: number | null; + @Column({ type: 'varchar', length: 36, nullable: true }) + parentId: string | null; - @Column({ type: 'int' }) - schemaVersion: number; + @Column({ type: 'int', default: 0 }) + tokenCount: number; + + @Column({ type: 'varchar', length: 16 }) + status: ObservationStatus; + + @Column({ type: 'varchar', length: 36, nullable: true }) + supersededBy: string | null; } diff --git a/packages/cli/src/modules/agents/execution-recorder.ts b/packages/cli/src/modules/agents/execution-recorder.ts index caa01205ad6..b4aa091639b 100644 --- a/packages/cli/src/modules/agents/execution-recorder.ts +++ b/packages/cli/src/modules/agents/execution-recorder.ts @@ -199,7 +199,6 @@ export interface MessageRecord { startTime: number; duration: number; error: string | null; - workingMemory: string | null; } export class ExecutionRecorder { @@ -233,8 +232,6 @@ export class ExecutionRecorder { private error: string | null = null; - private workingMemory: string | null = null; - private readonly startTime = Date.now(); /** Feed a stream chunk into the recorder. */ @@ -306,7 +303,6 @@ export class ExecutionRecorder { startTime: this.startTime, duration: Date.now() - this.startTime, error: this.error, - workingMemory: this.workingMemory, }; } diff --git a/packages/cli/src/modules/agents/integrations/__tests__/n8n-memory.test.ts b/packages/cli/src/modules/agents/integrations/__tests__/n8n-memory.test.ts index f9c8b721cc3..abe8cf10bef 100644 --- a/packages/cli/src/modules/agents/integrations/__tests__/n8n-memory.test.ts +++ b/packages/cli/src/modules/agents/integrations/__tests__/n8n-memory.test.ts @@ -1,5 +1,5 @@ -import { OBSERVATION_SCHEMA_VERSION, type NewObservation } from '@n8n/agents'; -import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm'; +import type { NewObservationLogEntry } from '@n8n/agents'; +import { Equal, In, IsNull, LessThan, Like, MoreThan } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; import type { AgentMessageEntity } from '../../entities/agent-message.entity'; @@ -15,6 +15,8 @@ import type { AgentResourceRepository } from '../../repositories/agent-resource. import type { AgentThreadRepository } from '../../repositories/agent-thread.repository'; import { N8nMemory } from '../n8n-memory'; +const estimateObservationTokens = (text: string) => Math.ceil(text.length / 4); + describe('N8nMemory', () => { let memory: N8nMemory; let messageRepository: jest.Mocked; @@ -25,6 +27,10 @@ describe('N8nMemory', () => { let observationLockRepository: jest.Mocked; let runInTransaction: jest.Mock; let transactionDelete: jest.Mock; + let observationRunInTransaction: jest.Mock; + let transactionObservationCreate: jest.Mock; + let transactionObservationSave: jest.Mock; + let transactionObservationUpdate: jest.Mock; beforeEach(() => { jest.clearAllMocks(); @@ -45,6 +51,30 @@ describe('N8nMemory', () => { value: { transaction: runInTransaction }, }); + transactionObservationCreate = jest.fn((input) => ({ ...input }) as AgentObservationEntity); + transactionObservationSave = jest.fn(async (input: AgentObservationEntity[]) => + input.map((entity, index) => ({ + ...entity, + id: `merged-${index + 1}`, + createdAt: entity.createdAt ?? new Date('2026-05-12T10:00:00Z'), + updatedAt: entity.updatedAt ?? new Date('2026-05-12T10:00:00Z'), + })), + ); + transactionObservationUpdate = jest.fn().mockResolvedValue({ affected: 1, raw: {} }); + observationRunInTransaction = jest.fn( + async (callback: (trx: { getRepository: jest.Mock }) => Promise) => + await callback({ + getRepository: jest.fn().mockReturnValue({ + create: transactionObservationCreate, + save: transactionObservationSave, + update: transactionObservationUpdate, + }), + }), + ); + Object.defineProperty(observationRepository, 'manager', { + value: { transaction: observationRunInTransaction }, + }); + memory = new N8nMemory( threadRepository, messageRepository, @@ -324,111 +354,22 @@ describe('N8nMemory', () => { }); }); - describe('working memory — thread scope', () => { - it('stores thread-scoped working memory on thread metadata', async () => { - const existing = { - id: 'thread-1', - resourceId: 'user-1', - title: null, - metadata: JSON.stringify({ other: true }), - } as unknown as AgentThreadEntity; - threadRepository.findOneBy.mockResolvedValue(existing); - threadRepository.save.mockImplementation(async (entity) => entity as AgentThreadEntity); + // ── Observation log ────────────────────────────────────────────────── - await memory.saveWorkingMemory( - { threadId: 'thread-1', resourceId: 'user-1', scope: 'thread' }, - '# Thread memory', - ); - - expect(threadRepository.save).toHaveBeenCalledWith( - expect.objectContaining({ - metadata: JSON.stringify({ other: true, workingMemory: '# Thread memory' }), - }), - ); - expect(resourceRepository.save).not.toHaveBeenCalled(); - }); - - it('creates the thread row when working memory is saved before messages', async () => { - threadRepository.findOneBy.mockResolvedValue(null); - threadRepository.create.mockImplementation((entity) => entity as AgentThreadEntity); - threadRepository.save.mockImplementation(async (entity) => entity as AgentThreadEntity); - resourceRepository.existsBy.mockResolvedValue(true); - - await memory.saveWorkingMemory( - { threadId: 'thread-1', resourceId: 'user-1', scope: 'thread' }, - '# Thread memory', - ); - - expect(resourceRepository.existsBy).toHaveBeenCalledWith({ id: 'user-1' }); - expect(threadRepository.create).toHaveBeenCalledWith({ - id: 'thread-1', - resourceId: 'user-1', - title: null, - metadata: JSON.stringify({ workingMemory: '# Thread memory' }), - }); - expect(threadRepository.save).toHaveBeenCalledWith( - expect.objectContaining({ - id: 'thread-1', - resourceId: 'user-1', - metadata: JSON.stringify({ workingMemory: '# Thread memory' }), - }), - ); - }); - - it('isolates thread-scoped working memory by user-scoped test-chat thread id', async () => { - const threads = new Map([ - [ - 'test-agent-1:user-1', - { - id: 'test-agent-1:user-1', - metadata: JSON.stringify({ workingMemory: 'alice notes' }), - } as unknown as AgentThreadEntity, - ], - [ - 'test-agent-1:user-2', - { - id: 'test-agent-1:user-2', - metadata: JSON.stringify({ workingMemory: 'bob notes' }), - } as unknown as AgentThreadEntity, - ], - ]); - threadRepository.findOneBy.mockImplementation( - async ({ id }: { id: string }) => threads.get(id) ?? null, - ); - - await expect( - memory.getWorkingMemory({ - threadId: 'test-agent-1:user-1', - resourceId: 'user-1', - scope: 'thread', - }), - ).resolves.toBe('alice notes'); - await expect( - memory.getWorkingMemory({ - threadId: 'test-agent-1:user-2', - resourceId: 'user-2', - scope: 'thread', - }), - ).resolves.toBe('bob notes'); - }); - }); - - // ── Observational memory ───────────────────────────────────────────── - - function makeNewObs(overrides: Partial = {}): NewObservation { + function makeNewObservationLogEntry( + overrides: Partial = {}, + ): NewObservationLogEntry { return { scopeKind: 'resource', scopeId: 't-1', - kind: 'observation', - payload: { text: 'hello' }, - durationMs: null, - schemaVersion: OBSERVATION_SCHEMA_VERSION, + marker: 'important', + text: 'hello', createdAt: new Date('2026-05-05T00:00:00Z'), ...overrides, }; } - describe('appendObservations', () => { + describe('appendObservationLogEntries', () => { beforeEach(() => { observationRepository.create.mockImplementation( (input) => ({ ...input }) as AgentObservationEntity, @@ -436,42 +377,56 @@ describe('N8nMemory', () => { }); it('returns [] for an empty input without touching the repo', async () => { - const result = await memory.appendObservations([]); + const result = await memory.appendObservationLogEntries([]); expect(result).toEqual([]); - expect(observationRepository.findOne).not.toHaveBeenCalled(); + expect(observationRepository.create).not.toHaveBeenCalled(); expect(observationRepository.save).not.toHaveBeenCalled(); }); - it('persists rows without allocating a sequence number', async () => { + it('persists active marker rows with a default token count', async () => { (observationRepository.save as unknown as jest.Mock).mockImplementation( async (input: AgentObservationEntity | AgentObservationEntity[]) => (Array.isArray(input) ? input : [input]).map((e, i) => ({ ...e, id: `obs-${i + 1}`, + createdAt: e.createdAt ?? new Date('2026-05-05T00:00:00Z'), + updatedAt: e.updatedAt ?? new Date('2026-05-05T00:00:00Z'), })), ); - const result = await memory.appendObservations([makeNewObs(), makeNewObs()]); + const result = await memory.appendObservationLogEntries([ + makeNewObservationLogEntry(), + makeNewObservationLogEntry({ marker: 'critical', text: 'remember this' }), + ]); - expect(observationRepository.findOne).not.toHaveBeenCalled(); + expect(observationRepository.create).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + marker: 'important', + text: 'hello', + parentId: null, + tokenCount: estimateObservationTokens('hello'), + status: 'active', + supersededBy: null, + }), + ); expect(result.map((r) => r.id)).toEqual(['obs-1', 'obs-2']); }); }); - describe('getObservations', () => { + describe('getObservationLog', () => { beforeEach(() => { observationRepository.find.mockResolvedValue([]); }); it('passes filters through to find()', async () => { - const sinceCreatedAt = new Date('2026-05-05T00:00:00Z'); - await memory.getObservations({ + await memory.getObservationLog({ scopeKind: 'resource', scopeId: 't-1', - since: { sinceCreatedAt, sinceObservationId: 'obs-anchor' }, - kindIs: 'summary', - schemaVersionAtMost: 1, + status: 'active', + parentId: null, limit: 10, + order: 'desc', }); expect(observationRepository.find).toHaveBeenCalledWith({ @@ -479,64 +434,122 @@ describe('N8nMemory', () => { { scopeKind: 'resource', scopeId: 't-1', - kind: 'summary', - schemaVersion: LessThanOrEqual(1), - createdAt: MoreThan(sinceCreatedAt), - }, - { - scopeKind: 'resource', - scopeId: 't-1', - kind: 'summary', - schemaVersion: LessThanOrEqual(1), - createdAt: Equal(sinceCreatedAt), - id: MoreThan('obs-anchor'), + status: 'active', + parentId: IsNull(), }, ], - order: { createdAt: 'ASC', id: 'ASC' }, + order: { createdAt: 'DESC', id: 'DESC' }, take: 10, }); }); - it('omits absent filters', async () => { - await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' }); + it('active read filters out non-active rows', async () => { + await memory.getActiveObservationLog({ scopeKind: 'resource', scopeId: 't-1' }); expect(observationRepository.find).toHaveBeenCalledWith({ - where: [{ scopeKind: 'resource', scopeId: 't-1' }], + where: [{ scopeKind: 'resource', scopeId: 't-1', status: 'active' }], order: { createdAt: 'ASC', id: 'ASC' }, }); }); - it('coerces bigint columns back to numbers on read', async () => { + it('maps persisted rows to observation log entries', async () => { observationRepository.find.mockResolvedValue([ { id: 'obs-1', scopeKind: 'resource', scopeId: 't-1', - kind: 'observation', - payload: { text: 'hi' }, - durationMs: '1000' as unknown as number | null, - schemaVersion: '1' as unknown as number, + marker: 'important', + text: 'hi', + parentId: null, + tokenCount: '7' as unknown as number, + status: 'active', + supersededBy: null, createdAt: new Date('2026-05-05T00:00:00Z'), updatedAt: new Date('2026-05-05T00:00:00Z'), } as AgentObservationEntity, ]); - const [row] = await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' }); - expect(row.durationMs).toBe(1000); - expect(row.schemaVersion).toBe(1); + const [row] = await memory.getObservationLog({ scopeKind: 'resource', scopeId: 't-1' }); + expect(row).toMatchObject({ + id: 'obs-1', + marker: 'important', + text: 'hi', + tokenCount: 7, + status: 'active', + }); }); }); - describe('deleteObservations', () => { - it('issues a single delete with the given ids', async () => { - await memory.deleteObservations(['a', 'b']); + describe('observation log status updates', () => { + it('marks rows as dropped instead of deleting them', async () => { + await memory.dropObservationLogEntries(['a', 'b']); - expect(observationRepository.delete).toHaveBeenCalledWith({ id: In(['a', 'b']) }); + expect(observationRepository.update).toHaveBeenCalledWith( + { id: In(['a', 'b']) }, + { status: 'dropped', supersededBy: null }, + ); }); - it('no-ops on empty input', async () => { - await memory.deleteObservations([]); - expect(observationRepository.delete).not.toHaveBeenCalled(); + it('marks rows as superseded by a replacement row', async () => { + await memory.supersedeObservationLogEntries(['a', 'b'], 'replacement'); + + expect(observationRepository.update).toHaveBeenCalledWith( + { id: In(['a', 'b']) }, + { status: 'superseded', supersededBy: 'replacement' }, + ); + }); + + it('no-ops on empty update inputs', async () => { + await memory.dropObservationLogEntries([]); + await memory.supersedeObservationLogEntries([], 'replacement'); + expect(observationRepository.update).not.toHaveBeenCalled(); + }); + }); + + describe('applyObservationLogReflection', () => { + it('inserts merged replacements and updates old rows in one transaction', async () => { + const result = await memory.applyObservationLogReflection( + { scopeKind: 'thread', scopeId: 't-1' }, + { + drop: ['drop-1'], + merge: [ + { + supersedes: ['old-1', 'old-2'], + marker: 'important', + text: 'Merged observation', + }, + ], + }, + ); + + expect(observationRunInTransaction).toHaveBeenCalledWith(expect.any(Function)); + expect(transactionObservationCreate).toHaveBeenCalledWith( + expect.objectContaining({ + scopeKind: 'thread', + scopeId: 't-1', + marker: 'important', + text: 'Merged observation', + parentId: null, + tokenCount: estimateObservationTokens('Merged observation'), + status: 'active', + supersededBy: null, + }), + ); + expect(transactionObservationUpdate).toHaveBeenNthCalledWith( + 1, + { scopeKind: 'thread', scopeId: 't-1', id: In(['drop-1']) }, + { status: 'dropped', supersededBy: null }, + ); + expect(transactionObservationUpdate).toHaveBeenNthCalledWith( + 2, + { scopeKind: 'thread', scopeId: 't-1', id: In(['old-1', 'old-2']) }, + { status: 'superseded', supersededBy: 'merged-1' }, + ); + expect(result).toMatchObject({ + droppedIds: ['drop-1'], + supersededIds: ['old-1', 'old-2'], + inserted: [{ id: 'merged-1', status: 'active' }], + }); }); }); @@ -634,6 +647,7 @@ describe('N8nMemory', () => { claimed: { scopeKind: 'thread', scopeId: 't-1', + taskKind: 'observer', holderId: 'A', heldUntil: new Date(Date.now() + 60_000), } as AgentObservationLockEntity, @@ -685,6 +699,9 @@ describe('N8nMemory', () => { expect(updateQueryBuilder.andWhere).toHaveBeenCalledWith( '("holderId" = :holderId OR "heldUntil" <= :now)', ); + expect(updateQueryBuilder.set).toHaveBeenCalledWith( + expect.objectContaining({ taskKind: 'observer', holderId: 'B' }), + ); expect(observationLockRepository.save).not.toHaveBeenCalled(); }); @@ -709,6 +726,7 @@ describe('N8nMemory', () => { expect(observationLockRepository.delete).toHaveBeenCalledWith({ scopeKind: 'resource', scopeId: 't-1', + taskKind: 'observer', holderId: 'A', }); }); diff --git a/packages/cli/src/modules/agents/integrations/n8n-memory.ts b/packages/cli/src/modules/agents/integrations/n8n-memory.ts index 8caa8ca50c4..24e5975c5c8 100644 --- a/packages/cli/src/modules/agents/integrations/n8n-memory.ts +++ b/packages/cli/src/modules/agents/integrations/n8n-memory.ts @@ -2,18 +2,22 @@ import type { AgentDbMessage, AgentMessage, BuiltMemory, - BuiltObservationStore, + BuiltObservationLogStore, MemoryDescriptor, - NewObservation, - Observation, + NewObservationLogEntry, ObservationCursor, + ObservationLogEntry, + ObservationLogReadOptions, + ObservationLogReflection, + ObservationLogReflectionResult, + ObservationLogScope, + ObservationLogScopeKind, ObservationLockHandle, - ScopeKind, Thread, } from '@n8n/agents'; import { Service } from '@n8n/di'; import type { FindOptionsWhere } from '@n8n/typeorm'; -import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm'; +import { Equal, In, IsNull, LessThan, Like, MoreThan } from '@n8n/typeorm'; import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity'; import { UnexpectedError } from 'n8n-workflow'; @@ -29,11 +33,10 @@ import { AgentObservationRepository } from '../repositories/agent-observation.re import { AgentResourceRepository } from '../repositories/agent-resource.repository'; import { AgentThreadRepository } from '../repositories/agent-thread.repository'; -/** Key inside the metadata JSON where working memory content is stored. */ -const WORKING_MEMORY_KEY = 'workingMemory'; +const estimateObservationTokens = (text: string) => Math.ceil(text.length / 4); @Service() -export class N8nMemory implements BuiltMemory, BuiltObservationStore { +export class N8nMemory implements BuiltMemory, BuiltObservationLogStore { constructor( private readonly threadRepository: AgentThreadRepository, private readonly messageRepository: AgentMessageRepository, @@ -134,11 +137,7 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { entities.reverse(); } - return entities.map((e) => { - const msg = e.content as AgentMessage & { id?: string }; - msg.id = e.id; - return msg as AgentDbMessage; - }); + return entities.map((e) => this.toAgentDbMessage(e)); } async saveMessages(args: { @@ -193,83 +192,74 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { resourceId: string; scope: 'resource' | 'thread'; }): Promise { - if (params.scope === 'resource') { - const resource = await this.resourceRepository.findOneBy({ id: params.resourceId }); - return this.extractWorkingMemory(resource?.metadata ?? null); - } - - const thread = await this.threadRepository.findOneBy({ id: params.threadId }); - return this.extractWorkingMemory(thread?.metadata ?? null); + void params; + // Legacy `workingMemory` metadata is intentionally ignored. The new + // observation-log pipeline will own memory state. + return null; } async saveWorkingMemory( params: { threadId: string; resourceId: string; scope: 'resource' | 'thread' }, content: string, ): Promise { - if (params.scope === 'resource') { - await this.upsertResourceMetadata(params.resourceId, content); - } else { - await this.upsertThreadMetadata(params.threadId, params.resourceId, content); - } + void params; + void content; + // Legacy `workingMemory` metadata is intentionally ignored. The new + // observation-log pipeline will own memory state. } - // ── Observational memory: data ─────────────────────────────────────── + // ── Observation log ────────────────────────────────────────────────── - async appendObservations(rows: NewObservation[]): Promise { + async appendObservationLogEntries( + rows: NewObservationLogEntry[], + ): Promise { if (rows.length === 0) return []; const entities: AgentObservationEntity[] = rows.map((row) => this.observationRepository.create({ scopeKind: row.scopeKind, scopeId: row.scopeId, - kind: row.kind, - payload: row.payload, - durationMs: row.durationMs, - schemaVersion: row.schemaVersion, + marker: row.marker, + text: row.text, + parentId: row.parentId ?? null, + tokenCount: row.tokenCount ?? estimateObservationTokens(row.text), + status: 'active', + supersededBy: null, createdAt: row.createdAt, }), ); const saved = await this.observationRepository.save(entities); - return saved.map((e) => this.toObservation(e)); + return saved.map((e) => this.toObservationLogEntry(e)); } - async getObservations(opts: { - scopeKind: ScopeKind; - scopeId: string; - since?: { sinceCreatedAt: Date; sinceObservationId: string }; - kindIs?: string; - limit?: number; - schemaVersionAtMost?: number; - }): Promise { + async getActiveObservationLog( + scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' }, + ): Promise { + return await this.getObservationLog({ ...scope, status: 'active' }); + } + + async getObservationLog(opts: ObservationLogReadOptions): Promise { const baseWhere: FindOptionsWhere = { scopeKind: opts.scopeKind, scopeId: opts.scopeId, - ...(opts.kindIs !== undefined && { kind: opts.kindIs }), - ...(opts.schemaVersionAtMost !== undefined && { - schemaVersion: LessThanOrEqual(opts.schemaVersionAtMost), - }), + ...(opts.status !== undefined && { status: opts.status }), + ...(opts.parentId !== undefined && { parentId: opts.parentId ?? IsNull() }), }; - const where: FindOptionsWhere[] = opts.since - ? [ - { ...baseWhere, createdAt: MoreThan(opts.since.sinceCreatedAt) }, - { - ...baseWhere, - createdAt: Equal(opts.since.sinceCreatedAt), - id: MoreThan(opts.since.sinceObservationId), - }, - ] - : [baseWhere]; + const entities = await this.observationRepository.find({ - where, - order: { createdAt: 'ASC', id: 'ASC' }, + where: [baseWhere], + order: { + createdAt: opts.order === 'desc' ? 'DESC' : 'ASC', + id: opts.order === 'desc' ? 'DESC' : 'ASC', + }, ...(opts.limit !== undefined && { take: opts.limit }), }); - return entities.map((e) => this.toObservation(e)); + return entities.map((e) => this.toObservationLogEntry(e)); } async getMessagesForScope( - scopeKind: ScopeKind, + scopeKind: ObservationLogScopeKind, scopeId: string, opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } }, ): Promise { @@ -295,21 +285,80 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { where, order: { createdAt: 'ASC', id: 'ASC' }, }); - return entities.map((e) => { - const msg = e.content as AgentMessage & { id?: string }; - msg.id = e.id; - return msg as AgentDbMessage; - }); + return entities.map((e) => this.toAgentDbMessage(e)); } - async deleteObservations(ids: string[]): Promise { + async dropObservationLogEntries(ids: string[]): Promise { if (ids.length === 0) return; - await this.observationRepository.delete({ id: In(ids) }); + await this.observationRepository.update( + { id: In(ids) }, + { status: 'dropped', supersededBy: null }, + ); + } + + async supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise { + if (ids.length === 0) return; + await this.observationRepository.update( + { id: In(ids) }, + { status: 'superseded', supersededBy }, + ); + } + + async applyObservationLogReflection( + scope: ObservationLogScope, + reflection: ObservationLogReflection, + ): Promise { + return await this.observationRepository.manager.transaction(async (trx) => { + const repo = trx.getRepository(AgentObservationEntity); + const inserted = reflection.merge.length + ? await repo.save( + reflection.merge.map((entry) => + repo.create({ + scopeKind: scope.scopeKind, + scopeId: scope.scopeId, + marker: entry.marker, + text: entry.text, + parentId: entry.parentId ?? null, + tokenCount: entry.tokenCount ?? estimateObservationTokens(entry.text), + status: 'active', + supersededBy: null, + createdAt: entry.createdAt, + }), + ), + ) + : []; + + if (reflection.drop.length > 0) { + await repo.update( + { scopeKind: scope.scopeKind, scopeId: scope.scopeId, id: In(reflection.drop) }, + { status: 'dropped', supersededBy: null }, + ); + } + + for (const [index, merge] of reflection.merge.entries()) { + const replacement = inserted[index]; + if (replacement && merge.supersedes.length > 0) { + await repo.update( + { scopeKind: scope.scopeKind, scopeId: scope.scopeId, id: In(merge.supersedes) }, + { status: 'superseded', supersededBy: replacement.id }, + ); + } + } + + return { + droppedIds: [...reflection.drop], + supersededIds: reflection.merge.flatMap((entry) => entry.supersedes), + inserted: inserted.map((entry) => this.toObservationLogEntry(entry)), + }; + }); } // ── Observational memory: cursors ──────────────────────────────────── - async getCursor(scopeKind: ScopeKind, scopeId: string): Promise { + async getCursor( + scopeKind: ObservationLogScopeKind, + scopeId: string, + ): Promise { const entity = await this.observationCursorRepository.findOneBy({ scopeKind, scopeId }); if (!entity) return null; return { @@ -321,7 +370,9 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { }; } - async setCursor(cursor: ObservationCursor): Promise { + async setCursor( + cursor: ObservationCursor & { scopeKind: ObservationLogScopeKind }, + ): Promise { await this.observationCursorRepository.upsert( { scopeKind: cursor.scopeKind, @@ -337,21 +388,23 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { // ── Observational memory: locks ────────────────────────────────────── async acquireObservationLock( - scopeKind: ScopeKind, + scopeKind: ObservationLogScopeKind, scopeId: string, opts: { ttlMs: number; holderId: string }, ): Promise { const now = new Date(); const heldUntil = new Date(now.getTime() + opts.ttlMs); + const taskKind = 'observer'; const updateResult = await this.observationLockRepository .createQueryBuilder() .update(AgentObservationLockEntity) - .set({ holderId: opts.holderId, heldUntil }) + .set({ taskKind, holderId: opts.holderId, heldUntil }) .where('"scopeKind" = :scopeKind') .andWhere('"scopeId" = :scopeId') + .andWhere('"taskKind" = :taskKind') .andWhere('("holderId" = :holderId OR "heldUntil" <= :now)') - .setParameters({ scopeKind, scopeId, holderId: opts.holderId, now }) + .setParameters({ scopeKind, scopeId, taskKind, holderId: opts.holderId, now }) .execute(); if ((updateResult.affected ?? 0) > 0) { @@ -362,13 +415,14 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { .createQueryBuilder() .insert() .into(AgentObservationLockEntity) - .values({ scopeKind, scopeId, holderId: opts.holderId, heldUntil }) + .values({ scopeKind, scopeId, taskKind, holderId: opts.holderId, heldUntil }) .orIgnore() .execute(); const claimed = await this.observationLockRepository.findOneBy({ scopeKind, scopeId, + taskKind, holderId: opts.holderId, }); if (!claimed) return null; @@ -376,10 +430,13 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { return { scopeKind, scopeId, holderId: opts.holderId, heldUntil }; } - async releaseObservationLock(handle: ObservationLockHandle): Promise { + async releaseObservationLock( + handle: ObservationLockHandle & { scopeKind: ObservationLogScopeKind }, + ): Promise { await this.observationLockRepository.delete({ scopeKind: handle.scopeKind, scopeId: handle.scopeId, + taskKind: 'observer', holderId: handle.holderId, }); } @@ -392,15 +449,24 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { // ── Helpers ────────────────────────────────────────────────────────── - private toObservation(entity: AgentObservationEntity): Observation { + private toAgentDbMessage(entity: AgentMessageEntity): AgentDbMessage { + const msg = entity.content as AgentMessage & { id?: string; createdAt?: Date }; + msg.id = entity.id; + msg.createdAt = entity.createdAt; + return msg as AgentDbMessage; + } + + private toObservationLogEntry(entity: AgentObservationEntity): ObservationLogEntry { return { id: entity.id, scopeKind: entity.scopeKind, scopeId: entity.scopeId, - kind: entity.kind, - payload: entity.payload as Observation['payload'], - durationMs: entity.durationMs === null ? null : Number(entity.durationMs), - schemaVersion: Number(entity.schemaVersion), + marker: entity.marker, + text: entity.text, + parentId: entity.parentId, + tokenCount: Number(entity.tokenCount), + status: entity.status, + supersededBy: entity.supersededBy, createdAt: entity.createdAt, }; } @@ -423,65 +489,4 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore { updatedAt: entity.updatedAt, }; } - - private extractWorkingMemory(metadataJson: string | null): string | null { - if (!metadataJson) return null; - try { - const parsed = JSON.parse(metadataJson) as Record; - const wm = parsed[WORKING_MEMORY_KEY]; - return typeof wm === 'string' ? wm : null; - } catch { - return null; - } - } - - private mergeWorkingMemory(existingJson: string | null, content: string): string { - let parsed: Record = {}; - if (existingJson) { - try { - parsed = JSON.parse(existingJson) as Record; - } catch { - // start fresh on corrupt JSON - } - } - parsed[WORKING_MEMORY_KEY] = content; - return JSON.stringify(parsed); - } - - private async upsertResourceMetadata(resourceId: string, content: string): Promise { - const existing = await this.resourceRepository.findOneBy({ id: resourceId }); - if (existing) { - existing.metadata = this.mergeWorkingMemory(existing.metadata, content); - await this.resourceRepository.save(existing); - } else { - const entity = this.resourceRepository.create({ - id: resourceId, - metadata: this.mergeWorkingMemory(null, content), - }); - await this.resourceRepository.save(entity); - } - } - - private async upsertThreadMetadata( - threadId: string, - resourceId: string, - content: string, - ): Promise { - const existing = await this.threadRepository.findOneBy({ id: threadId }); - if (existing) { - existing.metadata = this.mergeWorkingMemory(existing.metadata, content); - await this.threadRepository.save(existing); - return; - } - - await this.ensureResource(resourceId); - await this.threadRepository.save( - this.threadRepository.create({ - id: threadId, - resourceId, - title: null, - metadata: this.mergeWorkingMemory(null, content), - }), - ); - } } diff --git a/packages/frontend/@n8n/i18n/src/locales/en.json b/packages/frontend/@n8n/i18n/src/locales/en.json index b11e4b3c9c2..212776ea8d3 100644 --- a/packages/frontend/@n8n/i18n/src/locales/en.json +++ b/packages/frontend/@n8n/i18n/src/locales/en.json @@ -1323,7 +1323,6 @@ "agentSessions.detail.cost": "Cost", "agentSessions.detail.toolCallSuspended": "Tool call suspended — waiting for user action", "agentSessions.detail.suspendedNote": "This message was part of a tool call suspension. Token usage and cost are reported on the resumed message.", - "agentSessions.detail.workingMemory": "Memory Update", "agentSessions.detail.builtInTools": "Built-in", "agentSessions.detail.userTools": "Tools", "agentSessions.detail.richInteraction.userResponse": "User responded", diff --git a/packages/frontend/editor-ui/src/features/agents/__tests__/session-timeline.utils.spec.ts b/packages/frontend/editor-ui/src/features/agents/__tests__/session-timeline.utils.spec.ts index 2017e1856a2..3b715e04a27 100644 --- a/packages/frontend/editor-ui/src/features/agents/__tests__/session-timeline.utils.spec.ts +++ b/packages/frontend/editor-ui/src/features/agents/__tests__/session-timeline.utils.spec.ts @@ -158,7 +158,6 @@ function exec(overrides: Partial = {}): AgentExecution { timeline: null, error: null, hitlStatus: null, - workingMemory: null, source: null, ...overrides, }; diff --git a/packages/frontend/editor-ui/src/features/agents/composables/useAgentThreadsApi.ts b/packages/frontend/editor-ui/src/features/agents/composables/useAgentThreadsApi.ts index 2b5eef3a4fc..0f038b2692b 100644 --- a/packages/frontend/editor-ui/src/features/agents/composables/useAgentThreadsApi.ts +++ b/packages/frontend/editor-ui/src/features/agents/composables/useAgentThreadsApi.ts @@ -56,7 +56,6 @@ export interface AgentExecution { timeline: AgentExecutionTimelineEvent[] | null; error: string | null; hitlStatus: AgentExecutionHitlStatus | null; - workingMemory: string | null; source: string | null; }