feat(core): Save errored AI messages to DB (no-changelog) (#20970)

This commit is contained in:
Jaakko Husso 2025-10-20 16:55:28 +03:00 committed by GitHub
parent d127ef46c2
commit 14d9c20e02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 64 additions and 142 deletions

View File

@ -94,7 +94,7 @@ export class ChatHubChangeConversationTitleRequest extends Z.class({
}) {}
export type ChatHubMessageType = 'human' | 'ai' | 'system' | 'tool' | 'generic';
export type ChatHubMessageState = 'active' | 'replaced';
export type ChatHubMessageState = 'success' | 'error' | 'pending';
export type ChatSessionId = string; // UUID
export type ChatMessageId = string; // UUID
@ -127,10 +127,8 @@ export interface ChatHubMessageDto {
updatedAt: string;
previousMessageId: ChatMessageId | null;
turnId: ChatMessageId | null;
retryOfMessageId: ChatMessageId | null;
revisionOfMessageId: ChatMessageId | null;
runIndex: number;
}
export type ChatHubConversationsResponse = ChatHubSessionDto[];

View File

@ -136,7 +136,6 @@ describe('chatHub', () => {
name: 'Nathan',
type: 'human',
content: 'message 1',
turnId: ids[0],
createdAt: new Date('2025-01-03T00:00:00Z'),
});
const msg2 = await messagesRepository.createChatMessage({
@ -146,7 +145,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2',
previousMessageId: msg1.id,
turnId: ids[0],
createdAt: new Date('2025-01-03T00:05:00Z'),
});
const msg3 = await messagesRepository.createChatMessage({
@ -156,7 +154,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 3',
previousMessageId: msg2.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:10:00Z'),
});
const msg4 = await messagesRepository.createChatMessage({
@ -166,7 +163,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4',
previousMessageId: msg3.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:15:00Z'),
});
@ -181,16 +177,12 @@ describe('chatHub', () => {
expect(Object.keys(messages)).toHaveLength(4);
expect(messages[msg1.id].content).toBe('message 1');
expect(messages[msg1.id].type).toBe('human');
expect(messages[msg1.id].turnId).toBe(msg1.id);
expect(messages[msg2.id].content).toBe('message 2');
expect(messages[msg2.id].type).toBe('ai');
expect(messages[msg2.id].turnId).toBe(msg1.id);
expect(messages[msg3.id].content).toBe('message 3');
expect(messages[msg3.id].type).toBe('human');
expect(messages[msg3.id].turnId).toBe(msg3.id);
expect(messages[msg4.id].content).toBe('message 4');
expect(messages[msg4.id].type).toBe('ai');
expect(messages[msg4.id].turnId).toBe(msg3.id);
});
it('should get conversation with a edit branch', async () => {
@ -215,7 +207,6 @@ describe('chatHub', () => {
name: 'Nathan',
type: 'human',
content: 'message 1',
turnId: ids[0],
createdAt: new Date('2025-01-03T00:00:00Z'),
});
const msg2 = await messagesRepository.createChatMessage({
@ -225,7 +216,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2',
previousMessageId: msg1.id,
turnId: ids[0],
createdAt: new Date('2025-01-03T00:05:00Z'),
});
const msg3 = await messagesRepository.createChatMessage({
@ -235,7 +225,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 3a',
previousMessageId: msg2.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:10:00Z'),
});
const msg4 = await messagesRepository.createChatMessage({
@ -245,7 +234,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4a',
previousMessageId: msg3.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:15:00Z'),
});
// Edit message 3 to create a branch
@ -257,7 +245,6 @@ describe('chatHub', () => {
content: 'message 3b',
previousMessageId: msg2.id,
revisionOfMessageId: msg3.id,
turnId: ids[4],
createdAt: new Date('2025-01-03T00:20:00Z'),
});
const msg6 = await messagesRepository.createChatMessage({
@ -267,7 +254,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4b',
previousMessageId: msg5.id,
turnId: ids[4],
createdAt: new Date('2025-01-03T00:25:00Z'),
});
@ -309,7 +295,6 @@ describe('chatHub', () => {
name: 'Nathan',
type: 'human',
content: 'message 1a',
turnId: ids[0],
createdAt: new Date('2025-01-03T00:00:00Z'),
});
await messagesRepository.createChatMessage({
@ -319,7 +304,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2a',
previousMessageId: msg1.id,
turnId: ids[1],
createdAt: new Date('2025-01-03T00:05:00Z'),
});
// Edit message 1 to create a branch
@ -330,7 +314,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 1b',
revisionOfMessageId: msg1.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:10:00Z'),
});
await messagesRepository.createChatMessage({
@ -340,7 +323,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2b',
previousMessageId: msg3.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:15:00Z'),
});
@ -377,7 +359,6 @@ describe('chatHub', () => {
name: 'Nathan',
type: 'human',
content: 'message 1',
turnId: ids[0],
createdAt: new Date('2025-01-03T00:00:00Z'),
});
const msg2 = await messagesRepository.createChatMessage({
@ -387,7 +368,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2',
previousMessageId: msg1.id,
turnId: ids[0],
createdAt: new Date('2025-01-03T00:05:00Z'),
});
const msg3 = await messagesRepository.createChatMessage({
@ -397,7 +377,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 3',
previousMessageId: msg2.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:10:00Z'),
});
const msg4 = await messagesRepository.createChatMessage({
@ -407,7 +386,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4a',
previousMessageId: msg3.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:15:00Z'),
});
// Retry message 4 to create a branch
@ -419,7 +397,6 @@ describe('chatHub', () => {
content: 'message 4b',
previousMessageId: msg3.id,
retryOfMessageId: msg4.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:20:00Z'),
});
@ -471,7 +448,6 @@ describe('chatHub', () => {
name: 'Nathan',
type: 'human',
content: 'message 1',
turnId: ids[0],
createdAt: new Date('2025-01-03T00:00:00Z'),
});
const msg2 = await messagesRepository.createChatMessage({
@ -481,7 +457,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 2a',
previousMessageId: msg1.id,
turnId: ids[0],
createdAt: new Date('2025-01-03T00:05:00Z'),
});
const msg3a = await messagesRepository.createChatMessage({
@ -491,7 +466,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 3a',
previousMessageId: msg2.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:10:00Z'),
});
await messagesRepository.createChatMessage({
@ -501,7 +475,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4a',
previousMessageId: msg3a.id,
turnId: ids[2],
createdAt: new Date('2025-01-03T00:15:00Z'),
});
const msg3b = await messagesRepository.createChatMessage({
@ -512,7 +485,6 @@ describe('chatHub', () => {
content: 'message 3b',
revisionOfMessageId: msg3a.id,
previousMessageId: msg2.id,
turnId: ids[4],
createdAt: new Date('2025-01-03T00:20:00Z'),
});
await messagesRepository.createChatMessage({
@ -522,7 +494,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4b',
previousMessageId: msg3b.id,
turnId: ids[4],
createdAt: new Date('2025-01-03T00:25:00Z'),
});
await messagesRepository.createChatMessage({
@ -532,7 +503,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 1b',
revisionOfMessageId: msg1.id,
turnId: ids[6],
createdAt: new Date('2025-01-03T00:30:00Z'),
});
const msg2r = await messagesRepository.createChatMessage({
@ -543,7 +513,6 @@ describe('chatHub', () => {
content: 'message 2b',
previousMessageId: msg1.id,
retryOfMessageId: msg2.id,
turnId: ids[0],
createdAt: new Date('2025-01-03T00:35:00Z'),
});
const msg3d = await messagesRepository.createChatMessage({
@ -553,7 +522,6 @@ describe('chatHub', () => {
type: 'human',
content: 'message 3d',
previousMessageId: msg2r.id,
turnId: ids[8],
createdAt: new Date('2025-01-03T00:40:00Z'),
});
await messagesRepository.createChatMessage({
@ -563,7 +531,6 @@ describe('chatHub', () => {
type: 'ai',
content: 'message 4c',
previousMessageId: msg3d.id,
turnId: ids[8],
createdAt: new Date('2025-01-03T00:45:00Z'),
});

View File

@ -111,28 +111,6 @@ export class ChatHubMessage extends WithTimestamps {
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.previousMessage)
responses?: Array<Relation<ChatHubMessage>>;
/**
* Root message of a conversation turn (Human message + AI responses)
*/
@Column({ type: String })
turnId: string;
/**
* Message that began the turn, probably from the human/user.
*/
@ManyToOne('ChatHubMessage', (m: ChatHubMessage) => m.turnMessages, {
onDelete: 'CASCADE',
nullable: true,
})
@JoinColumn({ name: 'turnId' })
turn?: Relation<ChatHubMessage> | null;
/**
* All messages that are part of this turn (including the root message).
*/
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.turn)
turnMessages?: Array<Relation<ChatHubMessage>>;
/**
* ID of the message that this message is a retry of (if applicable).
*/
@ -155,12 +133,6 @@ export class ChatHubMessage extends WithTimestamps {
@OneToMany('ChatHubMessage', (m: ChatHubMessage) => m.retryOfMessage)
retries?: Array<Relation<ChatHubMessage>>;
/**
* The nth time this message has been generated/retried within the turn (0 = first attempt).
*/
@Column({ type: 'int', default: 0 })
runIndex: number;
/**
* ID of the message that this message is a revision/edit of (if applicable).
*/
@ -184,8 +156,8 @@ export class ChatHubMessage extends WithTimestamps {
revisions?: Array<Relation<ChatHubMessage>>;
/**
* State of the message, e.g. 'active', 'superseded'.
* State of the message, e.g. 'success', 'error'.
*/
@Column({ type: 'varchar', length: 16, default: 'active' })
@Column({ type: 'varchar', length: 16, default: 'success' })
state: ChatHubMessageState;
}

View File

@ -87,12 +87,11 @@ export class ChatHubController {
code: 500,
message: executionError.message,
});
} else {
} else if (!res.writableEnded) {
res.write(
JSON.stringify({
type: 'error',
content: executionError.message,
id: payload.replyId,
}) + '\n',
);
res.flush();
@ -129,7 +128,7 @@ export class ChatHubController {
code: 500,
message: executionError.message,
});
} else {
} else if (!res.writableEnded) {
res.write(
JSON.stringify({
type: 'error',
@ -171,7 +170,7 @@ export class ChatHubController {
code: 500,
message: executionError.message,
});
} else {
} else if (!res.writableEnded) {
res.write(
JSON.stringify({
type: 'error',

View File

@ -9,6 +9,7 @@ import {
type ChatMessageId,
type ChatSessionId,
ChatHubConversationModel,
ChatHubMessageState,
} from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import {
@ -317,6 +318,14 @@ export class ChatHubService {
await this.workflowRepository.delete(workflowId);
}
private getErrorMessage(execution: IExecutionResponse): string | undefined {
if (execution.data.resultData.error) {
return execution.data.resultData.error.description ?? execution.data.resultData.error.message;
}
return undefined;
}
private getAIOutput(execution: IExecutionResponse): string | undefined {
const agent = execution.data.resultData.runData[NODE_NAMES.AI_AGENT];
if (!agent || !Array.isArray(agent) || agent.length === 0) return undefined;
@ -365,8 +374,7 @@ export class ChatHubService {
const messages = Object.fromEntries((session.messages ?? []).map((m) => [m.id, m]));
const history = this.buildMessageHistory(messages, payload.previousMessageId);
const turnId = messageId;
await this.saveHumanMessage(payload, user, turnId, payload.previousMessageId, selectedModel);
await this.saveHumanMessage(payload, user, payload.previousMessageId, selectedModel);
const workflow = await this.createChatWorkflow(
user,
@ -385,7 +393,6 @@ export class ChatHubService {
replyId,
sessionId,
messageId,
turnId,
selectedModel,
);
} finally {
@ -412,27 +419,13 @@ export class ChatHubService {
// If the message to edit isn't the original message, we want to point to the original message
const revisionOfMessageId = messageToEdit.revisionOfMessageId ?? messageToEdit.id;
const otherRuns = (session.messages ?? []).filter(
(m) => m.revisionOfMessageId === revisionOfMessageId,
);
const runIndex = otherRuns.length + 1;
await this.messageRepository.updateChatMessage(revisionOfMessageId, { state: 'replaced' });
for (const run of otherRuns) {
if (run.state === 'active') {
await this.messageRepository.updateChatMessage(run.id, { state: 'replaced' });
}
}
const turnId = payload.messageId;
await this.saveHumanMessage(
payload,
user,
turnId,
messageToEdit.previousMessageId,
selectedModel,
revisionOfMessageId,
runIndex,
);
const workflow = await this.createChatWorkflow(
@ -452,7 +445,6 @@ export class ChatHubService {
replyId,
sessionId,
messageId,
turnId,
selectedModel,
);
} finally {
@ -493,17 +485,6 @@ export class ChatHubService {
// If the message being retried is itself a retry, we want to point to the original message
const retryOfMessageId = messageToRetry.retryOfMessageId ?? messageToRetry.id;
const otherRuns = (session.messages ?? []).filter(
(m) => m.retryOfMessageId === retryOfMessageId,
);
const runIndex = otherRuns.length + 1;
await this.messageRepository.updateChatMessage(retryOfMessageId, { state: 'replaced' });
for (const run of otherRuns) {
if (run.state === 'active') {
await this.messageRepository.updateChatMessage(run.id, { state: 'replaced' });
}
}
const workflow = await this.createChatWorkflow(
user,
@ -522,10 +503,8 @@ export class ChatHubService {
replyId,
sessionId,
lastHumanMessage.id,
messageToRetry.turnId,
selectedModel,
retryOfMessageId,
runIndex,
);
} finally {
await this.deleteChatWorkflow(workflow.workflowData.id);
@ -543,10 +522,8 @@ export class ChatHubService {
replyId: ChatMessageId,
sessionId: ChatSessionId,
previousMessageId: ChatMessageId,
turnId: ChatMessageId,
selectedModel: ModelWithCredentials,
retryOfMessageId?: ChatMessageId,
runIndex?: number,
) {
const { workflowData, startNodes, triggerToStartFrom } = workflow;
@ -580,18 +557,34 @@ export class ChatHubService {
if (!execution) {
throw new NotFoundError(`Could not find execution with ID ${executionId}`);
}
if (!execution.status || execution.status !== 'success') {
const message = this.getErrorMessage(execution) ?? 'Error: Failed to generate a response';
await this.saveAIMessage({
id: replyId,
sessionId,
executionId: execution.id,
previousMessageId,
message,
selectedModel,
retryOfMessageId,
state: 'error',
});
throw new OperationalError(`Chat workflow execution failed: ${message}`);
}
const message = this.getAIOutput(execution) ?? 'Error: No response generated';
await this.saveAIMessage(
replyId,
await this.saveAIMessage({
id: replyId,
sessionId,
turnId,
execution.id,
executionId: execution.id,
previousMessageId,
message,
selectedModel,
retryOfMessageId,
runIndex,
);
state: 'success',
});
}
private prepareChatWorkflow(
@ -754,50 +747,53 @@ export class ChatHubService {
private async saveHumanMessage(
payload: HumanMessagePayload | EditMessagePayload,
user: User,
turnId: string,
previousMessageId: ChatMessageId | null,
selectedModel: ModelWithCredentials,
revisionOfMessageId?: ChatMessageId,
runIndex?: number,
) {
await this.messageRepository.createChatMessage({
id: payload.messageId,
sessionId: payload.sessionId,
type: 'human',
name: user.firstName || 'User',
state: 'active',
state: 'success',
content: payload.message,
turnId,
previousMessageId,
revisionOfMessageId,
runIndex,
...selectedModel,
});
}
private async saveAIMessage(
id: ChatMessageId,
sessionId: ChatSessionId,
turnId: ChatMessageId,
executionId: string,
previousMessageId: ChatMessageId,
message: string,
selectedModel: ModelWithCredentials,
retryOfMessageId?: ChatMessageId,
runIndex?: number,
) {
private async saveAIMessage({
id,
sessionId,
executionId,
previousMessageId,
message,
selectedModel,
retryOfMessageId,
state,
}: {
id: ChatMessageId;
sessionId: ChatSessionId;
executionId: string;
previousMessageId: ChatMessageId;
message: string;
selectedModel: ModelWithCredentials;
retryOfMessageId?: ChatMessageId;
editOfMessageId?: ChatMessageId;
state?: ChatHubMessageState;
}) {
await this.messageRepository.createChatMessage({
id,
sessionId,
turnId,
previousMessageId,
executionId: parseInt(executionId, 10),
type: 'ai',
name: 'AI',
state: 'active',
state,
content: message,
retryOfMessageId,
runIndex,
...selectedModel,
});
}
@ -946,10 +942,8 @@ export class ChatHubService {
updatedAt: message.updatedAt.toISOString(),
previousMessageId: message.previousMessageId,
turnId: message.turnId,
retryOfMessageId: message.retryOfMessageId,
revisionOfMessageId: message.revisionOfMessageId,
runIndex: message.runIndex,
};
}

View File

@ -245,14 +245,12 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
model: null,
workflowId: null,
executionId: null,
state: 'active',
state: 'success',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
previousMessageId: replyToMessageId,
turnId: null,
retryOfMessageId,
revisionOfMessageId: null,
runIndex: 0,
responses: [],
alternatives: [],
});
@ -339,14 +337,12 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
model: model?.model ?? null,
workflowId: null,
executionId: null,
state: 'active',
state: 'success',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
previousMessageId,
turnId: null,
retryOfMessageId: null,
revisionOfMessageId: null,
runIndex: 0,
responses: [],
alternatives: [],
});
@ -362,14 +358,12 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
model: null,
workflowId: null,
executionId: null,
state: 'active',
state: 'error',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
previousMessageId: messageId,
turnId: null,
retryOfMessageId: null,
revisionOfMessageId: null,
runIndex: 0,
responses: [],
alternatives: [],
});
@ -416,14 +410,12 @@ export const useChatStore = defineStore(CHAT_STORE, () => {
model: null,
workflowId: null,
executionId: null,
state: 'active',
state: 'success',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
previousMessageId,
turnId: null,
retryOfMessageId: null,
revisionOfMessageId: editId,
runIndex: 0,
responses: [],
alternatives: [],
});