feat(core): Add observational memory storage foundation (#29814)

This commit is contained in:
bjorger 2026-05-11 11:01:44 +02:00 committed by GitHub
parent f4e8088cb8
commit be4ef22533
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 2382 additions and 284 deletions

View File

@ -70,8 +70,7 @@ docs/
```
The **`index.ts`** surface also exports `Workspace` / sandbox / filesystem types,
`SqliteMemory` / `PostgresMemory`, `LangSmithTelemetry`, and `evals` alongside the
core SDK builders.
`InMemoryMemory`, `LangSmithTelemetry`, and `evals` alongside the core SDK builders.
Optional **peer dependencies** (telemetry): `langsmith`, `@opentelemetry/sdk-trace-node`,
`@opentelemetry/sdk-trace-base`, `@opentelemetry/exporter-trace-otlp-http` — all

View File

@ -0,0 +1,95 @@
import { InMemoryMemory } from '../runtime/memory-store';
import type { AgentDbMessage, AgentMessage, Message } from '../types/sdk/message';
function makeMsg(role: 'user' | 'assistant', text: string, createdAt = new Date()): AgentDbMessage {
return {
id: crypto.randomUUID(),
createdAt,
role,
content: [{ type: 'text', text }],
};
}
function textOf(msg: AgentMessage): string {
const m = msg as Message;
return (m.content[0] as { text: string }).text;
}
describe('InMemoryMemory — message keyset reads', () => {
it('returns messages ordered by (createdAt, id) ascending', async () => {
const mem = new InMemoryMemory();
const t = Date.now();
await mem.saveMessages({
threadId: 't-1',
resourceId: 'u-1',
messages: [makeMsg('user', 'one', new Date(t)), makeMsg('assistant', 'two', new Date(t + 1))],
});
await mem.saveMessages({
threadId: 't-1',
resourceId: 'u-1',
messages: [makeMsg('user', 'three', new Date(t + 2))],
});
const all = await mem.getMessages('t-1');
expect(all.map(textOf)).toEqual(['one', 'two', 'three']);
});
it('upsert by id preserves identity (re-saving the same id does not duplicate)', async () => {
const mem = new InMemoryMemory();
const original = makeMsg('user', 'original');
await mem.saveMessages({ threadId: 't-1', resourceId: 'u-1', messages: [original] });
const edited: AgentDbMessage = {
id: original.id,
createdAt: original.createdAt,
role: 'user',
content: [{ type: 'text', text: 'edited' }],
};
await mem.saveMessages({ threadId: 't-1', resourceId: 'u-1', messages: [edited] });
const all = await mem.getMessages('t-1');
expect(all).toHaveLength(1);
expect(textOf(all[0])).toBe('edited');
});
it('filters by since (createdAt, id) keyset', async () => {
const mem = new InMemoryMemory();
const t = Date.now();
await mem.saveMessages({
threadId: 't-1',
resourceId: 'u-1',
messages: [
makeMsg('user', 'a', new Date(t)),
makeMsg('assistant', 'b', new Date(t + 1)),
makeMsg('user', 'c', new Date(t + 2)),
],
});
const all = await mem.getMessages('t-1');
const tail = await mem.getMessages('t-1', {
since: { sinceCreatedAt: all[0].createdAt, sinceMessageId: all[0].id },
});
expect(tail.map(textOf)).toEqual(['b', 'c']);
const empty = await mem.getMessages('t-1', {
since: { sinceCreatedAt: all[2].createdAt, sinceMessageId: all[2].id },
});
expect(empty).toEqual([]);
});
it('keyset since includes rows sharing createdAt with the anchor when id is greater', async () => {
const mem = new InMemoryMemory();
const at = new Date();
const m1 = makeMsg('user', 'a', at);
const m2 = makeMsg('user', 'b', at);
await mem.saveMessages({ threadId: 't-1', resourceId: 'u-1', messages: [m1, m2] });
const [low, high] = [m1, m2].sort((a, b) => (a.id < b.id ? -1 : 1));
const tail = await mem.getMessages('t-1', {
since: { sinceCreatedAt: low.createdAt, sinceMessageId: low.id },
});
expect(tail).toHaveLength(1);
expect(tail[0].id).toBe(high.id);
});
});

View File

@ -4,56 +4,102 @@ import type { AgentDbMessage, Message } from '../types/sdk/message';
describe('InMemoryMemory working memory', () => {
it('returns null for unknown key', async () => {
const mem = new InMemoryMemory();
expect(await mem.getWorkingMemory({ threadId: 'thread-x', resourceId: 'unknown' })).toBeNull();
expect(
await mem.getWorkingMemory({
threadId: 'thread-x',
resourceId: 'unknown',
scope: 'resource',
}),
).toBeNull();
});
it('saves and retrieves working memory keyed by resourceId', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'user-1' },
'# Context\n- Name: Alice',
);
expect(await mem.getWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1' })).toBe(
{ threadId: 'thread-1', resourceId: 'user-1', scope: 'resource' },
'# Context\n- Name: Alice',
);
expect(
await mem.getWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1', scope: 'resource' }),
).toBe('# Context\n- Name: Alice');
});
it('overwrites on subsequent save', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1' }, 'v1');
await mem.saveWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1' }, 'v2');
expect(await mem.getWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1' })).toBe('v2');
await mem.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'user-1', scope: 'resource' },
'v1',
);
await mem.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'user-1', scope: 'resource' },
'v2',
);
expect(
await mem.getWorkingMemory({ threadId: 'thread-1', resourceId: 'user-1', scope: 'resource' }),
).toBe('v2');
});
it('isolates by resourceId (resource scope)', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory({ threadId: 'thread-a', resourceId: 'user-1' }, 'Alice data');
await mem.saveWorkingMemory({ threadId: 'thread-b', resourceId: 'user-2' }, 'Bob data');
expect(await mem.getWorkingMemory({ threadId: 'thread-a', resourceId: 'user-1' })).toBe(
await mem.saveWorkingMemory(
{ threadId: 'thread-a', resourceId: 'user-1', scope: 'resource' },
'Alice data',
);
expect(await mem.getWorkingMemory({ threadId: 'thread-b', resourceId: 'user-2' })).toBe(
await mem.saveWorkingMemory(
{ threadId: 'thread-b', resourceId: 'user-2', scope: 'resource' },
'Bob data',
);
expect(
await mem.getWorkingMemory({ threadId: 'thread-a', resourceId: 'user-1', scope: 'resource' }),
).toBe('Alice data');
expect(
await mem.getWorkingMemory({ threadId: 'thread-b', resourceId: 'user-2', scope: 'resource' }),
).toBe('Bob data');
});
it('returns null for unknown threadId (thread scope)', async () => {
const mem = new InMemoryMemory();
expect(await mem.getWorkingMemory({ threadId: 'unknown' })).toBeNull();
expect(await mem.getWorkingMemory({ threadId: 'unknown', scope: 'thread' })).toBeNull();
});
it('saves and retrieves working memory keyed by threadId', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory({ threadId: 'thread-1' }, '# Thread Notes');
expect(await mem.getWorkingMemory({ threadId: 'thread-1' })).toBe('# Thread Notes');
await mem.saveWorkingMemory({ threadId: 'thread-1', scope: 'thread' }, '# Thread Notes');
expect(await mem.getWorkingMemory({ threadId: 'thread-1', scope: 'thread' })).toBe(
'# Thread Notes',
);
});
it('isolates by threadId (thread scope)', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory({ threadId: 'thread-1' }, 'data for thread 1');
await mem.saveWorkingMemory({ threadId: 'thread-2' }, 'data for thread 2');
expect(await mem.getWorkingMemory({ threadId: 'thread-1' })).toBe('data for thread 1');
expect(await mem.getWorkingMemory({ threadId: 'thread-2' })).toBe('data for thread 2');
await mem.saveWorkingMemory({ threadId: 'thread-1', scope: 'thread' }, 'data for thread 1');
await mem.saveWorkingMemory({ threadId: 'thread-2', scope: 'thread' }, 'data for thread 2');
expect(await mem.getWorkingMemory({ threadId: 'thread-1', scope: 'thread' })).toBe(
'data for thread 1',
);
expect(await mem.getWorkingMemory({ threadId: 'thread-2', scope: 'thread' })).toBe(
'data for thread 2',
);
});
it('isolates entries by scope when threadId and resourceId match', async () => {
const mem = new InMemoryMemory();
await mem.saveWorkingMemory({ threadId: 'shared-id', scope: 'thread' }, 'thread memory');
await mem.saveWorkingMemory(
{ threadId: 'thread-1', resourceId: 'shared-id', scope: 'resource' },
'resource memory',
);
expect(await mem.getWorkingMemory({ threadId: 'shared-id', scope: 'thread' })).toBe(
'thread memory',
);
expect(
await mem.getWorkingMemory({
threadId: 'thread-1',
resourceId: 'shared-id',
scope: 'resource',
}),
).toBe('resource memory');
});
});

View File

@ -1,6 +1,3 @@
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { describe as _describe } from 'vitest';
import { z } from 'zod';
@ -12,7 +9,7 @@ import {
type StreamChunk,
type AgentMessage,
} from '../../index';
import { SqliteMemory } from '../../storage/sqlite-memory';
import { InMemoryMemory } from '../../runtime/memory-store';
export type { StreamChunk };
@ -416,25 +413,18 @@ export const collectTextDeltas = (chunks: StreamChunk[]): string => {
};
export function createSqliteMemory(): {
memory: SqliteMemory;
memory: InMemoryMemory;
cleanup: () => void;
url: string;
} {
const dbPath = path.join(
os.tmpdir(),
`test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`,
);
const url = `file:${dbPath}`;
const memory = new SqliteMemory({ url });
// In-memory backend; the `url` field is kept on the return type so existing
// integration tests that reference it (e.g. for "restart" scenarios) keep
// compiling, but it's not load-bearing — InMemoryMemory has no persistence.
return {
memory,
url,
memory: new InMemoryMemory(),
url: '',
cleanup: () => {
try {
fs.unlinkSync(dbPath);
} catch {
// File may already be removed — ignore
}
// no-op for in-memory backend
},
};
}

View File

@ -1,106 +0,0 @@
import { expect, it, afterEach } from 'vitest';
import { Agent, Memory } from '../../../index';
import { SqliteMemory } from '../../../storage/sqlite-memory';
import { describeIf, findLastTextContent, getModel, createSqliteMemory } from '../helpers';
const describe = describeIf('anthropic');
const cleanups: Array<() => void> = [];
afterEach(() => {
cleanups.forEach((fn) => fn());
cleanups.length = 0;
});
describe('freeform working memory', () => {
const template = '# User Context\n- **Name**:\n- **City**:\n- **Pet**:';
it('agent recalls info via working memory across turns', async () => {
const memory = new Memory().storage('memory').lastMessages(10).freeform(template);
const agent = new Agent('freeform-test')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(memory);
const threadId = `freeform-${Date.now()}`;
const options = { persistence: { threadId, resourceId: 'test-user' } };
await agent.generate('My name is Alice and I live in Berlin.', options);
const result = await agent.generate('What city do I live in?', options);
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('berlin');
});
it('working memory is updated when new information is provided', async () => {
const memory = new Memory().storage('memory').lastMessages(10).freeform(template);
const agent = new Agent('wm-update-test')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(memory);
const threadId = `wm-update-${Date.now()}`;
const options = { persistence: { threadId, resourceId: 'test-user' } };
const result = await agent.generate('My name is Bob.', options);
const toolCalls = result.messages.flatMap((m) =>
'content' in m ? m.content.filter((c) => c.type === 'tool-call') : [],
) as Array<{ type: 'tool-call'; toolName: string }>;
const wmToolCall = toolCalls.find((c) => c.toolName === 'updateWorkingMemory');
expect(wmToolCall).toBeDefined();
});
it('working memory persists across threads with same resourceId', async () => {
const { memory, cleanup } = createSqliteMemory();
cleanups.push(cleanup);
const mem = new Memory().storage(memory).lastMessages(10).freeform(template);
const agent = new Agent('cross-thread-test')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem);
const resourceId = `user-${Date.now()}`;
await agent.generate('My name is Charlie and I have a dog named Rex.', {
persistence: { threadId: `thread-1-${Date.now()}`, resourceId },
});
const result = await agent.generate("What's my dog's name?", {
persistence: { threadId: `thread-2-${Date.now()}`, resourceId },
});
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('rex');
});
it('working memory survives SqliteMemory restart', async () => {
const { memory, cleanup, url } = createSqliteMemory();
cleanups.push(cleanup);
const mem = new Memory().storage(memory).lastMessages(10).freeform(template);
const agent1 = new Agent('restart-wm-1')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem);
const resourceId = `user-${Date.now()}`;
const threadId = `restart-wm-${Date.now()}`;
await agent1.generate('My name is Diana.', { persistence: { threadId, resourceId } });
const memory2 = new SqliteMemory({ url });
const mem2 = new Memory().storage(memory2).lastMessages(10).freeform(template);
const agent2 = new Agent('restart-wm-2')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem2);
const result = await agent2.generate('What is my name?', {
persistence: { threadId: `new-thread-${Date.now()}`, resourceId },
});
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('diana');
});
});

View File

@ -1,105 +0,0 @@
import { describe as _describe, expect, it, afterEach } from 'vitest';
import { Agent, Memory } from '../../../index';
import { SqliteMemory } from '../../../storage/sqlite-memory';
import { describeIf, findLastTextContent, getModel, createSqliteMemory } from '../helpers';
const describe = describeIf('anthropic');
const cleanups: Array<() => void> = [];
afterEach(() => {
cleanups.forEach((fn) => fn());
cleanups.length = 0;
});
_describe('SqliteMemory saveThread upsert', () => {
it('preserves existing title and metadata when not provided', async () => {
const { memory, cleanup } = createSqliteMemory();
cleanups.push(cleanup);
await memory.saveThread({
id: 'upsert-t1',
resourceId: 'user-1',
title: 'Original Title',
metadata: { key: 'value' },
});
// Upsert without title or metadata (simulates saveMessagesToThread)
await memory.saveThread({ id: 'upsert-t1', resourceId: 'user-1' });
const thread = await memory.getThread('upsert-t1');
expect(thread).not.toBeNull();
expect(thread!.title).toBe('Original Title');
expect(thread!.metadata).toEqual({ key: 'value' });
});
it('overwrites title and metadata when explicitly provided', async () => {
const { memory, cleanup } = createSqliteMemory();
cleanups.push(cleanup);
await memory.saveThread({
id: 'upsert-t2',
resourceId: 'user-1',
title: 'Old Title',
metadata: { old: true },
});
await memory.saveThread({
id: 'upsert-t2',
resourceId: 'user-1',
title: 'New Title',
metadata: { new: true },
});
const thread = await memory.getThread('upsert-t2');
expect(thread!.title).toBe('New Title');
expect(thread!.metadata).toEqual({ new: true });
});
});
describe('SQLite memory integration', () => {
it('agent recalls info from previous turn with SqliteMemory', async () => {
const { memory, cleanup } = createSqliteMemory();
cleanups.push(cleanup);
const mem = new Memory().storage(memory).lastMessages(10);
const agent = new Agent('sqlite-test')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem);
const threadId = `sqlite-${Date.now()}`;
const options = { persistence: { threadId, resourceId: 'test-user' } };
await agent.generate('My favorite number is 42. Just acknowledge.', options);
const result = await agent.generate('What is my favorite number?', options);
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('42');
});
it('data survives a fresh SqliteMemory instance', async () => {
const { memory, cleanup, url } = createSqliteMemory();
cleanups.push(cleanup);
const mem1 = new Memory().storage(memory).lastMessages(10);
const agent1 = new Agent('persist-test-1')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem1);
const threadId = `persist-${Date.now()}`;
const options = { persistence: { threadId, resourceId: 'test-user' } };
await agent1.generate('My favorite animal is a dolphin. Just acknowledge.', options);
// New SqliteMemory instance, same file
const memory2 = new SqliteMemory({ url });
const mem2 = new Memory().storage(memory2).lastMessages(10);
const agent2 = new Agent('persist-test-2')
.model(getModel('anthropic'))
.instructions('You are a helpful assistant. Be concise.')
.memory(mem2);
const result = await agent2.generate('What is my favorite animal?', options);
expect(findLastTextContent(result.messages)?.toLowerCase()).toContain('dolphin');
});
});

View File

@ -0,0 +1,402 @@
import { generateText } from 'ai';
import { expect, it } from 'vitest';
import {
Agent,
type AgentDbMessage,
type BuiltObservationStore,
type CompactFn,
createModel,
Memory,
type Observation,
type ObservationCursor,
OBSERVATION_SCHEMA_VERSION,
type ObserveFn,
} from '../../../index';
import { InMemoryMemory } from '../../../runtime/memory-store';
import { describeIf, findLastTextContent, getModel } from '../helpers';
const describe = describeIf('anthropic');
const WORKING_MEMORY_TEMPLATE = [
'# User Memory',
'- **Location**:',
'- **Project codename**:',
].join('\n');
type ObservationCycleStore = BuiltObservationStore &
Pick<InMemoryMemory, 'getWorkingMemory' | 'saveWorkingMemory'>;
function uniqueId(prefix: string): string {
return `${prefix}-${crypto.randomUUID()}`;
}
function messageText(message: AgentDbMessage): string {
if (!('content' in message) || !Array.isArray(message.content)) {
return `${message.type}: ${JSON.stringify(message)}`;
}
const text = message.content
.map((part) => {
if (part.type === 'text' || part.type === 'reasoning') return part.text;
if (part.type === 'tool-call') return `[tool:${part.toolName}] ${JSON.stringify(part.input)}`;
if (part.type === 'invalid-tool-call') return `[invalid-tool:${part.name ?? 'unknown'}]`;
if (part.type === 'file') return `[file:${part.mediaType ?? 'unknown'}]`;
if (part.type === 'citation') return `[citation:${part.title ?? part.url ?? 'unknown'}]`;
if (part.type === 'provider') return JSON.stringify(part.value);
return '';
})
.filter(Boolean)
.join(' ');
return `${message.role}: ${text}`;
}
function observationText(observation: Observation): string {
const payload = observation.payload;
if (payload !== null && typeof payload === 'object' && !Array.isArray(payload)) {
const text = (payload as Record<string, unknown>).text;
if (typeof text === 'string') return text;
}
return JSON.stringify(payload);
}
function observeWithModel(model: string): ObserveFn {
return async ({ deltaMessages, threadId, now }) => {
const transcript = deltaMessages.map(messageText).join('\n');
const { text } = await generateText({
model: createModel(model),
temperature: 0,
system: [
'Extract durable user facts from the transcript.',
'Return one concise observation sentence.',
'Preserve exact names, places, and codes.',
'If there are no durable facts, return NONE.',
].join(' '),
prompt: transcript,
});
const content = text.trim();
if (content.toUpperCase() === 'NONE') return [];
return [
{
scopeKind: 'thread',
scopeId: threadId,
kind: 'user-fact',
payload: { text: content },
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
createdAt: now,
},
];
};
}
function compactWithModel(model: string): CompactFn {
return async ({ observations, currentWorkingMemory, workingMemoryTemplate }) => {
const observationList = observations.map((observation) => `- ${observationText(observation)}`);
const { text } = await generateText({
model: createModel(model),
temperature: 0,
system: [
'You maintain a concise working-memory document.',
'Return the complete updated document only.',
'Preserve exact names, places, and codes.',
].join(' '),
prompt: [
'Template:',
workingMemoryTemplate,
'',
'Current working memory:',
currentWorkingMemory ?? workingMemoryTemplate,
'',
'New observations:',
observationList.join('\n'),
].join('\n'),
});
return { content: text.trim() };
};
}
async function runObservationCycleForTest({
store,
threadId,
resourceId,
model,
}: {
store: ObservationCycleStore;
threadId: string;
resourceId: string;
model: string;
}): Promise<{
deltaMessages: AgentDbMessage[];
cursorAfter: ObservationCursor | null;
}> {
const handle = await store.acquireObservationLock('thread', threadId, {
holderId: 'observational-memory-integration-test',
ttlMs: 30_000,
});
expect(handle).not.toBeNull();
if (!handle) throw new Error('Failed to acquire observation lock');
try {
const cursor = await store.getCursor('thread', threadId);
const deltaMessages = await store.getMessagesForScope('thread', threadId, {
...(cursor && {
since: {
sinceCreatedAt: cursor.lastObservedAt,
sinceMessageId: cursor.lastObservedMessageId,
},
}),
});
expect(deltaMessages.length).toBeGreaterThan(0);
const currentWorkingMemory = await store.getWorkingMemory({
threadId,
resourceId,
scope: 'resource',
});
const now = new Date();
const observedRows = await observeWithModel(model)({
deltaMessages,
currentWorkingMemory,
cursor,
threadId,
resourceId,
now,
trigger: { type: 'per-turn' },
telemetry: undefined,
});
const persistedRows = await store.appendObservations(observedRows);
expect(persistedRows.length).toBeGreaterThan(0);
const lastMessage = deltaMessages[deltaMessages.length - 1];
await store.setCursor({
scopeKind: 'thread',
scopeId: threadId,
lastObservedMessageId: lastMessage.id,
lastObservedAt: lastMessage.createdAt,
updatedAt: now,
});
const queuedRows = await store.getObservations({
scopeKind: 'thread',
scopeId: threadId,
schemaVersionAtMost: OBSERVATION_SCHEMA_VERSION,
});
expect(queuedRows.length).toBeGreaterThan(0);
const compacted = await compactWithModel(model)({
observations: queuedRows,
currentWorkingMemory,
workingMemoryTemplate: WORKING_MEMORY_TEMPLATE,
structured: false,
threadId,
resourceId,
model,
compactorPrompt: 'Compact thread-scoped observations into resource-scoped working memory.',
telemetry: undefined,
});
await store.saveWorkingMemory({ threadId, resourceId, scope: 'resource' }, compacted.content);
await store.deleteObservations(queuedRows.map((row) => row.id));
const remainingRows = await store.getObservations({
scopeKind: 'thread',
scopeId: threadId,
});
expect(remainingRows).toHaveLength(0);
return {
deltaMessages,
cursorAfter: await store.getCursor('thread', threadId),
};
} finally {
await store.releaseObservationLock(handle);
}
}
function createWriterAgent(model: string, store: InMemoryMemory): Agent {
return new Agent('observational-memory-writer')
.model(model)
.instructions('You are a helpful assistant. Acknowledge briefly, and do not repeat user facts.')
.memory(new Memory().storage(store).lastMessages(10));
}
function createReaderAgent(model: string, store: InMemoryMemory): Agent {
return new Agent('observational-memory-reader')
.model(model)
.instructions('Answer only from working memory. Be concise.')
.memory(
new Memory()
.storage(store)
.lastMessages(1)
.scope('resource')
.freeform(WORKING_MEMORY_TEMPLATE),
);
}
async function rememberFact(
agent: Agent,
fact: string,
options: { persistence: { threadId: string; resourceId: string } },
) {
const result = await agent.generate(`${fact} Reply with "noted".`, options);
expect(result.finishReason).toBe('stop');
expect(findLastTextContent(result.messages)).toBeTruthy();
}
async function addNeutralTurn(
agent: Agent,
options: { persistence: { threadId: string; resourceId: string } },
forbiddenTerms: string[],
) {
const result = await agent.generate('Reply only with "ok".', options);
expect(result.finishReason).toBe('stop');
const text = findLastTextContent(result.messages)?.toLowerCase() ?? '';
expect(text).toContain('ok');
for (const term of forbiddenTerms) {
expect(text).not.toContain(term);
}
}
function expectTextToContain(text: string | null | undefined, expectedTerms: string[]) {
const normalized = text?.toLowerCase() ?? '';
for (const term of expectedTerms) {
expect(normalized).toContain(term);
}
}
describe('observational memory integration', () => {
it('compacts observed thread facts into resource working memory for another thread', async () => {
const store = new InMemoryMemory();
const model = getModel('anthropic');
const resourceId = uniqueId('obs-resource');
const sourceThreadId = uniqueId('obs-source');
const readerThreadId = uniqueId('obs-reader');
const writer = createWriterAgent(model, store);
await rememberFact(writer, 'Please remember this for later: I live in Reykjavik.', {
persistence: { threadId: sourceThreadId, resourceId },
});
await runObservationCycleForTest({
store,
threadId: sourceThreadId,
resourceId,
model,
});
const reader = createReaderAgent(model, store);
const result = await reader.generate('From memory only, where do I live?', {
persistence: {
threadId: readerThreadId,
resourceId,
},
});
expectTextToContain(findLastTextContent(result.messages), ['reykjavik']);
});
it('uses compacted working memory inside the observed thread after the fact leaves chat history', async () => {
const store = new InMemoryMemory();
const model = getModel('anthropic');
const resourceId = uniqueId('obs-resource');
const sourceThreadId = uniqueId('obs-source');
const options = {
persistence: { threadId: sourceThreadId, resourceId },
};
const writer = createWriterAgent(model, store);
await rememberFact(
writer,
'Please remember this for later: I live in Reykjavik, and my project codename is Aurora-17.',
options,
);
await addNeutralTurn(writer, options, ['reykjavik', 'aurora-17']);
await runObservationCycleForTest({
store,
threadId: sourceThreadId,
resourceId,
model,
});
const workingMemory = await store.getWorkingMemory({
threadId: sourceThreadId,
resourceId,
scope: 'resource',
});
expectTextToContain(workingMemory, ['reykjavik', 'aurora-17']);
const reader = createReaderAgent(model, store);
const result = await reader.generate(
'From memory only, where do I live and what is my project codename?',
options,
);
expectTextToContain(findLastTextContent(result.messages), ['reykjavik', 'aurora-17']);
});
it('folds later turns from the same thread into existing working memory', async () => {
const store = new InMemoryMemory();
const model = getModel('anthropic');
const resourceId = uniqueId('obs-resource');
const sourceThreadId = uniqueId('obs-source');
const options = {
persistence: { threadId: sourceThreadId, resourceId },
};
const writer = createWriterAgent(model, store);
await rememberFact(
writer,
'Please remember this for later: I live in Reykjavik, and my project codename is Aurora-17.',
options,
);
await addNeutralTurn(writer, options, ['reykjavik', 'aurora-17']);
const firstCycle = await runObservationCycleForTest({
store,
threadId: sourceThreadId,
resourceId,
model,
});
await rememberFact(writer, 'Also remember that my editor theme is Solarized Dawn.', options);
await addNeutralTurn(writer, options, ['solarized', 'dawn']);
const secondCycle = await runObservationCycleForTest({
store,
threadId: sourceThreadId,
resourceId,
model,
});
expect(firstCycle.cursorAfter).not.toBeNull();
expect(secondCycle.cursorAfter?.lastObservedAt.getTime()).toBeGreaterThan(
firstCycle.cursorAfter!.lastObservedAt.getTime(),
);
const workingMemory = await store.getWorkingMemory({
threadId: sourceThreadId,
resourceId,
scope: 'resource',
});
expectTextToContain(workingMemory, ['reykjavik', 'aurora-17', 'solarized dawn']);
const reader = createReaderAgent(model, store);
const result = await reader.generate(
'From memory only, where do I live, what is my project codename, and what is my editor theme?',
options,
);
expectTextToContain(findLastTextContent(result.messages), [
'reykjavik',
'aurora-17',
'solarized',
'dawn',
]);
});
});

View File

@ -0,0 +1,28 @@
import type {
BuiltMemory,
MemoryConfig,
ObservationCapableMemory,
ObservationalMemoryConfig,
} from '../types';
type AssertMemoryConfig<T extends MemoryConfig> = T;
type PlainMemoryConfig = AssertMemoryConfig<{
memory: BuiltMemory;
lastMessages: 10;
}>;
type ObservationCapableMemoryConfig = AssertMemoryConfig<{
memory: ObservationCapableMemory;
lastMessages: 10;
observationalMemory: ObservationalMemoryConfig;
}>;
// @ts-expect-error Observational memory requires a backend that also implements BuiltObservationStore.
type InvalidObservationalMemoryConfig = AssertMemoryConfig<{
memory: BuiltMemory;
lastMessages: 10;
observationalMemory: ObservationalMemoryConfig;
}>;
export type { InvalidObservationalMemoryConfig, ObservationCapableMemoryConfig, PlainMemoryConfig };

View File

@ -0,0 +1,304 @@
import { InMemoryMemory } from '../runtime/memory-store';
import {
OBSERVATION_SCHEMA_VERSION,
type NewObservation,
type ObservationCursor,
} from '../types/sdk/observation';
function makeRow(overrides: Partial<NewObservation> = {}): NewObservation {
return {
scopeKind: 'thread',
scopeId: 't-1',
kind: 'observation',
payload: { text: 'hello' },
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
createdAt: new Date(),
...overrides,
};
}
describe('InMemoryMemory — observations', () => {
it('appends rows with assigned ids', async () => {
const mem = new InMemoryMemory();
const persisted = await mem.appendObservations([makeRow(), makeRow(), makeRow()]);
expect(persisted).toHaveLength(3);
const ids = persisted.map((r) => r.id);
expect(new Set(ids).size).toBe(3);
expect(ids.every((id) => typeof id === 'string' && id.length > 0)).toBe(true);
});
it('getObservations returns rows in (createdAt, id) ascending', async () => {
const mem = new InMemoryMemory();
const t = Date.now();
await mem.appendObservations([
makeRow({ payload: 'first', createdAt: new Date(t) }),
makeRow({ payload: 'second', createdAt: new Date(t + 1) }),
]);
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows.map((r) => r.payload)).toEqual(['first', 'second']);
});
it('filters by since (keyset), kindIs, schemaVersionAtMost, limit', async () => {
const mem = new InMemoryMemory();
const t = Date.now();
const [r1, r2, r3, r4] = await mem.appendObservations([
makeRow({ kind: 'observation', payload: 'one', createdAt: new Date(t) }),
makeRow({ kind: 'summary', payload: 'mid', createdAt: new Date(t + 1) }),
makeRow({
kind: 'observation',
payload: 'two',
schemaVersion: 99,
createdAt: new Date(t + 2),
}),
makeRow({ kind: 'observation', payload: 'three', createdAt: new Date(t + 3) }),
]);
expect(
(
await mem.getObservations({
scopeKind: 'thread',
scopeId: 't-1',
since: { sinceCreatedAt: r1.createdAt, sinceObservationId: r1.id },
})
).map((r) => r.payload),
).toEqual(['mid', 'two', 'three']);
expect(
(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1', kindIs: 'summary' })).map(
(r) => r.payload,
),
).toEqual(['mid']);
expect(
(
await mem.getObservations({
scopeKind: 'thread',
scopeId: 't-1',
schemaVersionAtMost: OBSERVATION_SCHEMA_VERSION,
})
).map((r) => r.payload),
).toEqual(['one', 'mid', 'three']);
expect(
(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1', limit: 2 })).map(
(r) => r.payload,
),
).toEqual(['one', 'mid']);
expect(r2.id).toBeDefined();
expect(r3.id).toBeDefined();
expect(r4.id).toBeDefined();
});
it('keyset since includes rows sharing createdAt with the anchor when id is greater', async () => {
const mem = new InMemoryMemory();
const t = new Date();
const [first, second] = await mem.appendObservations([
makeRow({ payload: 'a', createdAt: t }),
makeRow({ payload: 'b', createdAt: t }),
]);
// Sort the two by id so we know which is the anchor.
const [low, high] = [first, second].sort((a, b) => (a.id < b.id ? -1 : 1));
const rows = await mem.getObservations({
scopeKind: 'thread',
scopeId: 't-1',
since: { sinceCreatedAt: low.createdAt, sinceObservationId: low.id },
});
expect(rows).toHaveLength(1);
expect(rows[0].id).toBe(high.id);
});
it('deleteObservations removes the named rows and is idempotent', async () => {
const mem = new InMemoryMemory();
const [r1, r2] = await mem.appendObservations([makeRow(), makeRow()]);
await mem.deleteObservations([r1.id, 'unknown-id']);
await mem.deleteObservations([r1.id]);
const remaining = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(remaining.map((r) => r.id)).toEqual([r2.id]);
});
it('deleteObservations is a no-op for an empty id list', async () => {
const mem = new InMemoryMemory();
const [r1] = await mem.appendObservations([makeRow()]);
await mem.deleteObservations([]);
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows.map((r) => r.id)).toEqual([r1.id]);
});
it('deleteThread removes only the deleted thread observation state', async () => {
const mem = new InMemoryMemory();
await mem.appendObservations([
makeRow({ scopeKind: 'thread', scopeId: 't-1', payload: 'deleted-thread' }),
makeRow({ scopeKind: 'thread', scopeId: 't-2', payload: 'other-thread' }),
makeRow({ scopeKind: 'resource', scopeId: 't-1', payload: 'resource-scope' }),
]);
await mem.setCursor({
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-1',
lastObservedAt: new Date(),
updatedAt: new Date(),
});
await mem.acquireObservationLock('thread', 't-1', { ttlMs: 60_000, holderId: 'A' });
await mem.deleteThread('t-1');
await expect(mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).resolves.toEqual([]);
await expect(mem.getCursor('thread', 't-1')).resolves.toBeNull();
await expect(
mem.acquireObservationLock('thread', 't-1', { ttlMs: 60_000, holderId: 'B' }),
).resolves.toEqual(expect.objectContaining({ holderId: 'B' }));
await expect(mem.getObservations({ scopeKind: 'thread', scopeId: 't-2' })).resolves.toEqual([
expect.objectContaining({ payload: 'other-thread' }),
]);
await expect(mem.getObservations({ scopeKind: 'resource', scopeId: 't-1' })).resolves.toEqual([
expect.objectContaining({ payload: 'resource-scope' }),
]);
});
});
describe('InMemoryMemory — cursors', () => {
it('returns null when no cursor has been written', async () => {
const mem = new InMemoryMemory();
expect(await mem.getCursor('thread', 't-1')).toBeNull();
});
it('round-trips cursor-advance fields and overwrites on re-set', async () => {
const mem = new InMemoryMemory();
const first: ObservationCursor = {
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-1',
lastObservedAt: new Date(2026, 0, 1, 0, 0, 0, 5),
updatedAt: new Date(2026, 0, 1),
};
await mem.setCursor(first);
expect(await mem.getCursor('thread', 't-1')).toEqual(first);
const second: ObservationCursor = {
...first,
lastObservedMessageId: 'm-2',
lastObservedAt: new Date(2026, 0, 2),
updatedAt: new Date(),
};
await mem.setCursor(second);
expect(await mem.getCursor('thread', 't-1')).toEqual(second);
});
it('isolates cursors by scope', async () => {
const mem = new InMemoryMemory();
await mem.setCursor({
scopeKind: 'thread',
scopeId: 'A',
lastObservedMessageId: 'm-A',
lastObservedAt: new Date(),
updatedAt: new Date(),
});
expect(await mem.getCursor('thread', 'B')).toBeNull();
});
it('returns cursor copies so callers cannot mutate stored state', async () => {
const mem = new InMemoryMemory();
const cursor: ObservationCursor = {
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-1',
lastObservedAt: new Date(2026, 0, 1),
updatedAt: new Date(2026, 0, 2),
};
await mem.setCursor(cursor);
const loaded = await mem.getCursor('thread', 't-1');
expect(loaded).not.toBeNull();
loaded!.lastObservedMessageId = 'mutated';
loaded!.lastObservedAt.setTime(new Date(2030, 0, 1).getTime());
expect(await mem.getCursor('thread', 't-1')).toEqual(cursor);
});
});
describe('InMemoryMemory — observation locks', () => {
it('grants the lock when free and refuses a different holder while held', async () => {
const mem = new InMemoryMemory();
const a = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
expect(a).not.toBeNull();
const b = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(b).toBeNull();
});
it('reclaims an expired lock for a new holder', async () => {
const mem = new InMemoryMemory();
const a = await mem.acquireObservationLock('thread', 't-1', { ttlMs: 1, holderId: 'A' });
expect(a).not.toBeNull();
await new Promise((resolve) => setTimeout(resolve, 5));
const b = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(b).not.toBeNull();
expect(b?.holderId).toBe('B');
});
it('lets the same holder re-acquire (refresh) an active lock', async () => {
const mem = new InMemoryMemory();
const first = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
const second = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
expect(first).not.toBeNull();
expect(second).not.toBeNull();
expect(second?.heldUntil.getTime()).toBeGreaterThanOrEqual(first!.heldUntil.getTime());
});
it('release frees the lock and tolerates double-release', async () => {
const mem = new InMemoryMemory();
const a = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
await mem.releaseObservationLock(a!);
await mem.releaseObservationLock(a!);
const b = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(b).not.toBeNull();
});
it('release by stale handle does not displace a fresh holder', async () => {
const mem = new InMemoryMemory();
const stale = await mem.acquireObservationLock('thread', 't-1', { ttlMs: 1, holderId: 'A' });
await new Promise((resolve) => setTimeout(resolve, 5));
const fresh = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(fresh).not.toBeNull();
await mem.releaseObservationLock(stale!);
const bClaim = await mem.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'C',
});
expect(bClaim).toBeNull();
});
});

View File

@ -29,6 +29,7 @@ export type {
AgentRunState,
MemoryConfig,
MemoryDescriptor,
ObservationCapableMemory,
TitleGenerationConfig,
Thread,
SemanticRecallConfig,
@ -40,10 +41,20 @@ export type {
PersistedExecutionOptions,
BuiltTelemetry,
AttributeValue,
BuiltObservationStore,
CompactFn,
NewObservation,
Observation,
ObservationCursor,
ObservationLockHandle,
ObservationalMemoryConfig,
ObserveFn,
ScopeKind,
} from './types';
export type { ProviderOptions } from '@ai-sdk/provider-utils';
export { AgentEvent } from './types';
export type { AgentEventData, AgentEventHandler } from './types';
export { OBSERVATION_SCHEMA_VERSION } from './types';
export { Tool, wrapToolForApproval } from './sdk/tool';
export { Memory } from './sdk/memory';

View File

@ -1,38 +1,85 @@
import type { BuiltMemory, MemoryDescriptor, Thread } from '../types';
import type { AgentDbMessage } from '../types/sdk/message';
import type {
BuiltObservationStore,
NewObservation,
Observation,
ObservationCursor,
ObservationLockHandle,
ScopeKind,
} from '../types/sdk/observation';
interface StoredMessage {
message: AgentDbMessage;
createdAt: Date;
resourceId: string;
}
function scopeKey(scopeKind: ScopeKind, scopeId: string): string {
return `${scopeKind}:${scopeId}`;
}
function cloneCursor(cursor: ObservationCursor): ObservationCursor {
return {
...cursor,
lastObservedAt: new Date(cursor.lastObservedAt),
updatedAt: new Date(cursor.updatedAt),
};
}
function compareKeyset(
a: { createdAt: Date; id: string },
b: { createdAt: Date; id: string },
): number {
const t = a.createdAt.getTime() - b.createdAt.getTime();
if (t !== 0) return t;
return a.id < b.id ? -1 : a.id > b.id ? 1 : 0;
}
/**
* In-memory implementation of BuiltMemory.
* In-memory implementation of BuiltMemory and BuiltObservationStore.
* All data is lost on process restart suitable for development and testing.
*
* Thread context for `saveMessages` is established by calling `saveThread` first.
* The most recently saved thread is used when `saveMessages` is called.
*/
export class InMemoryMemory implements BuiltMemory {
export class InMemoryMemory implements BuiltMemory, BuiltObservationStore {
private threads = new Map<string, Thread>();
private messagesByThread = new Map<string, StoredMessage[]>();
private workingMemoryByKey = new Map<string, string>();
private observationsByScope = new Map<string, Observation[]>();
private cursorsByScope = new Map<string, ObservationCursor>();
private locksByScope = new Map<string, ObservationLockHandle>();
// eslint-disable-next-line @typescript-eslint/require-await
async getWorkingMemory(params: { threadId: string; resourceId?: string }): Promise<
string | null
> {
return this.workingMemoryByKey.get(params.resourceId ?? params.threadId) ?? null;
async getWorkingMemory(params: {
threadId: string;
resourceId?: string;
scope: 'resource' | 'thread';
}): Promise<string | null> {
return this.workingMemoryByKey.get(this.workingMemoryKey(params)) ?? null;
}
// eslint-disable-next-line @typescript-eslint/require-await
async saveWorkingMemory(
params: { threadId: string; resourceId?: string },
params: { threadId: string; resourceId?: string; scope: 'resource' | 'thread' },
content: string,
): Promise<void> {
this.workingMemoryByKey.set(params.resourceId ?? params.threadId, content);
this.workingMemoryByKey.set(this.workingMemoryKey(params), content);
}
private workingMemoryKey(params: {
threadId: string;
resourceId?: string;
scope: 'resource' | 'thread';
}): string {
const id = params.scope === 'thread' ? params.threadId : (params.resourceId ?? params.threadId);
return `${params.scope}:${id}`;
}
// eslint-disable-next-line @typescript-eslint/require-await
@ -59,18 +106,42 @@ export class InMemoryMemory implements BuiltMemory {
async deleteThread(threadId: string): Promise<void> {
this.threads.delete(threadId);
this.messagesByThread.delete(threadId);
const key = scopeKey('thread', threadId);
this.observationsByScope.delete(key);
this.cursorsByScope.delete(key);
this.locksByScope.delete(key);
}
// eslint-disable-next-line @typescript-eslint/require-await
async getMessages(
threadId: string,
opts?: { limit?: number; before?: Date },
opts?: {
limit?: number;
before?: Date;
since?: { sinceCreatedAt: Date; sinceMessageId: string };
},
): Promise<AgentDbMessage[]> {
let stored = this.messagesByThread.get(threadId) ?? [];
if (opts?.before) {
const cutoff = opts.before.getTime();
stored = stored.filter((s) => s.createdAt.getTime() < cutoff);
}
if (opts?.since) {
const { sinceCreatedAt, sinceMessageId } = opts.since;
stored = stored.filter(
(s) =>
compareKeyset(
{ createdAt: s.createdAt, id: s.message.id },
{ createdAt: sinceCreatedAt, id: sinceMessageId },
) > 0,
);
}
stored = [...stored].sort((a, b) =>
compareKeyset(
{ createdAt: a.createdAt, id: a.message.id },
{ createdAt: b.createdAt, id: b.message.id },
),
);
if (opts?.limit) stored = stored.slice(-opts.limit);
return stored.map((s) => ({ ...s.message, createdAt: s.createdAt }));
}
@ -89,12 +160,17 @@ export class InMemoryMemory implements BuiltMemory {
}): Promise<void> {
const existing = this.messagesByThread.get(args.threadId) ?? [];
const byId = new Map(existing.map((s, i) => [s.message.id, i]));
const resourceId = args.resourceId ?? '';
for (const msg of args.messages) {
const entry: StoredMessage = { message: msg, createdAt: msg.createdAt };
const idx = byId.get(msg.id);
if (idx !== undefined) {
existing[idx] = entry;
existing[idx] = { message: msg, createdAt: msg.createdAt, resourceId };
} else {
const entry: StoredMessage = {
message: msg,
createdAt: msg.createdAt,
resourceId,
};
byId.set(msg.id, existing.length);
existing.push(entry);
}
@ -116,6 +192,146 @@ export class InMemoryMemory implements BuiltMemory {
describe(): MemoryDescriptor {
return { name: 'memory', constructorName: this.constructor.name, connectionParams: {} };
}
// ── Observational memory ─────────────────────────────────────────────
// eslint-disable-next-line @typescript-eslint/require-await
async appendObservations(rows: NewObservation[]): Promise<Observation[]> {
const persisted: Observation[] = [];
for (const row of rows) {
const key = scopeKey(row.scopeKind, row.scopeId);
const bucket = this.observationsByScope.get(key) ?? [];
const obs: Observation = {
...row,
id: crypto.randomUUID(),
};
bucket.push(obs);
this.observationsByScope.set(key, bucket);
persisted.push(obs);
}
return persisted;
}
// eslint-disable-next-line @typescript-eslint/require-await
async getObservations(opts: {
scopeKind: ScopeKind;
scopeId: string;
since?: { sinceCreatedAt: Date; sinceObservationId: string };
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
}): Promise<Observation[]> {
const bucket = this.observationsByScope.get(scopeKey(opts.scopeKind, opts.scopeId)) ?? [];
let rows = [...bucket].sort((a, b) =>
compareKeyset({ createdAt: a.createdAt, id: a.id }, { createdAt: b.createdAt, id: b.id }),
);
if (opts.since) {
const { sinceCreatedAt, sinceObservationId } = opts.since;
rows = rows.filter(
(r) =>
compareKeyset(
{ createdAt: r.createdAt, id: r.id },
{ createdAt: sinceCreatedAt, id: sinceObservationId },
) > 0,
);
}
if (opts.kindIs !== undefined) {
const kind = opts.kindIs;
rows = rows.filter((r) => r.kind === kind);
}
if (opts.schemaVersionAtMost !== undefined) {
const max = opts.schemaVersionAtMost;
rows = rows.filter((r) => r.schemaVersion <= max);
}
if (opts.limit !== undefined) {
rows = rows.slice(0, opts.limit);
}
return rows.map((r) => ({ ...r }));
}
// eslint-disable-next-line @typescript-eslint/require-await
async getMessagesForScope(
scopeKind: ScopeKind,
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]> {
if (scopeKind !== 'thread') {
throw new Error(`getMessagesForScope: scopeKind='${scopeKind}' is not supported in v1`);
}
const candidates = this.messagesByThread.get(scopeId) ?? [];
let rows = [...candidates].sort((a, b) =>
compareKeyset(
{ createdAt: a.createdAt, id: a.message.id },
{ createdAt: b.createdAt, id: b.message.id },
),
);
if (opts?.since) {
const { sinceCreatedAt, sinceMessageId } = opts.since;
rows = rows.filter(
(s) =>
compareKeyset(
{ createdAt: s.createdAt, id: s.message.id },
{ createdAt: sinceCreatedAt, id: sinceMessageId },
) > 0,
);
}
return rows.map((s) => ({ ...s.message, createdAt: s.createdAt }));
}
// eslint-disable-next-line @typescript-eslint/require-await
async deleteObservations(ids: string[]): Promise<void> {
if (ids.length === 0) return;
const idSet = new Set(ids);
for (const [key, bucket] of this.observationsByScope.entries()) {
this.observationsByScope.set(
key,
bucket.filter((row) => !idSet.has(row.id)),
);
}
}
// eslint-disable-next-line @typescript-eslint/require-await
async getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null> {
const cursor = this.cursorsByScope.get(scopeKey(scopeKind, scopeId));
return cursor ? cloneCursor(cursor) : null;
}
// eslint-disable-next-line @typescript-eslint/require-await
async setCursor(cursor: ObservationCursor): Promise<void> {
const key = scopeKey(cursor.scopeKind, cursor.scopeId);
this.cursorsByScope.set(key, cloneCursor(cursor));
}
// eslint-disable-next-line @typescript-eslint/require-await
async acquireObservationLock(
scopeKind: ScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null> {
const key = scopeKey(scopeKind, scopeId);
const existing = this.locksByScope.get(key);
const now = Date.now();
if (existing && existing.holderId !== opts.holderId && existing.heldUntil.getTime() > now) {
return null;
}
const handle: ObservationLockHandle = {
scopeKind,
scopeId,
holderId: opts.holderId,
heldUntil: new Date(now + opts.ttlMs),
};
this.locksByScope.set(key, handle);
return { ...handle };
}
// eslint-disable-next-line @typescript-eslint/require-await
async releaseObservationLock(handle: ObservationLockHandle): Promise<void> {
const key = scopeKey(handle.scopeKind, handle.scopeId);
const current = this.locksByScope.get(key);
if (current && current.holderId === handle.holderId) {
this.locksByScope.delete(key);
}
}
}
/**

View File

@ -211,7 +211,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
} else {
throw new Error(
'Invalid memory configuration. Use: new Memory().lastMessages(N) for in-process memory, ' +
'or new Memory().storage(new SqliteMemory(path)).lastMessages(N) for persistent storage. ' +
'or new Memory().storage(myBuiltMemoryBackend).lastMessages(N) for a persistent backend. ' +
'See the Memory class documentation for all options.',
);
}

View File

@ -52,7 +52,7 @@ export class Memory {
* Set the storage backend for conversation history.
*
* - `'memory'` in-process memory (default, lost on restart)
* - A `BuiltMemory` instance for persistent storage (e.g. SqliteMemory)
* - A `BuiltMemory` instance for a persistent backend (e.g. cli's `N8nMemory`)
*/
storage(backend: 'memory' | BuiltMemory): this {
if (backend === 'memory') {

View File

@ -60,6 +60,7 @@ export type {
export type {
Thread,
BuiltMemory,
ObservationCapableMemory,
MemoryDescriptor,
SemanticRecallConfig,
MemoryConfig,
@ -67,6 +68,19 @@ export type {
TitleGenerationConfig,
} from './sdk/memory';
export type {
BuiltObservationStore,
CompactFn,
NewObservation,
Observation,
ObservationCursor,
ObservationLockHandle,
ObservationalMemoryConfig,
ObserveFn,
ScopeKind,
} from './sdk/observation';
export { OBSERVATION_SCHEMA_VERSION } from './sdk/observation';
export type {
EvalInput,
EvalScore,

View File

@ -2,6 +2,7 @@ import type { z } from 'zod';
import type { ModelConfig, SerializableAgentState } from './agent';
import type { AgentDbMessage } from './message';
import type { BuiltObservationStore, ObservationalMemoryConfig } from './observation';
import type { JSONObject } from '../utils/json';
/**
@ -37,6 +38,11 @@ export interface BuiltMemory {
opts?: {
limit?: number; // last N messages
before?: Date; // pagination cursor
/**
* Keyset cursor: return only messages strictly after `(createdAt, id) >
* (since.sinceCreatedAt, since.sinceMessageId)`, ordered ascending.
*/
since?: { sinceCreatedAt: Date; sinceMessageId: string };
},
): Promise<AgentDbMessage[]>;
/**
@ -123,9 +129,9 @@ export interface TitleGenerationConfig {
sync?: boolean;
}
/** Full memory configuration bundle passed from builder to runtime. */
export interface MemoryConfig {
memory: BuiltMemory;
export type ObservationCapableMemory = BuiltMemory & BuiltObservationStore;
interface MemoryConfigBase {
lastMessages: number;
workingMemory?: {
template: string;
@ -142,6 +148,17 @@ export interface MemoryConfig {
titleGeneration?: TitleGenerationConfig;
}
/** Full memory configuration bundle passed from builder to runtime. */
export type MemoryConfig =
| (MemoryConfigBase & {
memory: BuiltMemory;
observationalMemory?: undefined;
})
| (MemoryConfigBase & {
memory: ObservationCapableMemory;
observationalMemory: ObservationalMemoryConfig;
});
/**
* Interface for persisting agent execution snapshots (used for tool approval / human-in-the-loop).
*

View File

@ -183,4 +183,9 @@ export type CustomAgentMessage = {
*/
export type AgentMessage = Message | CustomAgentMessage;
/**
* Persisted message shape returned by `BuiltMemory.getMessages`. The
* `(createdAt, id)` pair forms the keyset used by observational memory
* cursors; both fields are populated on read by every backend.
*/
export type AgentDbMessage = { id: string; createdAt: Date } & AgentMessage;

View File

@ -0,0 +1,204 @@
import type { z } from 'zod';
import type { ModelConfig } from './agent';
import type { AgentDbMessage } from './message';
import type { BuiltTelemetry } from '../telemetry';
import type { JSONValue } from '../utils/json';
/**
* Schema version stamped onto every observation row. Bump when the row format
* changes incompatibly. Read-side helpers filter rows newer than the running
* SDK can interpret.
*/
export const OBSERVATION_SCHEMA_VERSION = 1;
/**
* Scope an observation belongs to. v1 writes only `'thread'`; the others are
* reserved so future resource- and agent-scoped observers are a behavioral
* change, not a schema migration.
*/
export type ScopeKind = 'thread' | 'resource' | 'agent';
/** A persisted observation row. */
export interface Observation {
id: string;
scopeKind: ScopeKind;
scopeId: string;
/** Free-form, consumer-defined. The SDK reserves no values. */
kind: string;
payload: JSONValue;
/** Populated for kinds that represent a time gap; otherwise `null`. */
durationMs: number | null;
schemaVersion: number;
createdAt: Date;
}
/** Shape passed to `appendObservations`. `id` is backend-assigned. */
export type NewObservation = Omit<Observation, 'id'>;
/**
* Per-scope mutable state for the observer's message cursor.
*/
export interface ObservationCursor {
scopeKind: ScopeKind;
scopeId: string;
lastObservedMessageId: string;
lastObservedAt: Date;
updatedAt: Date;
}
export interface ObservationLockHandle {
scopeKind: ScopeKind;
scopeId: string;
holderId: string;
heldUntil: Date;
}
/**
* Consumer-provided observer function. Called inside the orchestrator's
* lock + cursor scope; receives the message delta since the last cursor
* advance and the current thread working-memory document, then returns zero
* or more rows to append.
*/
export type ObserveFn = (ctx: {
deltaMessages: AgentDbMessage[];
currentWorkingMemory: string | null;
cursor: ObservationCursor | null;
threadId: string;
resourceId: string;
now: Date;
trigger: ObservationalMemoryTrigger;
telemetry: BuiltTelemetry | undefined;
}) => Promise<NewObservation[]>;
/**
* Consumer-provided compactor function. Reads queued observations + the
* current working-memory document, and returns the complete replacement
* working-memory document.
*/
export type CompactFn = (ctx: {
observations: Observation[];
currentWorkingMemory: string | null;
workingMemoryTemplate: string;
structured: boolean;
schema?: z.ZodObject<z.ZodRawShape>;
threadId: string;
resourceId: string;
model: ModelConfig;
compactorPrompt: string;
telemetry: BuiltTelemetry | undefined;
}) => Promise<{ content: string }>;
/**
* Storage interface for observational memory. A sibling to {@link BuiltMemory}:
* implementations typically live on the same class (cli's `N8nMemory` and the
* SDK's `InMemoryMemory` both implement both), but the interfaces are kept
* separate so observations stay out of the message-store API and consumers
* don't need to feature-check every call. When `observationalMemory` is
* configured on the builder, the configured backend must also implement this
* interface.
*/
export interface BuiltObservationStore {
/**
* Append observation rows for a scope. Backends assign `id` and return the
* persisted shape.
*/
appendObservations(rows: NewObservation[]): Promise<Observation[]>;
/**
* Query observations for a scope. Filters compose: `since`, when supplied,
* returns only rows strictly after the keyset `(createdAt, id) >
* (since.sinceCreatedAt, since.sinceObservationId)`; `kindIs` matches
* `kind` exactly; `schemaVersionAtMost` excludes rows whose `schemaVersion`
* exceeds the caller's supported version. Results are ordered by
* `(createdAt, id)` ascending.
*/
getObservations(opts: {
scopeKind: ScopeKind;
scopeId: string;
since?: { sinceCreatedAt: Date; sinceObservationId: string };
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
}): Promise<Observation[]>;
/**
* Read the message delta the observer needs to process for a given scope.
*
* - `'thread'`: messages for `scopeId` (== threadId).
* - non-thread scopes are reserved for future versions. v1 backends should
* throw for them.
*
* When `since` is supplied, only messages strictly after the keyset
* `(createdAt, id) > (since.sinceCreatedAt, since.sinceMessageId)` are
* returned. Results are ordered by `(createdAt, id)` ascending the last
* element is the most recently appended.
*/
getMessagesForScope(
scopeKind: ScopeKind,
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]>;
/** Hard-delete the given rows. Idempotent: missing ids are ignored. */
deleteObservations(ids: string[]): Promise<void>;
/** Read the cursor for a scope; `null` if none has been written yet. */
getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null>;
/** Upsert the cursor-advance fields for a scope. */
setCursor(cursor: ObservationCursor): Promise<void>;
/**
* Acquire a per-scope advisory lock with TTL. Returns a handle on
* success or `null` if the lock is held by another holder and not yet
* expired. Holders other than `holderId` whose `heldUntil` is in the
* past may be displaced.
*/
acquireObservationLock(
scopeKind: ScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null>;
/** Release a held lock. Tolerates the lock having already expired or been displaced. */
releaseObservationLock(handle: ObservationLockHandle): Promise<void>;
}
export type ObservationalMemoryTrigger =
| { type: 'per-turn' }
| {
type: 'idle-timer';
/** Milliseconds after TurnEnd before the observer runs. */
idleMs: number;
/** Emit a gap row when the elapsed time since the previous observed turn exceeds this. */
gapThresholdMs?: number;
};
/** Observational-memory configuration block on `MemoryConfig`. */
export interface ObservationalMemoryConfig {
/**
* Builder-time observer override. Omit this to use the SDK reference
* observer prompt + the agent's configured model.
*/
observe?: ObserveFn;
/**
* Builder-time compactor override. Omit this to use the SDK reference
* compactor prompt + the agent's configured model.
*/
compact?: CompactFn;
/** @default { type: 'per-turn' } */
trigger?: ObservationalMemoryTrigger;
/** Queue size that triggers compaction into the thread working-memory document. @default 5 */
compactionThreshold?: number;
/** Replaces the SDK reference observer system prompt. */
observerPrompt?: string;
/** Replaces the SDK reference compactor system prompt. */
compactorPrompt?: string;
/**
* TTL applied when the orchestrator acquires the per-scope observation
* lock.
* @default 30_000
*/
lockTtlMs?: number;
/**
* When `true`, `runObservationalCycle` calls dispatched by the SDK (e.g.
* lazy fallback at `TurnStart`) are awaited; otherwise they are tracked
* by the background-task tracker and resolve on `runtime.dispose()`.
* @default false
*/
sync?: boolean;
}

View File

@ -0,0 +1,60 @@
import type { MigrationContext, ReversibleMigration } from '../migration-types';
/**
* Creates the three sibling tables for observational memory:
*
* - `agents_observations`: append-only observation log keyed by `(scopeKind,
* scopeId)` and ordered by `(createdAt, id)`. Consumers define `kind`;
* payload is JSON. The compactor hard-deletes rows it folds into thread
* working memory.
* - `agents_observation_cursors`: per-scope keyset cursor
* `(lastObservedAt, lastObservedMessageId)` that advances every observe
* cycle.
* - `agents_observation_locks`: per-scope advisory lock with TTL so two
* observers on the same scope can't stomp each other.
*
* `scopeKind` is polymorphic (`thread` / `resource` / `agent`); columns are in
* place so future scopes are a behavioural change, not a schema migration. No
* FK on `scopeId` for that reason it can reference different parent tables
* depending on `scopeKind`.
*/
export class CreateAgentObservationTables1784000000000 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, column } }: MigrationContext) {
await createTable('agents_observations')
.withColumns(
column('id').varchar(36).primary.notNull,
column('scopeKind').varchar(20).notNull.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull,
column('kind').varchar(64).notNull,
column('payload').json.notNull,
column('durationMs').bigint,
column('schemaVersion').int.notNull,
)
.withIndexOn(['scopeKind', 'scopeId', 'kind', 'createdAt'])
.withIndexOn(['scopeKind', 'scopeId', 'createdAt', 'id']).withTimestamps;
await createTable('agents_observation_cursors').withColumns(
column('scopeKind')
.varchar(20)
.notNull.primary.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull.primary,
column('lastObservedMessageId').varchar(36).notNull,
column('lastObservedAt').timestamp(3).notNull,
).withTimestamps;
await createTable('agents_observation_locks').withColumns(
column('scopeKind')
.varchar(20)
.notNull.primary.withEnumCheck(['thread', 'resource', 'agent']),
column('scopeId').varchar(255).notNull.primary,
column('holderId').varchar(64).notNull,
column('heldUntil').timestamp(3).notNull,
).withTimestamps;
}
async down({ schemaBuilder: { dropTable } }: MigrationContext) {
await dropTable('agents_observation_locks');
await dropTable('agents_observation_cursors');
await dropTable('agents_observations');
}
}

View File

@ -173,6 +173,7 @@ import { AddWorkflowVersionToTestRun1778100001000 } from '../common/177810000100
import { AddEvaluationConfigColumnsToTestRun1778100002000 } from '../common/1778100002000-AddEvaluationConfigColumnsToTestRun';
import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables';
import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables';
import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables';
import type { Migration } from '../migration-types';
export const postgresMigrations: Migration[] = [
@ -351,4 +352,5 @@ export const postgresMigrations: Migration[] = [
AddExecutionDeduplicationKey1778000000000,
CreateAgentTables1783000000000,
CreateAgentExecutionTables1783000000001,
CreateAgentObservationTables1784000000000,
];

View File

@ -166,6 +166,7 @@ import { AddWorkflowVersionToTestRun1778100001000 } from '../common/177810000100
import { AddEvaluationConfigColumnsToTestRun1778100002000 } from '../common/1778100002000-AddEvaluationConfigColumnsToTestRun';
import { CreateAgentTables1783000000000 } from '../common/1783000000000-CreateAgentTables';
import { CreateAgentExecutionTables1783000000001 } from '../common/1783000000001-CreateAgentExecutionTables';
import { CreateAgentObservationTables1784000000000 } from '../common/1784000000000-CreateAgentObservationTables';
import type { Migration } from '../migration-types';
const sqliteMigrations: Migration[] = [
@ -337,6 +338,7 @@ const sqliteMigrations: Migration[] = [
AddExecutionDeduplicationKey1778000000000,
CreateAgentTables1783000000000,
CreateAgentExecutionTables1783000000001,
CreateAgentObservationTables1784000000000,
];
export { sqliteMigrations };

View File

@ -0,0 +1,83 @@
/* eslint-disable @typescript-eslint/unbound-method -- mock-based tests intentionally reference unbound methods */
import { mock } from 'jest-mock-extended';
import { mockEntityManager } from '@test/mocking';
import { AgentExecutionThread } from '../entities/agent-execution-thread.entity';
import { AgentExecutionThreadRepository } from '../repositories/agent-execution-thread.repository';
const entityManager = mockEntityManager(AgentExecutionThread);
const mockDataSource = { manager: entityManager };
describe('AgentExecutionThreadRepository', () => {
let repository: AgentExecutionThreadRepository;
beforeEach(() => {
jest.clearAllMocks();
repository = new AgentExecutionThreadRepository(mockDataSource as never);
});
describe('findOrCreate', () => {
const makeScopedRepository = (saved: AgentExecutionThread, max = 7) => ({
findOneBy: jest.fn().mockResolvedValue(null),
createQueryBuilder: jest.fn().mockReturnValue({
select: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
getRawOne: jest.fn().mockResolvedValue({ max }),
}),
create: jest.fn().mockReturnValue(saved),
save: jest.fn().mockResolvedValue(saved),
});
it('assigns the project-scoped session number inside a serializable transaction', async () => {
const saved = mock<AgentExecutionThread>({ id: 'thread-1', sessionNumber: 8 });
const scopedRepository = makeScopedRepository(saved);
const trx = { getRepository: jest.fn().mockReturnValue(scopedRepository) };
entityManager.transaction.mockImplementationOnce(async (_isolation, callback) => {
return await callback(trx as never);
});
const result = await repository.findOrCreate(
'thread-1',
'agent-1',
'Support agent',
'project-1',
);
expect(entityManager.transaction).toHaveBeenCalledWith('SERIALIZABLE', expect.any(Function));
expect(trx.getRepository).toHaveBeenCalledWith(AgentExecutionThread);
expect(scopedRepository.create).toHaveBeenCalledWith({
id: 'thread-1',
agentId: 'agent-1',
agentName: 'Support agent',
projectId: 'project-1',
sessionNumber: 8,
});
expect(result).toEqual({ thread: saved, created: true });
});
it('retries transient serialization failures before assigning a session number', async () => {
const saved = mock<AgentExecutionThread>({ id: 'thread-1', sessionNumber: 8 });
const scopedRepository = makeScopedRepository(saved);
const trx = { getRepository: jest.fn().mockReturnValue(scopedRepository) };
const serializationError = Object.assign(new Error('serialization failure'), {
driverError: { code: '40001' },
});
entityManager.transaction
.mockRejectedValueOnce(serializationError)
.mockImplementationOnce(async (_isolation, callback) => {
return await callback(trx as never);
});
const result = await repository.findOrCreate(
'thread-1',
'agent-1',
'Support agent',
'project-1',
);
expect(entityManager.transaction).toHaveBeenCalledTimes(2);
expect(result).toEqual({ thread: saved, created: true });
});
});
});

View File

@ -0,0 +1,55 @@
import { mockLogger } from '@n8n/backend-test-utils';
import { mock } from 'jest-mock-extended';
import { AgentExecutionService } from '../agent-execution.service';
import type { AgentExecutionThread } from '../entities/agent-execution-thread.entity';
import type { N8nMemory } from '../integrations/n8n-memory';
import type { AgentExecutionRepository } from '../repositories/agent-execution.repository';
import type { AgentExecutionThreadRepository } from '../repositories/agent-execution-thread.repository';
describe('AgentExecutionService', () => {
let service: AgentExecutionService;
let agentExecutionRepository: jest.Mocked<AgentExecutionRepository>;
let agentExecutionThreadRepository: jest.Mocked<AgentExecutionThreadRepository>;
let n8nMemory: jest.Mocked<N8nMemory>;
beforeEach(() => {
jest.clearAllMocks();
agentExecutionRepository = mock<AgentExecutionRepository>();
agentExecutionThreadRepository = mock<AgentExecutionThreadRepository>();
n8nMemory = mock<N8nMemory>();
service = new AgentExecutionService(
mockLogger(),
agentExecutionRepository,
agentExecutionThreadRepository,
n8nMemory,
);
});
describe('deleteThread', () => {
it('cleans SDK memory before deleting the execution thread', async () => {
agentExecutionThreadRepository.findOneBy.mockResolvedValue({
id: 'thread-1',
projectId: 'project-1',
} as AgentExecutionThread);
const result = await service.deleteThread('project-1', 'thread-1');
expect(result).toBe(true);
expect(n8nMemory.deleteThread).toHaveBeenCalledWith('thread-1');
expect(agentExecutionThreadRepository.delete).toHaveBeenCalledWith({ id: 'thread-1' });
});
it('does not clean SDK memory when the execution thread is not found', async () => {
agentExecutionThreadRepository.findOneBy.mockResolvedValue(null);
const result = await service.deleteThread('project-1', 'thread-1');
expect(result).toBe(false);
expect(n8nMemory.deleteThread).not.toHaveBeenCalled();
expect(agentExecutionThreadRepository.delete).not.toHaveBeenCalled();
});
});
});

View File

@ -178,6 +178,7 @@ export class AgentExecutionService {
});
if (!thread) return false;
await this.n8nMemory.deleteThread(threadId);
await this.agentExecutionThreadRepository.delete({ id: threadId });
return true;
}

View File

@ -100,6 +100,11 @@ export class AgentsModule implements ModuleInterface {
const { AgentExecutionThread } = await import('./entities/agent-execution-thread.entity');
const { AgentExecution } = await import('./entities/agent-execution.entity');
const { AgentPublishedVersion } = await import('./entities/agent-published-version.entity');
const { AgentObservationEntity } = await import('./entities/agent-observation.entity');
const { AgentObservationCursorEntity } = await import(
'./entities/agent-observation-cursor.entity'
);
const { AgentObservationLockEntity } = await import('./entities/agent-observation-lock.entity');
return [
Agent,
@ -110,6 +115,9 @@ export class AgentsModule implements ModuleInterface {
AgentExecutionThread,
AgentExecution,
AgentPublishedVersion,
AgentObservationEntity,
AgentObservationCursorEntity,
AgentObservationLockEntity,
];
}

View File

@ -0,0 +1,19 @@
import { DateTimeColumn, WithTimestamps } from '@n8n/db';
import { Column, Entity, PrimaryColumn } from '@n8n/typeorm';
import type { ObservationScopeKind } from './agent-observation.entity';
@Entity({ name: 'agents_observation_cursors' })
export class AgentObservationCursorEntity extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 20 })
scopeKind: ObservationScopeKind;
@PrimaryColumn({ type: 'varchar', length: 255 })
scopeId: string;
@Column({ type: 'varchar', length: 36 })
lastObservedMessageId: string;
@DateTimeColumn()
lastObservedAt: Date;
}

View File

@ -0,0 +1,19 @@
import { DateTimeColumn, WithTimestamps } from '@n8n/db';
import { Column, Entity, PrimaryColumn } from '@n8n/typeorm';
import type { ObservationScopeKind } from './agent-observation.entity';
@Entity({ name: 'agents_observation_locks' })
export class AgentObservationLockEntity extends WithTimestamps {
@PrimaryColumn({ type: 'varchar', length: 20 })
scopeKind: ObservationScopeKind;
@PrimaryColumn({ type: 'varchar', length: 255 })
scopeId: string;
@Column({ type: 'varchar', length: 64 })
holderId: string;
@DateTimeColumn()
heldUntil: Date;
}

View File

@ -0,0 +1,27 @@
import { JsonColumn, WithTimestampsAndStringId } from '@n8n/db';
import { Column, Entity, Index } from '@n8n/typeorm';
export type ObservationScopeKind = 'thread' | 'resource' | 'agent';
@Entity({ name: 'agents_observations' })
@Index(['scopeKind', 'scopeId', 'kind', 'createdAt'])
@Index(['scopeKind', 'scopeId', 'createdAt', 'id'])
export class AgentObservationEntity extends WithTimestampsAndStringId {
@Column({ type: 'varchar', length: 20 })
scopeKind: ObservationScopeKind;
@Column({ type: 'varchar', length: 255 })
scopeId: string;
@Column({ type: 'varchar', length: 64 })
kind: string;
@JsonColumn()
payload: unknown;
@Column({ type: 'bigint', nullable: true })
durationMs: number | null;
@Column({ type: 'int' })
schemaVersion: number;
}

View File

@ -1,9 +1,16 @@
import { LessThan } from '@n8n/typeorm';
import { OBSERVATION_SCHEMA_VERSION, type NewObservation } from '@n8n/agents';
import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import type { AgentMessageEntity } from '../../entities/agent-message.entity';
import type { AgentThreadEntity } from '../../entities/agent-thread.entity';
import { AgentObservationCursorEntity } from '../../entities/agent-observation-cursor.entity';
import { AgentObservationLockEntity } from '../../entities/agent-observation-lock.entity';
import { AgentObservationEntity } from '../../entities/agent-observation.entity';
import { AgentThreadEntity } from '../../entities/agent-thread.entity';
import type { AgentMessageRepository } from '../../repositories/agent-message.repository';
import type { AgentObservationCursorRepository } from '../../repositories/agent-observation-cursor.repository';
import type { AgentObservationLockRepository } from '../../repositories/agent-observation-lock.repository';
import type { AgentObservationRepository } from '../../repositories/agent-observation.repository';
import type { AgentResourceRepository } from '../../repositories/agent-resource.repository';
import type { AgentThreadRepository } from '../../repositories/agent-thread.repository';
import { N8nMemory } from '../n8n-memory';
@ -13,6 +20,11 @@ describe('N8nMemory', () => {
let messageRepository: jest.Mocked<AgentMessageRepository>;
let threadRepository: jest.Mocked<AgentThreadRepository>;
let resourceRepository: jest.Mocked<AgentResourceRepository>;
let observationRepository: jest.Mocked<AgentObservationRepository>;
let observationCursorRepository: jest.Mocked<AgentObservationCursorRepository>;
let observationLockRepository: jest.Mocked<AgentObservationLockRepository>;
let runInTransaction: jest.Mock;
let transactionDelete: jest.Mock;
beforeEach(() => {
jest.clearAllMocks();
@ -20,8 +32,27 @@ describe('N8nMemory', () => {
messageRepository = mock<AgentMessageRepository>();
threadRepository = mock<AgentThreadRepository>();
resourceRepository = mock<AgentResourceRepository>();
observationRepository = mock<AgentObservationRepository>();
observationCursorRepository = mock<AgentObservationCursorRepository>();
observationLockRepository = mock<AgentObservationLockRepository>();
transactionDelete = jest.fn().mockResolvedValue({ affected: 1, raw: {} });
runInTransaction = jest.fn(
async (callback: (trx: { delete: typeof transactionDelete }) => Promise<void>) => {
await callback({ delete: transactionDelete });
},
);
Object.defineProperty(threadRepository, 'manager', {
value: { transaction: runInTransaction },
});
memory = new N8nMemory(threadRepository, messageRepository, resourceRepository);
memory = new N8nMemory(
threadRepository,
messageRepository,
resourceRepository,
observationRepository,
observationCursorRepository,
observationLockRepository,
);
});
function makeMessageEntity(id: string, createdAt: Date, text: string): AgentMessageEntity {
@ -137,6 +168,48 @@ describe('N8nMemory', () => {
});
});
describe('getMessagesForScope', () => {
it('queries thread-scoped messages by thread id', async () => {
const createdAt = new Date('2026-01-01T00:00:02.000Z');
messageRepository.find.mockResolvedValue([makeMessageEntity('m2', createdAt, 'middle')]);
const result = await memory.getMessagesForScope('thread', 'thread-1');
expect(messageRepository.find).toHaveBeenCalledWith(
expect.objectContaining({
where: [{ threadId: 'thread-1' }],
order: { createdAt: 'ASC', id: 'ASC' },
}),
);
expect(result.map((m) => m.id)).toEqual(['m2']);
});
it('applies the cursor keyset for thread scopes', async () => {
const sinceCreatedAt = new Date('2026-01-01T00:00:01.000Z');
messageRepository.find.mockResolvedValue([]);
await memory.getMessagesForScope('thread', 'thread-1', {
since: { sinceCreatedAt, sinceMessageId: 'm1' },
});
expect(messageRepository.find).toHaveBeenCalledWith(
expect.objectContaining({
where: [
{ threadId: 'thread-1', createdAt: MoreThan(sinceCreatedAt) },
{ threadId: 'thread-1', createdAt: Equal(sinceCreatedAt), id: MoreThan('m1') },
],
}),
);
});
it('rejects non-thread scopes in v1', async () => {
await expect(memory.getMessagesForScope('resource', 'agent-1:user-1')).rejects.toThrow(
/not supported/,
);
expect(messageRepository.find).not.toHaveBeenCalled();
});
});
describe('saveThread — existing row', () => {
it('preserves the original resourceId instead of overwriting with the callers', async () => {
// Shared threads (e.g. the test-chat thread keyed by agentId) are
@ -217,6 +290,40 @@ describe('N8nMemory', () => {
});
});
describe('deleteThread', () => {
it('deletes thread-scoped observation state and the thread row in one transaction', async () => {
await memory.deleteThread('thread-1');
const scope = { scopeKind: 'thread' as const, scopeId: 'thread-1' };
expect(runInTransaction).toHaveBeenCalledWith(expect.any(Function));
expect(transactionDelete).toHaveBeenNthCalledWith(1, AgentObservationEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(2, AgentObservationCursorEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(3, AgentObservationLockEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(4, AgentThreadEntity, { id: 'thread-1' });
expect(observationRepository.delete).not.toHaveBeenCalled();
expect(observationCursorRepository.delete).not.toHaveBeenCalled();
expect(observationLockRepository.delete).not.toHaveBeenCalled();
expect(threadRepository.delete).not.toHaveBeenCalled();
});
it('deletes thread-scoped observation state by thread id prefix in one transaction', async () => {
await memory.deleteThreadsByPrefix('test-agent-1');
const scope = { scopeKind: 'thread' as const, scopeId: Like('test-agent-1%') };
expect(runInTransaction).toHaveBeenCalledWith(expect.any(Function));
expect(transactionDelete).toHaveBeenNthCalledWith(1, AgentObservationEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(2, AgentObservationCursorEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(3, AgentObservationLockEntity, scope);
expect(transactionDelete).toHaveBeenNthCalledWith(4, AgentThreadEntity, {
id: Like('test-agent-1%'),
});
expect(observationRepository.delete).not.toHaveBeenCalled();
expect(observationCursorRepository.delete).not.toHaveBeenCalled();
expect(observationLockRepository.delete).not.toHaveBeenCalled();
expect(threadRepository.delete).not.toHaveBeenCalled();
});
});
describe('working memory — thread scope', () => {
it('stores thread-scoped working memory on thread metadata', async () => {
const existing = {
@ -305,4 +412,305 @@ describe('N8nMemory', () => {
).resolves.toBe('bob notes');
});
});
// ── Observational memory ─────────────────────────────────────────────
function makeNewObs(overrides: Partial<NewObservation> = {}): NewObservation {
return {
scopeKind: 'resource',
scopeId: 't-1',
kind: 'observation',
payload: { text: 'hello' },
durationMs: null,
schemaVersion: OBSERVATION_SCHEMA_VERSION,
createdAt: new Date('2026-05-05T00:00:00Z'),
...overrides,
};
}
describe('appendObservations', () => {
beforeEach(() => {
observationRepository.create.mockImplementation(
(input) => ({ ...input }) as AgentObservationEntity,
);
});
it('returns [] for an empty input without touching the repo', async () => {
const result = await memory.appendObservations([]);
expect(result).toEqual([]);
expect(observationRepository.findOne).not.toHaveBeenCalled();
expect(observationRepository.save).not.toHaveBeenCalled();
});
it('persists rows without allocating a sequence number', async () => {
(observationRepository.save as unknown as jest.Mock).mockImplementation(
async (input: AgentObservationEntity | AgentObservationEntity[]) =>
(Array.isArray(input) ? input : [input]).map((e, i) => ({
...e,
id: `obs-${i + 1}`,
})),
);
const result = await memory.appendObservations([makeNewObs(), makeNewObs()]);
expect(observationRepository.findOne).not.toHaveBeenCalled();
expect(result.map((r) => r.id)).toEqual(['obs-1', 'obs-2']);
});
});
describe('getObservations', () => {
beforeEach(() => {
observationRepository.find.mockResolvedValue([]);
});
it('passes filters through to find()', async () => {
const sinceCreatedAt = new Date('2026-05-05T00:00:00Z');
await memory.getObservations({
scopeKind: 'resource',
scopeId: 't-1',
since: { sinceCreatedAt, sinceObservationId: 'obs-anchor' },
kindIs: 'summary',
schemaVersionAtMost: 1,
limit: 10,
});
expect(observationRepository.find).toHaveBeenCalledWith({
where: [
{
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
schemaVersion: LessThanOrEqual(1),
createdAt: MoreThan(sinceCreatedAt),
},
{
scopeKind: 'resource',
scopeId: 't-1',
kind: 'summary',
schemaVersion: LessThanOrEqual(1),
createdAt: Equal(sinceCreatedAt),
id: MoreThan('obs-anchor'),
},
],
order: { createdAt: 'ASC', id: 'ASC' },
take: 10,
});
});
it('omits absent filters', async () => {
await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' });
expect(observationRepository.find).toHaveBeenCalledWith({
where: [{ scopeKind: 'resource', scopeId: 't-1' }],
order: { createdAt: 'ASC', id: 'ASC' },
});
});
it('coerces bigint columns back to numbers on read', async () => {
observationRepository.find.mockResolvedValue([
{
id: 'obs-1',
scopeKind: 'resource',
scopeId: 't-1',
kind: 'observation',
payload: { text: 'hi' },
durationMs: '1000' as unknown as number | null,
schemaVersion: '1' as unknown as number,
createdAt: new Date('2026-05-05T00:00:00Z'),
updatedAt: new Date('2026-05-05T00:00:00Z'),
} as AgentObservationEntity,
]);
const [row] = await memory.getObservations({ scopeKind: 'resource', scopeId: 't-1' });
expect(row.durationMs).toBe(1000);
expect(row.schemaVersion).toBe(1);
});
});
describe('deleteObservations', () => {
it('issues a single delete with the given ids', async () => {
await memory.deleteObservations(['a', 'b']);
expect(observationRepository.delete).toHaveBeenCalledWith({ id: In(['a', 'b']) });
});
it('no-ops on empty input', async () => {
await memory.deleteObservations([]);
expect(observationRepository.delete).not.toHaveBeenCalled();
});
});
describe('cursors', () => {
it('returns null when no cursor row exists', async () => {
observationCursorRepository.findOneBy.mockResolvedValue(null);
expect(await memory.getCursor('thread', 't-1')).toBeNull();
});
it('reads lastObservedAt and lastObservedMessageId', async () => {
const lastObservedAt = new Date('2026-05-05T00:00:00.250Z');
observationCursorRepository.findOneBy.mockResolvedValue({
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-7',
lastObservedAt,
createdAt: new Date(),
updatedAt: new Date('2026-05-05T00:00:00Z'),
} as AgentObservationCursorEntity);
const cursor = await memory.getCursor('thread', 't-1');
expect(cursor?.lastObservedAt.getTime()).toBe(lastObservedAt.getTime());
expect(cursor?.lastObservedMessageId).toBe('m-7');
});
it('upserts on setCursor with cursor-advance fields keyed by (scopeKind, scopeId)', async () => {
const lastObservedAt = new Date('2026-05-05T00:00:00.500Z');
await memory.setCursor({
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-9',
lastObservedAt,
updatedAt: new Date('2026-05-05T00:00:00Z'),
});
expect(observationCursorRepository.upsert).toHaveBeenCalledWith(
expect.objectContaining({
scopeKind: 'thread',
scopeId: 't-1',
lastObservedMessageId: 'm-9',
lastObservedAt,
}),
expect.objectContaining({ conflictPaths: ['scopeKind', 'scopeId'] }),
);
const call = observationCursorRepository.upsert.mock.calls[0][0] as Record<string, unknown>;
expect(call).not.toHaveProperty('summary');
expect(call).not.toHaveProperty('summaryUpdatedAt');
});
});
describe('locks', () => {
const mockLockWrite = ({
updateAffected,
claimed,
}: {
updateAffected: number;
claimed?: AgentObservationLockEntity | null;
}) => {
const updateQueryBuilder = {
update: jest.fn().mockReturnThis(),
set: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
andWhere: jest.fn().mockReturnThis(),
setParameters: jest.fn().mockReturnThis(),
execute: jest.fn().mockResolvedValue({ affected: updateAffected }),
};
const insertQueryBuilder = {
insert: jest.fn().mockReturnThis(),
into: jest.fn().mockReturnThis(),
values: jest.fn().mockReturnThis(),
orIgnore: jest.fn().mockReturnThis(),
execute: jest.fn().mockResolvedValue({ raw: {}, generatedMaps: [], identifiers: [] }),
};
observationLockRepository.createQueryBuilder
.mockReturnValueOnce(updateQueryBuilder as never)
.mockReturnValueOnce(insertQueryBuilder as never);
observationLockRepository.findOneBy.mockResolvedValue(claimed ?? null);
return { updateQueryBuilder, insertQueryBuilder };
};
beforeEach(() => {
observationLockRepository.create.mockImplementation(
(input) => ({ ...input }) as AgentObservationLockEntity,
);
observationLockRepository.save.mockImplementation(
async (input) => input as AgentObservationLockEntity,
);
});
it('grants the lock when the row is missing', async () => {
const { insertQueryBuilder } = mockLockWrite({
updateAffected: 0,
claimed: {
scopeKind: 'thread',
scopeId: 't-1',
holderId: 'A',
heldUntil: new Date(Date.now() + 60_000),
} as AgentObservationLockEntity,
});
const handle = await memory.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
expect(handle).not.toBeNull();
expect(handle?.holderId).toBe('A');
expect(insertQueryBuilder.orIgnore).toHaveBeenCalled();
expect(observationLockRepository.save).not.toHaveBeenCalled();
});
it('attempts a conditional write before reading the lock row', async () => {
mockLockWrite({ updateAffected: 1 });
const handle = await memory.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
expect(handle).not.toBeNull();
expect(observationLockRepository.findOneBy).not.toHaveBeenCalled();
});
it('refuses a different holder while the lock is live', async () => {
mockLockWrite({ updateAffected: 0 });
const handle = await memory.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(handle).toBeNull();
expect(observationLockRepository.save).not.toHaveBeenCalled();
});
it('reclaims the lock for a new holder once the prior one has expired', async () => {
const { updateQueryBuilder } = mockLockWrite({ updateAffected: 1 });
const handle = await memory.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'B',
});
expect(handle).not.toBeNull();
expect(handle?.holderId).toBe('B');
expect(updateQueryBuilder.andWhere).toHaveBeenCalledWith(
'("holderId" = :holderId OR "heldUntil" <= :now)',
);
expect(observationLockRepository.save).not.toHaveBeenCalled();
});
it('lets the same holder refresh the TTL while still held', async () => {
mockLockWrite({ updateAffected: 1 });
const handle = await memory.acquireObservationLock('thread', 't-1', {
ttlMs: 60_000,
holderId: 'A',
});
expect(handle).not.toBeNull();
expect(observationLockRepository.save).not.toHaveBeenCalled();
});
it('release deletes only the matching holder', async () => {
await memory.releaseObservationLock({
scopeKind: 'resource',
scopeId: 't-1',
holderId: 'A',
heldUntil: new Date(),
});
expect(observationLockRepository.delete).toHaveBeenCalledWith({
scopeKind: 'resource',
scopeId: 't-1',
holderId: 'A',
});
});
});
});

View File

@ -2,17 +2,30 @@ import type {
AgentDbMessage,
AgentMessage,
BuiltMemory,
BuiltObservationStore,
MemoryDescriptor,
NewObservation,
Observation,
ObservationCursor,
ObservationLockHandle,
ScopeKind,
Thread,
} from '@n8n/agents';
import { Service } from '@n8n/di';
import type { FindOptionsWhere } from '@n8n/typeorm';
import { LessThan, Like } from '@n8n/typeorm';
import { Equal, In, LessThan, LessThanOrEqual, Like, MoreThan } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { UnexpectedError } from 'n8n-workflow';
import type { AgentMessageEntity } from '../entities/agent-message.entity';
import { AgentObservationCursorEntity } from '../entities/agent-observation-cursor.entity';
import { AgentObservationLockEntity } from '../entities/agent-observation-lock.entity';
import { AgentObservationEntity } from '../entities/agent-observation.entity';
import { AgentThreadEntity } from '../entities/agent-thread.entity';
import { AgentMessageRepository } from '../repositories/agent-message.repository';
import { AgentObservationCursorRepository } from '../repositories/agent-observation-cursor.repository';
import { AgentObservationLockRepository } from '../repositories/agent-observation-lock.repository';
import { AgentObservationRepository } from '../repositories/agent-observation.repository';
import { AgentResourceRepository } from '../repositories/agent-resource.repository';
import { AgentThreadRepository } from '../repositories/agent-thread.repository';
@ -20,11 +33,14 @@ import { AgentThreadRepository } from '../repositories/agent-thread.repository';
const WORKING_MEMORY_KEY = 'workingMemory';
@Service()
export class N8nMemory implements BuiltMemory {
export class N8nMemory implements BuiltMemory, BuiltObservationStore {
constructor(
private readonly threadRepository: AgentThreadRepository,
private readonly messageRepository: AgentMessageRepository,
private readonly resourceRepository: AgentResourceRepository,
private readonly observationRepository: AgentObservationRepository,
private readonly observationCursorRepository: AgentObservationCursorRepository,
private readonly observationLockRepository: AgentObservationLockRepository,
) {}
// ── Thread management ────────────────────────────────────────────────
@ -73,11 +89,24 @@ export class N8nMemory implements BuiltMemory {
}
async deleteThread(threadId: string): Promise<void> {
await this.threadRepository.delete({ id: threadId });
await this.threadRepository.manager.transaction(async (trx) => {
const scope = { scopeKind: 'thread' as const, scopeId: threadId };
await trx.delete(AgentObservationEntity, scope);
await trx.delete(AgentObservationCursorEntity, scope);
await trx.delete(AgentObservationLockEntity, scope);
await trx.delete(AgentThreadEntity, { id: threadId });
});
}
async deleteThreadsByPrefix(threadIdPrefix: string): Promise<void> {
await this.threadRepository.delete({ id: Like(`${threadIdPrefix}%`) });
const scopeId = Like(`${threadIdPrefix}%`);
await this.threadRepository.manager.transaction(async (trx) => {
const scope = { scopeKind: 'thread' as const, scopeId };
await trx.delete(AgentObservationEntity, scope);
await trx.delete(AgentObservationCursorEntity, scope);
await trx.delete(AgentObservationLockEntity, scope);
await trx.delete(AgentThreadEntity, { id: scopeId });
});
}
// ── Message persistence ──────────────────────────────────────────────
@ -184,6 +213,177 @@ export class N8nMemory implements BuiltMemory {
}
}
// ── Observational memory: data ───────────────────────────────────────
async appendObservations(rows: NewObservation[]): Promise<Observation[]> {
if (rows.length === 0) return [];
const entities: AgentObservationEntity[] = rows.map((row) =>
this.observationRepository.create({
scopeKind: row.scopeKind,
scopeId: row.scopeId,
kind: row.kind,
payload: row.payload,
durationMs: row.durationMs,
schemaVersion: row.schemaVersion,
createdAt: row.createdAt,
}),
);
const saved = await this.observationRepository.save(entities);
return saved.map((e) => this.toObservation(e));
}
async getObservations(opts: {
scopeKind: ScopeKind;
scopeId: string;
since?: { sinceCreatedAt: Date; sinceObservationId: string };
kindIs?: string;
limit?: number;
schemaVersionAtMost?: number;
}): Promise<Observation[]> {
const baseWhere: FindOptionsWhere<AgentObservationEntity> = {
scopeKind: opts.scopeKind,
scopeId: opts.scopeId,
...(opts.kindIs !== undefined && { kind: opts.kindIs }),
...(opts.schemaVersionAtMost !== undefined && {
schemaVersion: LessThanOrEqual(opts.schemaVersionAtMost),
}),
};
const where: FindOptionsWhere<AgentObservationEntity>[] = opts.since
? [
{ ...baseWhere, createdAt: MoreThan(opts.since.sinceCreatedAt) },
{
...baseWhere,
createdAt: Equal(opts.since.sinceCreatedAt),
id: MoreThan(opts.since.sinceObservationId),
},
]
: [baseWhere];
const entities = await this.observationRepository.find({
where,
order: { createdAt: 'ASC', id: 'ASC' },
...(opts.limit !== undefined && { take: opts.limit }),
});
return entities.map((e) => this.toObservation(e));
}
async getMessagesForScope(
scopeKind: ScopeKind,
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]> {
if (scopeKind !== 'thread') {
throw new UnexpectedError(
`getMessagesForScope: scopeKind='${scopeKind}' is not supported in observational memory v1`,
);
}
const baseWhere: FindOptionsWhere<AgentMessageEntity> = { threadId: scopeId };
const where: FindOptionsWhere<AgentMessageEntity>[] = opts?.since
? [
{ ...baseWhere, createdAt: MoreThan(opts.since.sinceCreatedAt) },
{
...baseWhere,
createdAt: Equal(opts.since.sinceCreatedAt),
id: MoreThan(opts.since.sinceMessageId),
},
]
: [baseWhere];
const entities = await this.messageRepository.find({
where,
order: { createdAt: 'ASC', id: 'ASC' },
});
return entities.map((e) => {
const msg = e.content as AgentMessage & { id?: string };
msg.id = e.id;
return msg as AgentDbMessage;
});
}
async deleteObservations(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await this.observationRepository.delete({ id: In(ids) });
}
// ── Observational memory: cursors ────────────────────────────────────
async getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null> {
const entity = await this.observationCursorRepository.findOneBy({ scopeKind, scopeId });
if (!entity) return null;
return {
scopeKind: entity.scopeKind,
scopeId: entity.scopeId,
lastObservedMessageId: entity.lastObservedMessageId,
lastObservedAt: entity.lastObservedAt,
updatedAt: entity.updatedAt,
};
}
async setCursor(cursor: ObservationCursor): Promise<void> {
await this.observationCursorRepository.upsert(
{
scopeKind: cursor.scopeKind,
scopeId: cursor.scopeId,
lastObservedMessageId: cursor.lastObservedMessageId,
lastObservedAt: cursor.lastObservedAt,
updatedAt: cursor.updatedAt,
},
{ conflictPaths: ['scopeKind', 'scopeId'], skipUpdateIfNoValuesChanged: false },
);
}
// ── Observational memory: locks ──────────────────────────────────────
async acquireObservationLock(
scopeKind: ScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null> {
const now = new Date();
const heldUntil = new Date(now.getTime() + opts.ttlMs);
const updateResult = await this.observationLockRepository
.createQueryBuilder()
.update(AgentObservationLockEntity)
.set({ holderId: opts.holderId, heldUntil })
.where('"scopeKind" = :scopeKind')
.andWhere('"scopeId" = :scopeId')
.andWhere('("holderId" = :holderId OR "heldUntil" <= :now)')
.setParameters({ scopeKind, scopeId, holderId: opts.holderId, now })
.execute();
if ((updateResult.affected ?? 0) > 0) {
return { scopeKind, scopeId, holderId: opts.holderId, heldUntil };
}
await this.observationLockRepository
.createQueryBuilder()
.insert()
.into(AgentObservationLockEntity)
.values({ scopeKind, scopeId, holderId: opts.holderId, heldUntil })
.orIgnore()
.execute();
const claimed = await this.observationLockRepository.findOneBy({
scopeKind,
scopeId,
holderId: opts.holderId,
});
if (!claimed) return null;
return { scopeKind, scopeId, holderId: opts.holderId, heldUntil };
}
async releaseObservationLock(handle: ObservationLockHandle): Promise<void> {
await this.observationLockRepository.delete({
scopeKind: handle.scopeKind,
scopeId: handle.scopeId,
holderId: handle.holderId,
});
}
// ── Descriptor ───────────────────────────────────────────────────────
describe(): MemoryDescriptor {
@ -192,6 +392,19 @@ export class N8nMemory implements BuiltMemory {
// ── Helpers ──────────────────────────────────────────────────────────
private toObservation(entity: AgentObservationEntity): Observation {
return {
id: entity.id,
scopeKind: entity.scopeKind,
scopeId: entity.scopeId,
kind: entity.kind,
payload: entity.payload as Observation['payload'],
durationMs: entity.durationMs === null ? null : Number(entity.durationMs),
schemaVersion: Number(entity.schemaVersion),
createdAt: entity.createdAt,
};
}
private toThread(entity: AgentThreadEntity): Thread {
let metadata: Record<string, unknown> | undefined;
if (entity.metadata) {

View File

@ -3,6 +3,8 @@ import { DataSource, LessThan, Repository } from '@n8n/typeorm';
import { AgentExecutionThread } from '../entities/agent-execution-thread.entity';
const SESSION_NUMBER_RETRY_ATTEMPTS = 3;
export interface AgentExecutionThreadPage {
threads: AgentExecutionThread[];
nextCursor: string | null;
@ -24,21 +26,53 @@ export class AgentExecutionThreadRepository extends Repository<AgentExecutionThr
agentName: string,
projectId: string,
): Promise<{ thread: AgentExecutionThread; created: boolean }> {
const existing = await this.findOneBy({ id: threadId });
if (existing) {
return { thread: existing, created: false };
for (let attempt = 0; ; attempt++) {
try {
return await this.findOrCreateInSerializableTransaction(
threadId,
agentId,
agentName,
projectId,
);
} catch (error) {
if (attempt >= SESSION_NUMBER_RETRY_ATTEMPTS - 1 || !isRetriableWriteError(error)) {
throw error;
}
}
}
}
const maxResult = await this.createQueryBuilder('t')
.select('MAX(t.sessionNumber)', 'max')
.where('t.projectId = :projectId', { projectId })
.getRawOne<{ max: number | null }>();
private async findOrCreateInSerializableTransaction(
threadId: string,
agentId: string,
agentName: string,
projectId: string,
): Promise<{ thread: AgentExecutionThread; created: boolean }> {
return await this.manager.transaction('SERIALIZABLE', async (entityManager) => {
const repository = entityManager.getRepository(AgentExecutionThread);
const existing = await repository.findOneBy({ id: threadId });
if (existing) {
return { thread: existing, created: false };
}
const sessionNumber = (maxResult?.max ?? 0) + 1;
const maxResult = await repository
.createQueryBuilder('t')
.select('MAX(t.sessionNumber)', 'max')
.where('t.projectId = :projectId', { projectId })
.getRawOne<{ max: number | null }>();
const thread = this.create({ id: threadId, agentId, agentName, projectId, sessionNumber });
const saved = await this.save(thread);
return { thread: saved, created: true };
const sessionNumber = (maxResult?.max ?? 0) + 1;
const thread = repository.create({
id: threadId,
agentId,
agentName,
projectId,
sessionNumber,
});
const saved = await repository.save(thread);
return { thread: saved, created: true };
});
}
/**
@ -113,3 +147,17 @@ export class AgentExecutionThreadRepository extends Repository<AgentExecutionThr
return (result.affected ?? 0) > 0;
}
}
function isRetriableWriteError(error: unknown): boolean {
if (!(error instanceof Error) || !('driverError' in error)) return false;
const { driverError } = error;
if (typeof driverError !== 'object' || driverError === null || !('code' in driverError)) {
return false;
}
const { code } = driverError;
return (
typeof code === 'string' &&
(code === '40001' || code === '40P01' || code === 'SQLITE_BUSY' || code === 'SQLITE_LOCKED')
);
}

View File

@ -0,0 +1,11 @@
import { Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { AgentObservationCursorEntity } from '../entities/agent-observation-cursor.entity';
@Service()
export class AgentObservationCursorRepository extends Repository<AgentObservationCursorEntity> {
constructor(dataSource: DataSource) {
super(AgentObservationCursorEntity, dataSource.manager);
}
}

View File

@ -0,0 +1,11 @@
import { Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { AgentObservationLockEntity } from '../entities/agent-observation-lock.entity';
@Service()
export class AgentObservationLockRepository extends Repository<AgentObservationLockEntity> {
constructor(dataSource: DataSource) {
super(AgentObservationLockEntity, dataSource.manager);
}
}

View File

@ -0,0 +1,11 @@
import { Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { AgentObservationEntity } from '../entities/agent-observation.entity';
@Service()
export class AgentObservationRepository extends Repository<AgentObservationEntity> {
constructor(dataSource: DataSource) {
super(AgentObservationEntity, dataSource.manager);
}
}