feat(core): Use the new chat repositories for storage (no-changelog) (#20655)

This commit is contained in:
Jaakko Husso 2025-10-14 11:32:58 +03:00 committed by GitHub
parent 131a57e0eb
commit f7b3ff2304
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 204 additions and 41 deletions

View File

@ -23,6 +23,7 @@ export const PROVIDER_CREDENTIAL_TYPE_MAP: Record<ChatHubProvider, string> = {
export const chatHubConversationModelSchema = z.object({
provider: chatHubProviderSchema,
model: z.string(),
workflowId: z.string().nullable().default(null),
});
export type ChatHubConversationModel = z.infer<typeof chatHubConversationModelSchema>;
@ -50,6 +51,7 @@ export class ChatHubSendMessageRequest extends Z.class({
sessionId: z.string().uuid(),
message: z.string(),
model: chatHubConversationModelSchema,
previousMessageId: z.string().uuid().nullable(),
credentials: z.record(
z.object({
id: z.string(),

View File

@ -1,9 +1,9 @@
import { ChatHubProvider } from '@n8n/api-types';
import type { ChatHubProvider } from '@n8n/api-types';
import { ExecutionEntity, WithTimestampsAndStringId, WorkflowEntity } from '@n8n/db';
import { Column, Entity, ManyToOne, JoinColumn, OneToMany, type Relation } from '@n8n/typeorm';
import { ChatHubMessageState } from './chat-hub.types';
import { ChatHubSession } from './chat-session.entity';
import type { ChatHubSession } from './chat-hub-session.entity';
import type { ChatHubMessageState } from './chat-hub.types';
export type ChatHubMessageType = 'human' | 'ai' | 'system' | 'tool' | 'generic';

View File

@ -8,7 +8,7 @@ import {
} from '@n8n/db';
import { Column, Entity, ManyToOne, OneToMany, JoinColumn } from '@n8n/typeorm';
import type { ChatHubMessage } from './chat-message.entity';
import type { ChatHubMessage } from './chat-hub-message.entity';
@Entity({ name: 'chat_hub_sessions' })
export class ChatHubSession extends WithTimestampsAndStringId {

View File

@ -49,7 +49,7 @@ export class ChatHubController {
this.logger.info(`Chat send request received: ${JSON.stringify(payload)}`);
try {
await this.chatService.askN8n(res, req.user, {
await this.chatService.respondMessage(res, req.user, {
...payload,
userId: req.user.id,
replyId,

View File

@ -26,6 +26,13 @@ export class ChatHubModule implements ModuleInterface {
return { chatAccessEnabled };
}
async entities() {
const { ChatHubSession } = await import('./chat-hub-session.entity');
const { ChatHubMessage } = await import('./chat-hub-message.entity');
return [ChatHubSession, ChatHubMessage];
}
@OnShutdown()
async shutdown() {}
}

View File

@ -31,19 +31,20 @@ import {
} from 'n8n-workflow';
import { v4 as uuidv4 } from 'uuid';
import type { ChatPayloadWithCredentials, ChatMessage } from './chat-hub.types';
import { ChatHubSession } from './chat-hub-session.entity';
import type { ChatPayloadWithCredentials, MessageRecord } from './chat-hub.types';
import { ChatHubMessageRepository } from './chat-message.repository';
import { ChatHubSessionRepository } from './chat-session.repository';
import { CredentialsHelper } from '@/credentials-helper';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { getBase } from '@/workflow-execute-additional-data';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
import { CredentialsService } from '@/credentials/credentials.service';
import { ActiveExecutions } from '@/active-executions';
import { CredentialsService } from '@/credentials/credentials.service';
import { CredentialsHelper } from '@/credentials-helper';
import { getBase } from '@/workflow-execute-additional-data';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowExecutionService } from '@/workflows/workflow-execution.service';
@Service()
export class ChatHubService {
private sesssions: Map<string, ChatMessage[]>;
constructor(
private readonly logger: Logger,
private readonly credentialsService: CredentialsService,
@ -54,9 +55,9 @@ export class ChatHubService {
private readonly projectRepository: ProjectRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly activeExecutions: ActiveExecutions,
) {
this.sesssions = new Map<string, ChatMessage[]>();
}
private readonly sessionRepository: ChatHubSessionRepository,
private readonly messageRepository: ChatHubMessageRepository,
) {}
async getModels(
user: User,
@ -293,23 +294,35 @@ export class ChatHubService {
return undefined;
}
async askN8n(res: Response, user: User, payload: ChatPayloadWithCredentials) {
let session = this.sesssions.get(payload.sessionId);
if (!session) {
session = [];
this.sesssions.set(payload.sessionId, session);
async respondMessage(res: Response, user: User, payload: ChatPayloadWithCredentials) {
const existing = await this.sessionRepository.getOneById(payload.sessionId, user.id);
const turnId = payload.messageId;
// TODO: Handle session ID conflicts better (different user, same ID)
let session: ChatHubSession;
if (existing) {
session = existing;
} else {
session = await this.sessionRepository.createChatSession({
id: payload.sessionId,
ownerId: user.id,
title: 'New Chat',
provider: payload.model.provider,
model: payload.model.model,
workflowId: payload.model.workflowId ?? null,
credentialId: payload.credentials?.[payload.model.provider]?.id ?? null,
});
}
const chatHistory = session.map((msg) => ({
type: msg.type,
message: msg.message,
}));
session.push({
await this.messageRepository.createChatMessage({
id: payload.messageId,
message: payload.message,
type: 'user',
createdAt: new Date(),
sessionId: payload.sessionId,
type: 'human',
name: 'You',
content: payload.message,
state: 'active',
turnId,
previousMessageId: payload.previousMessageId ?? null,
});
/* eslint-disable @typescript-eslint/naming-convention */
@ -357,7 +370,20 @@ export class ChatHubService {
parameters: {
mode: 'insert',
messages: {
messageValues: chatHistory,
messageValues: session.messages?.map((message) => {
const typeMap: Record<string, MessageRecord['type']> = {
human: 'user',
ai: 'ai',
system: 'system',
};
// TODO: Tools ?
return {
type: typeMap[message.type] || 'system',
message: message.content,
hideFromUI: false,
};
}),
},
},
type: '@n8n/n8n-nodes-langchain.memoryManager',
@ -456,12 +482,16 @@ export class ChatHubService {
const message = this.getMessage(execution);
if (message) {
this.logger.debug(`Assistant: ${message} (${payload.replyId})`);
session.push({
await this.messageRepository.createChatMessage({
id: payload.replyId,
message,
sessionId: payload.sessionId,
type: 'ai',
createdAt: new Date(),
name: 'AI',
content: message,
state: 'active',
turnId,
executionId: parseInt(execution.id, 10),
previousMessageId: payload.messageId,
});
}
}

View File

@ -7,6 +7,7 @@ export interface ChatPayloadWithCredentials {
messageId: string;
sessionId: string;
replyId: string;
previousMessageId: string | null;
model: ChatHubConversationModel;
credentials: INodeCredentials;
}
@ -19,3 +20,11 @@ export type ChatMessage = {
};
export type ChatHubMessageState = 'active' | 'superseded' | 'hidden' | 'deleted';
// From packages/@n8n/nodes-langchain/nodes/memory/MemoryManager/MemoryManager.node.ts
export type MessageRole = 'ai' | 'system' | 'user';
export interface MessageRecord {
type: MessageRole;
message: string;
hideFromUI: boolean;
}

View File

@ -0,0 +1,38 @@
import { withTransaction } from '@n8n/db';
import { Service } from '@n8n/di';
import { DataSource, EntityManager, Repository } from '@n8n/typeorm';
import { ChatHubMessage } from './chat-hub-message.entity';
import { ChatHubSessionRepository } from './chat-session.repository';
@Service()
export class ChatHubMessageRepository extends Repository<ChatHubMessage> {
constructor(
dataSource: DataSource,
private chatSessionRepository: ChatHubSessionRepository,
) {
super(ChatHubMessage, dataSource.manager);
}
async createChatMessage(message: Partial<ChatHubMessage>, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
const chatMessage = em.create(ChatHubMessage, message);
const saved = await em.save(chatMessage);
await this.chatSessionRepository.updateLastMessageAt(saved.sessionId, saved.createdAt, em);
return saved;
});
}
async deleteChatMessage(id: string, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
return await em.delete(ChatHubMessage, { id });
});
}
async getManyBySessionId(sessionId: string) {
return await this.find({
where: { sessionId },
order: { createdAt: 'ASC' },
});
}
}

View File

@ -0,0 +1,63 @@
import { withTransaction } from '@n8n/db';
import { Service } from '@n8n/di';
import { DataSource, EntityManager, Repository } from '@n8n/typeorm';
import { ChatHubSession } from './chat-hub-session.entity';
@Service()
export class ChatHubSessionRepository extends Repository<ChatHubSession> {
constructor(dataSource: DataSource) {
super(ChatHubSession, dataSource.manager);
}
async createChatSession(session: Partial<ChatHubSession>, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
const chatHubSession = em.create(ChatHubSession, session);
const saved = await em.save(chatHubSession);
return await em.findOneOrFail(ChatHubSession, {
where: { id: saved.id },
relations: ['messages'],
});
});
}
async updateLastMessageAt(id: string, lastMessageAt: Date, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
await em.update(ChatHubSession, { id }, { lastMessageAt });
return await em.findOneOrFail(ChatHubSession, {
where: { id },
relations: ['messages'],
});
});
}
async updateChatTitle(id: string, title: string, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
await em.update(ChatHubSession, { id }, { title });
return await em.findOneOrFail(ChatHubSession, {
where: { id },
relations: ['messages'],
});
});
}
async deleteChatHubSession(id: string, trx?: EntityManager) {
return await withTransaction(this.manager, trx, async (em) => {
return await em.delete(ChatHubSession, { id });
});
}
async getManyByUserId(userId: string) {
return await this.find({
where: { ownerId: userId },
order: { lastMessageAt: 'DESC' },
});
}
async getOneById(id: string, userId: string) {
return await this.findOne({
where: { id, ownerId: userId },
relations: ['messages'],
});
}
}

View File

@ -25,6 +25,12 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
const messagesBySession = ref<Partial<Record<string, ChatMessage[]>>>({});
const sessions = ref<ChatHubConversation[]>([]);
const getLastMessage = (sessionId: string) => {
const msgs = messagesBySession.value[sessionId];
if (!msgs || msgs.length === 0) return null;
return msgs[msgs.length - 1];
};
async function fetchChatModels(credentialMap: CredentialsMap) {
loadingModels.value = true;
models.value = await fetchChatModelsApi(rootStore.restApiContext, {
@ -48,6 +54,7 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
role: msg.role,
type: 'message' as const,
text: msg.content,
key: msg.id,
})),
};
}
@ -59,6 +66,7 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
...(messagesBySession.value[sessionId] ?? []),
{
id,
key: id,
role: 'user',
type: 'message',
text: content,
@ -67,13 +75,14 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
};
}
function addAiMessage(sessionId: string, content: string, id: string) {
function addAiMessage(sessionId: string, content: string, id: string, key: string) {
messagesBySession.value = {
...messagesBySession.value,
[sessionId]: [
...(messagesBySession.value[sessionId] ?? []),
{
id,
key,
role: 'assistant',
type: 'message',
text: content,
@ -82,11 +91,11 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
};
}
function appendMessage(sessionId: string, content: string, id: string) {
function appendMessage(sessionId: string, content: string, key: string) {
messagesBySession.value = {
...messagesBySession.value,
[sessionId]: (messagesBySession.value[sessionId] ?? []).map((msg) => {
if (msg.id === id && msg.type === 'message') {
if (msg.key === key && msg.type === 'message') {
return {
...msg,
text: msg.text + content,
@ -99,7 +108,7 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
function onBeginMessage(sessionId: string, messageId: string, nodeId: string, runIndex?: number) {
isResponding.value = true;
addAiMessage(sessionId, '', `${messageId}-${nodeId}-${runIndex ?? 0}`);
addAiMessage(sessionId, '', messageId, `${messageId}-${nodeId}-${runIndex ?? 0}`);
}
function onChunk(
@ -162,6 +171,7 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
credentials: ChatHubSendMessageRequest['credentials'],
) {
const messageId = uuidv4();
const previousMessageId = getLastMessage(sessionId)?.id ?? null;
addUserMessage(sessionId, message, messageId);
@ -173,6 +183,7 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
sessionId,
message,
credentials,
previousMessageId,
},
(chunk: StructuredChunk) => onStreamMessage(sessionId, chunk, messageId),
onStreamDone,

View File

@ -3,6 +3,7 @@ import { z } from 'zod';
export interface UserMessage {
id: string;
key: string;
role: 'user';
type: 'message';
text: string;
@ -10,6 +11,7 @@ export interface UserMessage {
export interface AssistantMessage {
id: string;
key: string;
role: 'assistant';
type: 'message';
text: string;
@ -17,6 +19,7 @@ export interface AssistantMessage {
export interface ErrorMessage {
id: string;
key: string;
role: 'assistant';
type: 'error';
content: string;

View File

@ -11,7 +11,7 @@ export function findOneFromModelsResponse(
): ChatHubConversationModel | undefined {
for (const provider of chatHubProviderSchema.options) {
if (response[provider].models.length > 0) {
return { model: response[provider].models[0].name, provider };
return { model: response[provider].models[0].name, provider, workflowId: null };
}
}

View File

@ -75,7 +75,7 @@ function onSelect(id: string) {
return;
}
emit('change', { provider: parsedProvider, model });
emit('change', { provider: parsedProvider, model, workflowId: null });
}
</script>