refactor(core): Split planner run into briefing + coordinator modules (#31458)

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Jaakko Husso 2026-06-02 15:28:07 +03:00 committed by GitHub
parent 58a3fb2227
commit a0b616073b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1243 additions and 1171 deletions

View File

@ -1,195 +1,10 @@
import type {
OrchestrationContext,
PlannedTaskGraph,
PlannedTaskRecord,
PlannedTaskService,
} from '../../../types';
import { BlueprintAccumulator } from '../blueprint-accumulator';
const {
__testBuildPlannerBriefingContext,
__testClearPlannedTaskGraph,
__testFormatMessagesForBriefing,
__testGetRecentMessages,
__testGetPriorToolObservations,
__testRehydrateAccumulatorFromGraph,
} =
// eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/consistent-type-imports
require('../plan-with-agent.tool') as typeof import('../plan-with-agent.tool');
function makeContext(overrides: {
graph: PlannedTaskGraph | null;
runId?: string;
}): {
context: OrchestrationContext;
clear: jest.Mock;
getGraph: jest.Mock;
} {
const clear = jest.fn(async () => {
await Promise.resolve();
});
const getGraph = jest.fn(async () => {
await Promise.resolve();
return overrides.graph;
});
const plannedTaskService: Partial<PlannedTaskService> = {
getGraph,
clear,
};
const context = {
threadId: 't-1',
runId: overrides.runId ?? 'run-current',
plannedTaskService: plannedTaskService as PlannedTaskService,
} as unknown as OrchestrationContext;
return { context, clear, getGraph };
}
describe('clearPlannedTaskGraph', () => {
it('clears the graph when it belongs to this run and is awaiting approval', async () => {
const { context, clear } = makeContext({
graph: {
planRunId: 'run-current',
status: 'awaiting_approval',
tasks: [],
},
});
await __testClearPlannedTaskGraph(context);
expect(clear).toHaveBeenCalledWith('t-1');
});
it('does not clear an active graph from a prior approved plan', async () => {
// A previous `/plan` call already succeeded; its graph is `active` with
// pending checkpoints. A new planner error must not wipe it.
const { context, clear } = makeContext({
graph: {
planRunId: 'run-previous',
status: 'active',
tasks: [],
},
});
await __testClearPlannedTaskGraph(context);
expect(clear).not.toHaveBeenCalled();
});
it('does not clear an awaiting-approval graph that was created by a different planner run', async () => {
// Defensive: a concurrent plan for a different run should not have its
// unapproved graph wiped by this run's error-path cleanup.
const { context, clear } = makeContext({
graph: {
planRunId: 'run-other',
status: 'awaiting_approval',
tasks: [],
},
});
await __testClearPlannedTaskGraph(context);
expect(clear).not.toHaveBeenCalled();
});
it('is a no-op when no graph exists', async () => {
const { context, clear, getGraph } = makeContext({ graph: null });
await __testClearPlannedTaskGraph(context);
expect(getGraph).toHaveBeenCalled();
expect(clear).not.toHaveBeenCalled();
});
it('swallows getGraph errors so the caller can return its own error', async () => {
const { context, getGraph } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: [] },
});
getGraph.mockRejectedValueOnce(new Error('db down'));
await expect(__testClearPlannedTaskGraph(context)).resolves.toBeUndefined();
});
});
describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => {
const persistedTasks: PlannedTaskRecord[] = [
{
id: 'wf-1',
title: "Build 'A' workflow",
kind: 'build-workflow',
spec: 'A',
deps: [],
status: 'planned',
},
{
id: 'wf-2',
title: "Build 'B' workflow",
kind: 'build-workflow',
spec: 'B',
deps: [],
status: 'planned',
},
];
it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => {
// Reproduces "ask for edits -> revise existing plan -> submit again":
// on resume the parent rebuilt a fresh accumulator; without rehydration
// the planner's remove/add would operate on an empty plan and the
// re-submit would drop every original item.
const { context } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
});
const accumulator = new BlueprintAccumulator();
await __testRehydrateAccumulatorFromGraph(context, accumulator);
// Planner revises: drop one original, add a new one, then resubmits.
expect(accumulator.removeItem('wf-2')).toBe(true);
accumulator.addItem({
kind: 'workflow',
id: 'wf-3',
name: 'C',
purpose: 'C',
integrations: [],
dependsOn: [],
});
expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']);
});
it('does not reopen an already-approved/active graph', async () => {
const { context } = makeContext({
graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks },
});
const accumulator = new BlueprintAccumulator();
await __testRehydrateAccumulatorFromGraph(context, accumulator);
expect(accumulator.isEmpty()).toBe(true);
});
it('is a no-op when no graph exists', async () => {
const { context, getGraph } = makeContext({ graph: null });
const accumulator = new BlueprintAccumulator();
await __testRehydrateAccumulatorFromGraph(context, accumulator);
expect(getGraph).toHaveBeenCalledWith('t-1');
expect(accumulator.isEmpty()).toBe(true);
});
it('leaves the accumulator empty when getGraph throws', async () => {
const { context, getGraph } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
});
getGraph.mockRejectedValueOnce(new Error('db down'));
const accumulator = new BlueprintAccumulator();
await expect(
__testRehydrateAccumulatorFromGraph(context, accumulator),
).resolves.toBeUndefined();
expect(accumulator.isEmpty()).toBe(true);
});
});
import type { OrchestrationContext } from '../../../types';
import {
buildPlannerBriefingContext,
formatMessagesForBriefing,
getPriorToolObservations,
getRecentMessages,
} from '../planner-briefing';
describe('formatMessagesForBriefing', () => {
// The planner system prompt (plan-agent-prompt.ts) treats <current-datetime>
@ -197,7 +12,7 @@ describe('formatMessagesForBriefing', () => {
// both. Emitting only one drops half the contract.
it('emits <current-datetime> alongside <user-timezone> when a zone is provided', () => {
const briefing = __testFormatMessagesForBriefing(
const briefing = formatMessagesForBriefing(
[{ role: 'user', content: 'schedule me a daily digest' }],
undefined,
'America/New_York',
@ -208,14 +23,14 @@ describe('formatMessagesForBriefing', () => {
});
it('still emits <current-datetime> when no zone is provided', () => {
const briefing = __testFormatMessagesForBriefing([], undefined, undefined);
const briefing = formatMessagesForBriefing([], undefined, undefined);
expect(briefing).toMatch(/<current-datetime>[^<]+<\/current-datetime>/);
expect(briefing).not.toContain('<user-timezone>');
});
it('renders already-collected answers and discovered resources as dedicated sections', () => {
const briefing = __testFormatMessagesForBriefing(
const briefing = formatMessagesForBriefing(
[{ role: 'user', content: 'Build a Slack to-do agent' }],
undefined,
'America/New_York',
@ -238,7 +53,7 @@ describe('formatMessagesForBriefing', () => {
describe('buildPlannerBriefingContext', () => {
it('extracts ask-user answers and credential selections from prior tool results', () => {
const context = __testBuildPlannerBriefingContext([
const context = buildPlannerBriefingContext([
{
toolName: 'credentials',
args: { action: 'list' },
@ -290,7 +105,7 @@ describe('buildPlannerBriefingContext', () => {
});
it('ignores unanswered and skipped ask-user answers', () => {
const context = __testBuildPlannerBriefingContext([
const context = buildPlannerBriefingContext([
{
toolName: 'ask-user',
args: {
@ -399,7 +214,7 @@ describe('getPriorToolObservations', () => {
},
} as unknown as OrchestrationContext;
const observations = __testGetPriorToolObservations(context);
const observations = getPriorToolObservations(context);
expect(getEventsForRuns).toHaveBeenCalledWith('thread-1', ['run-prior', 'run-current']);
expect(getEventsForRun).not.toHaveBeenCalled();
@ -436,9 +251,7 @@ describe('getPriorToolObservations', () => {
},
} as unknown as OrchestrationContext;
expect(__testGetPriorToolObservations(context)).toEqual([
{ toolName: 'credentials', args, result },
]);
expect(getPriorToolObservations(context)).toEqual([{ toolName: 'credentials', args, result }]);
});
it('returns no observations when event lookup fails', () => {
@ -452,7 +265,7 @@ describe('getPriorToolObservations', () => {
},
} as unknown as OrchestrationContext;
expect(__testGetPriorToolObservations(context)).toEqual([]);
expect(getPriorToolObservations(context)).toEqual([]);
});
});
@ -468,7 +281,7 @@ describe('getRecentMessages', () => {
},
} as unknown as OrchestrationContext;
const messages = await __testGetRecentMessages(context, 5);
const messages = await getRecentMessages(context, 5);
expect(messages).toEqual([{ role: 'user', content: 'Build a Slack to-do agent' }]);
});

View File

@ -0,0 +1,180 @@
import type {
OrchestrationContext,
PlannedTaskGraph,
PlannedTaskRecord,
PlannedTaskService,
} from '../../../types';
import { BlueprintAccumulator } from '../blueprint-accumulator';
import { clearPlannedTaskGraph, rehydrateAccumulatorFromGraph } from '../planner-run-coordinator';
function makeContext(overrides: {
graph: PlannedTaskGraph | null;
runId?: string;
}): {
context: OrchestrationContext;
clear: jest.Mock;
getGraph: jest.Mock;
} {
const clear = jest.fn(async () => {
await Promise.resolve();
});
const getGraph = jest.fn(async () => {
await Promise.resolve();
return overrides.graph;
});
const plannedTaskService: Partial<PlannedTaskService> = {
getGraph,
clear,
};
const context = {
threadId: 't-1',
runId: overrides.runId ?? 'run-current',
plannedTaskService: plannedTaskService as PlannedTaskService,
} as unknown as OrchestrationContext;
return { context, clear, getGraph };
}
describe('clearPlannedTaskGraph', () => {
it('clears the graph when it belongs to this run and is awaiting approval', async () => {
const { context, clear } = makeContext({
graph: {
planRunId: 'run-current',
status: 'awaiting_approval',
tasks: [],
},
});
await clearPlannedTaskGraph(context);
expect(clear).toHaveBeenCalledWith('t-1');
});
it('does not clear an active graph from a prior approved plan', async () => {
// A previous `/plan` call already succeeded; its graph is `active` with
// pending checkpoints. A new planner error must not wipe it.
const { context, clear } = makeContext({
graph: {
planRunId: 'run-previous',
status: 'active',
tasks: [],
},
});
await clearPlannedTaskGraph(context);
expect(clear).not.toHaveBeenCalled();
});
it('does not clear an awaiting-approval graph that was created by a different planner run', async () => {
// Defensive: a concurrent plan for a different run should not have its
// unapproved graph wiped by this run's error-path cleanup.
const { context, clear } = makeContext({
graph: {
planRunId: 'run-other',
status: 'awaiting_approval',
tasks: [],
},
});
await clearPlannedTaskGraph(context);
expect(clear).not.toHaveBeenCalled();
});
it('is a no-op when no graph exists', async () => {
const { context, clear, getGraph } = makeContext({ graph: null });
await clearPlannedTaskGraph(context);
expect(getGraph).toHaveBeenCalled();
expect(clear).not.toHaveBeenCalled();
});
it('swallows getGraph errors so the caller can return its own error', async () => {
const { context, getGraph } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: [] },
});
getGraph.mockRejectedValueOnce(new Error('db down'));
await expect(clearPlannedTaskGraph(context)).resolves.toBeUndefined();
});
});
describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => {
const persistedTasks: PlannedTaskRecord[] = [
{
id: 'wf-1',
title: "Build 'A' workflow",
kind: 'build-workflow',
spec: 'A',
deps: [],
status: 'planned',
},
{
id: 'wf-2',
title: "Build 'B' workflow",
kind: 'build-workflow',
spec: 'B',
deps: [],
status: 'planned',
},
];
it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => {
// Reproduces "ask for edits -> revise existing plan -> submit again":
// on resume the parent rebuilt a fresh accumulator; without rehydration
// the planner's remove/add would operate on an empty plan and the
// re-submit would drop every original item.
const { context } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
});
const accumulator = new BlueprintAccumulator();
await rehydrateAccumulatorFromGraph(context, accumulator);
// Planner revises: drop one original, add a new one, then resubmits.
expect(accumulator.removeItem('wf-2')).toBe(true);
accumulator.addItem({
kind: 'workflow',
id: 'wf-3',
name: 'C',
purpose: 'C',
integrations: [],
dependsOn: [],
});
expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']);
});
it('does not reopen an already-approved/active graph', async () => {
const { context } = makeContext({
graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks },
});
const accumulator = new BlueprintAccumulator();
await rehydrateAccumulatorFromGraph(context, accumulator);
expect(accumulator.isEmpty()).toBe(true);
});
it('is a no-op when no graph exists', async () => {
const { context, getGraph } = makeContext({ graph: null });
const accumulator = new BlueprintAccumulator();
await rehydrateAccumulatorFromGraph(context, accumulator);
expect(getGraph).toHaveBeenCalledWith('t-1');
expect(accumulator.isEmpty()).toBe(true);
});
it('leaves the accumulator empty when getGraph throws', async () => {
const { context, getGraph } = makeContext({
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
});
getGraph.mockRejectedValueOnce(new Error('db down'));
const accumulator = new BlueprintAccumulator();
await expect(rehydrateAccumulatorFromGraph(context, accumulator)).resolves.toBeUndefined();
expect(accumulator.isEmpty()).toBe(true);
});
});

View File

@ -0,0 +1,510 @@
import type { InstanceAiEvent } from '@n8n/api-types';
import { DateTime } from 'luxon';
import type { OrchestrationContext } from '../../types';
import { CREDENTIALS_TOOL_ID } from '../credentials.tool';
import { DATA_TABLES_TOOL_ID } from '../data-tables.tool';
import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool';
/** Number of recent thread messages to include as planner context. */
export const MESSAGE_HISTORY_COUNT = 5;
const RELEVANT_PRIOR_TOOL_NAMES = new Set<string>([
ASK_USER_TOOL_ID,
CREDENTIALS_TOOL_ID,
DATA_TABLES_TOOL_ID,
]);
// ---------------------------------------------------------------------------
// Message history retrieval
// ---------------------------------------------------------------------------
interface FormattedMessage {
role: 'user' | 'assistant';
content: string;
}
interface PlannerBriefingContext {
collectedAnswers: string[];
discoveredResources: string[];
}
interface ToolObservation {
toolName: string;
args: Record<string, unknown>;
result: unknown;
}
interface CredentialBrief {
id?: string;
name: string;
type: string;
}
interface DataTableBrief {
id?: string;
name: string;
}
/** Extract plain text from persisted native memory content. */
function extractTextFromMemoryContent(content: unknown): string {
if (typeof content === 'string') return content;
if (Array.isArray(content)) return extractTextParts(content);
return '';
}
function extractTextParts(parts: unknown[]): string {
return parts
.filter(
(c): c is { type: 'text'; text: string } =>
typeof c === 'object' &&
c !== null &&
'type' in c &&
c.type === 'text' &&
'text' in c &&
typeof c.text === 'string',
)
.map((c) => c.text)
.join('\n');
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function readString(value: unknown): string | undefined {
return typeof value === 'string' && value.trim().length > 0 ? value : undefined;
}
function readRecord(value: unknown): Record<string, unknown> | undefined {
return isRecord(value) ? value : undefined;
}
function readArray(value: unknown): unknown[] {
return Array.isArray(value) ? value : [];
}
function readStringArray(value: unknown): string[] {
return readArray(value).filter((item): item is string => typeof item === 'string');
}
function addUnique(target: string[], seen: Set<string>, value: string | undefined): void {
if (!value || seen.has(value)) return;
seen.add(value);
target.push(value);
}
function summarizeList(values: string[], limit = 10): string {
const visible = values.slice(0, limit).join(', ');
const remaining = values.length - limit;
return remaining > 0 ? `${visible}, and ${remaining} more` : visible;
}
export async function getRecentMessages(
context: OrchestrationContext,
count: number,
): Promise<FormattedMessage[]> {
const messages: FormattedMessage[] = [];
// Retrieve previously-saved messages from memory.
if (context.memory) {
try {
const history = await context.memory.getMessages(context.threadId, {
limit: count,
});
for (const m of history) {
if (!('role' in m)) continue;
const role = m.role;
const content = extractTextFromMemoryContent(m.content);
if ((role === 'user' || role === 'assistant') && content.length > 0) {
messages.push({ role, content });
}
}
} catch {
// Memory recall failed — continue with just the current message
}
}
// Always append the current in-flight user message (not yet saved to memory)
if (shouldAppendCurrentUserMessage(messages, context.currentUserMessage)) {
messages.push({ role: 'user', content: context.currentUserMessage });
}
return messages;
}
function shouldAppendCurrentUserMessage(
messages: FormattedMessage[],
currentUserMessage?: string,
): currentUserMessage is string {
const current = currentUserMessage?.trim();
if (!current) return false;
const lastUserMessage = [...messages].reverse().find((message) => message.role === 'user');
return lastUserMessage?.content.trim() !== current;
}
/**
* Reconstructs prior planner-relevant tool calls from the event stream.
*
* Tool-call and tool-result events are correlated by `toolCallId` so the
* planner can receive structured context that is not preserved in text-only
* memory recall, such as ask-user answers and credential selections.
*/
export function getPriorToolObservations(context: OrchestrationContext): ToolObservation[] {
type MutableToolObservation = Omit<ToolObservation, 'result'> & {
result: unknown;
hasResult: boolean;
};
const toolCalls = new Map<string, MutableToolObservation>();
const pendingResults = new Map<string, unknown>();
for (const event of getPriorToolEvents(context)) {
if (event.type === 'tool-call') {
const { toolCallId, toolName, args } = event.payload;
if (!RELEVANT_PRIOR_TOOL_NAMES.has(toolName)) continue;
const pendingResult = pendingResults.get(toolCallId);
toolCalls.set(toolCallId, {
toolName,
args,
result: pendingResult,
hasResult: pendingResults.has(toolCallId),
});
continue;
}
if (event.type === 'tool-result') {
const { toolCallId, result } = event.payload;
const existing = toolCalls.get(toolCallId);
if (existing) {
existing.result = result;
existing.hasResult = true;
} else {
pendingResults.set(toolCallId, result);
}
}
}
return [...toolCalls.values()]
.filter((observation) => observation.hasResult)
.map(({ toolName, args, result }) => ({ toolName, args, result }));
}
/**
* Returns the events that may contain prior tool context for this planner run.
*
* When the run belongs to a message group, all runs in that group are searched
* so follow-up runs can see choices collected earlier in the same assistant
* turn. If grouped lookup is unavailable, this falls back to the current run.
*/
function getPriorToolEvents(context: OrchestrationContext): InstanceAiEvent[] {
if (context.messageGroupId) {
const runIds = getMessageGroupRunIds(context);
if (runIds.length > 0) {
try {
return context.eventBus.getEventsForRuns(context.threadId, runIds);
} catch {
// Fall back to the current run below.
}
}
}
try {
return context.eventBus.getEventsForRun(context.threadId, context.runId);
} catch {
return [];
}
}
/**
* Finds run IDs that belong to the current message group from run-start events.
*
* The event bus can fetch events for many run IDs, but the orchestration
* context only carries the current run ID and message group ID. This bridges
* those two concepts while keeping the current run as a defensive fallback.
*/
function getMessageGroupRunIds(context: OrchestrationContext): string[] {
const messageGroupId = context.messageGroupId;
if (!messageGroupId) return [];
const runIds = new Set<string>();
try {
for (const { event } of context.eventBus.getEventsAfter(context.threadId, 0)) {
if (event.type === 'run-start' && event.payload.messageGroupId === messageGroupId) {
runIds.add(event.runId);
}
}
} catch {
return [context.runId];
}
runIds.add(context.runId);
return [...runIds];
}
/**
* Converts raw prior tool observations into planner briefing sections.
*
* The resulting strings are intentionally short and human-readable because
* they are embedded directly into the planner prompt under dedicated headings.
*/
export function buildPlannerBriefingContext(
observations: ToolObservation[],
): PlannerBriefingContext {
const collectedAnswers: string[] = [];
const discoveredResources: string[] = [];
const seenAnswers = new Set<string>();
const seenResources = new Set<string>();
const credentialsById = buildCredentialLookup(observations);
for (const observation of observations) {
if (observation.toolName === ASK_USER_TOOL_ID) {
for (const answer of extractAskUserAnswerLines(observation)) {
addUnique(collectedAnswers, seenAnswers, answer);
}
continue;
}
if (observation.toolName === CREDENTIALS_TOOL_ID) {
const action = readString(observation.args.action);
if (action === 'list') {
addUnique(discoveredResources, seenResources, summarizeCredentials(observation.result));
}
if (action === 'setup') {
for (const selection of extractCredentialSelectionLines(observation, credentialsById)) {
addUnique(collectedAnswers, seenAnswers, selection);
}
}
continue;
}
if (
observation.toolName === DATA_TABLES_TOOL_ID &&
readString(observation.args.action) === 'list'
) {
addUnique(discoveredResources, seenResources, summarizeDataTables(observation.result));
}
}
return { collectedAnswers, discoveredResources };
}
/**
* Builds an ID lookup from prior credential list results.
*
* Credential setup results contain selected IDs, so this lets the briefing
* render stable user-facing names and credential types when a prior list result
* is available.
*/
function buildCredentialLookup(observations: ToolObservation[]): Map<string, CredentialBrief> {
const credentialsById = new Map<string, CredentialBrief>();
for (const observation of observations) {
if (observation.toolName !== CREDENTIALS_TOOL_ID) continue;
for (const credential of extractCredentials(observation.result)) {
if (credential.id) credentialsById.set(credential.id, credential);
}
}
return credentialsById;
}
/**
* Extracts answered ask-user responses as `question: answer` briefing lines.
*
* Skipped or unanswered prompts are ignored, and question text is recovered
* from tool args when the tool result only includes a question ID.
*/
function extractAskUserAnswerLines(observation: ToolObservation): string[] {
const result = readRecord(observation.result);
if (!result || result.answered === false) return [];
const questionsById = extractQuestionTextById(observation.args);
const answers = readArray(result.answers);
const lines: string[] = [];
for (const answerValue of answers) {
const answer = readRecord(answerValue);
if (!answer || answer.skipped === true) continue;
const questionId = readString(answer.questionId);
const question =
readString(answer.question) ?? (questionId ? questionsById.get(questionId) : undefined);
const selectedOptions = readStringArray(answer.selectedOptions);
const customText = readString(answer.customText);
const values = [...selectedOptions, ...(customText ? [customText] : [])];
if (!question || values.length === 0) continue;
lines.push(`${question}: ${values.join(', ')}`);
}
return lines;
}
/**
* Maps ask-user question IDs to display text from the original tool args.
*/
function extractQuestionTextById(args: Record<string, unknown>): Map<string, string> {
const questionsById = new Map<string, string>();
for (const questionValue of readArray(args.questions)) {
const question = readRecord(questionValue);
const id = readString(question?.id);
const text = readString(question?.question);
if (id && text) questionsById.set(id, text);
}
return questionsById;
}
/**
* Renders credential setup selections as briefing lines.
*
* The setup tool returns a `{ credentialType: credentialId }` map. The optional
* credential lookup turns those IDs back into names so the planner can avoid
* asking the user to choose the same credential again.
*/
function extractCredentialSelectionLines(
observation: ToolObservation,
credentialsById: Map<string, CredentialBrief>,
): string[] {
const result = readRecord(observation.result);
const credentials = readRecord(result?.credentials);
if (!credentials) return [];
const lines: string[] = [];
for (const [credentialType, credentialIdValue] of Object.entries(credentials)) {
const credentialId = readString(credentialIdValue);
if (!credentialId) continue;
const credential = credentialsById.get(credentialId);
const label = credential
? `${credential.name} (${credential.type})`
: `credential ID ${credentialId}`;
lines.push(`Credential selected for ${credentialType}: ${label}`);
}
return lines;
}
/**
* Summarizes a credentials list result for the briefing.
*/
function summarizeCredentials(result: unknown): string | undefined {
const credentials = extractCredentials(result);
if (credentials.length === 0) return undefined;
return `Credentials available: ${summarizeList(
credentials.map((credential) => `${credential.name} (${credential.type})`),
)}`;
}
/**
* Reads the minimal credential metadata needed by the planner briefing.
*/
function extractCredentials(result: unknown): CredentialBrief[] {
const record = readRecord(result);
return readArray(record?.credentials)
.map(readCredentialBrief)
.filter((credential): credential is CredentialBrief => credential !== undefined);
}
function readCredentialBrief(value: unknown): CredentialBrief | undefined {
const record = readRecord(value);
const name = readString(record?.name);
const type = readString(record?.type);
if (!name || !type) return undefined;
const id = readString(record?.id);
return {
name,
type,
...(id ? { id } : {}),
};
}
/**
* Summarizes a data-tables list result for the briefing.
*/
function summarizeDataTables(result: unknown): string | undefined {
const tables = extractDataTables(result);
if (tables.length === 0) return undefined;
return `Data tables available: ${summarizeList(tables.map((table) => table.name))}`;
}
/**
* Reads the minimal data-table metadata needed by the planner briefing.
*/
function extractDataTables(result: unknown): DataTableBrief[] {
const record = readRecord(result);
return readArray(record?.tables)
.map(readDataTableBrief)
.filter((table): table is DataTableBrief => table !== undefined);
}
function readDataTableBrief(value: unknown): DataTableBrief | undefined {
const record = readRecord(value);
const name = readString(record?.name);
if (!name) return undefined;
const id = readString(record?.id);
return {
name,
...(id ? { id } : {}),
};
}
/**
* Formats conversation, time, and already-collected context into the planner goal.
*/
export function formatMessagesForBriefing(
messages: FormattedMessage[],
guidance?: string,
timeZone?: string,
briefingContext?: PlannerBriefingContext,
): string {
const parts: string[] = [];
const now = timeZone ? DateTime.now().setZone(timeZone) : DateTime.now();
const isoNow = now.toISO({ includeOffset: true }) ?? new Date().toISOString();
parts.push(`<current-datetime>${isoNow}</current-datetime>`);
if (timeZone) {
parts.push(`<user-timezone>${timeZone}</user-timezone>`);
}
if (messages.length > 0) {
parts.push('## Recent conversation');
for (const m of messages) {
const label = m.role === 'user' ? 'User' : 'Assistant';
// Truncate very long messages
const content = m.content.length > 2000 ? m.content.slice(0, 2000) + '...' : m.content;
parts.push(`**${label}:** ${content}`);
}
}
if (briefingContext?.collectedAnswers.length) {
parts.push('## Already-collected answers');
for (const answer of briefingContext.collectedAnswers) {
parts.push(`- ${answer}`);
}
}
if (briefingContext?.discoveredResources.length) {
parts.push('## Already-discovered resources');
for (const resource of briefingContext.discoveredResources) {
parts.push(`- ${resource}`);
}
}
if (guidance) {
parts.push(`\n## Orchestrator guidance\n${guidance}`);
}
parts.push('\nDesign the solution blueprint based on the conversation above.');
return parts.join('\n\n');
}

View File

@ -0,0 +1,521 @@
import { Agent } from '@n8n/agents';
import { z } from 'zod';
import { createAddPlanItemTool, createRemovePlanItemTool } from './add-plan-item.tool';
import { createSubAgentPersistence } from './agent-persistence';
import { BlueprintAccumulator } from './blueprint-accumulator';
import { truncateLabel } from './display-utils';
import { PLANNER_AGENT_PROMPT } from './plan-agent-prompt';
import {
buildPlannerBriefingContext,
formatMessagesForBriefing,
getPriorToolObservations,
getRecentMessages,
MESSAGE_HISTORY_COUNT,
} from './planner-briefing';
import { createSubmitPlanTool } from './submit-plan.tool';
import {
failTraceRun,
finishTraceRun,
startSubAgentTrace,
traceSubAgentTools,
withTraceRun,
} from './tracing-utils';
import { attachRuntimeWorkspaceCapabilities } from '../../agent/runtime-workspace';
import { MAX_STEPS } from '../../constants/max-steps';
import { consumeStreamCascading } from '../../stream/consume-with-hitl';
import type { ConsumeStreamCascadingResult } from '../../stream/consume-with-hitl';
import { createToolRegistry, toolRegistryKeys, toolRegistryValues } from '../../tool-registry';
import { buildAgentTraceInputs, mergeTraceRunInputs } from '../../tracing/langsmith-tracing';
import type { OrchestrationContext } from '../../types';
import { resumeAgentStream } from '../../utils/stream-helpers';
import { CREDENTIALS_TOOL_ID } from '../credentials.tool';
import { DATA_TABLES_TOOL_ID } from '../data-tables.tool';
import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool';
import { createTemplatesTool } from '../templates.tool';
/** Read-only discovery tools the planner gets from domainTools. */
const PLANNER_DOMAIN_TOOL_NAMES = [
'nodes',
CREDENTIALS_TOOL_ID,
DATA_TABLES_TOOL_ID,
'workflows',
ASK_USER_TOOL_ID,
];
/** Research tools added when available. */
const PLANNER_RESEARCH_TOOL_NAMES = ['research'];
// ---------------------------------------------------------------------------
// Helper: clear draft checklist from taskStorage
// ---------------------------------------------------------------------------
/** Publish an empty tasks-update so the frontend clears stale plan items. */
function publishClearingEvent(context: OrchestrationContext): void {
context.eventBus.publish(context.threadId, {
type: 'tasks-update',
runId: context.runId,
agentId: context.orchestratorAgentId,
payload: { tasks: { tasks: [] }, planItems: [] },
});
}
async function clearDraftChecklist(context: OrchestrationContext): Promise<void> {
try {
await context.taskStorage.save(context.threadId, { tasks: [] });
} catch {
// Best-effort — don't let cleanup failures block the return path
}
}
/**
* Remove any persisted planned-task graph for this thread *if and only if* it
* belongs to this planner run's unapproved plan. Called on planner give-up /
* error paths to prevent a later schedulePlannedTasks() tick from dispatching
* a plan the user never approved.
*
* Guarded because the thread may already carry an unrelated active graph (a
* prior approved plan with pending checkpoints / in-flight tasks); an
* unconditional `clear()` here would strand that work. We only touch the graph
* when its `planRunId` matches this run AND its `status` is `awaiting_approval`
* the single window where submit-plan has persisted but approval hasn't
* happened yet.
*/
export async function clearPlannedTaskGraph(context: OrchestrationContext): Promise<void> {
if (!context.plannedTaskService) return;
try {
const graph = await context.plannedTaskService.getGraph(context.threadId);
if (!graph) return;
if (graph.planRunId !== context.runId) return;
if (graph.status !== 'awaiting_approval') return;
await context.plannedTaskService.clear(context.threadId);
} catch {
// Best-effort — don't let cleanup failures block the return path
}
}
/**
* Seed a freshly-built accumulator from the persisted plan before a planner
* resume. The parent plan-tool handler exits on every cascade-suspend, so the
* first-call accumulator is gone by the time an "ask for edits" revision
* resumes the planner without this, remove-plan-item can't touch the
* original items, add-plan-item only carries the newly-added ones, and the
* re-submit's createPlan (which overwrites unconditionally) replaces the graph
* with a partial plan.
*
* Only rehydrates while the plan is still `awaiting_approval` (the revision
* window) an already-approved/active graph with in-flight tasks must not be
* reopened here. Best-effort: a getGraph failure leaves the accumulator empty
* rather than blocking the resume.
*/
export async function rehydrateAccumulatorFromGraph(
context: OrchestrationContext,
accumulator: BlueprintAccumulator,
): Promise<void> {
if (!context.plannedTaskService) return;
try {
const graph = await context.plannedTaskService.getGraph(context.threadId);
if (graph?.status === 'awaiting_approval' && graph.tasks.length > 0) {
accumulator.loadFromTasks(graph.tasks);
}
} catch {
// Best-effort — fall back to an empty accumulator rather than block resume.
}
}
/**
* The plan tool cascades sub-agent HITL suspensions UP through the SDK's
* native suspend/resume mechanism: when the planner sub-agent (or any tool
* inside it) emits a `tool-call-suspended` chunk, the plan tool catches it
* via `consumeStreamCascading` and calls its own `ctx.suspend()` with the
* same payload. This checkpoints the orchestrator's full state alongside the
* planner's, so a process restart between user prompt and click can resume
* the planner without losing any state.
*
* The schemas below are permissive on purpose: the plan tool just forwards
* whatever the inner tool emitted (submit-plan's plan-review payload OR
* ask-user's questions payload) and accepts whatever the frontend sent back
* for that card. Validation already happened on the inner tool.
*/
export const planToolSuspendSchema = z
.object({
requestId: z.string(),
message: z.string(),
severity: z.string(),
// Only submit-plan + ask-user carry an `inputType`; cascaded suspensions
// from other planner tools (credentials, data-tables, ...) don't, and a
// strict `inputType: string` would reject otherwise-valid payloads.
inputType: z.string().optional(),
})
.passthrough();
/** Assemble the planner sub-agent's tool registry: read-only discovery +
* research tools from the orchestrator's domain set, plus the plan-building
* tools (add/remove/submit) wired to this run's accumulator. */
function buildPlannerTools(context: OrchestrationContext, accumulator: BlueprintAccumulator) {
const plannerTools = createToolRegistry();
for (const name of PLANNER_DOMAIN_TOOL_NAMES) {
const tool = context.domainTools.get(name);
if (tool) plannerTools.set(name, tool);
}
for (const name of PLANNER_RESEARCH_TOOL_NAMES) {
const tool = context.domainTools.get(name);
if (tool) plannerTools.set(name, tool);
}
plannerTools.set('templates', createTemplatesTool());
plannerTools.set('add-plan-item', createAddPlanItemTool(accumulator, context));
plannerTools.set('remove-plan-item', createRemovePlanItemTool(accumulator, context));
plannerTools.set('submit-plan', createSubmitPlanTool(accumulator, context));
return plannerTools;
}
/** Construct the planner sub-agent with workspace capabilities + telemetry. */
function buildPlannerSubAgent(
context: OrchestrationContext,
tracedPlannerTools: ReturnType<typeof traceSubAgentTools>,
subAgentId: string,
) {
const subAgent = new Agent('Workflow Planner Agent')
.model(context.modelId)
.instructions(PLANNER_AGENT_PROMPT, {
providerOptions: {
anthropic: { cacheControl: { type: 'ephemeral' } },
},
})
.tool(toolRegistryValues(tracedPlannerTools))
.checkpoint(context.checkpointStore ?? 'memory');
attachRuntimeWorkspaceCapabilities(subAgent, {
runtimeSkills: context.runtimeSkills,
});
const telemetry = context.tracing?.getTelemetry?.({
agentRole: 'planner',
functionId: 'instance-ai.subagent.planner',
executionMode: 'background',
metadata: { agent_id: subAgentId },
});
if (telemetry) {
subAgent.telemetry(telemetry);
}
return subAgent;
}
/** Outcome of starting/resuming a planner run: either the cascading-stream
* result, or a sentinel that the run could not be resumed (checkpoint state
* was lost across a restart). */
type PlannerRunOutcome =
| { kind: 'consumed'; consumeResult: ConsumeStreamCascadingResult }
| { kind: 'lost-state' };
/**
* Owns one planner sub-agent run end-to-end: tool/agent construction, the
* first-call and resume legs, the LangSmith trace span (held as a field so it
* spans the run's try/catch), and the terminal/suspension/error transitions.
*
* The tool handler stays a thin orchestrator: build the coordinator, run
* first-call or resume, then route the result to the matching `handle*` method.
*/
export class PlannerRunCoordinator {
private readonly accumulator = new BlueprintAccumulator();
// runId-derived id so a resume reuses the same event-stream identity.
private readonly subAgentId: string;
private readonly plannerTools: ReturnType<typeof buildPlannerTools>;
private readonly tracedPlannerTools: ReturnType<typeof traceSubAgentTools>;
private readonly subAgent: ReturnType<typeof buildPlannerSubAgent>;
// Held as a field so finishTrace/failTrace can finalise the span whether the
// run ends in handleTerminalResult or in the handler's catch.
private traceRun: Awaited<ReturnType<typeof startSubAgentTrace>>;
constructor(private readonly context: OrchestrationContext) {
this.subAgentId = `agent-planner-${context.runId}`;
this.plannerTools = buildPlannerTools(context, this.accumulator);
this.tracedPlannerTools = traceSubAgentTools(context, this.plannerTools, 'planner');
this.subAgent = buildPlannerSubAgent(context, this.tracedPlannerTools, this.subAgentId);
}
/** First-call leg: persist the in-flight user message, brief the planner,
* publish agent-spawned, and consume the cascading stream. */
async startFirstRun(guidance?: string): Promise<PlannerRunOutcome> {
const { context, subAgent, subAgentId, plannerTools, tracedPlannerTools } = this;
// The planner is the most common inline HITL entry point — when it
// suspends the orchestrator cascades-suspends too, and the SDK does not
// flush the user-message row to memory until a clean loop end (which a
// suspended run never reaches). Persist eagerly so the user's bubble is
// visible if they reload during the suspend window.
if (context.persistInFlightUserMessage) {
await context.persistInFlightUserMessage();
}
const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT);
const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context));
const briefing = formatMessagesForBriefing(
messages,
guidance,
context.timeZone,
briefingContext,
);
const subtitle = guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...';
context.eventBus.publish(context.threadId, {
type: 'agent-spawned',
runId: context.runId,
agentId: subAgentId,
payload: {
parentId: context.orchestratorAgentId,
role: 'planner',
tools: toolRegistryKeys(plannerTools),
kind: 'planner' as const,
title: 'Planning',
subtitle: truncateLabel(subtitle),
goal: briefing,
},
});
this.traceRun = await startSubAgentTrace(context, {
agentId: subAgentId,
role: 'planner',
kind: 'planner',
inputs: { guidance, messageCount: messages.length },
});
mergeTraceRunInputs(
this.traceRun,
buildAgentTraceInputs({
systemPrompt: PLANNER_AGENT_PROMPT,
tools: tracedPlannerTools,
modelId: context.modelId,
}),
);
const consumeResult = await withTraceRun(context, this.traceRun, async () => {
const persistence = await createSubAgentPersistence(context, { agentKind: 'planner' });
const stream = await subAgent.stream(briefing, {
maxIterations: MAX_STEPS.PLANNER,
abortSignal: context.abortSignal,
persistence,
providerOptions: {
anthropic: { cacheControl: { type: 'ephemeral' } },
},
});
return await consumeStreamCascading({
agent: subAgent,
stream,
runId: context.runId,
agentId: subAgentId,
eventBus: context.eventBus,
logger: context.logger,
threadId: context.threadId,
abortSignal: context.abortSignal,
});
});
return { kind: 'consumed', consumeResult };
}
/** Resume leg: locate the suspended planner, rehydrate the accumulator from
* the persisted plan, open a fresh trace span, and resume the cascading
* stream. Returns `lost-state` when the checkpoint can't be located. */
async resume(resumeData: Record<string, unknown>): Promise<PlannerRunOutcome> {
const { context, subAgent, subAgentId, tracedPlannerTools } = this;
const resumeInfo = await context.findSubAgentResumeInfo?.('planner');
if (!resumeInfo) return { kind: 'lost-state' };
// Rehydrate the accumulator from the persisted plan so an "ask for edits"
// revision operates on the full plan rather than an empty accumulator.
await rehydrateAccumulatorFromGraph(context, this.accumulator);
// Open a trace span for the resumed leg so a plan that suspended at HITL
// and resumed still shows its continuation in LangSmith. The planner card
// is already in the snapshot from the first call, so no agent-spawned
// event is (re-)published here.
this.traceRun = await startSubAgentTrace(context, {
agentId: subAgentId,
role: 'planner',
kind: 'planner',
inputs: { resumed: true },
});
mergeTraceRunInputs(
this.traceRun,
buildAgentTraceInputs({
systemPrompt: PLANNER_AGENT_PROMPT,
tools: tracedPlannerTools,
modelId: context.modelId,
}),
);
const consumeResult = await withTraceRun(context, this.traceRun, async () => {
const resumed = await resumeAgentStream(subAgent, resumeData, {
runId: resumeInfo.runId,
toolCallId: resumeInfo.toolCallId,
persistence: resumeInfo.persistence,
maxIterations: MAX_STEPS.PLANNER,
});
return await consumeStreamCascading({
agent: subAgent,
stream: resumed,
runId: context.runId,
agentId: subAgentId,
eventBus: context.eventBus,
logger: context.logger,
threadId: context.threadId,
abortSignal: context.abortSignal,
});
});
return { kind: 'consumed', consumeResult };
}
/** Cascade a planner suspension up to the orchestrator. Validates the
* forwarded payload; on a malformed payload it tears down the draft plan
* (so a later schedulePlannedTasks tick can't dispatch it) and returns a
* terminal result instead of suspending. */
async cascadeSuspension(
ctx: { suspend: (payload: z.infer<typeof planToolSuspendSchema>) => Promise<never> },
consumeResult: Extract<ConsumeStreamCascadingResult, { status: 'suspended' }>,
): Promise<{ result: string }> {
const { context } = this;
const parsed = planToolSuspendSchema.safeParse(consumeResult.suspension.suspendPayload);
if (!parsed.success) {
context.logger.warn('Planner emitted a suspension payload missing required fields', {
threadId: context.threadId,
runId: context.runId,
toolName: consumeResult.suspension.toolName,
zodIssues: parsed.error.issues,
});
await this.tearDownDraftPlan();
return {
result: 'Planner requested user input but the payload was malformed. Please try again.',
};
}
return await ctx.suspend(parsed.data);
}
/** Finalise a completed/cancelled/errored planner run: close the trace,
* publish agent-completed, then resolve to the orchestrator-facing result
* based on approval / denial / no-plan. */
async handleTerminalResult(
consumeResult: ConsumeStreamCascadingResult,
): Promise<{ result: string }> {
const { context, accumulator, subAgentId } = this;
const resultText = consumeResult.status === 'completed' ? await consumeResult.text : '';
if (this.traceRun) {
await finishTraceRun(context, this.traceRun, {
outputs: {
result: resultText,
agentId: subAgentId,
role: 'planner',
hasItems: !accumulator.isEmpty(),
itemCount: accumulator.getTaskItemsForEvent().length,
},
});
}
this.publishCompleted(resultText);
// Approval is detected via the accumulator's flag, which submit-plan flips
// in its resume handler. createPlan persisted the graph as
// `awaiting_approval` on the first call; flip it to `active` and schedule.
if (accumulator.isApproved()) {
if (context.plannedTaskService) {
await context.plannedTaskService.approvePlan(context.threadId);
}
if (context.schedulePlannedTasks) {
await context.schedulePlannedTasks();
}
// On resume the accumulator is fresh and reports 0 — query the persisted
// graph instead so the orchestrator gets accurate text.
const persistedCount = await getPersistedTaskCount(context);
const taskCount = persistedCount ?? accumulator.getTaskList().length;
return {
result: `Plan approved and ${taskCount} task${taskCount === 1 ? '' : 's'} dispatched.`,
};
}
// User explicitly denied the plan. submit-plan already cancelled the
// persisted graph, so the cancelled graph won't be picked up by the
// scheduler. Return a terminal result so the orchestrator stops cleanly.
if (accumulator.isDenied()) {
publishClearingEvent(context);
await clearDraftChecklist(context);
return { result: 'Plan denied by user. No tasks were dispatched.' };
}
// Planner finished without approval (no submit-plan or user didn't approve).
await this.tearDownDraftPlan();
if (!accumulator.isEmpty()) {
return {
result: `Planner added ${accumulator.getTaskList().length} items but did not submit the plan for approval. The plan was not executed.`,
};
}
return {
result: `Planner finished without producing a plan. Agent output: ${resultText}`,
};
}
/** Handle an exception thrown anywhere in the run: fail the trace, publish
* agent-completed with the error, and tear down the draft plan unless it
* was already approved (dispatched tasks must not be wiped). */
async handleError(error: unknown): Promise<{ result: string }> {
const { context, accumulator, subAgentId } = this;
const errorMessage = error instanceof Error ? error.message : String(error);
if (this.traceRun) {
await failTraceRun(context, this.traceRun, error, {
agent_id: subAgentId,
agent_role: 'planner',
});
}
context.eventBus.publish(context.threadId, {
type: 'agent-completed',
runId: context.runId,
agentId: subAgentId,
payload: { role: 'planner', result: '', error: errorMessage },
});
if (!accumulator.isApproved()) {
await this.tearDownDraftPlan();
}
return { result: `Planner error: ${errorMessage}` };
}
private publishCompleted(resultText: string): void {
this.context.eventBus.publish(this.context.threadId, {
type: 'agent-completed',
runId: this.context.runId,
agentId: this.subAgentId,
payload: { role: 'planner', result: resultText },
});
}
/** Clear the in-progress checklist UI + the unapproved persisted graph. */
private async tearDownDraftPlan(): Promise<void> {
publishClearingEvent(this.context);
await clearDraftChecklist(this.context);
await clearPlannedTaskGraph(this.context);
}
}
async function getPersistedTaskCount(context: OrchestrationContext): Promise<number | undefined> {
if (!context.plannedTaskService) return undefined;
try {
const graph = await context.plannedTaskService.getGraph(context.threadId);
return graph?.tasks?.length;
} catch {
return undefined;
}
}