mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
fix(core): Agent sessions correctly quoting columns in queries for Postgres (#29999)
This commit is contained in:
parent
f7c7acc244
commit
9f92005938
|
|
@ -26,14 +26,17 @@ export class AgentExecutionRepository extends Repository<AgentExecution> {
|
|||
if (threadIds.length === 0) return new Map();
|
||||
|
||||
// Correlated subquery: for each thread, pick the row with the smallest
|
||||
// createdAt that has a non-empty userMessage.
|
||||
// createdAt that has a non-empty userMessage. Identifiers are double-quoted
|
||||
// so Postgres preserves their camelCase (it lowercases unquoted names),
|
||||
// and the table name is read from metadata so DB_TABLE_PREFIX is respected.
|
||||
const tableName = this.metadata.tablePath;
|
||||
const rows = await this.createQueryBuilder('e')
|
||||
.select(['e.threadId AS threadId', 'e.userMessage AS userMessage'])
|
||||
.where('e.threadId IN (:...threadIds)', { threadIds })
|
||||
.andWhere("e.userMessage != ''")
|
||||
.select(['e."threadId" AS "threadId"', 'e."userMessage" AS "userMessage"'])
|
||||
.where('e."threadId" IN (:...threadIds)', { threadIds })
|
||||
.andWhere('e."userMessage" != \'\'')
|
||||
.andWhere(
|
||||
'e.createdAt = (SELECT MIN(e2.createdAt) FROM agent_execution e2 ' +
|
||||
"WHERE e2.threadId = e.threadId AND e2.userMessage != '')",
|
||||
`e."createdAt" = (SELECT MIN(e2."createdAt") FROM ${tableName} e2 ` +
|
||||
'WHERE e2."threadId" = e."threadId" AND e2."userMessage" != \'\')',
|
||||
)
|
||||
.getRawMany<{ threadId: string; userMessage: string }>();
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,146 @@
|
|||
import { createTeamProject, testDb, testModules } from '@n8n/backend-test-utils';
|
||||
import { Container } from '@n8n/di';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import type { AgentExecutionThread } from '@/modules/agents/entities/agent-execution-thread.entity';
|
||||
import type { AgentExecution } from '@/modules/agents/entities/agent-execution.entity';
|
||||
import type { Agent } from '@/modules/agents/entities/agent.entity';
|
||||
import { AgentExecutionThreadRepository } from '@/modules/agents/repositories/agent-execution-thread.repository';
|
||||
import { AgentExecutionRepository } from '@/modules/agents/repositories/agent-execution.repository';
|
||||
import { AgentRepository } from '@/modules/agents/repositories/agent.repository';
|
||||
|
||||
describe('AgentExecutionRepository', () => {
|
||||
let repository: AgentExecutionRepository;
|
||||
let threadRepo: AgentExecutionThreadRepository;
|
||||
let agentRepo: AgentRepository;
|
||||
let projectId: string;
|
||||
let agentId: string;
|
||||
|
||||
beforeAll(async () => {
|
||||
await testModules.loadModules(['agents']);
|
||||
await testDb.init();
|
||||
repository = Container.get(AgentExecutionRepository);
|
||||
threadRepo = Container.get(AgentExecutionThreadRepository);
|
||||
agentRepo = Container.get(AgentRepository);
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
const project = await createTeamProject();
|
||||
projectId = project.id;
|
||||
|
||||
const agent = agentRepo.create({
|
||||
id: uuid(),
|
||||
name: 'Test Agent',
|
||||
projectId,
|
||||
integrations: [],
|
||||
tools: {},
|
||||
skills: {},
|
||||
} as Partial<Agent>);
|
||||
await agentRepo.save(agent);
|
||||
agentId = agent.id;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await repository.delete({});
|
||||
await threadRepo.delete({});
|
||||
await agentRepo.delete({});
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
const createThread = async (overrides: Partial<AgentExecutionThread> = {}) => {
|
||||
const thread = threadRepo.create({
|
||||
id: uuid(),
|
||||
agentId,
|
||||
agentName: 'Test Agent',
|
||||
projectId,
|
||||
sessionNumber: 1,
|
||||
...overrides,
|
||||
});
|
||||
return await threadRepo.save(thread);
|
||||
};
|
||||
|
||||
const createExecution = async (overrides: Partial<AgentExecution>) => {
|
||||
const execution = repository.create({
|
||||
id: uuid(),
|
||||
status: 'success',
|
||||
userMessage: '',
|
||||
assistantResponse: '',
|
||||
...overrides,
|
||||
} as Partial<AgentExecution>);
|
||||
return await repository.save(execution);
|
||||
};
|
||||
|
||||
describe('findFirstUserMessageByThreadIds', () => {
|
||||
// The repository builds a raw SQL fragment referencing camelCase columns.
|
||||
// Postgres folds unquoted identifiers to lowercase, so this regression
|
||||
// fails on Postgres if the identifiers ever lose their double quotes.
|
||||
it('returns the earliest non-empty user message per thread', async () => {
|
||||
const threadA = await createThread({ sessionNumber: 1 });
|
||||
const threadB = await createThread({ id: uuid(), sessionNumber: 2 });
|
||||
|
||||
await createExecution({
|
||||
threadId: threadA.id,
|
||||
userMessage: 'first A',
|
||||
createdAt: new Date('2024-01-01T00:00:00Z'),
|
||||
});
|
||||
await createExecution({
|
||||
threadId: threadA.id,
|
||||
userMessage: 'second A',
|
||||
createdAt: new Date('2024-01-02T00:00:00Z'),
|
||||
});
|
||||
await createExecution({
|
||||
threadId: threadB.id,
|
||||
userMessage: 'only B',
|
||||
createdAt: new Date('2024-01-03T00:00:00Z'),
|
||||
});
|
||||
|
||||
const result = await repository.findFirstUserMessageByThreadIds([threadA.id, threadB.id]);
|
||||
|
||||
expect(result.get(threadA.id)).toBe('first A');
|
||||
expect(result.get(threadB.id)).toBe('only B');
|
||||
expect(result.size).toBe(2);
|
||||
});
|
||||
|
||||
it('skips executions with empty user messages when picking the earliest', async () => {
|
||||
const thread = await createThread();
|
||||
|
||||
await createExecution({
|
||||
threadId: thread.id,
|
||||
userMessage: '',
|
||||
createdAt: new Date('2024-01-01T00:00:00Z'),
|
||||
});
|
||||
await createExecution({
|
||||
threadId: thread.id,
|
||||
userMessage: 'real message',
|
||||
createdAt: new Date('2024-01-02T00:00:00Z'),
|
||||
});
|
||||
|
||||
const result = await repository.findFirstUserMessageByThreadIds([thread.id]);
|
||||
|
||||
expect(result.get(thread.id)).toBe('real message');
|
||||
});
|
||||
|
||||
it('returns an empty map when no thread ids are provided', async () => {
|
||||
const result = await repository.findFirstUserMessageByThreadIds([]);
|
||||
|
||||
expect(result.size).toBe(0);
|
||||
});
|
||||
|
||||
it('omits threads that contain only empty user messages', async () => {
|
||||
const thread = await createThread();
|
||||
|
||||
await createExecution({
|
||||
threadId: thread.id,
|
||||
userMessage: '',
|
||||
createdAt: new Date('2024-01-01T00:00:00Z'),
|
||||
});
|
||||
|
||||
const result = await repository.findFirstUserMessageByThreadIds([thread.id]);
|
||||
|
||||
expect(result.has(thread.id)).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user