feat(core): Add observation log storage (#30339)

Co-authored-by: Michael Drury <michael.drury@n8n.io>
This commit is contained in:
bjorger 2026-05-18 10:16:10 +02:00 committed by GitHub
parent 4575c856f4
commit 28af69a57c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 864 additions and 322 deletions

View File

@ -54,6 +54,18 @@ export type {
ObservationalMemoryTrigger,
ObserveFn,
ScopeKind,
BuiltObservationLogStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogMarker,
ObservationLogMerge,
ObservationLogReadOptions,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScope,
ObservationLogScopeKind,
ObservationLogStatus,
TokenCounter,
} from './types';
export type { ProviderOptions } from '@ai-sdk/provider-utils';
export { AgentEvent } from './types';
@ -63,6 +75,11 @@ export {
OBSERVATION_CATEGORIES,
OBSERVATION_SCHEMA_VERSION,
} from './types';
export {
estimateObservationTokens,
OBSERVATION_LOG_MARKERS,
OBSERVATION_LOG_STATUSES,
} from './types';
export { Tool, wrapToolForApproval } from './sdk/tool';
export { Memory } from './sdk/memory';

View File

@ -0,0 +1,89 @@
import { estimateObservationTokens } from '../../types/sdk/observation-log';
import { InMemoryMemory } from '../memory-store';
describe('observation log store', () => {
it('persists marker-based active observations with default mechanics', async () => {
const store = new InMemoryMemory();
const createdAt = new Date('2026-05-12T10:00:00Z');
const [entry] = await store.appendObservationLogEntries([
{
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'critical',
text: 'User chose the observation log model.',
createdAt,
},
]);
expect(entry).toMatchObject({
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'critical',
text: 'User chose the observation log model.',
parentId: null,
status: 'active',
supersededBy: null,
tokenCount: estimateObservationTokens('User chose the observation log model.'),
createdAt,
});
await expect(
store.getActiveObservationLog({ scopeKind: 'thread', scopeId: 'thread-1' }),
).resolves.toEqual([entry]);
});
it('keeps dropped and superseded observations out of the active read path', async () => {
const store = new InMemoryMemory();
const [dropped, superseded, replacement] = await store.appendObservationLogEntries([
{ scopeKind: 'thread', scopeId: 'thread-1', marker: 'info', text: 'Small detail' },
{ scopeKind: 'thread', scopeId: 'thread-1', marker: 'important', text: 'Old plan' },
{ scopeKind: 'thread', scopeId: 'thread-1', marker: 'important', text: 'New plan' },
]);
await store.dropObservationLogEntries([dropped.id]);
await store.supersedeObservationLogEntries([superseded.id], replacement.id);
await expect(
store.getActiveObservationLog({ scopeKind: 'thread', scopeId: 'thread-1' }),
).resolves.toEqual([replacement]);
await expect(
store.getObservationLog({ scopeKind: 'thread', scopeId: 'thread-1', status: 'dropped' }),
).resolves.toMatchObject([{ id: dropped.id, status: 'dropped', supersededBy: null }]);
await expect(
store.getObservationLog({ scopeKind: 'thread', scopeId: 'thread-1', status: 'superseded' }),
).resolves.toMatchObject([
{ id: superseded.id, status: 'superseded', supersededBy: replacement.id },
]);
});
it('applies reflection as drops plus merged replacements', async () => {
const store = new InMemoryMemory();
const [stale, oldA, oldB] = await store.appendObservationLogEntries([
{ scopeKind: 'resource', scopeId: 'user-1', marker: 'info', text: 'Tiny aside' },
{ scopeKind: 'resource', scopeId: 'user-1', marker: 'important', text: 'Plan A' },
{ scopeKind: 'resource', scopeId: 'user-1', marker: 'important', text: 'Plan B' },
]);
const result = await store.applyObservationLogReflection(
{ scopeKind: 'resource', scopeId: 'user-1' },
{
drop: [stale.id],
merge: [
{
supersedes: [oldA.id, oldB.id],
marker: 'important',
text: 'User compared Plan A and Plan B.',
},
],
},
);
expect(result.droppedIds).toEqual([stale.id]);
expect(result.supersededIds).toEqual([oldA.id, oldB.id]);
expect(result.inserted).toHaveLength(1);
await expect(
store.getActiveObservationLog({ scopeKind: 'resource', scopeId: 'user-1' }),
).resolves.toEqual(result.inserted);
});
});

View File

@ -8,6 +8,16 @@ import type {
ObservationLockHandle,
ScopeKind,
} from '../types/sdk/observation';
import type {
BuiltObservationLogStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogReadOptions,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScope,
} from '../types/sdk/observation-log';
import { estimateObservationTokens } from '../types/sdk/observation-log';
interface StoredMessage {
message: AgentDbMessage;
@ -27,6 +37,10 @@ function cloneCursor(cursor: ObservationCursor): ObservationCursor {
};
}
function cloneObservationLogEntry(entry: ObservationLogEntry): ObservationLogEntry {
return { ...entry, createdAt: new Date(entry.createdAt) };
}
function compareKeyset(
a: { createdAt: Date; id: string },
b: { createdAt: Date; id: string },
@ -43,7 +57,9 @@ function compareKeyset(
* Thread context for `saveMessages` is established by calling `saveThread` first.
* The most recently saved thread is used when `saveMessages` is called.
*/
export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
export class InMemoryMemory
implements BuiltMemory, BuiltObservationStore, BuiltObservationLogStore
{
private threads = new Map<string, Thread>();
private messagesByThread = new Map<string, StoredMessage[]>();
@ -52,6 +68,8 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
private observationsByScope = new Map<string, Observation[]>();
private observationLogByScope = new Map<string, ObservationLogEntry[]>();
private cursorsByScope = new Map<string, ObservationCursor>();
private locksByScope = new Map<string, ObservationLockHandle>();
@ -108,6 +126,7 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
this.messagesByThread.delete(threadId);
const key = scopeKey('thread', threadId);
this.observationsByScope.delete(key);
this.observationLogByScope.delete(key);
this.cursorsByScope.delete(key);
this.locksByScope.delete(key);
}
@ -195,6 +214,117 @@ export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
// ── Observational memory ─────────────────────────────────────────────
// eslint-disable-next-line @typescript-eslint/require-await
async appendObservationLogEntries(
rows: NewObservationLogEntry[],
): Promise<ObservationLogEntry[]> {
const persisted: ObservationLogEntry[] = [];
for (const row of rows) {
const key = scopeKey(row.scopeKind, row.scopeId);
const bucket = this.observationLogByScope.get(key) ?? [];
const entry: ObservationLogEntry = {
id: crypto.randomUUID(),
scopeKind: row.scopeKind,
scopeId: row.scopeId,
marker: row.marker,
text: row.text,
parentId: row.parentId ?? null,
tokenCount: row.tokenCount ?? estimateObservationTokens(row.text),
status: 'active',
supersededBy: null,
createdAt: row.createdAt ?? new Date(),
};
bucket.push(entry);
this.observationLogByScope.set(key, bucket);
persisted.push(cloneObservationLogEntry(entry));
}
return persisted;
}
async getActiveObservationLog(
scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' },
): Promise<ObservationLogEntry[]> {
return await this.getObservationLog({ ...scope, status: 'active' });
}
// eslint-disable-next-line @typescript-eslint/require-await
async getObservationLog(opts: ObservationLogReadOptions): Promise<ObservationLogEntry[]> {
const bucket = this.observationLogByScope.get(scopeKey(opts.scopeKind, opts.scopeId)) ?? [];
let rows = [...bucket].sort((a, b) =>
compareKeyset({ createdAt: a.createdAt, id: a.id }, { createdAt: b.createdAt, id: b.id }),
);
if (opts.order === 'desc') rows.reverse();
if (opts.status !== undefined) {
rows = rows.filter((entry) => entry.status === opts.status);
}
if (opts.parentId !== undefined) {
rows = rows.filter((entry) => entry.parentId === opts.parentId);
}
if (opts.limit !== undefined) {
rows = rows.slice(0, opts.limit);
}
return rows.map(cloneObservationLogEntry);
}
// eslint-disable-next-line @typescript-eslint/require-await
async dropObservationLogEntries(ids: string[]): Promise<void> {
if (ids.length === 0) return;
const idSet = new Set(ids);
for (const bucket of this.observationLogByScope.values()) {
for (const entry of bucket) {
if (idSet.has(entry.id)) {
entry.status = 'dropped';
entry.supersededBy = null;
}
}
}
}
// eslint-disable-next-line @typescript-eslint/require-await
async supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise<void> {
if (ids.length === 0) return;
const idSet = new Set(ids);
for (const bucket of this.observationLogByScope.values()) {
for (const entry of bucket) {
if (idSet.has(entry.id)) {
entry.status = 'superseded';
entry.supersededBy = supersededBy;
}
}
}
}
async applyObservationLogReflection(
scope: ObservationLogScope,
reflection: ObservationLogReflection,
): Promise<ObservationLogReflectionResult> {
const inserted = await this.appendObservationLogEntries(
reflection.merge.map((entry) => ({
scopeKind: scope.scopeKind,
scopeId: scope.scopeId,
marker: entry.marker,
text: entry.text,
parentId: entry.parentId,
tokenCount: entry.tokenCount,
createdAt: entry.createdAt,
})),
);
await this.dropObservationLogEntries(reflection.drop);
for (const [index, merge] of reflection.merge.entries()) {
const replacement = inserted[index];
if (replacement) {
await this.supersedeObservationLogEntries(merge.supersedes, replacement.id);
}
}
return {
droppedIds: [...reflection.drop],
supersededIds: reflection.merge.flatMap((entry) => entry.supersedes),
inserted,
};
}
// eslint-disable-next-line @typescript-eslint/require-await
async appendObservations(rows: NewObservation[]): Promise<Observation[]> {
const persisted: Observation[] = [];

View File

@ -0,0 +1,23 @@
import type { BuiltMemory, BuiltObservationLogStore } from '../types';
const OBSERVATION_LOG_STORE_METHODS = [
'appendObservationLogEntries',
'getActiveObservationLog',
'getObservationLog',
'dropObservationLogEntries',
'supersedeObservationLogEntries',
'applyObservationLogReflection',
] as const satisfies ReadonlyArray<keyof BuiltObservationLogStore>;
function hasFunctionProperty<K extends PropertyKey>(
value: object,
property: K,
): value is Record<K, (...args: never[]) => unknown> {
return property in value && typeof Reflect.get(value, property) === 'function';
}
export function hasObservationLogStore(
memory: BuiltMemory,
): memory is BuiltMemory & BuiltObservationLogStore {
return OBSERVATION_LOG_STORE_METHODS.every((method) => hasFunctionProperty(memory, method));
}

View File

@ -89,6 +89,26 @@ export {
OBSERVATION_SCHEMA_VERSION,
} from './sdk/observation';
export type {
BuiltObservationLogStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogMarker,
ObservationLogMerge,
ObservationLogReadOptions,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScope,
ObservationLogScopeKind,
ObservationLogStatus,
TokenCounter,
} from './sdk/observation-log';
export {
estimateObservationTokens,
OBSERVATION_LOG_MARKERS,
OBSERVATION_LOG_STATUSES,
} from './sdk/observation-log';
export type {
EvalInput,
EvalScore,

View File

@ -0,0 +1,78 @@
export const OBSERVATION_LOG_MARKERS = ['critical', 'important', 'info', 'completion'] as const;
export type ObservationLogMarker = (typeof OBSERVATION_LOG_MARKERS)[number];
export const OBSERVATION_LOG_STATUSES = ['active', 'superseded', 'dropped'] as const;
export type ObservationLogStatus = (typeof OBSERVATION_LOG_STATUSES)[number];
export type ObservationLogScopeKind = 'thread' | 'resource';
export interface ObservationLogScope {
scopeKind: ObservationLogScopeKind;
scopeId: string;
}
export interface ObservationLogEntry extends ObservationLogScope {
id: string;
marker: ObservationLogMarker;
text: string;
parentId: string | null;
tokenCount: number;
status: ObservationLogStatus;
supersededBy: string | null;
createdAt: Date;
}
export interface NewObservationLogEntry extends ObservationLogScope {
marker: ObservationLogMarker;
text: string;
parentId?: string | null;
tokenCount?: number;
createdAt?: Date;
}
export interface ObservationLogReadOptions extends ObservationLogScope {
status?: ObservationLogStatus;
parentId?: string | null;
limit?: number;
order?: 'asc' | 'desc';
}
export interface ObservationLogMerge {
supersedes: string[];
marker: ObservationLogMarker;
text: string;
parentId?: string | null;
tokenCount?: number;
createdAt?: Date;
}
export interface ObservationLogReflection {
drop: string[];
merge: ObservationLogMerge[];
}
export interface ObservationLogReflectionResult {
droppedIds: string[];
supersededIds: string[];
inserted: ObservationLogEntry[];
}
export type TokenCounter = (text: string) => number;
export const estimateObservationTokens: TokenCounter = (text) => Math.ceil(text.length / 4);
export interface BuiltObservationLogStore {
appendObservationLogEntries(rows: NewObservationLogEntry[]): Promise<ObservationLogEntry[]>;
getActiveObservationLog(
scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' },
): Promise<ObservationLogEntry[]>;
getObservationLog(opts: ObservationLogReadOptions): Promise<ObservationLogEntry[]>;
dropObservationLogEntries(ids: string[]): Promise<void>;
supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise<void>;
applyObservationLogReflection(
scope: ObservationLogScope,
reflection: ObservationLogReflection,
): Promise<ObservationLogReflectionResult>;
}

View File

@ -0,0 +1,115 @@
import type { MigrationContext, ReversibleMigration } from '../migration-types';
const OBSERVATION_SCOPE_KINDS = ['thread', 'resource'];
const OBSERVATION_MARKERS = ['critical', 'important', 'info', 'completion'];
const OBSERVATION_STATUSES = ['active', 'superseded', 'dropped'];
const OBSERVATION_TASK_KINDS = ['observer', 'reflector'];
/**
* Replaces the first observational-memory schema with the observation-log
* schema. The earlier schema has already shipped on master, so this migration
* intentionally drops its queued observation rows instead of editing the
* original migration.
*/
export class ReplaceAgentObservationTables1784000000001 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, dropTable, column } }: MigrationContext) {
await this.dropObservationTables(dropTable);
await this.createObservationLogTables(createTable, column);
}
async down({ schemaBuilder: { createTable, dropTable, column } }: MigrationContext) {
await this.dropObservationTables(dropTable);
await this.createLegacyObservationTables(createTable, column);
}
private async dropObservationTables(dropTable: MigrationContext['schemaBuilder']['dropTable']) {
await dropTable('agents_observation_locks');
await dropTable('agents_observation_cursors');
await dropTable('agents_observations');
}
private async createObservationLogTables(
createTable: MigrationContext['schemaBuilder']['createTable'],
column: MigrationContext['schemaBuilder']['column'],
) {
await createTable('agents_observations')
.withColumns(
column('id')
.varchar(36)
.primary.notNull.comment('Application-generated n8n string ID, not a database UUID'),
column('scopeKind').varchar(20).notNull.withEnumCheck(OBSERVATION_SCOPE_KINDS),
column('scopeId').varchar(255).notNull,
column('marker').varchar(16).notNull.withEnumCheck(OBSERVATION_MARKERS),
column('text').text.notNull,
column('parentId').varchar(36),
column('tokenCount').int.notNull.default(0),
column('status').varchar(16).notNull.withEnumCheck(OBSERVATION_STATUSES),
column('supersededBy').varchar(36),
)
.withIndexOn(['scopeKind', 'scopeId', 'status', 'createdAt', 'id'])
.withIndexOn(['scopeKind', 'scopeId', 'createdAt', 'id'])
.withIndexOn('parentId')
.withIndexOn('supersededBy')
.withForeignKey('parentId', {
tableName: 'agents_observations',
columnName: 'id',
})
.withForeignKey('supersededBy', {
tableName: 'agents_observations',
columnName: 'id',
}).withTimestamps;
await createTable('agents_observation_cursors').withColumns(
column('scopeKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_SCOPE_KINDS),
column('scopeId').varchar(255).notNull.primary,
column('lastObservedMessageId').varchar(36).notNull,
column('lastObservedAt').timestampTimezone(3).notNull,
).withTimestamps;
await createTable('agents_observation_locks').withColumns(
column('scopeKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_SCOPE_KINDS),
column('scopeId').varchar(255).notNull.primary,
column('taskKind').varchar(20).notNull.primary.withEnumCheck(OBSERVATION_TASK_KINDS),
column('holderId')
.varchar(64)
.notNull.comment('Ephemeral background-task lock owner token, not a user ID'),
column('heldUntil').timestampTimezone(3).notNull,
).withTimestamps;
}
private async createLegacyObservationTables(
createTable: MigrationContext['schemaBuilder']['createTable'],
column: MigrationContext['schemaBuilder']['column'],
) {
await createTable('agents_observations')
.withColumns(
column('id').varchar(36).primary.notNull,
column('scopeKind').varchar(20).notNull.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull,
column('kind').varchar(64).notNull,
column('payload').json.notNull,
column('durationMs').bigint,
column('schemaVersion').int.notNull,
)
.withIndexOn(['scopeKind', 'scopeId', 'kind', 'createdAt'])
.withIndexOn(['scopeKind', 'scopeId', 'createdAt', 'id']).withTimestamps;
await createTable('agents_observation_cursors').withColumns(
column('scopeKind')
.varchar(20)
.notNull.primary.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull.primary,
column('lastObservedMessageId').varchar(36).notNull,
column('lastObservedAt').timestampTimezone(3).notNull,
).withTimestamps;
await createTable('agents_observation_locks').withColumns(
column('scopeKind')
.varchar(20)
.notNull.primary.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull.primary,
column('holderId').varchar(64).notNull,
column('heldUntil').timestampTimezone(3).notNull,
).withTimestamps;
}
}

View File

@ -0,0 +1,11 @@
import type { MigrationContext, ReversibleMigration } from '../migration-types';
export class DropAgentExecutionWorkingMemory1784000000002 implements ReversibleMigration {
async up({ schemaBuilder: { dropColumns } }: MigrationContext) {
await dropColumns('agent_execution', ['workingMemory']);
}
async down({ schemaBuilder: { addColumns, column } }: MigrationContext) {
await addColumns('agent_execution', [column('workingMemory').text]);
}
}

View File

@ -176,6 +176,8 @@ import { CreateEvaluationCollection1778496086558 } from '../common/1778496086558
import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables';
import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables';
import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables';
import { ReplaceAgentObservationTables1784000000001 } from '../common/1784000000001-ReplaceAgentObservationTables';
import { DropAgentExecutionWorkingMemory1784000000002 } from '../common/1784000000002-DropAgentExecutionWorkingMemory';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@ -357,4 +359,6 @@ export const postgresMigrations: Migration[] = [
CreateAgentTables1783000000000,
CreateAgentExecutionTables1783000000001,
CreateAgentObservationTables1784000000000,
ReplaceAgentObservationTables1784000000001,
DropAgentExecutionWorkingMemory1784000000002,
];

View File

@ -169,6 +169,8 @@ import { CreateEvaluationCollection1778496086558 } from '../common/1778496086558
import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables';
import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables';
import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables';
import { ReplaceAgentObservationTables1784000000001 } from '../common/1784000000001-ReplaceAgentObservationTables';
import { DropAgentExecutionWorkingMemory1784000000002 } from '../common/1784000000002-DropAgentExecutionWorkingMemory';
import type { Migration } from '../migration-types';
const sqliteMigrations: Migration[] = [
@ -343,6 +345,8 @@ const sqliteMigrations: Migration[] = [
CreateAgentTables1783000000000,
CreateAgentExecutionTables1783000000001,
CreateAgentObservationTables1784000000000,
ReplaceAgentObservationTables1784000000001,
DropAgentExecutionWorkingMemory1784000000002,
];
export { sqliteMigrations };

View File

@ -2,11 +2,31 @@ import { mockLogger } from '@n8n/backend-test-utils';
import { mock } from 'jest-mock-extended';
import { AgentExecutionService } from '../agent-execution.service';
import type { AgentExecution } from '../entities/agent-execution.entity';
import type { AgentExecutionThread } from '../entities/agent-execution-thread.entity';
import type { N8nMemory } from '../integrations/n8n-memory';
import type { AgentExecutionRepository } from '../repositories/agent-execution.repository';
import type { AgentExecutionThreadRepository } from '../repositories/agent-execution-thread.repository';
function makeThread(overrides: Partial<AgentExecutionThread> = {}): AgentExecutionThread {
return {
id: 'thread-1',
agentId: 'agent-1',
agentName: 'Agent',
projectId: 'project-1',
title: null,
emoji: null,
sessionNumber: 1,
totalPromptTokens: 0,
totalCompletionTokens: 0,
totalCost: 0,
totalDuration: 0,
createdAt: new Date('2026-05-07T10:00:00Z'),
updatedAt: new Date('2026-05-07T10:00:00Z'),
...overrides,
} as AgentExecutionThread;
}
describe('AgentExecutionService', () => {
let service: AgentExecutionService;
let agentExecutionRepository: jest.Mocked<AgentExecutionRepository>;
@ -28,6 +48,32 @@ describe('AgentExecutionService', () => {
);
});
describe('getThreadDetail', () => {
it('returns thread executions after ownership validation', async () => {
const thread = makeThread();
const executions = [{ id: 'execution-1' }] as AgentExecution[];
agentExecutionThreadRepository.findOneBy.mockResolvedValue(thread);
agentExecutionRepository.findByThreadIdOrdered.mockResolvedValue(executions);
const result = await service.getThreadDetail('thread-1', 'project-1', 'agent-1');
expect(result).toEqual({ thread, executions });
expect(n8nMemory.getWorkingMemory).not.toHaveBeenCalled();
});
it('does not read working memory for a thread outside the requested scope', async () => {
agentExecutionThreadRepository.findOneBy.mockResolvedValue(
makeThread({ projectId: 'other-project' }),
);
const result = await service.getThreadDetail('thread-1', 'project-1', 'agent-1');
expect(result).toBeNull();
expect(n8nMemory.getWorkingMemory).not.toHaveBeenCalled();
expect(agentExecutionRepository.findByThreadIdOrdered).not.toHaveBeenCalled();
});
});
describe('deleteThread', () => {
it('cleans SDK memory before deleting the execution thread', async () => {
agentExecutionThreadRepository.findOneBy.mockResolvedValue({

View File

@ -101,7 +101,6 @@ describe('ExecutionRecorder', () => {
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBeNull();
expect(record.toolCalls).toEqual([
{
name: 'update_working_memory',
@ -111,27 +110,6 @@ describe('ExecutionRecorder', () => {
]);
expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(false);
});
it('does not derive execution working memory from update_working_memory calls', () => {
const recorder = new ExecutionRecorder();
recorder.record({
type: 'tool-call',
toolCallId: 'wm-1',
toolName: 'update_working_memory',
input: { memory: 'first' },
} as StreamChunk);
recorder.record({
type: 'tool-call',
toolCallId: 'wm-2',
toolName: 'update_working_memory',
input: { memory: 'second' },
} as StreamChunk);
recorder.record({ type: 'finish', finishReason: 'stop' } as StreamChunk);
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBeNull();
});
});
describe('suspension', () => {

View File

@ -89,7 +89,6 @@ export class AgentExecutionService {
timeline: record.timeline.length > 0 ? record.timeline : null,
error: record.error,
hitlStatus: hitlStatus ?? null,
workingMemory: record.workingMemory,
source: source ?? null,
}),
);

View File

@ -78,9 +78,6 @@ export class AgentExecution extends WithTimestampsAndStringId {
@Column({ type: 'varchar', length: 16, nullable: true })
hitlStatus: AgentExecutionHitlStatus | null;
@Column({ type: 'text', nullable: true })
workingMemory: string | null;
/** Where the run originated, e.g. 'chat', 'slack'. */
@Column({ type: 'varchar', length: 32, nullable: true })
source: string | null;

View File

@ -3,6 +3,8 @@ import { Column, Entity, PrimaryColumn } from '@n8n/typeorm';
import type { ObservationScopeKind } from './agent-observation.entity';
export type ObservationTaskKind = 'observer' | 'reflector';
@Entity({ name: 'agents_observation_locks' })
export class AgentObservationLockEntity extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 20 })
@ -11,6 +13,9 @@ export class AgentObservationLockEntity extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 255 })
scopeId: string;
@PrimaryColumn({ type: 'varchar', length: 20 })
taskKind: ObservationTaskKind;
@Column({ type: 'varchar', length: 64 })
holderId: string;

View File

@ -1,11 +1,15 @@
import { JsonColumn, WithTimestampsAndStringId } from '@n8n/db';
import { WithTimestampsAndStringId } from '@n8n/db';
import { Column, Entity, Index } from '@n8n/typeorm';
export type ObservationScopeKind = 'thread' | 'resource' | 'agent';
export type ObservationScopeKind = 'thread' | 'resource';
export type ObservationMarker = 'critical' | 'important' | 'info' | 'completion';
export type ObservationStatus = 'active' | 'superseded' | 'dropped';
@Entity({ name: 'agents_observations' })
@Index(['scopeKind', 'scopeId', 'kind', 'createdAt'])
@Index(['scopeKind', 'scopeId', 'status', 'createdAt', 'id'])
@Index(['scopeKind', 'scopeId', 'createdAt', 'id'])
@Index(['parentId'])
@Index(['supersededBy'])
export class AgentObservationEntity extends WithTimestampsAndStringId {
@Column({ type: 'varchar', length: 20 })
scopeKind: ObservationScopeKind;
@ -13,15 +17,21 @@ export class AgentObservationEntity extends WithTimestampsAndStringId {
@Column({ type: 'varchar', length: 255 })
scopeId: string;
@Column({ type: 'varchar', length: 64 })
kind: string;
@Column({ type: 'varchar', length: 16 })
marker: ObservationMarker;
@JsonColumn()
payload: unknown;
@Column({ type: 'text' })
text: string;
@Column({ type: 'bigint', nullable: true })
durationMs: number | null;
@Column({ type: 'varchar', length: 36, nullable: true })
parentId: string | null;
@Column({ type: 'int' })
schemaVersion: number;
@Column({ type: 'int', default: 0 })
tokenCount: number;
@Column({ type: 'varchar', length: 16 })
status: ObservationStatus;
@Column({ type: 'varchar', length: 36, nullable: true })
supersededBy: string | null;
}

View File

@ -199,7 +199,6 @@ export interface MessageRecord {
startTime: number;
duration: number;
error: string | null;
workingMemory: string | null;
}
export class ExecutionRecorder {
@ -233,8 +232,6 @@ export class ExecutionRecorder {
private error: string | null = null;
private workingMemory: string | null = null;
private readonly startTime = Date.now();
/** Feed a stream chunk into the recorder. */
@ -306,7 +303,6 @@ export class ExecutionRecorder {
startTime: this.startTime,
duration: Date.now() - this.startTime,
error: this.error,
workingMemory: this.workingMemory,
};
}

View File

@ -1,5 +1,5 @@
import { OBSERVATION_SCHEMA_VERSION, type NewObservation } from '@n8n/agents';
import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import type { NewObservationLogEntry } from '@n8n/agents';
import { Equal, In, IsNull, LessThan, Like, MoreThan } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { AgentMessageEntity } from '../../entities/agent-message.entity';
@ -15,6 +15,8 @@ import type { AgentResourceRepository } from '../../repositories/agent-resource.
import type { AgentThreadRepository } from '../../repositories/agent-thread.repository';
import { N8nMemory } from '../n8n-memory';
const estimateObservationTokens = (text: string) => Math.ceil(text.length / 4);
describe('N8nMemory', () => {
let memory: N8nMemory;
let messageRepository: jest.Mocked<AgentMessageRepository>;
@ -25,6 +27,10 @@ describe('N8nMemory', () => {
let observationLockRepository: jest.Mocked<AgentObservationLockRepository>;
let runInTransaction: jest.Mock;
let transactionDelete: jest.Mock;
let observationRunInTransaction: jest.Mock;
let transactionObservationCreate: jest.Mock;
let transactionObservationSave: jest.Mock;
let transactionObservationUpdate: jest.Mock;
beforeEach(() => {
jest.clearAllMocks();
@ -45,6 +51,30 @@ describe('N8nMemory', () => {
value: { transaction: runInTransaction },
});
transactionObservationCreate = jest.fn((input) => ({ ...input }) as AgentObservationEntity);
transactionObservationSave = jest.fn(async (input: AgentObservationEntity[]) =>
input.map((entity, index) => ({
...entity,
id: `merged-${index + 1}`,
createdAt: entity.createdAt ?? new Date('2026-05-12T10:00:00Z'),
updatedAt: entity.updatedAt ?? new Date('2026-05-12T10:00:00Z'),
})),
);
transactionObservationUpdate = jest.fn().mockResolvedValue({ affected: 1, raw: {} });
observationRunInTransaction = jest.fn(
async (callback: (trx: { getRepository: jest.Mock }) => Promise<unknown>) =>
await callback({
getRepository: jest.fn().mockReturnValue({
create: transactionObservationCreate,
save: transactionObservationSave,
update: transactionObservationUpdate,
}),
}),
);
Object.defineProperty(observationRepository, 'manager', {
value: { transaction: observationRunInTransaction },
});
memory = new N8nMemory(
threadRepository,
messageRepository,
@ -324,111 +354,22 @@ describe('N8nMemory', () => {
});
});
describe('working memory — thread scope', () => {
it('stores thread-scoped working memory on thread metadata', async () => {
const existing = {
id: 'thread-1',
resourceId: 'user-1',
title: null,
metadata: JSON.stringify({ other: true }),
} as unknown as AgentThreadEntity;
threadRepository.findOneBy.mockResolvedValue(existing);
threadRepository.save.mockImplementation(async (entity) => entity as AgentThreadEntity);
// ── Observation log ──────────────────────────────────────────────────
await memory.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'user-1', scope: 'thread' },
'# Thread memory',
);
expect(threadRepository.save).toHaveBeenCalledWith(
expect.objectContaining({
metadata: JSON.stringify({ other: true, workingMemory: '# Thread memory' }),
}),
);
expect(resourceRepository.save).not.toHaveBeenCalled();
});
it('creates the thread row when working memory is saved before messages', async () => {
threadRepository.findOneBy.mockResolvedValue(null);
threadRepository.create.mockImplementation((entity) => entity as AgentThreadEntity);
threadRepository.save.mockImplementation(async (entity) => entity as AgentThreadEntity);
resourceRepository.existsBy.mockResolvedValue(true);
await memory.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'user-1', scope: 'thread' },
'# Thread memory',
);
expect(resourceRepository.existsBy).toHaveBeenCalledWith({ id: 'user-1' });
expect(threadRepository.create).toHaveBeenCalledWith({
id: 'thread-1',
resourceId: 'user-1',
title: null,
metadata: JSON.stringify({ workingMemory: '# Thread memory' }),
});
expect(threadRepository.save).toHaveBeenCalledWith(
expect.objectContaining({
id: 'thread-1',
resourceId: 'user-1',
metadata: JSON.stringify({ workingMemory: '# Thread memory' }),
}),
);
});
it('isolates thread-scoped working memory by user-scoped test-chat thread id', async () => {
const threads = new Map<string, AgentThreadEntity>([
[
'test-agent-1:user-1',
{
id: 'test-agent-1:user-1',
metadata: JSON.stringify({ workingMemory: 'alice notes' }),
} as unknown as AgentThreadEntity,
],
[
'test-agent-1:user-2',
{
id: 'test-agent-1:user-2',
metadata: JSON.stringify({ workingMemory: 'bob notes' }),
} as unknown as AgentThreadEntity,
],
]);
threadRepository.findOneBy.mockImplementation(
async ({ id }: { id: string }) => threads.get(id) ?? null,
);
await expect(
memory.getWorkingMemory({
threadId: 'test-agent-1:user-1',
resourceId: 'user-1',
scope: 'thread',
}),
).resolves.toBe('alice notes');
await expect(
memory.getWorkingMemory({
threadId: 'test-agent-1:user-2',
resourceId: 'user-2',
scope: 'thread',
}),
).resolves.toBe('bob notes');
});
});
// ── Observational memory ─────────────────────────────────────────────
function makeNewObs(overrides: Partial<NewObservation> = {}): NewObservation {
function makeNewObservationLogEntry(
overrides: Partial<NewObservationLogEntry> = {},
): NewObservationLogEntry {
return {
scopeKind: 'resource',
scopeId: 't-1',
kind: 'observation',
payload: { text: 'hello' },
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
marker: 'important',
text: 'hello',
createdAt: new Date('2026-05-05T00:00:00Z'),
...overrides,
};
}
describe('appendObservations', () => {
describe('appendObservationLogEntries', () => {
beforeEach(() => {
observationRepository.create.mockImplementation(
(input) => ({ ...input }) as AgentObservationEntity,
@ -436,42 +377,56 @@ describe('N8nMemory', () => {
});
it('returns [] for an empty input without touching the repo', async () => {
const result = await memory.appendObservations([]);
const result = await memory.appendObservationLogEntries([]);
expect(result).toEqual([]);
expect(observationRepository.findOne).not.toHaveBeenCalled();
expect(observationRepository.create).not.toHaveBeenCalled();
expect(observationRepository.save).not.toHaveBeenCalled();
});
it('persists rows without allocating a sequence number', async () => {
it('persists active marker rows with a default token count', async () => {
(observationRepository.save as unknown as jest.Mock).mockImplementation(
async (input: AgentObservationEntity | AgentObservationEntity[]) =>
(Array.isArray(input) ? input : [input]).map((e, i) => ({
...e,
id: `obs-${i + 1}`,
createdAt: e.createdAt ?? new Date('2026-05-05T00:00:00Z'),
updatedAt: e.updatedAt ?? new Date('2026-05-05T00:00:00Z'),
})),
);
const result = await memory.appendObservations([makeNewObs(), makeNewObs()]);
const result = await memory.appendObservationLogEntries([
makeNewObservationLogEntry(),
makeNewObservationLogEntry({ marker: 'critical', text: 'remember this' }),
]);
expect(observationRepository.findOne).not.toHaveBeenCalled();
expect(observationRepository.create).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
marker: 'important',
text: 'hello',
parentId: null,
tokenCount: estimateObservationTokens('hello'),
status: 'active',
supersededBy: null,
}),
);
expect(result.map((r) => r.id)).toEqual(['obs-1', 'obs-2']);
});
});
describe('getObservations', () => {
describe('getObservationLog', () => {
beforeEach(() => {
observationRepository.find.mockResolvedValue([]);
});
it('passes filters through to find()', async () => {
const sinceCreatedAt = new Date('2026-05-05T00:00:00Z');
await memory.getObservations({
await memory.getObservationLog({
scopeKind: 'resource',
scopeId: 't-1',
since: { sinceCreatedAt, sinceObservationId: 'obs-anchor' },
kindIs: 'summary',
schemaVersionAtMost: 1,
status: 'active',
parentId: null,
limit: 10,
order: 'desc',
});
expect(observationRepository.find).toHaveBeenCalledWith({
@ -479,64 +434,122 @@ describe('N8nMemory', () => {
{
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
schemaVersion: LessThanOrEqual(1),
createdAt: MoreThan(sinceCreatedAt),
},
{
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
schemaVersion: LessThanOrEqual(1),
createdAt: Equal(sinceCreatedAt),
id: MoreThan('obs-anchor'),
status: 'active',
parentId: IsNull(),
},
],
order: { createdAt: 'ASC', id: 'ASC' },
order: { createdAt: 'DESC', id: 'DESC' },
take: 10,
});
});
it('omits absent filters', async () => {
await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' });
it('active read filters out non-active rows', async () => {
await memory.getActiveObservationLog({ scopeKind: 'resource', scopeId: 't-1' });
expect(observationRepository.find).toHaveBeenCalledWith({
where: [{ scopeKind: 'resource', scopeId: 't-1' }],
where: [{ scopeKind: 'resource', scopeId: 't-1', status: 'active' }],
order: { createdAt: 'ASC', id: 'ASC' },
});
});
it('coerces bigint columns back to numbers on read', async () => {
it('maps persisted rows to observation log entries', async () => {
observationRepository.find.mockResolvedValue([
{
id: 'obs-1',
scopeKind: 'resource',
scopeId: 't-1',
kind: 'observation',
payload: { text: 'hi' },
durationMs: '1000' as unknown as number | null,
schemaVersion: '1' as unknown as number,
marker: 'important',
text: 'hi',
parentId: null,
tokenCount: '7' as unknown as number,
status: 'active',
supersededBy: null,
createdAt: new Date('2026-05-05T00:00:00Z'),
updatedAt: new Date('2026-05-05T00:00:00Z'),
} as AgentObservationEntity,
]);
const [row] = await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' });
expect(row.durationMs).toBe(1000);
expect(row.schemaVersion).toBe(1);
const [row] = await memory.getObservationLog({ scopeKind: 'resource', scopeId: 't-1' });
expect(row).toMatchObject({
id: 'obs-1',
marker: 'important',
text: 'hi',
tokenCount: 7,
status: 'active',
});
});
});
describe('deleteObservations', () => {
it('issues a single delete with the given ids', async () => {
await memory.deleteObservations(['a', 'b']);
describe('observation log status updates', () => {
it('marks rows as dropped instead of deleting them', async () => {
await memory.dropObservationLogEntries(['a', 'b']);
expect(observationRepository.delete).toHaveBeenCalledWith({ id: In(['a', 'b']) });
expect(observationRepository.update).toHaveBeenCalledWith(
{ id: In(['a', 'b']) },
{ status: 'dropped', supersededBy: null },
);
});
it('no-ops on empty input', async () => {
await memory.deleteObservations([]);
expect(observationRepository.delete).not.toHaveBeenCalled();
it('marks rows as superseded by a replacement row', async () => {
await memory.supersedeObservationLogEntries(['a', 'b'], 'replacement');
expect(observationRepository.update).toHaveBeenCalledWith(
{ id: In(['a', 'b']) },
{ status: 'superseded', supersededBy: 'replacement' },
);
});
it('no-ops on empty update inputs', async () => {
await memory.dropObservationLogEntries([]);
await memory.supersedeObservationLogEntries([], 'replacement');
expect(observationRepository.update).not.toHaveBeenCalled();
});
});
describe('applyObservationLogReflection', () => {
it('inserts merged replacements and updates old rows in one transaction', async () => {
const result = await memory.applyObservationLogReflection(
{ scopeKind: 'thread', scopeId: 't-1' },
{
drop: ['drop-1'],
merge: [
{
supersedes: ['old-1', 'old-2'],
marker: 'important',
text: 'Merged observation',
},
],
},
);
expect(observationRunInTransaction).toHaveBeenCalledWith(expect.any(Function));
expect(transactionObservationCreate).toHaveBeenCalledWith(
expect.objectContaining({
scopeKind: 'thread',
scopeId: 't-1',
marker: 'important',
text: 'Merged observation',
parentId: null,
tokenCount: estimateObservationTokens('Merged observation'),
status: 'active',
supersededBy: null,
}),
);
expect(transactionObservationUpdate).toHaveBeenNthCalledWith(
1,
{ scopeKind: 'thread', scopeId: 't-1', id: In(['drop-1']) },
{ status: 'dropped', supersededBy: null },
);
expect(transactionObservationUpdate).toHaveBeenNthCalledWith(
2,
{ scopeKind: 'thread', scopeId: 't-1', id: In(['old-1', 'old-2']) },
{ status: 'superseded', supersededBy: 'merged-1' },
);
expect(result).toMatchObject({
droppedIds: ['drop-1'],
supersededIds: ['old-1', 'old-2'],
inserted: [{ id: 'merged-1', status: 'active' }],
});
});
});
@ -634,6 +647,7 @@ describe('N8nMemory', () => {
claimed: {
scopeKind: 'thread',
scopeId: 't-1',
taskKind: 'observer',
holderId: 'A',
heldUntil: new Date(Date.now() + 60_000),
} as AgentObservationLockEntity,
@ -685,6 +699,9 @@ describe('N8nMemory', () => {
expect(updateQueryBuilder.andWhere).toHaveBeenCalledWith(
'("holderId" = :holderId OR "heldUntil" <= :now)',
);
expect(updateQueryBuilder.set).toHaveBeenCalledWith(
expect.objectContaining({ taskKind: 'observer', holderId: 'B' }),
);
expect(observationLockRepository.save).not.toHaveBeenCalled();
});
@ -709,6 +726,7 @@ describe('N8nMemory', () => {
expect(observationLockRepository.delete).toHaveBeenCalledWith({
scopeKind: 'resource',
scopeId: 't-1',
taskKind: 'observer',
holderId: 'A',
});
});

View File

@ -2,18 +2,22 @@ import type {
AgentDbMessage,
AgentMessage,
BuiltMemory,
BuiltObservationStore,
BuiltObservationLogStore,
MemoryDescriptor,
NewObservation,
Observation,
NewObservationLogEntry,
ObservationCursor,
ObservationLogEntry,
ObservationLogReadOptions,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScope,
ObservationLogScopeKind,
ObservationLockHandle,
ScopeKind,
Thread,
} from '@n8n/agents';
import { Service } from '@n8n/di';
import type { FindOptionsWhere } from '@n8n/typeorm';
import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import { Equal, In, IsNull, LessThan, Like, MoreThan } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { UnexpectedError } from 'n8n-workflow';
@ -29,11 +33,10 @@ import { AgentObservationRepository } from '../repositories/agent-observation.re
import { AgentResourceRepository } from '../repositories/agent-resource.repository';
import { AgentThreadRepository } from '../repositories/agent-thread.repository';
/** Key inside the metadata JSON where working memory content is stored. */
const WORKING_MEMORY_KEY = 'workingMemory';
const estimateObservationTokens = (text: string) => Math.ceil(text.length / 4);
@Service()
export class N8nMemory implements BuiltMemory, BuiltObservationStore {
export class N8nMemory implements BuiltMemory, BuiltObservationLogStore {
constructor(
private readonly threadRepository: AgentThreadRepository,
private readonly messageRepository: AgentMessageRepository,
@ -134,11 +137,7 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
entities.reverse();
}
return entities.map((e) => {
const msg = e.content as AgentMessage & { id?: string };
msg.id = e.id;
return msg as AgentDbMessage;
});
return entities.map((e) => this.toAgentDbMessage(e));
}
async saveMessages(args: {
@ -193,83 +192,74 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
resourceId: string;
scope: 'resource' | 'thread';
}): Promise<string | null> {
if (params.scope === 'resource') {
const resource = await this.resourceRepository.findOneBy({ id: params.resourceId });
return this.extractWorkingMemory(resource?.metadata ?? null);
}
const thread = await this.threadRepository.findOneBy({ id: params.threadId });
return this.extractWorkingMemory(thread?.metadata ?? null);
void params;
// Legacy `workingMemory` metadata is intentionally ignored. The new
// observation-log pipeline will own memory state.
return null;
}
async saveWorkingMemory(
params: { threadId: string; resourceId: string; scope: 'resource' | 'thread' },
content: string,
): Promise<void> {
if (params.scope === 'resource') {
await this.upsertResourceMetadata(params.resourceId, content);
} else {
await this.upsertThreadMetadata(params.threadId, params.resourceId, content);
}
void params;
void content;
// Legacy `workingMemory` metadata is intentionally ignored. The new
// observation-log pipeline will own memory state.
}
// ── Observational memory: data ───────────────────────────────────────
// ── Observation log ──────────────────────────────────────────────────
async appendObservations(rows: NewObservation[]): Promise<Observation[]> {
async appendObservationLogEntries(
rows: NewObservationLogEntry[],
): Promise<ObservationLogEntry[]> {
if (rows.length === 0) return [];
const entities: AgentObservationEntity[] = rows.map((row) =>
this.observationRepository.create({
scopeKind: row.scopeKind,
scopeId: row.scopeId,
kind: row.kind,
payload: row.payload,
durationMs: row.durationMs,
schemaVersion: row.schemaVersion,
marker: row.marker,
text: row.text,
parentId: row.parentId ?? null,
tokenCount: row.tokenCount ?? estimateObservationTokens(row.text),
status: 'active',
supersededBy: null,
createdAt: row.createdAt,
}),
);
const saved = await this.observationRepository.save(entities);
return saved.map((e) => this.toObservation(e));
return saved.map((e) => this.toObservationLogEntry(e));
}
async getObservations(opts: {
scopeKind: ScopeKind;
scopeId: string;
since?: { sinceCreatedAt: Date; sinceObservationId: string };
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
}): Promise<Observation[]> {
async getActiveObservationLog(
scope: ObservationLogScope & { limit?: number; order?: 'asc' | 'desc' },
): Promise<ObservationLogEntry[]> {
return await this.getObservationLog({ ...scope, status: 'active' });
}
async getObservationLog(opts: ObservationLogReadOptions): Promise<ObservationLogEntry[]> {
const baseWhere: FindOptionsWhere<AgentObservationEntity> = {
scopeKind: opts.scopeKind,
scopeId: opts.scopeId,
...(opts.kindIs !== undefined && { kind: opts.kindIs }),
...(opts.schemaVersionAtMost !== undefined && {
schemaVersion: LessThanOrEqual(opts.schemaVersionAtMost),
}),
...(opts.status !== undefined && { status: opts.status }),
...(opts.parentId !== undefined && { parentId: opts.parentId ?? IsNull() }),
};
const where: FindOptionsWhere<AgentObservationEntity>[] = opts.since
? [
{ ...baseWhere, createdAt: MoreThan(opts.since.sinceCreatedAt) },
{
...baseWhere,
createdAt: Equal(opts.since.sinceCreatedAt),
id: MoreThan(opts.since.sinceObservationId),
},
]
: [baseWhere];
const entities = await this.observationRepository.find({
where,
order: { createdAt: 'ASC', id: 'ASC' },
where: [baseWhere],
order: {
createdAt: opts.order === 'desc' ? 'DESC' : 'ASC',
id: opts.order === 'desc' ? 'DESC' : 'ASC',
},
...(opts.limit !== undefined && { take: opts.limit }),
});
return entities.map((e) => this.toObservation(e));
return entities.map((e) => this.toObservationLogEntry(e));
}
async getMessagesForScope(
scopeKind: ScopeKind,
scopeKind: ObservationLogScopeKind,
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]> {
@ -295,21 +285,80 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
where,
order: { createdAt: 'ASC', id: 'ASC' },
});
return entities.map((e) => {
const msg = e.content as AgentMessage & { id?: string };
msg.id = e.id;
return msg as AgentDbMessage;
});
return entities.map((e) => this.toAgentDbMessage(e));
}
async deleteObservations(ids: string[]): Promise<void> {
async dropObservationLogEntries(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await this.observationRepository.delete({ id: In(ids) });
await this.observationRepository.update(
{ id: In(ids) },
{ status: 'dropped', supersededBy: null },
);
}
async supersedeObservationLogEntries(ids: string[], supersededBy: string): Promise<void> {
if (ids.length === 0) return;
await this.observationRepository.update(
{ id: In(ids) },
{ status: 'superseded', supersededBy },
);
}
async applyObservationLogReflection(
scope: ObservationLogScope,
reflection: ObservationLogReflection,
): Promise<ObservationLogReflectionResult> {
return await this.observationRepository.manager.transaction(async (trx) => {
const repo = trx.getRepository(AgentObservationEntity);
const inserted = reflection.merge.length
? await repo.save(
reflection.merge.map((entry) =>
repo.create({
scopeKind: scope.scopeKind,
scopeId: scope.scopeId,
marker: entry.marker,
text: entry.text,
parentId: entry.parentId ?? null,
tokenCount: entry.tokenCount ?? estimateObservationTokens(entry.text),
status: 'active',
supersededBy: null,
createdAt: entry.createdAt,
}),
),
)
: [];
if (reflection.drop.length > 0) {
await repo.update(
{ scopeKind: scope.scopeKind, scopeId: scope.scopeId, id: In(reflection.drop) },
{ status: 'dropped', supersededBy: null },
);
}
for (const [index, merge] of reflection.merge.entries()) {
const replacement = inserted[index];
if (replacement && merge.supersedes.length > 0) {
await repo.update(
{ scopeKind: scope.scopeKind, scopeId: scope.scopeId, id: In(merge.supersedes) },
{ status: 'superseded', supersededBy: replacement.id },
);
}
}
return {
droppedIds: [...reflection.drop],
supersededIds: reflection.merge.flatMap((entry) => entry.supersedes),
inserted: inserted.map((entry) => this.toObservationLogEntry(entry)),
};
});
}
// ── Observational memory: cursors ────────────────────────────────────
async getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null> {
async getCursor(
scopeKind: ObservationLogScopeKind,
scopeId: string,
): Promise<ObservationCursor | null> {
const entity = await this.observationCursorRepository.findOneBy({ scopeKind, scopeId });
if (!entity) return null;
return {
@ -321,7 +370,9 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
};
}
async setCursor(cursor: ObservationCursor): Promise<void> {
async setCursor(
cursor: ObservationCursor & { scopeKind: ObservationLogScopeKind },
): Promise<void> {
await this.observationCursorRepository.upsert(
{
scopeKind: cursor.scopeKind,
@ -337,21 +388,23 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
// ── Observational memory: locks ──────────────────────────────────────
async acquireObservationLock(
scopeKind: ScopeKind,
scopeKind: ObservationLogScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null> {
const now = new Date();
const heldUntil = new Date(now.getTime() + opts.ttlMs);
const taskKind = 'observer';
const updateResult = await this.observationLockRepository
.createQueryBuilder()
.update(AgentObservationLockEntity)
.set({ holderId: opts.holderId, heldUntil })
.set({ taskKind, holderId: opts.holderId, heldUntil })
.where('"scopeKind" = :scopeKind')
.andWhere('"scopeId" = :scopeId')
.andWhere('"taskKind" = :taskKind')
.andWhere('("holderId" = :holderId OR "heldUntil" <= :now)')
.setParameters({ scopeKind, scopeId, holderId: opts.holderId, now })
.setParameters({ scopeKind, scopeId, taskKind, holderId: opts.holderId, now })
.execute();
if ((updateResult.affected ?? 0) > 0) {
@ -362,13 +415,14 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
.createQueryBuilder()
.insert()
.into(AgentObservationLockEntity)
.values({ scopeKind, scopeId, holderId: opts.holderId, heldUntil })
.values({ scopeKind, scopeId, taskKind, holderId: opts.holderId, heldUntil })
.orIgnore()
.execute();
const claimed = await this.observationLockRepository.findOneBy({
scopeKind,
scopeId,
taskKind,
holderId: opts.holderId,
});
if (!claimed) return null;
@ -376,10 +430,13 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
return { scopeKind, scopeId, holderId: opts.holderId, heldUntil };
}
async releaseObservationLock(handle: ObservationLockHandle): Promise<void> {
async releaseObservationLock(
handle: ObservationLockHandle & { scopeKind: ObservationLogScopeKind },
): Promise<void> {
await this.observationLockRepository.delete({
scopeKind: handle.scopeKind,
scopeId: handle.scopeId,
taskKind: 'observer',
holderId: handle.holderId,
});
}
@ -392,15 +449,24 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
// ── Helpers ──────────────────────────────────────────────────────────
private toObservation(entity: AgentObservationEntity): Observation {
private toAgentDbMessage(entity: AgentMessageEntity): AgentDbMessage {
const msg = entity.content as AgentMessage & { id?: string; createdAt?: Date };
msg.id = entity.id;
msg.createdAt = entity.createdAt;
return msg as AgentDbMessage;
}
private toObservationLogEntry(entity: AgentObservationEntity): ObservationLogEntry {
return {
id: entity.id,
scopeKind: entity.scopeKind,
scopeId: entity.scopeId,
kind: entity.kind,
payload: entity.payload as Observation['payload'],
durationMs: entity.durationMs === null ? null : Number(entity.durationMs),
schemaVersion: Number(entity.schemaVersion),
marker: entity.marker,
text: entity.text,
parentId: entity.parentId,
tokenCount: Number(entity.tokenCount),
status: entity.status,
supersededBy: entity.supersededBy,
createdAt: entity.createdAt,
};
}
@ -423,65 +489,4 @@ export class N8nMemory implements BuiltMemory, BuiltObservationStore {
updatedAt: entity.updatedAt,
};
}
private extractWorkingMemory(metadataJson: string | null): string | null {
if (!metadataJson) return null;
try {
const parsed = JSON.parse(metadataJson) as Record<string, unknown>;
const wm = parsed[WORKING_MEMORY_KEY];
return typeof wm === 'string' ? wm : null;
} catch {
return null;
}
}
private mergeWorkingMemory(existingJson: string | null, content: string): string {
let parsed: Record<string, unknown> = {};
if (existingJson) {
try {
parsed = JSON.parse(existingJson) as Record<string, unknown>;
} catch {
// start fresh on corrupt JSON
}
}
parsed[WORKING_MEMORY_KEY] = content;
return JSON.stringify(parsed);
}
private async upsertResourceMetadata(resourceId: string, content: string): Promise<void> {
const existing = await this.resourceRepository.findOneBy({ id: resourceId });
if (existing) {
existing.metadata = this.mergeWorkingMemory(existing.metadata, content);
await this.resourceRepository.save(existing);
} else {
const entity = this.resourceRepository.create({
id: resourceId,
metadata: this.mergeWorkingMemory(null, content),
});
await this.resourceRepository.save(entity);
}
}
private async upsertThreadMetadata(
threadId: string,
resourceId: string,
content: string,
): Promise<void> {
const existing = await this.threadRepository.findOneBy({ id: threadId });
if (existing) {
existing.metadata = this.mergeWorkingMemory(existing.metadata, content);
await this.threadRepository.save(existing);
return;
}
await this.ensureResource(resourceId);
await this.threadRepository.save(
this.threadRepository.create({
id: threadId,
resourceId,
title: null,
metadata: this.mergeWorkingMemory(null, content),
}),
);
}
}

View File

@ -1323,7 +1323,6 @@
"agentSessions.detail.cost": "Cost",
"agentSessions.detail.toolCallSuspended": "Tool call suspended — waiting for user action",
"agentSessions.detail.suspendedNote": "This message was part of a tool call suspension. Token usage and cost are reported on the resumed message.",
"agentSessions.detail.workingMemory": "Memory Update",
"agentSessions.detail.builtInTools": "Built-in",
"agentSessions.detail.userTools": "Tools",
"agentSessions.detail.richInteraction.userResponse": "User responded",

View File

@ -158,7 +158,6 @@ function exec(overrides: Partial<AgentExecution> = {}): AgentExecution {
timeline: null,
error: null,
hitlStatus: null,
workingMemory: null,
source: null,
...overrides,
};

View File

@ -56,7 +56,6 @@ export interface AgentExecution {
timeline: AgentExecutionTimelineEvent[] | null;
error: string | null;
hitlStatus: AgentExecutionHitlStatus | null;
workingMemory: string | null;
source: string | null;
}