From e413a7740d7a170df271473dd997d90b02f6a49c Mon Sep 17 00:00:00 2001 From: Jaakko Husso Date: Mon, 1 Jun 2026 12:55:19 +0300 Subject: [PATCH] feat(core): Persist pending confirmations on database (#31052) Co-authored-by: Claude Opus 4.7 --- .../src/schemas/instance-ai.schema.ts | 1 + packages/@n8n/instance-ai/src/index.ts | 3 + .../resumable-stream-executor.test.ts | 2 + .../runtime/__tests__/stream-runner.test.ts | 1 + .../src/stream/consume-with-hitl.ts | 83 ++++ .../__tests__/blueprint-accumulator.test.ts | 72 +++ .../__tests__/plan-with-agent.tool.test.ts | 90 +++- .../orchestration/plan-with-agent.tool.ts | 437 ++++++++++++------ packages/@n8n/instance-ai/src/types.ts | 28 ++ .../utils/__tests__/stream-helpers.test.ts | 4 + .../instance-ai/src/utils/stream-helpers.ts | 4 +- .../instance-ai-memory.service.test.ts | 10 +- .../__tests__/message-parser.test.ts | 381 ++++++++++++++- .../instance-ai/instance-ai-memory.service.ts | 29 +- .../instance-ai/instance-ai.service.ts | 8 + .../src/modules/instance-ai/message-parser.ts | 230 ++++++--- ...ai-pending-confirmation.repository.test.ts | 25 +- .../instance-ai-checkpoint.repository.ts | 17 + ...ance-ai-pending-confirmation.repository.ts | 29 +- .../typeorm-agent-checkpoint-store.test.ts | 51 ++ .../storage/typeorm-agent-checkpoint-store.ts | 35 ++ .../frontend/@n8n/i18n/src/locales/en.json | 2 + .../instanceAi/components/AgentTimeline.vue | 40 +- .../instanceAi/components/PlanReviewPanel.vue | 32 +- .../ai/instanceAi/instanceAi.threadRuntime.ts | 5 + 25 files changed, 1391 insertions(+), 228 deletions(-) diff --git a/packages/@n8n/api-types/src/schemas/instance-ai.schema.ts b/packages/@n8n/api-types/src/schemas/instance-ai.schema.ts index 747873950ab..beb90a29b43 100644 --- a/packages/@n8n/api-types/src/schemas/instance-ai.schema.ts +++ b/packages/@n8n/api-types/src/schemas/instance-ai.schema.ts @@ -727,6 +727,7 @@ export interface InstanceAiConfirmation { introMessage?: string; tasks?: TaskList; resourceDecision?: GatewayConfirmationRequiredPayload; + expired?: boolean; } export interface InstanceAiToolCallState { diff --git a/packages/@n8n/instance-ai/src/index.ts b/packages/@n8n/instance-ai/src/index.ts index c8d46cfbf3e..2d693a391ed 100644 --- a/packages/@n8n/instance-ai/src/index.ts +++ b/packages/@n8n/instance-ai/src/index.ts @@ -100,6 +100,8 @@ export type { AgentMessage, BuiltMemory, CheckpointStore, + ContentToolCall, + MessageContent, SerializableAgentState, Thread, } from '@n8n/agents'; @@ -190,6 +192,7 @@ export const createSubAgent: typeof SubAgentFactoryMod.createSubAgent = lazyFunc ); export { createAllTools, createOrchestrationTools } from './tools'; export { + createSubAgentResourceId, createSubAgentResourceIdPrefix, SUB_AGENT_RESOURCE_PREFIX, } from './tools/orchestration/agent-persistence'; diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts index b2ababe9406..8e047826b4b 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/resumable-stream-executor.test.ts @@ -139,6 +139,7 @@ describe('executeResumableStream', () => { toolCallId: 'tool-call-1', requestId: 'request-1', toolName: 'ask-user', + suspendPayload: { requestId: 'request-1', message: 'Need approval' }, }, }), ); @@ -479,6 +480,7 @@ describe('executeResumableStream', () => { requestId: 'request-1', toolCallId: 'tool-call-1', toolName: 'pause-for-user', + suspendPayload: { requestId: 'request-1', message: 'First confirmation' }, }); expect(waitForConfirmation).toHaveBeenCalledTimes(1); expect(waitForConfirmation).toHaveBeenCalledWith('request-1'); diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts index 5598633b3f3..c62ce04631b 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/stream-runner.test.ts @@ -136,6 +136,7 @@ describe('streamAgentRun', () => { requestId: 'request-1', toolCallId: 'tool-call-1', toolName: 'pause-for-user', + suspendPayload: {}, }, confirmationEvent: { type: 'confirmation-request', diff --git a/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts b/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts index ad6c6a788bd..0c066f99939 100644 --- a/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts +++ b/packages/@n8n/instance-ai/src/stream/consume-with-hitl.ts @@ -1,3 +1,5 @@ +import type { InstanceAiEvent } from '@n8n/api-types'; + import type { InstanceAiEventBus } from '../event-bus/event-bus.interface'; import type { Logger } from '../logger'; import { @@ -6,6 +8,9 @@ import { type TraceStatus, } from '../runtime/resumable-stream-executor'; import type { WorkSummary } from '../stream/work-summary-accumulator'; +import type { SuspensionInfo } from '../utils/stream-helpers'; + +type ConfirmationRequestEvent = Extract; export interface ConsumeWithHitlOptions { agent: unknown; @@ -107,3 +112,81 @@ export async function consumeStreamWithHitl( workSummary: result.workSummary, }; } + +export interface ConsumeStreamCascadingOptions { + agent: unknown; + stream: unknown; + runId: string; + agentId: string; + eventBus: InstanceAiEventBus; + logger: Logger; + threadId: string; + abortSignal: AbortSignal; +} + +export type ConsumeStreamCascadingResult = + | { + status: 'completed' | 'cancelled' | 'errored'; + agentRunId: string; + text: Promise; + workSummary: WorkSummary; + } + | { + status: 'suspended'; + agentRunId: string; + suspension: SuspensionInfo; + confirmationEvent?: ConfirmationRequestEvent; + text?: Promise; + workSummary: WorkSummary; + }; + +/** + * Consume a sub-agent stream and return cleanly when it either finishes or + * hits a HITL suspension. Unlike `consumeStreamWithHitl` (which transparently + * bridges sub-agent suspensions to a parent `waitForConfirmation` Promise), + * this returns the suspension info to the caller so the caller can decide + * how to handle it — e.g. cascade the suspension up to its own SDK suspend + * via `ctx.suspend(payload)`, so the parent's agent run is also checkpointed + * and survives a process restart. + * + * Uses `executeResumableStream`'s manual mode, which suppresses the + * sub-agent's `confirmation-request` event publish (returning it on the + * result instead) — the caller emits the card at whatever runId is + * meaningful at its level. + */ +export async function consumeStreamCascading( + options: ConsumeStreamCascadingOptions, +): Promise { + const stream = normalizeStreamSource(options.stream); + const result = await executeResumableStream({ + agent: options.agent, + stream, + context: { + threadId: options.threadId, + runId: options.runId, + agentId: options.agentId, + eventBus: options.eventBus, + signal: options.abortSignal, + logger: options.logger, + }, + control: { mode: 'manual' }, + }); + + if (result.status === 'suspended' && result.suspension) { + return { + status: 'suspended', + agentRunId: result.agentRunId, + suspension: result.suspension, + ...(result.confirmationEvent ? { confirmationEvent: result.confirmationEvent } : {}), + ...(result.text ? { text: result.text } : {}), + workSummary: result.workSummary, + }; + } + + return { + status: result.status === 'suspended' ? 'errored' : result.status, + agentRunId: result.agentRunId, + text: result.text ?? stream.text ?? Promise.resolve(''), + workSummary: result.workSummary, + }; +} diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/blueprint-accumulator.test.ts b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/blueprint-accumulator.test.ts index 277ecaa9b4f..19e72f4d60d 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/blueprint-accumulator.test.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/blueprint-accumulator.test.ts @@ -141,4 +141,76 @@ describe('BlueprintAccumulator', () => { expect(checkpoint?.deps).toEqual(['wf-2']); }); }); + + describe('loadFromTasks (revision flow)', () => { + const originalTasks = [ + { + id: 'wf-1', + title: "Build 'A' workflow", + kind: 'build-workflow', + spec: 'Build A', + deps: [], + }, + { + id: 'wf-2', + title: "Build 'B' workflow", + kind: 'build-workflow', + spec: 'Build B', + deps: [], + }, + { + id: 'verify-1', + title: 'Verify A', + kind: 'checkpoint', + spec: 'Verify A', + deps: ['wf-1'], + }, + ]; + + it('seeds the accumulator with the persisted plan', () => { + accumulator.loadFromTasks(originalTasks); + + expect(accumulator.isEmpty()).toBe(false); + expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-2', 'verify-1']); + }); + + it('preserves original items across an ask-for-edits revision (remove + add + resubmit)', () => { + // Simulates the resume path: the parent handler rebuilt a fresh + // accumulator, then rehydrated it from the persisted graph before + // the planner revised the plan. + accumulator.loadFromTasks(originalTasks); + + // Planner revises: drop one original, add a new one. + expect(accumulator.removeItem('wf-2')).toBe(true); + accumulator.addItem({ + kind: 'workflow', + id: 'wf-3', + name: 'C', + purpose: 'Build C', + integrations: [], + dependsOn: [], + }); + + // The resubmitted plan keeps the surviving originals plus the new item, + // rather than collapsing to only the newly-added one. + expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'verify-1', 'wf-3']); + }); + + it('upserts by id so a revised original replaces in place', () => { + accumulator.loadFromTasks(originalTasks); + + accumulator.addItem({ + kind: 'workflow', + id: 'wf-1', + name: 'A (revised)', + purpose: 'Build A differently', + integrations: [], + dependsOn: [], + }); + + const list = accumulator.getTaskList(); + expect(list.map((t) => t.id)).toEqual(['wf-1', 'wf-2', 'verify-1']); + expect(list.find((t) => t.id === 'wf-1')?.title).toBe("Build 'A (revised)' workflow"); + }); + }); }); diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts index 17d6f173384..4ca7aaefe42 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts @@ -1,4 +1,10 @@ -import type { OrchestrationContext, PlannedTaskGraph, PlannedTaskService } from '../../../types'; +import type { + OrchestrationContext, + PlannedTaskGraph, + PlannedTaskRecord, + PlannedTaskService, +} from '../../../types'; +import { BlueprintAccumulator } from '../blueprint-accumulator'; const { __testBuildPlannerBriefingContext, @@ -6,6 +12,7 @@ const { __testFormatMessagesForBriefing, __testGetRecentMessages, __testGetPriorToolObservations, + __testRehydrateAccumulatorFromGraph, } = // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/consistent-type-imports require('../plan-with-agent.tool') as typeof import('../plan-with-agent.tool'); @@ -103,6 +110,87 @@ describe('clearPlannedTaskGraph', () => { }); }); +describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => { + const persistedTasks: PlannedTaskRecord[] = [ + { + id: 'wf-1', + title: "Build 'A' workflow", + kind: 'build-workflow', + spec: 'A', + deps: [], + status: 'planned', + }, + { + id: 'wf-2', + title: "Build 'B' workflow", + kind: 'build-workflow', + spec: 'B', + deps: [], + status: 'planned', + }, + ]; + + it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => { + // Reproduces "ask for edits -> revise existing plan -> submit again": + // on resume the parent rebuilt a fresh accumulator; without rehydration + // the planner's remove/add would operate on an empty plan and the + // re-submit would drop every original item. + const { context } = makeContext({ + graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, + }); + const accumulator = new BlueprintAccumulator(); + + await __testRehydrateAccumulatorFromGraph(context, accumulator); + + // Planner revises: drop one original, add a new one, then resubmits. + expect(accumulator.removeItem('wf-2')).toBe(true); + accumulator.addItem({ + kind: 'workflow', + id: 'wf-3', + name: 'C', + purpose: 'C', + integrations: [], + dependsOn: [], + }); + + expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']); + }); + + it('does not reopen an already-approved/active graph', async () => { + const { context } = makeContext({ + graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks }, + }); + const accumulator = new BlueprintAccumulator(); + + await __testRehydrateAccumulatorFromGraph(context, accumulator); + + expect(accumulator.isEmpty()).toBe(true); + }); + + it('is a no-op when no graph exists', async () => { + const { context, getGraph } = makeContext({ graph: null }); + const accumulator = new BlueprintAccumulator(); + + await __testRehydrateAccumulatorFromGraph(context, accumulator); + + expect(getGraph).toHaveBeenCalledWith('t-1'); + expect(accumulator.isEmpty()).toBe(true); + }); + + it('leaves the accumulator empty when getGraph throws', async () => { + const { context, getGraph } = makeContext({ + graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, + }); + getGraph.mockRejectedValueOnce(new Error('db down')); + const accumulator = new BlueprintAccumulator(); + + await expect( + __testRehydrateAccumulatorFromGraph(context, accumulator), + ).resolves.toBeUndefined(); + expect(accumulator.isEmpty()).toBe(true); + }); +}); + describe('formatMessagesForBriefing', () => { // The planner system prompt (plan-agent-prompt.ts) treats // and as a paired contract — schedule/cron decisions read diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts index 991b163c5f0..f6aeb0bcd02 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts @@ -15,7 +15,6 @@ import { Agent, Tool } from '@n8n/agents'; import type { InstanceAiEvent } from '@n8n/api-types'; import { DateTime } from 'luxon'; -import { nanoid } from 'nanoid'; import { z } from 'zod'; import { createAddPlanItemTool, createRemovePlanItemTool } from './add-plan-item.tool'; @@ -33,10 +32,12 @@ import { } from './tracing-utils'; import { attachRuntimeWorkspaceCapabilities } from '../../agent/runtime-workspace'; import { MAX_STEPS } from '../../constants/max-steps'; -import { consumeStreamWithHitl, requireCompletedHitlText } from '../../stream/consume-with-hitl'; +import { consumeStreamCascading } from '../../stream/consume-with-hitl'; +import type { ConsumeStreamCascadingResult } from '../../stream/consume-with-hitl'; import { createToolRegistry, toolRegistryKeys, toolRegistryValues } from '../../tool-registry'; import { buildAgentTraceInputs, mergeTraceRunInputs } from '../../tracing/langsmith-tracing'; import type { OrchestrationContext } from '../../types'; +import { resumeAgentStream } from '../../utils/stream-helpers'; import { CREDENTIALS_TOOL_ID } from '../credentials.tool'; import { DATA_TABLES_TOOL_ID } from '../data-tables.tool'; import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool'; @@ -612,10 +613,74 @@ async function clearPlannedTaskGraph(context: OrchestrationContext): Promise { + return await rehydrateAccumulatorFromGraph(context, accumulator); +} + +/** + * Seed a freshly-built accumulator from the persisted plan before a planner + * resume. The parent plan-tool handler exits on every cascade-suspend, so the + * first-call accumulator is gone by the time an "ask for edits" revision + * resumes the planner — without this, remove-plan-item can't touch the + * original items, add-plan-item only carries the newly-added ones, and the + * re-submit's createPlan (which overwrites unconditionally) replaces the graph + * with a partial plan. + * + * Only rehydrates while the plan is still `awaiting_approval` (the revision + * window) — an already-approved/active graph with in-flight tasks must not be + * reopened here. Best-effort: a getGraph failure leaves the accumulator empty + * rather than blocking the resume. + */ +async function rehydrateAccumulatorFromGraph( + context: OrchestrationContext, + accumulator: BlueprintAccumulator, +): Promise { + if (!context.plannedTaskService) return; + try { + const graph = await context.plannedTaskService.getGraph(context.threadId); + if (graph?.status === 'awaiting_approval' && graph.tasks.length > 0) { + accumulator.loadFromTasks(graph.tasks); + } + } catch { + // Best-effort — fall back to an empty accumulator rather than block resume. + } +} + // --------------------------------------------------------------------------- // Tool factory // --------------------------------------------------------------------------- +/** + * The plan tool cascades sub-agent HITL suspensions UP through the SDK's + * native suspend/resume mechanism: when the planner sub-agent (or any tool + * inside it) emits a `tool-call-suspended` chunk, the plan tool catches it + * via `consumeStreamCascading` and calls its own `ctx.suspend()` with the + * same payload. This checkpoints the orchestrator's full state alongside the + * planner's, so a process restart between user prompt and click can resume + * the planner without losing any state. + * + * The schemas below are permissive on purpose: the plan tool just forwards + * whatever the inner tool emitted (submit-plan's plan-review payload OR + * ask-user's questions payload) and accepts whatever the frontend sent back + * for that card. Validation already happened on the inner tool. + */ +const planToolSuspendSchema = z + .object({ + requestId: z.string(), + message: z.string(), + severity: z.string(), + // Only submit-plan + ask-user carry an `inputType`; cascaded suspensions + // from other planner tools (credentials, data-tables, ...) don't, and a + // strict `inputType: string` would reject otherwise-valid payloads. + inputType: z.string().optional(), + }) + .passthrough(); + +const planToolResumeSchema = z.record(z.unknown()); + export function createPlanWithAgentTool(context: OrchestrationContext) { return new Tool('plan') .description( @@ -643,13 +708,20 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { result: z.string(), }), ) - .handler(async (input: { guidance?: string }) => { + .suspend(planToolSuspendSchema) + .resume(planToolResumeSchema) + .handler(async (input: { guidance?: string }, ctx) => { + const resumeData = ctx.resumeData; + const isResume = resumeData !== undefined && resumeData !== null; + // ── Same-turn denial guard ───────────────────────────────────── // If the user denied a plan earlier in this same message group, the // orchestrator must not silently spawn another planner. Without this // guard the LLM can ignore the "stop on denial" prompt and start a // fresh planner with a new accumulator, defeating the denial. - if (context.plannedTaskService && context.messageGroupId) { + // Only applies to first-call invocations — resume continues an + // already-suspended planner and cannot be a fresh re-spawn. + if (!isResume && context.plannedTaskService && context.messageGroupId) { const existing = await context.plannedTaskService.getGraph(context.threadId); if ( existing?.status === 'cancelled' && @@ -666,147 +738,230 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { } } - // ── Collect planner tools ────────────────────────────────────── + // ── Collect planner tools (shared between first-call and resume) ── const plannerTools = createToolRegistry(); for (const name of PLANNER_DOMAIN_TOOL_NAMES) { const tool = context.domainTools.get(name); - if (tool) { - plannerTools.set(name, tool); - } + if (tool) plannerTools.set(name, tool); } for (const name of PLANNER_RESEARCH_TOOL_NAMES) { const tool = context.domainTools.get(name); - if (tool) { - plannerTools.set(name, tool); - } + if (tool) plannerTools.set(name, tool); } - // Best-practices guidance — planner-exclusive plannerTools.set('templates', createTemplatesTool()); - // Incremental plan accumulation + approval tools const accumulator = new BlueprintAccumulator(); plannerTools.set('add-plan-item', createAddPlanItemTool(accumulator, context)); plannerTools.set('remove-plan-item', createRemovePlanItemTool(accumulator, context)); plannerTools.set('submit-plan', createSubmitPlanTool(accumulator, context)); - // ── Retrieve conversation history ───────────────────────────── - const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT); - const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context)); - const briefing = formatMessagesForBriefing( - messages, - input.guidance, - context.timeZone, - briefingContext, - ); - - // ── IDs & events ────────────────────────────────────────────── - const subAgentId = `agent-planner-${nanoid(6)}`; - const subtitle = - input.guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...'; - - context.eventBus.publish(context.threadId, { - type: 'agent-spawned', - runId: context.runId, - agentId: subAgentId, - payload: { - parentId: context.orchestratorAgentId, - role: 'planner', - tools: toolRegistryKeys(plannerTools), - kind: 'planner' as const, - title: 'Planning', - subtitle: truncateLabel(subtitle), - goal: briefing, - }, - }); - - // ── Tracing ─────────────────────────────────────────────────── - const traceRun = await startSubAgentTrace(context, { - agentId: subAgentId, - role: 'planner', - kind: 'planner', - inputs: { - guidance: input.guidance, - messageCount: messages.length, - }, - }); + // Use a runId-derived sub-agent id so resume reuses the same event stream identity. + const subAgentId = `agent-planner-${context.runId}`; const tracedPlannerTools = traceSubAgentTools(context, plannerTools, 'planner'); + // ── Build sub-agent (shared) ───────────────────────────────── + const subAgent = new Agent('Workflow Planner Agent') + .model(context.modelId) + .instructions(PLANNER_AGENT_PROMPT, { + providerOptions: { + anthropic: { cacheControl: { type: 'ephemeral' } }, + }, + }) + .tool(toolRegistryValues(tracedPlannerTools)) + .checkpoint(context.checkpointStore ?? 'memory'); + attachRuntimeWorkspaceCapabilities(subAgent, { + runtimeSkills: context.runtimeSkills, + }); + const telemetry = context.tracing?.getTelemetry?.({ + agentRole: 'planner', + functionId: 'instance-ai.subagent.planner', + executionMode: 'background', + metadata: { agent_id: subAgentId }, + }); + if (telemetry) { + subAgent.telemetry(telemetry); + } + + let traceRun: Awaited> | undefined; + try { - // ── Create & stream sub-agent (inline, blocking) ────────── - const subAgent = new Agent('Workflow Planner Agent') - .model(context.modelId) - .instructions(PLANNER_AGENT_PROMPT, { - providerOptions: { - anthropic: { cacheControl: { type: 'ephemeral' } }, - }, - }) - .tool(toolRegistryValues(tracedPlannerTools)) - .checkpoint(context.checkpointStore ?? 'memory'); - attachRuntimeWorkspaceCapabilities(subAgent, { - runtimeSkills: context.runtimeSkills, - }); - const telemetry = context.tracing?.getTelemetry?.({ - agentRole: 'planner', - functionId: 'instance-ai.subagent.planner', - executionMode: 'background', - metadata: { agent_id: subAgentId }, - }); - if (telemetry) { - subAgent.telemetry(telemetry); - } - mergeTraceRunInputs( - traceRun, - buildAgentTraceInputs({ - systemPrompt: PLANNER_AGENT_PROMPT, - tools: tracedPlannerTools, - modelId: context.modelId, - }), - ); + let consumeResult: ConsumeStreamCascadingResult; - const resultText = await withTraceRun(context, traceRun, async () => { - const persistence = await createSubAgentPersistence(context, { - agentKind: 'planner', - }); - const stream = await subAgent.stream(briefing, { - maxIterations: MAX_STEPS.PLANNER, - abortSignal: context.abortSignal, - persistence, - providerOptions: { - anthropic: { cacheControl: { type: 'ephemeral' } }, - }, - }); + if (isResume) { + // ── Resume path ───────────────────────────────────────── + const resumeInfo = await context.findSubAgentResumeInfo?.('planner'); + if (!resumeInfo) { + return { + result: + 'The planning step could not be resumed because its state was lost. Please send a new message to continue.', + }; + } - const result = await consumeStreamWithHitl({ - agent: subAgent, - stream, - runId: context.runId, - agentId: subAgentId, - eventBus: context.eventBus, - logger: context.logger, - threadId: context.threadId, - abortSignal: context.abortSignal, - waitForConfirmation: context.waitForConfirmation, - maxIterations: MAX_STEPS.PLANNER, - persistence, - }); + // Rehydrate the accumulator from the persisted plan so an + // "ask for edits" revision operates on the full plan rather + // than an empty accumulator. See rehydrateAccumulatorFromGraph. + await rehydrateAccumulatorFromGraph(context, accumulator); - return await requireCompletedHitlText(result, 'Planner sub-agent'); - }); - - await finishTraceRun(context, traceRun, { - outputs: { - result: resultText, + // Open a trace span for the resumed leg so a plan that suspended + // at HITL and resumed still shows its continuation in LangSmith. + // The planner card itself is already in the snapshot from the + // first call, so no agent-spawned event is (re-)published here. + traceRun = await startSubAgentTrace(context, { agentId: subAgentId, role: 'planner', - hasItems: !accumulator.isEmpty(), - itemCount: accumulator.getTaskItemsForEvent().length, - }, - }); + kind: 'planner', + inputs: { resumed: true }, + }); + mergeTraceRunInputs( + traceRun, + buildAgentTraceInputs({ + systemPrompt: PLANNER_AGENT_PROMPT, + tools: tracedPlannerTools, + modelId: context.modelId, + }), + ); + + consumeResult = await withTraceRun(context, traceRun, async () => { + const resumed = await resumeAgentStream(subAgent, resumeData, { + runId: resumeInfo.runId, + toolCallId: resumeInfo.toolCallId, + persistence: resumeInfo.persistence, + maxIterations: MAX_STEPS.PLANNER, + }); + + return await consumeStreamCascading({ + agent: subAgent, + stream: resumed, + runId: context.runId, + agentId: subAgentId, + eventBus: context.eventBus, + logger: context.logger, + threadId: context.threadId, + abortSignal: context.abortSignal, + }); + }); + } else { + // ── First-call path ───────────────────────────────────── + // The planner is the most common inline HITL entry point — when it + // suspends the orchestrator cascades-suspends too, and the SDK does + // not flush the user-message row to memory until a clean loop end + // (which a suspended run never reaches). Persist eagerly so the + // user's bubble is visible if they reload during the suspend window. + if (context.persistInFlightUserMessage) { + await context.persistInFlightUserMessage(); + } + + const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT); + const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context)); + const briefing = formatMessagesForBriefing( + messages, + input.guidance, + context.timeZone, + briefingContext, + ); + + const subtitle = + input.guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...'; + + context.eventBus.publish(context.threadId, { + type: 'agent-spawned', + runId: context.runId, + agentId: subAgentId, + payload: { + parentId: context.orchestratorAgentId, + role: 'planner', + tools: toolRegistryKeys(plannerTools), + kind: 'planner' as const, + title: 'Planning', + subtitle: truncateLabel(subtitle), + goal: briefing, + }, + }); + + traceRun = await startSubAgentTrace(context, { + agentId: subAgentId, + role: 'planner', + kind: 'planner', + inputs: { + guidance: input.guidance, + messageCount: messages.length, + }, + }); + + mergeTraceRunInputs( + traceRun, + buildAgentTraceInputs({ + systemPrompt: PLANNER_AGENT_PROMPT, + tools: tracedPlannerTools, + modelId: context.modelId, + }), + ); + + consumeResult = await withTraceRun(context, traceRun, async () => { + const persistence = await createSubAgentPersistence(context, { + agentKind: 'planner', + }); + const stream = await subAgent.stream(briefing, { + maxIterations: MAX_STEPS.PLANNER, + abortSignal: context.abortSignal, + persistence, + providerOptions: { + anthropic: { cacheControl: { type: 'ephemeral' } }, + }, + }); + + return await consumeStreamCascading({ + agent: subAgent, + stream, + runId: context.runId, + agentId: subAgentId, + eventBus: context.eventBus, + logger: context.logger, + threadId: context.threadId, + abortSignal: context.abortSignal, + }); + }); + } + + // ── Cascade suspension up to the orchestrator ─────────── + if (consumeResult.status === 'suspended') { + const parsed = planToolSuspendSchema.safeParse(consumeResult.suspension.suspendPayload); + if (!parsed.success) { + context.logger.warn('Planner emitted a suspension payload missing required fields', { + threadId: context.threadId, + runId: context.runId, + toolName: consumeResult.suspension.toolName, + zodIssues: parsed.error.issues, + }); + publishClearingEvent(context); + await clearDraftChecklist(context); + await clearPlannedTaskGraph(context); + return { + result: + 'Planner requested user input but the payload was malformed. Please try again.', + }; + } + return await ctx.suspend(parsed.data); + } + + // ── Stream finished (completed/cancelled/errored) ────── + const resultText = consumeResult.status === 'completed' ? await consumeResult.text : ''; + + if (traceRun) { + await finishTraceRun(context, traceRun, { + outputs: { + result: resultText, + agentId: subAgentId, + role: 'planner', + hasItems: !accumulator.isEmpty(), + itemCount: accumulator.getTaskItemsForEvent().length, + }, + }); + } - // ── Publish agent-completed ─────────────────────────────── context.eventBus.publish(context.threadId, { type: 'agent-completed', runId: context.runId, @@ -818,9 +973,10 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { }); // ── Schedule tasks after planner-driven approval ────────── - // Only dispatch if submit-plan was called AND the user approved. - // createPlan persists the graph as `awaiting_approval`; flip it - // to `active` before scheduling so tick() can dispatch. + // Approval is detected via the accumulator's flag, which submit-plan + // flips in its resume handler. createPlan persisted the graph as + // `awaiting_approval` on the first call; flip it to `active` and + // schedule. if (accumulator.isApproved()) { if (context.plannedTaskService) { await context.plannedTaskService.approvePlan(context.threadId); @@ -828,7 +984,10 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { if (context.schedulePlannedTasks) { await context.schedulePlannedTasks(); } - const taskCount = accumulator.getTaskList().length; + // On resume the accumulator is fresh and reports 0 — query the + // persisted graph instead so the orchestrator gets accurate text. + const persistedCount = await getPersistedTaskCount(context); + const taskCount = persistedCount ?? accumulator.getTaskList().length; return { result: `Plan approved and ${taskCount} task${taskCount === 1 ? '' : 's'} dispatched.`, }; @@ -843,13 +1002,9 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { return { result: 'Plan denied by user. No tasks were dispatched.' }; } - // Planner finished without approval (no submit-plan or user didn't approve) + // Planner finished without approval (no submit-plan or user didn't approve). publishClearingEvent(context); await clearDraftChecklist(context); - // Clear the persisted planned-task graph too. submit-plan persists - // it BEFORE user approval (so HITL can display the checklist), so - // leaving it intact on planner give-up would let a later - // schedulePlannedTasks() tick pick up and dispatch a rejected plan. await clearPlannedTaskGraph(context); if (!accumulator.isEmpty()) { return { @@ -861,10 +1016,12 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); - await failTraceRun(context, traceRun, error, { - agent_id: subAgentId, - agent_role: 'planner', - }); + if (traceRun) { + await failTraceRun(context, traceRun, error, { + agent_id: subAgentId, + agent_role: 'planner', + }); + } context.eventBus.publish(context.threadId, { type: 'agent-completed', @@ -877,12 +1034,6 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { }, }); - // Clear draft checklist and persisted graph on error — same reason - // as the non-approval path: an error-aborted plan must not later be - // auto-dispatched by the post-run reschedule. Skip both when the user - // already approved this plan: the failure is downstream of approval - // (e.g. approvePlan/schedulePlannedTasks threw), and clearing would - // drop a plan the user explicitly accepted. if (!accumulator.isApproved()) { publishClearingEvent(context); await clearDraftChecklist(context); @@ -894,3 +1045,13 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { }) .build(); } + +async function getPersistedTaskCount(context: OrchestrationContext): Promise { + if (!context.plannedTaskService) return undefined; + try { + const graph = await context.plannedTaskService.getGraph(context.threadId); + return graph?.tasks?.length; + } catch { + return undefined; + } +} diff --git a/packages/@n8n/instance-ai/src/types.ts b/packages/@n8n/instance-ai/src/types.ts index f4111924c28..4baec57bfe3 100644 --- a/packages/@n8n/instance-ai/src/types.ts +++ b/packages/@n8n/instance-ai/src/types.ts @@ -1216,6 +1216,34 @@ export interface OrchestrationContext { taskId: string, correction: string, ) => 'queued' | 'task-completed' | 'task-not-found'; + /** + * Resume info for a suspended sub-agent of this thread, looked up from the + * persisted checkpoint store by the deterministic sub-agent resourceId + * (`instance-ai-subagent:{threadId}:{agentKind}`). Used by the cascading + * suspend path: when the orchestrator's `plan` tool resumes, it calls + * this to find the planner sub-agent's `runId` + suspended `toolCallId` + * + the persistence the planner was running under, so the resume path + * can rebuild the sub-agent with the same persistence and call + * `plannerAgent.resume('stream', resumeData, { runId, toolCallId })` + * without stashing anything across its own suspend/resume cycle. + */ + findSubAgentResumeInfo?: (agentKind: string) => Promise< + | { + runId: string; + toolCallId: string; + persistence: { threadId: string; resourceId: string }; + } + | undefined + >; + /** + * Persist the current user message to thread memory immediately, so it + * survives a restart that happens while the orchestrator is suspended on + * an inline HITL tool call. The SDK only flushes the turn delta on a clean + * loop completion, which a suspended run never reaches — without this the + * user's bubble is invisible on reload until the turn eventually completes. + * Idempotent: safe to call multiple times within a run. + */ + persistInFlightUserMessage?: () => Promise; /** Mark the current orchestrator run as making progress. */ touchRun?: () => boolean; /** Mark a running background task as making progress. */ diff --git a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts index 342da8154a5..9f6256984d4 100644 --- a/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts +++ b/packages/@n8n/instance-ai/src/utils/__tests__/stream-helpers.test.ts @@ -50,6 +50,7 @@ describe('parseSuspension', () => { toolCallId: 'tc-1', requestId: 'req-1', toolName: 'setup-credentials', + suspendPayload: { requestId: 'req-1' }, }); }); @@ -67,6 +68,7 @@ describe('parseSuspension', () => { toolCallId: 'tc-1', requestId: 'req-1', toolName: 'setup-credentials', + suspendPayload: { requestId: 'req-1' }, }); }); @@ -83,6 +85,7 @@ describe('parseSuspension', () => { toolCallId: 'tc-1', requestId: 'tc-1', toolName: undefined, + suspendPayload: {}, }); }); @@ -116,6 +119,7 @@ describe('parseSuspension', () => { toolCallId: 'tc-1', requestId: 'tc-1', toolName: undefined, + suspendPayload: {}, }); }); }); diff --git a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts index 30c690c2a44..b32402cc828 100644 --- a/packages/@n8n/instance-ai/src/utils/stream-helpers.ts +++ b/packages/@n8n/instance-ai/src/utils/stream-helpers.ts @@ -13,6 +13,8 @@ export interface SuspensionInfo { toolCallId: string; requestId: string; toolName?: string; + /** The raw suspend payload as passed to `ctx.suspend()` by the inner tool. */ + suspendPayload: Record; } /** Extract suspension info from a stream chunk. */ @@ -29,7 +31,7 @@ export function parseSuspension(chunk: unknown): SuspensionInfo | null { const toolName = typeof sp.toolName === 'string' ? sp.toolName : undefined; if (!reqId || !tcId) return null; - return { toolCallId: tcId, requestId: reqId, toolName }; + return { toolCallId: tcId, requestId: reqId, toolName, suspendPayload: suspPayload }; } export interface Resumable { diff --git a/packages/cli/src/modules/instance-ai/__tests__/instance-ai-memory.service.test.ts b/packages/cli/src/modules/instance-ai/__tests__/instance-ai-memory.service.test.ts index 55b789cf59c..974a0b334ba 100644 --- a/packages/cli/src/modules/instance-ai/__tests__/instance-ai-memory.service.test.ts +++ b/packages/cli/src/modules/instance-ai/__tests__/instance-ai-memory.service.test.ts @@ -44,9 +44,14 @@ function createService(options: { threadTtlDays?: number } = {}): InstanceAiMemo mockAgentMemory as never, mockDbSnapshotStorage as never, mockCheckpointRepository as never, + mockPendingConfirmationRepository as never, ); } +const mockPendingConfirmationRepository = { + findLiveRequestIds: jest.fn(async () => new Set()), +}; + function makeTree(overrides?: Partial): InstanceAiAgentNode { return { agentId: 'agent-001', @@ -132,11 +137,12 @@ describe('InstanceAiMemoryService.getRichMessages', () => { content: [ { type: 'text', text: 'Here are your workflows' }, { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-1', toolName: 'list-workflows', input: {}, - result: { workflows: [] }, + state: 'resolved', + output: { workflows: [] }, }, ], createdAt: new Date('2026-01-01T00:00:01.000Z'), diff --git a/packages/cli/src/modules/instance-ai/__tests__/message-parser.test.ts b/packages/cli/src/modules/instance-ai/__tests__/message-parser.test.ts index a9c6606c6d0..12e3de850e9 100644 --- a/packages/cli/src/modules/instance-ai/__tests__/message-parser.test.ts +++ b/packages/cli/src/modules/instance-ai/__tests__/message-parser.test.ts @@ -1,6 +1,10 @@ -import type { InstanceAiAgentNode } from '@n8n/api-types'; +import type { InstanceAiAgentNode, InstanceAiMessage } from '@n8n/api-types'; -import { parseStoredMessages } from '../message-parser'; +import { + collectConfirmationRequestIds, + markExpiredConfirmations, + parseStoredMessages, +} from '../message-parser'; import type { StoredAgentMessage } from '../message-parser'; const BASE_DATE_MS = Date.UTC(2026, 0, 1); @@ -106,11 +110,12 @@ describe('parseStoredMessages', () => { content: [ { type: 'text', text: 'Here are your workflows' }, { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-1', toolName: 'list-workflows', input: { limit: 10 }, - result: { workflows: ['wf1'] }, + state: 'resolved', + output: { workflows: ['wf1'] }, }, ], createdAt: makeDate(1), @@ -162,6 +167,109 @@ describe('parseStoredMessages', () => { expect(tc?.renderHint).toBe('tasks'); }); + it('should surface rejected tool calls via `error`, not `result`', () => { + const messages: StoredAgentMessage[] = [ + { + id: 'msg-u', + role: 'user', + content: 'Do something', + createdAt: makeDate(), + }, + { + id: 'msg-a', + role: 'assistant', + content: [ + { + type: 'tool-call', + toolCallId: 'tc-rej', + toolName: 'workflows', + input: { name: 'x' }, + state: 'rejected', + error: 'Workflow not found', + }, + ], + createdAt: makeDate(1), + }, + ]; + + const result = parseStoredMessages(messages); + + const tc = result[1].agentTree?.toolCalls[0]; + expect(tc?.isLoading).toBe(false); + expect(tc?.result).toBeUndefined(); + expect(tc?.error).toBe('Workflow not found'); + }); + + it('should skip malformed tool-call parts instead of rendering half-populated cards', () => { + const messages: StoredAgentMessage[] = [ + { + id: 'msg-u', + role: 'user', + content: 'Go', + createdAt: makeDate(), + }, + { + id: 'msg-a', + role: 'assistant', + content: [ + // Valid tool call — should survive. + { + type: 'tool-call', + toolCallId: 'tc-ok', + toolName: 'list-workflows', + input: {}, + state: 'resolved', + output: { ok: true }, + }, + // Missing toolName — fails the schema, must be dropped. + { type: 'tool-call', toolCallId: 'tc-no-name', input: {}, state: 'resolved' }, + // Missing toolCallId — dropped. + { type: 'tool-call', toolName: 'orphan', input: {}, state: 'resolved' }, + // `error` wrong type for a rejected call — dropped. + { + type: 'tool-call', + toolCallId: 'tc-bad-error', + toolName: 'workflows', + state: 'rejected', + error: { not: 'a string' }, + }, + ], + createdAt: makeDate(1), + }, + ]; + + const result = parseStoredMessages(messages); + + const toolCalls = result[1].agentTree?.toolCalls ?? []; + expect(toolCalls.map((tc) => tc.toolCallId)).toEqual(['tc-ok']); + }); + + it('should drop content parts with an unrecognized type', () => { + const messages: StoredAgentMessage[] = [ + { + id: 'msg-u', + role: 'user', + content: 'Go', + createdAt: makeDate(), + }, + { + id: 'msg-a', + role: 'assistant', + content: [ + { type: 'text', text: 'Hello' }, + // Unknown type — not in the content-part union, must be ignored. + { type: 'bogus-part', text: 'should not surface', payload: 42 }, + ], + createdAt: makeDate(1), + }, + ]; + + const result = parseStoredMessages(messages); + + expect(result[1].content).toBe('Hello'); + expect(result[1].agentTree?.timeline).toEqual([{ type: 'text', content: 'Hello' }]); + }); + it('should parse reasoning from native parts', () => { const messages: StoredAgentMessage[] = [ { @@ -388,6 +496,87 @@ describe('parseStoredMessages', () => { }); }); + it('should keep the snapshot tree when dedupe collapses in-flight checkpoint messages', () => { + // Simulates the in-flight HITL case: the SDK hasn't committed + // the turn to memory yet, so `loadInFlightCheckpointMessages` + // surfaces several intermediate assistant messages from the + // checkpoint blob. The snapshot was paired with a middle + // message via timestamp matching, while a later message + // (with no tree of its own) carries the latest text. Dedupe + // must transfer the agentTree forward so the confirmation + // card in the snapshot tree survives. + const snapshotTree: InstanceAiAgentNode = { + agentId: 'agent-001', + role: 'orchestrator', + status: 'active', + textContent: 'Streaming...', + reasoning: '', + toolCalls: [ + { + toolCallId: 'tc-cred', + toolName: 'credentials', + args: {}, + isLoading: true, + confirmation: { + requestId: 'req-live', + inputType: 'approval', + message: 'Select a credential', + severity: 'info', + }, + renderHint: 'default', + }, + ], + children: [], + timeline: [], + }; + + const messages: StoredAgentMessage[] = [ + { + id: 'msg-u', + role: 'user', + content: 'Build it', + createdAt: makeDate(0), + }, + { + id: 'msg-a-early', + role: 'assistant', + content: [{ type: 'text', text: '' }], + createdAt: makeDate(10), + }, + { + id: 'msg-a-paired', + role: 'assistant', + content: [{ type: 'text', text: 'Looking up credentials' }], + createdAt: makeDate(20), + }, + { + id: 'msg-a-latest', + role: 'assistant', + content: [{ type: 'text', text: 'Need credential confirmation' }], + createdAt: makeDate(40), + }, + ]; + + const result = parseStoredMessages(messages, [ + { + tree: snapshotTree, + runId: 'run_paired', + messageGroupId: 'mg_inflight', + createdAt: makeDate(25), + updatedAt: makeDate(25), + }, + ]); + + // One user + one assistant (dedup collapses the three assistant rows). + expect(result).toHaveLength(2); + const assistant = result[1]; + // Latest message id survives so live SSE deltas keep correlating. + expect(assistant.id).toBe('msg-a-latest'); + // Tree from the snapshot is transferred onto the kept message. + expect(assistant.agentTree).toBe(snapshotTree); + expect(assistant.agentTree?.toolCalls[0].confirmation?.requestId).toBe('req-live'); + }); + it('should apply renderHint correctly for known tool names', () => { const messages: StoredAgentMessage[] = [ { @@ -401,22 +590,28 @@ describe('parseStoredMessages', () => { role: 'assistant', content: [ { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-1', toolName: 'delegate', - result: 'ok', + input: {}, + state: 'resolved', + output: 'ok', }, { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-2', toolName: 'build-workflow-with-agent', - result: 'ok', + input: {}, + state: 'resolved', + output: 'ok', }, { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-3', toolName: 'plan', - result: 'ok', + input: {}, + state: 'resolved', + output: 'ok', }, ], createdAt: makeDate(1), @@ -786,11 +981,12 @@ describe('parseStoredMessages', () => { role: 'assistant', content: [ { - type: 'tool-result', + type: 'tool-call', toolCallId: 'tc-parts', toolName: 'plan', input: { goal: 'x' }, - result: 'done', + state: 'resolved', + output: 'done', }, ], createdAt: makeDate(1), @@ -804,3 +1000,164 @@ describe('parseStoredMessages', () => { }); }); }); + +describe('confirmation expiration helpers', () => { + function makeMessageWithConfirmations(requestIds: string[]): InstanceAiMessage { + return { + id: 'msg-a', + role: 'assistant', + createdAt: makeDate().toISOString(), + content: '', + reasoning: '', + isStreaming: false, + runId: 'run-1', + agentTree: { + agentId: 'agent-001', + role: 'orchestrator', + status: 'completed', + textContent: '', + reasoning: '', + toolCalls: requestIds.map((requestId, idx) => ({ + toolCallId: `tc-${idx}`, + toolName: 'plan', + args: {}, + isLoading: true, + confirmation: { + requestId, + severity: 'info' as const, + message: '', + inputType: 'plan-review' as const, + }, + })), + children: [ + { + agentId: 'agent-planner', + role: 'planner', + status: 'completed', + textContent: '', + reasoning: '', + toolCalls: [ + { + toolCallId: 'tc-sub', + toolName: 'submit-plan', + args: {}, + isLoading: true, + confirmation: { + requestId: 'req-sub', + severity: 'info' as const, + message: '', + inputType: 'plan-review' as const, + }, + }, + ], + children: [], + timeline: [], + }, + ], + timeline: [], + }, + }; + } + + it('collects request IDs from orchestrator and sub-agent tool calls', () => { + const messages = [makeMessageWithConfirmations(['req-1', 'req-2'])]; + expect(collectConfirmationRequestIds(messages).sort()).toEqual(['req-1', 'req-2', 'req-sub']); + }); + + it('flips expired flag only on confirmations whose requestId is not in the live set', () => { + const messages = [makeMessageWithConfirmations(['req-1'])]; + markExpiredConfirmations(messages, new Set(['req-1'])); + + const node = messages[0].agentTree!; + expect(node.toolCalls[0].confirmation?.expired).toBeUndefined(); + expect(node.children[0].toolCalls[0].confirmation?.expired).toBe(true); + }); + + it('does nothing for messages without an agent tree', () => { + const messages: InstanceAiMessage[] = [ + { + id: 'msg-u', + role: 'user', + createdAt: makeDate().toISOString(), + content: 'hi', + reasoning: '', + isStreaming: false, + }, + ]; + expect(collectConfirmationRequestIds(messages)).toEqual([]); + markExpiredConfirmations(messages, new Set()); + }); + + /** Build a single assistant message carrying one plan-review confirmation + * card, with overridable actionability fields. */ + function makeCardMessage( + overrides: Partial<{ isLoading: boolean; confirmationStatus: 'approved' | 'denied' }>, + ): InstanceAiMessage { + return { + id: 'msg-a', + role: 'assistant', + createdAt: makeDate().toISOString(), + content: '', + reasoning: '', + isStreaming: false, + runId: 'run-1', + agentTree: { + agentId: 'agent-001', + role: 'orchestrator', + status: 'completed', + textContent: '', + reasoning: '', + toolCalls: [ + { + toolCallId: 'tc-0', + toolName: 'plan', + args: {}, + isLoading: overrides.isLoading ?? true, + ...(overrides.confirmationStatus + ? { confirmationStatus: overrides.confirmationStatus } + : {}), + confirmation: { + requestId: 'req-resolved', + severity: 'info' as const, + message: '', + inputType: 'plan-review' as const, + }, + }, + ], + children: [], + timeline: [], + }, + }; + } + + // Regression: a resolved plan card reloaded after the user approved/denied it + // has no pending-confirmation row (claim() deleted it), but that absence must + // NOT relabel the historical card as "Plan (expired)". + it('does not mark a settled (no longer loading) card expired even with no live row', () => { + const messages = [makeCardMessage({ isLoading: false })]; + markExpiredConfirmations(messages, new Set()); + expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBeUndefined(); + }); + + it.each(['approved', 'denied'] as const)( + 'does not mark a %s card expired even with no live row', + (confirmationStatus) => { + const messages = [makeCardMessage({ confirmationStatus })]; + markExpiredConfirmations(messages, new Set()); + expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBeUndefined(); + }, + ); + + it('does not collect request IDs for settled cards', () => { + expect(collectConfirmationRequestIds([makeCardMessage({ isLoading: false })])).toEqual([]); + expect( + collectConfirmationRequestIds([makeCardMessage({ confirmationStatus: 'approved' })]), + ).toEqual([]); + }); + + it('still marks a genuinely actionable card expired when its row is gone', () => { + const messages = [makeCardMessage({ isLoading: true })]; + markExpiredConfirmations(messages, new Set()); + expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBe(true); + }); +}); diff --git a/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts b/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts index 56937f9968a..9b96b5dbbb3 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai-memory.service.ts @@ -20,8 +20,13 @@ import { DbSnapshotStorage } from './storage/db-snapshot-storage'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; -import { parseStoredMessages } from './message-parser'; +import { + collectConfirmationRequestIds, + markExpiredConfirmations, + parseStoredMessages, +} from './message-parser'; import { InstanceAiCheckpointRepository } from './repositories/instance-ai-checkpoint.repository'; +import { InstanceAiPendingConfirmationRepository } from './repositories/instance-ai-pending-confirmation.repository'; import { TypeORMAgentMemory } from './storage/typeorm-agent-memory'; function isAgentMessageLike(value: unknown): value is AgentDbMessage { @@ -58,6 +63,7 @@ export class InstanceAiMemoryService { private readonly agentMemory: TypeORMAgentMemory, private readonly dbSnapshotStorage: DbSnapshotStorage, private readonly checkpointRepository: InstanceAiCheckpointRepository, + private readonly pendingConfirmationRepository: InstanceAiPendingConfirmationRepository, ) { this.instanceAiConfig = globalConfig.instanceAi; } @@ -159,10 +165,31 @@ export class InstanceAiMemoryService { const storedMessages = mergeMessagesById(result.messages, checkpointMessages); const messages = parseStoredMessages(storedMessages, snapshots); + await this.flagExpiredConfirmations(messages); return { threadId, messages }; } + /** Cross-check every confirmation card against `instance_ai_pending_confirmations` + * and flip `confirmation.expired = true` on the ones with no live row. */ + private async flagExpiredConfirmations( + messages: Awaited>, + ): Promise { + const requestIds = collectConfirmationRequestIds(messages); + if (requestIds.length === 0) return; + try { + const live = await this.pendingConfirmationRepository.findLiveRequestIds( + requestIds, + new Date(), + ); + markExpiredConfirmations(messages, live); + } catch (error) { + this.logger.warn('Failed to flag expired confirmation cards', { + error: error instanceof Error ? error.message : String(error), + }); + } + } + private async loadInFlightCheckpointMessages(threadId: string): Promise { let checkpoints; try { diff --git a/packages/cli/src/modules/instance-ai/instance-ai.service.ts b/packages/cli/src/modules/instance-ai/instance-ai.service.ts index 5f05aaaa067..e140304385f 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.service.ts @@ -39,6 +39,7 @@ import { InstanceAiLivenessPolicy, McpClientManager, createDomainAccessTracker, + createSubAgentResourceId, BackgroundTaskManager, buildAgentTreeFromEvents, classifyAttachments, @@ -3070,6 +3071,13 @@ export class InstanceAiService { iterationLog, sendCorrectionToTask: (taskId, correction) => this.sendCorrectionToTask(threadId, taskId, correction), + findSubAgentResumeInfo: async (agentKind) => + await this.checkpointStore.findSuspendedSubAgentResumeInfo( + createSubAgentResourceId(threadId, agentKind), + ), + persistInFlightUserMessage: async () => { + await this.persistUserMessageOnFirstSuspend(threadId, runId); + }, workflowTaskService: workflowTasks, workspace: runtimeWorkspace, nodeDefinitionDirs: nodeDefDirs.length > 0 ? nodeDefDirs : undefined, diff --git a/packages/cli/src/modules/instance-ai/message-parser.ts b/packages/cli/src/modules/instance-ai/message-parser.ts index b770afdbe25..1e95151818f 100644 --- a/packages/cli/src/modules/instance-ai/message-parser.ts +++ b/packages/cli/src/modules/instance-ai/message-parser.ts @@ -5,12 +5,36 @@ import type { InstanceAiToolCallState, InstanceAiTimelineEntry, } from '@n8n/api-types'; -import type { AgentDbMessage, AgentTreeSnapshot } from '@n8n/instance-ai'; +import type { AgentDbMessage, AgentTreeSnapshot, MessageContent } from '@n8n/instance-ai'; +import { z } from 'zod'; import { cleanStoredUserMessage } from './internal-messages'; type RunSnapshots = AgentTreeSnapshot[]; +const toolCallContentPartSchema = z.object({ + type: z.literal('tool-call'), + toolCallId: z.string(), + toolName: z.string(), + input: z.unknown().optional(), + state: z.enum(['pending', 'resolved', 'rejected']).optional(), + output: z.unknown().optional(), + error: z.string().optional(), +}); + +const textContentPartSchema = z.object({ type: z.literal('text'), text: z.string() }); +const reasoningContentPartSchema = z.object({ type: z.literal('reasoning'), text: z.string() }); +const opaqueContentPartSchema = z + .object({ type: z.enum(['invalid-tool-call', 'file', 'citation', 'provider']) }) + .passthrough(); + +const contentPartSchema = z.union([ + textContentPartSchema, + reasoningContentPartSchema, + toolCallContentPartSchema, + opaqueContentPartSchema, +]); + // --------------------------------------------------------------------------- // Persisted message shapes // --------------------------------------------------------------------------- @@ -21,16 +45,10 @@ interface StoredToolInvocation { toolName: string; args: Record; result?: unknown; + error?: string; } -interface StoredContentPart { - type: string; - text?: string; - toolCallId?: string; - toolName?: string; - input?: Record; - result?: unknown; -} +type StoredContentPart = MessageContent; export interface StoredAgentMessage { id: string; @@ -65,31 +83,19 @@ function extractReasoningFromContent(content: unknown): string { function extractTextFromParts(parts: unknown[]): string { return parts - .filter( - (p): p is { type: 'text'; text: string } => - typeof p === 'object' && - p !== null && - 'type' in p && - p.type === 'text' && - 'text' in p && - typeof p.text === 'string', - ) - .map((p) => p.text) + .flatMap((p) => { + const parsed = textContentPartSchema.safeParse(p); + return parsed.success ? [parsed.data.text] : []; + }) .join(''); } function extractReasoningFromParts(parts: unknown[]): string { return parts - .filter( - (p): p is { type: 'reasoning'; text: string } => - typeof p === 'object' && - p !== null && - 'type' in p && - p.type === 'reasoning' && - 'text' in p && - typeof p.text === 'string', - ) - .map((p) => p.text) + .flatMap((p) => { + const parsed = reasoningContentPartSchema.safeParse(p); + return parsed.success ? [parsed.data.text] : []; + }) .join(''); } @@ -99,28 +105,47 @@ function extractParts(content: unknown): StoredContentPart[] | undefined { } function isStoredContentPart(value: unknown): value is StoredContentPart { - return typeof value === 'object' && value !== null && 'type' in value; + return contentPartSchema.safeParse(value).success; +} + +function toRecord(value: unknown): Record { + return typeof value === 'object' && value !== null && !Array.isArray(value) + ? (value as Record) + : {}; } function nativeToolPartToInvocation(part: StoredContentPart): StoredToolInvocation | undefined { - if (part.type === 'tool-call' && part.toolCallId && part.toolName) { - return { - state: 'call', - toolCallId: part.toolCallId, - toolName: part.toolName, - args: part.input ?? {}, - }; - } - if (part.type === 'tool-result' && part.toolCallId && part.toolName) { + if (part.type !== 'tool-call') return undefined; + + const parsed = toolCallContentPartSchema.safeParse(part); + if (!parsed.success) return undefined; + const toolCall = parsed.data; + + const args = toRecord(toolCall.input); + if (toolCall.state === 'resolved') { return { state: 'result', - toolCallId: part.toolCallId, - toolName: part.toolName, - args: part.input ?? {}, - result: part.result, + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args, + result: toolCall.output, }; } - return undefined; + if (toolCall.state === 'rejected') { + return { + state: 'result', + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args, + error: toolCall.error, + }; + } + return { + state: 'call', + toolCallId: toolCall.toolCallId, + toolName: toolCall.toolName, + args, + }; } function extractToolInvocations(content: unknown): StoredToolInvocation[] { @@ -140,6 +165,7 @@ function buildToolCallState(invocation: StoredToolInvocation): InstanceAiToolCal toolName: invocation.toolName, args: invocation.args, result: isCompleted ? invocation.result : undefined, + error: isCompleted ? invocation.error : undefined, isLoading: !isCompleted, renderHint: getRenderHint(invocation.toolName), }; @@ -160,7 +186,7 @@ function buildTimeline( for (const part of parts) { if (part.type === 'text' && part.text) { timeline.push({ type: 'text', content: part.text }); - } else if ((part.type === 'tool-call' || part.type === 'tool-result') && part.toolCallId) { + } else if (part.type === 'tool-call' && part.toolCallId) { timeline.push({ type: 'tool-call', toolCallId: part.toolCallId }); } } @@ -262,9 +288,19 @@ export function parseStoredMessages( // orphan snapshots before, between, or after assistant rows. let nextSnapshotIdx = 0; const consumedSnapshots = new Set(); + // Messages whose `agentTree` originated from a snapshot (as opposed to + // being synthesized by `buildFlatAgentTree`). Used by the dedupe pass to + // prefer transferring snapshot trees forward in the in-flight HITL case. + const messagesWithSnapshotTree = new Set(); let lastUserMessageId: string | undefined; + function pushSnapshotMessage(snapshot: AgentTreeSnapshot): void { + const built = buildSnapshotMessage(snapshot); + messagesWithSnapshotTree.add(built); + messages.push(built); + } + function appendChronologicalOrphansBefore(message: ConversationStoredMessage): void { const messageTimestamp = messageCreatedAtMs(message); while (nextSnapshotIdx < snapshotList.length) { @@ -273,7 +309,7 @@ export function parseStoredMessages( if (snapshotTimestamp === undefined || snapshotTimestamp >= messageTimestamp) return; consumedSnapshots.add(snapshot); - messages.push(buildSnapshotMessage(snapshot)); + pushSnapshotMessage(snapshot); nextSnapshotIdx++; } } @@ -343,7 +379,7 @@ export function parseStoredMessages( ? buildFlatAgentTree(text, reasoning, toolCalls, parts) : undefined); - messages.push({ + const assistantMessage: InstanceAiMessage = { id: msg.id, runId, messageGroupId: snapshot?.messageGroupId, @@ -354,7 +390,9 @@ export function parseStoredMessages( reasoning, isStreaming: false, agentTree, - }); + }; + if (snapshot) messagesWithSnapshotTree.add(assistantMessage); + messages.push(assistantMessage); continue; } @@ -364,7 +402,7 @@ export function parseStoredMessages( for (const snapshot of snapshots ?? []) { if (consumedSnapshots.has(snapshot)) continue; - messages.push(buildSnapshotMessage(snapshot)); + pushSnapshotMessage(snapshot); } // Propagate messageGroupId across assistant rows in the same conversational @@ -384,15 +422,35 @@ export function parseStoredMessages( // Deduplicate assistant messages by messageGroupId. // Follow-up runs in the same group produce separate DB rows; keep only // the latest (which carries the full runIds array and complete tree). - const seen = new Set(); + // + // In-flight HITL turns are different: the snapshot is paired with a + // *middle* checkpoint message via timestamp matching, and the latest + // message in the turn has only an auto-generated flat tree from + // `buildFlatAgentTree`. Keeping just the latest would drop the + // snapshot's tree (including its live confirmation cards), so transfer + // the snapshot's `agentTree` + `runIds` onto the kept message when the + // kept one's tree didn't come from a snapshot. + const keptIndexByGid = new Map(); + const toRemove = new Set(); for (let i = messages.length - 1; i >= 0; i--) { const gid = messages[i].messageGroupId; if (!gid) continue; - if (seen.has(gid)) { - messages.splice(i, 1); - } else { - seen.add(gid); + const keptIdx = keptIndexByGid.get(gid); + if (keptIdx === undefined) { + keptIndexByGid.set(gid, i); + continue; } + const kept = messages[keptIdx]; + const candidate = messages[i]; + if (!messagesWithSnapshotTree.has(kept) && messagesWithSnapshotTree.has(candidate)) { + kept.agentTree = candidate.agentTree; + kept.runIds = candidate.runIds; + messagesWithSnapshotTree.add(kept); + } + toRemove.add(i); + } + for (let i = messages.length - 1; i >= 0; i--) { + if (toRemove.has(i)) messages.splice(i, 1); } return messages; @@ -435,3 +493,67 @@ function propagateMessageGroupIdWithinRange( } } } + +/** Pull every confirmation requestId out of the parsed messages' agent trees. */ +/** + * A confirmation card is "actionable" only while the user can still respond to + * it: the tool call is in-flight and no terminal status has been recorded. + * Once approved/denied (or otherwise settled) the card is historical — its + * pending-confirmation row is gone after claim/delete, but that absence means + * "resolved", not "expired". + */ +function isActionableConfirmation(tc: InstanceAiToolCallState): boolean { + return ( + tc.confirmation !== undefined && + tc.isLoading && + tc.confirmationStatus !== 'approved' && + tc.confirmationStatus !== 'denied' + ); +} + +export function collectConfirmationRequestIds(messages: InstanceAiMessage[]): string[] { + const requestIds: string[] = []; + for (const message of messages) { + if (!message.agentTree) continue; + walkAgentNodes(message.agentTree, (node) => { + for (const tc of node.toolCalls) { + const { confirmation } = tc; + if (!confirmation || !isActionableConfirmation(tc)) continue; + requestIds.push(confirmation.requestId); + } + }); + } + return requestIds; +} + +/** + * Flip `confirmation.expired = true` on still-actionable cards whose + * pending-confirmation row is no longer live. Settled cards (approved/denied, + * or no longer loading) are left untouched — their row is also gone, but that + * means "resolved", not "expired", so relabeling them would rewrite history. + */ +export function markExpiredConfirmations( + messages: InstanceAiMessage[], + liveRequestIds: Set, +): void { + for (const message of messages) { + if (!message.agentTree) continue; + walkAgentNodes(message.agentTree, (node) => { + for (const tc of node.toolCalls) { + const { confirmation } = tc; + if (!confirmation || !isActionableConfirmation(tc)) continue; + if (!liveRequestIds.has(confirmation.requestId)) { + confirmation.expired = true; + } + } + }); + } +} + +function walkAgentNodes( + node: InstanceAiAgentNode, + visit: (node: InstanceAiAgentNode) => void, +): void { + visit(node); + for (const child of node.children) walkAgentNodes(child, visit); +} diff --git a/packages/cli/src/modules/instance-ai/repositories/__tests__/instance-ai-pending-confirmation.repository.test.ts b/packages/cli/src/modules/instance-ai/repositories/__tests__/instance-ai-pending-confirmation.repository.test.ts index 0c9ce09693d..cd5680ddcee 100644 --- a/packages/cli/src/modules/instance-ai/repositories/__tests__/instance-ai-pending-confirmation.repository.test.ts +++ b/packages/cli/src/modules/instance-ai/repositories/__tests__/instance-ai-pending-confirmation.repository.test.ts @@ -60,9 +60,11 @@ describe('InstanceAiPendingConfirmationRepository.claim', () => { expect(result).toBe(row); expect(txRepo.findOne).toHaveBeenCalledWith({ - where: { requestId: 'req-1', userId: 'user-1' }, + where: expect.objectContaining({ requestId: 'req-1', userId: 'user-1' }), }); - expect(txRepo.delete).toHaveBeenCalledWith({ requestId: 'req-1', userId: 'user-1' }); + expect(txRepo.delete).toHaveBeenCalledWith( + expect.objectContaining({ requestId: 'req-1', userId: 'user-1' }), + ); }); it('returns undefined when no row matches the requestId+userId', async () => { @@ -96,7 +98,24 @@ describe('InstanceAiPendingConfirmationRepository.claim', () => { expect(result).toBeUndefined(); expect(txRepo.findOne).toHaveBeenCalledWith({ - where: { requestId: 'req-1', userId: 'attacker-user' }, + where: expect.objectContaining({ requestId: 'req-1', userId: 'attacker-user' }), }); }); + + it('treats expired rows as already gone — same predicate as findLiveRequestIds', async () => { + // Driver behavior: an expired row would not match the live-where + // predicate, so findOne returns null even though the row physically + // exists. The expired-prune sweep is responsible for the physical row + // — the claim path treats it as unclaimable in the meantime. + const txRepo = mock>(); + txRepo.findOne.mockResolvedValueOnce(null); + const { repo } = buildRepoWithTxRepo(txRepo); + + const result = await repo.claim('req-expired', 'user-1'); + + expect(result).toBeUndefined(); + expect(txRepo.delete).not.toHaveBeenCalled(); + const where = (txRepo.findOne.mock.calls[0][0] as { where: Record }).where; + expect(where).toHaveProperty('expiresAt'); + }); }); diff --git a/packages/cli/src/modules/instance-ai/repositories/instance-ai-checkpoint.repository.ts b/packages/cli/src/modules/instance-ai/repositories/instance-ai-checkpoint.repository.ts index 01b81ca528e..3367f9607e3 100644 --- a/packages/cli/src/modules/instance-ai/repositories/instance-ai-checkpoint.repository.ts +++ b/packages/cli/src/modules/instance-ai/repositories/instance-ai-checkpoint.repository.ts @@ -23,4 +23,21 @@ export class InstanceAiCheckpointRepository extends Repository { + const row = await this.findOne({ + where: { resourceId, expiredAt: IsNull() }, + order: { createdAt: 'DESC' }, + }); + return row ?? undefined; + } } diff --git a/packages/cli/src/modules/instance-ai/repositories/instance-ai-pending-confirmation.repository.ts b/packages/cli/src/modules/instance-ai/repositories/instance-ai-pending-confirmation.repository.ts index fd911051b0a..6609c84449f 100644 --- a/packages/cli/src/modules/instance-ai/repositories/instance-ai-pending-confirmation.repository.ts +++ b/packages/cli/src/modules/instance-ai/repositories/instance-ai-pending-confirmation.repository.ts @@ -1,5 +1,5 @@ import { Service } from '@n8n/di'; -import { DataSource, LessThan, Repository } from '@n8n/typeorm'; +import { DataSource, In, IsNull, LessThan, MoreThanOrEqual, Or, Repository } from '@n8n/typeorm'; import { InstanceAiPendingConfirmation } from '../entities/instance-ai-pending-confirmation.entity'; @@ -15,7 +15,8 @@ export class InstanceAiPendingConfirmationRepository extends Repository { return await this.manager.transaction(async (manager) => { const repo = manager.getRepository(InstanceAiPendingConfirmation); + const now = new Date(); + const liveWhere = { + requestId, + userId, + expiresAt: Or(IsNull(), MoreThanOrEqual(now)), + }; const row = await repo.findOne({ - where: { requestId, userId }, + where: liveWhere, ...(manager.connection.options.type === 'postgres' ? { lock: { mode: 'pessimistic_write' as const } } : {}), }); if (!row) return undefined; - const result = await repo.delete({ requestId, userId }); + const result = await repo.delete(liveWhere); if (result.affected === 0) return undefined; return row; }); @@ -61,4 +68,18 @@ export class InstanceAiPendingConfirmationRepository extends Repository { return await this.find({ where: { threadId } }); } + + /** Of the given request IDs, return those still actionable (row exists and + * not past `expiresAt`). The complement is treated as expired by the UI. */ + async findLiveRequestIds(requestIds: string[], now: Date): Promise> { + if (requestIds.length === 0) return new Set(); + const rows = await this.find({ + where: { + requestId: In(requestIds), + expiresAt: Or(IsNull(), MoreThanOrEqual(now)), + }, + select: ['requestId'], + }); + return new Set(rows.map((row) => row.requestId)); + } } diff --git a/packages/cli/src/modules/instance-ai/storage/__tests__/typeorm-agent-checkpoint-store.test.ts b/packages/cli/src/modules/instance-ai/storage/__tests__/typeorm-agent-checkpoint-store.test.ts index 7faf95bb896..8986ba85ac0 100644 --- a/packages/cli/src/modules/instance-ai/storage/__tests__/typeorm-agent-checkpoint-store.test.ts +++ b/packages/cli/src/modules/instance-ai/storage/__tests__/typeorm-agent-checkpoint-store.test.ts @@ -159,4 +159,55 @@ describe('TypeORMAgentCheckpointStore', () => { await expect(store.deleteOlderThan(new Date(0))).resolves.toBe(7); expect(spy).toHaveBeenCalledTimes(1); }); + + describe('findSuspendedSubAgentResumeInfo', () => { + it('picks the suspended tool call when parallel non-suspended ones are present', async () => { + const state = makeState({ + pendingToolCalls: { + 'tc-finished': { + toolCallId: 'tc-finished', + toolName: 'list-tools', + input: {}, + suspended: false, + }, + 'tc-suspended': { + toolCallId: 'tc-suspended', + toolName: 'ask-user', + input: {}, + suspended: true, + suspendPayload: {}, + resumeSchema: {}, + runId: 'inner-run', + }, + }, + }); + checkpointRepo.findActiveByResourceId.mockResolvedValueOnce( + makeCheckpoint({ key: 'run_outer', state }), + ); + + const info = await store.findSuspendedSubAgentResumeInfo('resource-x'); + + expect(info).toEqual({ + runId: 'run_outer', + toolCallId: 'tc-suspended', + persistence: { threadId: 'thread-1', resourceId: 'user-1' }, + }); + }); + + it('returns undefined when no entry in pendingToolCalls is suspended', async () => { + const state = makeState({ + pendingToolCalls: { + 'tc-1': { + toolCallId: 'tc-1', + toolName: 'list-tools', + input: {}, + suspended: false, + }, + }, + }); + checkpointRepo.findActiveByResourceId.mockResolvedValueOnce(makeCheckpoint({ state })); + + await expect(store.findSuspendedSubAgentResumeInfo('resource-x')).resolves.toBeUndefined(); + }); + }); }); diff --git a/packages/cli/src/modules/instance-ai/storage/typeorm-agent-checkpoint-store.ts b/packages/cli/src/modules/instance-ai/storage/typeorm-agent-checkpoint-store.ts index c3238f31575..aaf365bd06a 100644 --- a/packages/cli/src/modules/instance-ai/storage/typeorm-agent-checkpoint-store.ts +++ b/packages/cli/src/modules/instance-ai/storage/typeorm-agent-checkpoint-store.ts @@ -79,6 +79,41 @@ export class TypeORMAgentCheckpointStore implements CheckpointStore { return await this.markExpiredOlderThan(olderThan); } + /** + * Look up the most recent suspended sub-agent run for a given resourceId + * and pull the info needed to resume it. Used by the orchestrator's + * cascade-suspend path: when the `plan` tool resumes, it needs the + * planner sub-agent's `runId` + `toolCallId` (to call `subAgent.resume`) + * + its `persistence` (so the resumed call reuses the same sub-agent + * thread and resourceId the original used). The sub-agent's resourceId + * is deterministically derived from the parent thread + agent kind, so + * the caller can compute the lookup key without stashing anything. + */ + async findSuspendedSubAgentResumeInfo(resourceId: string): Promise< + | { + runId: string; + toolCallId: string; + persistence: { threadId: string; resourceId: string }; + } + | undefined + > { + const row = await this.checkpointRepo.findActiveByResourceId(resourceId); + if (!row?.state) return undefined; + // `pendingToolCalls` can hold parallel tool calls from one turn, only + // some of which suspended. Pick the suspended entry explicitly so we + // don't try to resume a tool that ran to completion in the same batch. + const suspendedEntry = Object.entries(row.state.pendingToolCalls ?? {}).find( + ([, call]) => call.suspended, + ); + const persistence = row.state.persistence; + if (!suspendedEntry || !persistence?.threadId || !persistence.resourceId) return undefined; + return { + runId: row.key, + toolCallId: suspendedEntry[0], + persistence: { threadId: persistence.threadId, resourceId: persistence.resourceId }, + }; + } + /** Drop expired tombstones outright once they're past the GC horizon. */ async hardDeleteExpiredOlderThan(olderThan: Date): Promise { const result = await this.checkpointRepo.delete({ diff --git a/packages/frontend/@n8n/i18n/src/locales/en.json b/packages/frontend/@n8n/i18n/src/locales/en.json index 208ea20fa90..357cdd981ca 100644 --- a/packages/frontend/@n8n/i18n/src/locales/en.json +++ b/packages/frontend/@n8n/i18n/src/locales/en.json @@ -5768,6 +5768,8 @@ "instanceAi.backgroundTask.failed": "Background task failed", "instanceAi.planReview.title": "Review plan", "instanceAi.planReview.titleResolved": "Plan", + "instanceAi.planReview.titleExpired": "Plan (expired)", + "instanceAi.planReview.expiredHint": "This plan can no longer be approved. Send a new message to continue.", "instanceAi.planReview.building": "Building plan...", "instanceAi.planReview.awaitingApproval": "Awaiting approval", "instanceAi.planReview.description": "Review the plan, then approve it to start building or request changes to revise it.", diff --git a/packages/frontend/editor-ui/src/features/ai/instanceAi/components/AgentTimeline.vue b/packages/frontend/editor-ui/src/features/ai/instanceAi/components/AgentTimeline.vue index 87a47e1fecb..c81f77073e8 100644 --- a/packages/frontend/editor-ui/src/features/ai/instanceAi/components/AgentTimeline.vue +++ b/packages/frontend/editor-ui/src/features/ai/instanceAi/components/AgentTimeline.vue @@ -223,10 +223,22 @@ function handlePlanDeny(tc: InstanceAiToolCallState) { void thread.confirmAction(requestId, { kind: 'planDeny' }); } -/** Find the latest plan-review confirmation from a planner child's submit-plan tool call. - * Prefers pending (isLoading) over resolved — handles revision loops where - * multiple submit-plan calls exist. */ +/** Find the plan-review confirmation for this turn. Two shapes coexist: + * + * 1. Cascade flow (this feature): the planner sub-agent's submit-plan + * confirmation is captured-not-published, so it cascades up onto the + * orchestrator's own `plan` tool call. + * 2. Direct flow: the planner child's submit-plan tool call carries it. + * + * Check the orchestrator's own tool calls first (the cascade case), then fall + * back to the planner child. Prefers pending (isLoading) over resolved to + * handle revision loops where multiple submit-plan calls exist. */ const plannerConfirmation = computed(() => { + const onOrchestrator = props.agentNode.toolCalls.find( + (tc) => tc.confirmation?.inputType === 'plan-review', + ); + if (onOrchestrator) return onOrchestrator; + let latest: InstanceAiToolCallState | undefined; for (const child of props.agentNode.children) { if (child.role !== 'planner') continue; @@ -240,6 +252,17 @@ const plannerConfirmation = computed(() => return latest; }); +/** True when a planner sub-agent was spawned for this orchestrator turn. The + * cascade flow leaves the plan-review confirmation on the orchestrator's own + * `plan` tool call AND the planner child renders its own card, so without + * this guard the tool-call slot and the post-AgentSection slot both draw a + * plan card (one interactive, one loading). Suppress the tool-call slot when + * a planner child exists — the post-AgentSection slot is the canonical render + * and shows the planner's step list above the card. */ +const hasPlannerChild = computed(() => + props.agentNode.children.some((c) => c.role === 'planner'), +); + /** Map simplified TaskList items to PlannedTaskArg shape for loading preview */ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefined { if (!tasks?.tasks?.length) return undefined; @@ -291,14 +314,20 @@ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefi @@ -259,6 +272,11 @@ function handleDeny() { max-width: 90%; } +.expiredHint { + padding: var(--spacing--xs) var(--spacing--sm); + border-top: var(--border); +} + .header { display: flex; align-items: center; diff --git a/packages/frontend/editor-ui/src/features/ai/instanceAi/instanceAi.threadRuntime.ts b/packages/frontend/editor-ui/src/features/ai/instanceAi/instanceAi.threadRuntime.ts index 06f359a2fca..c8141e14699 100644 --- a/packages/frontend/editor-ui/src/features/ai/instanceAi/instanceAi.threadRuntime.ts +++ b/packages/frontend/editor-ui/src/features/ai/instanceAi/instanceAi.threadRuntime.ts @@ -79,6 +79,11 @@ function collectPendingConfirmations( tc.confirmationStatus !== 'approved' && tc.confirmationStatus !== 'denied' && !resolved.has(tc.confirmation.requestId) && + // Expired cards render as a terminal "this action has expired" state + // in their inline slot; surfacing them in the floating/inline panel + // would block the chat input on a confirmation the user can no + // longer act on. + !tc.confirmation.expired && // Plan review renders inline in the timeline, not in the confirmation panel tc.confirmation.inputType !== 'plan-review' ) {