mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-04 10:39:23 +02:00
fix(core): Agents correctly clear thinking status once they respond in Slack DMs (no-changelog) (#31591)
This commit is contained in:
parent
3c46e3155c
commit
64d4a7457b
|
|
@ -441,6 +441,56 @@ describe('AgentChatBridge — consumeStream', () => {
|
|||
expect(thread.post).toHaveBeenCalledWith({ markdown: 'Hello' });
|
||||
});
|
||||
|
||||
it('clears assistant status before responding directly to top-level Slack DMs', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
const setAssistantStatus = jest.fn().mockResolvedValue(undefined);
|
||||
bot.getAdapter.mockReturnValue({ setAssistantStatus });
|
||||
const thread = makeThread();
|
||||
const agentExecutor = {
|
||||
executeForChatPublished: jest.fn(() =>
|
||||
toStream([
|
||||
{ type: 'text-delta', id: 't1', delta: 'Hello' },
|
||||
{ type: 'finish', finishReason: 'stop' },
|
||||
]),
|
||||
),
|
||||
resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])),
|
||||
};
|
||||
|
||||
new AgentChatBridge(
|
||||
bot as unknown as ChatBotLike,
|
||||
'agent-1',
|
||||
agentExecutor as never,
|
||||
componentMapper,
|
||||
logger,
|
||||
'project-1',
|
||||
slackIntegration,
|
||||
);
|
||||
|
||||
await handlers.mention!(thread, {
|
||||
text: 'hi',
|
||||
raw: {
|
||||
type: 'message',
|
||||
channel: 'D123',
|
||||
channel_type: 'im',
|
||||
ts: '1779466577.518139',
|
||||
},
|
||||
author: { userId: 'u1', userName: 'user1' },
|
||||
});
|
||||
|
||||
expect(setAssistantStatus).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
'D123',
|
||||
'1779466577.518139',
|
||||
'Thinking...',
|
||||
['Thinking...'],
|
||||
);
|
||||
expect(setAssistantStatus).toHaveBeenNthCalledWith(2, 'D123', '1779466577.518139', '');
|
||||
expect(setAssistantStatus.mock.invocationCallOrder[1]).toBeLessThan(
|
||||
thread.post.mock.invocationCallOrder[0],
|
||||
);
|
||||
expect(thread.post).toHaveBeenCalledWith({ markdown: 'Hello' });
|
||||
});
|
||||
|
||||
it('retries top-level Slack assistant status when Slack has not materialized the thread yet', async () => {
|
||||
jest.useFakeTimers();
|
||||
const { bot, handlers } = makeBot();
|
||||
|
|
@ -493,6 +543,131 @@ describe('AgentChatBridge — consumeStream', () => {
|
|||
await run;
|
||||
});
|
||||
|
||||
it('does not re-set Slack DM status with a stale retry after it has been cleared', async () => {
|
||||
jest.useFakeTimers();
|
||||
const { bot, handlers } = makeBot();
|
||||
const invalidThreadError = Object.assign(new Error('invalid_thread_ts'), {
|
||||
data: { error: 'invalid_thread_ts' },
|
||||
});
|
||||
const setAssistantStatus = jest
|
||||
.fn()
|
||||
.mockRejectedValueOnce(invalidThreadError)
|
||||
.mockResolvedValue(undefined);
|
||||
bot.getAdapter.mockReturnValue({ setAssistantStatus });
|
||||
const thread = makeThread();
|
||||
const agentExecutor = {
|
||||
// Respond (which clears the status) while the initial "Thinking..."
|
||||
// set is still waiting out its retry delay, then keep the stream open
|
||||
// past that delay so the retry would otherwise fire after the clear.
|
||||
executeForChatPublished: jest.fn(async function* (): AsyncGenerator<StreamChunk> {
|
||||
yield { type: 'text-delta', id: 't1', delta: 'Hello' };
|
||||
yield { type: 'message', message: { role: 'assistant', content: [] } };
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
yield { type: 'finish', finishReason: 'stop' };
|
||||
}),
|
||||
resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])),
|
||||
};
|
||||
|
||||
new AgentChatBridge(
|
||||
bot as unknown as ChatBotLike,
|
||||
'agent-1',
|
||||
agentExecutor as never,
|
||||
componentMapper,
|
||||
logger,
|
||||
'project-1',
|
||||
slackIntegration,
|
||||
);
|
||||
|
||||
const run = handlers.mention!(thread, {
|
||||
text: 'hi',
|
||||
raw: {
|
||||
type: 'message',
|
||||
channel: 'D123',
|
||||
channel_type: 'im',
|
||||
ts: '1779466577.518139',
|
||||
},
|
||||
author: { userId: 'u1', userName: 'user1' },
|
||||
});
|
||||
|
||||
// Let the response flush and clear the status, then run past the retry
|
||||
// delay and finish the stream.
|
||||
await jest.advanceTimersByTimeAsync(2000);
|
||||
await run;
|
||||
|
||||
const thinkingCalls = setAssistantStatus.mock.calls.filter((c) => c[2] === 'Thinking...');
|
||||
const clearCalls = setAssistantStatus.mock.calls.filter((c) => c[2] === '');
|
||||
// The cleared retry must not re-set "Thinking..." — only the initial set.
|
||||
expect(thinkingCalls).toHaveLength(1);
|
||||
expect(clearCalls).toHaveLength(1);
|
||||
// The last status written must be the clear, never a stale "Thinking...".
|
||||
expect(setAssistantStatus.mock.calls.at(-1)?.[2]).toBe('');
|
||||
});
|
||||
|
||||
it('waits for an in-flight Slack DM status set to settle before clearing', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
// Keep the initial "Thinking..." set in flight; the empty-status clear
|
||||
// resolves immediately. Aborting can't recall an in-flight remote write,
|
||||
// so the clear must wait for the set to land before overwriting it.
|
||||
let resolveSet!: () => void;
|
||||
const setInFlight = new Promise<void>((resolve) => {
|
||||
resolveSet = resolve;
|
||||
});
|
||||
const setAssistantStatus = jest.fn(async (_channel: string, _ts: string, status: string) => {
|
||||
if (status === 'Thinking...') await setInFlight;
|
||||
});
|
||||
bot.getAdapter.mockReturnValue({ setAssistantStatus });
|
||||
const thread = makeThread();
|
||||
const agentExecutor = {
|
||||
executeForChatPublished: jest.fn(() =>
|
||||
toStream([
|
||||
{ type: 'text-delta', id: 't1', delta: 'Hello' },
|
||||
{ type: 'finish', finishReason: 'stop' },
|
||||
]),
|
||||
),
|
||||
resumeForChat: jest.fn(() => toStream([{ type: 'finish', finishReason: 'stop' }])),
|
||||
};
|
||||
|
||||
new AgentChatBridge(
|
||||
bot as unknown as ChatBotLike,
|
||||
'agent-1',
|
||||
agentExecutor as never,
|
||||
componentMapper,
|
||||
logger,
|
||||
'project-1',
|
||||
slackIntegration,
|
||||
);
|
||||
|
||||
const run = handlers.mention!(thread, {
|
||||
text: 'hi',
|
||||
raw: {
|
||||
type: 'message',
|
||||
channel: 'D123',
|
||||
channel_type: 'im',
|
||||
ts: '1779466577.518139',
|
||||
},
|
||||
author: { userId: 'u1', userName: 'user1' },
|
||||
});
|
||||
|
||||
// Drain everything that can proceed. The clear is blocked on the
|
||||
// in-flight set, so the empty-status write must not have happened yet.
|
||||
for (let i = 0; i < 10; i++) await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
expect(setAssistantStatus).toHaveBeenCalledTimes(1);
|
||||
expect(setAssistantStatus).toHaveBeenLastCalledWith(
|
||||
'D123',
|
||||
'1779466577.518139',
|
||||
'Thinking...',
|
||||
['Thinking...'],
|
||||
);
|
||||
|
||||
// Let the in-flight set land; only now may the clear overwrite it.
|
||||
resolveSet();
|
||||
await run;
|
||||
|
||||
expect(setAssistantStatus).toHaveBeenCalledTimes(2);
|
||||
expect(setAssistantStatus).toHaveBeenLastCalledWith('D123', '1779466577.518139', '');
|
||||
});
|
||||
|
||||
it('sets a thinking status before resuming a Slack action', async () => {
|
||||
const { bot, handlers } = makeBot();
|
||||
const thread = makeThread();
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ interface SlackThreadContext {
|
|||
channelId: string;
|
||||
threadTs: string;
|
||||
hasRealThreadTs: boolean;
|
||||
isDm: boolean;
|
||||
}
|
||||
|
||||
interface SlackAssistantStatusAdapter {
|
||||
|
|
@ -38,6 +39,10 @@ interface SlackAssistantStatusAdapter {
|
|||
): Promise<void>;
|
||||
}
|
||||
|
||||
interface SlackAssistantStatusHandle {
|
||||
clearBeforeResponse(): Promise<void>;
|
||||
}
|
||||
|
||||
interface AgentExecutor {
|
||||
executeForChatPublished(config: {
|
||||
agentId: string;
|
||||
|
|
@ -303,8 +308,8 @@ export class AgentChatBridge {
|
|||
// startThinkingStatus (Slack assistant.threads.setStatus) and the lazy
|
||||
// `message.subject` fetch are both remote round-trips on independent
|
||||
// resources — run them concurrently.
|
||||
const [, subject] = await Promise.all([
|
||||
this.startThinkingStatus(thread, slackThreadContext, statusRetry.signal),
|
||||
const [statusHandle, subject] = await Promise.all([
|
||||
this.startThinkingStatus(thread, slackThreadContext, statusRetry),
|
||||
this.resolveMessageSubject(message),
|
||||
]);
|
||||
await this.updateLatestMessageContext(threadId.id, message.author.userId, thread, {
|
||||
|
|
@ -331,6 +336,7 @@ export class AgentChatBridge {
|
|||
try {
|
||||
await this.consumeStream(stream, thread, {
|
||||
forceBuffered: this.integration.type === 'slack' && !useNativeSlackThreadFeatures,
|
||||
statusHandle,
|
||||
});
|
||||
} finally {
|
||||
statusRetry.abort();
|
||||
|
|
@ -356,10 +362,12 @@ export class AgentChatBridge {
|
|||
private async consumeStream(
|
||||
stream: AsyncGenerator<StreamChunk>,
|
||||
thread: Thread,
|
||||
options: { forceBuffered?: boolean } = {},
|
||||
options: { forceBuffered?: boolean; statusHandle?: SlackAssistantStatusHandle } = {},
|
||||
): Promise<void> {
|
||||
if (this.disableStreaming || options.forceBuffered) {
|
||||
await this.consumeStreamBuffered(stream, thread);
|
||||
await this.consumeStreamBuffered(stream, thread, {
|
||||
statusHandle: options.statusHandle,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -442,19 +450,24 @@ export class AgentChatBridge {
|
|||
const ensureStreamingPost = () => {
|
||||
if (!streamingPost) startStreamingPost();
|
||||
};
|
||||
const responseLifecycle = this.createResponseLifecycle({
|
||||
statusHandle: options.statusHandle,
|
||||
ensureStreamingPost,
|
||||
endStreamingPost,
|
||||
});
|
||||
|
||||
try {
|
||||
for await (const chunk of stream) {
|
||||
switch (chunk.type) {
|
||||
case 'text-delta': {
|
||||
const { delta } = chunk;
|
||||
ensureStreamingPost();
|
||||
await responseLifecycle.startStreamingResponse();
|
||||
textStream.yield?.(delta);
|
||||
break;
|
||||
}
|
||||
case 'reasoning-delta': {
|
||||
const { delta } = chunk;
|
||||
ensureStreamingPost();
|
||||
await responseLifecycle.startStreamingResponse();
|
||||
textStream.yield?.(`_${delta}_`);
|
||||
break;
|
||||
}
|
||||
|
|
@ -463,24 +476,24 @@ export class AgentChatBridge {
|
|||
break;
|
||||
case 'tool-call-suspended':
|
||||
this.richInteractionInputs.delete(chunk.toolCallId);
|
||||
await endStreamingPost();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleSuspension(chunk, thread);
|
||||
// Don't start new streaming post — wait for next text delta
|
||||
break;
|
||||
case 'tool-result':
|
||||
if (this.isRichInteractionDisplayOnly(chunk)) {
|
||||
await endStreamingPost();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleDisplayOnly(chunk, thread);
|
||||
} else {
|
||||
this.richInteractionInputs.delete(chunk.toolCallId);
|
||||
}
|
||||
break;
|
||||
case 'message':
|
||||
await endStreamingPost();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleMessage(chunk, thread);
|
||||
break;
|
||||
case 'error':
|
||||
await endStreamingPost();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.postErrorToThread(thread, chunk.error);
|
||||
break;
|
||||
default:
|
||||
|
|
@ -493,11 +506,40 @@ export class AgentChatBridge {
|
|||
// Always end the streaming post and drop stashed tool-call inputs so
|
||||
// a stream that errors mid-flight between `tool-call` and the
|
||||
// matching `tool-result` does not leak entries.
|
||||
await endStreamingPost();
|
||||
await responseLifecycle.finish();
|
||||
this.richInteractionInputs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private createResponseLifecycle(options: {
|
||||
statusHandle?: SlackAssistantStatusHandle;
|
||||
ensureStreamingPost?: () => void;
|
||||
endStreamingPost?: () => Promise<void>;
|
||||
}) {
|
||||
let responseStarted = false;
|
||||
|
||||
const clearStatusBeforeFirstResponse = async () => {
|
||||
if (responseStarted) return;
|
||||
responseStarted = true;
|
||||
await options.statusHandle?.clearBeforeResponse();
|
||||
};
|
||||
|
||||
return {
|
||||
startStreamingResponse: async () => {
|
||||
await clearStatusBeforeFirstResponse();
|
||||
options.ensureStreamingPost?.();
|
||||
},
|
||||
startDiscreteResponse: async () => {
|
||||
await options.endStreamingPost?.();
|
||||
await clearStatusBeforeFirstResponse();
|
||||
},
|
||||
finish: async () => {
|
||||
await options.endStreamingPost?.();
|
||||
await clearStatusBeforeFirstResponse();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffered consumer — accumulates text/reasoning deltas and posts them as a
|
||||
* single message per flush. Used when the integration disables streaming
|
||||
|
|
@ -506,14 +548,19 @@ export class AgentChatBridge {
|
|||
private async consumeStreamBuffered(
|
||||
stream: AsyncGenerator<StreamChunk>,
|
||||
thread: Thread,
|
||||
options: { statusHandle?: SlackAssistantStatusHandle } = {},
|
||||
): Promise<void> {
|
||||
let buffer = '';
|
||||
const responseLifecycle = this.createResponseLifecycle({
|
||||
statusHandle: options.statusHandle,
|
||||
});
|
||||
|
||||
const flushBuffer = async () => {
|
||||
const text = buffer;
|
||||
buffer = '';
|
||||
if (!text.trim()) return;
|
||||
try {
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
// Chat SDK's streaming path wraps accumulated deltas as `{ markdown }`
|
||||
// so the platform adapter applies its markdown parse-mode (Telegram:
|
||||
// sendMessage with parse_mode=Markdown). A raw string bypasses that
|
||||
|
|
@ -543,11 +590,13 @@ export class AgentChatBridge {
|
|||
case 'tool-call-suspended':
|
||||
this.richInteractionInputs.delete(chunk.toolCallId);
|
||||
await flushBuffer();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleSuspension(chunk, thread);
|
||||
break;
|
||||
case 'tool-result':
|
||||
if (this.isRichInteractionDisplayOnly(chunk)) {
|
||||
await flushBuffer();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleDisplayOnly(chunk, thread);
|
||||
} else {
|
||||
this.richInteractionInputs.delete(chunk.toolCallId);
|
||||
|
|
@ -555,10 +604,12 @@ export class AgentChatBridge {
|
|||
break;
|
||||
case 'message':
|
||||
await flushBuffer();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.handleMessage(chunk, thread);
|
||||
break;
|
||||
case 'error':
|
||||
await flushBuffer();
|
||||
await responseLifecycle.startDiscreteResponse();
|
||||
await this.postErrorToThread(thread, chunk.error);
|
||||
break;
|
||||
default:
|
||||
|
|
@ -567,6 +618,7 @@ export class AgentChatBridge {
|
|||
}
|
||||
} finally {
|
||||
await flushBuffer();
|
||||
await responseLifecycle.finish();
|
||||
this.richInteractionInputs.clear();
|
||||
}
|
||||
}
|
||||
|
|
@ -912,13 +964,29 @@ export class AgentChatBridge {
|
|||
private async startThinkingStatus(
|
||||
thread: Thread<unknown, unknown>,
|
||||
slackThreadContext?: SlackThreadContext,
|
||||
statusRetrySignal?: AbortSignal,
|
||||
): Promise<void> {
|
||||
statusRetry?: AbortController,
|
||||
): Promise<SlackAssistantStatusHandle | undefined> {
|
||||
if (this.integration.type !== 'slack') return;
|
||||
|
||||
if (slackThreadContext && !slackThreadContext.hasRealThreadTs) {
|
||||
this.setSlackAssistantStatus(slackThreadContext, statusRetrySignal);
|
||||
return;
|
||||
const setStatus = this.setSlackAssistantStatus(slackThreadContext, statusRetry?.signal);
|
||||
return slackThreadContext.isDm
|
||||
? {
|
||||
clearBeforeResponse: async () => {
|
||||
// Cancel any pending status retry first: the retry waits out a
|
||||
// delay before re-setting "Thinking...", and without this it could
|
||||
// fire *after* we clear and leave a stale status behind.
|
||||
statusRetry?.abort();
|
||||
// Then wait for the set to settle. Aborting only cancels the
|
||||
// retry's local wait — an *in-flight* "Thinking..." write can't
|
||||
// be recalled, so we must let it land before we clear, otherwise
|
||||
// its remote write could overwrite the clear and restore the
|
||||
// stale status. (setStatus never rejects — it logs internally.)
|
||||
await setStatus;
|
||||
await this.clearSlackAssistantStatus(slackThreadContext);
|
||||
},
|
||||
}
|
||||
: undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
@ -930,16 +998,39 @@ export class AgentChatBridge {
|
|||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private setSlackAssistantStatus(
|
||||
/**
|
||||
* Kick off the "Thinking..." status set (with retry). Returns the in-flight
|
||||
* promise so callers can await it before clearing — see `clearBeforeResponse`
|
||||
* in `startThinkingStatus`. The returned promise never rejects; failures are
|
||||
* logged inside the retry helper.
|
||||
*/
|
||||
private async setSlackAssistantStatus(
|
||||
context: SlackThreadContext,
|
||||
statusRetrySignal?: AbortSignal,
|
||||
): void {
|
||||
): Promise<void> {
|
||||
const adapter = this.getSlackAssistantStatusAdapter();
|
||||
if (!adapter) return;
|
||||
|
||||
void this.setSlackAssistantStatusWithRetry(adapter, context, statusRetrySignal);
|
||||
await this.setSlackAssistantStatusWithRetry(adapter, context, statusRetrySignal);
|
||||
}
|
||||
|
||||
private async clearSlackAssistantStatus(context: SlackThreadContext): Promise<void> {
|
||||
const adapter = this.getSlackAssistantStatusAdapter();
|
||||
if (!adapter) return;
|
||||
|
||||
try {
|
||||
await adapter.setAssistantStatus(context.channelId, context.threadTs, '');
|
||||
} catch (error) {
|
||||
this.logger.warn('[AgentChatBridge] Failed to clear Slack assistant status', {
|
||||
agentId: this.agentId,
|
||||
channelId: context.channelId,
|
||||
threadTs: context.threadTs,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async setSlackAssistantStatusWithRetry(
|
||||
|
|
@ -965,6 +1056,9 @@ export class AgentChatBridge {
|
|||
}
|
||||
|
||||
if (!(await sleep(SLACK_STATUS_RETRY_DELAY_MS, statusRetrySignal))) return;
|
||||
// The status may have been cleared while we were sleeping. Bail out so the
|
||||
// retry doesn't re-set "Thinking..." over an already-cleared status.
|
||||
if (statusRetrySignal?.aborted) return;
|
||||
|
||||
try {
|
||||
await adapter.setAssistantStatus(context.channelId, context.threadTs, SLACK_THINKING_STATUS, [
|
||||
|
|
@ -997,6 +1091,7 @@ export class AgentChatBridge {
|
|||
if (!isRecord(raw)) return undefined;
|
||||
|
||||
const channelId = stringValue(raw.channel);
|
||||
const channelType = stringValue(raw.channel_type);
|
||||
const realThreadTs = stringValue(raw.thread_ts);
|
||||
const threadTs = realThreadTs ?? stringValue(raw.ts);
|
||||
if (!channelId || !threadTs) return undefined;
|
||||
|
|
@ -1005,6 +1100,7 @@ export class AgentChatBridge {
|
|||
channelId,
|
||||
threadTs,
|
||||
hasRealThreadTs: realThreadTs !== undefined,
|
||||
isDm: channelType === 'im',
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user