mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 10:39:23 +02:00
fix(core): External agent channels correctly utilise the user ID for episodic memory (#31584)
This commit is contained in:
parent
255b7a1543
commit
2a9a23f774
|
|
@ -18,6 +18,7 @@ type ChatBotLike = ConstructorParameters<typeof AgentChatBridge>[0];
|
|||
interface FakeThread {
|
||||
id: string;
|
||||
channelId?: string;
|
||||
adapter?: { botUserId?: string };
|
||||
subscribe: jest.Mock;
|
||||
post: jest.Mock;
|
||||
startTyping: jest.Mock;
|
||||
|
|
@ -44,10 +45,11 @@ function makeBot() {
|
|||
return { bot, handlers };
|
||||
}
|
||||
|
||||
function makeThread(): FakeThread {
|
||||
function makeThread(id = 'thread-1', adapter?: FakeThread['adapter']): FakeThread {
|
||||
return {
|
||||
id: 'thread-1',
|
||||
id,
|
||||
channelId: 'channel-1',
|
||||
adapter,
|
||||
subscribe: jest.fn().mockResolvedValue(undefined),
|
||||
post: jest.fn().mockResolvedValue(undefined),
|
||||
startTyping: jest.fn().mockResolvedValue(undefined),
|
||||
|
|
@ -97,6 +99,24 @@ class StreamingTestIntegration extends AgentChatIntegration {
|
|||
}
|
||||
}
|
||||
|
||||
class FormattedBufferedTestIntegration extends AgentChatIntegration {
|
||||
readonly type = 'test-formatted-buffered';
|
||||
readonly credentialTypes: string[] = [];
|
||||
readonly supportedComponents: string[] = [];
|
||||
readonly description = '';
|
||||
readonly displayLabel = 'Test Formatted Buffered';
|
||||
readonly displayIcon = 'circle';
|
||||
readonly disableStreaming = true;
|
||||
readonly formatThreadId = {
|
||||
fromSdk: (thread: { id: string; adapter?: { botUserId?: string } }) =>
|
||||
`chat:${thread.adapter?.botUserId ?? 'bot'}-${thread.id}`,
|
||||
toSdk: (threadId: string) => threadId.split('-').slice(1).join('-'),
|
||||
};
|
||||
async createAdapter(_ctx: AgentChatIntegrationContext): Promise<unknown> {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use real Telegram integration for testing
|
||||
|
||||
describe('AgentChatBridge — consumeStream', () => {
|
||||
|
|
@ -117,6 +137,7 @@ describe('AgentChatBridge — consumeStream', () => {
|
|||
registry = new ChatIntegrationRegistry();
|
||||
registry.register(new BufferingTestIntegration());
|
||||
registry.register(new StreamingTestIntegration());
|
||||
registry.register(new FormattedBufferedTestIntegration());
|
||||
Container.set(ChatIntegrationRegistry, registry);
|
||||
});
|
||||
|
||||
|
|
@ -219,10 +240,11 @@ describe('AgentChatBridge — consumeStream', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('when integration keeps streaming enabled', () => {
|
||||
it('uses the formatted chat thread as the episodic memory partition', async () => {
|
||||
describe('when deriving memory scope', () => {
|
||||
it('uses the platform user as the episodic memory partition across threads', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
const thread = makeThread();
|
||||
const thread1 = makeThread('thread-1');
|
||||
const thread2 = makeThread('thread-2');
|
||||
const agentExecutor = makeAgentExecutor([{ type: 'finish', finishReason: 'stop' }]);
|
||||
|
||||
new AgentChatBridge(
|
||||
|
|
@ -235,18 +257,79 @@ describe('AgentChatBridge — consumeStream', () => {
|
|||
streamingIntegration,
|
||||
);
|
||||
|
||||
await handlers.mention!(thread, { text: 'hi', author: { userId: 'u1', userName: 'user1' } });
|
||||
await handlers.mention!(thread1, { text: 'hi', author: { userId: 'u1', userName: 'user1' } });
|
||||
await handlers.mention!(thread2, {
|
||||
text: 'what did we discuss?',
|
||||
author: { userId: 'u1', userName: 'user1' },
|
||||
});
|
||||
|
||||
expect(agentExecutor.executeForChatPublished).toHaveBeenCalledWith(
|
||||
expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
memory: expect.objectContaining({
|
||||
threadId: expect.objectContaining({ id: 'agent-1:thread-1' }),
|
||||
resourceId: 'integration:test-streaming:thread-1',
|
||||
resourceId: 'integration:test-streaming:u1',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
memory: expect.objectContaining({
|
||||
threadId: expect.objectContaining({ id: 'agent-1:thread-2' }),
|
||||
resourceId: 'integration:test-streaming:u1',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('keeps a formatted thread ID separate from the platform user memory partition', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
const thread1 = makeThread('1001', { botUserId: 'bot-1' });
|
||||
const thread2 = makeThread('1002', { botUserId: 'bot-1' });
|
||||
const agentExecutor = makeAgentExecutor([{ type: 'finish', finishReason: 'stop' }]);
|
||||
|
||||
new AgentChatBridge(
|
||||
bot as unknown as ChatBotLike,
|
||||
'agent-1',
|
||||
agentExecutor as never,
|
||||
componentMapper,
|
||||
logger,
|
||||
'project-1',
|
||||
{
|
||||
type: 'test-formatted-buffered',
|
||||
credentialId: 'cred-1',
|
||||
} as unknown as AgentIntegrationConfig,
|
||||
);
|
||||
|
||||
await handlers.mention!(thread1, { text: 'hi', author: { userId: 'u1', userName: 'user1' } });
|
||||
await handlers.mention!(thread2, {
|
||||
text: 'what did we discuss?',
|
||||
author: { userId: 'u1', userName: 'user1' },
|
||||
});
|
||||
|
||||
expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
memory: expect.objectContaining({
|
||||
threadId: expect.objectContaining({ id: 'agent-1:chat:bot-1-1001' }),
|
||||
resourceId: 'integration:test-formatted-buffered:u1',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
memory: expect.objectContaining({
|
||||
threadId: expect.objectContaining({ id: 'agent-1:chat:bot-1-1002' }),
|
||||
resourceId: 'integration:test-formatted-buffered:u1',
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when integration keeps streaming enabled', () => {
|
||||
it('posts an AsyncIterable whose drained content equals the concatenated deltas', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
const thread = makeThread();
|
||||
|
|
|
|||
|
|
@ -314,7 +314,8 @@ export class AgentChatBridge {
|
|||
subject,
|
||||
});
|
||||
// threadId.id is agent-prefixed for observation storage; resourceId keeps
|
||||
// the platform identity so episodic recall remains agent + resource scoped.
|
||||
// the platform user identity so episodic recall works across threads for
|
||||
// the same user while staying isolated between users.
|
||||
// Always run the published snapshot — integrations are production traffic.
|
||||
const stream = this.agentService.executeForChatPublished({
|
||||
agentId: this.agentId,
|
||||
|
|
@ -322,7 +323,7 @@ export class AgentChatBridge {
|
|||
message: text,
|
||||
memory: {
|
||||
threadId,
|
||||
resourceId: integrationMemoryResourceId(this.integration.type, platformThreadId),
|
||||
resourceId: integrationMemoryResourceId(this.integration.type, message.author.userId),
|
||||
},
|
||||
integrationType: this.integration.type,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,8 +2,11 @@ export function draftChatMemoryResourceId(userId: string): string {
|
|||
return `draft-chat:${userId}`;
|
||||
}
|
||||
|
||||
export function integrationMemoryResourceId(integrationType: string, threadId: string): string {
|
||||
return `integration:${integrationType}:${threadId}`;
|
||||
export function integrationMemoryResourceId(
|
||||
integrationType: string,
|
||||
platformUserId: string,
|
||||
): string {
|
||||
return `integration:${integrationType}:${platformUserId}`;
|
||||
}
|
||||
|
||||
export function taskRunMemoryResourceId(taskId: string): string {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user