diff --git a/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts b/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts index baec4e11202..e108299f5a7 100644 --- a/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts +++ b/packages/cli/src/modules/agents/integrations/__tests__/agent-chat-bridge.test.ts @@ -441,6 +441,56 @@ describe('AgentChatBridge — consumeStream', () => { expect(thread.post).toHaveBeenCalledWith({ markdown: 'Hello' }); }); + it('clears assistant status before responding directly to top-level Slack DMs', async () => { + const { bot, handlers } = makeBot(); + const setAssistantStatus = jest.fn().mockResolvedValue(undefined); + bot.getAdapter.mockReturnValue({ setAssistantStatus }); + const thread = makeThread(); + const agentExecutor = { + executeForChatPublished: jest.fn(() => + toStream([ + { type: 'text-delta', id: 't1', delta: 'Hello' }, + { type: 'finish', finishReason: 'stop' }, + ]), + ), + resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])), + }; + + new AgentChatBridge( + bot as unknown as ChatBotLike, + 'agent-1', + agentExecutor as never, + componentMapper, + logger, + 'project-1', + slackIntegration, + ); + + await handlers.mention!(thread, { + text: 'hi', + raw: { + type: 'message', + channel: 'D123', + channel_type: 'im', + ts: '1779466577.518139', + }, + author: { userId: 'u1', userName: 'user1' }, + }); + + expect(setAssistantStatus).toHaveBeenNthCalledWith( + 1, + 'D123', + '1779466577.518139', + 'Thinking...', + ['Thinking...'], + ); + expect(setAssistantStatus).toHaveBeenNthCalledWith(2, 'D123', '1779466577.518139', ''); + expect(setAssistantStatus.mock.invocationCallOrder[1]).toBeLessThan( + thread.post.mock.invocationCallOrder[0], + ); + expect(thread.post).toHaveBeenCalledWith({ markdown: 'Hello' }); + }); + it('retries top-level Slack assistant status when Slack has not materialized the thread yet', async () => { jest.useFakeTimers(); const { bot, handlers } = makeBot(); @@ -493,6 +543,131 @@ describe('AgentChatBridge — consumeStream', () => { await run; }); + it('does not re-set Slack DM status with a stale retry after it has been cleared', async () => { + jest.useFakeTimers(); + const { bot, handlers } = makeBot(); + const invalidThreadError = Object.assign(new Error('invalid_thread_ts'), { + data: { error: 'invalid_thread_ts' }, + }); + const setAssistantStatus = jest + .fn() + .mockRejectedValueOnce(invalidThreadError) + .mockResolvedValue(undefined); + bot.getAdapter.mockReturnValue({ setAssistantStatus }); + const thread = makeThread(); + const agentExecutor = { + // Respond (which clears the status) while the initial "Thinking..." + // set is still waiting out its retry delay, then keep the stream open + // past that delay so the retry would otherwise fire after the clear. + executeForChatPublished: jest.fn(async function* (): AsyncGenerator { + yield { type: 'text-delta', id: 't1', delta: 'Hello' }; + yield { type: 'message', message: { role: 'assistant', content: [] } }; + await new Promise((resolve) => setTimeout(resolve, 2000)); + yield { type: 'finish', finishReason: 'stop' }; + }), + resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])), + }; + + new AgentChatBridge( + bot as unknown as ChatBotLike, + 'agent-1', + agentExecutor as never, + componentMapper, + logger, + 'project-1', + slackIntegration, + ); + + const run = handlers.mention!(thread, { + text: 'hi', + raw: { + type: 'message', + channel: 'D123', + channel_type: 'im', + ts: '1779466577.518139', + }, + author: { userId: 'u1', userName: 'user1' }, + }); + + // Let the response flush and clear the status, then run past the retry + // delay and finish the stream. + await jest.advanceTimersByTimeAsync(2000); + await run; + + const thinkingCalls = setAssistantStatus.mock.calls.filter((c) => c[2] === 'Thinking...'); + const clearCalls = setAssistantStatus.mock.calls.filter((c) => c[2] === ''); + // The cleared retry must not re-set "Thinking..." — only the initial set. + expect(thinkingCalls).toHaveLength(1); + expect(clearCalls).toHaveLength(1); + // The last status written must be the clear, never a stale "Thinking...". + expect(setAssistantStatus.mock.calls.at(-1)?.[2]).toBe(''); + }); + + it('waits for an in-flight Slack DM status set to settle before clearing', async () => { + const { bot, handlers } = makeBot(); + // Keep the initial "Thinking..." set in flight; the empty-status clear + // resolves immediately. Aborting can't recall an in-flight remote write, + // so the clear must wait for the set to land before overwriting it. + let resolveSet!: () => void; + const setInFlight = new Promise((resolve) => { + resolveSet = resolve; + }); + const setAssistantStatus = jest.fn(async (_channel: string, _ts: string, status: string) => { + if (status === 'Thinking...') await setInFlight; + }); + bot.getAdapter.mockReturnValue({ setAssistantStatus }); + const thread = makeThread(); + const agentExecutor = { + executeForChatPublished: jest.fn(() => + toStream([ + { type: 'text-delta', id: 't1', delta: 'Hello' }, + { type: 'finish', finishReason: 'stop' }, + ]), + ), + resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])), + }; + + new AgentChatBridge( + bot as unknown as ChatBotLike, + 'agent-1', + agentExecutor as never, + componentMapper, + logger, + 'project-1', + slackIntegration, + ); + + const run = handlers.mention!(thread, { + text: 'hi', + raw: { + type: 'message', + channel: 'D123', + channel_type: 'im', + ts: '1779466577.518139', + }, + author: { userId: 'u1', userName: 'user1' }, + }); + + // Drain everything that can proceed. The clear is blocked on the + // in-flight set, so the empty-status write must not have happened yet. + for (let i = 0; i < 10; i++) await new Promise((resolve) => setImmediate(resolve)); + + expect(setAssistantStatus).toHaveBeenCalledTimes(1); + expect(setAssistantStatus).toHaveBeenLastCalledWith( + 'D123', + '1779466577.518139', + 'Thinking...', + ['Thinking...'], + ); + + // Let the in-flight set land; only now may the clear overwrite it. + resolveSet(); + await run; + + expect(setAssistantStatus).toHaveBeenCalledTimes(2); + expect(setAssistantStatus).toHaveBeenLastCalledWith('D123', '1779466577.518139', ''); + }); + it('sets a thinking status before resuming a Slack action', async () => { const { bot, handlers } = makeBot(); const thread = makeThread(); diff --git a/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts b/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts index 074455670be..7a94f9749ea 100644 --- a/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts +++ b/packages/cli/src/modules/agents/integrations/agent-chat-bridge.ts @@ -27,6 +27,7 @@ interface SlackThreadContext { channelId: string; threadTs: string; hasRealThreadTs: boolean; + isDm: boolean; } interface SlackAssistantStatusAdapter { @@ -38,6 +39,10 @@ interface SlackAssistantStatusAdapter { ): Promise; } +interface SlackAssistantStatusHandle { + clearBeforeResponse(): Promise; +} + interface AgentExecutor { executeForChatPublished(config: { agentId: string; @@ -303,8 +308,8 @@ export class AgentChatBridge { // startThinkingStatus (Slack assistant.threads.setStatus) and the lazy // `message.subject` fetch are both remote round-trips on independent // resources — run them concurrently. - const [, subject] = await Promise.all([ - this.startThinkingStatus(thread, slackThreadContext, statusRetry.signal), + const [statusHandle, subject] = await Promise.all([ + this.startThinkingStatus(thread, slackThreadContext, statusRetry), this.resolveMessageSubject(message), ]); await this.updateLatestMessageContext(threadId.id, message.author.userId, thread, { @@ -331,6 +336,7 @@ export class AgentChatBridge { try { await this.consumeStream(stream, thread, { forceBuffered: this.integration.type === 'slack' && !useNativeSlackThreadFeatures, + statusHandle, }); } finally { statusRetry.abort(); @@ -356,10 +362,12 @@ export class AgentChatBridge { private async consumeStream( stream: AsyncGenerator, thread: Thread, - options: { forceBuffered?: boolean } = {}, + options: { forceBuffered?: boolean; statusHandle?: SlackAssistantStatusHandle } = {}, ): Promise { if (this.disableStreaming || options.forceBuffered) { - await this.consumeStreamBuffered(stream, thread); + await this.consumeStreamBuffered(stream, thread, { + statusHandle: options.statusHandle, + }); return; } @@ -442,19 +450,24 @@ export class AgentChatBridge { const ensureStreamingPost = () => { if (!streamingPost) startStreamingPost(); }; + const responseLifecycle = this.createResponseLifecycle({ + statusHandle: options.statusHandle, + ensureStreamingPost, + endStreamingPost, + }); try { for await (const chunk of stream) { switch (chunk.type) { case 'text-delta': { const { delta } = chunk; - ensureStreamingPost(); + await responseLifecycle.startStreamingResponse(); textStream.yield?.(delta); break; } case 'reasoning-delta': { const { delta } = chunk; - ensureStreamingPost(); + await responseLifecycle.startStreamingResponse(); textStream.yield?.(`_${delta}_`); break; } @@ -463,24 +476,24 @@ export class AgentChatBridge { break; case 'tool-call-suspended': this.richInteractionInputs.delete(chunk.toolCallId); - await endStreamingPost(); + await responseLifecycle.startDiscreteResponse(); await this.handleSuspension(chunk, thread); // Don't start new streaming post — wait for next text delta break; case 'tool-result': if (this.isRichInteractionDisplayOnly(chunk)) { - await endStreamingPost(); + await responseLifecycle.startDiscreteResponse(); await this.handleDisplayOnly(chunk, thread); } else { this.richInteractionInputs.delete(chunk.toolCallId); } break; case 'message': - await endStreamingPost(); + await responseLifecycle.startDiscreteResponse(); await this.handleMessage(chunk, thread); break; case 'error': - await endStreamingPost(); + await responseLifecycle.startDiscreteResponse(); await this.postErrorToThread(thread, chunk.error); break; default: @@ -493,11 +506,40 @@ export class AgentChatBridge { // Always end the streaming post and drop stashed tool-call inputs so // a stream that errors mid-flight between `tool-call` and the // matching `tool-result` does not leak entries. - await endStreamingPost(); + await responseLifecycle.finish(); this.richInteractionInputs.clear(); } } + private createResponseLifecycle(options: { + statusHandle?: SlackAssistantStatusHandle; + ensureStreamingPost?: () => void; + endStreamingPost?: () => Promise; + }) { + let responseStarted = false; + + const clearStatusBeforeFirstResponse = async () => { + if (responseStarted) return; + responseStarted = true; + await options.statusHandle?.clearBeforeResponse(); + }; + + return { + startStreamingResponse: async () => { + await clearStatusBeforeFirstResponse(); + options.ensureStreamingPost?.(); + }, + startDiscreteResponse: async () => { + await options.endStreamingPost?.(); + await clearStatusBeforeFirstResponse(); + }, + finish: async () => { + await options.endStreamingPost?.(); + await clearStatusBeforeFirstResponse(); + }, + }; + } + /** * Buffered consumer — accumulates text/reasoning deltas and posts them as a * single message per flush. Used when the integration disables streaming @@ -506,14 +548,19 @@ export class AgentChatBridge { private async consumeStreamBuffered( stream: AsyncGenerator, thread: Thread, + options: { statusHandle?: SlackAssistantStatusHandle } = {}, ): Promise { let buffer = ''; + const responseLifecycle = this.createResponseLifecycle({ + statusHandle: options.statusHandle, + }); const flushBuffer = async () => { const text = buffer; buffer = ''; if (!text.trim()) return; try { + await responseLifecycle.startDiscreteResponse(); // Chat SDK's streaming path wraps accumulated deltas as `{ markdown }` // so the platform adapter applies its markdown parse-mode (Telegram: // sendMessage with parse_mode=Markdown). A raw string bypasses that @@ -543,11 +590,13 @@ export class AgentChatBridge { case 'tool-call-suspended': this.richInteractionInputs.delete(chunk.toolCallId); await flushBuffer(); + await responseLifecycle.startDiscreteResponse(); await this.handleSuspension(chunk, thread); break; case 'tool-result': if (this.isRichInteractionDisplayOnly(chunk)) { await flushBuffer(); + await responseLifecycle.startDiscreteResponse(); await this.handleDisplayOnly(chunk, thread); } else { this.richInteractionInputs.delete(chunk.toolCallId); @@ -555,10 +604,12 @@ export class AgentChatBridge { break; case 'message': await flushBuffer(); + await responseLifecycle.startDiscreteResponse(); await this.handleMessage(chunk, thread); break; case 'error': await flushBuffer(); + await responseLifecycle.startDiscreteResponse(); await this.postErrorToThread(thread, chunk.error); break; default: @@ -567,6 +618,7 @@ export class AgentChatBridge { } } finally { await flushBuffer(); + await responseLifecycle.finish(); this.richInteractionInputs.clear(); } } @@ -912,13 +964,29 @@ export class AgentChatBridge { private async startThinkingStatus( thread: Thread, slackThreadContext?: SlackThreadContext, - statusRetrySignal?: AbortSignal, - ): Promise { + statusRetry?: AbortController, + ): Promise { if (this.integration.type !== 'slack') return; if (slackThreadContext && !slackThreadContext.hasRealThreadTs) { - this.setSlackAssistantStatus(slackThreadContext, statusRetrySignal); - return; + const setStatus = this.setSlackAssistantStatus(slackThreadContext, statusRetry?.signal); + return slackThreadContext.isDm + ? { + clearBeforeResponse: async () => { + // Cancel any pending status retry first: the retry waits out a + // delay before re-setting "Thinking...", and without this it could + // fire *after* we clear and leave a stale status behind. + statusRetry?.abort(); + // Then wait for the set to settle. Aborting only cancels the + // retry's local wait — an *in-flight* "Thinking..." write can't + // be recalled, so we must let it land before we clear, otherwise + // its remote write could overwrite the clear and restore the + // stale status. (setStatus never rejects — it logs internally.) + await setStatus; + await this.clearSlackAssistantStatus(slackThreadContext); + }, + } + : undefined; } try { @@ -930,16 +998,39 @@ export class AgentChatBridge { error: error instanceof Error ? error.message : String(error), }); } + return undefined; } - private setSlackAssistantStatus( + /** + * Kick off the "Thinking..." status set (with retry). Returns the in-flight + * promise so callers can await it before clearing — see `clearBeforeResponse` + * in `startThinkingStatus`. The returned promise never rejects; failures are + * logged inside the retry helper. + */ + private async setSlackAssistantStatus( context: SlackThreadContext, statusRetrySignal?: AbortSignal, - ): void { + ): Promise { const adapter = this.getSlackAssistantStatusAdapter(); if (!adapter) return; - void this.setSlackAssistantStatusWithRetry(adapter, context, statusRetrySignal); + await this.setSlackAssistantStatusWithRetry(adapter, context, statusRetrySignal); + } + + private async clearSlackAssistantStatus(context: SlackThreadContext): Promise { + const adapter = this.getSlackAssistantStatusAdapter(); + if (!adapter) return; + + try { + await adapter.setAssistantStatus(context.channelId, context.threadTs, ''); + } catch (error) { + this.logger.warn('[AgentChatBridge] Failed to clear Slack assistant status', { + agentId: this.agentId, + channelId: context.channelId, + threadTs: context.threadTs, + error: error instanceof Error ? error.message : String(error), + }); + } } private async setSlackAssistantStatusWithRetry( @@ -965,6 +1056,9 @@ export class AgentChatBridge { } if (!(await sleep(SLACK_STATUS_RETRY_DELAY_MS, statusRetrySignal))) return; + // The status may have been cleared while we were sleeping. Bail out so the + // retry doesn't re-set "Thinking..." over an already-cleared status. + if (statusRetrySignal?.aborted) return; try { await adapter.setAssistantStatus(context.channelId, context.threadTs, SLACK_THINKING_STATUS, [ @@ -997,6 +1091,7 @@ export class AgentChatBridge { if (!isRecord(raw)) return undefined; const channelId = stringValue(raw.channel); + const channelType = stringValue(raw.channel_type); const realThreadTs = stringValue(raw.thread_ts); const threadTs = realThreadTs ?? stringValue(raw.ts); if (!channelId || !threadTs) return undefined; @@ -1005,6 +1100,7 @@ export class AgentChatBridge { channelId, threadTs, hasRealThreadTs: realThreadTs !== undefined, + isDm: channelType === 'im', }; }