feat(core): Add observational memory runtime, builder, and read path (#29815)

This commit is contained in:
bjorger 2026-05-12 11:55:52 +02:00 committed by GitHub
parent d06bbe4f32
commit 744bb92c2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 2235 additions and 601 deletions

View File

@ -167,6 +167,7 @@ async function runObservationCycleForTest({
resourceId,
now,
trigger: { type: 'per-turn' },
gap: null,
telemetry: undefined,
});
const persistedRows = await store.appendObservations(observedRows);

View File

@ -1,207 +0,0 @@
import { z } from 'zod';
import {
buildWorkingMemoryInstruction,
buildWorkingMemoryTool,
templateFromSchema,
UPDATE_WORKING_MEMORY_TOOL_NAME,
WORKING_MEMORY_DEFAULT_INSTRUCTION,
} from '../runtime/working-memory';
// buildWorkingMemoryInstruction renders the system-prompt section that teaches the
// model when and how to call the working-memory update tool. These tests pin the
// tool name, the "only when changed" guidance, template inclusion, the structured
// (JSON) format hint, and the custom-instruction override path.
describe('buildWorkingMemoryInstruction', () => {
  const freeformTemplate = '# Context\n- Name:';

  it('mentions the updateWorkingMemory tool name', () => {
    const instruction = buildWorkingMemoryInstruction(freeformTemplate, false);
    expect(instruction).toContain(UPDATE_WORKING_MEMORY_TOOL_NAME);
  });

  it('instructs the model to call the tool only when something changed', () => {
    const instruction = buildWorkingMemoryInstruction(freeformTemplate, false);
    expect(instruction).toContain('Only call it when something has actually changed');
  });

  it('includes the template in the instruction', () => {
    const tpl = '# Context\n- Name:\n- City:';
    expect(buildWorkingMemoryInstruction(tpl, false)).toContain(tpl);
  });

  it('mentions JSON for structured variant', () => {
    expect(buildWorkingMemoryInstruction('{"name": ""}', true)).toContain('JSON');
  });

  describe('custom instruction', () => {
    const customText = 'Custom text.';

    it('replaces the default instruction body when provided', () => {
      const custom = 'Always update working memory after every message.';
      const instruction = buildWorkingMemoryInstruction('# Template', false, custom);
      expect(instruction).toContain(custom);
      // The default body must be fully replaced, not appended to.
      expect(instruction).not.toContain(WORKING_MEMORY_DEFAULT_INSTRUCTION);
    });

    it('still includes the ## Working Memory heading', () => {
      expect(buildWorkingMemoryInstruction('# Template', false, customText)).toContain(
        '## Working Memory',
      );
    });

    it('still includes the template block', () => {
      const tpl = '# Context\n- Name:\n- City:';
      expect(buildWorkingMemoryInstruction(tpl, false, customText)).toContain(tpl);
    });

    it('still includes the format hint for structured memory', () => {
      expect(buildWorkingMemoryInstruction('{}', true, customText)).toContain('JSON');
    });

    it('still includes the format hint for freeform memory', () => {
      expect(buildWorkingMemoryInstruction('# Template', false, customText)).toContain(
        'Update the template with any new information learned',
      );
    });

    it('uses the default instruction when undefined is passed explicitly', () => {
      const explicitUndefined = buildWorkingMemoryInstruction('# Template', false, undefined);
      const omitted = buildWorkingMemoryInstruction('# Template', false);
      expect(explicitUndefined).toBe(omitted);
    });

    it('WORKING_MEMORY_DEFAULT_INSTRUCTION appears in the output when no custom instruction is set', () => {
      expect(buildWorkingMemoryInstruction('# Template', false)).toContain(
        WORKING_MEMORY_DEFAULT_INSTRUCTION,
      );
    });
  });
});
// templateFromSchema derives a JSON working-memory template from a Zod object
// shape; the output must carry the schema's keys and parse as JSON.
describe('templateFromSchema', () => {
  it('converts Zod schema to JSON template', () => {
    const schema = z.object({
      userName: z.string().optional().describe("The user's name"),
      favoriteColor: z.string().optional().describe('Favorite color'),
    });
    const template = templateFromSchema(schema);
    expect(template).toContain('userName');
    expect(template).toContain('favoriteColor');
    // The template must be valid JSON exposing the schema keys as properties.
    let decoded: unknown;
    try {
      decoded = JSON.parse(template);
    } catch {
      decoded = undefined;
    }
    expect(decoded).toHaveProperty('userName');
  });
});
// Freeform working-memory tool: input is a single `memory` string and the
// handler forwards it verbatim to the persist callback.
describe('buildWorkingMemoryTool — freeform', () => {
  // Every test builds the same freeform tool; only persist varies.
  function makeTool(persist: (content: string) => Promise<void>) {
    return buildWorkingMemoryTool({ structured: false, persist });
  }
  const noopPersist = async () => {};

  it('returns a BuiltTool with the correct name', () => {
    expect(makeTool(noopPersist).name).toBe(UPDATE_WORKING_MEMORY_TOOL_NAME);
  });

  it('has a description', () => {
    expect(makeTool(noopPersist).description).toBeTruthy();
  });

  it('has a freeform input schema with a memory field', () => {
    const tool = makeTool(noopPersist);
    expect(tool.inputSchema).toBeDefined();
    const inputSchema = tool.inputSchema as z.ZodObject<z.ZodRawShape>;
    expect(inputSchema.safeParse({ memory: 'hello' }).success).toBe(true);
  });

  it('rejects input without memory field', () => {
    const inputSchema = makeTool(noopPersist).inputSchema as z.ZodObject<z.ZodRawShape>;
    expect(inputSchema.safeParse({ other: 'value' }).success).toBe(false);
  });

  it('handler calls persist with the memory string', async () => {
    const saved: string[] = [];
    // eslint-disable-next-line @typescript-eslint/require-await
    const tool = makeTool(async (content) => {
      saved.push(content);
    });
    const outcome = await tool.handler!({ memory: 'test content' }, {} as never);
    expect(saved).toEqual(['test content']);
    expect(outcome).toMatchObject({ success: true });
  });
});
// Structured working-memory tool: the caller's Zod schema becomes the tool's
// input schema and the handler persists the input serialized as JSON. Also
// covers the freeform fallback when structured:true is set without a schema.
describe('buildWorkingMemoryTool — structured', () => {
  const schema = z.object({
    userName: z.string().optional().describe("The user's name"),
    location: z.string().optional().describe('Where the user lives'),
  });

  it('uses the Zod schema as input schema', () => {
    const tool = buildWorkingMemoryTool({ structured: true, schema, persist: async () => {} });
    const inputSchema = tool.inputSchema as typeof schema;
    expect(inputSchema.safeParse({ userName: 'Alice', location: 'Berlin' }).success).toBe(true);
  });

  it('handler serializes input to JSON and calls persist', async () => {
    const saved: string[] = [];
    const tool = buildWorkingMemoryTool({
      structured: true,
      schema,
      // eslint-disable-next-line @typescript-eslint/require-await
      persist: async (content) => {
        saved.push(content);
      },
    });
    const payload = { userName: 'Alice', location: 'Berlin' };
    await tool.handler!(payload, {} as never);
    expect(saved).toHaveLength(1);
    // The persisted string must round-trip back to the original input.
    let decoded: unknown;
    try {
      decoded = JSON.parse(saved[0]) as unknown;
    } catch {
      decoded = undefined;
    }
    expect(decoded).toMatchObject(payload);
  });

  it('handler returns success confirmation', async () => {
    const tool = buildWorkingMemoryTool({ structured: true, schema, persist: async () => {} });
    expect(await tool.handler!({ userName: 'Alice' }, {} as never)).toMatchObject({
      success: true,
    });
  });

  it('falls back to freeform when no schema provided despite structured:true', () => {
    const tool = buildWorkingMemoryTool({ structured: true, persist: async () => {} });
    const inputSchema = tool.inputSchema as z.ZodObject<z.ZodRawShape>;
    expect(inputSchema.safeParse({ memory: 'fallback text' }).success).toBe(true);
  });
});

View File

@ -45,16 +45,23 @@ export type {
CompactFn,
NewObservation,
Observation,
ObservationCategory,
ObservationCursor,
ObservationGapContext,
ObservationLockHandle,
ObservationalMemoryConfig,
ObservationalMemoryTrigger,
ObserveFn,
ScopeKind,
} from './types';
export type { ProviderOptions } from '@ai-sdk/provider-utils';
export { AgentEvent } from './types';
export type { AgentEventData, AgentEventHandler } from './types';
export { OBSERVATION_SCHEMA_VERSION } from './types';
export {
DEFAULT_OBSERVATION_GAP_THRESHOLD_MS,
OBSERVATION_CATEGORIES,
OBSERVATION_SCHEMA_VERSION,
} from './types';
export { Tool, wrapToolForApproval } from './sdk/tool';
export { Memory } from './sdk/memory';
@ -109,10 +116,11 @@ export type {
ModelLimits,
} from './sdk/catalog';
export { SqliteMemory, SqliteMemoryConfigSchema } from './storage/sqlite-memory';
export { WORKING_MEMORY_DEFAULT_INSTRUCTION } from './runtime/working-memory';
export {
UPDATE_WORKING_MEMORY_TOOL_NAME,
WORKING_MEMORY_DEFAULT_INSTRUCTION,
} from './runtime/working-memory';
DEFAULT_COMPACTOR_PROMPT,
DEFAULT_OBSERVER_PROMPT,
} from './runtime/observational-cycle';
export type { SqliteMemoryConfig } from './storage/sqlite-memory';
export { PostgresMemory } from './storage/postgres-memory';
export type {

View File

@ -1,15 +1,16 @@
import { z } from 'zod';
import { AgentRuntime } from '../runtime/agent-runtime';
import { AgentEventBus } from '../runtime/event-bus';
import { isLlmMessage } from '../sdk/message';
import { Tool, Tool as ToolBuilder } from '../sdk/tool';
import { AgentEvent } from '../types/runtime/event';
import type { StreamChunk } from '../types/sdk/agent';
import type { BuiltMemory } from '../types/sdk/memory';
import type { ContentToolCall, Message } from '../types/sdk/message';
import type { BuiltTool, InterruptibleToolContext } from '../types/sdk/tool';
import type { BuiltTelemetry } from '../types/telemetry';
import { isLlmMessage } from '../../sdk/message';
import { Tool, Tool as ToolBuilder } from '../../sdk/tool';
import { AgentEvent } from '../../types/runtime/event';
import type { StreamChunk } from '../../types/sdk/agent';
import type { BuiltMemory } from '../../types/sdk/memory';
import type { ContentToolCall, Message } from '../../types/sdk/message';
import type { BuiltTool, InterruptibleToolContext } from '../../types/sdk/tool';
import type { BuiltTelemetry } from '../../types/telemetry';
import { AgentRuntime } from '../agent-runtime';
import { AgentEventBus } from '../event-bus';
import { InMemoryMemory } from '../memory-store';
// ---------------------------------------------------------------------------
// Module mocks
@ -502,9 +503,8 @@ describe('AgentRuntime.stream() — working memory', () => {
};
}
it('persists working memory and streams the tool chunks unfiltered', async () => {
it('does not expose a working-memory write tool to the main agent', async () => {
const savedWorkingMemory: string[] = [];
const memoryContent = '# Thread memory\n- User facts: Alice likes concise answers';
const memory = makeMemory(savedWorkingMemory);
const runtime = new AgentRuntime({
name: 'test',
@ -519,65 +519,17 @@ describe('AgentRuntime.stream() — working memory', () => {
},
});
streamText
.mockReturnValueOnce({
fullStream: makeChunkStream([
{ type: 'tool-input-start', id: 'wm-1', toolName: 'update_working_memory' },
{ type: 'tool-input-delta', id: 'wm-1', delta: memoryContent },
{
type: 'tool-call',
toolCallId: 'wm-1',
toolName: 'update_working_memory',
input: { memory: memoryContent },
},
]),
finishReason: Promise.resolve('tool-calls'),
usage: Promise.resolve({ inputTokens: 10, outputTokens: 5, totalTokens: 15 }),
response: Promise.resolve({
messages: [
{
role: 'assistant',
content: [
{
type: 'tool-call',
toolCallId: 'wm-1',
toolName: 'update_working_memory',
args: { memory: memoryContent },
},
],
},
],
}),
toolCalls: Promise.resolve([
{
toolCallId: 'wm-1',
toolName: 'update_working_memory',
input: { memory: memoryContent },
},
]),
})
.mockReturnValueOnce(makeStreamSuccess('Done'));
streamText.mockReturnValueOnce(makeStreamSuccess('Done'));
const { stream } = await runtime.stream('remember this', {
persistence: { threadId: 'thread-1', resourceId: 'user-1' },
});
const chunks = await collectChunks(stream);
await collectChunks(stream);
expect(savedWorkingMemory).toEqual([memoryContent]);
expect(chunks).toContainEqual(
expect.objectContaining({
type: 'tool-call',
toolCallId: 'wm-1',
toolName: 'update_working_memory',
}),
);
expect(chunks).toContainEqual(
expect.objectContaining({
type: 'tool-result',
toolCallId: 'wm-1',
toolName: 'update_working_memory',
}),
);
const calls = streamText.mock.calls as Array<[Record<string, unknown>]>;
const callArgs = calls[0]?.[0] ?? {};
expect(callArgs.tools ?? {}).not.toHaveProperty('update_working_memory');
expect(savedWorkingMemory).toEqual([]);
});
});
@ -1519,7 +1471,7 @@ describe('providerOptions — tool adapter', () => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const ai = require('ai') as { tool: jest.Mock };
// eslint-disable-next-line @typescript-eslint/no-require-imports
const adapter = require('../runtime/tool-adapter') as {
const adapter = require('../tool-adapter') as {
toAiSdkTools: (tools: BuiltTool[]) => Record<string, unknown>;
};
@ -1547,7 +1499,7 @@ describe('providerOptions — tool adapter', () => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const ai = require('ai') as { tool: jest.Mock };
// eslint-disable-next-line @typescript-eslint/no-require-imports
const adapter = require('../runtime/tool-adapter') as {
const adapter = require('../tool-adapter') as {
toAiSdkTools: (tools: BuiltTool[]) => Record<string, unknown>;
};
@ -1572,7 +1524,7 @@ describe('providerOptions — tool adapter', () => {
// eslint-disable-next-line @typescript-eslint/no-require-imports
const ai = require('ai') as { tool: jest.Mock };
// eslint-disable-next-line @typescript-eslint/no-require-imports
const adapter = require('../runtime/tool-adapter') as {
const adapter = require('../tool-adapter') as {
toAiSdkTools: (tools: BuiltTool[]) => Record<string, unknown>;
};
@ -2736,3 +2688,75 @@ describe('AgentRuntime — telemetry propagation', () => {
expect(callArgs.experimental_telemetry).toBeUndefined();
});
});
// ---------------------------------------------------------------------------
// Observational memory — post-turn writer
// ---------------------------------------------------------------------------
describe('AgentRuntime — observational memory writer', () => {
beforeEach(() => {
jest.clearAllMocks();
generateText.mockResolvedValue(makeGenerateSuccess());
});
it('runs the observer after saving the turn and compacts into thread working memory', async () => {
const store = new InMemoryMemory();
const observe = jest.fn().mockResolvedValue([
{
scopeKind: 'thread',
scopeId: 't-obs',
kind: 'observation',
payload: { text: 'User prefers concise answers.' },
durationMs: null,
schemaVersion: 1,
createdAt: new Date(),
},
]);
const compact = jest.fn().mockResolvedValue({
content: '# Thread memory\n- User preferences: concise answers',
});
const runtime = new AgentRuntime({
name: 'obs-writer',
model: 'openai/gpt-4o-mini',
instructions: 'base instructions',
memory: store,
workingMemory: {
template: '# Thread memory\n- User preferences:',
structured: false,
scope: 'thread',
},
observationalMemory: { observe, compact, compactionThreshold: 1, sync: true },
});
await runtime.generate('remember that I like concise answers', {
persistence: { threadId: 't-obs', resourceId: 'u-1' },
});
expect(observe).toHaveBeenCalledTimes(1);
expect(compact).toHaveBeenCalledTimes(1);
expect(
await store.getWorkingMemory({ threadId: 't-obs', resourceId: 'u-1', scope: 'thread' }),
).toBe('# Thread memory\n- User preferences: concise answers');
expect(await store.getObservations({ scopeKind: 'thread', scopeId: 't-obs' })).toEqual([]);
});
it('does not run when observational memory is not configured', async () => {
const store = new InMemoryMemory();
const runtime = new AgentRuntime({
name: 'obs-disabled',
model: 'openai/gpt-4o-mini',
instructions: 'base instructions',
memory: store,
workingMemory: {
template: '# Thread memory',
structured: false,
scope: 'thread',
},
});
await runtime.generate('hi', { persistence: { threadId: 't-none', resourceId: 'u-1' } });
expect(await store.getCursor('thread', 't-none')).toBeNull();
});
});

View File

@ -0,0 +1,71 @@
import { BackgroundTaskTracker } from '../background-task-tracker';
// BackgroundTaskTracker: registry for fire-and-forget promises with a flush()
// barrier. Covers single/multiple in-flight tasks, rejected tasks, the empty
// case, pendingCount cleanup, and repeated flush calls.
describe('BackgroundTaskTracker', () => {
  it('flushes a single in-flight promise', async () => {
    const tracker = new BackgroundTaskTracker();
    let release!: () => void;
    const pending = new Promise<void>((resolve) => {
      release = resolve;
    });
    tracker.track(pending);
    expect(tracker.pendingCount).toBe(1);
    // Start the flush first, then settle the task; flush must wait for it.
    const flushed = tracker.flush();
    release();
    await flushed;
    expect(tracker.pendingCount).toBe(0);
  });

  it('waits for all tracked promises in flush()', async () => {
    const tracker = new BackgroundTaskTracker();
    const completed: string[] = [];
    const delayed = (label: string, ms: number) =>
      new Promise<void>((resolve) =>
        setTimeout(() => {
          completed.push(label);
          resolve();
        }, ms),
      );
    tracker.track(delayed('a', 10));
    tracker.track(delayed('b', 5));
    await tracker.flush();
    expect(completed.sort()).toEqual(['a', 'b']);
  });

  it('flush() does not throw on rejected tracked promises', async () => {
    const tracker = new BackgroundTaskTracker();
    const failing = Promise.reject(new Error('boom'));
    // Suppress unhandled-rejection warning by attaching a no-op handler before track.
    failing.catch(() => {});
    tracker.track(failing);
    await expect(tracker.flush()).resolves.toBeUndefined();
  });

  it('flush() is a no-op when nothing is tracked', async () => {
    await expect(new BackgroundTaskTracker().flush()).resolves.toBeUndefined();
  });

  it('removes promises from pendingCount after they settle', async () => {
    const tracker = new BackgroundTaskTracker();
    const settled = Promise.resolve();
    tracker.track(settled);
    await settled;
    // One microtask is needed for the .then cleanup to run.
    await Promise.resolve();
    expect(tracker.pendingCount).toBe(0);
  });

  it('flush() called twice in a row both resolve', async () => {
    const tracker = new BackgroundTaskTracker();
    tracker.track(Promise.resolve());
    await tracker.flush();
    await expect(tracker.flush()).resolves.toBeUndefined();
  });
});

View File

@ -1,4 +1,4 @@
import { AgentEventBus } from '../runtime/event-bus';
import { AgentEventBus } from '../event-bus';
describe('AgentEventBus', () => {
describe('resetAbort', () => {

View File

@ -1,5 +1,5 @@
import { InMemoryMemory } from '../runtime/memory-store';
import type { AgentDbMessage, AgentMessage, Message } from '../types/sdk/message';
import type { AgentDbMessage, AgentMessage, Message } from '../../types/sdk/message';
import { InMemoryMemory } from '../memory-store';
function makeMsg(role: 'user' | 'assistant', text: string, createdAt = new Date()): AgentDbMessage {
return {

View File

@ -1,5 +1,5 @@
import { InMemoryMemory } from '../runtime/memory-store';
import type { AgentDbMessage, Message } from '../types/sdk/message';
import type { AgentDbMessage, Message } from '../../types/sdk/message';
import { InMemoryMemory } from '../memory-store';
describe('InMemoryMemory working memory', () => {
it('returns null for unknown key', async () => {

View File

@ -1,9 +1,9 @@
import { InMemoryMemory } from '../runtime/memory-store';
import {
OBSERVATION_SCHEMA_VERSION,
type NewObservation,
type ObservationCursor,
} from '../types/sdk/observation';
} from '../../types/sdk/observation';
import { InMemoryMemory } from '../memory-store';
function makeRow(overrides: Partial<NewObservation> = {}): NewObservation {
return {

View File

@ -1,6 +1,11 @@
import { AgentMessageList } from '../runtime/message-list';
import { isLlmMessage } from '../sdk/message';
import type { AgentDbMessage, AgentMessage, ContentToolCall, Message } from '../types/sdk/message';
import { isLlmMessage } from '../../sdk/message';
import type {
AgentDbMessage,
AgentMessage,
ContentToolCall,
Message,
} from '../../types/sdk/message';
import { AgentMessageList } from '../message-list';
function makeUserMsg(text: string): AgentMessage {
return { role: 'user', content: [{ type: 'text', text }] };

View File

@ -1,6 +1,6 @@
import type { LanguageModel } from 'ai';
import { createModel } from '../runtime/model-factory';
import { createModel } from '../model-factory';
type ProviderOpts = {
apiKey?: string;

View File

@ -0,0 +1,170 @@
import type { AgentDbMessage, AgentMessage, Message } from '../../types/sdk/message';
import { InMemoryMemory } from '../memory-store';
import { advanceCursor, getDeltaSinceCursor } from '../observation-cursor';
// Builds a persisted message row with a fresh random id; createdAt defaults to "now".
function makeMsg(role: 'user' | 'assistant', text: string, createdAt = new Date()): AgentDbMessage {
  return {
    role,
    id: crypto.randomUUID(),
    createdAt,
    content: [{ type: 'text', text }],
  };
}
// Extracts the text of a message's first content part (test fixtures always
// carry exactly one text part).
function textOf(msg: AgentMessage): string {
  const firstPart = (msg as Message).content[0] as { text: string };
  return firstPart.text;
}
// getDeltaSinceCursor: reads the not-yet-observed tail of a scope's history —
// the messages strictly after the cursor's (messageId, createdAt) keyset.
describe('getDeltaSinceCursor', () => {
  it('returns the full thread history when no cursor exists', async () => {
    const store = new InMemoryMemory();
    const t = Date.now();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one', new Date(t)), makeMsg('assistant', 'two', new Date(t + 1))],
    });
    const { messages, cursor } = await getDeltaSinceCursor(store, 'thread', 't-1');
    // No cursor row yet: the delta is the whole history and cursor comes back null.
    expect(cursor).toBeNull();
    expect(messages.map(textOf)).toEqual(['one', 'two']);
  });
  it('returns only messages strictly after the cursor keyset', async () => {
    const store = new InMemoryMemory();
    const t = Date.now();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one', new Date(t)), makeMsg('assistant', 'two', new Date(t + 1))],
    });
    const [first] = await store.getMessages('t-1');
    // Point the cursor at the first message, then append a third.
    await store.setCursor({
      scopeKind: 'thread',
      scopeId: 't-1',
      lastObservedMessageId: first.id,
      lastObservedAt: first.createdAt,
      updatedAt: new Date(),
    });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'three', new Date(t + 2))],
    });
    const { messages, cursor } = await getDeltaSinceCursor(store, 'thread', 't-1');
    expect(cursor?.lastObservedMessageId).toBe(first.id);
    // 'one' sits at the cursor and is excluded; everything after it is returned.
    expect(messages.map(textOf)).toEqual(['two', 'three']);
  });
  it('returns an empty delta when the cursor is at the latest message', async () => {
    const store = new InMemoryMemory();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one')],
    });
    const [only] = await store.getMessages('t-1');
    await store.setCursor({
      scopeKind: 'thread',
      scopeId: 't-1',
      lastObservedMessageId: only.id,
      lastObservedAt: only.createdAt,
      updatedAt: new Date(),
    });
    const { messages } = await getDeltaSinceCursor(store, 'thread', 't-1');
    expect(messages).toEqual([]);
  });
  it('isolates cursors by scope', async () => {
    const store = new InMemoryMemory();
    const t = Date.now();
    await store.saveThread({ id: 't-A', resourceId: 'u-1' });
    await store.saveThread({ id: 't-B', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-A',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'a-1', new Date(t)), makeMsg('user', 'a-2', new Date(t + 1))],
    });
    await store.saveMessages({
      threadId: 't-B',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'b-1', new Date(t + 2))],
    });
    const aMessages = await store.getMessages('t-A');
    // Only thread A gets a cursor; thread B must be unaffected.
    await store.setCursor({
      scopeKind: 'thread',
      scopeId: 't-A',
      lastObservedMessageId: aMessages[0].id,
      lastObservedAt: aMessages[0].createdAt,
      updatedAt: new Date(),
    });
    const aDelta = await getDeltaSinceCursor(store, 'thread', 't-A');
    expect(aDelta.messages.map(textOf)).toEqual(['a-2']);
    // Thread B has no cursor; should still return its full history.
    const bDelta = await getDeltaSinceCursor(store, 'thread', 't-B');
    expect(bDelta.cursor).toBeNull();
    expect(bDelta.messages.map(textOf)).toEqual(['b-1']);
  });
});
// advanceCursor: upserts the scope's observation cursor so subsequent delta
// reads start after the given message.
describe('advanceCursor', () => {
  it('writes a cursor row matching the message id and createdAt', async () => {
    const store = new InMemoryMemory();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one')],
    });
    const [only] = await store.getMessages('t-1');
    const written = await advanceCursor(store, 'thread', 't-1', only);
    // The returned row mirrors the observed message's keyset…
    expect(written.lastObservedMessageId).toBe(only.id);
    expect(written.lastObservedAt.getTime()).toBe(only.createdAt.getTime());
    // …and the same values are readable back from the store.
    const reread = await store.getCursor('thread', 't-1');
    expect(reread?.lastObservedMessageId).toBe(only.id);
    expect(reread?.lastObservedAt.getTime()).toBe(only.createdAt.getTime());
  });
  it('uses the provided `now` for updatedAt', async () => {
    const store = new InMemoryMemory();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one')],
    });
    const [only] = await store.getMessages('t-1');
    const now = new Date('2026-05-05T12:00:00Z');
    const cursor = await advanceCursor(store, 'thread', 't-1', only, now);
    // An injected clock must land verbatim in updatedAt.
    expect(cursor.updatedAt.getTime()).toBe(now.getTime());
  });
  it('overwrites a prior cursor (advance is upsert, not append)', async () => {
    const store = new InMemoryMemory();
    const t = Date.now();
    await store.saveThread({ id: 't-1', resourceId: 'u-1' });
    await store.saveMessages({
      threadId: 't-1',
      resourceId: 'u-1',
      messages: [makeMsg('user', 'one', new Date(t)), makeMsg('user', 'two', new Date(t + 1))],
    });
    const [first, second] = await store.getMessages('t-1');
    await advanceCursor(store, 'thread', 't-1', first);
    await advanceCursor(store, 'thread', 't-1', second);
    // Only the latest advance survives: one cursor row per scope.
    const reread = await store.getCursor('thread', 't-1');
    expect(reread?.lastObservedMessageId).toBe(second.id);
  });
});

View File

@ -0,0 +1,97 @@
import { InMemoryMemory } from '../memory-store';
import { withObservationLock } from '../observation-lock';
// withObservationLock: runs fn only when the scope's advisory lock can be
// acquired ({ status: 'ran', value }), reports { status: 'skipped' } when a
// concurrent holder has it, and always releases on exit.
describe('withObservationLock', () => {
  it('runs fn and returns its value when the lock is free', async () => {
    const store = new InMemoryMemory();
    const result = await withObservationLock(
      store,
      'thread',
      't-1',
      { ttlMs: 60_000 },
      async () => await Promise.resolve(42),
    );
    // Free lock: fn runs and its return value is surfaced on the result.
    expect(result).toEqual({ status: 'ran', value: 42 });
  });
  it('skips when another holder is currently holding the lock', async () => {
    const store = new InMemoryMemory();
    // Simulate a concurrent worker already holding the scope lock.
    await store.acquireObservationLock('thread', 't-1', { ttlMs: 60_000, holderId: 'external' });
    const fn = jest.fn().mockResolvedValue(undefined);
    const result = await withObservationLock(store, 'thread', 't-1', { ttlMs: 60_000 }, fn);
    expect(result).toEqual({ status: 'skipped' });
    expect(fn).not.toHaveBeenCalled();
  });
  it('releases the lock so a subsequent caller can acquire it', async () => {
    const store = new InMemoryMemory();
    await withObservationLock(
      store,
      'thread',
      't-1',
      { ttlMs: 60_000 },
      async () => await Promise.resolve(),
    );
    // The second call can only run if the first released the lock.
    const second = await withObservationLock(
      store,
      'thread',
      't-1',
      { ttlMs: 60_000 },
      async () => await Promise.resolve('after'),
    );
    expect(second).toEqual({ status: 'ran', value: 'after' });
  });
  it('releases the lock even when fn throws', async () => {
    const store = new InMemoryMemory();
    const boom = new Error('boom');
    // The original error must propagate unchanged to the caller.
    await expect(
      withObservationLock(store, 'thread', 't-1', { ttlMs: 60_000 }, async () => {
        await Promise.resolve();
        throw boom;
      }),
    ).rejects.toBe(boom);
    // Lock should be released — a fresh acquire by a different holder succeeds.
    const followup = await withObservationLock(
      store,
      'thread',
      't-1',
      { ttlMs: 60_000 },
      async () => await Promise.resolve('post-throw'),
    );
    expect(followup).toEqual({ status: 'ran', value: 'post-throw' });
  });
  it('tolerates the lock having already been released by the time fn returns', async () => {
    const store = new InMemoryMemory();
    // Shallow-copy the store and force releaseObservationLock to reject so the
    // cleanup path is exercised; restoring the prototype keeps every other
    // method resolving to the real InMemoryMemory implementation.
    const failing = {
      ...store,
      releaseObservationLock: jest.fn().mockRejectedValue(new Error('already gone')),
    } as unknown as InMemoryMemory;
    Object.setPrototypeOf(failing, InMemoryMemory.prototype);
    const result = await withObservationLock(
      failing,
      'thread',
      't-1',
      { ttlMs: 60_000 },
      async () => await Promise.resolve('done'),
    );
    // A failed release must not turn a successful run into an error.
    expect(result).toEqual({ status: 'ran', value: 'done' });
  });
  it('passes the granted handle to fn', async () => {
    const store = new InMemoryMemory();
    const result = await withObservationLock(
      store,
      'thread',
      't-1',
      { ttlMs: 60_000, holderId: 'caller-A' },
      async (handle) => await Promise.resolve(handle.holderId),
    );
    // The handle carries the holderId that acquired the lock.
    expect(result).toEqual({ status: 'ran', value: 'caller-A' });
  });
});

View File

@ -0,0 +1,386 @@
import { z } from 'zod';
import { AgentEvent } from '../../types';
import type { AgentDbMessage } from '../../types/sdk/message';
import {
OBSERVATION_SCHEMA_VERSION,
type CompactFn,
type NewObservation,
type ObserveFn,
} from '../../types/sdk/observation';
import { AgentEventBus } from '../event-bus';
import { InMemoryMemory, saveMessagesToThread } from '../memory-store';
import {
DEFAULT_COMPACTOR_PROMPT,
DEFAULT_OBSERVER_PROMPT,
runObservationalCycle,
type RunObservationalCycleOpts,
} from '../observational-cycle';
// Shape of the single argument the cycle passes to ai.generateText; only the
// fields the tests inspect (model/system/prompt) are modeled.
type GenerateTextCall = { model: unknown; system?: string; prompt?: string };
// Named with a `mock` prefix so jest's hoisted jest.mock factory below is
// allowed to reference it; tests swap behavior per-case via mockGenerateText.
const mockGenerateText = jest.fn<Promise<{ text: string }>, [GenerateTextCall]>();
// Replace the 'ai' module's generateText with a thin delegate to the jest.fn.
jest.mock('ai', () => ({
  generateText: async (call: GenerateTextCall): Promise<{ text: string }> =>
    await mockGenerateText(call),
}));
// Shorthand for a user-authored message row in the t-1 thread fixtures.
function msg(id: string, text: string, createdAt = new Date()): AgentDbMessage {
  return {
    id,
    role: 'user',
    createdAt,
    content: [{ type: 'text', text }],
  };
}
// Canonical thread-scoped observation row carrying the given text payload.
function row(text: string): NewObservation {
  return {
    kind: 'observation',
    scopeKind: 'thread',
    scopeId: 't-1',
    payload: { text },
    durationMs: null,
    schemaVersion: OBSERVATION_SCHEMA_VERSION,
    createdAt: new Date(),
  };
}
// Persists the given rows into the fixture thread t-1 (resource u-1).
async function save(mem: InMemoryMemory, messages: AgentDbMessage[]) {
  return await saveMessagesToThread(mem, 't-1', 'u-1', messages);
}
// Builds RunObservationalCycleOpts with fixture defaults; caller overrides win.
function opts(
  mem: InMemoryMemory,
  overrides: Partial<RunObservationalCycleOpts> = {},
): RunObservationalCycleOpts {
  const defaults: RunObservationalCycleOpts = {
    memory: mem,
    threadId: 't-1',
    resourceId: 'u-1',
    // The model is never invoked directly by these tests; generateText is mocked.
    model: { doGenerate: jest.fn() } as never,
    workingMemory: { template: '# Thread memory', structured: false },
    // Default observer records nothing.
    observe: async () => {
      await Promise.resolve();
      return [];
    },
    compactionThreshold: 5,
  };
  return { ...defaults, ...overrides };
}
describe('runObservationalCycle', () => {
beforeEach(() => {
mockGenerateText.mockReset();
});
it('runs the observer over the message delta and advances the cursor', async () => {
const mem = new InMemoryMemory();
await save(mem, [msg('m1', 'remember that I prefer concise answers')]);
const observe = jest.fn<ReturnType<ObserveFn>, Parameters<ObserveFn>>(async (ctx) => {
await Promise.resolve();
expect(ctx.deltaMessages.map((m) => m.id)).toEqual(['m1']);
expect(ctx.currentWorkingMemory).toBeNull();
expect(ctx.threadId).toBe('t-1');
return [row('User prefers concise answers.')];
});
const result = await runObservationalCycle(opts(mem, { observe }));
expect(result).toEqual({ status: 'ran', observationsWritten: 1, compacted: false });
expect(observe).toHaveBeenCalledTimes(1);
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows.map((r) => r.payload)).toEqual([{ text: 'User prefers concise answers.' }]);
const cursor = await mem.getCursor('thread', 't-1');
expect(cursor?.lastObservedMessageId).toBe('m1');
});
// Compaction: once enough observations are queued, the compactor merges them
// into the thread working-memory document and the queue is drained.
it('compacts queued observations into thread working memory at the threshold', async () => {
const mem = new InMemoryMemory();
await save(mem, [msg('m1', 'my project is Memory v1')]);
// Seed an existing working-memory document so the compactor has prior state.
await mem.saveWorkingMemory(
{ threadId: 't-1', resourceId: 'u-1', scope: 'thread' },
'# Thread memory\n- Current project:',
);
// The compactor must receive both the queued rows and the current document.
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>(async (ctx) => {
await Promise.resolve();
expect(ctx.observations).toHaveLength(1);
expect(ctx.currentWorkingMemory).toContain('Current project');
return { content: '# Thread memory\n- Current project: Memory v1' };
});
const result = await runObservationalCycle(
opts(mem, {
observe: async () => {
await Promise.resolve();
return [row('Current project is Memory v1.')];
},
compact,
compactionThreshold: 1,
}),
);
expect(result).toMatchObject({ status: 'ran', compacted: true });
// The compactor's output replaces the stored document wholesale.
expect(
await mem.getWorkingMemory({ threadId: 't-1', resourceId: 'u-1', scope: 'thread' }),
).toBe('# Thread memory\n- Current project: Memory v1');
// Queue is cleared after a successful compaction.
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toEqual([]);
});
// Below the threshold the compactor must not run at all.
it('does not compact below the threshold', async () => {
const mem = new InMemoryMemory();
await save(mem, [msg('m1', 'one')]);
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>();
const result = await runObservationalCycle(
opts(mem, {
observe: async () => {
await Promise.resolve();
return [row('one')];
},
compact,
compactionThreshold: 2,
}),
);
expect(result).toMatchObject({ status: 'ran', compacted: false });
expect(compact).not.toHaveBeenCalled();
});
// A 2.5h silence between observed turns crosses the 1h bound configured on
// the idle-timer trigger, so the runtime synthesizes a 'gap' row.
it('adds a gap row for idle-timer triggers when the elapsed gap crosses the bound', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T12:30:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
// First cycle advances the cursor so the next cycle can measure a gap.
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
await runObservationalCycle(
opts(mem, {
trigger: { type: 'idle-timer', idleMs: 1, gapThresholdMs: 60 * 60 * 1000 },
}),
);
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows).toHaveLength(1);
expect(rows[0].kind).toBe('gap');
expect(rows[0].durationMs).toBe(2.5 * 60 * 60 * 1000);
});
// Per-turn triggers use the default gap bound; the computed gap is exposed to
// the observer via ctx.gap AND persisted as its own row.
it('adds a gap row for per-turn triggers when the elapsed gap crosses the default bound', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T11:30:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
const observe = jest.fn<ReturnType<ObserveFn>, Parameters<ObserveFn>>(async (ctx) => {
await Promise.resolve();
// The observer sees the runtime-computed gap context.
expect(ctx.gap).toMatchObject({
durationMs: 90 * 60 * 1000,
text: 'User returned after 1h 30m of inactivity.',
previousObservedAt: first,
nextMessageAt: second,
});
return [];
});
const result = await runObservationalCycle(opts(mem, { observe }));
// observationsWritten counts the gap row even though the observer returned [].
expect(result).toEqual({ status: 'ran', observationsWritten: 1, compacted: false });
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows).toHaveLength(1);
expect(rows[0]).toMatchObject({
kind: 'gap',
payload: {
category: 'continuity',
text: 'User returned after 1h 30m of inactivity.',
},
durationMs: 90 * 60 * 1000,
// Gap rows are timestamped at the returning message, not at cycle time.
createdAt: second,
});
});
// No cursor yet (first cycle) or an elapsed time under the bound → no gap row.
it('does not add gap rows on first observation or below the configured bound', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T10:30:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
await runObservationalCycle(opts(mem));
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toEqual([]);
await save(mem, [msg('m2', 'later', second)]);
// 30m elapsed < 1h bound → still no rows.
await runObservationalCycle(opts(mem, { gapThresholdMs: 60 * 60 * 1000 }));
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toEqual([]);
});
// Gap rows alone must not trigger compaction, but once a real observation
// pushes the count over the threshold, queued gap rows ride along into the
// compactor input (in insertion order: gap first, then observation).
it('does not count gap rows toward compaction but includes them when observations trigger it', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T12:00:00.000Z');
const third = new Date('2026-05-07T12:05:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>(async () => {
await Promise.resolve();
return { content: '# Thread memory\n- Continuity notes: user returned after a gap' };
});
// Only a gap row is queued here (threshold counts 'observation' rows only).
await runObservationalCycle(opts(mem, { compact, compactionThreshold: 1 }));
expect(compact).not.toHaveBeenCalled();
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toHaveLength(1);
await save(mem, [msg('m3', 'remember this decision', third)]);
await runObservationalCycle(
opts(mem, {
observe: async () => {
await Promise.resolve();
return [row('Decision was recorded.')];
},
compact,
compactionThreshold: 1,
}),
);
expect(compact).toHaveBeenCalledTimes(1);
// The compactor receives the earlier gap row plus the new observation.
expect(compact.mock.calls[0][0].observations.map((observation) => observation.kind)).toEqual([
'gap',
'observation',
]);
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toEqual([]);
});
// The default observer emits JSON Lines; rows without a category fall back to
// 'other', and model-emitted "gap" kinds are demoted to plain observations
// (only the runtime may create true gap rows).
it('parses categorized default observer output and preserves legacy text rows', async () => {
const mem = new InMemoryMemory();
await save(mem, [msg('m1', 'I prefer terse answers')]);
mockGenerateText.mockResolvedValue({
text: [
'{"kind":"observation","category":"preferences","text":"User prefers terse answers."}',
'{"kind":"observation","text":"Legacy row stays readable."}',
'{"kind":"gap","category":"continuity","text":"Model-emitted gap is stored as an observation."}',
].join('\n'),
});
// observe: undefined forces the built-in observer path.
await runObservationalCycle(opts(mem, { observe: undefined }));
const rows = await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
expect(rows.map((observation) => observation.kind)).toEqual([
'observation',
'observation',
'observation',
]);
expect(rows.map((observation) => observation.payload)).toEqual(
expect.arrayContaining([
{ category: 'preferences', text: 'User prefers terse answers.' },
{ category: 'other', text: 'Legacy row stays readable.' },
{ category: 'continuity', text: 'Model-emitted gap is stored as an observation.' },
]),
);
});
// The default observer prompt carries the computed gap text plus per-message
// timestamps; the system prompt is the unmodified DEFAULT_OBSERVER_PROMPT.
it('injects gap and timestamp context into the default observer prompt', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T12:00:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
mockGenerateText.mockResolvedValue({ text: '' });
await runObservationalCycle(opts(mem, { observe: undefined }));
const call = mockGenerateText.mock.calls[0][0];
expect(call.system).toBe(DEFAULT_OBSERVER_PROMPT);
expect(call.system).toContain('category');
expect(call.system).toContain('Do not emit temporal-gap rows');
expect(call.prompt).toContain('Computed temporal gap:');
expect(call.prompt).toContain('User returned after 2h of inactivity.');
expect(call.prompt).toContain('[2026-05-07T12:00:00.000Z] [user] later');
});
// The default compactor prompt groups queued rows under "category / kind"
// headers with timestamps and humanized durations.
it('groups queued rows with timestamps and durations in the default compactor prompt', async () => {
const mem = new InMemoryMemory();
const first = new Date('2026-05-07T10:00:00.000Z');
const second = new Date('2026-05-07T12:00:00.000Z');
await save(mem, [msg('m1', 'first', first)]);
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
// First mock reply feeds the observer, second feeds the compactor.
mockGenerateText
.mockResolvedValueOnce({
text: '{"kind":"observation","category":"decisions","text":"Decision: tune memory prompts."}',
})
.mockResolvedValueOnce({ text: '# Thread memory\n- Decisions made: tune memory prompts' });
await runObservationalCycle(opts(mem, { observe: undefined, compactionThreshold: 1 }));
const compactorCall = mockGenerateText.mock.calls[1][0];
expect(compactorCall.system).toBe(DEFAULT_COMPACTOR_PROMPT);
expect(compactorCall.system).toContain(
'Do not delete useful thread context merely because it is old',
);
expect(compactorCall.prompt).toContain('### continuity / gap');
expect(compactorCall.prompt).toContain('duration=2h');
expect(compactorCall.prompt).toContain('### decisions / observation');
expect(compactorCall.prompt).toContain('[2026-05-07T12:00:00.000Z]');
});
// Structured working memory: compactor output that fails Zod validation must
// emit an error, leave the observation queue intact, and write nothing.
it('validates structured compactor output before saving and deleting observations', async () => {
const mem = new InMemoryMemory();
const eventBus = new AgentEventBus();
const errors: string[] = [];
eventBus.on(AgentEvent.Error, (event) => {
if (event.type === AgentEvent.Error) errors.push(event.message);
});
await save(mem, [msg('m1', 'Alice')]);
const result = await runObservationalCycle(
opts(mem, {
workingMemory: {
template: '{"name": ""}',
structured: true,
schema: z.object({ name: z.string() }),
},
observe: async () => {
await Promise.resolve();
return [row('Name is Alice.')];
},
// Returns a number where the schema demands a string → must be rejected.
compact: async () => {
await Promise.resolve();
return { content: '{"name": 123}' };
},
compactionThreshold: 1,
eventBus,
}),
);
expect(result).toMatchObject({ status: 'ran', compacted: false });
expect(errors[0]).toContain('does not match schema');
// The queued observation survives so a later compaction can retry.
expect(await mem.getObservations({ scopeKind: 'thread', scopeId: 't-1' })).toHaveLength(1);
expect(
await mem.getWorkingMemory({ threadId: 't-1', resourceId: 'u-1', scope: 'thread' }),
).toBeNull();
});
// Observer failures are reported on the event bus, never thrown to the caller;
// the cycle reports a skip.
it('emits observer errors without throwing', async () => {
const mem = new InMemoryMemory();
const eventBus = new AgentEventBus();
const errors: string[] = [];
eventBus.on(AgentEvent.Error, (event) => {
if (event.type === AgentEvent.Error) errors.push(event.message);
});
await save(mem, [msg('m1', 'hello')]);
const result = await runObservationalCycle(
opts(mem, {
observe: async () => {
await Promise.resolve();
throw new Error('observer failed');
},
eventBus,
}),
);
expect(result).toEqual({ status: 'skipped', reason: 'no-delta' });
expect(errors).toEqual(['observer failed']);
});
});

View File

@ -1,5 +1,5 @@
import { stripOrphanedToolMessages } from '../runtime/strip-orphaned-tool-messages';
import type { AgentMessage, Message } from '../types/sdk/message';
import type { AgentMessage, Message } from '../../types/sdk/message';
import { stripOrphanedToolMessages } from '../strip-orphaned-tool-messages';
describe('stripOrphanedToolMessages', () => {
it('returns messages unchanged when all tool-calls are settled', () => {

View File

@ -1,8 +1,8 @@
import type * as AiImport from 'ai';
import type { LanguageModel } from 'ai';
import { generateTitleFromMessage } from '../runtime/title-generation';
import type { BuiltTelemetry } from '../types';
import type { BuiltTelemetry } from '../../types';
import { generateTitleFromMessage } from '../title-generation';
type GenerateTextCall = {
messages: Array<{ role: string; content: string }>;

View File

@ -1,8 +1,8 @@
import type { JSONSchema7 } from 'json-schema';
import { z } from 'zod';
import { toAiSdkTools } from '../runtime/tool-adapter';
import type { BuiltTool } from '../types';
import type { BuiltTool } from '../../types';
import { toAiSdkTools } from '../tool-adapter';
// ---------------------------------------------------------------------------
// Module mocks

View File

@ -0,0 +1,53 @@
import { z } from 'zod';
import {
buildWorkingMemoryInstruction,
templateFromSchema,
WORKING_MEMORY_DEFAULT_INSTRUCTION,
} from '../working-memory';
describe('buildWorkingMemoryInstruction', () => {
  // Working memory is maintained by the observer, not by the model itself.
  it('describes working memory as observer-maintained read-only context', () => {
    const instruction = buildWorkingMemoryInstruction('# Context\n- Name:', false);
    expect(instruction).toContain('out-of-band observer');
    expect(instruction).toContain('Do not try to edit working memory directly');
  });
  it('includes the template in the instruction', () => {
    const contextTemplate = '# Context\n- Name:\n- City:';
    expect(buildWorkingMemoryInstruction(contextTemplate, false)).toContain(contextTemplate);
  });
  it('mentions JSON for structured variant', () => {
    expect(buildWorkingMemoryInstruction('{"name": ""}', true)).toContain('JSON');
  });
  // A caller-supplied instruction fully replaces the default body.
  it('replaces the default instruction body when provided', () => {
    const custom = 'Use this memory as read-only context.';
    const instruction = buildWorkingMemoryInstruction('# Template', false, custom);
    expect(instruction).toContain(custom);
    expect(instruction).not.toContain(WORKING_MEMORY_DEFAULT_INSTRUCTION);
  });
});
describe('templateFromSchema', () => {
  // The generated template must mention every schema key and be parseable JSON.
  it('converts Zod schema to JSON template', () => {
    const schema = z.object({
      userName: z.string().optional().describe("The user's name"),
      favoriteColor: z.string().optional().describe('Favorite color'),
    });
    const template = templateFromSchema(schema);
    expect(template).toContain('userName');
    expect(template).toContain('favoriteColor');
    const parsed: unknown = (() => {
      try {
        return JSON.parse(template) as unknown;
      } catch (error) {
        throw new Error(`Expected schema template to be valid JSON: ${String(error)}`);
      }
    })();
    expect(parsed).toHaveProperty('userName');
  });
});

View File

@ -17,6 +17,7 @@ import type {
FinishReason,
GenerateResult,
GoogleThinkingConfig,
ObservationalMemoryConfig,
OpenAIThinkingConfig,
PendingToolCall,
RunOptions,
@ -30,12 +31,15 @@ import type {
TokenUsage,
XaiThinkingConfig,
} from '../types';
import { BackgroundTaskTracker } from './background-task-tracker';
import { AgentEventBus } from './event-bus';
import { toJsonValue } from './json-value';
import { saveMessagesToThread } from './memory-store';
import { AgentMessageList, type SerializedMessageList } from './message-list';
import { fromAiFinishReason, fromAiMessages } from './messages';
import { createEmbeddingModel, createModel } from './model-factory';
import { hasObservationStore } from './observation-store';
import { runObservationalCycle, type RunObservationalCycleOpts } from './observational-cycle';
import { generateRunId, RunStateManager } from './run-state';
import {
accumulateUsage,
@ -55,7 +59,6 @@ import {
toAiSdkProviderTools,
toAiSdkTools,
} from './tool-adapter';
import { buildWorkingMemoryTool } from './working-memory';
import { AgentEvent } from '../types/runtime/event';
import type { AgentEventData } from '../types/runtime/event';
import type {
@ -177,6 +180,7 @@ export interface AgentRuntimeConfig {
/** Number of tool calls to execute concurrently. Default `1` (sequential). */
toolCallConcurrency?: number;
titleGeneration?: TitleGenerationConfig;
observationalMemory?: ObservationalMemoryConfig;
telemetry?: BuiltTelemetry;
}
@ -299,6 +303,10 @@ export class AgentRuntime {
private modelCost: ModelCost | undefined;
private backgroundTasks = new BackgroundTaskTracker();
private observationTimers = new Map<string, ReturnType<typeof setTimeout>>();
/** Resolved telemetry for the current run (own config or inherited from parent). */
constructor(config: AgentRuntimeConfig) {
@ -313,6 +321,36 @@ export class AgentRuntime {
};
}
/**
* Wait for in-flight background tasks (title generation, future
* observer cycles) to settle. Safe to call multiple times.
*/
async dispose(): Promise<void> {
for (const timer of this.observationTimers.values()) {
clearTimeout(timer);
}
this.observationTimers.clear();
await this.backgroundTasks.flush();
}
/**
 * Schedule an observational-memory cycle via the background-task
 * tracker. Fire-and-forget: returns immediately and the tracked promise
 * never rejects — errors inside the cycle are caught by
 * `runObservationalCycle` and emitted via `AgentEvent.Error` with
 * `source: 'observer' | 'compactor'`.
 *
 * Used by `Agent.reflectInBackground(...)` so consumers (e.g. the cli's
 * post-stream trigger) can fire-and-forget without blocking the response.
 */
scheduleBackgroundCycle(opts: RunObservationalCycleOpts): void {
  const settled = runObservationalCycle(opts).then(
    () => undefined,
    () => undefined,
  );
  this.backgroundTasks.track(settled);
}
/** Return the latest state snapshot. */
getState(): SerializableAgentState {
return { ...this.currentState };
@ -652,6 +690,9 @@ export class AgentRuntime {
options?: RunOptions & ExecutionOptions,
): Promise<AgentMessageList> {
this.eventBus.resetAbort(options?.abortSignal);
if (options?.persistence?.threadId) {
this.cancelIdleObservation(options.persistence.threadId);
}
this.updateState({
status: 'running',
persistence: options?.persistence,
@ -982,6 +1023,7 @@ export class AgentRuntime {
agentModel: this.config.model,
turnDelta: list.turnDelta(),
});
this.backgroundTasks.track(titlePromise);
if (this.config.titleGeneration.sync) {
await titlePromise;
}
@ -1305,6 +1347,7 @@ export class AgentRuntime {
agentModel: this.config.model,
turnDelta: list.turnDelta(),
});
this.backgroundTasks.track(titlePromise);
if (this.config.titleGeneration.sync) {
await titlePromise;
}
@ -1343,6 +1386,8 @@ export class AgentRuntime {
delta,
);
}
await this.dispatchObservationalMemory(options.persistence);
}
private async saveEmbeddingsForMessages(
@ -1871,10 +1916,7 @@ export class AgentRuntime {
private buildLoopContext(
execOptions?: ExecutionOptions & { persistence?: AgentPersistenceOptions },
) {
const wmTool = this.buildWorkingMemoryToolForRun(execOptions?.persistence);
const allUserTools = wmTool
? [...(this.config.tools ?? []), wmTool]
: (this.config.tools ?? []);
const allUserTools = this.config.tools ?? [];
const aiTools = toAiSdkTools(allUserTools);
const aiProviderTools = toAiSdkProviderTools(this.config.providerTools);
const allTools = { ...aiTools, ...aiProviderTools };
@ -1911,20 +1953,6 @@ export class AgentRuntime {
return userInstructions ? `${block}\n\n${userInstructions}` : block;
}
/**
* Build the update_working_memory BuiltTool for the current run.
* Returns undefined when working memory is not configured or persistence is unavailable.
*/
private buildWorkingMemoryToolForRun(persistence: AgentPersistenceOptions | undefined) {
const wmParams = this.resolveWorkingMemoryParams(persistence);
if (!wmParams) return undefined;
return buildWorkingMemoryTool({
structured: wmParams.structured,
schema: wmParams.schema,
persist: wmParams.persistFn,
});
}
/**
* Persist a suspended run state and update the current state snapshot.
* Returns the runId (reuses existingRunId when resuming to prevent dangling runs).
@ -2026,6 +2054,97 @@ export class AgentRuntime {
};
}
/**
 * Kick off an observational-memory cycle for the turn that was just
 * persisted. Idle-timer triggers defer the cycle behind a per-thread
 * timeout; otherwise the cycle runs now — awaited when `sync` is
 * configured, tracked as background work when it is not. No-op when
 * observational memory is not fully configured.
 */
private async dispatchObservationalMemory(persistence: AgentPersistenceOptions): Promise<void> {
  const cycle = this.buildObservationCycleOpts(persistence);
  if (!cycle) return;

  const trigger = cycle.trigger ?? { type: 'per-turn' };
  if (trigger.type === 'idle-timer') {
    this.scheduleIdleObservation(persistence.threadId, cycle, trigger.idleMs);
    return;
  }

  // Swallow both outcomes: cycle errors are surfaced on the event bus.
  const run = runObservationalCycle(cycle).then(
    () => undefined,
    () => undefined,
  );
  if (!this.config.observationalMemory?.sync) {
    this.backgroundTasks.track(run);
    return;
  }
  await run;
}
/**
 * (Re)arm the idle-observation timer for a thread. Any previously armed
 * timer for the same thread is cancelled first, so only the timer from
 * the most recent turn can fire.
 */
private scheduleIdleObservation(
  threadId: string,
  cycle: RunObservationalCycleOpts,
  idleMs: number,
): void {
  this.cancelIdleObservation(threadId);
  const handle = setTimeout(() => {
    // The timer has fired; drop it before tracking the cycle so a
    // concurrent cancel cannot clear an already-consumed handle.
    this.observationTimers.delete(threadId);
    const run = runObservationalCycle(cycle).then(
      () => undefined,
      () => undefined,
    );
    this.backgroundTasks.track(run);
  }, idleMs);
  this.observationTimers.set(threadId, handle);
}
/** Clear the pending idle-observation timer for a thread, if one exists. */
private cancelIdleObservation(threadId: string): void {
  const timer = this.observationTimers.get(threadId);
  if (timer === undefined) return;
  clearTimeout(timer);
  this.observationTimers.delete(threadId);
}
/**
 * Assemble the options for one observational-memory cycle from runtime
 * config plus the current persistence context. Returns null when any
 * prerequisite is missing: observationalMemory config, a memory backend
 * that implements the observation-store surface, a working-memory
 * template, persistence identifiers, or saveWorkingMemory support.
 * Optional fields are conditionally spread so absent config keys stay
 * absent on the result (not set to `undefined`).
 */
private buildObservationCycleOpts(
persistence: AgentPersistenceOptions | undefined,
): RunObservationalCycleOpts | null {
const obsConfig = this.config.observationalMemory;
const memory = this.config.memory;
const workingMemory = this.config.workingMemory;
if (!obsConfig || !memory || !workingMemory || !persistence) return null;
// Structural check: the memory backend must expose all observation-store methods.
if (!hasObservationStore(memory)) return null;
if (!memory.saveWorkingMemory) return null;
return {
memory,
threadId: persistence.threadId,
resourceId: persistence.resourceId,
model: this.config.model,
workingMemory: {
template: workingMemory.template,
structured: workingMemory.structured,
...(workingMemory.schema !== undefined && { schema: workingMemory.schema }),
},
...(obsConfig.observe !== undefined && { observe: obsConfig.observe }),
...(obsConfig.compact !== undefined && { compact: obsConfig.compact }),
...(obsConfig.trigger !== undefined && { trigger: obsConfig.trigger }),
...(obsConfig.compactionThreshold !== undefined && {
compactionThreshold: obsConfig.compactionThreshold,
}),
...(obsConfig.gapThresholdMs !== undefined && { gapThresholdMs: obsConfig.gapThresholdMs }),
...(obsConfig.observerPrompt !== undefined && { observerPrompt: obsConfig.observerPrompt }),
...(obsConfig.compactorPrompt !== undefined && {
compactorPrompt: obsConfig.compactorPrompt,
}),
...(obsConfig.lockTtlMs !== undefined && { lockTtlMs: obsConfig.lockTtlMs }),
...(this.config.telemetry !== undefined && { telemetry: this.config.telemetry }),
eventBus: this.eventBus,
};
}
/**
 * Configured telemetry handle (build-time). Run-time inheritance via
 * `ExecutionOptions.parentTelemetry` only applies inside an active
 * agentic loop; out-of-band callers like `agent.reflect()` see the
 * builder-time value.
 */
getConfiguredTelemetry(): BuiltTelemetry | undefined {
  const { telemetry } = this.config;
  return telemetry;
}
private resolveWorkingMemoryParams(options: AgentPersistenceOptions | undefined) {
if (!options) return null;
if (!this.config.workingMemory) return null;

View File

@ -0,0 +1,21 @@
/**
 * Tracks fire-and-forget background promises (title generation, observer
 * cycles) so callers can await quiescence (e.g. on dispose) without
 * holding references to individual tasks.
 */
export class BackgroundTaskTracker {
  /** Promises that have been tracked and have not yet settled. */
  private inFlight = new Set<Promise<unknown>>();

  /** Number of tracked promises still pending. */
  get pendingCount(): number {
    return this.inFlight.size;
  }

  /**
   * Register a promise. It is removed from the in-flight set when it
   * settles; rejections are swallowed here — callers are expected to
   * attach their own error handling before tracking.
   */
  track(promise: Promise<unknown>): void {
    this.inFlight.add(promise);
    const cleanup = () => {
      this.inFlight.delete(promise);
    };
    void promise.then(cleanup, cleanup);
  }

  /**
   * Wait until no tracked work remains. Fix over a single
   * `Promise.allSettled` on one snapshot: tasks tracked *while* flushing
   * (a tracked task that schedules a follow-up task) are also awaited,
   * so flush() only resolves when the tracker is truly quiescent. A
   * caller that tracks an unbounded chain of tasks would block here by
   * design.
   */
  async flush(): Promise<void> {
    while (this.inFlight.size > 0) {
      const snapshot = Array.from(this.inFlight);
      await Promise.allSettled(snapshot);
    }
  }
}

View File

@ -0,0 +1,42 @@
import type { BuiltMemory } from '../types/sdk/memory';
import type { AgentDbMessage } from '../types/sdk/message';
import type { BuiltObservationStore, ObservationCursor, ScopeKind } from '../types/sdk/observation';
/**
 * Load the transcript delta since the last observation cursor for a scope.
 * With no cursor yet, every message in the scope is returned. Returns the
 * cursor alongside the messages so callers can compute gap context.
 */
export async function getDeltaSinceCursor(
  store: BuiltMemory & BuiltObservationStore,
  scopeKind: ScopeKind,
  scopeId: string,
): Promise<{ messages: AgentDbMessage[]; cursor: ObservationCursor | null }> {
  const cursor = await store.getCursor(scopeKind, scopeId);
  // Only filter when a cursor exists; otherwise fetch the full scope.
  const filter = cursor
    ? {
        since: {
          sinceCreatedAt: cursor.lastObservedAt,
          sinceMessageId: cursor.lastObservedMessageId,
        },
      }
    : undefined;
  const messages = await store.getMessagesForScope(scopeKind, scopeId, filter);
  return { messages, cursor };
}
/**
 * Persist a new observation cursor pointing at the last processed message
 * and return it. `now` is injectable for deterministic tests.
 */
export async function advanceCursor(
  store: BuiltObservationStore,
  scopeKind: ScopeKind,
  scopeId: string,
  lastMessage: AgentDbMessage,
  now: Date = new Date(),
): Promise<ObservationCursor> {
  const next: ObservationCursor = {
    scopeKind,
    scopeId,
    lastObservedMessageId: lastMessage.id,
    lastObservedAt: lastMessage.createdAt,
    updatedAt: now,
  };
  await store.setCursor(next);
  return next;
}

View File

@ -0,0 +1,28 @@
import type {
BuiltObservationStore,
ObservationLockHandle,
ScopeKind,
} from '../types/sdk/observation';
export type WithObservationLockResult<T> = { status: 'ran'; value: T } | { status: 'skipped' };

/**
 * Run `fn` while holding the scope's observation lock. Returns
 * `{ status: 'skipped' }` without running `fn` when the lock is already
 * held elsewhere. The lock is always released afterwards; release
 * failures are deliberately swallowed (the TTL reclaims stale locks).
 */
export async function withObservationLock<T>(
  store: BuiltObservationStore,
  scopeKind: ScopeKind,
  scopeId: string,
  opts: { ttlMs: number; holderId?: string },
  fn: (handle: ObservationLockHandle) => Promise<T>,
): Promise<WithObservationLockResult<T>> {
  const holderId = opts.holderId ?? crypto.randomUUID();
  const handle = await store.acquireObservationLock(scopeKind, scopeId, {
    ttlMs: opts.ttlMs,
    holderId,
  });
  if (!handle) return { status: 'skipped' };
  try {
    return { status: 'ran', value: await fn(handle) };
  } finally {
    await store.releaseObservationLock(handle).catch(() => {});
  }
}

View File

@ -0,0 +1,25 @@
import type { BuiltMemory, BuiltObservationStore } from '../types';
/** Methods a memory backend must expose to qualify as an observation store. */
const OBSERVATION_STORE_METHODS = [
  'appendObservations',
  'getObservations',
  'getMessagesForScope',
  'deleteObservations',
  'getCursor',
  'setCursor',
  'acquireObservationLock',
  'releaseObservationLock',
] as const satisfies ReadonlyArray<keyof BuiltObservationStore>;

/** True when `value[property]` exists and is callable. */
function hasFunctionProperty<K extends PropertyKey>(
  value: object,
  property: K,
): value is Record<K, (...args: never[]) => unknown> {
  if (!(property in value)) return false;
  return typeof Reflect.get(value, property) === 'function';
}

/**
 * Structural type guard: does this memory backend implement the full
 * observation-store surface? Checked at runtime because BuiltMemory
 * implementations may only partially support observational memory.
 */
export function hasObservationStore(
  memory: BuiltMemory,
): memory is BuiltMemory & BuiltObservationStore {
  for (const method of OBSERVATION_STORE_METHODS) {
    if (!hasFunctionProperty(memory, method)) return false;
  }
  return true;
}

View File

@ -0,0 +1,487 @@
import { generateText } from 'ai';
import type { z } from 'zod';
import type { AgentEventBus } from './event-bus';
import { createModel } from './model-factory';
import { advanceCursor, getDeltaSinceCursor } from './observation-cursor';
import { withObservationLock } from './observation-lock';
import { isLlmMessage } from '../sdk/message';
import { AgentEvent } from '../types/runtime/event';
import type { ModelConfig } from '../types/sdk/agent';
import type { BuiltMemory } from '../types/sdk/memory';
import type { AgentDbMessage } from '../types/sdk/message';
import {
DEFAULT_OBSERVATION_GAP_THRESHOLD_MS,
OBSERVATION_CATEGORIES,
OBSERVATION_SCHEMA_VERSION,
type BuiltObservationStore,
type CompactFn,
type NewObservation,
type Observation,
type ObservationCategory,
type ObservationGapContext,
type ObservationalMemoryTrigger,
type ObserveFn,
} from '../types/sdk/observation';
import type { BuiltTelemetry } from '../types/telemetry';
import { parseWithSchema } from '../utils/parse';
// How long a cycle may hold the per-thread observation lock before the TTL
// lets another holder reclaim it.
const DEFAULT_LOCK_TTL_MS = 30_000;
// Queued 'observation' rows (gap rows excluded) needed before compaction runs.
const DEFAULT_COMPACTION_THRESHOLD = 5;
// System prompt for the built-in observer: emits JSON Lines rows; categories
// are validated downstream and unknown ones fall back to 'other'.
export const DEFAULT_OBSERVER_PROMPT = `You maintain thread working memory for an agent.
You receive the current working memory document and the new transcript delta since
the last observation. Extract durable thread state that should help later turns in
this same conversation: explicitly stated facts, preferences, identifiers, goals,
decisions, constraints, open follow-ups, corrections, and concrete progress.
Output JSON Lines only, one object per line:
{"kind":"observation","category":"<category>","text":"<short durable note>"}
Allowed categories: facts, preferences, goal, state, active_items, decisions,
follow_ups, continuity, superseded, other.
Rules:
- Prefer over-recording explicit user statements over missing useful state.
- Preserve user-stated facts and preferences verbatim when short enough.
- Record changes and corrections as latest state, not as debate history.
- Record decisions, open follow-ups, and concrete assistant-reported progress when
they affect what should happen next in this thread.
- Use continuity only for useful re-entry context, repeated corrections, notable
friction, or resume cues.
- Do not emit temporal-gap rows. Gaps are computed by the runtime.
- Do not record secrets, one-off small talk, or the assistant's own claims.
- Output an empty response when nothing durable changed.
- No markdown fences, preamble, or commentary.`;
// System prompt for the built-in compactor: returns a full replacement
// working-memory document (never a diff).
export const DEFAULT_COMPACTOR_PROMPT = `You update the complete thread working memory document.
You receive:
- The working-memory template.
- The current working memory document.
- Queued observations from recent turns.
Return the full replacement working memory document, not a diff.
Rules:
- Preserve useful existing state.
- Add durable new facts, preferences, goals, decisions, constraints, and open follow-ups.
- Replace stale or contradicted items with the latest state.
- Move or remove stale items only when observations show they were corrected,
resolved, abandoned, or superseded.
- Do not delete useful thread context merely because it is old.
- Keep continuity notes short and only when useful for re-entry, notable pauses,
repeated corrections, or resume cues.
- Keep the document concise and current, not an append-only transcript.
- Do not include secrets or one-off details.
- If nothing changed, return the current working memory document unchanged.
- Output only the working memory document. No markdown fences or preamble.`;
/** Options for a single observe→compact cycle on one thread. */
export interface RunObservationalCycleOpts {
/** Memory backend that also implements the observation-store surface. */
memory: BuiltMemory & BuiltObservationStore;
threadId: string;
resourceId: string;
/** Model used by the default observer/compactor when no custom fns given. */
model: ModelConfig;
workingMemory: {
template: string;
structured: boolean;
schema?: z.ZodObject<z.ZodRawShape>;
};
/** Custom observer; defaults to the LLM-backed JSON-Lines observer. */
observe?: ObserveFn;
/** Custom compactor; defaults to the LLM-backed document rewriter. */
compact?: CompactFn;
/** Defaults to `{ type: 'per-turn' }`. */
trigger?: ObservationalMemoryTrigger;
/** Queued-observation count that triggers compaction (default 5). */
compactionThreshold?: number;
/** Inactivity bound for synthesizing gap rows; see getGapThresholdMs. */
gapThresholdMs?: number;
observerPrompt?: string;
compactorPrompt?: string;
/** Observation-lock TTL (default 30s). */
lockTtlMs?: number;
telemetry?: BuiltTelemetry;
/** Errors are emitted here instead of thrown. */
eventBus?: AgentEventBus;
}
/** Cycle outcome: skipped (lock contention or empty delta) or ran. */
export type RunObservationalCycleResult =
| { status: 'skipped'; reason: 'lock-held' | 'no-delta' }
| { status: 'ran'; observationsWritten: number; compacted: boolean };
/**
 * Run one observe→compact cycle for a thread, serialized by the
 * per-thread observation lock. Reports `skipped`/`lock-held` when
 * another holder currently owns the lock.
 */
export async function runObservationalCycle(
  opts: RunObservationalCycleOpts,
): Promise<RunObservationalCycleResult> {
  const ttlMs = opts.lockTtlMs ?? DEFAULT_LOCK_TTL_MS;
  const locked = await withObservationLock(opts.memory, 'thread', opts.threadId, { ttlMs }, () =>
    runInsideLock(opts),
  );
  return locked.status === 'skipped' ? { status: 'skipped', reason: 'lock-held' } : locked.value;
}
/**
 * The lock-protected body of a cycle: read delta since cursor, run the
 * observer, append gap + observation rows, advance the cursor, then
 * maybe compact. Observer/compactor failures are emitted on the event
 * bus rather than thrown.
 */
async function runInsideLock(
opts: RunObservationalCycleOpts,
): Promise<RunObservationalCycleResult> {
const { memory, threadId, resourceId, eventBus, telemetry } = opts;
const trigger = opts.trigger ?? { type: 'per-turn' };
const { messages: deltaMessages, cursor } = await getDeltaSinceCursor(memory, 'thread', threadId);
if (deltaMessages.length === 0) return { status: 'skipped', reason: 'no-delta' };
const currentWorkingMemory =
(await memory.getWorkingMemory?.({ threadId, resourceId, scope: 'thread' })) ?? null;
// Gap is computed by the runtime (never trusted from the model output).
const gap = buildGapContext(cursor, deltaMessages, getGapThresholdMs(opts));
let observerRows: NewObservation[];
try {
const observe = opts.observe ?? buildDefaultObserveFn(opts.model, opts.observerPrompt);
const now = new Date();
observerRows = await observe({
deltaMessages,
currentWorkingMemory,
cursor,
threadId,
resourceId,
now,
trigger,
gap,
telemetry,
});
} catch (error) {
emitError(eventBus, 'observer', error);
// Observer failure reports 'no-delta' (the cycle made no progress and
// the cursor is left untouched so the delta is retried next cycle).
return { status: 'skipped', reason: 'no-delta' };
}
const gapRow = gap ? buildGapRow(gap, threadId) : null;
// Gap row (if any) is appended ahead of observer output; observer rows are
// re-scoped to this thread regardless of what the observer returned.
const rowsToAppend = [
...(gapRow ? [gapRow] : []),
...observerRows.map((row) => ({ ...row, scopeKind: 'thread' as const, scopeId: threadId })),
];
if (rowsToAppend.length > 0) {
await memory.appendObservations(rowsToAppend);
}
// Advance the cursor only after a successful observe+append, so failures
// never lose transcript coverage.
const lastMessage = deltaMessages[deltaMessages.length - 1];
await advanceCursor(memory, 'thread', threadId, lastMessage);
let compacted = false;
try {
compacted = await maybeCompact(opts, currentWorkingMemory);
} catch (error) {
emitError(eventBus, 'compactor', error);
}
return { status: 'ran', observationsWritten: rowsToAppend.length, compacted };
}
/**
 * Compact queued observations into the working-memory document when the
 * count of 'observation'-kind rows reaches the threshold (gap rows do
 * not count toward it but are still fed to the compactor). Returns true
 * when a compaction was performed. Throws on invalid compactor output;
 * the caller converts that into an event-bus error and leaves the queue
 * intact for a later retry.
 */
async function maybeCompact(
opts: RunObservationalCycleOpts,
currentWorkingMemory: string | null,
): Promise<boolean> {
const threshold = opts.compactionThreshold ?? DEFAULT_COMPACTION_THRESHOLD;
const observations = await opts.memory.getObservations({
scopeKind: 'thread',
scopeId: opts.threadId,
schemaVersionAtMost: OBSERVATION_SCHEMA_VERSION,
});
// Only 'observation' rows count toward the threshold; gap rows ride along.
const contentObservationCount = observations.filter((row) => row.kind === 'observation').length;
if (contentObservationCount < threshold) return false;
if (!opts.memory.saveWorkingMemory) {
throw new Error('Observational memory compaction requires saveWorkingMemory()');
}
const compact = opts.compact ?? defaultCompact;
const result = await compact({
observations,
currentWorkingMemory,
workingMemoryTemplate: opts.workingMemory.template,
structured: opts.workingMemory.structured,
...(opts.workingMemory.schema !== undefined && { schema: opts.workingMemory.schema }),
threadId: opts.threadId,
resourceId: opts.resourceId,
model: opts.model,
compactorPrompt: opts.compactorPrompt ?? DEFAULT_COMPACTOR_PROMPT,
telemetry: opts.telemetry,
});
// Validate before any write: invalid output must not clobber the document
// or drop the queue.
const content = await validateWorkingMemoryOutput(result.content, opts.workingMemory);
await opts.memory.saveWorkingMemory(
{ threadId: opts.threadId, resourceId: opts.resourceId, scope: 'thread' },
content,
);
// Delete only after a successful save so a failure between the two at
// worst re-compacts the same rows later.
await opts.memory.deleteObservations(observations.map((row) => row.id));
return true;
}
/**
 * Built-in compactor: one LLM call that receives the template, the
 * current document, and the queued rows grouped by category, and
 * returns the full replacement document (fences stripped).
 */
async function defaultCompact(ctx: Parameters<CompactFn>[0]): Promise<{ content: string }> {
  const sections = [
    `Working memory template:\n${ctx.workingMemoryTemplate}`,
    `Current working memory:\n${ctx.currentWorkingMemory ?? ctx.workingMemoryTemplate}`,
    `Queued observations:\n${renderObservationsByCategory(ctx.observations)}`,
  ].filter(Boolean);
  const { text } = await generateText({
    model: createModel(ctx.model),
    system: ctx.compactorPrompt,
    prompt: sections.join('\n\n'),
    ...telemetryOptions(ctx.telemetry),
  });
  return { content: stripMarkdownFence(text.trim()) };
}
/**
 * Build the default LLM-backed observer. The prompt carries the current
 * document, timing context, the runtime-computed gap (when present), and
 * the transcript delta; the model's JSON-Lines reply is parsed into
 * observation rows.
 */
export function buildDefaultObserveFn(model: ModelConfig, observerPrompt?: string): ObserveFn {
  return async (ctx) => {
    const sections: string[] = [];
    sections.push(
      ctx.currentWorkingMemory
        ? `Current working memory:\n${ctx.currentWorkingMemory}`
        : 'Current working memory: (empty)',
    );
    sections.push(`Time now: ${ctx.now.toISOString()}`);
    if (ctx.cursor) {
      sections.push(`Last observed message time: ${ctx.cursor.lastObservedAt.toISOString()}`);
    }
    sections.push(`Trigger: ${ctx.trigger.type}`);
    if (ctx.gap) {
      sections.push(`Computed temporal gap:\n${renderGapContext(ctx.gap)}`);
    }
    sections.push(`Recent transcript:\n${renderTranscript(ctx.deltaMessages)}`);
    const { text } = await generateText({
      model: createModel(model),
      system: observerPrompt ?? DEFAULT_OBSERVER_PROMPT,
      prompt: sections.join('\n\n'),
      ...telemetryOptions(ctx.telemetry),
    });
    return parseObservationJsonLines(text, ctx.threadId);
  };
}
/**
 * Resolve the gap threshold for a cycle. Precedence: explicit top-level
 * override, then the idle-timer trigger's own threshold, then the module
 * default.
 */
function getGapThresholdMs(opts: RunObservationalCycleOpts): number {
  const { gapThresholdMs, trigger } = opts;
  if (gapThresholdMs !== undefined) return gapThresholdMs;
  const triggerThreshold =
    trigger?.type === 'idle-timer' ? trigger.gapThresholdMs : undefined;
  return triggerThreshold ?? DEFAULT_OBSERVATION_GAP_THRESHOLD_MS;
}
function buildGapContext(
cursor: { lastObservedAt: Date } | null,
deltaMessages: AgentDbMessage[],
gapThresholdMs: number,
): ObservationGapContext | null {
if (!cursor) return null;
const firstMessage = firstUserMessage(deltaMessages) ?? deltaMessages[0];
if (!firstMessage) return null;
const durationMs = firstMessage.createdAt.getTime() - cursor.lastObservedAt.getTime();
if (durationMs < gapThresholdMs) return null;
const text = `User returned after ${humanizeMs(durationMs)} of inactivity.`;
return {
durationMs,
text,
previousObservedAt: cursor.lastObservedAt,
nextMessageAt: firstMessage.createdAt,
};
}
/**
 * Turn a gap context into a persistable observation row. Gap rows use
 * kind 'gap', the 'continuity' category, and carry the gap duration; the row
 * is timestamped at the message that ended the gap.
 */
function buildGapRow(gap: ObservationGapContext, threadId: string): NewObservation {
  const row: NewObservation = {
    scopeKind: 'thread',
    scopeId: threadId,
    kind: 'gap',
    payload: { category: 'continuity', text: gap.text },
    durationMs: gap.durationMs,
    schemaVersion: OBSERVATION_SCHEMA_VERSION,
    createdAt: gap.nextMessageAt,
  };
  return row;
}
/**
 * Parse the observer model's output as JSON lines into observation rows.
 * Lines that are blank, not valid JSON, or lack a non-empty `text` field are
 * skipped silently; unrecognized categories are coerced to 'other'. All rows
 * from one call share a single creation timestamp.
 */
function parseObservationJsonLines(text: string, threadId: string): NewObservation[] {
  const createdAt = new Date();
  const rows: NewObservation[] = [];
  for (const rawLine of text.split('\n')) {
    const line = rawLine.trim();
    if (line.length === 0) continue;
    let parsed: { category?: unknown; text?: unknown };
    try {
      parsed = JSON.parse(line) as { category?: unknown; text?: unknown };
    } catch {
      // Non-JSON lines (chatter, prose) are expected; ignore them.
      continue;
    }
    const observationText = typeof parsed.text === 'string' ? parsed.text.trim() : '';
    if (observationText === '') continue;
    rows.push({
      scopeKind: 'thread',
      scopeId: threadId,
      kind: 'observation',
      payload: { category: observationCategory(parsed.category), text: observationText },
      durationMs: null,
      schemaVersion: OBSERVATION_SCHEMA_VERSION,
      createdAt,
    });
  }
  return rows;
}
/**
 * Validate and normalize the compactor's output before persisting it.
 * Freeform memory is stored verbatim (fence-stripped); structured memory must
 * be valid JSON and, when a schema is configured, must validate against it.
 *
 * @throws Error when the output is empty, not JSON (structured mode), or
 *   fails schema validation.
 */
async function validateWorkingMemoryOutput(
  raw: string,
  workingMemory: RunObservationalCycleOpts['workingMemory'],
): Promise<string> {
  const content = stripMarkdownFence(raw.trim());
  if (!content) {
    throw new Error('Compactor returned empty working memory');
  }
  // Freeform documents need no further checks.
  if (!workingMemory.structured) return content;
  let parsed: unknown;
  try {
    parsed = JSON.parse(content);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Compactor returned invalid JSON working memory: ${reason}`);
  }
  // Without a schema, re-serialize for consistent formatting.
  if (workingMemory.schema === undefined) return JSON.stringify(parsed, null, 2);
  const result = await parseWithSchema(workingMemory.schema, parsed);
  if (!result.success) {
    throw new Error(
      `Compactor returned working memory that does not match schema: ${result.error}`,
    );
  }
  return JSON.stringify(result.data, null, 2);
}
/**
 * Render messages as one timestamped line each: `[iso] [role] text`.
 * Non-LLM messages are labeled 'custom' with empty text; LLM messages join
 * their text parts with single spaces (non-text parts are dropped).
 */
function renderTranscript(messages: AgentDbMessage[]): string {
  const renderLine = (message: AgentDbMessage): string => {
    if (!isLlmMessage(message)) {
      return `[${message.createdAt.toISOString()}] [custom] `;
    }
    const text = message.content
      .filter((part): part is { type: 'text'; text: string } => part.type === 'text')
      .map((part) => part.text)
      .join(' ');
    return `[${message.createdAt.toISOString()}] [${message.role}] ${text}`;
  };
  return messages.map(renderLine).join('\n');
}
/**
 * Group observations by (category, kind) and render one `### category / kind`
 * markdown section per group, preserving first-seen group order.
 *
 * `kind` is free-form and consumer-defined, so it may itself contain ':'.
 * The original implementation rebuilt category/kind by splitting the map key
 * on ':' and destructuring, which truncated any kind containing a colon
 * (e.g. kind 'tool:call' rendered as 'tool'). The group value now carries the
 * category and kind directly; the key is only used for identity (categories
 * are a fixed enum and never contain ':', so the first segment is unambiguous).
 */
function renderObservationsByCategory(observations: Observation[]): string {
  const groups = new Map<string, { category: string; kind: string; rows: Observation[] }>();
  for (const row of observations) {
    const category = payloadCategory(row.payload);
    const key = `${category}:${row.kind}`;
    const group = groups.get(key);
    if (group) {
      group.rows.push(row);
    } else {
      groups.set(key, { category, kind: row.kind, rows: [row] });
    }
  }
  return Array.from(groups.values())
    .map(({ category, kind, rows }) => {
      const items = rows.map(renderObservationRow).join('\n');
      return `### ${category} / ${kind}\n${items}`;
    })
    .join('\n\n');
}
/**
 * Render one observation as a markdown list item: `- [iso] duration=… text`.
 * The duration segment is omitted for rows without a durationMs.
 */
function renderObservationRow(row: Observation): string {
  const timestamp = row.createdAt.toISOString();
  const durationSuffix = row.durationMs === null ? '' : ` duration=${humanizeMs(row.durationMs)}`;
  return `- [${timestamp}]${durationSuffix} ${payloadText(row.payload)}`;
}
/**
 * Render a gap context for the observer prompt: the human-readable summary
 * followed by the precise timestamps bracketing the gap and its duration.
 */
function renderGapContext(gap: ObservationGapContext): string {
  let out = gap.text;
  out += `\nPrevious observed message time: ${gap.previousObservedAt.toISOString()}`;
  out += `\nNext message time: ${gap.nextMessageAt.toISOString()}`;
  out += `\nDuration: ${humanizeMs(gap.durationMs)}`;
  return out;
}
/** Earliest message that is an LLM message authored by the user, if any. */
function firstUserMessage(messages: AgentDbMessage[]): AgentDbMessage | undefined {
  for (const message of messages) {
    if (isLlmMessage(message) && message.role === 'user') return message;
  }
  return undefined;
}
/** Coerce an unknown value to a known category, defaulting to 'other'. */
function observationCategory(value: unknown): ObservationCategory {
  if (isObservationCategory(value)) return value;
  return 'other';
}
/**
 * Extract the category from an observation payload. Non-object payloads
 * (strings, numbers, null) carry no category and map to 'other'.
 */
function payloadCategory(payload: unknown): ObservationCategory {
  if (typeof payload !== 'object' || payload === null) return 'other';
  return observationCategory((payload as { category?: unknown }).category);
}
/** Type guard: is `value` one of the known observation categories? */
function isObservationCategory(value: unknown): value is ObservationCategory {
  if (typeof value !== 'string') return false;
  // Widen to readonly string[] so .includes accepts an arbitrary string.
  return (OBSERVATION_CATEGORIES as readonly string[]).includes(value);
}
/**
 * Best-effort text for an arbitrary payload: the payload itself when it is a
 * string, its `text` field when it is an object with one, otherwise its JSON
 * serialization ('' when it cannot be serialized, e.g. circular references).
 */
function payloadText(payload: unknown): string {
  if (typeof payload === 'string') return payload;
  if (typeof payload === 'object' && payload !== null) {
    const candidate = (payload as { text?: unknown }).text;
    if (typeof candidate === 'string') return candidate;
  }
  try {
    return JSON.stringify(payload);
  } catch {
    return '';
  }
}
/**
 * Strip a markdown code fence that wraps the ENTIRE string (```json / ```md /
 * ```markdown / bare ```), returning the trimmed inner content. Strings that
 * are not a single whole fence are returned trimmed but otherwise untouched.
 */
function stripMarkdownFence(value: string): string {
  const FENCE_RE = /^```(?:json|markdown|md)?\s*\n([\s\S]*?)\n```$/i;
  const trimmed = value.trim();
  const match = FENCE_RE.exec(trimmed);
  if (match === null) return trimmed;
  return match[1].trim();
}
/**
 * Render a millisecond duration as a coarse human string using the two
 * largest relevant units: '2d 3h', '1h 30m', '5m', '42s'. Negative inputs
 * clamp to '0s'; sub-unit remainders are truncated, never rounded up.
 */
function humanizeMs(ms: number): string {
  const totalSeconds = Math.max(0, Math.floor(ms / 1000));
  const totalMinutes = Math.floor(totalSeconds / 60);
  const totalHours = Math.floor(totalMinutes / 60);
  const totalDays = Math.floor(totalHours / 24);
  if (totalDays > 0) {
    const hours = totalHours % 24;
    return hours > 0 ? `${totalDays}d ${hours}h` : `${totalDays}d`;
  }
  if (totalHours > 0) {
    const minutes = totalMinutes % 60;
    return minutes > 0 ? `${totalHours}h ${minutes}m` : `${totalHours}h`;
  }
  if (totalMinutes > 0) return `${totalMinutes}m`;
  return `${totalSeconds}s`;
}
/**
 * Build the `experimental_telemetry` options spread into generateText calls.
 * Returns {} when telemetry is absent or disabled; an empty integrations
 * array is normalized to undefined so it is omitted downstream.
 */
function telemetryOptions(telemetry: BuiltTelemetry | undefined): Record<string, unknown> {
  if (!telemetry?.enabled) return {};
  const integrations = telemetry.integrations.length > 0 ? telemetry.integrations : undefined;
  return {
    experimental_telemetry: {
      isEnabled: true,
      functionId: telemetry.functionId,
      metadata: telemetry.metadata,
      recordInputs: telemetry.recordInputs,
      recordOutputs: telemetry.recordOutputs,
      tracer: telemetry.tracer,
      integrations,
    },
  };
}
/**
 * Surface an observer/compactor failure on the agent event bus. No-op when
 * no bus is configured — callers treat these cycles as best-effort.
 */
function emitError(
  eventBus: AgentEventBus | undefined,
  source: 'observer' | 'compactor',
  error: unknown,
): void {
  if (!eventBus) return;
  // Preserve the original error object alongside a string message.
  const message = error instanceof Error ? error.message : String(error);
  eventBus.emit({ type: AgentEvent.Error, message, error, source });
}

View File

@ -1,25 +1,20 @@
import { z } from 'zod';
import type { BuiltTool } from '../types';
import type { z } from 'zod';
type ZodObjectSchema = z.ZodObject<z.ZodRawShape>;
export const UPDATE_WORKING_MEMORY_TOOL_NAME = 'update_working_memory';
/**
* The default instruction block injected into the system prompt when working memory
* is configured. Exported so callers can reference it when building custom instructions.
*/
export const WORKING_MEMORY_DEFAULT_INSTRUCTION = [
'You have persistent working memory that survives across conversations.',
'Your current working memory state is shown below.',
`When you learn new information about the user or conversation that should be remembered, call the \`${UPDATE_WORKING_MEMORY_TOOL_NAME}\` tool.`,
'Only call it when something has actually changed — do NOT call it if nothing new was learned.',
'You have thread working memory that is maintained automatically by an out-of-band observer.',
'Your current working memory state is shown below. Use it as context for this conversation.',
'Do not try to edit working memory directly. The observer updates it after turns when durable thread state changes.',
].join('\n');
/**
* Generate the system prompt instruction for working memory.
* Tells the LLM to call the update_working_memory tool when it has new information to persist.
* Tells the LLM how to read the injected working-memory document.
*
* @param template - The working memory template or schema.
* @param structured - Whether the working memory is structured (JSON schema).
@ -32,8 +27,8 @@ export function buildWorkingMemoryInstruction(
instruction?: string,
): string {
const format = structured
? 'The memory argument must be valid JSON matching the schema'
: 'Update the template with any new information learned';
? 'The working memory document is valid JSON matching this schema'
: 'The working memory document follows this template';
const body = instruction ?? WORKING_MEMORY_DEFAULT_INSTRUCTION;
@ -62,52 +57,3 @@ export function templateFromSchema(schema: ZodObjectSchema): string {
}
return JSON.stringify(obj, null, 2);
}
export interface WorkingMemoryToolConfig {
/** Whether this is structured (Zod-schema-driven) working memory. */
structured: boolean;
/** Zod schema for structured working memory input validation. */
schema?: ZodObjectSchema;
/** Called with the serialized working memory string to persist it. */
persist: (content: string) => Promise<void>;
}
/**
* Build the update_working_memory BuiltTool that the agent calls to persist working memory.
*
* For freeform working memory the input schema is `{ memory: string }`.
* For structured working memory the input schema is the configured Zod object schema,
* whose values are serialized to JSON before persisting.
*/
export function buildWorkingMemoryTool(config: WorkingMemoryToolConfig): BuiltTool {
if (config.structured && config.schema) {
const schema = config.schema;
return {
name: UPDATE_WORKING_MEMORY_TOOL_NAME,
description:
'Update your persistent working memory with new information about the user or conversation. Only call this when something has actually changed.',
inputSchema: schema,
handler: async (input: unknown) => {
const content = JSON.stringify(input, null, 2);
await config.persist(content);
return { success: true, message: 'Working memory updated.' };
},
};
}
const freeformSchema = z.object({
memory: z.string().describe('The updated working memory content.'),
});
return {
name: UPDATE_WORKING_MEMORY_TOOL_NAME,
description:
'Update your persistent working memory with new information about the user or conversation. Only call this when something has actually changed.',
inputSchema: freeformSchema,
handler: async (input: unknown) => {
const { memory } = input as z.infer<typeof freeformSchema>;
await config.persist(memory);
return { success: true, message: 'Working memory updated.' };
},
};
}

View File

@ -0,0 +1,130 @@
import { InMemoryMemory } from '../../runtime/memory-store';
import { AgentEvent } from '../../types/runtime/event';
import type { AgentDbMessage } from '../../types/sdk/message';
import {
OBSERVATION_SCHEMA_VERSION,
type NewObservation,
type ObserveFn,
} from '../../types/sdk/observation';
import { Agent } from '../agent';
import { Memory } from '../memory';
// Build a minimal AgentDbMessage with a single text part; id and createdAt
// are generated fresh on each call.
function makeMsg(role: 'user' | 'assistant', text: string): AgentDbMessage {
  return {
    id: crypto.randomUUID(),
    createdAt: new Date(),
    role,
    content: [{ type: 'text', text }],
  };
}
// Create thread `threadId` for resource 'u-1' and store one user/assistant
// exchange so observation cycles have a transcript to read.
async function seedThread(store: InMemoryMemory, threadId: string): Promise<void> {
  await store.saveThread({ id: threadId, resourceId: 'u-1' });
  await store.saveMessages({
    threadId,
    resourceId: 'u-1',
    messages: [makeMsg('user', 'one'), makeMsg('assistant', 'two')],
  });
}
// Build a thread-scoped NewObservation for thread 't-1' whose payload is the
// given string — tests assert on the payload to identify which observer ran.
function makeNewObs(payload: string): NewObservation {
  return {
    scopeKind: 'thread',
    scopeId: 't-1',
    kind: 'observation',
    payload,
    durationMs: null,
    schemaVersion: OBSERVATION_SCHEMA_VERSION,
    createdAt: new Date(),
  };
}
// Tests for Agent.reflect(): the synchronous observational-memory cycle.
describe('agent.reflect', () => {
  // Without an observationalMemory block, reflect short-circuits.
  it('returns no-config when observational memory is not configured', async () => {
    const agent = new Agent('a').model('openai/gpt-4o-mini');
    const result = await agent.reflect({ threadId: 't-1', resourceId: 'u-1' });
    expect(result).toEqual({ status: 'no-config' });
  });
  // The observer configured on the Memory builder is used by default, and its
  // rows are persisted to the observation store.
  it('runs the cycle with the builder-time observer', async () => {
    const store = new InMemoryMemory();
    await seedThread(store, 't-1');
    const observe = jest
      .fn()
      .mockResolvedValue([makeNewObs('builder-observed')]) as unknown as ObserveFn;
    const memory = new Memory()
      .storage(store)
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe });
    const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory);
    const result = await agent.reflect({ threadId: 't-1', resourceId: 'u-1' });
    expect(result).toEqual({ status: 'ran', observationsWritten: 1, compacted: false });
    const written = await store.getObservations({ scopeKind: 'thread', scopeId: 't-1' });
    expect(written.map((r) => r.payload)).toEqual(['builder-observed']);
  });
  // A per-call observer takes precedence; the builder observer is never invoked.
  it('lets a call-time observer override the builder default', async () => {
    const store = new InMemoryMemory();
    await seedThread(store, 't-1');
    const builderObserve = jest
      .fn()
      .mockResolvedValue([makeNewObs('builder')]) as unknown as ObserveFn;
    const callObserve = jest.fn().mockResolvedValue([makeNewObs('call')]) as unknown as ObserveFn;
    const memory = new Memory()
      .storage(store)
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe: builderObserve });
    const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory);
    await agent.reflect({ threadId: 't-1', resourceId: 'u-1', observe: callObserve });
    expect(builderObserve).not.toHaveBeenCalled();
    expect(callObserve).toHaveBeenCalledTimes(1);
  });
  // While another holder owns the scope lock, the cycle skips without observing.
  it('skips with lock-held when another holder is on the lock', async () => {
    const store = new InMemoryMemory();
    await seedThread(store, 't-1');
    await store.acquireObservationLock('thread', 't-1', { ttlMs: 60_000, holderId: 'other' });
    const observe = jest.fn().mockResolvedValue([makeNewObs('x')]) as unknown as ObserveFn;
    const memory = new Memory()
      .storage(store)
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe });
    const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory);
    const result = await agent.reflect({ threadId: 't-1', resourceId: 'u-1' });
    expect(result).toEqual({ status: 'skipped', reason: 'lock-held' });
    expect(observe).not.toHaveBeenCalled();
  });
});
// Tests for Agent.reflectInBackground(): fire-and-forget cycle scheduling.
describe('agent.reflectInBackground', () => {
  // Setup failures (here: missing instructions) must not throw synchronously —
  // they are reported via AgentEvent.Error after the call returns.
  it('emits AgentEvent.Error when background setup fails before scheduling the cycle', async () => {
    const store = new InMemoryMemory();
    const errors: string[] = [];
    const memory = new Memory()
      .storage(store)
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory();
    const agent = new Agent('a').model('openai/gpt-4o-mini').memory(memory);
    agent.on(AgentEvent.Error, (event) => {
      if (event.type === AgentEvent.Error) errors.push(event.message);
    });
    agent.reflectInBackground({ threadId: 't-1', resourceId: 'u-1' });
    // Yield one macrotask so the rejected setup promise settles and emits.
    await new Promise((resolve) => setTimeout(resolve, 0));
    expect(errors).toEqual(['Agent "a" requires instructions']);
  });
});

View File

@ -1,8 +1,8 @@
import { z } from 'zod';
import { Tool } from '../sdk/tool';
import type { AgentMessage } from '../types/sdk/message';
import type { InterruptibleToolContext } from '../types/sdk/tool';
import type { AgentMessage } from '../../types/sdk/message';
import type { InterruptibleToolContext } from '../../types/sdk/tool';
import { Tool } from '../tool';
// ---------------------------------------------------------------------------
// Tool.describe() tests

View File

@ -0,0 +1,183 @@
import type { BuiltMemory } from '../../types';
import type { CompactFn, ObserveFn } from '../../types/sdk/observation';
import { Agent } from '../agent';
import { Memory } from '../memory';
// Tests for the Memory builder's observational-memory configuration: defaults,
// consumer overrides, backend capability validation, and the snapshot flag.
describe('Memory builder — observational memory', () => {
  // Shared no-op observer; these tests only exercise config plumbing.
  const observe = jest.fn().mockResolvedValue([]) as unknown as ObserveFn;
  it('omits observationalMemory when not configured', () => {
    const config = new Memory().build();
    expect(config.observationalMemory).toBeUndefined();
  });
  it('applies lockTtlMs default', () => {
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe })
      .build();
    expect(config.observationalMemory?.lockTtlMs).toBe(30_000);
  });
  it('applies trigger, compaction, and gap defaults', () => {
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe })
      .build();
    expect(config.observationalMemory?.trigger).toEqual({ type: 'per-turn' });
    expect(config.observationalMemory?.compactionThreshold).toBe(5);
    expect(config.observationalMemory?.gapThresholdMs).toBe(60 * 60_000);
  });
  it('respects consumer overrides for lockTtlMs', () => {
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe, lockTtlMs: 5_000 })
      .build();
    expect(config.observationalMemory?.lockTtlMs).toBe(5_000);
  });
  // Every optional field set by the consumer should survive build() unchanged.
  it('forwards optional fields untouched', () => {
    const compact = jest.fn().mockResolvedValue({ content: '# Notes' }) as unknown as CompactFn;
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({
        observe,
        compact,
        trigger: { type: 'idle-timer', idleMs: 5 * 60 * 1000, gapThresholdMs: 3600_000 },
        compactionThreshold: 25,
        gapThresholdMs: 30 * 60_000,
        observerPrompt: 'Observe.',
        compactorPrompt: 'Compact.',
        sync: true,
      })
      .build();
    expect(config.observationalMemory?.observe).toBe(observe);
    expect(config.observationalMemory?.compact).toBe(compact);
    expect(config.observationalMemory?.compactionThreshold).toBe(25);
    expect(config.observationalMemory?.trigger).toEqual({
      type: 'idle-timer',
      idleMs: 5 * 60 * 1000,
      gapThresholdMs: 3600_000,
    });
    expect(config.observationalMemory?.gapThresholdMs).toBe(30 * 60_000);
    expect(config.observationalMemory?.observerPrompt).toBe('Observe.');
    expect(config.observationalMemory?.compactorPrompt).toBe('Compact.');
    expect(config.observationalMemory?.sync).toBe(true);
  });
  it('uses idle-timer trigger gapThresholdMs when no top-level override is set', () => {
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({
        observe,
        trigger: { type: 'idle-timer', idleMs: 5 * 60 * 1000, gapThresholdMs: 45 * 60_000 },
      })
      .build();
    expect(config.observationalMemory?.gapThresholdMs).toBe(45 * 60_000);
  });
  it('rejects backends that do not implement BuiltObservationStore', () => {
    // A backend with only the base thread/message methods — no observation APIs.
    const minimalBackend = {
      getThread: jest.fn().mockResolvedValue(null),
      saveThread: jest.fn().mockResolvedValue({}),
      deleteThread: jest.fn().mockResolvedValue(undefined),
      getMessages: jest.fn().mockResolvedValue([]),
      saveMessages: jest.fn().mockResolvedValue(undefined),
      deleteMessages: jest.fn().mockResolvedValue(undefined),
      describe: () => ({
        name: 'minimal',
        constructorName: 'MinimalMemory',
        connectionParams: null,
      }),
    } as unknown as BuiltMemory;
    expect(() =>
      new Memory()
        .storage(minimalBackend)
        .freeform('# Notes')
        .scope('thread')
        .observationalMemory({ observe })
        .build(),
    ).toThrow(/BuiltObservationStore/);
  });
  it('rejects partial observation backends before runtime cycles can use them', () => {
    // Implements a subset of the observation surface (saveWorkingMemory,
    // appendObservations) but not all of BuiltObservationStore — must still fail.
    const partialObservationBackend = {
      getThread: jest.fn().mockResolvedValue(null),
      saveThread: jest.fn().mockResolvedValue({}),
      deleteThread: jest.fn().mockResolvedValue(undefined),
      getMessages: jest.fn().mockResolvedValue([]),
      saveMessages: jest.fn().mockResolvedValue(undefined),
      deleteMessages: jest.fn().mockResolvedValue(undefined),
      saveWorkingMemory: jest.fn().mockResolvedValue(undefined),
      appendObservations: jest.fn().mockResolvedValue([]),
      describe: () => ({
        name: 'partial-observation',
        constructorName: 'PartialObservationMemory',
        connectionParams: null,
      }),
    } as unknown as BuiltMemory;
    expect(() =>
      new Memory()
        .storage(partialObservationBackend)
        .freeform('# Notes')
        .scope('thread')
        .observationalMemory({ observe })
        .build(),
    ).toThrow(/BuiltObservationStore/);
  });
  it('requires workingMemory', () => {
    expect(() => new Memory().observationalMemory({ observe }).build()).toThrow(/working memory/);
  });
  it('requires thread-scoped working memory', () => {
    expect(() =>
      new Memory().freeform('# Notes').scope('resource').observationalMemory({ observe }).build(),
    ).toThrow(/thread-scoped working memory/);
  });
  // Observational memory supplements rather than replaces working memory.
  it('coexists with workingMemory', () => {
    const config = new Memory()
      .freeform('# Notes')
      .scope('thread')
      .observationalMemory({ observe })
      .build();
    expect(config.workingMemory).toBeDefined();
    expect(config.workingMemory?.scope).toBe('thread');
    expect(config.observationalMemory).toBeDefined();
  });
  // The snapshot flag reflects only the presence of the observational block.
  describe('agent.snapshot.hasObservationalMemory', () => {
    it('is false when no memory is configured', () => {
      const agent = new Agent('a').model('openai/gpt-4o-mini');
      expect(agent.snapshot.hasObservationalMemory).toBe(false);
    });
    it('is false when memory is configured without observational block', () => {
      const memory = new Memory();
      const agent = new Agent('a').model('openai/gpt-4o-mini').memory(memory);
      expect(agent.snapshot.hasObservationalMemory).toBe(false);
    });
    it('is true when observationalMemory is configured', () => {
      const memory = new Memory()
        .freeform('# Notes')
        .scope('thread')
        .observationalMemory({ observe });
      const agent = new Agent('a').model('openai/gpt-4o-mini').memory(memory);
      expect(agent.snapshot.hasObservationalMemory).toBe(true);
    });
  });
});

View File

@ -1,5 +1,5 @@
import { getCreatedAt } from '../sdk/message';
import type { AgentMessage } from '../types/sdk/message';
import type { AgentMessage } from '../../types/sdk/message';
import { getCreatedAt } from '../message';
function userMessage(partial: Partial<AgentMessage> & { createdAt?: unknown }): AgentMessage {
return partial as AgentMessage;

View File

@ -1,6 +1,6 @@
import type { TelemetryIntegration } from 'ai';
import { Telemetry } from '../sdk/telemetry';
import { Telemetry } from '../telemetry';
describe('Telemetry builder', () => {
it('builds with defaults', async () => {

View File

@ -1,7 +1,7 @@
import { z } from 'zod';
import { Tool, wrapToolForApproval } from '../sdk/tool';
import type { BuiltTelemetry, BuiltTool, InterruptibleToolContext, ToolContext } from '../types';
import type { BuiltTelemetry, BuiltTool, InterruptibleToolContext, ToolContext } from '../../types';
import { Tool, wrapToolForApproval } from '../tool';
// ---------------------------------------------------------------------------
// Test helpers

View File

@ -8,9 +8,14 @@ import { Telemetry } from './telemetry';
import { Tool, wrapToolForApproval } from './tool';
import { AgentRuntime } from '../runtime/agent-runtime';
import { AgentEventBus } from '../runtime/event-bus';
import { hasObservationStore } from '../runtime/observation-store';
import {
runObservationalCycle,
type RunObservationalCycleOpts,
type RunObservationalCycleResult,
} from '../runtime/observational-cycle';
import { createAgentToolResult } from '../runtime/tool-adapter';
import type {
AgentEvent,
AgentEventHandler,
AgentMiddleware,
BuiltAgent,
@ -20,6 +25,8 @@ import type {
BuiltProviderTool,
BuiltTool,
BuiltTelemetry,
CompactFn,
ObserveFn,
CheckpointStore,
ExecutionOptions,
GenerateResult,
@ -34,6 +41,7 @@ import type {
ThinkingConfigFor,
ResumeOptions,
} from '../types';
import { AgentEvent } from '../types/runtime/event';
import type { AgentBuilder } from '../types/sdk/agent-builder';
import type { AgentMessage } from '../types/sdk/message';
import type { Workspace } from '../workspace/workspace';
@ -57,6 +65,8 @@ export interface AgentSnapshot {
tools: ReadonlyArray<{ name: string; description: string | undefined }>;
/** True when `.memory()` has been configured. */
hasMemory: boolean;
/** True when observational memory has been configured on the memory builder. */
hasObservationalMemory: boolean;
/** The thinking config if set, otherwise null. */
thinking: ThinkingConfig | null;
/** Tool-call concurrency limit if set, otherwise null. */
@ -397,6 +407,15 @@ export class Agent implements BuiltAgent, AgentBuilder {
this.eventBus.on(event, handler);
}
/**
* Remove a previously registered event handler. Pair with `on()` so
* per-request subscribers (e.g. the cli's ExecutionRecorder) can detach
* cleanly between turns instead of accumulating on a long-lived agent.
*/
off(event: AgentEvent, handler: AgentEventHandler): void {
this.eventBus.off(event, handler);
}
/**
* Wrap this agent as a tool for use in multi-agent composition.
* The tool sends a text prompt to this agent and returns the text of the response.
@ -491,6 +510,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
instructions: this.instructionsText ?? null,
tools: this.tools.map((t) => ({ name: t.name, description: t.description })),
hasMemory: this.memoryConfig !== undefined,
hasObservationalMemory: this.memoryConfig?.observationalMemory !== undefined,
thinking: this.thinkingConfig ?? null,
toolCallConcurrency: this.concurrencyValue ?? null,
requireToolApproval: this.requireToolApprovalValue,
@ -518,6 +538,109 @@ export class Agent implements BuiltAgent, AgentBuilder {
this.eventBus.abort();
}
/**
* Wait for any in-flight background tasks (title generation, future
* observer cycles) to settle. Call before letting the agent go out of
* scope to ensure deferred writes land. Safe to call multiple times.
*/
async close(): Promise<void> {
if (this.runtime) await this.runtime.dispose();
}
/** Run one observational cycle for a thread synchronously. */
async reflect(opts: {
threadId: string;
resourceId: string;
observe?: ObserveFn;
compact?: CompactFn;
}): Promise<{ status: 'no-config' } | RunObservationalCycleResult> {
const cycle = await this.buildCycleOpts(opts);
if (cycle === null) return { status: 'no-config' };
return await runObservationalCycle(cycle);
}
/**
* Schedule an observational-memory cycle on the background-task tracker
* and return immediately. Used by consumers (e.g. the cli's post-stream
* trigger) that want the observer + compactor to run without blocking
* the response. Errors inside the cycle are surfaced via
* `AgentEvent.Error` (source: 'observer' | 'compactor').
*
* No-ops when observational memory isn't configured or no observer is
* available — same `'no-config'` short-circuit as `reflect()`.
*/
reflectInBackground(opts: {
threadId: string;
resourceId: string;
observe?: ObserveFn;
compact?: CompactFn;
}): void {
void (async () => {
const cycle = await this.buildCycleOpts(opts);
if (cycle === null) return;
const runtime = await this.ensureBuilt();
runtime.scheduleBackgroundCycle(cycle);
})().catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
this.eventBus.emit({ type: AgentEvent.Error, message, error });
});
}
/**
* Build {@link RunObservationalCycleOpts} from the agent's configured
* observational memory + per-call observer/compactor overrides.
*/
private async buildCycleOpts(opts: {
threadId: string;
resourceId: string;
observe?: ObserveFn;
compact?: CompactFn;
}): Promise<RunObservationalCycleOpts | null> {
const obsConfig = this.memoryConfig?.observationalMemory;
const memory = this.memoryConfig?.memory;
const workingMemory = this.memoryConfig?.workingMemory;
if (
!obsConfig ||
!memory ||
!workingMemory ||
!this.modelConfig ||
!hasObservationStore(memory)
) {
return null;
}
const runtime = await this.ensureBuilt();
const telemetry = runtime.getConfiguredTelemetry();
return {
memory,
threadId: opts.threadId,
resourceId: opts.resourceId,
model: this.modelConfig,
workingMemory: {
template: workingMemory.template,
structured: workingMemory.structured,
...(workingMemory.schema !== undefined && { schema: workingMemory.schema }),
},
...((opts.observe ?? obsConfig.observe)
? { observe: opts.observe ?? obsConfig.observe }
: {}),
...((opts.compact ?? obsConfig.compact)
? { compact: opts.compact ?? obsConfig.compact }
: {}),
...(obsConfig.trigger !== undefined && { trigger: obsConfig.trigger }),
...(obsConfig.compactionThreshold !== undefined && {
compactionThreshold: obsConfig.compactionThreshold,
}),
...(obsConfig.gapThresholdMs !== undefined && { gapThresholdMs: obsConfig.gapThresholdMs }),
...(obsConfig.observerPrompt !== undefined && { observerPrompt: obsConfig.observerPrompt }),
...(obsConfig.compactorPrompt !== undefined && {
compactorPrompt: obsConfig.compactorPrompt,
}),
...(obsConfig.lockTtlMs !== undefined && { lockTtlMs: obsConfig.lockTtlMs }),
...(telemetry !== undefined && { telemetry }),
eventBus: this.eventBus,
};
}
/** Generate a response (non-streaming). Lazy-builds on first call. */
async generate(
input: AgentMessage[] | string,
@ -702,6 +825,7 @@ export class Agent implements BuiltAgent, AgentBuilder {
eventBus: this.eventBus,
toolCallConcurrency: this.concurrencyValue,
titleGeneration: this.memoryConfig?.titleGeneration,
observationalMemory: this.memoryConfig?.observationalMemory,
telemetry: this.telemetryConfig ?? (await this.telemetryBuilder?.build()),
});

View File

@ -1,13 +1,19 @@
import type { z } from 'zod';
import { InMemoryMemory } from '../runtime/memory-store';
import { hasObservationStore } from '../runtime/observation-store';
import { templateFromSchema } from '../runtime/working-memory';
import type {
BuiltMemory,
MemoryConfig,
ObservationalMemoryConfig,
SemanticRecallConfig,
TitleGenerationConfig,
} from '../types';
import { DEFAULT_OBSERVATION_GAP_THRESHOLD_MS } from '../types';
const DEFAULT_OBSERVATION_LOCK_TTL_MS = 30_000;
const DEFAULT_OBSERVATION_COMPACTION_THRESHOLD = 5;
type ZodObjectSchema = z.ZodObject<z.ZodRawShape>;
@ -43,6 +49,8 @@ export class Memory {
private titleGenerationConfig?: TitleGenerationConfig;
private observationalMemoryConfig?: ObservationalMemoryConfig;
/** The configured number of recent messages to include. */
get lastMessageCount(): number {
return this.lastMessagesValue;
@ -144,6 +152,11 @@ export class Memory {
return this;
}
observationalMemory(config: ObservationalMemoryConfig = {}): this {
this.observationalMemoryConfig = config;
return this;
}
/**
* Validate configuration and produce a `MemoryConfig`.
*
@ -204,12 +217,59 @@ export class Memory {
};
}
return {
const baseConfig = {
memory,
lastMessages: this.lastMessagesValue,
workingMemory,
semanticRecall: this.semanticRecallConfig,
titleGeneration: this.titleGenerationConfig,
};
if (!this.observationalMemoryConfig) {
return baseConfig;
}
if (!hasObservationStore(memory)) {
throw new Error(
"Observational memory requires a storage backend that implements BuiltObservationStore (e.g. SqliteMemory or n8n's N8nMemory).",
);
}
if (!workingMemory) {
throw new Error(
'Observational memory requires working memory. Add .freeform(template) or .structured(schema) before .observationalMemory().',
);
}
if (workingMemory.scope !== 'thread') {
throw new Error(
"Observational memory requires thread-scoped working memory. Add .scope('thread') before .observationalMemory().",
);
}
if (!memory.saveWorkingMemory) {
throw new Error(
'Observational memory requires a storage backend that implements saveWorkingMemory().',
);
}
return {
...baseConfig,
memory,
observationalMemory: {
...this.observationalMemoryConfig,
lockTtlMs: this.observationalMemoryConfig.lockTtlMs ?? DEFAULT_OBSERVATION_LOCK_TTL_MS,
compactionThreshold:
this.observationalMemoryConfig.compactionThreshold ??
DEFAULT_OBSERVATION_COMPACTION_THRESHOLD,
trigger: this.observationalMemoryConfig.trigger ?? { type: 'per-turn' },
gapThresholdMs:
this.observationalMemoryConfig.gapThresholdMs ??
(this.observationalMemoryConfig.trigger?.type === 'idle-timer'
? this.observationalMemoryConfig.trigger.gapThresholdMs
: undefined) ??
DEFAULT_OBSERVATION_GAP_THRESHOLD_MS,
},
};
}
}

View File

@ -73,13 +73,20 @@ export type {
CompactFn,
NewObservation,
Observation,
ObservationCategory,
ObservationCursor,
ObservationGapContext,
ObservationLockHandle,
ObservationalMemoryConfig,
ObservationalMemoryTrigger,
ObserveFn,
ScopeKind,
} from './sdk/observation';
export { OBSERVATION_SCHEMA_VERSION } from './sdk/observation';
export {
DEFAULT_OBSERVATION_GAP_THRESHOLD_MS,
OBSERVATION_CATEGORIES,
OBSERVATION_SCHEMA_VERSION,
} from './sdk/observation';
export type {
EvalInput,

View File

@ -23,7 +23,12 @@ export type AgentEventData =
result: unknown;
isError: boolean;
}
| { type: AgentEvent.Error; message: string; error: unknown };
| {
type: AgentEvent.Error;
message: string;
error: unknown;
source?: 'observer' | 'compactor';
};
export type AgentEventHandler = (data: AgentEventData) => void;

View File

@ -5,40 +5,47 @@ import type { AgentDbMessage } from './message';
import type { BuiltTelemetry } from '../telemetry';
import type { JSONValue } from '../utils/json';
/**
* Schema version stamped onto every observation row. Bump when the row format
* changes incompatibly. Read-side helpers filter rows newer than the running
* SDK can interpret.
*/
export const OBSERVATION_SCHEMA_VERSION = 1;
/**
* Scope an observation belongs to. v1 writes only `'thread'`; the others are
* reserved so future resource- and agent-scoped observers are a behavioral
* change, not a schema migration.
*/
/**
 * Fallback gap threshold (1 hour, in ms): the builder uses this when neither
 * `gapThresholdMs` nor an idle-timer trigger's `gapThresholdMs` is configured.
 */
export const DEFAULT_OBSERVATION_GAP_THRESHOLD_MS = 60 * 60_000;
/**
 * Canonical observation categories. Declared `as const` so each entry keeps
 * its literal type and `ObservationCategory` is a union of exactly these
 * strings.
 */
export const OBSERVATION_CATEGORIES = [
'facts',
'preferences',
'goal',
'state',
'active_items',
'decisions',
'follow_ups',
'continuity',
'superseded',
'other',
] as const;
/** Union of the literal strings in {@link OBSERVATION_CATEGORIES}. */
export type ObservationCategory = (typeof OBSERVATION_CATEGORIES)[number];
/**
 * Context describing a significant time gap between observed turns. Passed to
 * the observer as `gap`, or `null` when no gap exceeded the threshold.
 */
export interface ObservationGapContext {
// Length of the gap in milliseconds.
durationMs: number;
// Human-readable rendering of the gap — presumably fed into the observer
// prompt; NOTE(review): confirm against the reference observer.
text: string;
// Timestamp of the last message of the previously observed turn.
previousObservedAt: Date;
// Timestamp of the first message after the gap.
nextMessageAt: Date;
}
/**
 * Scope an observation belongs to. v1 writes only `'thread'`; `'resource'`
 * and `'agent'` are reserved so future scoped observers are a behavioral
 * change, not a schema migration.
 */
export type ScopeKind = 'thread' | 'resource' | 'agent';
/** A persisted observation row. */
export interface Observation {
// Backend-assigned unique id (absent on `NewObservation`).
id: string;
// Scope the row belongs to; v1 writes only `'thread'`.
scopeKind: ScopeKind;
// Id within the scope (for `'thread'` scope this is the threadId).
scopeId: string;
/** Free-form, consumer-defined. The SDK reserves no values. */
kind: string;
// Arbitrary JSON payload produced by the observer.
payload: JSONValue;
/** Populated for kinds that represent a time gap; otherwise `null`. */
durationMs: number | null;
// Row-format version; readers filter rows newer than they can interpret.
schemaVersion: number;
// Assigned at append time; part of the `(createdAt, id)` keyset ordering.
createdAt: Date;
}
/** Shape passed to `appendObservations`. `id` is backend-assigned. */
export type NewObservation = Omit<Observation, 'id'>;
/**
* Per-scope mutable state for the observer's message cursor.
*/
export interface ObservationCursor {
scopeKind: ScopeKind;
scopeId: string;
@ -54,12 +61,6 @@ export interface ObservationLockHandle {
heldUntil: Date;
}
/**
* Consumer-provided observer function. Called inside the orchestrator's
* lock + cursor scope; receives the message delta since the last cursor
* advance and the current thread working-memory document, then returns zero
* or more rows to append.
*/
export type ObserveFn = (ctx: {
deltaMessages: AgentDbMessage[];
currentWorkingMemory: string | null;
@ -68,14 +69,10 @@ export type ObserveFn = (ctx: {
resourceId: string;
now: Date;
trigger: ObservationalMemoryTrigger;
gap: ObservationGapContext | null;
telemetry: BuiltTelemetry | undefined;
}) => Promise<NewObservation[]>;
/**
* Consumer-provided compactor function. Reads queued observations + the
* current working-memory document, and returns the complete replacement
* working-memory document.
*/
export type CompactFn = (ctx: {
observations: Observation[];
currentWorkingMemory: string | null;
@ -89,29 +86,8 @@ export type CompactFn = (ctx: {
telemetry: BuiltTelemetry | undefined;
}) => Promise<{ content: string }>;
/**
* Storage interface for observational memory. A sibling to {@link BuiltMemory}:
* implementations typically live on the same class (cli's `N8nMemory` and the
* SDK's `InMemoryMemory` both implement both), but the interfaces are kept
* separate so observations stay out of the message-store API and consumers
* don't need to feature-check every call. When `observationalMemory` is
* configured on the builder, the configured backend must also implement this
* interface.
*/
export interface BuiltObservationStore {
/**
* Append observation rows for a scope. Backends assign `id` and return the
* persisted shape.
*/
appendObservations(rows: NewObservation[]): Promise<Observation[]>;
/**
* Query observations for a scope. Filters compose: `since`, when supplied,
* returns only rows strictly after the keyset `(createdAt, id) >
* (since.sinceCreatedAt, since.sinceObservationId)`; `kindIs` matches
* `kind` exactly; `schemaVersionAtMost` excludes rows whose `schemaVersion`
* exceeds the caller's supported version. Results are ordered by
* `(createdAt, id)` ascending.
*/
getObservations(opts: {
scopeKind: ScopeKind;
scopeId: string;
@ -120,41 +96,19 @@ export interface BuiltObservationStore {
limit?: number;
schemaVersionAtMost?: number;
}): Promise<Observation[]>;
/**
* Read the message delta the observer needs to process for a given scope.
*
* - `'thread'`: messages for `scopeId` (== threadId).
* - non-thread scopes are reserved for future versions. v1 backends should
* throw for them.
*
* When `since` is supplied, only messages strictly after the keyset
* `(createdAt, id) > (since.sinceCreatedAt, since.sinceMessageId)` are
* returned. Results are ordered by `(createdAt, id)` ascending the last
* element is the most recently appended.
*/
getMessagesForScope(
scopeKind: ScopeKind,
scopeId: string,
opts?: { since?: { sinceCreatedAt: Date; sinceMessageId: string } },
): Promise<AgentDbMessage[]>;
/** Hard-delete the given rows. Idempotent: missing ids are ignored. */
deleteObservations(ids: string[]): Promise<void>;
/** Read the cursor for a scope; `null` if none has been written yet. */
getCursor(scopeKind: ScopeKind, scopeId: string): Promise<ObservationCursor | null>;
/** Upsert the cursor-advance fields for a scope. */
setCursor(cursor: ObservationCursor): Promise<void>;
/**
* Acquire a per-scope advisory lock with TTL. Returns a handle on
* success or `null` if the lock is held by another holder and not yet
* expired. Holders other than `holderId` whose `heldUntil` is in the
* past may be displaced.
*/
acquireObservationLock(
scopeKind: ScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null>;
/** Release a held lock. Tolerates the lock having already expired or been displaced. */
releaseObservationLock(handle: ObservationLockHandle): Promise<void>;
}
@ -162,43 +116,18 @@ export type ObservationalMemoryTrigger =
| { type: 'per-turn' }
| {
type: 'idle-timer';
/** Milliseconds after TurnEnd before the observer runs. */
idleMs: number;
/** Emit a gap row when the elapsed time since the previous observed turn exceeds this. */
gapThresholdMs?: number;
};
/** Observational-memory configuration block on `MemoryConfig`. */
export interface ObservationalMemoryConfig {
/**
 * Builder-time observer override. Omit this to use the SDK reference
 * observer prompt + the agent's configured model.
 */
observe?: ObserveFn;
/**
 * Builder-time compactor override. Omit this to use the SDK reference
 * compactor prompt + the agent's configured model.
 */
compact?: CompactFn;
/** @default { type: 'per-turn' } */
trigger?: ObservationalMemoryTrigger;
/** Queue size that triggers compaction into the thread working-memory document. @default 5 */
compactionThreshold?: number;
/**
 * Elapsed time between observed turns that counts as a gap. When omitted,
 * the builder falls back to the idle-timer trigger's `gapThresholdMs`,
 * then to `DEFAULT_OBSERVATION_GAP_THRESHOLD_MS`.
 */
gapThresholdMs?: number;
/** Replaces the SDK reference observer system prompt. */
observerPrompt?: string;
/** Replaces the SDK reference compactor system prompt. */
compactorPrompt?: string;
/**
 * TTL applied when the orchestrator acquires the per-scope observation
 * lock.
 * @default 30_000
 */
lockTtlMs?: number;
/**
 * When `true`, `runObservationalCycle` calls dispatched by the SDK (e.g.
 * lazy fallback at `TurnStart`) are awaited; otherwise they are tracked
 * by the background-task tracker and resolve on `runtime.dispose()`.
 * @default false
 */
sync?: boolean;
}

View File

@ -1,7 +1,7 @@
import type { JSONSchema7 } from 'json-schema';
import { z } from 'zod';
import { parseWithSchema } from '../utils/parse';
import { parseWithSchema } from '../parse';
// ---------------------------------------------------------------------------
// parseWithSchema — Zod schemas

View File

@ -80,8 +80,8 @@ describe('ExecutionRecorder', () => {
});
});
describe('working memory capture', () => {
it('captures the working-memory tool call as a timeline event', () => {
describe('working memory tool chunks', () => {
it('records update_working_memory as a regular tool call when present', () => {
const recorder = new ExecutionRecorder();
recorder.record({ type: 'text-delta', id: 't1', delta: 'Hello' });
@ -101,12 +101,18 @@ describe('ExecutionRecorder', () => {
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBe('# Name: Alice');
expect(record.toolCalls).toEqual([]);
expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(true);
expect(record.workingMemory).toBeNull();
expect(record.toolCalls).toEqual([
{
name: 'update_working_memory',
input: { memory: '# Name: Alice' },
output: { success: true },
},
]);
expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(false);
});
it('keeps last working memory when multiple updates occur', () => {
it('does not derive execution working memory from update_working_memory calls', () => {
const recorder = new ExecutionRecorder();
recorder.record({
@ -124,7 +130,7 @@ describe('ExecutionRecorder', () => {
recorder.record({ type: 'finish', finishReason: 'stop' } as StreamChunk);
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBe('second');
expect(record.workingMemory).toBeNull();
});
});

View File

@ -455,10 +455,17 @@ describe('buildFromJson()', () => {
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Current goal/task');
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Key active items');
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Resolved or superseded');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('thread-scoped');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('current-state snapshot');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain(
'primary, secondary, active, resolved, and superseded',
'only to this same session/thread',
);
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('different session');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('new thread');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('cross-thread profile');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain(
'Treat working memory as internal context',
);
expect(getMemoryConfig(agent)?.workingMemory?.instruction).not.toContain(
'update_working_memory',
);
});

View File

@ -1,4 +1,4 @@
import { UPDATE_WORKING_MEMORY_TOOL_NAME, type AgentMessage, type StreamChunk } from '@n8n/agents';
import type { AgentMessage, StreamChunk } from '@n8n/agents';
import type {
AgentPersistedMessageContentPart,
AgentSseEvent,
@ -24,13 +24,6 @@ export interface ToolEventCallbacks {
/** Shared state threaded through the per-chunk SSE handlers. */
interface ChunkHandlerCtx {
/** Emits one SSE event to the client. */
send: (e: AgentSseEvent) => void;
/** Optional consumer callbacks fired as builder side effects on tool events. */
onToolEvent?: ToolEventCallbacks;
/**
 * Tool-call ids belonging to the SDK-internal working-memory tool. The id
 * Set is needed because `tool-input-delta` chunks carry only the id, not
 * the tool name — we capture the id on `tool-input-start` / `tool-call`
 * and use it to drop the matching streamed memory content.
 */
workingMemoryToolCallIds: Set<string>;
}
/**
@ -110,51 +103,6 @@ function emitTextLikeChunk(
* SSE-emit a tool-* chunk and fire any matching builder side-effect callback.
* Returns `{ suspended: true }` when the chunk was `tool-call-suspended`.
*/
/**
 * Working memory is implemented as an SDK tool, but n8n surfaces it as a
 * distinct memory event in the chat UI rather than a regular tool step.
 * Returns `true` when the chunk was handled here and must not flow through
 * the regular tool emission path.
 */
function handleWorkingMemoryChunk(
  chunk: Extract<
    StreamChunk,
    {
      type:
        | 'tool-input-start'
        | 'tool-input-delta'
        | 'tool-call'
        | 'tool-execution-start'
        | 'tool-result';
    }
  >,
  ctx: ChunkHandlerCtx,
): boolean {
  const { send, workingMemoryToolCallIds } = ctx;

  // Input deltas carry only the tool-call id, never the tool name: the chunk
  // is a working-memory delta iff its id was captured earlier.
  if (chunk.type === 'tool-input-delta') {
    return workingMemoryToolCallIds.has(chunk.toolCallId);
  }

  const isWorkingMemoryTool =
    'toolName' in chunk && chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME;
  if (!isWorkingMemoryTool) {
    return false;
  }

  switch (chunk.type) {
    case 'tool-input-start':
    case 'tool-call':
      // Remember the id so subsequent `tool-input-delta` chunks match above.
      workingMemoryToolCallIds.add(chunk.toolCallId);
      return true;
    case 'tool-execution-start':
      // Swallowed: the UI receives a single memory event at result time.
      return true;
    case 'tool-result':
      if (chunk.isError) {
        const reason =
          chunk.output instanceof Error ? chunk.output.message : String(chunk.output);
        send({ type: 'error', message: `Working memory update failed: ${reason}` });
      } else {
        send({ type: 'working-memory-update', toolName: chunk.toolName });
      }
      return true;
    default:
      return false;
  }
}
function emitToolChunk(
chunk: Extract<
StreamChunk,
@ -172,10 +120,6 @@ function emitToolChunk(
): { suspended: boolean } {
const { send, onToolEvent } = ctx;
if (chunk.type !== 'tool-call-suspended' && handleWorkingMemoryChunk(chunk, ctx)) {
return { suspended: false };
}
switch (chunk.type) {
case 'tool-input-start':
send({
@ -293,7 +237,6 @@ export async function pumpChunks(
const ctx: ChunkHandlerCtx = {
send,
onToolEvent,
workingMemoryToolCallIds: new Set<string>(),
};
for await (const chunk of chunks) {

View File

@ -1,17 +1,8 @@
import { UPDATE_WORKING_MEMORY_TOOL_NAME, type StreamChunk } from '@n8n/agents';
import type { StreamChunk } from '@n8n/agents';
import { extractFromAICalls, isFromAIOnlyExpression } from 'n8n-workflow';
import type { ToolRegistry } from './tool-registry';
/** Pull the human-readable working-memory content out of the WM tool's input. */
function workingMemoryContentFromInput(input: unknown): string {
  const isPlainObject =
    typeof input === 'object' && input !== null && !Array.isArray(input);
  if (isPlainObject) {
    // The WM tool passes the document under the `memory` key.
    const { memory } = input as Record<string, unknown>;
    if (typeof memory === 'string') {
      return memory;
    }
  }
  // Anything else (arrays, primitives, objects without a string `memory`)
  // is rendered as pretty-printed JSON.
  return JSON.stringify(input, null, 2);
}
/**
* Walk a nodeParameters tree and substitute every `$fromAI('key', ...)`
* expression with the value the LLM passed for that key (or the call's
@ -189,18 +180,9 @@ export class ExecutionRecorder {
this.textBuffer.push(chunk.delta);
break;
case 'tool-call':
if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) {
this.recordWorkingMemoryUpdate(workingMemoryContentFromInput(chunk.input));
} else {
this.recordToolCall(chunk.toolCallId, chunk.toolName, chunk.input);
}
break;
case 'tool-result':
if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) {
// WM tool-result is already represented by the timeline entry
// pushed at tool-call time; nothing more to do here.
break;
}
this.recordToolResult(
chunk.toolCallId,
chunk.toolName,
@ -281,16 +263,6 @@ export class ExecutionRecorder {
this.textStartTime = null;
}
private recordWorkingMemoryUpdate(content: string): void {
this.flushTextBuffer();
this.workingMemory = content;
this.timeline.push({
type: 'working-memory',
content,
timestamp: Date.now(),
});
}
/**
* Record a discrete `tool-call` chunk from the stream. Maintains both the
* flat `toolCalls` array (backward compat) and the ordered timeline. The

View File

@ -7,13 +7,7 @@ import type {
ToolDescriptor,
JSONObject,
} from '@n8n/agents';
import {
Agent,
Memory,
Tool,
UPDATE_WORKING_MEMORY_TOOL_NAME,
wrapToolForApproval,
} from '@n8n/agents';
import { Agent, Memory, Tool, wrapToolForApproval } from '@n8n/agents';
import type { AgentSkill } from '@n8n/api-types';
import { z } from 'zod';
@ -49,15 +43,13 @@ const DEFAULT_WORKING_MEMORY_TEMPLATE = `# Thread memory
- Resolved or superseded:`;
const DEFAULT_WORKING_MEMORY_INSTRUCTION = [
'You have thread-scoped working memory for this conversation.',
`When the user shares durable facts, preferences, decisions, goals, or unresolved follow-ups that will help later turns in this same thread, call ${UPDATE_WORKING_MEMORY_TOOL_NAME} with the complete updated memory.`,
'Treat working memory as a current-state snapshot, not an append-only log.',
'Keep it concise, factual, and current.',
'When facts, preferences, priorities, goals, decisions, or statuses change, replace outdated active items with the latest state.',
'Preserve distinctions the user makes between primary, secondary, active, resolved, and superseded items.',
'Move resolved or superseded items to that section only when they will help later; otherwise remove them.',
'Preserve useful existing notes, remove stale or contradicted notes, and do not store secrets or one-off details.',
`Only call ${UPDATE_WORKING_MEMORY_TOOL_NAME} when the memory should change.`,
'Thread working memory is maintained automatically after turns by an out-of-band observer.',
'Thread working memory applies only to this same session/thread.',
'Do not claim it is available in a different session, new thread, or cross-thread profile unless the product explicitly provides that context.',
'Use it silently as private read-only context for this session.',
'Treat working memory as internal context; do not reveal, quote, append, or reproduce the raw working-memory document in user-visible replies.',
'If the user asks what you remember, answer conversationally from relevant memory instead of dumping the document.',
'Do not try to edit, summarize, refresh, or maintain working memory directly.',
].join(' ');
export interface BuildFromJsonOptions {