feat(core): Add observation log reflector (#30341)

Co-authored-by: Michael Drury <michael.drury@n8n.io>
This commit is contained in:
bjorger 2026-05-19 09:58:05 +02:00 committed by GitHub
parent db69aa6509
commit 784a56dcf5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1409 additions and 7 deletions

View File

@ -56,6 +56,7 @@ export type {
ObserveFn,
ScopeKind,
BuiltObservationLogStore,
BuiltObservationLogTaskLockStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogMarker,
@ -66,6 +67,8 @@ export type {
ObservationLogScope,
ObservationLogScopeKind,
ObservationLogStatus,
ObservationLogTaskKind,
ObservationLogTaskLockHandle,
TokenCounter,
} from './types';
export type { ProviderOptions } from '@ai-sdk/provider-utils';
@ -157,13 +160,26 @@ export {
runObservationLogObserver,
} from './runtime/observation-log-observer';
export {
parseObservationLogReflectionJson,
renderObservationLogForReflection,
runObservationLogReflector,
} from './runtime/observation-log-reflector';
export { ScopedMemoryTaskRunner } from './runtime/scoped-memory-task-runner';
export {
buildObservationLogReflectorPrompt,
buildObservationLogObserverPrompt,
createObservationLogReflectFn,
createObservationLogObserveFn,
DEFAULT_OBSERVATION_LOG_OBSERVER_PROMPT,
DEFAULT_OBSERVATION_LOG_OBSERVER_THRESHOLD_TOKENS,
DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT,
DEFAULT_OBSERVATION_LOG_REFLECTOR_THRESHOLD_TOKENS,
DEFAULT_OBSERVATION_LOG_TAIL_LIMIT,
} from './runtime/observation-log-defaults';
export type { CreateObservationLogObserveFnOptions } from './runtime/observation-log-defaults';
export type {
CreateObservationLogObserveFnOptions,
CreateObservationLogReflectFnOptions,
} from './runtime/observation-log-defaults';
export type {
ObservationLogObserveFn,
ObservationLogObserverInput,
@ -174,6 +190,24 @@ export type {
RunObservationLogObserverOpts,
RunObservationLogObserverResult,
} from './runtime/observation-log-observer';
export type {
ObservationLogReflectFn,
ObservationLogReflectorInput,
ObservationLogReflectorMemory,
ObservationLogReflectorWarning,
RunObservationLogReflectorOpts,
RunObservationLogReflectorResult,
} from './runtime/observation-log-reflector';
export type {
ScopedMemoryTaskDescriptor,
ScopedMemoryTaskError,
ScopedMemoryTaskEvent,
ScopedMemoryTaskHandle,
ScopedMemoryTaskInfo,
ScopedMemoryTaskResult,
ScopedMemoryTaskRunnerOptions,
ScopedMemoryTaskStatus,
} from './runtime/scoped-memory-task-runner';
export { Workspace } from './workspace';
export { BaseFilesystem } from './workspace';

View File

@ -0,0 +1,264 @@
import { InMemoryMemory } from '../memory-store';
import {
buildObservationLogReflectorPrompt,
DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT,
DEFAULT_OBSERVATION_LOG_REFLECTOR_THRESHOLD_TOKENS,
} from '../observation-log-defaults';
import {
parseObservationLogReflectionJson,
renderObservationLogForReflection,
runObservationLogReflector,
} from '../observation-log-reflector';
describe('observation-log reflector defaults', () => {
it('keeps default policy and threshold configuration in the SDK', () => {
expect(DEFAULT_OBSERVATION_LOG_REFLECTOR_THRESHOLD_TOKENS).toBe(24_000);
expect(DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT).toContain('Return JSON with two arrays');
expect(DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT).toContain('🔴 Critical');
});
it('builds the default reflector prompt from active log and token budget', () => {
const prompt = buildObservationLogReflectorPrompt({
scopeKind: 'thread',
scopeId: 'thread-1',
now: new Date('2026-05-12T15:00:00.000Z'),
activeObservationLog: [],
renderedObservationLog:
'* [obs-1] 🔴 2026-05-12T14:30:00.000Z User chose observation-log memory.',
tokenCount: 42,
tokenBudget: 24_000,
});
expect(prompt).toContain('Current timestamp: 2026-05-12T15:00:00.000Z');
expect(prompt).toContain('Scope: thread:thread-1');
expect(prompt).toContain('Active observation log tokens: 42');
expect(prompt).toContain('Token budget: 24000');
expect(prompt).toContain('[obs-1] 🔴');
});
});
describe('parseObservationLogReflectionJson', () => {
it('parses reflector JSON with marker symbols into storage markers', () => {
const reflection = parseObservationLogReflectionJson(
[
'```json',
'{',
' "drop": ["obs-1"],',
' "merge": [',
' { "supersedes": ["obs-2", "obs-3"], "marker": "🟡", "text": "Merged plan detail" }',
' ]',
'}',
'```',
].join('\n'),
);
expect(reflection).toEqual({
drop: ['obs-1'],
merge: [
{
supersedes: ['obs-2', 'obs-3'],
marker: 'important',
text: 'Merged plan detail',
},
],
});
});
});
describe('renderObservationLogForReflection', () => {
it('renders active observations with IDs for reflector input', () => {
const rendered = renderObservationLogForReflection([
{
id: 'parent',
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'critical',
text: 'User chose the observation-log model.',
parentId: null,
tokenCount: 10,
status: 'active',
supersededBy: null,
createdAt: new Date('2026-05-12T14:30:00.000Z'),
},
{
id: 'child',
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'completion',
text: 'Plan 7 finished.',
parentId: 'parent',
tokenCount: 4,
status: 'active',
supersededBy: null,
createdAt: new Date('2026-05-12T14:31:00.000Z'),
},
]);
expect(rendered).toContain('* [parent] 🔴 2026-05-12T14:30:00.000Z User chose');
expect(rendered).toContain(' * [child] ✅ 2026-05-12T14:31:00.000Z Plan 7');
});
it('renders active orphan children as top-level observations', () => {
const rendered = renderObservationLogForReflection([
{
id: 'orphan',
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'important',
text: 'Orphaned active observation remains relevant.',
parentId: 'missing-parent',
tokenCount: 4,
status: 'active',
supersededBy: null,
createdAt: new Date('2026-05-12T14:32:00.000Z'),
},
]);
expect(rendered).toContain('* [orphan] 🟡 2026-05-12T14:32:00.000Z Orphaned active');
});
});
describe('runObservationLogReflector', () => {
it('waits until the active observation log exceeds the reflector threshold', async () => {
const store = new InMemoryMemory();
await store.appendObservationLogEntries([
{
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'info',
text: 'Small detail',
tokenCount: 2,
},
]);
const reflect = jest.fn().mockResolvedValue('{"drop":[],"merge":[]}');
const result = await runObservationLogReflector({
memory: store,
scopeKind: 'thread',
scopeId: 'thread-1',
reflectorThresholdTokens: 10,
reflect,
});
expect(result).toEqual({ status: 'skipped', reason: 'below-threshold', tokenCount: 2 });
expect(reflect).not.toHaveBeenCalled();
});
it('applies reflector drop and merge instructions transactionally', async () => {
const store = new InMemoryMemory();
const [stale, oldA, oldB] = await store.appendObservationLogEntries([
{
scopeKind: 'resource',
scopeId: 'user-1',
marker: 'info',
text: 'Tiny aside',
tokenCount: 9,
},
{
scopeKind: 'resource',
scopeId: 'user-1',
marker: 'important',
text: 'Old plan A',
tokenCount: 9,
},
{
scopeKind: 'resource',
scopeId: 'user-1',
marker: 'important',
text: 'Old plan B',
tokenCount: 9,
},
]);
const result = await runObservationLogReflector({
memory: store,
scopeKind: 'resource',
scopeId: 'user-1',
reflectorThresholdTokens: 10,
now: new Date('2026-05-12T15:00:00.000Z'),
reflect: async (input) => {
expect(input.renderedObservationLog).toContain(`[${stale.id}] 🟢`);
return await Promise.resolve(
JSON.stringify({
drop: [stale.id],
merge: [
{
supersedes: [oldA.id, oldB.id],
marker: '🟡',
text: 'User compared old plan A and old plan B.',
},
],
}),
);
},
});
expect(result).toMatchObject({
status: 'ran',
tokenCount: 27,
reflection: {
drop: [stale.id],
merge: [
expect.objectContaining({
supersedes: [oldA.id, oldB.id],
marker: 'important',
text: 'User compared old plan A and old plan B.',
createdAt: new Date('2026-05-12T15:00:00.000Z'),
}),
],
},
overBudgetAfterReflection: false,
});
await expect(
store.getObservationLog({ scopeKind: 'resource', scopeId: 'user-1', status: 'dropped' }),
).resolves.toMatchObject([{ id: stale.id, status: 'dropped' }]);
await expect(
store.getObservationLog({ scopeKind: 'resource', scopeId: 'user-1', status: 'superseded' }),
).resolves.toEqual(
expect.arrayContaining([
expect.objectContaining({ id: oldA.id, status: 'superseded' }),
expect.objectContaining({ id: oldB.id, status: 'superseded' }),
]),
);
});
it('warns but still applies reflection output that remains over budget', async () => {
const store = new InMemoryMemory();
const [critical, stale] = await store.appendObservationLogEntries([
{
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'critical',
text: 'Large critical fact',
tokenCount: 20,
},
{
scopeKind: 'thread',
scopeId: 'thread-1',
marker: 'info',
text: 'Small aside',
tokenCount: 20,
},
]);
const warnings: string[] = [];
const result = await runObservationLogReflector({
memory: store,
scopeKind: 'thread',
scopeId: 'thread-1',
reflectorThresholdTokens: 10,
reflect: async () => await Promise.resolve(JSON.stringify({ drop: [stale.id], merge: [] })),
onWarning: (warning) => warnings.push(warning.message),
});
expect(result).toMatchObject({
status: 'ran',
remainingTokenCount: 20,
overBudgetAfterReflection: true,
});
expect(warnings).toEqual(['Observation log remains over reflector budget after reflection']);
await expect(
store.getActiveObservationLog({ scopeKind: 'thread', scopeId: 'thread-1' }),
).resolves.toMatchObject([{ id: critical.id }]);
});
});

View File

@ -0,0 +1,203 @@
import type {
BuiltObservationLogTaskLockStore,
ObservationLogTaskKind,
ObservationLogTaskLockHandle,
ObservationLogScopeKind,
} from '../../types/sdk/observation-log';
import { ScopedMemoryTaskRunner } from '../scoped-memory-task-runner';
function deferred(): { promise: Promise<void>; resolve: () => void } {
let resolve!: () => void;
const promise = new Promise<void>((r) => {
resolve = r;
});
return { promise, resolve };
}
function lockStore(
acquire: BuiltObservationLogTaskLockStore['acquireObservationLogTaskLock'],
): BuiltObservationLogTaskLockStore {
return {
acquireObservationLogTaskLock: acquire,
releaseObservationLogTaskLock: jest.fn().mockResolvedValue(undefined),
};
}
function lockHandle(
scopeKind: ObservationLogScopeKind,
scopeId: string,
taskKind: ObservationLogTaskKind,
holderId: string,
): ObservationLogTaskLockHandle {
return {
scopeKind,
scopeId,
taskKind,
holderId,
heldUntil: new Date(Date.now() + 30_000),
};
}
describe('ScopedMemoryTaskRunner', () => {
it('serializes observer and reflector tasks for the same scope', async () => {
const runner = new ScopedMemoryTaskRunner();
const first = deferred();
const events: string[] = [];
const observer = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
async () => {
events.push('observer:start');
await first.promise;
events.push('observer:end');
},
);
const reflector = runner.schedule(
{ taskKind: 'reflector', scopeKind: 'thread', scopeId: 'thread-1' },
async () => {
events.push('reflector:start');
await Promise.resolve();
},
);
await Promise.resolve();
await Promise.resolve();
expect(runner.getInFlightTasks()).toMatchObject([
{ taskKind: 'observer', status: 'running' },
{ taskKind: 'reflector', status: 'queued' },
]);
first.resolve();
await expect(observer.done).resolves.toMatchObject({ status: 'completed' });
await expect(reflector.done).resolves.toMatchObject({ status: 'completed' });
expect(events).toEqual(['observer:start', 'observer:end', 'reflector:start']);
});
it('skips a task when the store lock is already held', async () => {
const acquire = jest.fn().mockResolvedValue(null);
const runner = new ScopedMemoryTaskRunner({ lockStore: lockStore(acquire) });
const task = jest.fn().mockResolvedValue(undefined);
const handle = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
task,
);
await expect(handle.done).resolves.toMatchObject({
status: 'skipped',
reason: 'lock-held',
});
expect(task).not.toHaveBeenCalled();
expect(runner.getInFlightTasks()).toEqual([]);
});
it('captures task failures without rejecting the background handle', async () => {
const error = new Error('observer failed');
const seenEvents: string[] = [];
const runner = new ScopedMemoryTaskRunner({
onEvent: (event) => seenEvents.push(event.type),
});
const handle = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
async () => {
await Promise.reject(error);
},
);
await expect(handle.done).resolves.toMatchObject({ status: 'failed', error });
await expect(runner.flush()).resolves.toBeUndefined();
expect(runner.getCapturedErrors()).toMatchObject([{ error }]);
expect(seenEvents).toEqual(['started', 'failed']);
});
it('treats negative maxCapturedErrors as zero', async () => {
const runner = new ScopedMemoryTaskRunner({ maxCapturedErrors: -1 });
const handle = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
async () => {
await Promise.reject(new Error('observer failed'));
},
);
await expect(handle.done).resolves.toMatchObject({ status: 'failed' });
expect(runner.getCapturedErrors()).toEqual([]);
});
it('treats NaN maxCapturedErrors as zero', async () => {
const runner = new ScopedMemoryTaskRunner({ maxCapturedErrors: Number.NaN });
const handle = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
async () => {
await Promise.reject(new Error('observer failed'));
},
);
await expect(handle.done).resolves.toMatchObject({ status: 'failed' });
expect(runner.getCapturedErrors()).toEqual([]);
});
it('captures onEvent failures without failing the task lifecycle', async () => {
const eventError = new Error('event sink failed');
const task = jest.fn(async () => await Promise.resolve('done'));
const runner = new ScopedMemoryTaskRunner({
onEvent: () => {
throw eventError;
},
});
const handle = runner.schedule(
{ taskKind: 'reflector', scopeKind: 'thread', scopeId: 'thread-1' },
task,
);
await expect(handle.done).resolves.toMatchObject({ status: 'completed', value: 'done' });
expect(task).toHaveBeenCalled();
expect(runner.getInFlightTasks()).toEqual([]);
expect(runner.getCapturedErrors()).toEqual(
expect.arrayContaining([expect.objectContaining({ error: eventError })]),
);
});
it('removes settled scope queue entries after completion', async () => {
const runner = new ScopedMemoryTaskRunner();
const handle = runner.schedule(
{ taskKind: 'observer', scopeKind: 'thread', scopeId: 'thread-1' },
async () => await Promise.resolve('done'),
);
await expect(handle.done).resolves.toMatchObject({ status: 'completed', value: 'done' });
await runner.flush();
expect(Reflect.get(runner, 'queuesByScope')).toHaveProperty('size', 0);
});
it('acquires and releases a store lock around the task', async () => {
const acquire = jest.fn<
ReturnType<BuiltObservationLogTaskLockStore['acquireObservationLogTaskLock']>,
Parameters<BuiltObservationLogTaskLockStore['acquireObservationLogTaskLock']>
>(
async (scopeKind, scopeId, taskKind, opts) =>
await Promise.resolve(lockHandle(scopeKind, scopeId, taskKind, opts.holderId)),
);
const store = lockStore(acquire);
const runner = new ScopedMemoryTaskRunner({ lockStore: store, lockTtlMs: 15_000 });
const handle = runner.schedule(
{ taskKind: 'reflector', scopeKind: 'resource', scopeId: 'user-1' },
async () => await Promise.resolve('done'),
);
await expect(handle.done).resolves.toMatchObject({ status: 'completed', value: 'done' });
expect(acquire).toHaveBeenCalledWith('resource', 'user-1', 'reflector', {
holderId: handle.id,
ttlMs: 15_000,
});
expect(store.releaseObservationLogTaskLock).toHaveBeenCalledWith(
expect.objectContaining({ taskKind: 'reflector', holderId: handle.id }),
);
});
});

View File

@ -10,12 +10,16 @@ import type {
} from '../types/sdk/observation';
import type {
BuiltObservationLogStore,
BuiltObservationLogTaskLockStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogReadOptions,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScope,
ObservationLogScopeKind,
ObservationLogTaskKind,
ObservationLogTaskLockHandle,
} from '../types/sdk/observation-log';
import { estimateObservationTokens } from '../types/sdk/observation-log';
@ -58,7 +62,11 @@ function compareKeyset(
* The most recently saved thread is used when `saveMessages` is called.
*/
export class InMemoryMemory
implements BuiltMemory, BuiltObservationStore, BuiltObservationLogStore
implements
BuiltMemory,
BuiltObservationStore,
BuiltObservationLogStore,
BuiltObservationLogTaskLockStore
{
private threads = new Map<string, Thread>();
@ -438,17 +446,39 @@ export class InMemoryMemory
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null> {
return await this.acquireScopeLock(scopeKind, scopeId, opts);
}
async acquireObservationLogTaskLock(
scopeKind: ObservationLogScopeKind,
scopeId: string,
taskKind: ObservationLogTaskKind,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLogTaskLockHandle | null> {
const handle = await this.acquireScopeLock(scopeKind, scopeId, opts, taskKind);
if (!handle) return null;
return { scopeKind, scopeId, taskKind, holderId: handle.holderId, heldUntil: handle.heldUntil };
}
// eslint-disable-next-line @typescript-eslint/require-await
private async acquireScopeLock(
scopeKind: ScopeKind,
scopeId: string,
opts: { ttlMs: number; holderId: string },
taskKind?: ObservationLogTaskKind,
): Promise<(ObservationLockHandle & { taskKind?: ObservationLogTaskKind }) | null> {
const key = scopeKey(scopeKind, scopeId);
const existing = this.locksByScope.get(key);
const now = Date.now();
if (existing && existing.holderId !== opts.holderId && existing.heldUntil.getTime() > now) {
return null;
}
const handle: ObservationLockHandle = {
const handle: ObservationLockHandle & { taskKind?: ObservationLogTaskKind } = {
scopeKind,
scopeId,
holderId: opts.holderId,
heldUntil: new Date(now + opts.ttlMs),
...(taskKind !== undefined && { taskKind }),
};
this.locksByScope.set(key, handle);
return { ...handle };
@ -456,6 +486,15 @@ export class InMemoryMemory
// eslint-disable-next-line @typescript-eslint/require-await
async releaseObservationLock(handle: ObservationLockHandle): Promise<void> {
await this.releaseScopeLock(handle);
}
async releaseObservationLogTaskLock(handle: ObservationLogTaskLockHandle): Promise<void> {
await this.releaseScopeLock(handle);
}
// eslint-disable-next-line @typescript-eslint/require-await
private async releaseScopeLock(handle: ObservationLockHandle): Promise<void> {
const key = scopeKey(handle.scopeKind, handle.scopeId);
const current = this.locksByScope.get(key);
if (current && current.holderId === handle.holderId) {

View File

@ -5,12 +5,17 @@ import type {
ObservationLogObserveFn,
ObservationLogObserverInput,
} from './observation-log-observer';
import type {
ObservationLogReflectFn,
ObservationLogReflectorInput,
} from './observation-log-reflector';
import type { ModelConfig } from '../types/sdk/agent';
// Keep this low while runtime history is a floating message window: short but durable facts
// should become observations before older messages are likely to fall out of prompt context.
export const DEFAULT_OBSERVATION_LOG_OBSERVER_THRESHOLD_TOKENS = 2_000;
export const DEFAULT_OBSERVATION_LOG_TAIL_LIMIT = 20;
export const DEFAULT_OBSERVATION_LOG_REFLECTOR_THRESHOLD_TOKENS = 24_000;
export const DEFAULT_OBSERVATION_LOG_OBSERVER_PROMPT = `You are observing a conversation between a user and an agent. Extract durable observations about what happened, what was decided, what changed, and what needs follow-up. The agent will read your observations on later turns as its memory of this conversation.
@ -306,3 +311,241 @@ export function createObservationLogObserveFn(
return text.trim();
};
}
export const DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT = `You are reorganizing an observation log so it stays useful and under a size limit. The log is an append-only record of what happened in a conversation. Your job is to identify what to drop, merge, or replace while preserving the most important content.
You receive: the active observation log with IDs, markers, and timestamps; the current timestamp; and the token budget.
MARKERS AND PRIORITY
🔴 Critical. Facts, decisions, identities, commitments. NEVER drop. May merge with other 🔴 observations on the SAME topic if they restate the same thing.
🟡 Important. Preferences, ongoing work, recent activity. Drop ONLY if clearly superseded or redundant. Prefer merging over dropping.
🟢 Info. Small acknowledgments, recoverable detail, conversational filler. FIRST to drop when the log is oversized. Drop older 🟢 before newer 🟢.
Completion. Drop together with the parent observation when the parent is dropped. May fold into the merged observation when the parent is merged.
TIEBREAKER: When two observations are equally important, keep the more recent one.
EXAMPLES
Example 1: Log under budget. Return empty arrays.
Input:
[obs_001] 🔴 (14:30) User is migrating @n8n/agents from Mastra to internal SDK
[obs_002] 🟡 (14:35) User adopted two-stage compression model (Observer + Reflector)
Budget: 5000 tokens. Current: 600 tokens.
Output:
{"drop": [], "merge": []}
Example 2: Multiple 🔴 observations restating the same fact. Merge them.
Input:
[obs_010] 🔴 (09:00) User works at Acme on the platform team
[obs_034] 🔴 (10:15) User confirmed they joined Acme platform team 8 months ago
[obs_078] 🔴 (12:00) User leads the storage subgroup within the platform team
Output:
{
"drop": [],
"merge": [
{
"supersedes": ["obs_010", "obs_034", "obs_078"],
"marker": "🔴",
"text": "User works at Acme on the platform team (joined 8 months ago); leads the storage subgroup."
}
]
}
Example 3: Old 🟢 acknowledgments. Drop them.
Input:
[obs_001] 🟢 (08:00) User greeted the agent
[obs_002] 🟢 (08:30) User thanked agent for an earlier explanation
[obs_023] 🟢 (14:00) User confirmed they understood the recent answer
Budget: 3000 tokens. Current: 4200 tokens.
Output:
{"drop": ["obs_001", "obs_002"], "merge": []}
(Keep the most recent acknowledgment; drop older filler. If budget pressure required it, obs_023 could also be dropped, but newer 🟢 stays before older 🟢 goes.)
Example 4: 🟡 observation superseded by a later one.
Input:
[obs_005] 🟡 (10:00) User plans to use Postgres for the memory store
[obs_044] 🟡 (12:30) User switched to SQLite for the memory store (changing from earlier Postgres plan)
Output:
{"drop": ["obs_005"], "merge": []}
(obs_044 already encodes the change explicitly; obs_005 is no longer current.)
Example 5: Completion under a dropped parent.
Input:
[obs_001] 🟡 (10:00) User asked about hybrid retrieval implementation
[obs_002] (10:30) User confirmed they understand RRF fusion
[obs_087] 🟡 (14:00) User asked about Reflector design tradeoffs
[obs_088] (14:30) User confirmed Reflector approach is clear
Output:
{"drop": ["obs_001", "obs_002"], "merge": []}
(Old completed Q&A pair drops together. Newer 🟡 + pair stays.)
Example 6: Clusters across multiple turns of the same case. Merge.
Input:
[obs_020] 🟡 (11:00) Investigation: login failing intermittently for some users
[obs_021] 🟡 (11:05) Auth service logs show no errors during failure window
[obs_022] 🟡 (11:10) DB connection pool at 12/50; not saturated
[obs_023] 🟡 (11:30) Session store identified as suspect; not yet checked
Output:
{
"drop": [],
"merge": [
{
"supersedes": ["obs_020", "obs_021", "obs_022", "obs_023"],
"marker": "🟡",
"text": "Intermittent login failure investigation: auth service logs clean, DB pool at 12/50 (ruled out). Session store identified as next suspect; not yet checked."
}
]
}
BAD AND GOOD MERGE PATTERNS
BAD: Merging across topics.
Input:
[obs_001] 🔴 User works at Acme
[obs_002] 🔴 User is migrating @n8n/agents from Mastra
Wrong merge:
{
"supersedes": ["obs_001", "obs_002"],
"marker": "🔴",
"text": "User works at Acme and is migrating @n8n/agents from Mastra"
}
These are about different topics. Do NOT merge them. Leave both as separate observations.
BAD: Inventing causation or content not in sources.
Input:
[obs_001] 🔴 User uses Postgres
[obs_002] 🟡 User mentioned performance issues with the workflow
Wrong merge:
{
"supersedes": ["obs_001", "obs_002"],
"marker": "🔴",
"text": "User has Postgres performance issues affecting workflows"
}
The sources do not state Postgres caused the performance issues. NEVER invent a causal link the observations do not state. Leave both as separate observations.
BAD: Dropping 🔴 because it feels redundant when it is not duplicated.
Input:
[obs_001] 🔴 User works at Acme
[obs_002] 🔴 User joined Acme in March 2025
Wrong:
{"drop": ["obs_001"], "merge": []}
These are not duplicates. obs_001 is current employment; obs_002 is when it started. Both are durable facts. Merge into a single observation instead, never drop.
Correct:
{
"drop": [],
"merge": [
{
"supersedes": ["obs_001", "obs_002"],
"marker": "🔴",
"text": "User works at Acme; joined in March 2025."
}
]
}
GOOD: Combining genuinely redundant facts.
Input:
[obs_001] 🟡 User prefers concise responses
[obs_034] 🟡 User asked agent to keep answers shorter
[obs_087] 🟡 User mentioned again that the previous response was too long
Output:
{
"drop": [],
"merge": [
{
"supersedes": ["obs_001", "obs_034", "obs_087"],
"marker": "🟡",
"text": "User prefers concise responses (reinforced multiple times in this conversation)."
}
]
}
OUTPUT FORMAT
Return JSON with two arrays:
{
"drop": ["obs_id_1", "obs_id_2"],
"merge": [
{
"supersedes": ["obs_id_3", "obs_id_4"],
"marker": "🟡",
"text": "Merged observation that replaces the listed ones"
}
]
}
The merged observation supersedes its sources. The drop array drops observations without replacement. An observation ID may appear in EITHER drop OR merge.supersedes, never both. Do not invent IDs that were not in the input.
GOALS
- Keep the active log under the token budget.
- Preserve every 🔴 unless it is genuinely duplicated by another 🔴.
- Preserve recent 🟡 unless clearly superseded.
- Drop 🟢 aggressively, oldest first.
- Merge clusters of related observations into denser ones.
- Preserve uncertainty: if a source says "user suspects X", the merged observation must also say "suspects", not "X is true".
- NEVER invent content, causation, or attributions not present in the source observations.
CONSERVATISM
If the log is already under budget AND no clear duplicates exist, return {"drop": [], "merge": []}. Do not restructure for the sake of restructuring. The Reflector is for reducing the log, not for prettifying it.`;
export interface CreateObservationLogReflectFnOptions {
reflectorPrompt?: string;
}
export function buildObservationLogReflectorPrompt(input: ObservationLogReflectorInput): string {
const trimmedLog = input.renderedObservationLog.trim();
const renderedLog = trimmedLog === '' ? '(empty)' : trimmedLog;
return [
`Current timestamp: ${input.now.toISOString()}`,
`Scope: ${input.scopeKind}:${input.scopeId}`,
`Active observation log tokens: ${input.tokenCount}`,
`Token budget: ${input.tokenBudget}`,
`Current active observation log:\n${renderedLog}`,
].join('\n\n');
}
export function createObservationLogReflectFn(
model: ModelConfig,
options: CreateObservationLogReflectFnOptions = {},
): ObservationLogReflectFn {
return async (input) => {
const { text } = await generateText({
model: createModel(model),
system: options.reflectorPrompt ?? DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT,
prompt: buildObservationLogReflectorPrompt(input),
});
return text.trim();
};
}

View File

@ -0,0 +1,265 @@
import type {
BuiltObservationLogStore,
ObservationLogEntry,
ObservationLogMarker,
ObservationLogMerge,
ObservationLogReflection,
ObservationLogReflectionResult,
ObservationLogScopeKind,
TokenCounter,
} from '../types/sdk/observation-log';
import { estimateObservationTokens } from '../types/sdk/observation-log';
const MARKER_SYMBOLS: Record<ObservationLogMarker, string> = {
critical: '🔴',
important: '🟡',
info: '🟢',
completion: '✅',
};
const REFLECTOR_OVER_BUDGET_WARNING =
'Observation log remains over reflector budget after reflection';
export interface ObservationLogReflectorInput {
scopeKind: ObservationLogScopeKind;
scopeId: string;
now: Date;
activeObservationLog: ObservationLogEntry[];
renderedObservationLog: string;
tokenCount: number;
tokenBudget: number;
}
export type ObservationLogReflectFn = (input: ObservationLogReflectorInput) => Promise<string>;
export type ObservationLogReflectorMemory = BuiltObservationLogStore;
export interface ObservationLogReflectorWarning {
message: string;
scopeKind: ObservationLogScopeKind;
scopeId: string;
tokenCount: number;
tokenBudget: number;
}
export interface RunObservationLogReflectorOpts {
memory: ObservationLogReflectorMemory;
scopeKind: ObservationLogScopeKind;
scopeId: string;
reflectorThresholdTokens: number;
reflect: ObservationLogReflectFn;
tokenCounter?: TokenCounter;
now?: Date;
onWarning?: (warning: ObservationLogReflectorWarning) => void;
}
export type RunObservationLogReflectorResult =
| { status: 'skipped'; reason: 'below-threshold'; tokenCount: number }
| {
status: 'ran';
tokenCount: number;
remainingTokenCount: number;
overBudgetAfterReflection: boolean;
reflection: ObservationLogReflection;
result: ObservationLogReflectionResult;
};
export function parseObservationLogReflectionJson(output: string): ObservationLogReflection {
let parsed: unknown;
try {
parsed = JSON.parse(extractJsonObject(output));
} catch {
throw new Error('Reflector output must be valid JSON');
}
if (!isRecord(parsed)) throw new Error('Reflector output must be a JSON object');
return {
drop: readStringArray(parsed.drop ?? [], 'drop'),
merge: readMergeArray(parsed.merge ?? []),
};
}
export function renderObservationLogForReflection(entries: ObservationLogEntry[]): string {
const activeEntries = entries.filter((entry) => entry.status === 'active').sort(compareEntries);
const activeIds = new Set(activeEntries.map((entry) => entry.id));
const childrenByParent = new Map<string, ObservationLogEntry[]>();
const roots: ObservationLogEntry[] = [];
for (const entry of activeEntries) {
if (entry.parentId && activeIds.has(entry.parentId)) {
const children = childrenByParent.get(entry.parentId) ?? [];
children.push(entry);
childrenByParent.set(entry.parentId, children);
} else {
roots.push(entry);
}
}
const lines: string[] = [];
for (const root of roots) {
lines.push(renderReflectionBullet(root));
for (const child of childrenByParent.get(root.id) ?? []) {
lines.push(renderReflectionBullet(child, ' '));
}
}
return lines.join('\n');
}
export async function runObservationLogReflector(
opts: RunObservationLogReflectorOpts,
): Promise<RunObservationLogReflectorResult> {
const { memory, scopeKind, scopeId, reflectorThresholdTokens } = opts;
const tokenCounter = opts.tokenCounter ?? estimateObservationTokens;
const activeObservationLog = await memory.getActiveObservationLog({
scopeKind,
scopeId,
order: 'asc',
});
const tokenCount = countObservationTokens(activeObservationLog, tokenCounter);
if (tokenCount <= reflectorThresholdTokens) {
return { status: 'skipped', reason: 'below-threshold', tokenCount };
}
const now = opts.now ?? new Date();
const renderedObservationLog = renderObservationLogForReflection(activeObservationLog);
const output = await opts.reflect({
scopeKind,
scopeId,
now,
activeObservationLog,
renderedObservationLog,
tokenCount,
tokenBudget: reflectorThresholdTokens,
});
const reflection = withCreatedAt(parseObservationLogReflectionJson(output), now);
const result = await memory.applyObservationLogReflection({ scopeKind, scopeId }, reflection);
const remainingTokenCount = countObservationTokens(
await memory.getActiveObservationLog({ scopeKind, scopeId }),
tokenCounter,
);
const overBudgetAfterReflection = remainingTokenCount > reflectorThresholdTokens;
if (overBudgetAfterReflection) {
opts.onWarning?.({
message: REFLECTOR_OVER_BUDGET_WARNING,
scopeKind,
scopeId,
tokenCount: remainingTokenCount,
tokenBudget: reflectorThresholdTokens,
});
}
return {
status: 'ran',
tokenCount,
remainingTokenCount,
overBudgetAfterReflection,
reflection,
result,
};
}
function extractJsonObject(output: string): string {
const start = output.indexOf('{');
const end = output.lastIndexOf('}');
if (start === -1 || end === -1 || end < start) {
throw new Error('Reflector output did not contain a JSON object');
}
return output.slice(start, end + 1);
}
function readStringArray(value: unknown, fieldName: string): string[] {
if (!Array.isArray(value)) throw new Error(`Reflector field "${fieldName}" must be an array`);
const strings: string[] = [];
for (const item of value) {
if (typeof item !== 'string') {
throw new Error(`Reflector field "${fieldName}" must contain only strings`);
}
strings.push(item);
}
return strings;
}
function readMergeArray(value: unknown): ObservationLogMerge[] {
if (!Array.isArray(value)) throw new Error('Reflector field "merge" must be an array');
return value.map(readMerge);
}
function readMerge(value: unknown, index: number): ObservationLogMerge {
if (!isRecord(value)) throw new Error(`Reflector merge[${index}] must be an object`);
const supersedes = readStringArray(value.supersedes, `merge[${index}].supersedes`);
const marker = readMarker(value.marker, index);
if (typeof value.text !== 'string') {
throw new Error(`Reflector merge[${index}].text must be a string`);
}
const parentId = readOptionalParentId(value.parentId, index);
return {
supersedes,
marker,
text: value.text,
...(parentId !== undefined && { parentId }),
};
}
function readMarker(value: unknown, index: number): ObservationLogMarker {
switch (value) {
case '🔴':
case 'critical':
return 'critical';
case '🟡':
case 'important':
return 'important';
case '🟢':
case 'info':
return 'info';
case '✅':
case 'completion':
return 'completion';
default:
throw new Error(`Reflector merge[${index}].marker must be a known observation marker`);
}
}
function readOptionalParentId(value: unknown, index: number): string | null | undefined {
if (value === undefined) return undefined;
if (value === null || typeof value === 'string') return value;
throw new Error(`Reflector merge[${index}].parentId must be a string or null`);
}
function withCreatedAt(reflection: ObservationLogReflection, now: Date): ObservationLogReflection {
return {
drop: reflection.drop,
merge: reflection.merge.map((merge, index) => ({
...merge,
createdAt: merge.createdAt ?? new Date(now.getTime() + index),
})),
};
}
function countObservationTokens(
entries: ObservationLogEntry[],
tokenCounter: TokenCounter,
): number {
return entries.reduce((total, entry) => total + observationTokenCount(entry, tokenCounter), 0);
}
function observationTokenCount(entry: ObservationLogEntry, tokenCounter: TokenCounter): number {
if (Number.isFinite(entry.tokenCount) && entry.tokenCount > 0) return entry.tokenCount;
return tokenCounter(entry.text);
}
function compareEntries(a: ObservationLogEntry, b: ObservationLogEntry): number {
const timeDiff = a.createdAt.getTime() - b.createdAt.getTime();
if (timeDiff !== 0) return timeDiff;
return a.id.localeCompare(b.id);
}
function renderReflectionBullet(entry: ObservationLogEntry, indent = ''): string {
return `${indent}* [${entry.id}] ${MARKER_SYMBOLS[entry.marker]} ${entry.createdAt.toISOString()} ${entry.text}`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}

View File

@ -0,0 +1,221 @@
import { BackgroundTaskTracker } from './background-task-tracker';
import type {
BuiltObservationLogTaskLockStore,
ObservationLogScopeKind,
ObservationLogTaskKind,
ObservationLogTaskLockHandle,
} from '../types/sdk/observation-log';
const DEFAULT_LOCK_TTL_MS = 30_000;
const DEFAULT_MAX_CAPTURED_ERRORS = 50;
export interface ScopedMemoryTaskDescriptor {
taskKind: ObservationLogTaskKind;
scopeKind: ObservationLogScopeKind;
scopeId: string;
}
export type ScopedMemoryTaskStatus = 'queued' | 'running';
export interface ScopedMemoryTaskInfo extends ScopedMemoryTaskDescriptor {
id: string;
status: ScopedMemoryTaskStatus;
queuedAt: Date;
startedAt?: Date;
}
export type ScopedMemoryTaskResult<T> =
| { status: 'completed'; value: T }
| { status: 'skipped'; reason: 'lock-held' }
| { status: 'failed'; error: unknown };
export interface ScopedMemoryTaskHandle<T> extends ScopedMemoryTaskDescriptor {
id: string;
done: Promise<ScopedMemoryTaskResult<T>>;
}
export interface ScopedMemoryTaskError extends ScopedMemoryTaskDescriptor {
id: string;
error: unknown;
createdAt: Date;
}
export type ScopedMemoryTaskEvent<T = unknown> =
| { type: 'started'; task: ScopedMemoryTaskInfo }
| { type: 'completed'; task: ScopedMemoryTaskInfo; value: T }
| { type: 'skipped'; task: ScopedMemoryTaskInfo; reason: 'lock-held' }
| { type: 'failed'; task: ScopedMemoryTaskInfo; error: unknown };
export interface ScopedMemoryTaskRunnerOptions {
tracker?: BackgroundTaskTracker;
lockStore?: BuiltObservationLogTaskLockStore;
lockTtlMs?: number;
maxCapturedErrors?: number;
onEvent?: (event: ScopedMemoryTaskEvent) => void;
}
export class ScopedMemoryTaskRunner {
private readonly tracker: BackgroundTaskTracker;
private readonly lockStore: BuiltObservationLogTaskLockStore | undefined;
private readonly lockTtlMs: number;
private readonly maxCapturedErrors: number;
private readonly onEvent: ((event: ScopedMemoryTaskEvent) => void) | undefined;
private readonly queuesByScope = new Map<string, Promise<unknown>>();
private readonly inFlightTasks = new Map<string, ScopedMemoryTaskInfo>();
private readonly capturedErrors: ScopedMemoryTaskError[] = [];
constructor(options: ScopedMemoryTaskRunnerOptions = {}) {
this.tracker = options.tracker ?? new BackgroundTaskTracker();
this.lockStore = options.lockStore;
this.lockTtlMs = options.lockTtlMs ?? DEFAULT_LOCK_TTL_MS;
const maxCapturedErrors = Math.floor(options.maxCapturedErrors ?? DEFAULT_MAX_CAPTURED_ERRORS);
this.maxCapturedErrors = Number.isFinite(maxCapturedErrors)
? Math.max(0, maxCapturedErrors)
: 0;
this.onEvent = options.onEvent;
}
get pendingCount(): number {
return this.inFlightTasks.size;
}
getInFlightTasks(): ScopedMemoryTaskInfo[] {
return Array.from(this.inFlightTasks.values()).map((task) => ({
...task,
queuedAt: new Date(task.queuedAt),
...(task.startedAt ? { startedAt: new Date(task.startedAt) } : {}),
}));
}
getCapturedErrors(): ScopedMemoryTaskError[] {
return this.capturedErrors.map((entry) => ({
...entry,
createdAt: new Date(entry.createdAt),
}));
}
async flush(): Promise<void> {
await this.tracker.flush();
}
schedule<T>(
descriptor: ScopedMemoryTaskDescriptor,
task: () => Promise<T>,
): ScopedMemoryTaskHandle<T> {
const id = crypto.randomUUID();
const info: ScopedMemoryTaskInfo = {
...descriptor,
id,
status: 'queued',
queuedAt: new Date(),
};
this.inFlightTasks.set(id, info);
const scopeKey = this.scopeKey(descriptor.scopeKind, descriptor.scopeId);
const previous = this.queuesByScope.get(scopeKey) ?? Promise.resolve();
const done = previous.catch(() => undefined).then(async () => await this.runTask(info, task));
const queued = done.finally(() => {
if (this.queuesByScope.get(scopeKey) === queued) {
this.queuesByScope.delete(scopeKey);
}
});
this.queuesByScope.set(scopeKey, queued);
this.tracker.track(queued);
return { ...descriptor, id, done };
}
private async runTask<T>(
info: ScopedMemoryTaskInfo,
task: () => Promise<T>,
): Promise<ScopedMemoryTaskResult<T>> {
info.status = 'running';
info.startedAt = new Date();
this.emit({ type: 'started', task: this.cloneTaskInfo(info) });
let lock: ObservationLogTaskLockHandle | null = null;
try {
lock = await this.acquireLock(info);
if (this.lockStore && !lock) {
const result: ScopedMemoryTaskResult<T> = { status: 'skipped', reason: 'lock-held' };
this.emit({ type: 'skipped', task: this.cloneTaskInfo(info), reason: 'lock-held' });
return result;
}
const value = await task();
this.emit({ type: 'completed', task: this.cloneTaskInfo(info), value });
return { status: 'completed', value };
} catch (error) {
this.captureError(info, error);
this.emit({ type: 'failed', task: this.cloneTaskInfo(info), error });
return { status: 'failed', error };
} finally {
if (lock) await this.releaseLock(info, lock);
this.inFlightTasks.delete(info.id);
}
}
private async acquireLock(
info: ScopedMemoryTaskInfo,
): Promise<ObservationLogTaskLockHandle | null> {
if (!this.lockStore) return null;
return await this.lockStore.acquireObservationLogTaskLock(
info.scopeKind,
info.scopeId,
info.taskKind,
{ holderId: info.id, ttlMs: this.lockTtlMs },
);
}
private async releaseLock(
info: ScopedMemoryTaskInfo,
lock: ObservationLogTaskLockHandle,
): Promise<void> {
try {
await this.lockStore?.releaseObservationLogTaskLock(lock);
} catch (error) {
this.captureError(info, error);
}
}
private captureError(info: ScopedMemoryTaskInfo, error: unknown): void {
this.capturedErrors.push({
id: info.id,
taskKind: info.taskKind,
scopeKind: info.scopeKind,
scopeId: info.scopeId,
error,
createdAt: new Date(),
});
while (this.capturedErrors.length > this.maxCapturedErrors) {
this.capturedErrors.shift();
}
}
private emit(event: ScopedMemoryTaskEvent): void {
try {
this.onEvent?.(event);
} catch (error) {
this.captureError(event.task, error);
}
}
private cloneTaskInfo(info: ScopedMemoryTaskInfo): ScopedMemoryTaskInfo {
return {
...info,
queuedAt: new Date(info.queuedAt),
...(info.startedAt ? { startedAt: new Date(info.startedAt) } : {}),
};
}
private scopeKey(scopeKind: ObservationLogScopeKind, scopeId: string): string {
return `${scopeKind}:${scopeId}`;
}
}

View File

@ -92,6 +92,7 @@ export {
export type {
BuiltObservationLogStore,
BuiltObservationLogTaskLockStore,
NewObservationLogEntry,
ObservationLogEntry,
ObservationLogMarker,
@ -102,6 +103,8 @@ export type {
ObservationLogScope,
ObservationLogScopeKind,
ObservationLogStatus,
ObservationLogTaskKind,
ObservationLogTaskLockHandle,
TokenCounter,
} from './sdk/observation-log';
export {

View File

@ -8,11 +8,19 @@ export type ObservationLogStatus = (typeof OBSERVATION_LOG_STATUSES)[number];
export type ObservationLogScopeKind = 'thread' | 'resource';
export type ObservationLogTaskKind = 'observer' | 'reflector';
export interface ObservationLogScope {
scopeKind: ObservationLogScopeKind;
scopeId: string;
}
export interface ObservationLogTaskLockHandle extends ObservationLogScope {
taskKind: ObservationLogTaskKind;
holderId: string;
heldUntil: Date;
}
export interface ObservationLogEntry extends ObservationLogScope {
id: string;
marker: ObservationLogMarker;
@ -76,3 +84,13 @@ export interface BuiltObservationLogStore {
reflection: ObservationLogReflection,
): Promise<ObservationLogReflectionResult>;
}
export interface BuiltObservationLogTaskLockStore {
acquireObservationLogTaskLock(
scopeKind: ObservationLogScopeKind,
scopeId: string,
taskKind: ObservationLogTaskKind,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLogTaskLockHandle | null>;
releaseObservationLogTaskLock(handle: ObservationLogTaskLockHandle): Promise<void>;
}

View File

@ -0,0 +1,32 @@
import {
buildN8nObservationLogReflectorPrompt,
DEFAULT_REFLECTOR_PROMPT,
DEFAULT_REFLECTOR_THRESHOLD_TOKENS,
} from '../observation-log-reflector';
describe('n8n observation-log reflector policy', () => {
it('uses the n8n reflector defaults', () => {
expect(DEFAULT_REFLECTOR_THRESHOLD_TOKENS).toBe(24_000);
expect(DEFAULT_REFLECTOR_PROMPT).toContain('Return JSON with two arrays');
expect(DEFAULT_REFLECTOR_PROMPT).toContain('🔴 Critical');
});
it('builds the reflector prompt from active log and token budget', () => {
const prompt = buildN8nObservationLogReflectorPrompt({
scopeKind: 'thread',
scopeId: 'thread-1',
now: new Date('2026-05-12T15:00:00.000Z'),
activeObservationLog: [],
renderedObservationLog:
'* [obs-1] 🔴 2026-05-12T14:30:00.000Z User chose observation-log memory.',
tokenCount: 42,
tokenBudget: 24_000,
});
expect(prompt).toContain('Current timestamp: 2026-05-12T15:00:00.000Z');
expect(prompt).toContain('Scope: thread:thread-1');
expect(prompt).toContain('Active observation log tokens: 42');
expect(prompt).toContain('Token budget: 24000');
expect(prompt).toContain('[obs-1] 🔴');
});
});

View File

@ -676,6 +676,20 @@ describe('N8nMemory', () => {
expect(observationLockRepository.findOneBy).not.toHaveBeenCalled();
});
it('stores the task kind for scoped observation-log task locks', async () => {
const { updateQueryBuilder } = mockLockWrite({ updateAffected: 1 });
const handle = await memory.acquireObservationLogTaskLock('thread', 't-1', 'reflector', {
ttlMs: 60_000,
holderId: 'A',
});
expect(handle).toMatchObject({ taskKind: 'reflector', holderId: 'A' });
expect(updateQueryBuilder.set).toHaveBeenCalledWith(
expect.objectContaining({ taskKind: 'reflector', holderId: 'A' }),
);
});
it('refuses a different holder while the lock is live', async () => {
mockLockWrite({ updateAffected: 0 });
@ -730,5 +744,21 @@ describe('N8nMemory', () => {
holderId: 'A',
});
});
it('releases observation-log task locks by scope and holder', async () => {
await memory.releaseObservationLogTaskLock({
scopeKind: 'resource',
scopeId: 't-1',
taskKind: 'reflector',
holderId: 'A',
heldUntil: new Date(),
});
expect(observationLockRepository.delete).toHaveBeenCalledWith({
scopeKind: 'resource',
scopeId: 't-1',
taskKind: 'reflector',
holderId: 'A',
});
});
});
});

View File

@ -35,6 +35,16 @@ import { AgentThreadRepository } from '../repositories/agent-thread.repository';
const estimateObservationTokens = (text: string) => Math.ceil(text.length / 4);
type ObservationLogTaskKind = 'observer' | 'reflector';
interface ObservationLogTaskLockHandle {
scopeKind: ObservationLogScopeKind;
scopeId: string;
taskKind: ObservationLogTaskKind;
holderId: string;
heldUntil: Date;
}
@Service()
export class N8nMemory implements BuiltMemory, BuiltObservationLogStore {
constructor(
@ -392,9 +402,24 @@ export class N8nMemory implements BuiltMemory, BuiltObservationLogStore {
scopeId: string,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLockHandle | null> {
const handle = await this.acquireObservationLogTaskLock(scopeKind, scopeId, 'observer', opts);
if (!handle) return null;
return {
scopeKind: handle.scopeKind,
scopeId: handle.scopeId,
holderId: handle.holderId,
heldUntil: handle.heldUntil,
};
}
async acquireObservationLogTaskLock(
scopeKind: ObservationLogScopeKind,
scopeId: string,
taskKind: ObservationLogTaskKind,
opts: { ttlMs: number; holderId: string },
): Promise<ObservationLogTaskLockHandle | null> {
const now = new Date();
const heldUntil = new Date(now.getTime() + opts.ttlMs);
const taskKind = 'observer';
const updateResult = await this.observationLockRepository
.createQueryBuilder()
@ -408,7 +433,7 @@ export class N8nMemory implements BuiltMemory, BuiltObservationLogStore {
.execute();
if ((updateResult.affected ?? 0) > 0) {
return { scopeKind, scopeId, holderId: opts.holderId, heldUntil };
return { scopeKind, scopeId, taskKind, holderId: opts.holderId, heldUntil };
}
await this.observationLockRepository
@ -427,16 +452,29 @@ export class N8nMemory implements BuiltMemory, BuiltObservationLogStore {
});
if (!claimed) return null;
return { scopeKind, scopeId, holderId: opts.holderId, heldUntil };
return { scopeKind, scopeId, taskKind, holderId: opts.holderId, heldUntil };
}
async releaseObservationLock(
handle: ObservationLockHandle & { scopeKind: ObservationLogScopeKind },
): Promise<void> {
await this.releaseScopeLock(handle);
}
async releaseObservationLogTaskLock(handle: ObservationLogTaskLockHandle): Promise<void> {
await this.releaseScopeLock(handle);
}
private async releaseScopeLock(
handle: ObservationLockHandle & { scopeKind: ObservationLogScopeKind },
): Promise<void> {
const taskKind: ObservationLogTaskKind =
'taskKind' in handle && handle.taskKind === 'reflector' ? 'reflector' : 'observer';
await this.observationLockRepository.delete({
scopeKind: handle.scopeKind,
scopeId: handle.scopeId,
taskKind: 'observer',
taskKind,
holderId: handle.holderId,
});
}

View File

@ -0,0 +1,12 @@
export {
buildObservationLogReflectorPrompt as buildN8nObservationLogReflectorPrompt,
createObservationLogReflectFn as createN8nObservationLogReflectFn,
DEFAULT_OBSERVATION_LOG_REFLECTOR_PROMPT as DEFAULT_REFLECTOR_PROMPT,
DEFAULT_OBSERVATION_LOG_REFLECTOR_THRESHOLD_TOKENS as DEFAULT_REFLECTOR_THRESHOLD_TOKENS,
} from '@n8n/agents';
export type {
CreateObservationLogReflectFnOptions as CreateN8nObservationLogReflectFnOptions,
ObservationLogReflectFn as N8nObservationLogReflectFn,
ObservationLogReflectorInput as N8nObservationLogReflectorInput,
} from '@n8n/agents';