mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-31 16:57:08 +02:00
feat(instance-ai): add native memory store
This commit is contained in:
parent
ea8e0f1054
commit
6780f10641
|
|
@ -1,5 +1,12 @@
|
|||
export { MAX_STEPS } from './constants/max-steps';
|
||||
export type { CheckpointStore, SerializableAgentState } from '@n8n/agents';
|
||||
export type {
|
||||
AgentDbMessage,
|
||||
AgentMessage,
|
||||
BuiltMemory,
|
||||
CheckpointStore,
|
||||
SerializableAgentState,
|
||||
Thread,
|
||||
} from '@n8n/agents';
|
||||
export { wrapUntrustedData } from './tools/web-research/sanitize-web-content';
|
||||
export type { Logger } from './logger';
|
||||
export { generateCompactionSummary } from './compaction';
|
||||
|
|
|
|||
|
|
@ -4,3 +4,4 @@ export { TypeORMCompositeStore } from './typeorm-composite-store';
|
|||
export { DbSnapshotStorage } from './db-snapshot-storage';
|
||||
export { DbIterationLogStorage } from './db-iteration-log-storage';
|
||||
export { TypeORMAgentCheckpointStore } from './typeorm-agent-checkpoint-store';
|
||||
export { TypeORMAgentMemory } from './typeorm-agent-memory';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,181 @@
|
|||
import type { AgentDbMessage, AgentMessage, BuiltMemory, Thread } from '@n8n/instance-ai';
|
||||
import { Service } from '@n8n/di';
|
||||
import { In, LessThan } from '@n8n/typeorm';
|
||||
|
||||
import type { InstanceAiMessage } from '../entities/instance-ai-message.entity';
|
||||
import type { InstanceAiThread } from '../entities/instance-ai-thread.entity';
|
||||
import { InstanceAiMessageRepository } from '../repositories/instance-ai-message.repository';
|
||||
import { InstanceAiResourceRepository } from '../repositories/instance-ai-resource.repository';
|
||||
import { InstanceAiThreadRepository } from '../repositories/instance-ai-thread.repository';
|
||||
|
||||
function parseJsonSafe(text: string): unknown {
|
||||
try {
|
||||
return JSON.parse(text);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === 'object' && value !== null;
|
||||
}
|
||||
|
||||
function isAgentMessage(value: unknown): value is AgentMessage {
|
||||
if (!isRecord(value)) return false;
|
||||
|
||||
if (value.type === 'custom') {
|
||||
return 'data' in value;
|
||||
}
|
||||
|
||||
return typeof value.role === 'string' && Array.isArray(value.content);
|
||||
}
|
||||
|
||||
function toThread(entity: InstanceAiThread): Thread {
|
||||
return {
|
||||
id: entity.id,
|
||||
resourceId: entity.resourceId,
|
||||
title: entity.title || undefined,
|
||||
createdAt: entity.createdAt,
|
||||
updatedAt: entity.updatedAt,
|
||||
metadata: entity.metadata ?? undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function getMessageRole(message: AgentDbMessage): string {
|
||||
return 'role' in message ? message.role : 'custom';
|
||||
}
|
||||
|
||||
function getMessageType(message: AgentDbMessage): string | null {
|
||||
if (message.type === 'custom') return 'custom';
|
||||
if (message.type === 'llm') return 'llm';
|
||||
return null;
|
||||
}
|
||||
|
||||
function toAgentMessage(entity: InstanceAiMessage): AgentDbMessage | undefined {
|
||||
const parsed = parseJsonSafe(entity.content);
|
||||
if (!isAgentMessage(parsed)) return undefined;
|
||||
return { ...parsed, id: entity.id, createdAt: entity.createdAt };
|
||||
}
|
||||
|
||||
function workingMemoryKey(params: {
|
||||
threadId: string;
|
||||
resourceId: string;
|
||||
scope: 'resource' | 'thread';
|
||||
}): string {
|
||||
return params.scope === 'thread' ? `thread:${params.threadId}` : params.resourceId;
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class TypeORMAgentMemory implements BuiltMemory {
|
||||
constructor(
|
||||
private readonly threadRepo: InstanceAiThreadRepository,
|
||||
private readonly messageRepo: InstanceAiMessageRepository,
|
||||
private readonly resourceRepo: InstanceAiResourceRepository,
|
||||
) {}
|
||||
|
||||
async getThread(threadId: string): Promise<Thread | null> {
|
||||
const thread = await this.threadRepo.findOneBy({ id: threadId });
|
||||
return thread ? toThread(thread) : null;
|
||||
}
|
||||
|
||||
async saveThread(thread: Omit<Thread, 'createdAt' | 'updatedAt'>): Promise<Thread> {
|
||||
const existing = await this.threadRepo.findOneBy({ id: thread.id });
|
||||
if (existing) {
|
||||
existing.resourceId = thread.resourceId;
|
||||
if (thread.title !== undefined) existing.title = thread.title;
|
||||
if (thread.metadata !== undefined) existing.metadata = thread.metadata;
|
||||
return toThread(await this.threadRepo.save(existing));
|
||||
}
|
||||
|
||||
const saved = await this.threadRepo.save(
|
||||
this.threadRepo.create({
|
||||
id: thread.id,
|
||||
resourceId: thread.resourceId,
|
||||
title: thread.title ?? '',
|
||||
metadata: thread.metadata ?? null,
|
||||
}),
|
||||
);
|
||||
return toThread(saved);
|
||||
}
|
||||
|
||||
async deleteThread(threadId: string): Promise<void> {
|
||||
await this.threadRepo.delete({ id: threadId });
|
||||
}
|
||||
|
||||
async getMessages(
|
||||
threadId: string,
|
||||
opts?: { limit?: number; before?: Date },
|
||||
): Promise<AgentDbMessage[]> {
|
||||
const where = opts?.before ? { threadId, createdAt: LessThan(opts.before) } : { threadId };
|
||||
|
||||
const entities = await this.messageRepo.find({
|
||||
where,
|
||||
order: opts?.limit ? { createdAt: 'DESC', id: 'DESC' } : { createdAt: 'ASC', id: 'ASC' },
|
||||
...(opts?.limit ? { take: opts.limit } : {}),
|
||||
});
|
||||
|
||||
const ordered = opts?.limit ? entities.reverse() : entities;
|
||||
return ordered.flatMap((entity) => {
|
||||
const message = toAgentMessage(entity);
|
||||
return message ? [message] : [];
|
||||
});
|
||||
}
|
||||
|
||||
async saveMessages(args: {
|
||||
threadId: string;
|
||||
resourceId: string;
|
||||
messages: AgentDbMessage[];
|
||||
}): Promise<void> {
|
||||
if (args.messages.length === 0) return;
|
||||
|
||||
const entities = args.messages.map((message) =>
|
||||
this.messageRepo.create({
|
||||
id: message.id,
|
||||
threadId: args.threadId,
|
||||
content: JSON.stringify(message),
|
||||
role: getMessageRole(message),
|
||||
type: getMessageType(message),
|
||||
resourceId: args.resourceId,
|
||||
createdAt: message.createdAt,
|
||||
updatedAt: message.createdAt,
|
||||
}),
|
||||
);
|
||||
|
||||
await this.messageRepo.save(entities);
|
||||
}
|
||||
|
||||
async deleteMessages(messageIds: string[]): Promise<void> {
|
||||
if (messageIds.length === 0) return;
|
||||
await this.messageRepo.delete({ id: In(messageIds) });
|
||||
}
|
||||
|
||||
async getWorkingMemory(params: {
|
||||
threadId: string;
|
||||
resourceId: string;
|
||||
scope: 'resource' | 'thread';
|
||||
}): Promise<string | null> {
|
||||
const resource = await this.resourceRepo.findOneBy({ id: workingMemoryKey(params) });
|
||||
return resource?.workingMemory ?? null;
|
||||
}
|
||||
|
||||
async saveWorkingMemory(
|
||||
params: { threadId: string; resourceId: string; scope: 'resource' | 'thread' },
|
||||
content: string,
|
||||
): Promise<void> {
|
||||
const id = workingMemoryKey(params);
|
||||
const existing = await this.resourceRepo.findOneBy({ id });
|
||||
if (existing) {
|
||||
existing.workingMemory = content;
|
||||
await this.resourceRepo.save(existing);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.resourceRepo.save(
|
||||
this.resourceRepo.create({
|
||||
id,
|
||||
workingMemory: content,
|
||||
metadata: { scope: params.scope },
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user