fix: align observational memory runtime wiring

This commit is contained in:
Robin Braumann 2026-05-07 22:31:38 +02:00
parent d691022d01
commit 06944d524c
9 changed files with 84 additions and 134 deletions

View File

@ -526,7 +526,8 @@ describe('AgentRuntime.stream() — working memory', () => {
});
await collectChunks(stream);
const callArgs = streamText.mock.calls[0][0] as Record<string, unknown>;
const calls = streamText.mock.calls as Array<[Record<string, unknown>]>;
const callArgs = calls[0]?.[0] ?? {};
expect(callArgs.tools ?? {}).not.toHaveProperty('update_working_memory');
expect(savedWorkingMemory).toEqual([]);
});

View File

@ -1,5 +1,13 @@
import { z } from 'zod';
import { AgentEvent } from '../../types';
import type { AgentDbMessage } from '../../types/sdk/message';
import {
OBSERVATION_SCHEMA_VERSION,
type CompactFn,
type NewObservation,
type ObserveFn,
} from '../../types/sdk/observation';
import { AgentEventBus } from '../event-bus';
import { InMemoryMemory, saveMessagesToThread } from '../memory-store';
import {
@ -8,14 +16,6 @@ import {
runObservationalCycle,
type RunObservationalCycleOpts,
} from '../observational-cycle';
import { AgentEvent } from '../../types';
import {
OBSERVATION_SCHEMA_VERSION,
type CompactFn,
type NewObservation,
type ObserveFn,
} from '../../types/sdk/observation';
import type { AgentDbMessage } from '../../types/sdk/message';
type GenerateTextCall = { model: unknown; system?: string; prompt?: string };
const mockGenerateText = jest.fn<Promise<{ text: string }>, [GenerateTextCall]>();
@ -55,7 +55,10 @@ function opts(
resourceId: 'u-1',
model: { doGenerate: jest.fn() } as never,
workingMemory: { template: '# Thread memory', structured: false },
observe: async () => [],
observe: async () => {
await Promise.resolve();
return [];
},
compactionThreshold: 5,
...overrides,
};
@ -70,6 +73,7 @@ describe('runObservationalCycle', () => {
const mem = new InMemoryMemory();
await save(mem, [msg('m1', 'remember that I prefer concise answers')]);
const observe = jest.fn<ReturnType<ObserveFn>, Parameters<ObserveFn>>(async (ctx) => {
await Promise.resolve();
expect(ctx.deltaMessages.map((m) => m.id)).toEqual(['m1']);
expect(ctx.currentWorkingMemory).toBeNull();
expect(ctx.threadId).toBe('t-1');
@ -94,6 +98,7 @@ describe('runObservationalCycle', () => {
'# Thread memory\n- Current project:',
);
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>(async (ctx) => {
await Promise.resolve();
expect(ctx.observations).toHaveLength(1);
expect(ctx.currentWorkingMemory).toContain('Current project');
return { content: '# Thread memory\n- Current project: Memory v1' };
@ -101,7 +106,10 @@ describe('runObservationalCycle', () => {
const result = await runObservationalCycle(
opts(mem, {
observe: async () => [row('Current project is Memory v1.')],
observe: async () => {
await Promise.resolve();
return [row('Current project is Memory v1.')];
},
compact,
compactionThreshold: 1,
}),
@ -121,7 +129,10 @@ describe('runObservationalCycle', () => {
const result = await runObservationalCycle(
opts(mem, {
observe: async () => [row('one')],
observe: async () => {
await Promise.resolve();
return [row('one')];
},
compact,
compactionThreshold: 2,
}),
@ -160,6 +171,7 @@ describe('runObservationalCycle', () => {
await save(mem, [msg('m2', 'later', second)]);
const observe = jest.fn<ReturnType<ObserveFn>, Parameters<ObserveFn>>(async (ctx) => {
await Promise.resolve();
expect(ctx.gap).toMatchObject({
durationMs: 90 * 60 * 1000,
text: 'User returned after 1h 30m of inactivity.',
@ -209,9 +221,10 @@ describe('runObservationalCycle', () => {
await runObservationalCycle(opts(mem));
await save(mem, [msg('m2', 'later', second)]);
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>(async () => ({
content: '# Thread memory\n- Continuity notes: user returned after a gap',
}));
const compact = jest.fn<ReturnType<CompactFn>, Parameters<CompactFn>>(async () => {
await Promise.resolve();
return { content: '# Thread memory\n- Continuity notes: user returned after a gap' };
});
await runObservationalCycle(opts(mem, { compact, compactionThreshold: 1 }));
expect(compact).not.toHaveBeenCalled();
@ -220,7 +233,10 @@ describe('runObservationalCycle', () => {
await save(mem, [msg('m3', 'remember this decision', third)]);
await runObservationalCycle(
opts(mem, {
observe: async () => [row('Decision was recorded.')],
observe: async () => {
await Promise.resolve();
return [row('Decision was recorded.')];
},
compact,
compactionThreshold: 1,
}),
@ -324,8 +340,14 @@ describe('runObservationalCycle', () => {
structured: true,
schema: z.object({ name: z.string() }),
},
observe: async () => [row('Name is Alice.')],
compact: async () => ({ content: '{"name": 123}' }),
observe: async () => {
await Promise.resolve();
return [row('Name is Alice.')];
},
compact: async () => {
await Promise.resolve();
return { content: '{"name": 123}' };
},
compactionThreshold: 1,
eventBus,
}),
@ -351,6 +373,7 @@ describe('runObservationalCycle', () => {
const result = await runObservationalCycle(
opts(mem, {
observe: async () => {
await Promise.resolve();
throw new Error('observer failed');
},
eventBus,

View File

@ -42,6 +42,12 @@ describe('templateFromSchema', () => {
const result = templateFromSchema(schema);
expect(result).toContain('userName');
expect(result).toContain('favoriteColor');
expect(JSON.parse(result)).toHaveProperty('userName');
let parsed: unknown;
try {
parsed = JSON.parse(result) as unknown;
} catch (error) {
throw new Error(`Expected schema template to be valid JSON: ${String(error)}`);
}
expect(parsed).toHaveProperty('userName');
});
});

View File

@ -1,4 +1,4 @@
import { z } from 'zod';
import type { z } from 'zod';
type ZodObjectSchema = z.ZodObject<z.ZodRawShape>;

View File

@ -80,8 +80,8 @@ describe('ExecutionRecorder', () => {
});
});
describe('working memory capture', () => {
it('captures the working-memory tool call as a timeline event', () => {
describe('working memory tool chunks', () => {
it('records update_working_memory as a regular tool call when present', () => {
const recorder = new ExecutionRecorder();
recorder.record({ type: 'text-delta', id: 't1', delta: 'Hello' });
@ -101,12 +101,18 @@ describe('ExecutionRecorder', () => {
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBe('# Name: Alice');
expect(record.toolCalls).toEqual([]);
expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(true);
expect(record.workingMemory).toBeNull();
expect(record.toolCalls).toEqual([
{
name: 'update_working_memory',
input: { memory: '# Name: Alice' },
output: { success: true },
},
]);
expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(false);
});
it('keeps last working memory when multiple updates occur', () => {
it('does not derive execution working memory from update_working_memory calls', () => {
const recorder = new ExecutionRecorder();
recorder.record({
@ -124,7 +130,7 @@ describe('ExecutionRecorder', () => {
recorder.record({ type: 'finish', finishReason: 'stop' } as StreamChunk);
const record = recorder.getMessageRecord();
expect(record.workingMemory).toBe('second');
expect(record.workingMemory).toBeNull();
});
});

View File

@ -455,10 +455,17 @@ describe('buildFromJson()', () => {
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Current goal/task');
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Key active items');
expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Resolved or superseded');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('thread-scoped');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('current-state snapshot');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain(
'primary, secondary, active, resolved, and superseded',
'only to this same session/thread',
);
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('different session');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('new thread');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('cross-thread profile');
expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain(
'Treat working memory as internal context',
);
expect(getMemoryConfig(agent)?.workingMemory?.instruction).not.toContain(
'update_working_memory',
);
});

View File

@ -1,4 +1,4 @@
import { UPDATE_WORKING_MEMORY_TOOL_NAME, type AgentMessage, type StreamChunk } from '@n8n/agents';
import type { AgentMessage, StreamChunk } from '@n8n/agents';
import type {
AgentPersistedMessageContentPart,
AgentSseEvent,
@ -24,13 +24,6 @@ export interface ToolEventCallbacks {
interface ChunkHandlerCtx {
send: (e: AgentSseEvent) => void;
onToolEvent?: ToolEventCallbacks;
/**
* Tool-call ids belonging to the SDK-internal working-memory tool. The id
* Set is needed because `tool-input-delta` chunks carry only the id, not
* the tool name we capture the id on `tool-input-start` / `tool-call`
* and use it to drop the matching streamed memory content.
*/
workingMemoryToolCallIds: Set<string>;
}
/**
@ -110,51 +103,6 @@ function emitTextLikeChunk(
* SSE-emit a tool-* chunk and fire any matching builder side-effect callback.
* Returns `{ suspended: true }` when the chunk was `tool-call-suspended`.
*/
/**
* Working memory is implemented as an SDK tool, but n8n surfaces it as a
* distinct memory event in the chat UI rather than a regular tool step.
* Returns `true` when the chunk was handled and should not flow through the
* regular tool emission path.
*/
function handleWorkingMemoryChunk(
chunk: Extract<
StreamChunk,
{
type:
| 'tool-input-start'
| 'tool-input-delta'
| 'tool-call'
| 'tool-execution-start'
| 'tool-result';
}
>,
ctx: ChunkHandlerCtx,
): boolean {
const { send, workingMemoryToolCallIds } = ctx;
const isWmName = 'toolName' in chunk && chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME;
if (chunk.type === 'tool-input-delta') {
return workingMemoryToolCallIds.has(chunk.toolCallId);
}
if (!isWmName) return false;
if (chunk.type === 'tool-input-start' || chunk.type === 'tool-call') {
workingMemoryToolCallIds.add(chunk.toolCallId);
return true;
}
if (chunk.type === 'tool-execution-start') return true;
if (chunk.type === 'tool-result') {
if (chunk.isError) {
const errMsg = chunk.output instanceof Error ? chunk.output.message : String(chunk.output);
send({ type: 'error', message: `Working memory update failed: ${errMsg}` });
} else {
send({ type: 'working-memory-update', toolName: chunk.toolName });
}
return true;
}
return false;
}
function emitToolChunk(
chunk: Extract<
StreamChunk,
@ -172,10 +120,6 @@ function emitToolChunk(
): { suspended: boolean } {
const { send, onToolEvent } = ctx;
if (chunk.type !== 'tool-call-suspended' && handleWorkingMemoryChunk(chunk, ctx)) {
return { suspended: false };
}
switch (chunk.type) {
case 'tool-input-start':
send({
@ -293,7 +237,6 @@ export async function pumpChunks(
const ctx: ChunkHandlerCtx = {
send,
onToolEvent,
workingMemoryToolCallIds: new Set<string>(),
};
for await (const chunk of chunks) {

View File

@ -1,17 +1,8 @@
import { UPDATE_WORKING_MEMORY_TOOL_NAME, type StreamChunk } from '@n8n/agents';
import type { StreamChunk } from '@n8n/agents';
import { extractFromAICalls, isFromAIOnlyExpression } from 'n8n-workflow';
import type { ToolRegistry } from './tool-registry';
/** Pull the human-readable working-memory content out of the WM tool's input. */
function workingMemoryContentFromInput(input: unknown): string {
if (input && typeof input === 'object' && !Array.isArray(input)) {
const maybe = (input as Record<string, unknown>).memory;
if (typeof maybe === 'string') return maybe;
}
return JSON.stringify(input, null, 2);
}
/**
* Walk a nodeParameters tree and substitute every `$fromAI('key', ...)`
* expression with the value the LLM passed for that key (or the call's
@ -189,18 +180,9 @@ export class ExecutionRecorder {
this.textBuffer.push(chunk.delta);
break;
case 'tool-call':
if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) {
this.recordWorkingMemoryUpdate(workingMemoryContentFromInput(chunk.input));
} else {
this.recordToolCall(chunk.toolCallId, chunk.toolName, chunk.input);
}
this.recordToolCall(chunk.toolCallId, chunk.toolName, chunk.input);
break;
case 'tool-result':
if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) {
// WM tool-result is already represented by the timeline entry
// pushed at tool-call time; nothing more to do here.
break;
}
this.recordToolResult(
chunk.toolCallId,
chunk.toolName,
@ -281,16 +263,6 @@ export class ExecutionRecorder {
this.textStartTime = null;
}
private recordWorkingMemoryUpdate(content: string): void {
this.flushTextBuffer();
this.workingMemory = content;
this.timeline.push({
type: 'working-memory',
content,
timestamp: Date.now(),
});
}
/**
* Record a discrete `tool-call` chunk from the stream. Maintains both the
* flat `toolCalls` array (backward compat) and the ordered timeline. The

View File

@ -7,13 +7,7 @@ import type {
ToolDescriptor,
JSONObject,
} from '@n8n/agents';
import {
Agent,
Memory,
Tool,
UPDATE_WORKING_MEMORY_TOOL_NAME,
wrapToolForApproval,
} from '@n8n/agents';
import { Agent, Memory, Tool, wrapToolForApproval } from '@n8n/agents';
import type { AgentSkill } from '@n8n/api-types';
import { z } from 'zod';
@ -49,15 +43,13 @@ const DEFAULT_WORKING_MEMORY_TEMPLATE = `# Thread memory
- Resolved or superseded:`;
const DEFAULT_WORKING_MEMORY_INSTRUCTION = [
'You have thread-scoped working memory for this conversation.',
`When the user shares durable facts, preferences, decisions, goals, or unresolved follow-ups that will help later turns in this same thread, call ${UPDATE_WORKING_MEMORY_TOOL_NAME} with the complete updated memory.`,
'Treat working memory as a current-state snapshot, not an append-only log.',
'Keep it concise, factual, and current.',
'When facts, preferences, priorities, goals, decisions, or statuses change, replace outdated active items with the latest state.',
'Preserve distinctions the user makes between primary, secondary, active, resolved, and superseded items.',
'Move resolved or superseded items to that section only when they will help later; otherwise remove them.',
'Preserve useful existing notes, remove stale or contradicted notes, and do not store secrets or one-off details.',
`Only call ${UPDATE_WORKING_MEMORY_TOOL_NAME} when the memory should change.`,
'Thread working memory is maintained automatically after turns by an out-of-band observer.',
'Thread working memory applies only to this same session/thread.',
'Do not claim it is available in a different session, new thread, or cross-thread profile unless the product explicitly provides that context.',
'Use it silently as private read-only context for this session.',
'Treat working memory as internal context; do not reveal, quote, append, or reproduce the raw working-memory document in user-visible replies.',
'If the user asks what you remember, answer conversationally from relevant memory instead of dumping the document.',
'Do not try to edit, summarize, refresh, or maintain working memory directly.',
].join(' ');
export interface BuildFromJsonOptions {