From 2a9a23f77473f4a450655cec3d1382ea8f26c2cf Mon Sep 17 00:00:00 2001 From: Michael Drury Date: Tue, 2 Jun 2026 17:56:18 +0100 Subject: [PATCH] fix(core): External agent channels correctly utilise the user ID for episodic memory (#31584) --- .../__tests__/agent-chat-bridge.test.ts | 99 +++++++++++++++++-- .../agents/integrations/agent-chat-bridge.ts | 5 +- .../agents/utils/agent-memory-scope.ts | 7 +- 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts b/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts index 07ef71532a1..baec4e11202 100644 --- a/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts +++ b/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts @@ -18,6 +18,7 @@ type ChatBotLike = ConstructorParameters[0]; interface FakeThread { id: string; channelId?: string; + adapter?: { botUserId?: string }; subscribe: jest.Mock; post: jest.Mock; startTyping: jest.Mock; @@ -44,10 +45,11 @@ function makeBot() { return { bot, handlers }; } -function makeThread(): FakeThread { +function makeThread(id = 'thread-1', adapter?: FakeThread['adapter']): FakeThread { return { - id: 'thread-1', + id, channelId: 'channel-1', + adapter, subscribe: jest.fn().mockResolvedValue(undefined), post: jest.fn().mockResolvedValue(undefined), startTyping: jest.fn().mockResolvedValue(undefined), @@ -97,6 +99,24 @@ class StreamingTestIntegration extends AgentChatIntegration { } } +class FormattedBufferedTestIntegration extends AgentChatIntegration { + readonly type = 'test-formatted-buffered'; + readonly credentialTypes: string[] = []; + readonly supportedComponents: string[] = []; + readonly description = ''; + readonly displayLabel = 'Test Formatted Buffered'; + readonly displayIcon = 'circle'; + readonly disableStreaming = true; + readonly formatThreadId = { + fromSdk: (thread: { id: string; adapter?: { botUserId?: string } }) => + `chat:${thread.adapter?.botUserId ?? 'bot'}-${thread.id}`, + toSdk: (threadId: string) => threadId.split('-').slice(1).join('-'), + }; + async createAdapter(_ctx: AgentChatIntegrationContext): Promise { + return {}; + } +} + // TODO: use real Telegram integration for testing describe('AgentChatBridge — consumeStream', () => { @@ -117,6 +137,7 @@ describe('AgentChatBridge — consumeStream', () => { registry = new ChatIntegrationRegistry(); registry.register(new BufferingTestIntegration()); registry.register(new StreamingTestIntegration()); + registry.register(new FormattedBufferedTestIntegration()); Container.set(ChatIntegrationRegistry, registry); }); @@ -219,10 +240,11 @@ describe('AgentChatBridge — consumeStream', () => { }); }); - describe('when integration keeps streaming enabled', () => { - it('uses the formatted chat thread as the episodic memory partition', async () => { + describe('when deriving memory scope', () => { + it('uses the platform user as the episodic memory partition across threads', async () => { const { bot, handlers } = makeBot(); - const thread = makeThread(); + const thread1 = makeThread('thread-1'); + const thread2 = makeThread('thread-2'); const agentExecutor = makeAgentExecutor([{ type: 'finish', finishReason: 'stop' }]); new AgentChatBridge( @@ -235,18 +257,79 @@ describe('AgentChatBridge — consumeStream', () => { streamingIntegration, ); - await handlers.mention!(thread, { text: 'hi', author: { userId: 'u1', userName: 'user1' } }); + await handlers.mention!(thread1, { text: 'hi', author: { userId: 'u1', userName: 'user1' } }); + await handlers.mention!(thread2, { + text: 'what did we discuss?', + author: { userId: 'u1', userName: 'user1' }, + }); - expect(agentExecutor.executeForChatPublished).toHaveBeenCalledWith( + expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith( + 1, expect.objectContaining({ memory: expect.objectContaining({ threadId: expect.objectContaining({ id: 'agent-1:thread-1' }), - resourceId: 'integration:test-streaming:thread-1', + resourceId: 'integration:test-streaming:u1', + }), + }), + ); + expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + memory: expect.objectContaining({ + threadId: expect.objectContaining({ id: 'agent-1:thread-2' }), + resourceId: 'integration:test-streaming:u1', }), }), ); }); + it('keeps a formatted thread ID separate from the platform user memory partition', async () => { + const { bot, handlers } = makeBot(); + const thread1 = makeThread('1001', { botUserId: 'bot-1' }); + const thread2 = makeThread('1002', { botUserId: 'bot-1' }); + const agentExecutor = makeAgentExecutor([{ type: 'finish', finishReason: 'stop' }]); + + new AgentChatBridge( + bot as unknown as ChatBotLike, + 'agent-1', + agentExecutor as never, + componentMapper, + logger, + 'project-1', + { + type: 'test-formatted-buffered', + credentialId: 'cred-1', + } as unknown as AgentIntegrationConfig, + ); + + await handlers.mention!(thread1, { text: 'hi', author: { userId: 'u1', userName: 'user1' } }); + await handlers.mention!(thread2, { + text: 'what did we discuss?', + author: { userId: 'u1', userName: 'user1' }, + }); + + expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + memory: expect.objectContaining({ + threadId: expect.objectContaining({ id: 'agent-1:chat:bot-1-1001' }), + resourceId: 'integration:test-formatted-buffered:u1', + }), + }), + ); + expect(agentExecutor.executeForChatPublished).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + memory: expect.objectContaining({ + threadId: expect.objectContaining({ id: 'agent-1:chat:bot-1-1002' }), + resourceId: 'integration:test-formatted-buffered:u1', + }), + }), + ); + }); + }); + + describe('when integration keeps streaming enabled', () => { it('posts an AsyncIterable whose drained content equals the concatenated deltas', async () => { const { bot, handlers } = makeBot(); const thread = makeThread(); diff --git a/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts b/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts index c3e3d773f96..074455670be 100644 --- a/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts +++ b/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts @@ -314,7 +314,8 @@ export class AgentChatBridge { subject, }); // threadId.id is agent-prefixed for observation storage; resourceId keeps - // the platform identity so episodic recall remains agent + resource scoped. + // the platform user identity so episodic recall works across threads for + // the same user while staying isolated between users. // Always run the published snapshot — integrations are production traffic. const stream = this.agentService.executeForChatPublished({ agentId: this.agentId, @@ -322,7 +323,7 @@ export class AgentChatBridge { message: text, memory: { threadId, - resourceId: integrationMemoryResourceId(this.integration.type, platformThreadId), + resourceId: integrationMemoryResourceId(this.integration.type, message.author.userId), }, integrationType: this.integration.type, }); diff --git a/packages/cli/src/modules/agents/utils/agent-memory-scope.ts b/packages/cli/src/modules/agents/utils/agent-memory-scope.ts index 110bdc0a35c..364b7253095 100644 --- a/packages/cli/src/modules/agents/utils/agent-memory-scope.ts +++ b/packages/cli/src/modules/agents/utils/agent-memory-scope.ts @@ -2,8 +2,11 @@ export function draftChatMemoryResourceId(userId: string): string { return `draft-chat:${userId}`; } -export function integrationMemoryResourceId(integrationType: string, threadId: string): string { - return `integration:${integrationType}:${threadId}`; +export function integrationMemoryResourceId( + integrationType: string, + platformUserId: string, +): string { + return `integration:${integrationType}:${platformUserId}`; } export function taskRunMemoryResourceId(taskId: string): string {