diff --git a/packages/@n8n/agents/src/runtime/agent-runtime.ts b/packages/@n8n/agents/src/runtime/agent-runtime.ts index 13873b2a638..2551afe304b 100644 --- a/packages/@n8n/agents/src/runtime/agent-runtime.ts +++ b/packages/@n8n/agents/src/runtime/agent-runtime.ts @@ -10,7 +10,6 @@ import type { AnthropicThinkingConfig, AttributeValue, BuiltMemory, - BuiltObservationStore, BuiltProviderTool, BuiltTelemetry, BuiltTool, @@ -39,6 +38,7 @@ import { saveMessagesToThread } from './memory-store'; import { AgentMessageList, type SerializedMessageList } from './message-list'; import { fromAiFinishReason, fromAiMessages } from './messages'; import { createEmbeddingModel, createModel } from './model-factory'; +import { hasObservationStore } from './observation-store'; import { runObservationalCycle, type RunObservationalCycleOpts } from './observational-cycle'; import { generateRunId, RunStateManager } from './run-state'; import { @@ -2106,12 +2106,10 @@ export class AgentRuntime { const memory = this.config.memory; const workingMemory = this.config.workingMemory; if (!obsConfig || !memory || !workingMemory || !persistence) return null; - if (typeof (memory as Partial).appendObservations !== 'function') { - return null; - } + if (!hasObservationStore(memory)) return null; if (!memory.saveWorkingMemory) return null; return { - memory: memory as BuiltMemory & BuiltObservationStore, + memory, threadId: persistence.threadId, resourceId: persistence.resourceId, model: this.config.model, diff --git a/packages/@n8n/agents/src/runtime/observation-store.ts b/packages/@n8n/agents/src/runtime/observation-store.ts new file mode 100644 index 00000000000..222e74b200e --- /dev/null +++ b/packages/@n8n/agents/src/runtime/observation-store.ts @@ -0,0 +1,25 @@ +import type { BuiltMemory, BuiltObservationStore } from '../types'; + +const OBSERVATION_STORE_METHODS = [ + 'appendObservations', + 'getObservations', + 'getMessagesForScope', + 'deleteObservations', + 'getCursor', + 'setCursor', + 'acquireObservationLock', + 'releaseObservationLock', +] as const satisfies ReadonlyArray; + +function hasFunctionProperty( + value: object, + property: K, +): value is Record unknown> { + return property in value && typeof Reflect.get(value, property) === 'function'; +} + +export function hasObservationStore( + memory: BuiltMemory, +): memory is BuiltMemory & BuiltObservationStore { + return OBSERVATION_STORE_METHODS.every((method) => hasFunctionProperty(memory, method)); +} diff --git a/packages/@n8n/agents/src/sdk/__tests__/agent-reflect.test.ts b/packages/@n8n/agents/src/sdk/__tests__/agent-reflect.test.ts index f75100f6782..11ce6bd7ca7 100644 --- a/packages/@n8n/agents/src/sdk/__tests__/agent-reflect.test.ts +++ b/packages/@n8n/agents/src/sdk/__tests__/agent-reflect.test.ts @@ -1,4 +1,5 @@ import { InMemoryMemory } from '../../runtime/memory-store'; +import { AgentEvent } from '../../types/runtime/event'; import type { AgentDbMessage } from '../../types/sdk/message'; import { OBSERVATION_SCHEMA_VERSION, @@ -52,7 +53,11 @@ describe('agent.reflect', () => { const observe = jest .fn() .mockResolvedValue([makeNewObs('builder-observed')]) as unknown as ObserveFn; - const memory = new Memory().storage(store).freeform('# Notes').observationalMemory({ observe }); + const memory = new Memory() + .storage(store) + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }); const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory); const result = await agent.reflect({ threadId: 't-1', resourceId: 'u-1' }); @@ -73,6 +78,7 @@ describe('agent.reflect', () => { const memory = new Memory() .storage(store) .freeform('# Notes') + .scope('thread') .observationalMemory({ observe: builderObserve }); const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory); @@ -88,7 +94,11 @@ describe('agent.reflect', () => { await store.acquireObservationLock('thread', 't-1', { ttlMs: 60_000, holderId: 'other' }); const observe = jest.fn().mockResolvedValue([makeNewObs('x')]) as unknown as ObserveFn; - const memory = new Memory().storage(store).freeform('# Notes').observationalMemory({ observe }); + const memory = new Memory() + .storage(store) + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }); const agent = new Agent('a').model('openai/gpt-4o-mini').instructions('test').memory(memory); const result = await agent.reflect({ threadId: 't-1', resourceId: 'u-1' }); @@ -97,3 +107,24 @@ describe('agent.reflect', () => { expect(observe).not.toHaveBeenCalled(); }); }); + +describe('agent.reflectInBackground', () => { + it('emits AgentEvent.Error when background setup fails before scheduling the cycle', async () => { + const store = new InMemoryMemory(); + const errors: string[] = []; + const memory = new Memory() + .storage(store) + .freeform('# Notes') + .scope('thread') + .observationalMemory(); + const agent = new Agent('a').model('openai/gpt-4o-mini').memory(memory); + agent.on(AgentEvent.Error, (event) => { + if (event.type === AgentEvent.Error) errors.push(event.message); + }); + + agent.reflectInBackground({ threadId: 't-1', resourceId: 'u-1' }); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(errors).toEqual(['Agent "a" requires instructions']); + }); +}); diff --git a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts index 7ac339ef9d7..4687bbaa33d 100644 --- a/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts +++ b/packages/@n8n/agents/src/sdk/__tests__/memory-builder-observational.test.ts @@ -12,12 +12,20 @@ describe('Memory builder — observational memory', () => { }); it('applies lockTtlMs default', () => { - const config = new Memory().freeform('# Notes').observationalMemory({ observe }).build(); + const config = new Memory() + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }) + .build(); expect(config.observationalMemory?.lockTtlMs).toBe(30_000); }); it('applies trigger, compaction, and gap defaults', () => { - const config = new Memory().freeform('# Notes').observationalMemory({ observe }).build(); + const config = new Memory() + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }) + .build(); expect(config.observationalMemory?.trigger).toEqual({ type: 'per-turn' }); expect(config.observationalMemory?.compactionThreshold).toBe(5); @@ -27,6 +35,7 @@ describe('Memory builder — observational memory', () => { it('respects consumer overrides for lockTtlMs', () => { const config = new Memory() .freeform('# Notes') + .scope('thread') .observationalMemory({ observe, lockTtlMs: 5_000 }) .build(); expect(config.observationalMemory?.lockTtlMs).toBe(5_000); @@ -36,6 +45,7 @@ describe('Memory builder — observational memory', () => { const compact = jest.fn().mockResolvedValue({ content: '# Notes' }) as unknown as CompactFn; const config = new Memory() .freeform('# Notes') + .scope('thread') .observationalMemory({ observe, compact, @@ -65,6 +75,7 @@ describe('Memory builder — observational memory', () => { it('uses idle-timer trigger gapThresholdMs when no top-level override is set', () => { const config = new Memory() .freeform('# Notes') + .scope('thread') .observationalMemory({ observe, trigger: { type: 'idle-timer', idleMs: 5 * 60 * 1000, gapThresholdMs: 45 * 60_000 }, @@ -93,6 +104,34 @@ describe('Memory builder — observational memory', () => { new Memory() .storage(minimalBackend) .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }) + .build(), + ).toThrow(/BuiltObservationStore/); + }); + + it('rejects partial observation backends before runtime cycles can use them', () => { + const partialObservationBackend = { + getThread: jest.fn().mockResolvedValue(null), + saveThread: jest.fn().mockResolvedValue({}), + deleteThread: jest.fn().mockResolvedValue(undefined), + getMessages: jest.fn().mockResolvedValue([]), + saveMessages: jest.fn().mockResolvedValue(undefined), + deleteMessages: jest.fn().mockResolvedValue(undefined), + saveWorkingMemory: jest.fn().mockResolvedValue(undefined), + appendObservations: jest.fn().mockResolvedValue([]), + describe: () => ({ + name: 'partial-observation', + constructorName: 'PartialObservationMemory', + connectionParams: null, + }), + } as unknown as BuiltMemory; + + expect(() => + new Memory() + .storage(partialObservationBackend) + .freeform('# Notes') + .scope('thread') .observationalMemory({ observe }) .build(), ).toThrow(/BuiltObservationStore/); @@ -102,10 +141,21 @@ describe('Memory builder — observational memory', () => { expect(() => new Memory().observationalMemory({ observe }).build()).toThrow(/working memory/); }); + it('requires thread-scoped working memory', () => { + expect(() => + new Memory().freeform('# Notes').scope('resource').observationalMemory({ observe }).build(), + ).toThrow(/thread-scoped working memory/); + }); + it('coexists with workingMemory', () => { - const config = new Memory().freeform('# Notes').observationalMemory({ observe }).build(); + const config = new Memory() + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }) + .build(); expect(config.workingMemory).toBeDefined(); + expect(config.workingMemory?.scope).toBe('thread'); expect(config.observationalMemory).toBeDefined(); }); @@ -122,7 +172,10 @@ describe('Memory builder — observational memory', () => { }); it('is true when observationalMemory is configured', () => { - const memory = new Memory().freeform('# Notes').observationalMemory({ observe }); + const memory = new Memory() + .freeform('# Notes') + .scope('thread') + .observationalMemory({ observe }); const agent = new Agent('a').model('openai/gpt-4o-mini').memory(memory); expect(agent.snapshot.hasObservationalMemory).toBe(true); }); diff --git a/packages/@n8n/agents/src/sdk/agent.ts b/packages/@n8n/agents/src/sdk/agent.ts index bdb3f3245b3..31ec26347c9 100644 --- a/packages/@n8n/agents/src/sdk/agent.ts +++ b/packages/@n8n/agents/src/sdk/agent.ts @@ -8,6 +8,7 @@ import { Telemetry } from './telemetry'; import { Tool, wrapToolForApproval } from './tool'; import { AgentRuntime } from '../runtime/agent-runtime'; import { AgentEventBus } from '../runtime/event-bus'; +import { hasObservationStore } from '../runtime/observation-store'; import { runObservationalCycle, type RunObservationalCycleOpts, @@ -15,14 +16,12 @@ import { } from '../runtime/observational-cycle'; import { createAgentToolResult } from '../runtime/tool-adapter'; import type { - AgentEvent, AgentEventHandler, AgentMiddleware, BuiltAgent, BuiltEval, BuiltGuardrail, BuiltMemory, - BuiltObservationStore, BuiltProviderTool, BuiltTool, BuiltTelemetry, @@ -42,6 +41,7 @@ import type { ThinkingConfigFor, ResumeOptions, } from '../types'; +import { AgentEvent } from '../types/runtime/event'; import type { AgentBuilder } from '../types/sdk/agent-builder'; import type { AgentMessage } from '../types/sdk/message'; import type { Workspace } from '../workspace/workspace'; @@ -580,7 +580,10 @@ export class Agent implements BuiltAgent, AgentBuilder { if (cycle === null) return; const runtime = await this.ensureBuilt(); runtime.scheduleBackgroundCycle(cycle); - })(); + })().catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + this.eventBus.emit({ type: AgentEvent.Error, message, error }); + }); } /** @@ -601,14 +604,14 @@ export class Agent implements BuiltAgent, AgentBuilder { !memory || !workingMemory || !this.modelConfig || - typeof (memory as Partial).appendObservations !== 'function' + !hasObservationStore(memory) ) { return null; } const runtime = await this.ensureBuilt(); const telemetry = runtime.getConfiguredTelemetry(); return { - memory: memory as BuiltMemory & BuiltObservationStore, + memory, threadId: opts.threadId, resourceId: opts.resourceId, model: this.modelConfig, diff --git a/packages/@n8n/agents/src/sdk/memory.ts b/packages/@n8n/agents/src/sdk/memory.ts index a898fdcc78a..5e320aebb78 100644 --- a/packages/@n8n/agents/src/sdk/memory.ts +++ b/packages/@n8n/agents/src/sdk/memory.ts @@ -1,10 +1,10 @@ import type { z } from 'zod'; import { InMemoryMemory } from '../runtime/memory-store'; +import { hasObservationStore } from '../runtime/observation-store'; import { templateFromSchema } from '../runtime/working-memory'; import type { BuiltMemory, - BuiltObservationStore, MemoryConfig, ObservationalMemoryConfig, SemanticRecallConfig, @@ -15,10 +15,6 @@ import { DEFAULT_OBSERVATION_GAP_THRESHOLD_MS } from '../types'; const DEFAULT_OBSERVATION_LOCK_TTL_MS = 30_000; const DEFAULT_OBSERVATION_COMPACTION_THRESHOLD = 5; -function hasObservationStore(memory: BuiltMemory): memory is BuiltMemory & BuiltObservationStore { - return typeof (memory as Partial).appendObservations === 'function'; -} - type ZodObjectSchema = z.ZodObject; const DEFAULT_LAST_MESSAGES = 10; @@ -241,6 +237,12 @@ export class Memory { ); } + if (this.observationalMemoryConfig && workingMemory?.scope !== 'thread') { + throw new Error( + "Observational memory requires thread-scoped working memory. Add .scope('thread') before .observationalMemory().", + ); + } + if (this.observationalMemoryConfig && !memory.saveWorkingMemory) { throw new Error( 'Observational memory requires a storage backend that implements saveWorkingMemory().',