From 06944d524c12b9760fbf645073c4bbff9533464c Mon Sep 17 00:00:00 2001 From: Robin Braumann Date: Thu, 7 May 2026 22:31:38 +0200 Subject: [PATCH] fix: align observational memory runtime wiring --- .../runtime/__tests__/agent-runtime.test.ts | 3 +- .../__tests__/observational-cycle.test.ts | 57 ++++++++++++------ .../runtime/__tests__/working-memory.test.ts | 8 ++- .../@n8n/agents/src/runtime/working-memory.ts | 2 +- .../__tests__/execution-recorder.test.ts | 20 ++++--- .../agents/__tests__/from-json-config.test.ts | 13 +++- .../src/modules/agents/agent-sse-stream.ts | 59 +------------------ .../src/modules/agents/execution-recorder.ts | 32 +--------- .../agents/json-config/from-json-config.ts | 24 +++----- 9 files changed, 84 insertions(+), 134 deletions(-) diff --git a/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts b/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts index 19f415e1aa2..1dd40277f22 100644 --- a/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts +++ b/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts @@ -526,7 +526,8 @@ describe('AgentRuntime.stream() — working memory', () => { }); await collectChunks(stream); - const callArgs = streamText.mock.calls[0][0] as Record; + const calls = streamText.mock.calls as Array<[Record]>; + const callArgs = calls[0]?.[0] ?? {}; expect(callArgs.tools ?? {}).not.toHaveProperty('update_working_memory'); expect(savedWorkingMemory).toEqual([]); }); diff --git a/packages/@n8n/agents/src/runtime/__tests__/observational-cycle.test.ts b/packages/@n8n/agents/src/runtime/__tests__/observational-cycle.test.ts index 0ee4c40bebf..88c3df3f65e 100644 --- a/packages/@n8n/agents/src/runtime/__tests__/observational-cycle.test.ts +++ b/packages/@n8n/agents/src/runtime/__tests__/observational-cycle.test.ts @@ -1,5 +1,13 @@ import { z } from 'zod'; +import { AgentEvent } from '../../types'; +import type { AgentDbMessage } from '../../types/sdk/message'; +import { + OBSERVATION_SCHEMA_VERSION, + type CompactFn, + type NewObservation, + type ObserveFn, +} from '../../types/sdk/observation'; import { AgentEventBus } from '../event-bus'; import { InMemoryMemory, saveMessagesToThread } from '../memory-store'; import { @@ -8,14 +16,6 @@ import { runObservationalCycle, type RunObservationalCycleOpts, } from '../observational-cycle'; -import { AgentEvent } from '../../types'; -import { - OBSERVATION_SCHEMA_VERSION, - type CompactFn, - type NewObservation, - type ObserveFn, -} from '../../types/sdk/observation'; -import type { AgentDbMessage } from '../../types/sdk/message'; type GenerateTextCall = { model: unknown; system?: string; prompt?: string }; const mockGenerateText = jest.fn, [GenerateTextCall]>(); @@ -55,7 +55,10 @@ function opts( resourceId: 'u-1', model: { doGenerate: jest.fn() } as never, workingMemory: { template: '# Thread memory', structured: false }, - observe: async () => [], + observe: async () => { + await Promise.resolve(); + return []; + }, compactionThreshold: 5, ...overrides, }; @@ -70,6 +73,7 @@ describe('runObservationalCycle', () => { const mem = new InMemoryMemory(); await save(mem, [msg('m1', 'remember that I prefer concise answers')]); const observe = jest.fn, Parameters>(async (ctx) => { + await Promise.resolve(); expect(ctx.deltaMessages.map((m) => m.id)).toEqual(['m1']); expect(ctx.currentWorkingMemory).toBeNull(); expect(ctx.threadId).toBe('t-1'); @@ -94,6 +98,7 @@ describe('runObservationalCycle', () => { '# Thread memory\n- Current project:', ); const compact = jest.fn, Parameters>(async (ctx) => { + await Promise.resolve(); expect(ctx.observations).toHaveLength(1); expect(ctx.currentWorkingMemory).toContain('Current project'); return { content: '# Thread memory\n- Current project: Memory v1' }; @@ -101,7 +106,10 @@ describe('runObservationalCycle', () => { const result = await runObservationalCycle( opts(mem, { - observe: async () => [row('Current project is Memory v1.')], + observe: async () => { + await Promise.resolve(); + return [row('Current project is Memory v1.')]; + }, compact, compactionThreshold: 1, }), @@ -121,7 +129,10 @@ describe('runObservationalCycle', () => { const result = await runObservationalCycle( opts(mem, { - observe: async () => [row('one')], + observe: async () => { + await Promise.resolve(); + return [row('one')]; + }, compact, compactionThreshold: 2, }), @@ -160,6 +171,7 @@ describe('runObservationalCycle', () => { await save(mem, [msg('m2', 'later', second)]); const observe = jest.fn, Parameters>(async (ctx) => { + await Promise.resolve(); expect(ctx.gap).toMatchObject({ durationMs: 90 * 60 * 1000, text: 'User returned after 1h 30m of inactivity.', @@ -209,9 +221,10 @@ describe('runObservationalCycle', () => { await runObservationalCycle(opts(mem)); await save(mem, [msg('m2', 'later', second)]); - const compact = jest.fn, Parameters>(async () => ({ - content: '# Thread memory\n- Continuity notes: user returned after a gap', - })); + const compact = jest.fn, Parameters>(async () => { + await Promise.resolve(); + return { content: '# Thread memory\n- Continuity notes: user returned after a gap' }; + }); await runObservationalCycle(opts(mem, { compact, compactionThreshold: 1 })); expect(compact).not.toHaveBeenCalled(); @@ -220,7 +233,10 @@ describe('runObservationalCycle', () => { await save(mem, [msg('m3', 'remember this decision', third)]); await runObservationalCycle( opts(mem, { - observe: async () => [row('Decision was recorded.')], + observe: async () => { + await Promise.resolve(); + return [row('Decision was recorded.')]; + }, compact, compactionThreshold: 1, }), @@ -324,8 +340,14 @@ describe('runObservationalCycle', () => { structured: true, schema: z.object({ name: z.string() }), }, - observe: async () => [row('Name is Alice.')], - compact: async () => ({ content: '{"name": 123}' }), + observe: async () => { + await Promise.resolve(); + return [row('Name is Alice.')]; + }, + compact: async () => { + await Promise.resolve(); + return { content: '{"name": 123}' }; + }, compactionThreshold: 1, eventBus, }), @@ -351,6 +373,7 @@ describe('runObservationalCycle', () => { const result = await runObservationalCycle( opts(mem, { observe: async () => { + await Promise.resolve(); throw new Error('observer failed'); }, eventBus, diff --git a/packages/@n8n/agents/src/runtime/__tests__/working-memory.test.ts b/packages/@n8n/agents/src/runtime/__tests__/working-memory.test.ts index 31561a048dd..439d2176b42 100644 --- a/packages/@n8n/agents/src/runtime/__tests__/working-memory.test.ts +++ b/packages/@n8n/agents/src/runtime/__tests__/working-memory.test.ts @@ -42,6 +42,12 @@ describe('templateFromSchema', () => { const result = templateFromSchema(schema); expect(result).toContain('userName'); expect(result).toContain('favoriteColor'); - expect(JSON.parse(result)).toHaveProperty('userName'); + let parsed: unknown; + try { + parsed = JSON.parse(result) as unknown; + } catch (error) { + throw new Error(`Expected schema template to be valid JSON: ${String(error)}`); + } + expect(parsed).toHaveProperty('userName'); }); }); diff --git a/packages/@n8n/agents/src/runtime/working-memory.ts b/packages/@n8n/agents/src/runtime/working-memory.ts index 97881ce6fde..8f235f8ed63 100644 --- a/packages/@n8n/agents/src/runtime/working-memory.ts +++ b/packages/@n8n/agents/src/runtime/working-memory.ts @@ -1,4 +1,4 @@ -import { z } from 'zod'; +import type { z } from 'zod'; type ZodObjectSchema = z.ZodObject; diff --git a/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts b/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts index 251b6e5b59c..cb8f5a391f0 100644 --- a/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts +++ b/packages/cli/src/modules/agents/__tests__/execution-recorder.test.ts @@ -80,8 +80,8 @@ describe('ExecutionRecorder', () => { }); }); - describe('working memory capture', () => { - it('captures the working-memory tool call as a timeline event', () => { + describe('working memory tool chunks', () => { + it('records update_working_memory as a regular tool call when present', () => { const recorder = new ExecutionRecorder(); recorder.record({ type: 'text-delta', id: 't1', delta: 'Hello' }); @@ -101,12 +101,18 @@ describe('ExecutionRecorder', () => { const record = recorder.getMessageRecord(); - expect(record.workingMemory).toBe('# Name: Alice'); - expect(record.toolCalls).toEqual([]); - expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(true); + expect(record.workingMemory).toBeNull(); + expect(record.toolCalls).toEqual([ + { + name: 'update_working_memory', + input: { memory: '# Name: Alice' }, + output: { success: true }, + }, + ]); + expect(record.timeline.some((e) => e.type === 'working-memory')).toBe(false); }); - it('keeps last working memory when multiple updates occur', () => { + it('does not derive execution working memory from update_working_memory calls', () => { const recorder = new ExecutionRecorder(); recorder.record({ @@ -124,7 +130,7 @@ describe('ExecutionRecorder', () => { recorder.record({ type: 'finish', finishReason: 'stop' } as StreamChunk); const record = recorder.getMessageRecord(); - expect(record.workingMemory).toBe('second'); + expect(record.workingMemory).toBeNull(); }); }); diff --git a/packages/cli/src/modules/agents/__tests__/from-json-config.test.ts b/packages/cli/src/modules/agents/__tests__/from-json-config.test.ts index 586c3801575..24a060c7eb4 100644 --- a/packages/cli/src/modules/agents/__tests__/from-json-config.test.ts +++ b/packages/cli/src/modules/agents/__tests__/from-json-config.test.ts @@ -455,10 +455,17 @@ describe('buildFromJson()', () => { expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Current goal/task'); expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Key active items'); expect(getMemoryConfig(agent)?.workingMemory?.template).toContain('Resolved or superseded'); - expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('thread-scoped'); - expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('current-state snapshot'); expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain( - 'primary, secondary, active, resolved, and superseded', + 'only to this same session/thread', + ); + expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('different session'); + expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('new thread'); + expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain('cross-thread profile'); + expect(getMemoryConfig(agent)?.workingMemory?.instruction).toContain( + 'Treat working memory as internal context', + ); + expect(getMemoryConfig(agent)?.workingMemory?.instruction).not.toContain( + 'update_working_memory', ); }); diff --git a/packages/cli/src/modules/agents/agent-sse-stream.ts b/packages/cli/src/modules/agents/agent-sse-stream.ts index 3a52fab6449..08f7f77af88 100644 --- a/packages/cli/src/modules/agents/agent-sse-stream.ts +++ b/packages/cli/src/modules/agents/agent-sse-stream.ts @@ -1,4 +1,4 @@ -import { UPDATE_WORKING_MEMORY_TOOL_NAME, type AgentMessage, type StreamChunk } from '@n8n/agents'; +import type { AgentMessage, StreamChunk } from '@n8n/agents'; import type { AgentPersistedMessageContentPart, AgentSseEvent, @@ -24,13 +24,6 @@ export interface ToolEventCallbacks { interface ChunkHandlerCtx { send: (e: AgentSseEvent) => void; onToolEvent?: ToolEventCallbacks; - /** - * Tool-call ids belonging to the SDK-internal working-memory tool. The id - * Set is needed because `tool-input-delta` chunks carry only the id, not - * the tool name — we capture the id on `tool-input-start` / `tool-call` - * and use it to drop the matching streamed memory content. - */ - workingMemoryToolCallIds: Set; } /** @@ -110,51 +103,6 @@ function emitTextLikeChunk( * SSE-emit a tool-* chunk and fire any matching builder side-effect callback. * Returns `{ suspended: true }` when the chunk was `tool-call-suspended`. */ -/** - * Working memory is implemented as an SDK tool, but n8n surfaces it as a - * distinct memory event in the chat UI rather than a regular tool step. - * Returns `true` when the chunk was handled and should not flow through the - * regular tool emission path. - */ -function handleWorkingMemoryChunk( - chunk: Extract< - StreamChunk, - { - type: - | 'tool-input-start' - | 'tool-input-delta' - | 'tool-call' - | 'tool-execution-start' - | 'tool-result'; - } - >, - ctx: ChunkHandlerCtx, -): boolean { - const { send, workingMemoryToolCallIds } = ctx; - const isWmName = 'toolName' in chunk && chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME; - - if (chunk.type === 'tool-input-delta') { - return workingMemoryToolCallIds.has(chunk.toolCallId); - } - if (!isWmName) return false; - - if (chunk.type === 'tool-input-start' || chunk.type === 'tool-call') { - workingMemoryToolCallIds.add(chunk.toolCallId); - return true; - } - if (chunk.type === 'tool-execution-start') return true; - if (chunk.type === 'tool-result') { - if (chunk.isError) { - const errMsg = chunk.output instanceof Error ? chunk.output.message : String(chunk.output); - send({ type: 'error', message: `Working memory update failed: ${errMsg}` }); - } else { - send({ type: 'working-memory-update', toolName: chunk.toolName }); - } - return true; - } - return false; -} - function emitToolChunk( chunk: Extract< StreamChunk, @@ -172,10 +120,6 @@ function emitToolChunk( ): { suspended: boolean } { const { send, onToolEvent } = ctx; - if (chunk.type !== 'tool-call-suspended' && handleWorkingMemoryChunk(chunk, ctx)) { - return { suspended: false }; - } - switch (chunk.type) { case 'tool-input-start': send({ @@ -293,7 +237,6 @@ export async function pumpChunks( const ctx: ChunkHandlerCtx = { send, onToolEvent, - workingMemoryToolCallIds: new Set(), }; for await (const chunk of chunks) { diff --git a/packages/cli/src/modules/agents/execution-recorder.ts b/packages/cli/src/modules/agents/execution-recorder.ts index 44f92559ceb..d47406c506c 100644 --- a/packages/cli/src/modules/agents/execution-recorder.ts +++ b/packages/cli/src/modules/agents/execution-recorder.ts @@ -1,17 +1,8 @@ -import { UPDATE_WORKING_MEMORY_TOOL_NAME, type StreamChunk } from '@n8n/agents'; +import type { StreamChunk } from '@n8n/agents'; import { extractFromAICalls, isFromAIOnlyExpression } from 'n8n-workflow'; import type { ToolRegistry } from './tool-registry'; -/** Pull the human-readable working-memory content out of the WM tool's input. */ -function workingMemoryContentFromInput(input: unknown): string { - if (input && typeof input === 'object' && !Array.isArray(input)) { - const maybe = (input as Record).memory; - if (typeof maybe === 'string') return maybe; - } - return JSON.stringify(input, null, 2); -} - /** * Walk a nodeParameters tree and substitute every `$fromAI('key', ...)` * expression with the value the LLM passed for that key (or the call's @@ -189,18 +180,9 @@ export class ExecutionRecorder { this.textBuffer.push(chunk.delta); break; case 'tool-call': - if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) { - this.recordWorkingMemoryUpdate(workingMemoryContentFromInput(chunk.input)); - } else { - this.recordToolCall(chunk.toolCallId, chunk.toolName, chunk.input); - } + this.recordToolCall(chunk.toolCallId, chunk.toolName, chunk.input); break; case 'tool-result': - if (chunk.toolName === UPDATE_WORKING_MEMORY_TOOL_NAME) { - // WM tool-result is already represented by the timeline entry - // pushed at tool-call time; nothing more to do here. - break; - } this.recordToolResult( chunk.toolCallId, chunk.toolName, @@ -281,16 +263,6 @@ export class ExecutionRecorder { this.textStartTime = null; } - private recordWorkingMemoryUpdate(content: string): void { - this.flushTextBuffer(); - this.workingMemory = content; - this.timeline.push({ - type: 'working-memory', - content, - timestamp: Date.now(), - }); - } - /** * Record a discrete `tool-call` chunk from the stream. Maintains both the * flat `toolCalls` array (backward compat) and the ordered timeline. The diff --git a/packages/cli/src/modules/agents/json-config/from-json-config.ts b/packages/cli/src/modules/agents/json-config/from-json-config.ts index ae9904b3ec6..68ddde96149 100644 --- a/packages/cli/src/modules/agents/json-config/from-json-config.ts +++ b/packages/cli/src/modules/agents/json-config/from-json-config.ts @@ -7,13 +7,7 @@ import type { ToolDescriptor, JSONObject, } from '@n8n/agents'; -import { - Agent, - Memory, - Tool, - UPDATE_WORKING_MEMORY_TOOL_NAME, - wrapToolForApproval, -} from '@n8n/agents'; +import { Agent, Memory, Tool, wrapToolForApproval } from '@n8n/agents'; import type { AgentSkill } from '@n8n/api-types'; import { z } from 'zod'; @@ -49,15 +43,13 @@ const DEFAULT_WORKING_MEMORY_TEMPLATE = `# Thread memory - Resolved or superseded:`; const DEFAULT_WORKING_MEMORY_INSTRUCTION = [ - 'You have thread-scoped working memory for this conversation.', - `When the user shares durable facts, preferences, decisions, goals, or unresolved follow-ups that will help later turns in this same thread, call ${UPDATE_WORKING_MEMORY_TOOL_NAME} with the complete updated memory.`, - 'Treat working memory as a current-state snapshot, not an append-only log.', - 'Keep it concise, factual, and current.', - 'When facts, preferences, priorities, goals, decisions, or statuses change, replace outdated active items with the latest state.', - 'Preserve distinctions the user makes between primary, secondary, active, resolved, and superseded items.', - 'Move resolved or superseded items to that section only when they will help later; otherwise remove them.', - 'Preserve useful existing notes, remove stale or contradicted notes, and do not store secrets or one-off details.', - `Only call ${UPDATE_WORKING_MEMORY_TOOL_NAME} when the memory should change.`, + 'Thread working memory is maintained automatically after turns by an out-of-band observer.', + 'Thread working memory applies only to this same session/thread.', + 'Do not claim it is available in a different session, new thread, or cross-thread profile unless the product explicitly provides that context.', + 'Use it silently as private read-only context for this session.', + 'Treat working memory as internal context; do not reveal, quote, append, or reproduce the raw working-memory document in user-visible replies.', + 'If the user asks what you remember, answer conversationally from relevant memory instead of dumping the document.', + 'Do not try to edit, summarize, refresh, or maintain working memory directly.', ].join(' '); export interface BuildFromJsonOptions {