From a0b616073bc1cefd484359bebf4a6302fd5fb3c3 Mon Sep 17 00:00:00 2001 From: Jaakko Husso Date: Tue, 2 Jun 2026 15:28:07 +0300 Subject: [PATCH] refactor(core): Split planner run into briefing + coordinator modules (#31458) Co-authored-by: Claude Opus 4.7 --- ....tool.test.ts => planner-briefing.test.ts} | 219 +--- .../__tests__/planner-run-coordinator.test.ts | 180 ++++ .../orchestration/plan-with-agent.tool.ts | 984 +----------------- .../tools/orchestration/planner-briefing.ts | 510 +++++++++ .../orchestration/planner-run-coordinator.ts | 521 ++++++++++ 5 files changed, 1243 insertions(+), 1171 deletions(-) rename packages/@n8n/instance-ai/src/tools/orchestration/__tests__/{plan-with-agent.tool.test.ts => planner-briefing.test.ts} (53%) create mode 100644 packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-run-coordinator.test.ts create mode 100644 packages/@n8n/instance-ai/src/tools/orchestration/planner-briefing.ts create mode 100644 packages/@n8n/instance-ai/src/tools/orchestration/planner-run-coordinator.ts diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-briefing.test.ts similarity index 53% rename from packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts rename to packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-briefing.test.ts index 4ca7aaefe42..9844a6f4719 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/plan-with-agent.tool.test.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-briefing.test.ts @@ -1,195 +1,10 @@ -import type { - OrchestrationContext, - PlannedTaskGraph, - PlannedTaskRecord, - PlannedTaskService, -} from '../../../types'; -import { BlueprintAccumulator } from '../blueprint-accumulator'; - -const { - __testBuildPlannerBriefingContext, - __testClearPlannedTaskGraph, - __testFormatMessagesForBriefing, - __testGetRecentMessages, - __testGetPriorToolObservations, - __testRehydrateAccumulatorFromGraph, -} = - // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/consistent-type-imports - require('../plan-with-agent.tool') as typeof import('../plan-with-agent.tool'); - -function makeContext(overrides: { - graph: PlannedTaskGraph | null; - runId?: string; -}): { - context: OrchestrationContext; - clear: jest.Mock; - getGraph: jest.Mock; -} { - const clear = jest.fn(async () => { - await Promise.resolve(); - }); - const getGraph = jest.fn(async () => { - await Promise.resolve(); - return overrides.graph; - }); - const plannedTaskService: Partial = { - getGraph, - clear, - }; - const context = { - threadId: 't-1', - runId: overrides.runId ?? 'run-current', - plannedTaskService: plannedTaskService as PlannedTaskService, - } as unknown as OrchestrationContext; - return { context, clear, getGraph }; -} - -describe('clearPlannedTaskGraph', () => { - it('clears the graph when it belongs to this run and is awaiting approval', async () => { - const { context, clear } = makeContext({ - graph: { - planRunId: 'run-current', - status: 'awaiting_approval', - tasks: [], - }, - }); - - await __testClearPlannedTaskGraph(context); - - expect(clear).toHaveBeenCalledWith('t-1'); - }); - - it('does not clear an active graph from a prior approved plan', async () => { - // A previous `/plan` call already succeeded; its graph is `active` with - // pending checkpoints. A new planner error must not wipe it. - const { context, clear } = makeContext({ - graph: { - planRunId: 'run-previous', - status: 'active', - tasks: [], - }, - }); - - await __testClearPlannedTaskGraph(context); - - expect(clear).not.toHaveBeenCalled(); - }); - - it('does not clear an awaiting-approval graph that was created by a different planner run', async () => { - // Defensive: a concurrent plan for a different run should not have its - // unapproved graph wiped by this run's error-path cleanup. - const { context, clear } = makeContext({ - graph: { - planRunId: 'run-other', - status: 'awaiting_approval', - tasks: [], - }, - }); - - await __testClearPlannedTaskGraph(context); - - expect(clear).not.toHaveBeenCalled(); - }); - - it('is a no-op when no graph exists', async () => { - const { context, clear, getGraph } = makeContext({ graph: null }); - - await __testClearPlannedTaskGraph(context); - - expect(getGraph).toHaveBeenCalled(); - expect(clear).not.toHaveBeenCalled(); - }); - - it('swallows getGraph errors so the caller can return its own error', async () => { - const { context, getGraph } = makeContext({ - graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: [] }, - }); - getGraph.mockRejectedValueOnce(new Error('db down')); - - await expect(__testClearPlannedTaskGraph(context)).resolves.toBeUndefined(); - }); -}); - -describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => { - const persistedTasks: PlannedTaskRecord[] = [ - { - id: 'wf-1', - title: "Build 'A' workflow", - kind: 'build-workflow', - spec: 'A', - deps: [], - status: 'planned', - }, - { - id: 'wf-2', - title: "Build 'B' workflow", - kind: 'build-workflow', - spec: 'B', - deps: [], - status: 'planned', - }, - ]; - - it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => { - // Reproduces "ask for edits -> revise existing plan -> submit again": - // on resume the parent rebuilt a fresh accumulator; without rehydration - // the planner's remove/add would operate on an empty plan and the - // re-submit would drop every original item. - const { context } = makeContext({ - graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, - }); - const accumulator = new BlueprintAccumulator(); - - await __testRehydrateAccumulatorFromGraph(context, accumulator); - - // Planner revises: drop one original, add a new one, then resubmits. - expect(accumulator.removeItem('wf-2')).toBe(true); - accumulator.addItem({ - kind: 'workflow', - id: 'wf-3', - name: 'C', - purpose: 'C', - integrations: [], - dependsOn: [], - }); - - expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']); - }); - - it('does not reopen an already-approved/active graph', async () => { - const { context } = makeContext({ - graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks }, - }); - const accumulator = new BlueprintAccumulator(); - - await __testRehydrateAccumulatorFromGraph(context, accumulator); - - expect(accumulator.isEmpty()).toBe(true); - }); - - it('is a no-op when no graph exists', async () => { - const { context, getGraph } = makeContext({ graph: null }); - const accumulator = new BlueprintAccumulator(); - - await __testRehydrateAccumulatorFromGraph(context, accumulator); - - expect(getGraph).toHaveBeenCalledWith('t-1'); - expect(accumulator.isEmpty()).toBe(true); - }); - - it('leaves the accumulator empty when getGraph throws', async () => { - const { context, getGraph } = makeContext({ - graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, - }); - getGraph.mockRejectedValueOnce(new Error('db down')); - const accumulator = new BlueprintAccumulator(); - - await expect( - __testRehydrateAccumulatorFromGraph(context, accumulator), - ).resolves.toBeUndefined(); - expect(accumulator.isEmpty()).toBe(true); - }); -}); +import type { OrchestrationContext } from '../../../types'; +import { + buildPlannerBriefingContext, + formatMessagesForBriefing, + getPriorToolObservations, + getRecentMessages, +} from '../planner-briefing'; describe('formatMessagesForBriefing', () => { // The planner system prompt (plan-agent-prompt.ts) treats @@ -197,7 +12,7 @@ describe('formatMessagesForBriefing', () => { // both. Emitting only one drops half the contract. it('emits alongside when a zone is provided', () => { - const briefing = __testFormatMessagesForBriefing( + const briefing = formatMessagesForBriefing( [{ role: 'user', content: 'schedule me a daily digest' }], undefined, 'America/New_York', @@ -208,14 +23,14 @@ describe('formatMessagesForBriefing', () => { }); it('still emits when no zone is provided', () => { - const briefing = __testFormatMessagesForBriefing([], undefined, undefined); + const briefing = formatMessagesForBriefing([], undefined, undefined); expect(briefing).toMatch(/[^<]+<\/current-datetime>/); expect(briefing).not.toContain(''); }); it('renders already-collected answers and discovered resources as dedicated sections', () => { - const briefing = __testFormatMessagesForBriefing( + const briefing = formatMessagesForBriefing( [{ role: 'user', content: 'Build a Slack to-do agent' }], undefined, 'America/New_York', @@ -238,7 +53,7 @@ describe('formatMessagesForBriefing', () => { describe('buildPlannerBriefingContext', () => { it('extracts ask-user answers and credential selections from prior tool results', () => { - const context = __testBuildPlannerBriefingContext([ + const context = buildPlannerBriefingContext([ { toolName: 'credentials', args: { action: 'list' }, @@ -290,7 +105,7 @@ describe('buildPlannerBriefingContext', () => { }); it('ignores unanswered and skipped ask-user answers', () => { - const context = __testBuildPlannerBriefingContext([ + const context = buildPlannerBriefingContext([ { toolName: 'ask-user', args: { @@ -399,7 +214,7 @@ describe('getPriorToolObservations', () => { }, } as unknown as OrchestrationContext; - const observations = __testGetPriorToolObservations(context); + const observations = getPriorToolObservations(context); expect(getEventsForRuns).toHaveBeenCalledWith('thread-1', ['run-prior', 'run-current']); expect(getEventsForRun).not.toHaveBeenCalled(); @@ -436,9 +251,7 @@ describe('getPriorToolObservations', () => { }, } as unknown as OrchestrationContext; - expect(__testGetPriorToolObservations(context)).toEqual([ - { toolName: 'credentials', args, result }, - ]); + expect(getPriorToolObservations(context)).toEqual([{ toolName: 'credentials', args, result }]); }); it('returns no observations when event lookup fails', () => { @@ -452,7 +265,7 @@ describe('getPriorToolObservations', () => { }, } as unknown as OrchestrationContext; - expect(__testGetPriorToolObservations(context)).toEqual([]); + expect(getPriorToolObservations(context)).toEqual([]); }); }); @@ -468,7 +281,7 @@ describe('getRecentMessages', () => { }, } as unknown as OrchestrationContext; - const messages = await __testGetRecentMessages(context, 5); + const messages = await getRecentMessages(context, 5); expect(messages).toEqual([{ role: 'user', content: 'Build a Slack to-do agent' }]); }); diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-run-coordinator.test.ts b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-run-coordinator.test.ts new file mode 100644 index 00000000000..2f9661eb82e --- /dev/null +++ b/packages/@n8n/instance-ai/src/tools/orchestration/__tests__/planner-run-coordinator.test.ts @@ -0,0 +1,180 @@ +import type { + OrchestrationContext, + PlannedTaskGraph, + PlannedTaskRecord, + PlannedTaskService, +} from '../../../types'; +import { BlueprintAccumulator } from '../blueprint-accumulator'; +import { clearPlannedTaskGraph, rehydrateAccumulatorFromGraph } from '../planner-run-coordinator'; + +function makeContext(overrides: { + graph: PlannedTaskGraph | null; + runId?: string; +}): { + context: OrchestrationContext; + clear: jest.Mock; + getGraph: jest.Mock; +} { + const clear = jest.fn(async () => { + await Promise.resolve(); + }); + const getGraph = jest.fn(async () => { + await Promise.resolve(); + return overrides.graph; + }); + const plannedTaskService: Partial = { + getGraph, + clear, + }; + const context = { + threadId: 't-1', + runId: overrides.runId ?? 'run-current', + plannedTaskService: plannedTaskService as PlannedTaskService, + } as unknown as OrchestrationContext; + return { context, clear, getGraph }; +} + +describe('clearPlannedTaskGraph', () => { + it('clears the graph when it belongs to this run and is awaiting approval', async () => { + const { context, clear } = makeContext({ + graph: { + planRunId: 'run-current', + status: 'awaiting_approval', + tasks: [], + }, + }); + + await clearPlannedTaskGraph(context); + + expect(clear).toHaveBeenCalledWith('t-1'); + }); + + it('does not clear an active graph from a prior approved plan', async () => { + // A previous `/plan` call already succeeded; its graph is `active` with + // pending checkpoints. A new planner error must not wipe it. + const { context, clear } = makeContext({ + graph: { + planRunId: 'run-previous', + status: 'active', + tasks: [], + }, + }); + + await clearPlannedTaskGraph(context); + + expect(clear).not.toHaveBeenCalled(); + }); + + it('does not clear an awaiting-approval graph that was created by a different planner run', async () => { + // Defensive: a concurrent plan for a different run should not have its + // unapproved graph wiped by this run's error-path cleanup. + const { context, clear } = makeContext({ + graph: { + planRunId: 'run-other', + status: 'awaiting_approval', + tasks: [], + }, + }); + + await clearPlannedTaskGraph(context); + + expect(clear).not.toHaveBeenCalled(); + }); + + it('is a no-op when no graph exists', async () => { + const { context, clear, getGraph } = makeContext({ graph: null }); + + await clearPlannedTaskGraph(context); + + expect(getGraph).toHaveBeenCalled(); + expect(clear).not.toHaveBeenCalled(); + }); + + it('swallows getGraph errors so the caller can return its own error', async () => { + const { context, getGraph } = makeContext({ + graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: [] }, + }); + getGraph.mockRejectedValueOnce(new Error('db down')); + + await expect(clearPlannedTaskGraph(context)).resolves.toBeUndefined(); + }); +}); + +describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => { + const persistedTasks: PlannedTaskRecord[] = [ + { + id: 'wf-1', + title: "Build 'A' workflow", + kind: 'build-workflow', + spec: 'A', + deps: [], + status: 'planned', + }, + { + id: 'wf-2', + title: "Build 'B' workflow", + kind: 'build-workflow', + spec: 'B', + deps: [], + status: 'planned', + }, + ]; + + it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => { + // Reproduces "ask for edits -> revise existing plan -> submit again": + // on resume the parent rebuilt a fresh accumulator; without rehydration + // the planner's remove/add would operate on an empty plan and the + // re-submit would drop every original item. + const { context } = makeContext({ + graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, + }); + const accumulator = new BlueprintAccumulator(); + + await rehydrateAccumulatorFromGraph(context, accumulator); + + // Planner revises: drop one original, add a new one, then resubmits. + expect(accumulator.removeItem('wf-2')).toBe(true); + accumulator.addItem({ + kind: 'workflow', + id: 'wf-3', + name: 'C', + purpose: 'C', + integrations: [], + dependsOn: [], + }); + + expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']); + }); + + it('does not reopen an already-approved/active graph', async () => { + const { context } = makeContext({ + graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks }, + }); + const accumulator = new BlueprintAccumulator(); + + await rehydrateAccumulatorFromGraph(context, accumulator); + + expect(accumulator.isEmpty()).toBe(true); + }); + + it('is a no-op when no graph exists', async () => { + const { context, getGraph } = makeContext({ graph: null }); + const accumulator = new BlueprintAccumulator(); + + await rehydrateAccumulatorFromGraph(context, accumulator); + + expect(getGraph).toHaveBeenCalledWith('t-1'); + expect(accumulator.isEmpty()).toBe(true); + }); + + it('leaves the accumulator empty when getGraph throws', async () => { + const { context, getGraph } = makeContext({ + graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks }, + }); + getGraph.mockRejectedValueOnce(new Error('db down')); + const accumulator = new BlueprintAccumulator(); + + await expect(rehydrateAccumulatorFromGraph(context, accumulator)).resolves.toBeUndefined(); + expect(accumulator.isEmpty()).toBe(true); + }); +}); diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts index f6aeb0bcd02..57928217494 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/plan-with-agent.tool.ts @@ -12,672 +12,11 @@ * It can also ask the user questions directly via the ask-user tool. */ -import { Agent, Tool } from '@n8n/agents'; -import type { InstanceAiEvent } from '@n8n/api-types'; -import { DateTime } from 'luxon'; +import { Tool } from '@n8n/agents'; import { z } from 'zod'; -import { createAddPlanItemTool, createRemovePlanItemTool } from './add-plan-item.tool'; -import { createSubAgentPersistence } from './agent-persistence'; -import { BlueprintAccumulator } from './blueprint-accumulator'; -import { truncateLabel } from './display-utils'; -import { PLANNER_AGENT_PROMPT } from './plan-agent-prompt'; -import { createSubmitPlanTool } from './submit-plan.tool'; -import { - failTraceRun, - finishTraceRun, - startSubAgentTrace, - traceSubAgentTools, - withTraceRun, -} from './tracing-utils'; -import { attachRuntimeWorkspaceCapabilities } from '../../agent/runtime-workspace'; -import { MAX_STEPS } from '../../constants/max-steps'; -import { consumeStreamCascading } from '../../stream/consume-with-hitl'; -import type { ConsumeStreamCascadingResult } from '../../stream/consume-with-hitl'; -import { createToolRegistry, toolRegistryKeys, toolRegistryValues } from '../../tool-registry'; -import { buildAgentTraceInputs, mergeTraceRunInputs } from '../../tracing/langsmith-tracing'; +import { PlannerRunCoordinator, planToolSuspendSchema } from './planner-run-coordinator'; import type { OrchestrationContext } from '../../types'; -import { resumeAgentStream } from '../../utils/stream-helpers'; -import { CREDENTIALS_TOOL_ID } from '../credentials.tool'; -import { DATA_TABLES_TOOL_ID } from '../data-tables.tool'; -import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool'; -import { createTemplatesTool } from '../templates.tool'; - -/** Number of recent thread messages to include as planner context. */ -const MESSAGE_HISTORY_COUNT = 5; - -/** Read-only discovery tools the planner gets from domainTools. */ -const PLANNER_DOMAIN_TOOL_NAMES = [ - 'nodes', - CREDENTIALS_TOOL_ID, - DATA_TABLES_TOOL_ID, - 'workflows', - ASK_USER_TOOL_ID, -]; - -/** Research tools added when available. */ -const PLANNER_RESEARCH_TOOL_NAMES = ['research']; - -const RELEVANT_PRIOR_TOOL_NAMES = new Set([ - ASK_USER_TOOL_ID, - CREDENTIALS_TOOL_ID, - DATA_TABLES_TOOL_ID, -]); - -// --------------------------------------------------------------------------- -// Message history retrieval -// --------------------------------------------------------------------------- - -interface FormattedMessage { - role: 'user' | 'assistant'; - content: string; -} - -interface PlannerBriefingContext { - collectedAnswers: string[]; - discoveredResources: string[]; -} - -interface ToolObservation { - toolName: string; - args: Record; - result: unknown; -} - -interface CredentialBrief { - id?: string; - name: string; - type: string; -} - -interface DataTableBrief { - id?: string; - name: string; -} - -/** Extract plain text from persisted native memory content. */ -function extractTextFromMemoryContent(content: unknown): string { - if (typeof content === 'string') return content; - if (Array.isArray(content)) return extractTextParts(content); - return ''; -} - -function extractTextParts(parts: unknown[]): string { - return parts - .filter( - (c): c is { type: 'text'; text: string } => - typeof c === 'object' && - c !== null && - 'type' in c && - c.type === 'text' && - 'text' in c && - typeof c.text === 'string', - ) - .map((c) => c.text) - .join('\n'); -} - -function isRecord(value: unknown): value is Record { - return typeof value === 'object' && value !== null && !Array.isArray(value); -} - -function readString(value: unknown): string | undefined { - return typeof value === 'string' && value.trim().length > 0 ? value : undefined; -} - -function readRecord(value: unknown): Record | undefined { - return isRecord(value) ? value : undefined; -} - -function readArray(value: unknown): unknown[] { - return Array.isArray(value) ? value : []; -} - -function readStringArray(value: unknown): string[] { - return readArray(value).filter((item): item is string => typeof item === 'string'); -} - -function addUnique(target: string[], seen: Set, value: string | undefined): void { - if (!value || seen.has(value)) return; - seen.add(value); - target.push(value); -} - -function summarizeList(values: string[], limit = 10): string { - const visible = values.slice(0, limit).join(', '); - const remaining = values.length - limit; - return remaining > 0 ? `${visible}, and ${remaining} more` : visible; -} - -async function getRecentMessages( - context: OrchestrationContext, - count: number, -): Promise { - const messages: FormattedMessage[] = []; - - // Retrieve previously-saved messages from memory. - if (context.memory) { - try { - const history = await context.memory.getMessages(context.threadId, { - limit: count, - }); - - for (const m of history) { - if (!('role' in m)) continue; - const role = m.role; - const content = extractTextFromMemoryContent(m.content); - if ((role === 'user' || role === 'assistant') && content.length > 0) { - messages.push({ role, content }); - } - } - } catch { - // Memory recall failed — continue with just the current message - } - } - - // Always append the current in-flight user message (not yet saved to memory) - if (shouldAppendCurrentUserMessage(messages, context.currentUserMessage)) { - messages.push({ role: 'user', content: context.currentUserMessage }); - } - - return messages; -} - -function shouldAppendCurrentUserMessage( - messages: FormattedMessage[], - currentUserMessage?: string, -): currentUserMessage is string { - const current = currentUserMessage?.trim(); - if (!current) return false; - - const lastUserMessage = [...messages].reverse().find((message) => message.role === 'user'); - return lastUserMessage?.content.trim() !== current; -} - -/** - * Reconstructs prior planner-relevant tool calls from the event stream. - * - * Tool-call and tool-result events are correlated by `toolCallId` so the - * planner can receive structured context that is not preserved in text-only - * memory recall, such as ask-user answers and credential selections. - */ -function getPriorToolObservations(context: OrchestrationContext): ToolObservation[] { - type MutableToolObservation = Omit & { - result: unknown; - hasResult: boolean; - }; - - const toolCalls = new Map(); - const pendingResults = new Map(); - - for (const event of getPriorToolEvents(context)) { - if (event.type === 'tool-call') { - const { toolCallId, toolName, args } = event.payload; - if (!RELEVANT_PRIOR_TOOL_NAMES.has(toolName)) continue; - - const pendingResult = pendingResults.get(toolCallId); - toolCalls.set(toolCallId, { - toolName, - args, - result: pendingResult, - hasResult: pendingResults.has(toolCallId), - }); - continue; - } - - if (event.type === 'tool-result') { - const { toolCallId, result } = event.payload; - const existing = toolCalls.get(toolCallId); - if (existing) { - existing.result = result; - existing.hasResult = true; - } else { - pendingResults.set(toolCallId, result); - } - } - } - - return [...toolCalls.values()] - .filter((observation) => observation.hasResult) - .map(({ toolName, args, result }) => ({ toolName, args, result })); -} - -/** - * Returns the events that may contain prior tool context for this planner run. - * - * When the run belongs to a message group, all runs in that group are searched - * so follow-up runs can see choices collected earlier in the same assistant - * turn. If grouped lookup is unavailable, this falls back to the current run. - */ -function getPriorToolEvents(context: OrchestrationContext): InstanceAiEvent[] { - if (context.messageGroupId) { - const runIds = getMessageGroupRunIds(context); - if (runIds.length > 0) { - try { - return context.eventBus.getEventsForRuns(context.threadId, runIds); - } catch { - // Fall back to the current run below. - } - } - } - - try { - return context.eventBus.getEventsForRun(context.threadId, context.runId); - } catch { - return []; - } -} - -/** - * Finds run IDs that belong to the current message group from run-start events. - * - * The event bus can fetch events for many run IDs, but the orchestration - * context only carries the current run ID and message group ID. This bridges - * those two concepts while keeping the current run as a defensive fallback. - */ -function getMessageGroupRunIds(context: OrchestrationContext): string[] { - const messageGroupId = context.messageGroupId; - if (!messageGroupId) return []; - - const runIds = new Set(); - try { - for (const { event } of context.eventBus.getEventsAfter(context.threadId, 0)) { - if (event.type === 'run-start' && event.payload.messageGroupId === messageGroupId) { - runIds.add(event.runId); - } - } - } catch { - return [context.runId]; - } - runIds.add(context.runId); - - return [...runIds]; -} - -/** - * Converts raw prior tool observations into planner briefing sections. - * - * The resulting strings are intentionally short and human-readable because - * they are embedded directly into the planner prompt under dedicated headings. - */ -function buildPlannerBriefingContext(observations: ToolObservation[]): PlannerBriefingContext { - const collectedAnswers: string[] = []; - const discoveredResources: string[] = []; - const seenAnswers = new Set(); - const seenResources = new Set(); - const credentialsById = buildCredentialLookup(observations); - - for (const observation of observations) { - if (observation.toolName === ASK_USER_TOOL_ID) { - for (const answer of extractAskUserAnswerLines(observation)) { - addUnique(collectedAnswers, seenAnswers, answer); - } - continue; - } - - if (observation.toolName === CREDENTIALS_TOOL_ID) { - const action = readString(observation.args.action); - if (action === 'list') { - addUnique(discoveredResources, seenResources, summarizeCredentials(observation.result)); - } - if (action === 'setup') { - for (const selection of extractCredentialSelectionLines(observation, credentialsById)) { - addUnique(collectedAnswers, seenAnswers, selection); - } - } - continue; - } - - if ( - observation.toolName === DATA_TABLES_TOOL_ID && - readString(observation.args.action) === 'list' - ) { - addUnique(discoveredResources, seenResources, summarizeDataTables(observation.result)); - } - } - - return { collectedAnswers, discoveredResources }; -} - -/** - * Builds an ID lookup from prior credential list results. - * - * Credential setup results contain selected IDs, so this lets the briefing - * render stable user-facing names and credential types when a prior list result - * is available. - */ -function buildCredentialLookup(observations: ToolObservation[]): Map { - const credentialsById = new Map(); - - for (const observation of observations) { - if (observation.toolName !== CREDENTIALS_TOOL_ID) continue; - for (const credential of extractCredentials(observation.result)) { - if (credential.id) credentialsById.set(credential.id, credential); - } - } - - return credentialsById; -} - -/** - * Extracts answered ask-user responses as `question: answer` briefing lines. - * - * Skipped or unanswered prompts are ignored, and question text is recovered - * from tool args when the tool result only includes a question ID. - */ -function extractAskUserAnswerLines(observation: ToolObservation): string[] { - const result = readRecord(observation.result); - if (!result || result.answered === false) return []; - - const questionsById = extractQuestionTextById(observation.args); - const answers = readArray(result.answers); - const lines: string[] = []; - - for (const answerValue of answers) { - const answer = readRecord(answerValue); - if (!answer || answer.skipped === true) continue; - - const questionId = readString(answer.questionId); - const question = - readString(answer.question) ?? (questionId ? questionsById.get(questionId) : undefined); - const selectedOptions = readStringArray(answer.selectedOptions); - const customText = readString(answer.customText); - const values = [...selectedOptions, ...(customText ? [customText] : [])]; - - if (!question || values.length === 0) continue; - lines.push(`${question}: ${values.join(', ')}`); - } - - return lines; -} - -/** - * Maps ask-user question IDs to display text from the original tool args. - */ -function extractQuestionTextById(args: Record): Map { - const questionsById = new Map(); - - for (const questionValue of readArray(args.questions)) { - const question = readRecord(questionValue); - const id = readString(question?.id); - const text = readString(question?.question); - if (id && text) questionsById.set(id, text); - } - - return questionsById; -} - -/** - * Renders credential setup selections as briefing lines. - * - * The setup tool returns a `{ credentialType: credentialId }` map. The optional - * credential lookup turns those IDs back into names so the planner can avoid - * asking the user to choose the same credential again. - */ -function extractCredentialSelectionLines( - observation: ToolObservation, - credentialsById: Map, -): string[] { - const result = readRecord(observation.result); - const credentials = readRecord(result?.credentials); - if (!credentials) return []; - - const lines: string[] = []; - for (const [credentialType, credentialIdValue] of Object.entries(credentials)) { - const credentialId = readString(credentialIdValue); - if (!credentialId) continue; - - const credential = credentialsById.get(credentialId); - const label = credential - ? `${credential.name} (${credential.type})` - : `credential ID ${credentialId}`; - lines.push(`Credential selected for ${credentialType}: ${label}`); - } - - return lines; -} - -/** - * Summarizes a credentials list result for the briefing. - */ -function summarizeCredentials(result: unknown): string | undefined { - const credentials = extractCredentials(result); - if (credentials.length === 0) return undefined; - - return `Credentials available: ${summarizeList( - credentials.map((credential) => `${credential.name} (${credential.type})`), - )}`; -} - -/** - * Reads the minimal credential metadata needed by the planner briefing. - */ -function extractCredentials(result: unknown): CredentialBrief[] { - const record = readRecord(result); - return readArray(record?.credentials) - .map(readCredentialBrief) - .filter((credential): credential is CredentialBrief => credential !== undefined); -} - -function readCredentialBrief(value: unknown): CredentialBrief | undefined { - const record = readRecord(value); - const name = readString(record?.name); - const type = readString(record?.type); - if (!name || !type) return undefined; - const id = readString(record?.id); - - return { - name, - type, - ...(id ? { id } : {}), - }; -} - -/** - * Summarizes a data-tables list result for the briefing. - */ -function summarizeDataTables(result: unknown): string | undefined { - const tables = extractDataTables(result); - if (tables.length === 0) return undefined; - - return `Data tables available: ${summarizeList(tables.map((table) => table.name))}`; -} - -/** - * Reads the minimal data-table metadata needed by the planner briefing. - */ -function extractDataTables(result: unknown): DataTableBrief[] { - const record = readRecord(result); - return readArray(record?.tables) - .map(readDataTableBrief) - .filter((table): table is DataTableBrief => table !== undefined); -} - -function readDataTableBrief(value: unknown): DataTableBrief | undefined { - const record = readRecord(value); - const name = readString(record?.name); - if (!name) return undefined; - const id = readString(record?.id); - - return { - name, - ...(id ? { id } : {}), - }; -} - -/** - * Formats conversation, time, and already-collected context into the planner goal. - */ -function formatMessagesForBriefing( - messages: FormattedMessage[], - guidance?: string, - timeZone?: string, - briefingContext?: PlannerBriefingContext, -): string { - const parts: string[] = []; - - const now = timeZone ? DateTime.now().setZone(timeZone) : DateTime.now(); - const isoNow = now.toISO({ includeOffset: true }) ?? new Date().toISOString(); - parts.push(`${isoNow}`); - if (timeZone) { - parts.push(`${timeZone}`); - } - - if (messages.length > 0) { - parts.push('## Recent conversation'); - for (const m of messages) { - const label = m.role === 'user' ? 'User' : 'Assistant'; - // Truncate very long messages - const content = m.content.length > 2000 ? m.content.slice(0, 2000) + '...' : m.content; - parts.push(`**${label}:** ${content}`); - } - } - - if (briefingContext?.collectedAnswers.length) { - parts.push('## Already-collected answers'); - for (const answer of briefingContext.collectedAnswers) { - parts.push(`- ${answer}`); - } - } - - if (briefingContext?.discoveredResources.length) { - parts.push('## Already-discovered resources'); - for (const resource of briefingContext.discoveredResources) { - parts.push(`- ${resource}`); - } - } - - if (guidance) { - parts.push(`\n## Orchestrator guidance\n${guidance}`); - } - - parts.push('\nDesign the solution blueprint based on the conversation above.'); - - return parts.join('\n\n'); -} - -export const __testFormatMessagesForBriefing = formatMessagesForBriefing; -export const __testGetRecentMessages = getRecentMessages; -export const __testGetPriorToolObservations = getPriorToolObservations; -export const __testBuildPlannerBriefingContext = buildPlannerBriefingContext; - -// --------------------------------------------------------------------------- -// Helper: clear draft checklist from taskStorage -// --------------------------------------------------------------------------- - -/** Publish an empty tasks-update so the frontend clears stale plan items. */ -function publishClearingEvent(context: OrchestrationContext): void { - context.eventBus.publish(context.threadId, { - type: 'tasks-update', - runId: context.runId, - agentId: context.orchestratorAgentId, - payload: { tasks: { tasks: [] }, planItems: [] }, - }); -} - -async function clearDraftChecklist(context: OrchestrationContext): Promise { - try { - await context.taskStorage.save(context.threadId, { tasks: [] }); - } catch { - // Best-effort — don't let cleanup failures block the return path - } -} - -/** - * Remove any persisted planned-task graph for this thread *if and only if* it - * belongs to this planner run's unapproved plan. Called on planner give-up / - * error paths to prevent a later schedulePlannedTasks() tick from dispatching - * a plan the user never approved. - * - * Guarded because the thread may already carry an unrelated active graph (a - * prior approved plan with pending checkpoints / in-flight tasks); an - * unconditional `clear()` here would strand that work. We only touch the graph - * when its `planRunId` matches this run AND its `status` is `awaiting_approval` - * — the single window where submit-plan has persisted but approval hasn't - * happened yet. - */ -export async function __testClearPlannedTaskGraph(context: OrchestrationContext): Promise { - return await clearPlannedTaskGraph(context); -} - -async function clearPlannedTaskGraph(context: OrchestrationContext): Promise { - if (!context.plannedTaskService) return; - try { - const graph = await context.plannedTaskService.getGraph(context.threadId); - if (!graph) return; - if (graph.planRunId !== context.runId) return; - if (graph.status !== 'awaiting_approval') return; - await context.plannedTaskService.clear(context.threadId); - } catch { - // Best-effort — don't let cleanup failures block the return path - } -} - -export async function __testRehydrateAccumulatorFromGraph( - context: OrchestrationContext, - accumulator: BlueprintAccumulator, -): Promise { - return await rehydrateAccumulatorFromGraph(context, accumulator); -} - -/** - * Seed a freshly-built accumulator from the persisted plan before a planner - * resume. The parent plan-tool handler exits on every cascade-suspend, so the - * first-call accumulator is gone by the time an "ask for edits" revision - * resumes the planner — without this, remove-plan-item can't touch the - * original items, add-plan-item only carries the newly-added ones, and the - * re-submit's createPlan (which overwrites unconditionally) replaces the graph - * with a partial plan. - * - * Only rehydrates while the plan is still `awaiting_approval` (the revision - * window) — an already-approved/active graph with in-flight tasks must not be - * reopened here. Best-effort: a getGraph failure leaves the accumulator empty - * rather than blocking the resume. - */ -async function rehydrateAccumulatorFromGraph( - context: OrchestrationContext, - accumulator: BlueprintAccumulator, -): Promise { - if (!context.plannedTaskService) return; - try { - const graph = await context.plannedTaskService.getGraph(context.threadId); - if (graph?.status === 'awaiting_approval' && graph.tasks.length > 0) { - accumulator.loadFromTasks(graph.tasks); - } - } catch { - // Best-effort — fall back to an empty accumulator rather than block resume. - } -} - -// --------------------------------------------------------------------------- -// Tool factory -// --------------------------------------------------------------------------- - -/** - * The plan tool cascades sub-agent HITL suspensions UP through the SDK's - * native suspend/resume mechanism: when the planner sub-agent (or any tool - * inside it) emits a `tool-call-suspended` chunk, the plan tool catches it - * via `consumeStreamCascading` and calls its own `ctx.suspend()` with the - * same payload. This checkpoints the orchestrator's full state alongside the - * planner's, so a process restart between user prompt and click can resume - * the planner without losing any state. - * - * The schemas below are permissive on purpose: the plan tool just forwards - * whatever the inner tool emitted (submit-plan's plan-review payload OR - * ask-user's questions payload) and accepts whatever the frontend sent back - * for that card. Validation already happened on the inner tool. - */ -const planToolSuspendSchema = z - .object({ - requestId: z.string(), - message: z.string(), - severity: z.string(), - // Only submit-plan + ask-user carry an `inputType`; cascaded suspensions - // from other planner tools (credentials, data-tables, ...) don't, and a - // strict `inputType: string` would reject otherwise-valid payloads. - inputType: z.string().optional(), - }) - .passthrough(); const planToolResumeSchema = z.record(z.unknown()); @@ -738,320 +77,29 @@ export function createPlanWithAgentTool(context: OrchestrationContext) { } } - // ── Collect planner tools (shared between first-call and resume) ── - const plannerTools = createToolRegistry(); - - for (const name of PLANNER_DOMAIN_TOOL_NAMES) { - const tool = context.domainTools.get(name); - if (tool) plannerTools.set(name, tool); - } - - for (const name of PLANNER_RESEARCH_TOOL_NAMES) { - const tool = context.domainTools.get(name); - if (tool) plannerTools.set(name, tool); - } - - plannerTools.set('templates', createTemplatesTool()); - - const accumulator = new BlueprintAccumulator(); - plannerTools.set('add-plan-item', createAddPlanItemTool(accumulator, context)); - plannerTools.set('remove-plan-item', createRemovePlanItemTool(accumulator, context)); - plannerTools.set('submit-plan', createSubmitPlanTool(accumulator, context)); - - // Use a runId-derived sub-agent id so resume reuses the same event stream identity. - const subAgentId = `agent-planner-${context.runId}`; - const tracedPlannerTools = traceSubAgentTools(context, plannerTools, 'planner'); - - // ── Build sub-agent (shared) ───────────────────────────────── - const subAgent = new Agent('Workflow Planner Agent') - .model(context.modelId) - .instructions(PLANNER_AGENT_PROMPT, { - providerOptions: { - anthropic: { cacheControl: { type: 'ephemeral' } }, - }, - }) - .tool(toolRegistryValues(tracedPlannerTools)) - .checkpoint(context.checkpointStore ?? 'memory'); - attachRuntimeWorkspaceCapabilities(subAgent, { - runtimeSkills: context.runtimeSkills, - }); - const telemetry = context.tracing?.getTelemetry?.({ - agentRole: 'planner', - functionId: 'instance-ai.subagent.planner', - executionMode: 'background', - metadata: { agent_id: subAgentId }, - }); - if (telemetry) { - subAgent.telemetry(telemetry); - } - - let traceRun: Awaited> | undefined; - + const coordinator = new PlannerRunCoordinator(context); try { - let consumeResult: ConsumeStreamCascadingResult; + const outcome = + resumeData !== undefined && resumeData !== null + ? await coordinator.resume(resumeData) + : await coordinator.startFirstRun(input.guidance); - if (isResume) { - // ── Resume path ───────────────────────────────────────── - const resumeInfo = await context.findSubAgentResumeInfo?.('planner'); - if (!resumeInfo) { - return { - result: - 'The planning step could not be resumed because its state was lost. Please send a new message to continue.', - }; - } - - // Rehydrate the accumulator from the persisted plan so an - // "ask for edits" revision operates on the full plan rather - // than an empty accumulator. See rehydrateAccumulatorFromGraph. - await rehydrateAccumulatorFromGraph(context, accumulator); - - // Open a trace span for the resumed leg so a plan that suspended - // at HITL and resumed still shows its continuation in LangSmith. - // The planner card itself is already in the snapshot from the - // first call, so no agent-spawned event is (re-)published here. - traceRun = await startSubAgentTrace(context, { - agentId: subAgentId, - role: 'planner', - kind: 'planner', - inputs: { resumed: true }, - }); - mergeTraceRunInputs( - traceRun, - buildAgentTraceInputs({ - systemPrompt: PLANNER_AGENT_PROMPT, - tools: tracedPlannerTools, - modelId: context.modelId, - }), - ); - - consumeResult = await withTraceRun(context, traceRun, async () => { - const resumed = await resumeAgentStream(subAgent, resumeData, { - runId: resumeInfo.runId, - toolCallId: resumeInfo.toolCallId, - persistence: resumeInfo.persistence, - maxIterations: MAX_STEPS.PLANNER, - }); - - return await consumeStreamCascading({ - agent: subAgent, - stream: resumed, - runId: context.runId, - agentId: subAgentId, - eventBus: context.eventBus, - logger: context.logger, - threadId: context.threadId, - abortSignal: context.abortSignal, - }); - }); - } else { - // ── First-call path ───────────────────────────────────── - // The planner is the most common inline HITL entry point — when it - // suspends the orchestrator cascades-suspends too, and the SDK does - // not flush the user-message row to memory until a clean loop end - // (which a suspended run never reaches). Persist eagerly so the - // user's bubble is visible if they reload during the suspend window. - if (context.persistInFlightUserMessage) { - await context.persistInFlightUserMessage(); - } - - const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT); - const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context)); - const briefing = formatMessagesForBriefing( - messages, - input.guidance, - context.timeZone, - briefingContext, - ); - - const subtitle = - input.guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...'; - - context.eventBus.publish(context.threadId, { - type: 'agent-spawned', - runId: context.runId, - agentId: subAgentId, - payload: { - parentId: context.orchestratorAgentId, - role: 'planner', - tools: toolRegistryKeys(plannerTools), - kind: 'planner' as const, - title: 'Planning', - subtitle: truncateLabel(subtitle), - goal: briefing, - }, - }); - - traceRun = await startSubAgentTrace(context, { - agentId: subAgentId, - role: 'planner', - kind: 'planner', - inputs: { - guidance: input.guidance, - messageCount: messages.length, - }, - }); - - mergeTraceRunInputs( - traceRun, - buildAgentTraceInputs({ - systemPrompt: PLANNER_AGENT_PROMPT, - tools: tracedPlannerTools, - modelId: context.modelId, - }), - ); - - consumeResult = await withTraceRun(context, traceRun, async () => { - const persistence = await createSubAgentPersistence(context, { - agentKind: 'planner', - }); - const stream = await subAgent.stream(briefing, { - maxIterations: MAX_STEPS.PLANNER, - abortSignal: context.abortSignal, - persistence, - providerOptions: { - anthropic: { cacheControl: { type: 'ephemeral' } }, - }, - }); - - return await consumeStreamCascading({ - agent: subAgent, - stream, - runId: context.runId, - agentId: subAgentId, - eventBus: context.eventBus, - logger: context.logger, - threadId: context.threadId, - abortSignal: context.abortSignal, - }); - }); + if (outcome.kind === 'lost-state') { + return { + result: + 'The planning step could not be resumed because its state was lost. Please send a new message to continue.', + }; } - // ── Cascade suspension up to the orchestrator ─────────── + const { consumeResult } = outcome; if (consumeResult.status === 'suspended') { - const parsed = planToolSuspendSchema.safeParse(consumeResult.suspension.suspendPayload); - if (!parsed.success) { - context.logger.warn('Planner emitted a suspension payload missing required fields', { - threadId: context.threadId, - runId: context.runId, - toolName: consumeResult.suspension.toolName, - zodIssues: parsed.error.issues, - }); - publishClearingEvent(context); - await clearDraftChecklist(context); - await clearPlannedTaskGraph(context); - return { - result: - 'Planner requested user input but the payload was malformed. Please try again.', - }; - } - return await ctx.suspend(parsed.data); + return await coordinator.cascadeSuspension(ctx, consumeResult); } - // ── Stream finished (completed/cancelled/errored) ────── - const resultText = consumeResult.status === 'completed' ? await consumeResult.text : ''; - - if (traceRun) { - await finishTraceRun(context, traceRun, { - outputs: { - result: resultText, - agentId: subAgentId, - role: 'planner', - hasItems: !accumulator.isEmpty(), - itemCount: accumulator.getTaskItemsForEvent().length, - }, - }); - } - - context.eventBus.publish(context.threadId, { - type: 'agent-completed', - runId: context.runId, - agentId: subAgentId, - payload: { - role: 'planner', - result: resultText, - }, - }); - - // ── Schedule tasks after planner-driven approval ────────── - // Approval is detected via the accumulator's flag, which submit-plan - // flips in its resume handler. createPlan persisted the graph as - // `awaiting_approval` on the first call; flip it to `active` and - // schedule. - if (accumulator.isApproved()) { - if (context.plannedTaskService) { - await context.plannedTaskService.approvePlan(context.threadId); - } - if (context.schedulePlannedTasks) { - await context.schedulePlannedTasks(); - } - // On resume the accumulator is fresh and reports 0 — query the - // persisted graph instead so the orchestrator gets accurate text. - const persistedCount = await getPersistedTaskCount(context); - const taskCount = persistedCount ?? accumulator.getTaskList().length; - return { - result: `Plan approved and ${taskCount} task${taskCount === 1 ? '' : 's'} dispatched.`, - }; - } - - // User explicitly denied the plan. submit-plan already cancelled the - // persisted graph, so the cancelled graph won't be picked up by the - // scheduler. Return a terminal result so the orchestrator stops cleanly. - if (accumulator.isDenied()) { - publishClearingEvent(context); - await clearDraftChecklist(context); - return { result: 'Plan denied by user. No tasks were dispatched.' }; - } - - // Planner finished without approval (no submit-plan or user didn't approve). - publishClearingEvent(context); - await clearDraftChecklist(context); - await clearPlannedTaskGraph(context); - if (!accumulator.isEmpty()) { - return { - result: `Planner added ${accumulator.getTaskList().length} items but did not submit the plan for approval. The plan was not executed.`, - }; - } - return { - result: `Planner finished without producing a plan. Agent output: ${resultText}`, - }; + return await coordinator.handleTerminalResult(consumeResult); } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - if (traceRun) { - await failTraceRun(context, traceRun, error, { - agent_id: subAgentId, - agent_role: 'planner', - }); - } - - context.eventBus.publish(context.threadId, { - type: 'agent-completed', - runId: context.runId, - agentId: subAgentId, - payload: { - role: 'planner', - result: '', - error: errorMessage, - }, - }); - - if (!accumulator.isApproved()) { - publishClearingEvent(context); - await clearDraftChecklist(context); - await clearPlannedTaskGraph(context); - } - - return { result: `Planner error: ${errorMessage}` }; + return await coordinator.handleError(error); } }) .build(); } - -async function getPersistedTaskCount(context: OrchestrationContext): Promise { - if (!context.plannedTaskService) return undefined; - try { - const graph = await context.plannedTaskService.getGraph(context.threadId); - return graph?.tasks?.length; - } catch { - return undefined; - } -} diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/planner-briefing.ts b/packages/@n8n/instance-ai/src/tools/orchestration/planner-briefing.ts new file mode 100644 index 00000000000..3d65ad41347 --- /dev/null +++ b/packages/@n8n/instance-ai/src/tools/orchestration/planner-briefing.ts @@ -0,0 +1,510 @@ +import type { InstanceAiEvent } from '@n8n/api-types'; +import { DateTime } from 'luxon'; + +import type { OrchestrationContext } from '../../types'; +import { CREDENTIALS_TOOL_ID } from '../credentials.tool'; +import { DATA_TABLES_TOOL_ID } from '../data-tables.tool'; +import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool'; + +/** Number of recent thread messages to include as planner context. */ +export const MESSAGE_HISTORY_COUNT = 5; + +const RELEVANT_PRIOR_TOOL_NAMES = new Set([ + ASK_USER_TOOL_ID, + CREDENTIALS_TOOL_ID, + DATA_TABLES_TOOL_ID, +]); + +// --------------------------------------------------------------------------- +// Message history retrieval +// --------------------------------------------------------------------------- + +interface FormattedMessage { + role: 'user' | 'assistant'; + content: string; +} + +interface PlannerBriefingContext { + collectedAnswers: string[]; + discoveredResources: string[]; +} + +interface ToolObservation { + toolName: string; + args: Record; + result: unknown; +} + +interface CredentialBrief { + id?: string; + name: string; + type: string; +} + +interface DataTableBrief { + id?: string; + name: string; +} + +/** Extract plain text from persisted native memory content. */ +function extractTextFromMemoryContent(content: unknown): string { + if (typeof content === 'string') return content; + if (Array.isArray(content)) return extractTextParts(content); + return ''; +} + +function extractTextParts(parts: unknown[]): string { + return parts + .filter( + (c): c is { type: 'text'; text: string } => + typeof c === 'object' && + c !== null && + 'type' in c && + c.type === 'text' && + 'text' in c && + typeof c.text === 'string', + ) + .map((c) => c.text) + .join('\n'); +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +function readString(value: unknown): string | undefined { + return typeof value === 'string' && value.trim().length > 0 ? value : undefined; +} + +function readRecord(value: unknown): Record | undefined { + return isRecord(value) ? value : undefined; +} + +function readArray(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +function readStringArray(value: unknown): string[] { + return readArray(value).filter((item): item is string => typeof item === 'string'); +} + +function addUnique(target: string[], seen: Set, value: string | undefined): void { + if (!value || seen.has(value)) return; + seen.add(value); + target.push(value); +} + +function summarizeList(values: string[], limit = 10): string { + const visible = values.slice(0, limit).join(', '); + const remaining = values.length - limit; + return remaining > 0 ? `${visible}, and ${remaining} more` : visible; +} + +export async function getRecentMessages( + context: OrchestrationContext, + count: number, +): Promise { + const messages: FormattedMessage[] = []; + + // Retrieve previously-saved messages from memory. + if (context.memory) { + try { + const history = await context.memory.getMessages(context.threadId, { + limit: count, + }); + + for (const m of history) { + if (!('role' in m)) continue; + const role = m.role; + const content = extractTextFromMemoryContent(m.content); + if ((role === 'user' || role === 'assistant') && content.length > 0) { + messages.push({ role, content }); + } + } + } catch { + // Memory recall failed — continue with just the current message + } + } + + // Always append the current in-flight user message (not yet saved to memory) + if (shouldAppendCurrentUserMessage(messages, context.currentUserMessage)) { + messages.push({ role: 'user', content: context.currentUserMessage }); + } + + return messages; +} + +function shouldAppendCurrentUserMessage( + messages: FormattedMessage[], + currentUserMessage?: string, +): currentUserMessage is string { + const current = currentUserMessage?.trim(); + if (!current) return false; + + const lastUserMessage = [...messages].reverse().find((message) => message.role === 'user'); + return lastUserMessage?.content.trim() !== current; +} + +/** + * Reconstructs prior planner-relevant tool calls from the event stream. + * + * Tool-call and tool-result events are correlated by `toolCallId` so the + * planner can receive structured context that is not preserved in text-only + * memory recall, such as ask-user answers and credential selections. + */ +export function getPriorToolObservations(context: OrchestrationContext): ToolObservation[] { + type MutableToolObservation = Omit & { + result: unknown; + hasResult: boolean; + }; + + const toolCalls = new Map(); + const pendingResults = new Map(); + + for (const event of getPriorToolEvents(context)) { + if (event.type === 'tool-call') { + const { toolCallId, toolName, args } = event.payload; + if (!RELEVANT_PRIOR_TOOL_NAMES.has(toolName)) continue; + + const pendingResult = pendingResults.get(toolCallId); + toolCalls.set(toolCallId, { + toolName, + args, + result: pendingResult, + hasResult: pendingResults.has(toolCallId), + }); + continue; + } + + if (event.type === 'tool-result') { + const { toolCallId, result } = event.payload; + const existing = toolCalls.get(toolCallId); + if (existing) { + existing.result = result; + existing.hasResult = true; + } else { + pendingResults.set(toolCallId, result); + } + } + } + + return [...toolCalls.values()] + .filter((observation) => observation.hasResult) + .map(({ toolName, args, result }) => ({ toolName, args, result })); +} + +/** + * Returns the events that may contain prior tool context for this planner run. + * + * When the run belongs to a message group, all runs in that group are searched + * so follow-up runs can see choices collected earlier in the same assistant + * turn. If grouped lookup is unavailable, this falls back to the current run. + */ +function getPriorToolEvents(context: OrchestrationContext): InstanceAiEvent[] { + if (context.messageGroupId) { + const runIds = getMessageGroupRunIds(context); + if (runIds.length > 0) { + try { + return context.eventBus.getEventsForRuns(context.threadId, runIds); + } catch { + // Fall back to the current run below. + } + } + } + + try { + return context.eventBus.getEventsForRun(context.threadId, context.runId); + } catch { + return []; + } +} + +/** + * Finds run IDs that belong to the current message group from run-start events. + * + * The event bus can fetch events for many run IDs, but the orchestration + * context only carries the current run ID and message group ID. This bridges + * those two concepts while keeping the current run as a defensive fallback. + */ +function getMessageGroupRunIds(context: OrchestrationContext): string[] { + const messageGroupId = context.messageGroupId; + if (!messageGroupId) return []; + + const runIds = new Set(); + try { + for (const { event } of context.eventBus.getEventsAfter(context.threadId, 0)) { + if (event.type === 'run-start' && event.payload.messageGroupId === messageGroupId) { + runIds.add(event.runId); + } + } + } catch { + return [context.runId]; + } + runIds.add(context.runId); + + return [...runIds]; +} + +/** + * Converts raw prior tool observations into planner briefing sections. + * + * The resulting strings are intentionally short and human-readable because + * they are embedded directly into the planner prompt under dedicated headings. + */ +export function buildPlannerBriefingContext( + observations: ToolObservation[], +): PlannerBriefingContext { + const collectedAnswers: string[] = []; + const discoveredResources: string[] = []; + const seenAnswers = new Set(); + const seenResources = new Set(); + const credentialsById = buildCredentialLookup(observations); + + for (const observation of observations) { + if (observation.toolName === ASK_USER_TOOL_ID) { + for (const answer of extractAskUserAnswerLines(observation)) { + addUnique(collectedAnswers, seenAnswers, answer); + } + continue; + } + + if (observation.toolName === CREDENTIALS_TOOL_ID) { + const action = readString(observation.args.action); + if (action === 'list') { + addUnique(discoveredResources, seenResources, summarizeCredentials(observation.result)); + } + if (action === 'setup') { + for (const selection of extractCredentialSelectionLines(observation, credentialsById)) { + addUnique(collectedAnswers, seenAnswers, selection); + } + } + continue; + } + + if ( + observation.toolName === DATA_TABLES_TOOL_ID && + readString(observation.args.action) === 'list' + ) { + addUnique(discoveredResources, seenResources, summarizeDataTables(observation.result)); + } + } + + return { collectedAnswers, discoveredResources }; +} + +/** + * Builds an ID lookup from prior credential list results. + * + * Credential setup results contain selected IDs, so this lets the briefing + * render stable user-facing names and credential types when a prior list result + * is available. + */ +function buildCredentialLookup(observations: ToolObservation[]): Map { + const credentialsById = new Map(); + + for (const observation of observations) { + if (observation.toolName !== CREDENTIALS_TOOL_ID) continue; + for (const credential of extractCredentials(observation.result)) { + if (credential.id) credentialsById.set(credential.id, credential); + } + } + + return credentialsById; +} + +/** + * Extracts answered ask-user responses as `question: answer` briefing lines. + * + * Skipped or unanswered prompts are ignored, and question text is recovered + * from tool args when the tool result only includes a question ID. + */ +function extractAskUserAnswerLines(observation: ToolObservation): string[] { + const result = readRecord(observation.result); + if (!result || result.answered === false) return []; + + const questionsById = extractQuestionTextById(observation.args); + const answers = readArray(result.answers); + const lines: string[] = []; + + for (const answerValue of answers) { + const answer = readRecord(answerValue); + if (!answer || answer.skipped === true) continue; + + const questionId = readString(answer.questionId); + const question = + readString(answer.question) ?? (questionId ? questionsById.get(questionId) : undefined); + const selectedOptions = readStringArray(answer.selectedOptions); + const customText = readString(answer.customText); + const values = [...selectedOptions, ...(customText ? [customText] : [])]; + + if (!question || values.length === 0) continue; + lines.push(`${question}: ${values.join(', ')}`); + } + + return lines; +} + +/** + * Maps ask-user question IDs to display text from the original tool args. + */ +function extractQuestionTextById(args: Record): Map { + const questionsById = new Map(); + + for (const questionValue of readArray(args.questions)) { + const question = readRecord(questionValue); + const id = readString(question?.id); + const text = readString(question?.question); + if (id && text) questionsById.set(id, text); + } + + return questionsById; +} + +/** + * Renders credential setup selections as briefing lines. + * + * The setup tool returns a `{ credentialType: credentialId }` map. The optional + * credential lookup turns those IDs back into names so the planner can avoid + * asking the user to choose the same credential again. + */ +function extractCredentialSelectionLines( + observation: ToolObservation, + credentialsById: Map, +): string[] { + const result = readRecord(observation.result); + const credentials = readRecord(result?.credentials); + if (!credentials) return []; + + const lines: string[] = []; + for (const [credentialType, credentialIdValue] of Object.entries(credentials)) { + const credentialId = readString(credentialIdValue); + if (!credentialId) continue; + + const credential = credentialsById.get(credentialId); + const label = credential + ? `${credential.name} (${credential.type})` + : `credential ID ${credentialId}`; + lines.push(`Credential selected for ${credentialType}: ${label}`); + } + + return lines; +} + +/** + * Summarizes a credentials list result for the briefing. + */ +function summarizeCredentials(result: unknown): string | undefined { + const credentials = extractCredentials(result); + if (credentials.length === 0) return undefined; + + return `Credentials available: ${summarizeList( + credentials.map((credential) => `${credential.name} (${credential.type})`), + )}`; +} + +/** + * Reads the minimal credential metadata needed by the planner briefing. + */ +function extractCredentials(result: unknown): CredentialBrief[] { + const record = readRecord(result); + return readArray(record?.credentials) + .map(readCredentialBrief) + .filter((credential): credential is CredentialBrief => credential !== undefined); +} + +function readCredentialBrief(value: unknown): CredentialBrief | undefined { + const record = readRecord(value); + const name = readString(record?.name); + const type = readString(record?.type); + if (!name || !type) return undefined; + const id = readString(record?.id); + + return { + name, + type, + ...(id ? { id } : {}), + }; +} + +/** + * Summarizes a data-tables list result for the briefing. + */ +function summarizeDataTables(result: unknown): string | undefined { + const tables = extractDataTables(result); + if (tables.length === 0) return undefined; + + return `Data tables available: ${summarizeList(tables.map((table) => table.name))}`; +} + +/** + * Reads the minimal data-table metadata needed by the planner briefing. + */ +function extractDataTables(result: unknown): DataTableBrief[] { + const record = readRecord(result); + return readArray(record?.tables) + .map(readDataTableBrief) + .filter((table): table is DataTableBrief => table !== undefined); +} + +function readDataTableBrief(value: unknown): DataTableBrief | undefined { + const record = readRecord(value); + const name = readString(record?.name); + if (!name) return undefined; + const id = readString(record?.id); + + return { + name, + ...(id ? { id } : {}), + }; +} + +/** + * Formats conversation, time, and already-collected context into the planner goal. + */ +export function formatMessagesForBriefing( + messages: FormattedMessage[], + guidance?: string, + timeZone?: string, + briefingContext?: PlannerBriefingContext, +): string { + const parts: string[] = []; + + const now = timeZone ? DateTime.now().setZone(timeZone) : DateTime.now(); + const isoNow = now.toISO({ includeOffset: true }) ?? new Date().toISOString(); + parts.push(`${isoNow}`); + if (timeZone) { + parts.push(`${timeZone}`); + } + + if (messages.length > 0) { + parts.push('## Recent conversation'); + for (const m of messages) { + const label = m.role === 'user' ? 'User' : 'Assistant'; + // Truncate very long messages + const content = m.content.length > 2000 ? m.content.slice(0, 2000) + '...' : m.content; + parts.push(`**${label}:** ${content}`); + } + } + + if (briefingContext?.collectedAnswers.length) { + parts.push('## Already-collected answers'); + for (const answer of briefingContext.collectedAnswers) { + parts.push(`- ${answer}`); + } + } + + if (briefingContext?.discoveredResources.length) { + parts.push('## Already-discovered resources'); + for (const resource of briefingContext.discoveredResources) { + parts.push(`- ${resource}`); + } + } + + if (guidance) { + parts.push(`\n## Orchestrator guidance\n${guidance}`); + } + + parts.push('\nDesign the solution blueprint based on the conversation above.'); + + return parts.join('\n\n'); +} diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/planner-run-coordinator.ts b/packages/@n8n/instance-ai/src/tools/orchestration/planner-run-coordinator.ts new file mode 100644 index 00000000000..3b0e917ab8c --- /dev/null +++ b/packages/@n8n/instance-ai/src/tools/orchestration/planner-run-coordinator.ts @@ -0,0 +1,521 @@ +import { Agent } from '@n8n/agents'; +import { z } from 'zod'; + +import { createAddPlanItemTool, createRemovePlanItemTool } from './add-plan-item.tool'; +import { createSubAgentPersistence } from './agent-persistence'; +import { BlueprintAccumulator } from './blueprint-accumulator'; +import { truncateLabel } from './display-utils'; +import { PLANNER_AGENT_PROMPT } from './plan-agent-prompt'; +import { + buildPlannerBriefingContext, + formatMessagesForBriefing, + getPriorToolObservations, + getRecentMessages, + MESSAGE_HISTORY_COUNT, +} from './planner-briefing'; +import { createSubmitPlanTool } from './submit-plan.tool'; +import { + failTraceRun, + finishTraceRun, + startSubAgentTrace, + traceSubAgentTools, + withTraceRun, +} from './tracing-utils'; +import { attachRuntimeWorkspaceCapabilities } from '../../agent/runtime-workspace'; +import { MAX_STEPS } from '../../constants/max-steps'; +import { consumeStreamCascading } from '../../stream/consume-with-hitl'; +import type { ConsumeStreamCascadingResult } from '../../stream/consume-with-hitl'; +import { createToolRegistry, toolRegistryKeys, toolRegistryValues } from '../../tool-registry'; +import { buildAgentTraceInputs, mergeTraceRunInputs } from '../../tracing/langsmith-tracing'; +import type { OrchestrationContext } from '../../types'; +import { resumeAgentStream } from '../../utils/stream-helpers'; +import { CREDENTIALS_TOOL_ID } from '../credentials.tool'; +import { DATA_TABLES_TOOL_ID } from '../data-tables.tool'; +import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool'; +import { createTemplatesTool } from '../templates.tool'; + +/** Read-only discovery tools the planner gets from domainTools. */ +const PLANNER_DOMAIN_TOOL_NAMES = [ + 'nodes', + CREDENTIALS_TOOL_ID, + DATA_TABLES_TOOL_ID, + 'workflows', + ASK_USER_TOOL_ID, +]; + +/** Research tools added when available. */ +const PLANNER_RESEARCH_TOOL_NAMES = ['research']; + +// --------------------------------------------------------------------------- +// Helper: clear draft checklist from taskStorage +// --------------------------------------------------------------------------- + +/** Publish an empty tasks-update so the frontend clears stale plan items. */ +function publishClearingEvent(context: OrchestrationContext): void { + context.eventBus.publish(context.threadId, { + type: 'tasks-update', + runId: context.runId, + agentId: context.orchestratorAgentId, + payload: { tasks: { tasks: [] }, planItems: [] }, + }); +} + +async function clearDraftChecklist(context: OrchestrationContext): Promise { + try { + await context.taskStorage.save(context.threadId, { tasks: [] }); + } catch { + // Best-effort — don't let cleanup failures block the return path + } +} + +/** + * Remove any persisted planned-task graph for this thread *if and only if* it + * belongs to this planner run's unapproved plan. Called on planner give-up / + * error paths to prevent a later schedulePlannedTasks() tick from dispatching + * a plan the user never approved. + * + * Guarded because the thread may already carry an unrelated active graph (a + * prior approved plan with pending checkpoints / in-flight tasks); an + * unconditional `clear()` here would strand that work. We only touch the graph + * when its `planRunId` matches this run AND its `status` is `awaiting_approval` + * — the single window where submit-plan has persisted but approval hasn't + * happened yet. + */ +export async function clearPlannedTaskGraph(context: OrchestrationContext): Promise { + if (!context.plannedTaskService) return; + try { + const graph = await context.plannedTaskService.getGraph(context.threadId); + if (!graph) return; + if (graph.planRunId !== context.runId) return; + if (graph.status !== 'awaiting_approval') return; + await context.plannedTaskService.clear(context.threadId); + } catch { + // Best-effort — don't let cleanup failures block the return path + } +} + +/** + * Seed a freshly-built accumulator from the persisted plan before a planner + * resume. The parent plan-tool handler exits on every cascade-suspend, so the + * first-call accumulator is gone by the time an "ask for edits" revision + * resumes the planner — without this, remove-plan-item can't touch the + * original items, add-plan-item only carries the newly-added ones, and the + * re-submit's createPlan (which overwrites unconditionally) replaces the graph + * with a partial plan. + * + * Only rehydrates while the plan is still `awaiting_approval` (the revision + * window) — an already-approved/active graph with in-flight tasks must not be + * reopened here. Best-effort: a getGraph failure leaves the accumulator empty + * rather than blocking the resume. + */ +export async function rehydrateAccumulatorFromGraph( + context: OrchestrationContext, + accumulator: BlueprintAccumulator, +): Promise { + if (!context.plannedTaskService) return; + try { + const graph = await context.plannedTaskService.getGraph(context.threadId); + if (graph?.status === 'awaiting_approval' && graph.tasks.length > 0) { + accumulator.loadFromTasks(graph.tasks); + } + } catch { + // Best-effort — fall back to an empty accumulator rather than block resume. + } +} + +/** + * The plan tool cascades sub-agent HITL suspensions UP through the SDK's + * native suspend/resume mechanism: when the planner sub-agent (or any tool + * inside it) emits a `tool-call-suspended` chunk, the plan tool catches it + * via `consumeStreamCascading` and calls its own `ctx.suspend()` with the + * same payload. This checkpoints the orchestrator's full state alongside the + * planner's, so a process restart between user prompt and click can resume + * the planner without losing any state. + * + * The schemas below are permissive on purpose: the plan tool just forwards + * whatever the inner tool emitted (submit-plan's plan-review payload OR + * ask-user's questions payload) and accepts whatever the frontend sent back + * for that card. Validation already happened on the inner tool. + */ +export const planToolSuspendSchema = z + .object({ + requestId: z.string(), + message: z.string(), + severity: z.string(), + // Only submit-plan + ask-user carry an `inputType`; cascaded suspensions + // from other planner tools (credentials, data-tables, ...) don't, and a + // strict `inputType: string` would reject otherwise-valid payloads. + inputType: z.string().optional(), + }) + .passthrough(); + +/** Assemble the planner sub-agent's tool registry: read-only discovery + + * research tools from the orchestrator's domain set, plus the plan-building + * tools (add/remove/submit) wired to this run's accumulator. */ +function buildPlannerTools(context: OrchestrationContext, accumulator: BlueprintAccumulator) { + const plannerTools = createToolRegistry(); + + for (const name of PLANNER_DOMAIN_TOOL_NAMES) { + const tool = context.domainTools.get(name); + if (tool) plannerTools.set(name, tool); + } + + for (const name of PLANNER_RESEARCH_TOOL_NAMES) { + const tool = context.domainTools.get(name); + if (tool) plannerTools.set(name, tool); + } + + plannerTools.set('templates', createTemplatesTool()); + plannerTools.set('add-plan-item', createAddPlanItemTool(accumulator, context)); + plannerTools.set('remove-plan-item', createRemovePlanItemTool(accumulator, context)); + plannerTools.set('submit-plan', createSubmitPlanTool(accumulator, context)); + + return plannerTools; +} + +/** Construct the planner sub-agent with workspace capabilities + telemetry. */ +function buildPlannerSubAgent( + context: OrchestrationContext, + tracedPlannerTools: ReturnType, + subAgentId: string, +) { + const subAgent = new Agent('Workflow Planner Agent') + .model(context.modelId) + .instructions(PLANNER_AGENT_PROMPT, { + providerOptions: { + anthropic: { cacheControl: { type: 'ephemeral' } }, + }, + }) + .tool(toolRegistryValues(tracedPlannerTools)) + .checkpoint(context.checkpointStore ?? 'memory'); + attachRuntimeWorkspaceCapabilities(subAgent, { + runtimeSkills: context.runtimeSkills, + }); + const telemetry = context.tracing?.getTelemetry?.({ + agentRole: 'planner', + functionId: 'instance-ai.subagent.planner', + executionMode: 'background', + metadata: { agent_id: subAgentId }, + }); + if (telemetry) { + subAgent.telemetry(telemetry); + } + return subAgent; +} + +/** Outcome of starting/resuming a planner run: either the cascading-stream + * result, or a sentinel that the run could not be resumed (checkpoint state + * was lost across a restart). */ +type PlannerRunOutcome = + | { kind: 'consumed'; consumeResult: ConsumeStreamCascadingResult } + | { kind: 'lost-state' }; + +/** + * Owns one planner sub-agent run end-to-end: tool/agent construction, the + * first-call and resume legs, the LangSmith trace span (held as a field so it + * spans the run's try/catch), and the terminal/suspension/error transitions. + * + * The tool handler stays a thin orchestrator: build the coordinator, run + * first-call or resume, then route the result to the matching `handle*` method. + */ +export class PlannerRunCoordinator { + private readonly accumulator = new BlueprintAccumulator(); + + // runId-derived id so a resume reuses the same event-stream identity. + private readonly subAgentId: string; + + private readonly plannerTools: ReturnType; + + private readonly tracedPlannerTools: ReturnType; + + private readonly subAgent: ReturnType; + + // Held as a field so finishTrace/failTrace can finalise the span whether the + // run ends in handleTerminalResult or in the handler's catch. + private traceRun: Awaited>; + + constructor(private readonly context: OrchestrationContext) { + this.subAgentId = `agent-planner-${context.runId}`; + this.plannerTools = buildPlannerTools(context, this.accumulator); + this.tracedPlannerTools = traceSubAgentTools(context, this.plannerTools, 'planner'); + this.subAgent = buildPlannerSubAgent(context, this.tracedPlannerTools, this.subAgentId); + } + + /** First-call leg: persist the in-flight user message, brief the planner, + * publish agent-spawned, and consume the cascading stream. */ + async startFirstRun(guidance?: string): Promise { + const { context, subAgent, subAgentId, plannerTools, tracedPlannerTools } = this; + + // The planner is the most common inline HITL entry point — when it + // suspends the orchestrator cascades-suspends too, and the SDK does not + // flush the user-message row to memory until a clean loop end (which a + // suspended run never reaches). Persist eagerly so the user's bubble is + // visible if they reload during the suspend window. + if (context.persistInFlightUserMessage) { + await context.persistInFlightUserMessage(); + } + + const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT); + const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context)); + const briefing = formatMessagesForBriefing( + messages, + guidance, + context.timeZone, + briefingContext, + ); + + const subtitle = guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...'; + + context.eventBus.publish(context.threadId, { + type: 'agent-spawned', + runId: context.runId, + agentId: subAgentId, + payload: { + parentId: context.orchestratorAgentId, + role: 'planner', + tools: toolRegistryKeys(plannerTools), + kind: 'planner' as const, + title: 'Planning', + subtitle: truncateLabel(subtitle), + goal: briefing, + }, + }); + + this.traceRun = await startSubAgentTrace(context, { + agentId: subAgentId, + role: 'planner', + kind: 'planner', + inputs: { guidance, messageCount: messages.length }, + }); + mergeTraceRunInputs( + this.traceRun, + buildAgentTraceInputs({ + systemPrompt: PLANNER_AGENT_PROMPT, + tools: tracedPlannerTools, + modelId: context.modelId, + }), + ); + + const consumeResult = await withTraceRun(context, this.traceRun, async () => { + const persistence = await createSubAgentPersistence(context, { agentKind: 'planner' }); + const stream = await subAgent.stream(briefing, { + maxIterations: MAX_STEPS.PLANNER, + abortSignal: context.abortSignal, + persistence, + providerOptions: { + anthropic: { cacheControl: { type: 'ephemeral' } }, + }, + }); + + return await consumeStreamCascading({ + agent: subAgent, + stream, + runId: context.runId, + agentId: subAgentId, + eventBus: context.eventBus, + logger: context.logger, + threadId: context.threadId, + abortSignal: context.abortSignal, + }); + }); + + return { kind: 'consumed', consumeResult }; + } + + /** Resume leg: locate the suspended planner, rehydrate the accumulator from + * the persisted plan, open a fresh trace span, and resume the cascading + * stream. Returns `lost-state` when the checkpoint can't be located. */ + async resume(resumeData: Record): Promise { + const { context, subAgent, subAgentId, tracedPlannerTools } = this; + + const resumeInfo = await context.findSubAgentResumeInfo?.('planner'); + if (!resumeInfo) return { kind: 'lost-state' }; + + // Rehydrate the accumulator from the persisted plan so an "ask for edits" + // revision operates on the full plan rather than an empty accumulator. + await rehydrateAccumulatorFromGraph(context, this.accumulator); + + // Open a trace span for the resumed leg so a plan that suspended at HITL + // and resumed still shows its continuation in LangSmith. The planner card + // is already in the snapshot from the first call, so no agent-spawned + // event is (re-)published here. + this.traceRun = await startSubAgentTrace(context, { + agentId: subAgentId, + role: 'planner', + kind: 'planner', + inputs: { resumed: true }, + }); + mergeTraceRunInputs( + this.traceRun, + buildAgentTraceInputs({ + systemPrompt: PLANNER_AGENT_PROMPT, + tools: tracedPlannerTools, + modelId: context.modelId, + }), + ); + + const consumeResult = await withTraceRun(context, this.traceRun, async () => { + const resumed = await resumeAgentStream(subAgent, resumeData, { + runId: resumeInfo.runId, + toolCallId: resumeInfo.toolCallId, + persistence: resumeInfo.persistence, + maxIterations: MAX_STEPS.PLANNER, + }); + + return await consumeStreamCascading({ + agent: subAgent, + stream: resumed, + runId: context.runId, + agentId: subAgentId, + eventBus: context.eventBus, + logger: context.logger, + threadId: context.threadId, + abortSignal: context.abortSignal, + }); + }); + + return { kind: 'consumed', consumeResult }; + } + + /** Cascade a planner suspension up to the orchestrator. Validates the + * forwarded payload; on a malformed payload it tears down the draft plan + * (so a later schedulePlannedTasks tick can't dispatch it) and returns a + * terminal result instead of suspending. */ + async cascadeSuspension( + ctx: { suspend: (payload: z.infer) => Promise }, + consumeResult: Extract, + ): Promise<{ result: string }> { + const { context } = this; + const parsed = planToolSuspendSchema.safeParse(consumeResult.suspension.suspendPayload); + if (!parsed.success) { + context.logger.warn('Planner emitted a suspension payload missing required fields', { + threadId: context.threadId, + runId: context.runId, + toolName: consumeResult.suspension.toolName, + zodIssues: parsed.error.issues, + }); + await this.tearDownDraftPlan(); + return { + result: 'Planner requested user input but the payload was malformed. Please try again.', + }; + } + return await ctx.suspend(parsed.data); + } + + /** Finalise a completed/cancelled/errored planner run: close the trace, + * publish agent-completed, then resolve to the orchestrator-facing result + * based on approval / denial / no-plan. */ + async handleTerminalResult( + consumeResult: ConsumeStreamCascadingResult, + ): Promise<{ result: string }> { + const { context, accumulator, subAgentId } = this; + const resultText = consumeResult.status === 'completed' ? await consumeResult.text : ''; + + if (this.traceRun) { + await finishTraceRun(context, this.traceRun, { + outputs: { + result: resultText, + agentId: subAgentId, + role: 'planner', + hasItems: !accumulator.isEmpty(), + itemCount: accumulator.getTaskItemsForEvent().length, + }, + }); + } + + this.publishCompleted(resultText); + + // Approval is detected via the accumulator's flag, which submit-plan flips + // in its resume handler. createPlan persisted the graph as + // `awaiting_approval` on the first call; flip it to `active` and schedule. + if (accumulator.isApproved()) { + if (context.plannedTaskService) { + await context.plannedTaskService.approvePlan(context.threadId); + } + if (context.schedulePlannedTasks) { + await context.schedulePlannedTasks(); + } + // On resume the accumulator is fresh and reports 0 — query the persisted + // graph instead so the orchestrator gets accurate text. + const persistedCount = await getPersistedTaskCount(context); + const taskCount = persistedCount ?? accumulator.getTaskList().length; + return { + result: `Plan approved and ${taskCount} task${taskCount === 1 ? '' : 's'} dispatched.`, + }; + } + + // User explicitly denied the plan. submit-plan already cancelled the + // persisted graph, so the cancelled graph won't be picked up by the + // scheduler. Return a terminal result so the orchestrator stops cleanly. + if (accumulator.isDenied()) { + publishClearingEvent(context); + await clearDraftChecklist(context); + return { result: 'Plan denied by user. No tasks were dispatched.' }; + } + + // Planner finished without approval (no submit-plan or user didn't approve). + await this.tearDownDraftPlan(); + if (!accumulator.isEmpty()) { + return { + result: `Planner added ${accumulator.getTaskList().length} items but did not submit the plan for approval. The plan was not executed.`, + }; + } + return { + result: `Planner finished without producing a plan. Agent output: ${resultText}`, + }; + } + + /** Handle an exception thrown anywhere in the run: fail the trace, publish + * agent-completed with the error, and tear down the draft plan unless it + * was already approved (dispatched tasks must not be wiped). */ + async handleError(error: unknown): Promise<{ result: string }> { + const { context, accumulator, subAgentId } = this; + const errorMessage = error instanceof Error ? error.message : String(error); + + if (this.traceRun) { + await failTraceRun(context, this.traceRun, error, { + agent_id: subAgentId, + agent_role: 'planner', + }); + } + + context.eventBus.publish(context.threadId, { + type: 'agent-completed', + runId: context.runId, + agentId: subAgentId, + payload: { role: 'planner', result: '', error: errorMessage }, + }); + + if (!accumulator.isApproved()) { + await this.tearDownDraftPlan(); + } + + return { result: `Planner error: ${errorMessage}` }; + } + + private publishCompleted(resultText: string): void { + this.context.eventBus.publish(this.context.threadId, { + type: 'agent-completed', + runId: this.context.runId, + agentId: this.subAgentId, + payload: { role: 'planner', result: resultText }, + }); + } + + /** Clear the in-progress checklist UI + the unapproved persisted graph. */ + private async tearDownDraftPlan(): Promise { + publishClearingEvent(this.context); + await clearDraftChecklist(this.context); + await clearPlannedTaskGraph(this.context); + } +} + +async function getPersistedTaskCount(context: OrchestrationContext): Promise { + if (!context.plannedTaskService) return undefined; + try { + const graph = await context.plannedTaskService.getGraph(context.threadId); + return graph?.tasks?.length; + } catch { + return undefined; + } +}