From 6780f10641de3d894cf0a8dd470345f58d62aef2 Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 10:34:50 +0200 Subject: [PATCH] feat(instance-ai): add native memory store --- packages/@n8n/instance-ai/src/index.ts | 9 +- .../src/modules/instance-ai/storage/index.ts | 1 + .../storage/typeorm-agent-memory.ts | 181 ++++++++++++++++++ 3 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 packages/cli/src/modules/instance-ai/storage/typeorm-agent-memory.ts diff --git a/packages/@n8n/instance-ai/src/index.ts b/packages/@n8n/instance-ai/src/index.ts index 06039423795..24b98cb6079 100644 --- a/packages/@n8n/instance-ai/src/index.ts +++ b/packages/@n8n/instance-ai/src/index.ts @@ -1,5 +1,12 @@ export { MAX_STEPS } from './constants/max-steps'; -export type { CheckpointStore, SerializableAgentState } from '@n8n/agents'; +export type { + AgentDbMessage, + AgentMessage, + BuiltMemory, + CheckpointStore, + SerializableAgentState, + Thread, +} from '@n8n/agents'; export { wrapUntrustedData } from './tools/web-research/sanitize-web-content'; export type { Logger } from './logger'; export { generateCompactionSummary } from './compaction'; diff --git a/packages/cli/src/modules/instance-ai/storage/index.ts b/packages/cli/src/modules/instance-ai/storage/index.ts index b410b991b9c..9647fe19d27 100644 --- a/packages/cli/src/modules/instance-ai/storage/index.ts +++ b/packages/cli/src/modules/instance-ai/storage/index.ts @@ -4,3 +4,4 @@ export { TypeORMCompositeStore } from './typeorm-composite-store'; export { DbSnapshotStorage } from './db-snapshot-storage'; export { DbIterationLogStorage } from './db-iteration-log-storage'; export { TypeORMAgentCheckpointStore } from './typeorm-agent-checkpoint-store'; +export { TypeORMAgentMemory } from './typeorm-agent-memory'; diff --git a/packages/cli/src/modules/instance-ai/storage/typeorm-agent-memory.ts b/packages/cli/src/modules/instance-ai/storage/typeorm-agent-memory.ts new file mode 100644 index 00000000000..02b4a80120b --- /dev/null +++ b/packages/cli/src/modules/instance-ai/storage/typeorm-agent-memory.ts @@ -0,0 +1,181 @@ +import type { AgentDbMessage, AgentMessage, BuiltMemory, Thread } from '@n8n/instance-ai'; +import { Service } from '@n8n/di'; +import { In, LessThan } from '@n8n/typeorm'; + +import type { InstanceAiMessage } from '../entities/instance-ai-message.entity'; +import type { InstanceAiThread } from '../entities/instance-ai-thread.entity'; +import { InstanceAiMessageRepository } from '../repositories/instance-ai-message.repository'; +import { InstanceAiResourceRepository } from '../repositories/instance-ai-resource.repository'; +import { InstanceAiThreadRepository } from '../repositories/instance-ai-thread.repository'; + +function parseJsonSafe(text: string): unknown { + try { + return JSON.parse(text); + } catch { + return undefined; + } +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +} + +function isAgentMessage(value: unknown): value is AgentMessage { + if (!isRecord(value)) return false; + + if (value.type === 'custom') { + return 'data' in value; + } + + return typeof value.role === 'string' && Array.isArray(value.content); +} + +function toThread(entity: InstanceAiThread): Thread { + return { + id: entity.id, + resourceId: entity.resourceId, + title: entity.title || undefined, + createdAt: entity.createdAt, + updatedAt: entity.updatedAt, + metadata: entity.metadata ?? undefined, + }; +} + +function getMessageRole(message: AgentDbMessage): string { + return 'role' in message ? message.role : 'custom'; +} + +function getMessageType(message: AgentDbMessage): string | null { + if (message.type === 'custom') return 'custom'; + if (message.type === 'llm') return 'llm'; + return null; +} + +function toAgentMessage(entity: InstanceAiMessage): AgentDbMessage | undefined { + const parsed = parseJsonSafe(entity.content); + if (!isAgentMessage(parsed)) return undefined; + return { ...parsed, id: entity.id, createdAt: entity.createdAt }; +} + +function workingMemoryKey(params: { + threadId: string; + resourceId: string; + scope: 'resource' | 'thread'; +}): string { + return params.scope === 'thread' ? `thread:${params.threadId}` : params.resourceId; +} + +@Service() +export class TypeORMAgentMemory implements BuiltMemory { + constructor( + private readonly threadRepo: InstanceAiThreadRepository, + private readonly messageRepo: InstanceAiMessageRepository, + private readonly resourceRepo: InstanceAiResourceRepository, + ) {} + + async getThread(threadId: string): Promise { + const thread = await this.threadRepo.findOneBy({ id: threadId }); + return thread ? toThread(thread) : null; + } + + async saveThread(thread: Omit): Promise { + const existing = await this.threadRepo.findOneBy({ id: thread.id }); + if (existing) { + existing.resourceId = thread.resourceId; + if (thread.title !== undefined) existing.title = thread.title; + if (thread.metadata !== undefined) existing.metadata = thread.metadata; + return toThread(await this.threadRepo.save(existing)); + } + + const saved = await this.threadRepo.save( + this.threadRepo.create({ + id: thread.id, + resourceId: thread.resourceId, + title: thread.title ?? '', + metadata: thread.metadata ?? null, + }), + ); + return toThread(saved); + } + + async deleteThread(threadId: string): Promise { + await this.threadRepo.delete({ id: threadId }); + } + + async getMessages( + threadId: string, + opts?: { limit?: number; before?: Date }, + ): Promise { + const where = opts?.before ? { threadId, createdAt: LessThan(opts.before) } : { threadId }; + + const entities = await this.messageRepo.find({ + where, + order: opts?.limit ? { createdAt: 'DESC', id: 'DESC' } : { createdAt: 'ASC', id: 'ASC' }, + ...(opts?.limit ? { take: opts.limit } : {}), + }); + + const ordered = opts?.limit ? entities.reverse() : entities; + return ordered.flatMap((entity) => { + const message = toAgentMessage(entity); + return message ? [message] : []; + }); + } + + async saveMessages(args: { + threadId: string; + resourceId: string; + messages: AgentDbMessage[]; + }): Promise { + if (args.messages.length === 0) return; + + const entities = args.messages.map((message) => + this.messageRepo.create({ + id: message.id, + threadId: args.threadId, + content: JSON.stringify(message), + role: getMessageRole(message), + type: getMessageType(message), + resourceId: args.resourceId, + createdAt: message.createdAt, + updatedAt: message.createdAt, + }), + ); + + await this.messageRepo.save(entities); + } + + async deleteMessages(messageIds: string[]): Promise { + if (messageIds.length === 0) return; + await this.messageRepo.delete({ id: In(messageIds) }); + } + + async getWorkingMemory(params: { + threadId: string; + resourceId: string; + scope: 'resource' | 'thread'; + }): Promise { + const resource = await this.resourceRepo.findOneBy({ id: workingMemoryKey(params) }); + return resource?.workingMemory ?? null; + } + + async saveWorkingMemory( + params: { threadId: string; resourceId: string; scope: 'resource' | 'thread' }, + content: string, + ): Promise { + const id = workingMemoryKey(params); + const existing = await this.resourceRepo.findOneBy({ id }); + if (existing) { + existing.workingMemory = content; + await this.resourceRepo.save(existing); + return; + } + + await this.resourceRepo.save( + this.resourceRepo.create({ + id, + workingMemory: content, + metadata: { scope: params.scope }, + }), + ); + } +}