mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-02 01:37:07 +02:00
feat(core): Persist pending confirmations on database (#31052)
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
11538be71b
commit
e413a7740d
|
|
@ -727,6 +727,7 @@ export interface InstanceAiConfirmation {
|
|||
introMessage?: string;
|
||||
tasks?: TaskList;
|
||||
resourceDecision?: GatewayConfirmationRequiredPayload;
|
||||
expired?: boolean;
|
||||
}
|
||||
|
||||
export interface InstanceAiToolCallState {
|
||||
|
|
|
|||
|
|
@ -100,6 +100,8 @@ export type {
|
|||
AgentMessage,
|
||||
BuiltMemory,
|
||||
CheckpointStore,
|
||||
ContentToolCall,
|
||||
MessageContent,
|
||||
SerializableAgentState,
|
||||
Thread,
|
||||
} from '@n8n/agents';
|
||||
|
|
@ -190,6 +192,7 @@ export const createSubAgent: typeof SubAgentFactoryMod.createSubAgent = lazyFunc
|
|||
);
|
||||
export { createAllTools, createOrchestrationTools } from './tools';
|
||||
export {
|
||||
createSubAgentResourceId,
|
||||
createSubAgentResourceIdPrefix,
|
||||
SUB_AGENT_RESOURCE_PREFIX,
|
||||
} from './tools/orchestration/agent-persistence';
|
||||
|
|
|
|||
|
|
@ -139,6 +139,7 @@ describe('executeResumableStream', () => {
|
|||
toolCallId: 'tool-call-1',
|
||||
requestId: 'request-1',
|
||||
toolName: 'ask-user',
|
||||
suspendPayload: { requestId: 'request-1', message: 'Need approval' },
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
|
@ -479,6 +480,7 @@ describe('executeResumableStream', () => {
|
|||
requestId: 'request-1',
|
||||
toolCallId: 'tool-call-1',
|
||||
toolName: 'pause-for-user',
|
||||
suspendPayload: { requestId: 'request-1', message: 'First confirmation' },
|
||||
});
|
||||
expect(waitForConfirmation).toHaveBeenCalledTimes(1);
|
||||
expect(waitForConfirmation).toHaveBeenCalledWith('request-1');
|
||||
|
|
|
|||
|
|
@ -136,6 +136,7 @@ describe('streamAgentRun', () => {
|
|||
requestId: 'request-1',
|
||||
toolCallId: 'tool-call-1',
|
||||
toolName: 'pause-for-user',
|
||||
suspendPayload: {},
|
||||
},
|
||||
confirmationEvent: {
|
||||
type: 'confirmation-request',
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
|
||||
import type { InstanceAiEventBus } from '../event-bus/event-bus.interface';
|
||||
import type { Logger } from '../logger';
|
||||
import {
|
||||
|
|
@ -6,6 +8,9 @@ import {
|
|||
type TraceStatus,
|
||||
} from '../runtime/resumable-stream-executor';
|
||||
import type { WorkSummary } from '../stream/work-summary-accumulator';
|
||||
import type { SuspensionInfo } from '../utils/stream-helpers';
|
||||
|
||||
type ConfirmationRequestEvent = Extract<InstanceAiEvent, { type: 'confirmation-request' }>;
|
||||
|
||||
export interface ConsumeWithHitlOptions {
|
||||
agent: unknown;
|
||||
|
|
@ -107,3 +112,81 @@ export async function consumeStreamWithHitl(
|
|||
workSummary: result.workSummary,
|
||||
};
|
||||
}
|
||||
|
||||
export interface ConsumeStreamCascadingOptions {
|
||||
agent: unknown;
|
||||
stream: unknown;
|
||||
runId: string;
|
||||
agentId: string;
|
||||
eventBus: InstanceAiEventBus;
|
||||
logger: Logger;
|
||||
threadId: string;
|
||||
abortSignal: AbortSignal;
|
||||
}
|
||||
|
||||
export type ConsumeStreamCascadingResult =
|
||||
| {
|
||||
status: 'completed' | 'cancelled' | 'errored';
|
||||
agentRunId: string;
|
||||
text: Promise<string>;
|
||||
workSummary: WorkSummary;
|
||||
}
|
||||
| {
|
||||
status: 'suspended';
|
||||
agentRunId: string;
|
||||
suspension: SuspensionInfo;
|
||||
confirmationEvent?: ConfirmationRequestEvent;
|
||||
text?: Promise<string>;
|
||||
workSummary: WorkSummary;
|
||||
};
|
||||
|
||||
/**
|
||||
* Consume a sub-agent stream and return cleanly when it either finishes or
|
||||
* hits a HITL suspension. Unlike `consumeStreamWithHitl` (which transparently
|
||||
* bridges sub-agent suspensions to a parent `waitForConfirmation` Promise),
|
||||
* this returns the suspension info to the caller so the caller can decide
|
||||
* how to handle it — e.g. cascade the suspension up to its own SDK suspend
|
||||
* via `ctx.suspend(payload)`, so the parent's agent run is also checkpointed
|
||||
* and survives a process restart.
|
||||
*
|
||||
* Uses `executeResumableStream`'s manual mode, which suppresses the
|
||||
* sub-agent's `confirmation-request` event publish (returning it on the
|
||||
* result instead) — the caller emits the card at whatever runId is
|
||||
* meaningful at its level.
|
||||
*/
|
||||
export async function consumeStreamCascading(
|
||||
options: ConsumeStreamCascadingOptions,
|
||||
): Promise<ConsumeStreamCascadingResult> {
|
||||
const stream = normalizeStreamSource(options.stream);
|
||||
const result = await executeResumableStream({
|
||||
agent: options.agent,
|
||||
stream,
|
||||
context: {
|
||||
threadId: options.threadId,
|
||||
runId: options.runId,
|
||||
agentId: options.agentId,
|
||||
eventBus: options.eventBus,
|
||||
signal: options.abortSignal,
|
||||
logger: options.logger,
|
||||
},
|
||||
control: { mode: 'manual' },
|
||||
});
|
||||
|
||||
if (result.status === 'suspended' && result.suspension) {
|
||||
return {
|
||||
status: 'suspended',
|
||||
agentRunId: result.agentRunId,
|
||||
suspension: result.suspension,
|
||||
...(result.confirmationEvent ? { confirmationEvent: result.confirmationEvent } : {}),
|
||||
...(result.text ? { text: result.text } : {}),
|
||||
workSummary: result.workSummary,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
status: result.status === 'suspended' ? 'errored' : result.status,
|
||||
agentRunId: result.agentRunId,
|
||||
text: result.text ?? stream.text ?? Promise.resolve(''),
|
||||
workSummary: result.workSummary,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -141,4 +141,76 @@ describe('BlueprintAccumulator', () => {
|
|||
expect(checkpoint?.deps).toEqual(['wf-2']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('loadFromTasks (revision flow)', () => {
|
||||
const originalTasks = [
|
||||
{
|
||||
id: 'wf-1',
|
||||
title: "Build 'A' workflow",
|
||||
kind: 'build-workflow',
|
||||
spec: 'Build A',
|
||||
deps: [],
|
||||
},
|
||||
{
|
||||
id: 'wf-2',
|
||||
title: "Build 'B' workflow",
|
||||
kind: 'build-workflow',
|
||||
spec: 'Build B',
|
||||
deps: [],
|
||||
},
|
||||
{
|
||||
id: 'verify-1',
|
||||
title: 'Verify A',
|
||||
kind: 'checkpoint',
|
||||
spec: 'Verify A',
|
||||
deps: ['wf-1'],
|
||||
},
|
||||
];
|
||||
|
||||
it('seeds the accumulator with the persisted plan', () => {
|
||||
accumulator.loadFromTasks(originalTasks);
|
||||
|
||||
expect(accumulator.isEmpty()).toBe(false);
|
||||
expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-2', 'verify-1']);
|
||||
});
|
||||
|
||||
it('preserves original items across an ask-for-edits revision (remove + add + resubmit)', () => {
|
||||
// Simulates the resume path: the parent handler rebuilt a fresh
|
||||
// accumulator, then rehydrated it from the persisted graph before
|
||||
// the planner revised the plan.
|
||||
accumulator.loadFromTasks(originalTasks);
|
||||
|
||||
// Planner revises: drop one original, add a new one.
|
||||
expect(accumulator.removeItem('wf-2')).toBe(true);
|
||||
accumulator.addItem({
|
||||
kind: 'workflow',
|
||||
id: 'wf-3',
|
||||
name: 'C',
|
||||
purpose: 'Build C',
|
||||
integrations: [],
|
||||
dependsOn: [],
|
||||
});
|
||||
|
||||
// The resubmitted plan keeps the surviving originals plus the new item,
|
||||
// rather than collapsing to only the newly-added one.
|
||||
expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'verify-1', 'wf-3']);
|
||||
});
|
||||
|
||||
it('upserts by id so a revised original replaces in place', () => {
|
||||
accumulator.loadFromTasks(originalTasks);
|
||||
|
||||
accumulator.addItem({
|
||||
kind: 'workflow',
|
||||
id: 'wf-1',
|
||||
name: 'A (revised)',
|
||||
purpose: 'Build A differently',
|
||||
integrations: [],
|
||||
dependsOn: [],
|
||||
});
|
||||
|
||||
const list = accumulator.getTaskList();
|
||||
expect(list.map((t) => t.id)).toEqual(['wf-1', 'wf-2', 'verify-1']);
|
||||
expect(list.find((t) => t.id === 'wf-1')?.title).toBe("Build 'A (revised)' workflow");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,4 +1,10 @@
|
|||
import type { OrchestrationContext, PlannedTaskGraph, PlannedTaskService } from '../../../types';
|
||||
import type {
|
||||
OrchestrationContext,
|
||||
PlannedTaskGraph,
|
||||
PlannedTaskRecord,
|
||||
PlannedTaskService,
|
||||
} from '../../../types';
|
||||
import { BlueprintAccumulator } from '../blueprint-accumulator';
|
||||
|
||||
const {
|
||||
__testBuildPlannerBriefingContext,
|
||||
|
|
@ -6,6 +12,7 @@ const {
|
|||
__testFormatMessagesForBriefing,
|
||||
__testGetRecentMessages,
|
||||
__testGetPriorToolObservations,
|
||||
__testRehydrateAccumulatorFromGraph,
|
||||
} =
|
||||
// eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/consistent-type-imports
|
||||
require('../plan-with-agent.tool') as typeof import('../plan-with-agent.tool');
|
||||
|
|
@ -103,6 +110,87 @@ describe('clearPlannedTaskGraph', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('rehydrateAccumulatorFromGraph (resume revision flow)', () => {
|
||||
const persistedTasks: PlannedTaskRecord[] = [
|
||||
{
|
||||
id: 'wf-1',
|
||||
title: "Build 'A' workflow",
|
||||
kind: 'build-workflow',
|
||||
spec: 'A',
|
||||
deps: [],
|
||||
status: 'planned',
|
||||
},
|
||||
{
|
||||
id: 'wf-2',
|
||||
title: "Build 'B' workflow",
|
||||
kind: 'build-workflow',
|
||||
spec: 'B',
|
||||
deps: [],
|
||||
status: 'planned',
|
||||
},
|
||||
];
|
||||
|
||||
it('seeds the accumulator from an awaiting-approval graph so a revision keeps originals', async () => {
|
||||
// Reproduces "ask for edits -> revise existing plan -> submit again":
|
||||
// on resume the parent rebuilt a fresh accumulator; without rehydration
|
||||
// the planner's remove/add would operate on an empty plan and the
|
||||
// re-submit would drop every original item.
|
||||
const { context } = makeContext({
|
||||
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
|
||||
});
|
||||
const accumulator = new BlueprintAccumulator();
|
||||
|
||||
await __testRehydrateAccumulatorFromGraph(context, accumulator);
|
||||
|
||||
// Planner revises: drop one original, add a new one, then resubmits.
|
||||
expect(accumulator.removeItem('wf-2')).toBe(true);
|
||||
accumulator.addItem({
|
||||
kind: 'workflow',
|
||||
id: 'wf-3',
|
||||
name: 'C',
|
||||
purpose: 'C',
|
||||
integrations: [],
|
||||
dependsOn: [],
|
||||
});
|
||||
|
||||
expect(accumulator.getTaskList().map((t) => t.id)).toEqual(['wf-1', 'wf-3']);
|
||||
});
|
||||
|
||||
it('does not reopen an already-approved/active graph', async () => {
|
||||
const { context } = makeContext({
|
||||
graph: { planRunId: 'run-current', status: 'active', tasks: persistedTasks },
|
||||
});
|
||||
const accumulator = new BlueprintAccumulator();
|
||||
|
||||
await __testRehydrateAccumulatorFromGraph(context, accumulator);
|
||||
|
||||
expect(accumulator.isEmpty()).toBe(true);
|
||||
});
|
||||
|
||||
it('is a no-op when no graph exists', async () => {
|
||||
const { context, getGraph } = makeContext({ graph: null });
|
||||
const accumulator = new BlueprintAccumulator();
|
||||
|
||||
await __testRehydrateAccumulatorFromGraph(context, accumulator);
|
||||
|
||||
expect(getGraph).toHaveBeenCalledWith('t-1');
|
||||
expect(accumulator.isEmpty()).toBe(true);
|
||||
});
|
||||
|
||||
it('leaves the accumulator empty when getGraph throws', async () => {
|
||||
const { context, getGraph } = makeContext({
|
||||
graph: { planRunId: 'run-current', status: 'awaiting_approval', tasks: persistedTasks },
|
||||
});
|
||||
getGraph.mockRejectedValueOnce(new Error('db down'));
|
||||
const accumulator = new BlueprintAccumulator();
|
||||
|
||||
await expect(
|
||||
__testRehydrateAccumulatorFromGraph(context, accumulator),
|
||||
).resolves.toBeUndefined();
|
||||
expect(accumulator.isEmpty()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('formatMessagesForBriefing', () => {
|
||||
// The planner system prompt (plan-agent-prompt.ts) treats <current-datetime>
|
||||
// and <user-timezone> as a paired contract — schedule/cron decisions read
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
import { Agent, Tool } from '@n8n/agents';
|
||||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
import { DateTime } from 'luxon';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { createAddPlanItemTool, createRemovePlanItemTool } from './add-plan-item.tool';
|
||||
|
|
@ -33,10 +32,12 @@ import {
|
|||
} from './tracing-utils';
|
||||
import { attachRuntimeWorkspaceCapabilities } from '../../agent/runtime-workspace';
|
||||
import { MAX_STEPS } from '../../constants/max-steps';
|
||||
import { consumeStreamWithHitl, requireCompletedHitlText } from '../../stream/consume-with-hitl';
|
||||
import { consumeStreamCascading } from '../../stream/consume-with-hitl';
|
||||
import type { ConsumeStreamCascadingResult } from '../../stream/consume-with-hitl';
|
||||
import { createToolRegistry, toolRegistryKeys, toolRegistryValues } from '../../tool-registry';
|
||||
import { buildAgentTraceInputs, mergeTraceRunInputs } from '../../tracing/langsmith-tracing';
|
||||
import type { OrchestrationContext } from '../../types';
|
||||
import { resumeAgentStream } from '../../utils/stream-helpers';
|
||||
import { CREDENTIALS_TOOL_ID } from '../credentials.tool';
|
||||
import { DATA_TABLES_TOOL_ID } from '../data-tables.tool';
|
||||
import { ASK_USER_TOOL_ID } from '../shared/ask-user.tool';
|
||||
|
|
@ -612,10 +613,74 @@ async function clearPlannedTaskGraph(context: OrchestrationContext): Promise<voi
|
|||
}
|
||||
}
|
||||
|
||||
export async function __testRehydrateAccumulatorFromGraph(
|
||||
context: OrchestrationContext,
|
||||
accumulator: BlueprintAccumulator,
|
||||
): Promise<void> {
|
||||
return await rehydrateAccumulatorFromGraph(context, accumulator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Seed a freshly-built accumulator from the persisted plan before a planner
|
||||
* resume. The parent plan-tool handler exits on every cascade-suspend, so the
|
||||
* first-call accumulator is gone by the time an "ask for edits" revision
|
||||
* resumes the planner — without this, remove-plan-item can't touch the
|
||||
* original items, add-plan-item only carries the newly-added ones, and the
|
||||
* re-submit's createPlan (which overwrites unconditionally) replaces the graph
|
||||
* with a partial plan.
|
||||
*
|
||||
* Only rehydrates while the plan is still `awaiting_approval` (the revision
|
||||
* window) — an already-approved/active graph with in-flight tasks must not be
|
||||
* reopened here. Best-effort: a getGraph failure leaves the accumulator empty
|
||||
* rather than blocking the resume.
|
||||
*/
|
||||
async function rehydrateAccumulatorFromGraph(
|
||||
context: OrchestrationContext,
|
||||
accumulator: BlueprintAccumulator,
|
||||
): Promise<void> {
|
||||
if (!context.plannedTaskService) return;
|
||||
try {
|
||||
const graph = await context.plannedTaskService.getGraph(context.threadId);
|
||||
if (graph?.status === 'awaiting_approval' && graph.tasks.length > 0) {
|
||||
accumulator.loadFromTasks(graph.tasks);
|
||||
}
|
||||
} catch {
|
||||
// Best-effort — fall back to an empty accumulator rather than block resume.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tool factory
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* The plan tool cascades sub-agent HITL suspensions UP through the SDK's
|
||||
* native suspend/resume mechanism: when the planner sub-agent (or any tool
|
||||
* inside it) emits a `tool-call-suspended` chunk, the plan tool catches it
|
||||
* via `consumeStreamCascading` and calls its own `ctx.suspend()` with the
|
||||
* same payload. This checkpoints the orchestrator's full state alongside the
|
||||
* planner's, so a process restart between user prompt and click can resume
|
||||
* the planner without losing any state.
|
||||
*
|
||||
* The schemas below are permissive on purpose: the plan tool just forwards
|
||||
* whatever the inner tool emitted (submit-plan's plan-review payload OR
|
||||
* ask-user's questions payload) and accepts whatever the frontend sent back
|
||||
* for that card. Validation already happened on the inner tool.
|
||||
*/
|
||||
const planToolSuspendSchema = z
|
||||
.object({
|
||||
requestId: z.string(),
|
||||
message: z.string(),
|
||||
severity: z.string(),
|
||||
// Only submit-plan + ask-user carry an `inputType`; cascaded suspensions
|
||||
// from other planner tools (credentials, data-tables, ...) don't, and a
|
||||
// strict `inputType: string` would reject otherwise-valid payloads.
|
||||
inputType: z.string().optional(),
|
||||
})
|
||||
.passthrough();
|
||||
|
||||
const planToolResumeSchema = z.record(z.unknown());
|
||||
|
||||
export function createPlanWithAgentTool(context: OrchestrationContext) {
|
||||
return new Tool('plan')
|
||||
.description(
|
||||
|
|
@ -643,13 +708,20 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
result: z.string(),
|
||||
}),
|
||||
)
|
||||
.handler(async (input: { guidance?: string }) => {
|
||||
.suspend(planToolSuspendSchema)
|
||||
.resume(planToolResumeSchema)
|
||||
.handler(async (input: { guidance?: string }, ctx) => {
|
||||
const resumeData = ctx.resumeData;
|
||||
const isResume = resumeData !== undefined && resumeData !== null;
|
||||
|
||||
// ── Same-turn denial guard ─────────────────────────────────────
|
||||
// If the user denied a plan earlier in this same message group, the
|
||||
// orchestrator must not silently spawn another planner. Without this
|
||||
// guard the LLM can ignore the "stop on denial" prompt and start a
|
||||
// fresh planner with a new accumulator, defeating the denial.
|
||||
if (context.plannedTaskService && context.messageGroupId) {
|
||||
// Only applies to first-call invocations — resume continues an
|
||||
// already-suspended planner and cannot be a fresh re-spawn.
|
||||
if (!isResume && context.plannedTaskService && context.messageGroupId) {
|
||||
const existing = await context.plannedTaskService.getGraph(context.threadId);
|
||||
if (
|
||||
existing?.status === 'cancelled' &&
|
||||
|
|
@ -666,147 +738,230 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
}
|
||||
}
|
||||
|
||||
// ── Collect planner tools ──────────────────────────────────────
|
||||
// ── Collect planner tools (shared between first-call and resume) ──
|
||||
const plannerTools = createToolRegistry();
|
||||
|
||||
for (const name of PLANNER_DOMAIN_TOOL_NAMES) {
|
||||
const tool = context.domainTools.get(name);
|
||||
if (tool) {
|
||||
plannerTools.set(name, tool);
|
||||
}
|
||||
if (tool) plannerTools.set(name, tool);
|
||||
}
|
||||
|
||||
for (const name of PLANNER_RESEARCH_TOOL_NAMES) {
|
||||
const tool = context.domainTools.get(name);
|
||||
if (tool) {
|
||||
plannerTools.set(name, tool);
|
||||
}
|
||||
if (tool) plannerTools.set(name, tool);
|
||||
}
|
||||
|
||||
// Best-practices guidance — planner-exclusive
|
||||
plannerTools.set('templates', createTemplatesTool());
|
||||
|
||||
// Incremental plan accumulation + approval tools
|
||||
const accumulator = new BlueprintAccumulator();
|
||||
plannerTools.set('add-plan-item', createAddPlanItemTool(accumulator, context));
|
||||
plannerTools.set('remove-plan-item', createRemovePlanItemTool(accumulator, context));
|
||||
plannerTools.set('submit-plan', createSubmitPlanTool(accumulator, context));
|
||||
|
||||
// ── Retrieve conversation history ─────────────────────────────
|
||||
const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT);
|
||||
const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context));
|
||||
const briefing = formatMessagesForBriefing(
|
||||
messages,
|
||||
input.guidance,
|
||||
context.timeZone,
|
||||
briefingContext,
|
||||
);
|
||||
|
||||
// ── IDs & events ──────────────────────────────────────────────
|
||||
const subAgentId = `agent-planner-${nanoid(6)}`;
|
||||
const subtitle =
|
||||
input.guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...';
|
||||
|
||||
context.eventBus.publish(context.threadId, {
|
||||
type: 'agent-spawned',
|
||||
runId: context.runId,
|
||||
agentId: subAgentId,
|
||||
payload: {
|
||||
parentId: context.orchestratorAgentId,
|
||||
role: 'planner',
|
||||
tools: toolRegistryKeys(plannerTools),
|
||||
kind: 'planner' as const,
|
||||
title: 'Planning',
|
||||
subtitle: truncateLabel(subtitle),
|
||||
goal: briefing,
|
||||
},
|
||||
});
|
||||
|
||||
// ── Tracing ───────────────────────────────────────────────────
|
||||
const traceRun = await startSubAgentTrace(context, {
|
||||
agentId: subAgentId,
|
||||
role: 'planner',
|
||||
kind: 'planner',
|
||||
inputs: {
|
||||
guidance: input.guidance,
|
||||
messageCount: messages.length,
|
||||
},
|
||||
});
|
||||
// Use a runId-derived sub-agent id so resume reuses the same event stream identity.
|
||||
const subAgentId = `agent-planner-${context.runId}`;
|
||||
const tracedPlannerTools = traceSubAgentTools(context, plannerTools, 'planner');
|
||||
|
||||
// ── Build sub-agent (shared) ─────────────────────────────────
|
||||
const subAgent = new Agent('Workflow Planner Agent')
|
||||
.model(context.modelId)
|
||||
.instructions(PLANNER_AGENT_PROMPT, {
|
||||
providerOptions: {
|
||||
anthropic: { cacheControl: { type: 'ephemeral' } },
|
||||
},
|
||||
})
|
||||
.tool(toolRegistryValues(tracedPlannerTools))
|
||||
.checkpoint(context.checkpointStore ?? 'memory');
|
||||
attachRuntimeWorkspaceCapabilities(subAgent, {
|
||||
runtimeSkills: context.runtimeSkills,
|
||||
});
|
||||
const telemetry = context.tracing?.getTelemetry?.({
|
||||
agentRole: 'planner',
|
||||
functionId: 'instance-ai.subagent.planner',
|
||||
executionMode: 'background',
|
||||
metadata: { agent_id: subAgentId },
|
||||
});
|
||||
if (telemetry) {
|
||||
subAgent.telemetry(telemetry);
|
||||
}
|
||||
|
||||
let traceRun: Awaited<ReturnType<typeof startSubAgentTrace>> | undefined;
|
||||
|
||||
try {
|
||||
// ── Create & stream sub-agent (inline, blocking) ──────────
|
||||
const subAgent = new Agent('Workflow Planner Agent')
|
||||
.model(context.modelId)
|
||||
.instructions(PLANNER_AGENT_PROMPT, {
|
||||
providerOptions: {
|
||||
anthropic: { cacheControl: { type: 'ephemeral' } },
|
||||
},
|
||||
})
|
||||
.tool(toolRegistryValues(tracedPlannerTools))
|
||||
.checkpoint(context.checkpointStore ?? 'memory');
|
||||
attachRuntimeWorkspaceCapabilities(subAgent, {
|
||||
runtimeSkills: context.runtimeSkills,
|
||||
});
|
||||
const telemetry = context.tracing?.getTelemetry?.({
|
||||
agentRole: 'planner',
|
||||
functionId: 'instance-ai.subagent.planner',
|
||||
executionMode: 'background',
|
||||
metadata: { agent_id: subAgentId },
|
||||
});
|
||||
if (telemetry) {
|
||||
subAgent.telemetry(telemetry);
|
||||
}
|
||||
mergeTraceRunInputs(
|
||||
traceRun,
|
||||
buildAgentTraceInputs({
|
||||
systemPrompt: PLANNER_AGENT_PROMPT,
|
||||
tools: tracedPlannerTools,
|
||||
modelId: context.modelId,
|
||||
}),
|
||||
);
|
||||
let consumeResult: ConsumeStreamCascadingResult;
|
||||
|
||||
const resultText = await withTraceRun(context, traceRun, async () => {
|
||||
const persistence = await createSubAgentPersistence(context, {
|
||||
agentKind: 'planner',
|
||||
});
|
||||
const stream = await subAgent.stream(briefing, {
|
||||
maxIterations: MAX_STEPS.PLANNER,
|
||||
abortSignal: context.abortSignal,
|
||||
persistence,
|
||||
providerOptions: {
|
||||
anthropic: { cacheControl: { type: 'ephemeral' } },
|
||||
},
|
||||
});
|
||||
if (isResume) {
|
||||
// ── Resume path ─────────────────────────────────────────
|
||||
const resumeInfo = await context.findSubAgentResumeInfo?.('planner');
|
||||
if (!resumeInfo) {
|
||||
return {
|
||||
result:
|
||||
'The planning step could not be resumed because its state was lost. Please send a new message to continue.',
|
||||
};
|
||||
}
|
||||
|
||||
const result = await consumeStreamWithHitl({
|
||||
agent: subAgent,
|
||||
stream,
|
||||
runId: context.runId,
|
||||
agentId: subAgentId,
|
||||
eventBus: context.eventBus,
|
||||
logger: context.logger,
|
||||
threadId: context.threadId,
|
||||
abortSignal: context.abortSignal,
|
||||
waitForConfirmation: context.waitForConfirmation,
|
||||
maxIterations: MAX_STEPS.PLANNER,
|
||||
persistence,
|
||||
});
|
||||
// Rehydrate the accumulator from the persisted plan so an
|
||||
// "ask for edits" revision operates on the full plan rather
|
||||
// than an empty accumulator. See rehydrateAccumulatorFromGraph.
|
||||
await rehydrateAccumulatorFromGraph(context, accumulator);
|
||||
|
||||
return await requireCompletedHitlText(result, 'Planner sub-agent');
|
||||
});
|
||||
|
||||
await finishTraceRun(context, traceRun, {
|
||||
outputs: {
|
||||
result: resultText,
|
||||
// Open a trace span for the resumed leg so a plan that suspended
|
||||
// at HITL and resumed still shows its continuation in LangSmith.
|
||||
// The planner card itself is already in the snapshot from the
|
||||
// first call, so no agent-spawned event is (re-)published here.
|
||||
traceRun = await startSubAgentTrace(context, {
|
||||
agentId: subAgentId,
|
||||
role: 'planner',
|
||||
hasItems: !accumulator.isEmpty(),
|
||||
itemCount: accumulator.getTaskItemsForEvent().length,
|
||||
},
|
||||
});
|
||||
kind: 'planner',
|
||||
inputs: { resumed: true },
|
||||
});
|
||||
mergeTraceRunInputs(
|
||||
traceRun,
|
||||
buildAgentTraceInputs({
|
||||
systemPrompt: PLANNER_AGENT_PROMPT,
|
||||
tools: tracedPlannerTools,
|
||||
modelId: context.modelId,
|
||||
}),
|
||||
);
|
||||
|
||||
consumeResult = await withTraceRun(context, traceRun, async () => {
|
||||
const resumed = await resumeAgentStream(subAgent, resumeData, {
|
||||
runId: resumeInfo.runId,
|
||||
toolCallId: resumeInfo.toolCallId,
|
||||
persistence: resumeInfo.persistence,
|
||||
maxIterations: MAX_STEPS.PLANNER,
|
||||
});
|
||||
|
||||
return await consumeStreamCascading({
|
||||
agent: subAgent,
|
||||
stream: resumed,
|
||||
runId: context.runId,
|
||||
agentId: subAgentId,
|
||||
eventBus: context.eventBus,
|
||||
logger: context.logger,
|
||||
threadId: context.threadId,
|
||||
abortSignal: context.abortSignal,
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// ── First-call path ─────────────────────────────────────
|
||||
// The planner is the most common inline HITL entry point — when it
|
||||
// suspends the orchestrator cascades-suspends too, and the SDK does
|
||||
// not flush the user-message row to memory until a clean loop end
|
||||
// (which a suspended run never reaches). Persist eagerly so the
|
||||
// user's bubble is visible if they reload during the suspend window.
|
||||
if (context.persistInFlightUserMessage) {
|
||||
await context.persistInFlightUserMessage();
|
||||
}
|
||||
|
||||
const messages = await getRecentMessages(context, MESSAGE_HISTORY_COUNT);
|
||||
const briefingContext = buildPlannerBriefingContext(getPriorToolObservations(context));
|
||||
const briefing = formatMessagesForBriefing(
|
||||
messages,
|
||||
input.guidance,
|
||||
context.timeZone,
|
||||
briefingContext,
|
||||
);
|
||||
|
||||
const subtitle =
|
||||
input.guidance ?? messages.find((m) => m.role === 'user')?.content ?? 'Planning...';
|
||||
|
||||
context.eventBus.publish(context.threadId, {
|
||||
type: 'agent-spawned',
|
||||
runId: context.runId,
|
||||
agentId: subAgentId,
|
||||
payload: {
|
||||
parentId: context.orchestratorAgentId,
|
||||
role: 'planner',
|
||||
tools: toolRegistryKeys(plannerTools),
|
||||
kind: 'planner' as const,
|
||||
title: 'Planning',
|
||||
subtitle: truncateLabel(subtitle),
|
||||
goal: briefing,
|
||||
},
|
||||
});
|
||||
|
||||
traceRun = await startSubAgentTrace(context, {
|
||||
agentId: subAgentId,
|
||||
role: 'planner',
|
||||
kind: 'planner',
|
||||
inputs: {
|
||||
guidance: input.guidance,
|
||||
messageCount: messages.length,
|
||||
},
|
||||
});
|
||||
|
||||
mergeTraceRunInputs(
|
||||
traceRun,
|
||||
buildAgentTraceInputs({
|
||||
systemPrompt: PLANNER_AGENT_PROMPT,
|
||||
tools: tracedPlannerTools,
|
||||
modelId: context.modelId,
|
||||
}),
|
||||
);
|
||||
|
||||
consumeResult = await withTraceRun(context, traceRun, async () => {
|
||||
const persistence = await createSubAgentPersistence(context, {
|
||||
agentKind: 'planner',
|
||||
});
|
||||
const stream = await subAgent.stream(briefing, {
|
||||
maxIterations: MAX_STEPS.PLANNER,
|
||||
abortSignal: context.abortSignal,
|
||||
persistence,
|
||||
providerOptions: {
|
||||
anthropic: { cacheControl: { type: 'ephemeral' } },
|
||||
},
|
||||
});
|
||||
|
||||
return await consumeStreamCascading({
|
||||
agent: subAgent,
|
||||
stream,
|
||||
runId: context.runId,
|
||||
agentId: subAgentId,
|
||||
eventBus: context.eventBus,
|
||||
logger: context.logger,
|
||||
threadId: context.threadId,
|
||||
abortSignal: context.abortSignal,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// ── Cascade suspension up to the orchestrator ───────────
|
||||
if (consumeResult.status === 'suspended') {
|
||||
const parsed = planToolSuspendSchema.safeParse(consumeResult.suspension.suspendPayload);
|
||||
if (!parsed.success) {
|
||||
context.logger.warn('Planner emitted a suspension payload missing required fields', {
|
||||
threadId: context.threadId,
|
||||
runId: context.runId,
|
||||
toolName: consumeResult.suspension.toolName,
|
||||
zodIssues: parsed.error.issues,
|
||||
});
|
||||
publishClearingEvent(context);
|
||||
await clearDraftChecklist(context);
|
||||
await clearPlannedTaskGraph(context);
|
||||
return {
|
||||
result:
|
||||
'Planner requested user input but the payload was malformed. Please try again.',
|
||||
};
|
||||
}
|
||||
return await ctx.suspend(parsed.data);
|
||||
}
|
||||
|
||||
// ── Stream finished (completed/cancelled/errored) ──────
|
||||
const resultText = consumeResult.status === 'completed' ? await consumeResult.text : '';
|
||||
|
||||
if (traceRun) {
|
||||
await finishTraceRun(context, traceRun, {
|
||||
outputs: {
|
||||
result: resultText,
|
||||
agentId: subAgentId,
|
||||
role: 'planner',
|
||||
hasItems: !accumulator.isEmpty(),
|
||||
itemCount: accumulator.getTaskItemsForEvent().length,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// ── Publish agent-completed ───────────────────────────────
|
||||
context.eventBus.publish(context.threadId, {
|
||||
type: 'agent-completed',
|
||||
runId: context.runId,
|
||||
|
|
@ -818,9 +973,10 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
});
|
||||
|
||||
// ── Schedule tasks after planner-driven approval ──────────
|
||||
// Only dispatch if submit-plan was called AND the user approved.
|
||||
// createPlan persists the graph as `awaiting_approval`; flip it
|
||||
// to `active` before scheduling so tick() can dispatch.
|
||||
// Approval is detected via the accumulator's flag, which submit-plan
|
||||
// flips in its resume handler. createPlan persisted the graph as
|
||||
// `awaiting_approval` on the first call; flip it to `active` and
|
||||
// schedule.
|
||||
if (accumulator.isApproved()) {
|
||||
if (context.plannedTaskService) {
|
||||
await context.plannedTaskService.approvePlan(context.threadId);
|
||||
|
|
@ -828,7 +984,10 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
if (context.schedulePlannedTasks) {
|
||||
await context.schedulePlannedTasks();
|
||||
}
|
||||
const taskCount = accumulator.getTaskList().length;
|
||||
// On resume the accumulator is fresh and reports 0 — query the
|
||||
// persisted graph instead so the orchestrator gets accurate text.
|
||||
const persistedCount = await getPersistedTaskCount(context);
|
||||
const taskCount = persistedCount ?? accumulator.getTaskList().length;
|
||||
return {
|
||||
result: `Plan approved and ${taskCount} task${taskCount === 1 ? '' : 's'} dispatched.`,
|
||||
};
|
||||
|
|
@ -843,13 +1002,9 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
return { result: 'Plan denied by user. No tasks were dispatched.' };
|
||||
}
|
||||
|
||||
// Planner finished without approval (no submit-plan or user didn't approve)
|
||||
// Planner finished without approval (no submit-plan or user didn't approve).
|
||||
publishClearingEvent(context);
|
||||
await clearDraftChecklist(context);
|
||||
// Clear the persisted planned-task graph too. submit-plan persists
|
||||
// it BEFORE user approval (so HITL can display the checklist), so
|
||||
// leaving it intact on planner give-up would let a later
|
||||
// schedulePlannedTasks() tick pick up and dispatch a rejected plan.
|
||||
await clearPlannedTaskGraph(context);
|
||||
if (!accumulator.isEmpty()) {
|
||||
return {
|
||||
|
|
@ -861,10 +1016,12 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
};
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
await failTraceRun(context, traceRun, error, {
|
||||
agent_id: subAgentId,
|
||||
agent_role: 'planner',
|
||||
});
|
||||
if (traceRun) {
|
||||
await failTraceRun(context, traceRun, error, {
|
||||
agent_id: subAgentId,
|
||||
agent_role: 'planner',
|
||||
});
|
||||
}
|
||||
|
||||
context.eventBus.publish(context.threadId, {
|
||||
type: 'agent-completed',
|
||||
|
|
@ -877,12 +1034,6 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
},
|
||||
});
|
||||
|
||||
// Clear draft checklist and persisted graph on error — same reason
|
||||
// as the non-approval path: an error-aborted plan must not later be
|
||||
// auto-dispatched by the post-run reschedule. Skip both when the user
|
||||
// already approved this plan: the failure is downstream of approval
|
||||
// (e.g. approvePlan/schedulePlannedTasks threw), and clearing would
|
||||
// drop a plan the user explicitly accepted.
|
||||
if (!accumulator.isApproved()) {
|
||||
publishClearingEvent(context);
|
||||
await clearDraftChecklist(context);
|
||||
|
|
@ -894,3 +1045,13 @@ export function createPlanWithAgentTool(context: OrchestrationContext) {
|
|||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
async function getPersistedTaskCount(context: OrchestrationContext): Promise<number | undefined> {
|
||||
if (!context.plannedTaskService) return undefined;
|
||||
try {
|
||||
const graph = await context.plannedTaskService.getGraph(context.threadId);
|
||||
return graph?.tasks?.length;
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1216,6 +1216,34 @@ export interface OrchestrationContext {
|
|||
taskId: string,
|
||||
correction: string,
|
||||
) => 'queued' | 'task-completed' | 'task-not-found';
|
||||
/**
|
||||
* Resume info for a suspended sub-agent of this thread, looked up from the
|
||||
* persisted checkpoint store by the deterministic sub-agent resourceId
|
||||
* (`instance-ai-subagent:{threadId}:{agentKind}`). Used by the cascading
|
||||
* suspend path: when the orchestrator's `plan` tool resumes, it calls
|
||||
* this to find the planner sub-agent's `runId` + suspended `toolCallId`
|
||||
* + the persistence the planner was running under, so the resume path
|
||||
* can rebuild the sub-agent with the same persistence and call
|
||||
* `plannerAgent.resume('stream', resumeData, { runId, toolCallId })`
|
||||
* without stashing anything across its own suspend/resume cycle.
|
||||
*/
|
||||
findSubAgentResumeInfo?: (agentKind: string) => Promise<
|
||||
| {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
persistence: { threadId: string; resourceId: string };
|
||||
}
|
||||
| undefined
|
||||
>;
|
||||
/**
|
||||
* Persist the current user message to thread memory immediately, so it
|
||||
* survives a restart that happens while the orchestrator is suspended on
|
||||
* an inline HITL tool call. The SDK only flushes the turn delta on a clean
|
||||
* loop completion, which a suspended run never reaches — without this the
|
||||
* user's bubble is invisible on reload until the turn eventually completes.
|
||||
* Idempotent: safe to call multiple times within a run.
|
||||
*/
|
||||
persistInFlightUserMessage?: () => Promise<void>;
|
||||
/** Mark the current orchestrator run as making progress. */
|
||||
touchRun?: () => boolean;
|
||||
/** Mark a running background task as making progress. */
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ describe('parseSuspension', () => {
|
|||
toolCallId: 'tc-1',
|
||||
requestId: 'req-1',
|
||||
toolName: 'setup-credentials',
|
||||
suspendPayload: { requestId: 'req-1' },
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -67,6 +68,7 @@ describe('parseSuspension', () => {
|
|||
toolCallId: 'tc-1',
|
||||
requestId: 'req-1',
|
||||
toolName: 'setup-credentials',
|
||||
suspendPayload: { requestId: 'req-1' },
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -83,6 +85,7 @@ describe('parseSuspension', () => {
|
|||
toolCallId: 'tc-1',
|
||||
requestId: 'tc-1',
|
||||
toolName: undefined,
|
||||
suspendPayload: {},
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -116,6 +119,7 @@ describe('parseSuspension', () => {
|
|||
toolCallId: 'tc-1',
|
||||
requestId: 'tc-1',
|
||||
toolName: undefined,
|
||||
suspendPayload: {},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@ export interface SuspensionInfo {
|
|||
toolCallId: string;
|
||||
requestId: string;
|
||||
toolName?: string;
|
||||
/** The raw suspend payload as passed to `ctx.suspend()` by the inner tool. */
|
||||
suspendPayload: Record<string, unknown>;
|
||||
}
|
||||
|
||||
/** Extract suspension info from a stream chunk. */
|
||||
|
|
@ -29,7 +31,7 @@ export function parseSuspension(chunk: unknown): SuspensionInfo | null {
|
|||
const toolName = typeof sp.toolName === 'string' ? sp.toolName : undefined;
|
||||
|
||||
if (!reqId || !tcId) return null;
|
||||
return { toolCallId: tcId, requestId: reqId, toolName };
|
||||
return { toolCallId: tcId, requestId: reqId, toolName, suspendPayload: suspPayload };
|
||||
}
|
||||
|
||||
export interface Resumable {
|
||||
|
|
|
|||
|
|
@ -44,9 +44,14 @@ function createService(options: { threadTtlDays?: number } = {}): InstanceAiMemo
|
|||
mockAgentMemory as never,
|
||||
mockDbSnapshotStorage as never,
|
||||
mockCheckpointRepository as never,
|
||||
mockPendingConfirmationRepository as never,
|
||||
);
|
||||
}
|
||||
|
||||
const mockPendingConfirmationRepository = {
|
||||
findLiveRequestIds: jest.fn(async () => new Set<string>()),
|
||||
};
|
||||
|
||||
function makeTree(overrides?: Partial<InstanceAiAgentNode>): InstanceAiAgentNode {
|
||||
return {
|
||||
agentId: 'agent-001',
|
||||
|
|
@ -132,11 +137,12 @@ describe('InstanceAiMemoryService.getRichMessages', () => {
|
|||
content: [
|
||||
{ type: 'text', text: 'Here are your workflows' },
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'list-workflows',
|
||||
input: {},
|
||||
result: { workflows: [] },
|
||||
state: 'resolved',
|
||||
output: { workflows: [] },
|
||||
},
|
||||
],
|
||||
createdAt: new Date('2026-01-01T00:00:01.000Z'),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
import type { InstanceAiAgentNode } from '@n8n/api-types';
|
||||
import type { InstanceAiAgentNode, InstanceAiMessage } from '@n8n/api-types';
|
||||
|
||||
import { parseStoredMessages } from '../message-parser';
|
||||
import {
|
||||
collectConfirmationRequestIds,
|
||||
markExpiredConfirmations,
|
||||
parseStoredMessages,
|
||||
} from '../message-parser';
|
||||
import type { StoredAgentMessage } from '../message-parser';
|
||||
|
||||
const BASE_DATE_MS = Date.UTC(2026, 0, 1);
|
||||
|
|
@ -106,11 +110,12 @@ describe('parseStoredMessages', () => {
|
|||
content: [
|
||||
{ type: 'text', text: 'Here are your workflows' },
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'list-workflows',
|
||||
input: { limit: 10 },
|
||||
result: { workflows: ['wf1'] },
|
||||
state: 'resolved',
|
||||
output: { workflows: ['wf1'] },
|
||||
},
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
|
|
@ -162,6 +167,109 @@ describe('parseStoredMessages', () => {
|
|||
expect(tc?.renderHint).toBe('tasks');
|
||||
});
|
||||
|
||||
it('should surface rejected tool calls via `error`, not `result`', () => {
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
id: 'msg-u',
|
||||
role: 'user',
|
||||
content: 'Do something',
|
||||
createdAt: makeDate(),
|
||||
},
|
||||
{
|
||||
id: 'msg-a',
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-rej',
|
||||
toolName: 'workflows',
|
||||
input: { name: 'x' },
|
||||
state: 'rejected',
|
||||
error: 'Workflow not found',
|
||||
},
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
},
|
||||
];
|
||||
|
||||
const result = parseStoredMessages(messages);
|
||||
|
||||
const tc = result[1].agentTree?.toolCalls[0];
|
||||
expect(tc?.isLoading).toBe(false);
|
||||
expect(tc?.result).toBeUndefined();
|
||||
expect(tc?.error).toBe('Workflow not found');
|
||||
});
|
||||
|
||||
it('should skip malformed tool-call parts instead of rendering half-populated cards', () => {
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
id: 'msg-u',
|
||||
role: 'user',
|
||||
content: 'Go',
|
||||
createdAt: makeDate(),
|
||||
},
|
||||
{
|
||||
id: 'msg-a',
|
||||
role: 'assistant',
|
||||
content: [
|
||||
// Valid tool call — should survive.
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-ok',
|
||||
toolName: 'list-workflows',
|
||||
input: {},
|
||||
state: 'resolved',
|
||||
output: { ok: true },
|
||||
},
|
||||
// Missing toolName — fails the schema, must be dropped.
|
||||
{ type: 'tool-call', toolCallId: 'tc-no-name', input: {}, state: 'resolved' },
|
||||
// Missing toolCallId — dropped.
|
||||
{ type: 'tool-call', toolName: 'orphan', input: {}, state: 'resolved' },
|
||||
// `error` wrong type for a rejected call — dropped.
|
||||
{
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-bad-error',
|
||||
toolName: 'workflows',
|
||||
state: 'rejected',
|
||||
error: { not: 'a string' },
|
||||
},
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
},
|
||||
];
|
||||
|
||||
const result = parseStoredMessages(messages);
|
||||
|
||||
const toolCalls = result[1].agentTree?.toolCalls ?? [];
|
||||
expect(toolCalls.map((tc) => tc.toolCallId)).toEqual(['tc-ok']);
|
||||
});
|
||||
|
||||
it('should drop content parts with an unrecognized type', () => {
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
id: 'msg-u',
|
||||
role: 'user',
|
||||
content: 'Go',
|
||||
createdAt: makeDate(),
|
||||
},
|
||||
{
|
||||
id: 'msg-a',
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{ type: 'text', text: 'Hello' },
|
||||
// Unknown type — not in the content-part union, must be ignored.
|
||||
{ type: 'bogus-part', text: 'should not surface', payload: 42 },
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
},
|
||||
];
|
||||
|
||||
const result = parseStoredMessages(messages);
|
||||
|
||||
expect(result[1].content).toBe('Hello');
|
||||
expect(result[1].agentTree?.timeline).toEqual([{ type: 'text', content: 'Hello' }]);
|
||||
});
|
||||
|
||||
it('should parse reasoning from native parts', () => {
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
|
|
@ -388,6 +496,87 @@ describe('parseStoredMessages', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('should keep the snapshot tree when dedupe collapses in-flight checkpoint messages', () => {
|
||||
// Simulates the in-flight HITL case: the SDK hasn't committed
|
||||
// the turn to memory yet, so `loadInFlightCheckpointMessages`
|
||||
// surfaces several intermediate assistant messages from the
|
||||
// checkpoint blob. The snapshot was paired with a middle
|
||||
// message via timestamp matching, while a later message
|
||||
// (with no tree of its own) carries the latest text. Dedupe
|
||||
// must transfer the agentTree forward so the confirmation
|
||||
// card in the snapshot tree survives.
|
||||
const snapshotTree: InstanceAiAgentNode = {
|
||||
agentId: 'agent-001',
|
||||
role: 'orchestrator',
|
||||
status: 'active',
|
||||
textContent: 'Streaming...',
|
||||
reasoning: '',
|
||||
toolCalls: [
|
||||
{
|
||||
toolCallId: 'tc-cred',
|
||||
toolName: 'credentials',
|
||||
args: {},
|
||||
isLoading: true,
|
||||
confirmation: {
|
||||
requestId: 'req-live',
|
||||
inputType: 'approval',
|
||||
message: 'Select a credential',
|
||||
severity: 'info',
|
||||
},
|
||||
renderHint: 'default',
|
||||
},
|
||||
],
|
||||
children: [],
|
||||
timeline: [],
|
||||
};
|
||||
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
id: 'msg-u',
|
||||
role: 'user',
|
||||
content: 'Build it',
|
||||
createdAt: makeDate(0),
|
||||
},
|
||||
{
|
||||
id: 'msg-a-early',
|
||||
role: 'assistant',
|
||||
content: [{ type: 'text', text: '' }],
|
||||
createdAt: makeDate(10),
|
||||
},
|
||||
{
|
||||
id: 'msg-a-paired',
|
||||
role: 'assistant',
|
||||
content: [{ type: 'text', text: 'Looking up credentials' }],
|
||||
createdAt: makeDate(20),
|
||||
},
|
||||
{
|
||||
id: 'msg-a-latest',
|
||||
role: 'assistant',
|
||||
content: [{ type: 'text', text: 'Need credential confirmation' }],
|
||||
createdAt: makeDate(40),
|
||||
},
|
||||
];
|
||||
|
||||
const result = parseStoredMessages(messages, [
|
||||
{
|
||||
tree: snapshotTree,
|
||||
runId: 'run_paired',
|
||||
messageGroupId: 'mg_inflight',
|
||||
createdAt: makeDate(25),
|
||||
updatedAt: makeDate(25),
|
||||
},
|
||||
]);
|
||||
|
||||
// One user + one assistant (dedup collapses the three assistant rows).
|
||||
expect(result).toHaveLength(2);
|
||||
const assistant = result[1];
|
||||
// Latest message id survives so live SSE deltas keep correlating.
|
||||
expect(assistant.id).toBe('msg-a-latest');
|
||||
// Tree from the snapshot is transferred onto the kept message.
|
||||
expect(assistant.agentTree).toBe(snapshotTree);
|
||||
expect(assistant.agentTree?.toolCalls[0].confirmation?.requestId).toBe('req-live');
|
||||
});
|
||||
|
||||
it('should apply renderHint correctly for known tool names', () => {
|
||||
const messages: StoredAgentMessage[] = [
|
||||
{
|
||||
|
|
@ -401,22 +590,28 @@ describe('parseStoredMessages', () => {
|
|||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'delegate',
|
||||
result: 'ok',
|
||||
input: {},
|
||||
state: 'resolved',
|
||||
output: 'ok',
|
||||
},
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-2',
|
||||
toolName: 'build-workflow-with-agent',
|
||||
result: 'ok',
|
||||
input: {},
|
||||
state: 'resolved',
|
||||
output: 'ok',
|
||||
},
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-3',
|
||||
toolName: 'plan',
|
||||
result: 'ok',
|
||||
input: {},
|
||||
state: 'resolved',
|
||||
output: 'ok',
|
||||
},
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
|
|
@ -786,11 +981,12 @@ describe('parseStoredMessages', () => {
|
|||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool-result',
|
||||
type: 'tool-call',
|
||||
toolCallId: 'tc-parts',
|
||||
toolName: 'plan',
|
||||
input: { goal: 'x' },
|
||||
result: 'done',
|
||||
state: 'resolved',
|
||||
output: 'done',
|
||||
},
|
||||
],
|
||||
createdAt: makeDate(1),
|
||||
|
|
@ -804,3 +1000,164 @@ describe('parseStoredMessages', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('confirmation expiration helpers', () => {
|
||||
function makeMessageWithConfirmations(requestIds: string[]): InstanceAiMessage {
|
||||
return {
|
||||
id: 'msg-a',
|
||||
role: 'assistant',
|
||||
createdAt: makeDate().toISOString(),
|
||||
content: '',
|
||||
reasoning: '',
|
||||
isStreaming: false,
|
||||
runId: 'run-1',
|
||||
agentTree: {
|
||||
agentId: 'agent-001',
|
||||
role: 'orchestrator',
|
||||
status: 'completed',
|
||||
textContent: '',
|
||||
reasoning: '',
|
||||
toolCalls: requestIds.map((requestId, idx) => ({
|
||||
toolCallId: `tc-${idx}`,
|
||||
toolName: 'plan',
|
||||
args: {},
|
||||
isLoading: true,
|
||||
confirmation: {
|
||||
requestId,
|
||||
severity: 'info' as const,
|
||||
message: '',
|
||||
inputType: 'plan-review' as const,
|
||||
},
|
||||
})),
|
||||
children: [
|
||||
{
|
||||
agentId: 'agent-planner',
|
||||
role: 'planner',
|
||||
status: 'completed',
|
||||
textContent: '',
|
||||
reasoning: '',
|
||||
toolCalls: [
|
||||
{
|
||||
toolCallId: 'tc-sub',
|
||||
toolName: 'submit-plan',
|
||||
args: {},
|
||||
isLoading: true,
|
||||
confirmation: {
|
||||
requestId: 'req-sub',
|
||||
severity: 'info' as const,
|
||||
message: '',
|
||||
inputType: 'plan-review' as const,
|
||||
},
|
||||
},
|
||||
],
|
||||
children: [],
|
||||
timeline: [],
|
||||
},
|
||||
],
|
||||
timeline: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
it('collects request IDs from orchestrator and sub-agent tool calls', () => {
|
||||
const messages = [makeMessageWithConfirmations(['req-1', 'req-2'])];
|
||||
expect(collectConfirmationRequestIds(messages).sort()).toEqual(['req-1', 'req-2', 'req-sub']);
|
||||
});
|
||||
|
||||
it('flips expired flag only on confirmations whose requestId is not in the live set', () => {
|
||||
const messages = [makeMessageWithConfirmations(['req-1'])];
|
||||
markExpiredConfirmations(messages, new Set(['req-1']));
|
||||
|
||||
const node = messages[0].agentTree!;
|
||||
expect(node.toolCalls[0].confirmation?.expired).toBeUndefined();
|
||||
expect(node.children[0].toolCalls[0].confirmation?.expired).toBe(true);
|
||||
});
|
||||
|
||||
it('does nothing for messages without an agent tree', () => {
|
||||
const messages: InstanceAiMessage[] = [
|
||||
{
|
||||
id: 'msg-u',
|
||||
role: 'user',
|
||||
createdAt: makeDate().toISOString(),
|
||||
content: 'hi',
|
||||
reasoning: '',
|
||||
isStreaming: false,
|
||||
},
|
||||
];
|
||||
expect(collectConfirmationRequestIds(messages)).toEqual([]);
|
||||
markExpiredConfirmations(messages, new Set());
|
||||
});
|
||||
|
||||
/** Build a single assistant message carrying one plan-review confirmation
|
||||
* card, with overridable actionability fields. */
|
||||
function makeCardMessage(
|
||||
overrides: Partial<{ isLoading: boolean; confirmationStatus: 'approved' | 'denied' }>,
|
||||
): InstanceAiMessage {
|
||||
return {
|
||||
id: 'msg-a',
|
||||
role: 'assistant',
|
||||
createdAt: makeDate().toISOString(),
|
||||
content: '',
|
||||
reasoning: '',
|
||||
isStreaming: false,
|
||||
runId: 'run-1',
|
||||
agentTree: {
|
||||
agentId: 'agent-001',
|
||||
role: 'orchestrator',
|
||||
status: 'completed',
|
||||
textContent: '',
|
||||
reasoning: '',
|
||||
toolCalls: [
|
||||
{
|
||||
toolCallId: 'tc-0',
|
||||
toolName: 'plan',
|
||||
args: {},
|
||||
isLoading: overrides.isLoading ?? true,
|
||||
...(overrides.confirmationStatus
|
||||
? { confirmationStatus: overrides.confirmationStatus }
|
||||
: {}),
|
||||
confirmation: {
|
||||
requestId: 'req-resolved',
|
||||
severity: 'info' as const,
|
||||
message: '',
|
||||
inputType: 'plan-review' as const,
|
||||
},
|
||||
},
|
||||
],
|
||||
children: [],
|
||||
timeline: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Regression: a resolved plan card reloaded after the user approved/denied it
|
||||
// has no pending-confirmation row (claim() deleted it), but that absence must
|
||||
// NOT relabel the historical card as "Plan (expired)".
|
||||
it('does not mark a settled (no longer loading) card expired even with no live row', () => {
|
||||
const messages = [makeCardMessage({ isLoading: false })];
|
||||
markExpiredConfirmations(messages, new Set());
|
||||
expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBeUndefined();
|
||||
});
|
||||
|
||||
it.each(['approved', 'denied'] as const)(
|
||||
'does not mark a %s card expired even with no live row',
|
||||
(confirmationStatus) => {
|
||||
const messages = [makeCardMessage({ confirmationStatus })];
|
||||
markExpiredConfirmations(messages, new Set());
|
||||
expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBeUndefined();
|
||||
},
|
||||
);
|
||||
|
||||
it('does not collect request IDs for settled cards', () => {
|
||||
expect(collectConfirmationRequestIds([makeCardMessage({ isLoading: false })])).toEqual([]);
|
||||
expect(
|
||||
collectConfirmationRequestIds([makeCardMessage({ confirmationStatus: 'approved' })]),
|
||||
).toEqual([]);
|
||||
});
|
||||
|
||||
it('still marks a genuinely actionable card expired when its row is gone', () => {
|
||||
const messages = [makeCardMessage({ isLoading: true })];
|
||||
markExpiredConfirmations(messages, new Set());
|
||||
expect(messages[0].agentTree!.toolCalls[0].confirmation?.expired).toBe(true);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -20,8 +20,13 @@ import { DbSnapshotStorage } from './storage/db-snapshot-storage';
|
|||
|
||||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||
|
||||
import { parseStoredMessages } from './message-parser';
|
||||
import {
|
||||
collectConfirmationRequestIds,
|
||||
markExpiredConfirmations,
|
||||
parseStoredMessages,
|
||||
} from './message-parser';
|
||||
import { InstanceAiCheckpointRepository } from './repositories/instance-ai-checkpoint.repository';
|
||||
import { InstanceAiPendingConfirmationRepository } from './repositories/instance-ai-pending-confirmation.repository';
|
||||
import { TypeORMAgentMemory } from './storage/typeorm-agent-memory';
|
||||
|
||||
function isAgentMessageLike(value: unknown): value is AgentDbMessage {
|
||||
|
|
@ -58,6 +63,7 @@ export class InstanceAiMemoryService {
|
|||
private readonly agentMemory: TypeORMAgentMemory,
|
||||
private readonly dbSnapshotStorage: DbSnapshotStorage,
|
||||
private readonly checkpointRepository: InstanceAiCheckpointRepository,
|
||||
private readonly pendingConfirmationRepository: InstanceAiPendingConfirmationRepository,
|
||||
) {
|
||||
this.instanceAiConfig = globalConfig.instanceAi;
|
||||
}
|
||||
|
|
@ -159,10 +165,31 @@ export class InstanceAiMemoryService {
|
|||
const storedMessages = mergeMessagesById(result.messages, checkpointMessages);
|
||||
|
||||
const messages = parseStoredMessages(storedMessages, snapshots);
|
||||
await this.flagExpiredConfirmations(messages);
|
||||
|
||||
return { threadId, messages };
|
||||
}
|
||||
|
||||
/** Cross-check every confirmation card against `instance_ai_pending_confirmations`
|
||||
* and flip `confirmation.expired = true` on the ones with no live row. */
|
||||
private async flagExpiredConfirmations(
|
||||
messages: Awaited<ReturnType<typeof parseStoredMessages>>,
|
||||
): Promise<void> {
|
||||
const requestIds = collectConfirmationRequestIds(messages);
|
||||
if (requestIds.length === 0) return;
|
||||
try {
|
||||
const live = await this.pendingConfirmationRepository.findLiveRequestIds(
|
||||
requestIds,
|
||||
new Date(),
|
||||
);
|
||||
markExpiredConfirmations(messages, live);
|
||||
} catch (error) {
|
||||
this.logger.warn('Failed to flag expired confirmation cards', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async loadInFlightCheckpointMessages(threadId: string): Promise<AgentDbMessage[]> {
|
||||
let checkpoints;
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import {
|
|||
InstanceAiLivenessPolicy,
|
||||
McpClientManager,
|
||||
createDomainAccessTracker,
|
||||
createSubAgentResourceId,
|
||||
BackgroundTaskManager,
|
||||
buildAgentTreeFromEvents,
|
||||
classifyAttachments,
|
||||
|
|
@ -3070,6 +3071,13 @@ export class InstanceAiService {
|
|||
iterationLog,
|
||||
sendCorrectionToTask: (taskId, correction) =>
|
||||
this.sendCorrectionToTask(threadId, taskId, correction),
|
||||
findSubAgentResumeInfo: async (agentKind) =>
|
||||
await this.checkpointStore.findSuspendedSubAgentResumeInfo(
|
||||
createSubAgentResourceId(threadId, agentKind),
|
||||
),
|
||||
persistInFlightUserMessage: async () => {
|
||||
await this.persistUserMessageOnFirstSuspend(threadId, runId);
|
||||
},
|
||||
workflowTaskService: workflowTasks,
|
||||
workspace: runtimeWorkspace,
|
||||
nodeDefinitionDirs: nodeDefDirs.length > 0 ? nodeDefDirs : undefined,
|
||||
|
|
|
|||
|
|
@ -5,12 +5,36 @@ import type {
|
|||
InstanceAiToolCallState,
|
||||
InstanceAiTimelineEntry,
|
||||
} from '@n8n/api-types';
|
||||
import type { AgentDbMessage, AgentTreeSnapshot } from '@n8n/instance-ai';
|
||||
import type { AgentDbMessage, AgentTreeSnapshot, MessageContent } from '@n8n/instance-ai';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { cleanStoredUserMessage } from './internal-messages';
|
||||
|
||||
type RunSnapshots = AgentTreeSnapshot[];
|
||||
|
||||
const toolCallContentPartSchema = z.object({
|
||||
type: z.literal('tool-call'),
|
||||
toolCallId: z.string(),
|
||||
toolName: z.string(),
|
||||
input: z.unknown().optional(),
|
||||
state: z.enum(['pending', 'resolved', 'rejected']).optional(),
|
||||
output: z.unknown().optional(),
|
||||
error: z.string().optional(),
|
||||
});
|
||||
|
||||
const textContentPartSchema = z.object({ type: z.literal('text'), text: z.string() });
|
||||
const reasoningContentPartSchema = z.object({ type: z.literal('reasoning'), text: z.string() });
|
||||
const opaqueContentPartSchema = z
|
||||
.object({ type: z.enum(['invalid-tool-call', 'file', 'citation', 'provider']) })
|
||||
.passthrough();
|
||||
|
||||
const contentPartSchema = z.union([
|
||||
textContentPartSchema,
|
||||
reasoningContentPartSchema,
|
||||
toolCallContentPartSchema,
|
||||
opaqueContentPartSchema,
|
||||
]);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Persisted message shapes
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -21,16 +45,10 @@ interface StoredToolInvocation {
|
|||
toolName: string;
|
||||
args: Record<string, unknown>;
|
||||
result?: unknown;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
interface StoredContentPart {
|
||||
type: string;
|
||||
text?: string;
|
||||
toolCallId?: string;
|
||||
toolName?: string;
|
||||
input?: Record<string, unknown>;
|
||||
result?: unknown;
|
||||
}
|
||||
type StoredContentPart = MessageContent;
|
||||
|
||||
export interface StoredAgentMessage {
|
||||
id: string;
|
||||
|
|
@ -65,31 +83,19 @@ function extractReasoningFromContent(content: unknown): string {
|
|||
|
||||
function extractTextFromParts(parts: unknown[]): string {
|
||||
return parts
|
||||
.filter(
|
||||
(p): p is { type: 'text'; text: string } =>
|
||||
typeof p === 'object' &&
|
||||
p !== null &&
|
||||
'type' in p &&
|
||||
p.type === 'text' &&
|
||||
'text' in p &&
|
||||
typeof p.text === 'string',
|
||||
)
|
||||
.map((p) => p.text)
|
||||
.flatMap((p) => {
|
||||
const parsed = textContentPartSchema.safeParse(p);
|
||||
return parsed.success ? [parsed.data.text] : [];
|
||||
})
|
||||
.join('');
|
||||
}
|
||||
|
||||
function extractReasoningFromParts(parts: unknown[]): string {
|
||||
return parts
|
||||
.filter(
|
||||
(p): p is { type: 'reasoning'; text: string } =>
|
||||
typeof p === 'object' &&
|
||||
p !== null &&
|
||||
'type' in p &&
|
||||
p.type === 'reasoning' &&
|
||||
'text' in p &&
|
||||
typeof p.text === 'string',
|
||||
)
|
||||
.map((p) => p.text)
|
||||
.flatMap((p) => {
|
||||
const parsed = reasoningContentPartSchema.safeParse(p);
|
||||
return parsed.success ? [parsed.data.text] : [];
|
||||
})
|
||||
.join('');
|
||||
}
|
||||
|
||||
|
|
@ -99,28 +105,47 @@ function extractParts(content: unknown): StoredContentPart[] | undefined {
|
|||
}
|
||||
|
||||
function isStoredContentPart(value: unknown): value is StoredContentPart {
|
||||
return typeof value === 'object' && value !== null && 'type' in value;
|
||||
return contentPartSchema.safeParse(value).success;
|
||||
}
|
||||
|
||||
function toRecord(value: unknown): Record<string, unknown> {
|
||||
return typeof value === 'object' && value !== null && !Array.isArray(value)
|
||||
? (value as Record<string, unknown>)
|
||||
: {};
|
||||
}
|
||||
|
||||
function nativeToolPartToInvocation(part: StoredContentPart): StoredToolInvocation | undefined {
|
||||
if (part.type === 'tool-call' && part.toolCallId && part.toolName) {
|
||||
return {
|
||||
state: 'call',
|
||||
toolCallId: part.toolCallId,
|
||||
toolName: part.toolName,
|
||||
args: part.input ?? {},
|
||||
};
|
||||
}
|
||||
if (part.type === 'tool-result' && part.toolCallId && part.toolName) {
|
||||
if (part.type !== 'tool-call') return undefined;
|
||||
|
||||
const parsed = toolCallContentPartSchema.safeParse(part);
|
||||
if (!parsed.success) return undefined;
|
||||
const toolCall = parsed.data;
|
||||
|
||||
const args = toRecord(toolCall.input);
|
||||
if (toolCall.state === 'resolved') {
|
||||
return {
|
||||
state: 'result',
|
||||
toolCallId: part.toolCallId,
|
||||
toolName: part.toolName,
|
||||
args: part.input ?? {},
|
||||
result: part.result,
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
args,
|
||||
result: toolCall.output,
|
||||
};
|
||||
}
|
||||
return undefined;
|
||||
if (toolCall.state === 'rejected') {
|
||||
return {
|
||||
state: 'result',
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
args,
|
||||
error: toolCall.error,
|
||||
};
|
||||
}
|
||||
return {
|
||||
state: 'call',
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.toolName,
|
||||
args,
|
||||
};
|
||||
}
|
||||
|
||||
function extractToolInvocations(content: unknown): StoredToolInvocation[] {
|
||||
|
|
@ -140,6 +165,7 @@ function buildToolCallState(invocation: StoredToolInvocation): InstanceAiToolCal
|
|||
toolName: invocation.toolName,
|
||||
args: invocation.args,
|
||||
result: isCompleted ? invocation.result : undefined,
|
||||
error: isCompleted ? invocation.error : undefined,
|
||||
isLoading: !isCompleted,
|
||||
renderHint: getRenderHint(invocation.toolName),
|
||||
};
|
||||
|
|
@ -160,7 +186,7 @@ function buildTimeline(
|
|||
for (const part of parts) {
|
||||
if (part.type === 'text' && part.text) {
|
||||
timeline.push({ type: 'text', content: part.text });
|
||||
} else if ((part.type === 'tool-call' || part.type === 'tool-result') && part.toolCallId) {
|
||||
} else if (part.type === 'tool-call' && part.toolCallId) {
|
||||
timeline.push({ type: 'tool-call', toolCallId: part.toolCallId });
|
||||
}
|
||||
}
|
||||
|
|
@ -262,9 +288,19 @@ export function parseStoredMessages(
|
|||
// orphan snapshots before, between, or after assistant rows.
|
||||
let nextSnapshotIdx = 0;
|
||||
const consumedSnapshots = new Set<AgentTreeSnapshot>();
|
||||
// Messages whose `agentTree` originated from a snapshot (as opposed to
|
||||
// being synthesized by `buildFlatAgentTree`). Used by the dedupe pass to
|
||||
// prefer transferring snapshot trees forward in the in-flight HITL case.
|
||||
const messagesWithSnapshotTree = new Set<InstanceAiMessage>();
|
||||
|
||||
let lastUserMessageId: string | undefined;
|
||||
|
||||
function pushSnapshotMessage(snapshot: AgentTreeSnapshot): void {
|
||||
const built = buildSnapshotMessage(snapshot);
|
||||
messagesWithSnapshotTree.add(built);
|
||||
messages.push(built);
|
||||
}
|
||||
|
||||
function appendChronologicalOrphansBefore(message: ConversationStoredMessage): void {
|
||||
const messageTimestamp = messageCreatedAtMs(message);
|
||||
while (nextSnapshotIdx < snapshotList.length) {
|
||||
|
|
@ -273,7 +309,7 @@ export function parseStoredMessages(
|
|||
if (snapshotTimestamp === undefined || snapshotTimestamp >= messageTimestamp) return;
|
||||
|
||||
consumedSnapshots.add(snapshot);
|
||||
messages.push(buildSnapshotMessage(snapshot));
|
||||
pushSnapshotMessage(snapshot);
|
||||
nextSnapshotIdx++;
|
||||
}
|
||||
}
|
||||
|
|
@ -343,7 +379,7 @@ export function parseStoredMessages(
|
|||
? buildFlatAgentTree(text, reasoning, toolCalls, parts)
|
||||
: undefined);
|
||||
|
||||
messages.push({
|
||||
const assistantMessage: InstanceAiMessage = {
|
||||
id: msg.id,
|
||||
runId,
|
||||
messageGroupId: snapshot?.messageGroupId,
|
||||
|
|
@ -354,7 +390,9 @@ export function parseStoredMessages(
|
|||
reasoning,
|
||||
isStreaming: false,
|
||||
agentTree,
|
||||
});
|
||||
};
|
||||
if (snapshot) messagesWithSnapshotTree.add(assistantMessage);
|
||||
messages.push(assistantMessage);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -364,7 +402,7 @@ export function parseStoredMessages(
|
|||
|
||||
for (const snapshot of snapshots ?? []) {
|
||||
if (consumedSnapshots.has(snapshot)) continue;
|
||||
messages.push(buildSnapshotMessage(snapshot));
|
||||
pushSnapshotMessage(snapshot);
|
||||
}
|
||||
|
||||
// Propagate messageGroupId across assistant rows in the same conversational
|
||||
|
|
@ -384,15 +422,35 @@ export function parseStoredMessages(
|
|||
// Deduplicate assistant messages by messageGroupId.
|
||||
// Follow-up runs in the same group produce separate DB rows; keep only
|
||||
// the latest (which carries the full runIds array and complete tree).
|
||||
const seen = new Set<string>();
|
||||
//
|
||||
// In-flight HITL turns are different: the snapshot is paired with a
|
||||
// *middle* checkpoint message via timestamp matching, and the latest
|
||||
// message in the turn has only an auto-generated flat tree from
|
||||
// `buildFlatAgentTree`. Keeping just the latest would drop the
|
||||
// snapshot's tree (including its live confirmation cards), so transfer
|
||||
// the snapshot's `agentTree` + `runIds` onto the kept message when the
|
||||
// kept one's tree didn't come from a snapshot.
|
||||
const keptIndexByGid = new Map<string, number>();
|
||||
const toRemove = new Set<number>();
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const gid = messages[i].messageGroupId;
|
||||
if (!gid) continue;
|
||||
if (seen.has(gid)) {
|
||||
messages.splice(i, 1);
|
||||
} else {
|
||||
seen.add(gid);
|
||||
const keptIdx = keptIndexByGid.get(gid);
|
||||
if (keptIdx === undefined) {
|
||||
keptIndexByGid.set(gid, i);
|
||||
continue;
|
||||
}
|
||||
const kept = messages[keptIdx];
|
||||
const candidate = messages[i];
|
||||
if (!messagesWithSnapshotTree.has(kept) && messagesWithSnapshotTree.has(candidate)) {
|
||||
kept.agentTree = candidate.agentTree;
|
||||
kept.runIds = candidate.runIds;
|
||||
messagesWithSnapshotTree.add(kept);
|
||||
}
|
||||
toRemove.add(i);
|
||||
}
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
if (toRemove.has(i)) messages.splice(i, 1);
|
||||
}
|
||||
|
||||
return messages;
|
||||
|
|
@ -435,3 +493,67 @@ function propagateMessageGroupIdWithinRange(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Pull every confirmation requestId out of the parsed messages' agent trees. */
|
||||
/**
|
||||
* A confirmation card is "actionable" only while the user can still respond to
|
||||
* it: the tool call is in-flight and no terminal status has been recorded.
|
||||
* Once approved/denied (or otherwise settled) the card is historical — its
|
||||
* pending-confirmation row is gone after claim/delete, but that absence means
|
||||
* "resolved", not "expired".
|
||||
*/
|
||||
function isActionableConfirmation(tc: InstanceAiToolCallState): boolean {
|
||||
return (
|
||||
tc.confirmation !== undefined &&
|
||||
tc.isLoading &&
|
||||
tc.confirmationStatus !== 'approved' &&
|
||||
tc.confirmationStatus !== 'denied'
|
||||
);
|
||||
}
|
||||
|
||||
export function collectConfirmationRequestIds(messages: InstanceAiMessage[]): string[] {
|
||||
const requestIds: string[] = [];
|
||||
for (const message of messages) {
|
||||
if (!message.agentTree) continue;
|
||||
walkAgentNodes(message.agentTree, (node) => {
|
||||
for (const tc of node.toolCalls) {
|
||||
const { confirmation } = tc;
|
||||
if (!confirmation || !isActionableConfirmation(tc)) continue;
|
||||
requestIds.push(confirmation.requestId);
|
||||
}
|
||||
});
|
||||
}
|
||||
return requestIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flip `confirmation.expired = true` on still-actionable cards whose
|
||||
* pending-confirmation row is no longer live. Settled cards (approved/denied,
|
||||
* or no longer loading) are left untouched — their row is also gone, but that
|
||||
* means "resolved", not "expired", so relabeling them would rewrite history.
|
||||
*/
|
||||
export function markExpiredConfirmations(
|
||||
messages: InstanceAiMessage[],
|
||||
liveRequestIds: Set<string>,
|
||||
): void {
|
||||
for (const message of messages) {
|
||||
if (!message.agentTree) continue;
|
||||
walkAgentNodes(message.agentTree, (node) => {
|
||||
for (const tc of node.toolCalls) {
|
||||
const { confirmation } = tc;
|
||||
if (!confirmation || !isActionableConfirmation(tc)) continue;
|
||||
if (!liveRequestIds.has(confirmation.requestId)) {
|
||||
confirmation.expired = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function walkAgentNodes(
|
||||
node: InstanceAiAgentNode,
|
||||
visit: (node: InstanceAiAgentNode) => void,
|
||||
): void {
|
||||
visit(node);
|
||||
for (const child of node.children) walkAgentNodes(child, visit);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,9 +60,11 @@ describe('InstanceAiPendingConfirmationRepository.claim', () => {
|
|||
|
||||
expect(result).toBe(row);
|
||||
expect(txRepo.findOne).toHaveBeenCalledWith({
|
||||
where: { requestId: 'req-1', userId: 'user-1' },
|
||||
where: expect.objectContaining({ requestId: 'req-1', userId: 'user-1' }),
|
||||
});
|
||||
expect(txRepo.delete).toHaveBeenCalledWith({ requestId: 'req-1', userId: 'user-1' });
|
||||
expect(txRepo.delete).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ requestId: 'req-1', userId: 'user-1' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('returns undefined when no row matches the requestId+userId', async () => {
|
||||
|
|
@ -96,7 +98,24 @@ describe('InstanceAiPendingConfirmationRepository.claim', () => {
|
|||
|
||||
expect(result).toBeUndefined();
|
||||
expect(txRepo.findOne).toHaveBeenCalledWith({
|
||||
where: { requestId: 'req-1', userId: 'attacker-user' },
|
||||
where: expect.objectContaining({ requestId: 'req-1', userId: 'attacker-user' }),
|
||||
});
|
||||
});
|
||||
|
||||
it('treats expired rows as already gone — same predicate as findLiveRequestIds', async () => {
|
||||
// Driver behavior: an expired row would not match the live-where
|
||||
// predicate, so findOne returns null even though the row physically
|
||||
// exists. The expired-prune sweep is responsible for the physical row
|
||||
// — the claim path treats it as unclaimable in the meantime.
|
||||
const txRepo = mock<Repository<InstanceAiPendingConfirmation>>();
|
||||
txRepo.findOne.mockResolvedValueOnce(null);
|
||||
const { repo } = buildRepoWithTxRepo(txRepo);
|
||||
|
||||
const result = await repo.claim('req-expired', 'user-1');
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(txRepo.delete).not.toHaveBeenCalled();
|
||||
const where = (txRepo.findOne.mock.calls[0][0] as { where: Record<string, unknown> }).where;
|
||||
expect(where).toHaveProperty('expiresAt');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -23,4 +23,21 @@ export class InstanceAiCheckpointRepository extends Repository<InstanceAiCheckpo
|
|||
order: { createdAt: 'DESC' },
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the most recent active (non-expired) checkpoint for a given
|
||||
* resourceId. Used by the orchestrator's `plan` tool resume path to
|
||||
* locate the suspended planner sub-agent: sub-agent resourceIds are
|
||||
* deterministically derived from the parent thread (e.g.
|
||||
* `instance-ai-subagent:{threadId}:planner`), so the orchestrator can
|
||||
* compute the resourceId and look up the planner's runId here without
|
||||
* having to stash anything across its own suspend/resume cycle.
|
||||
*/
|
||||
async findActiveByResourceId(resourceId: string): Promise<InstanceAiCheckpoint | undefined> {
|
||||
const row = await this.findOne({
|
||||
where: { resourceId, expiredAt: IsNull() },
|
||||
order: { createdAt: 'DESC' },
|
||||
});
|
||||
return row ?? undefined;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import { DataSource, LessThan, Repository } from '@n8n/typeorm';
|
||||
import { DataSource, In, IsNull, LessThan, MoreThanOrEqual, Or, Repository } from '@n8n/typeorm';
|
||||
|
||||
import { InstanceAiPendingConfirmation } from '../entities/instance-ai-pending-confirmation.entity';
|
||||
|
||||
|
|
@ -15,7 +15,8 @@ export class InstanceAiPendingConfirmationRepository extends Repository<Instance
|
|||
* confirm/cancel/TTL sweep) all return `undefined` except one.
|
||||
*
|
||||
* Scoped by `userId` so a different user cannot claim a confirmation that
|
||||
* was registered for someone else.
|
||||
* was registered for someone else, and by `expiresAt` so an expired row
|
||||
* is treated as gone.
|
||||
*/
|
||||
async claim(
|
||||
requestId: string,
|
||||
|
|
@ -23,15 +24,21 @@ export class InstanceAiPendingConfirmationRepository extends Repository<Instance
|
|||
): Promise<InstanceAiPendingConfirmation | undefined> {
|
||||
return await this.manager.transaction(async (manager) => {
|
||||
const repo = manager.getRepository(InstanceAiPendingConfirmation);
|
||||
const now = new Date();
|
||||
const liveWhere = {
|
||||
requestId,
|
||||
userId,
|
||||
expiresAt: Or(IsNull(), MoreThanOrEqual(now)),
|
||||
};
|
||||
const row = await repo.findOne({
|
||||
where: { requestId, userId },
|
||||
where: liveWhere,
|
||||
...(manager.connection.options.type === 'postgres'
|
||||
? { lock: { mode: 'pessimistic_write' as const } }
|
||||
: {}),
|
||||
});
|
||||
if (!row) return undefined;
|
||||
|
||||
const result = await repo.delete({ requestId, userId });
|
||||
const result = await repo.delete(liveWhere);
|
||||
if (result.affected === 0) return undefined;
|
||||
return row;
|
||||
});
|
||||
|
|
@ -61,4 +68,18 @@ export class InstanceAiPendingConfirmationRepository extends Repository<Instance
|
|||
async findByThreadId(threadId: string): Promise<InstanceAiPendingConfirmation[]> {
|
||||
return await this.find({ where: { threadId } });
|
||||
}
|
||||
|
||||
/** Of the given request IDs, return those still actionable (row exists and
|
||||
* not past `expiresAt`). The complement is treated as expired by the UI. */
|
||||
async findLiveRequestIds(requestIds: string[], now: Date): Promise<Set<string>> {
|
||||
if (requestIds.length === 0) return new Set();
|
||||
const rows = await this.find({
|
||||
where: {
|
||||
requestId: In(requestIds),
|
||||
expiresAt: Or(IsNull(), MoreThanOrEqual(now)),
|
||||
},
|
||||
select: ['requestId'],
|
||||
});
|
||||
return new Set(rows.map((row) => row.requestId));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,4 +159,55 @@ describe('TypeORMAgentCheckpointStore', () => {
|
|||
await expect(store.deleteOlderThan(new Date(0))).resolves.toBe(7);
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
describe('findSuspendedSubAgentResumeInfo', () => {
|
||||
it('picks the suspended tool call when parallel non-suspended ones are present', async () => {
|
||||
const state = makeState({
|
||||
pendingToolCalls: {
|
||||
'tc-finished': {
|
||||
toolCallId: 'tc-finished',
|
||||
toolName: 'list-tools',
|
||||
input: {},
|
||||
suspended: false,
|
||||
},
|
||||
'tc-suspended': {
|
||||
toolCallId: 'tc-suspended',
|
||||
toolName: 'ask-user',
|
||||
input: {},
|
||||
suspended: true,
|
||||
suspendPayload: {},
|
||||
resumeSchema: {},
|
||||
runId: 'inner-run',
|
||||
},
|
||||
},
|
||||
});
|
||||
checkpointRepo.findActiveByResourceId.mockResolvedValueOnce(
|
||||
makeCheckpoint({ key: 'run_outer', state }),
|
||||
);
|
||||
|
||||
const info = await store.findSuspendedSubAgentResumeInfo('resource-x');
|
||||
|
||||
expect(info).toEqual({
|
||||
runId: 'run_outer',
|
||||
toolCallId: 'tc-suspended',
|
||||
persistence: { threadId: 'thread-1', resourceId: 'user-1' },
|
||||
});
|
||||
});
|
||||
|
||||
it('returns undefined when no entry in pendingToolCalls is suspended', async () => {
|
||||
const state = makeState({
|
||||
pendingToolCalls: {
|
||||
'tc-1': {
|
||||
toolCallId: 'tc-1',
|
||||
toolName: 'list-tools',
|
||||
input: {},
|
||||
suspended: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
checkpointRepo.findActiveByResourceId.mockResolvedValueOnce(makeCheckpoint({ state }));
|
||||
|
||||
await expect(store.findSuspendedSubAgentResumeInfo('resource-x')).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -79,6 +79,41 @@ export class TypeORMAgentCheckpointStore implements CheckpointStore {
|
|||
return await this.markExpiredOlderThan(olderThan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up the most recent suspended sub-agent run for a given resourceId
|
||||
* and pull the info needed to resume it. Used by the orchestrator's
|
||||
* cascade-suspend path: when the `plan` tool resumes, it needs the
|
||||
* planner sub-agent's `runId` + `toolCallId` (to call `subAgent.resume`)
|
||||
* + its `persistence` (so the resumed call reuses the same sub-agent
|
||||
* thread and resourceId the original used). The sub-agent's resourceId
|
||||
* is deterministically derived from the parent thread + agent kind, so
|
||||
* the caller can compute the lookup key without stashing anything.
|
||||
*/
|
||||
async findSuspendedSubAgentResumeInfo(resourceId: string): Promise<
|
||||
| {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
persistence: { threadId: string; resourceId: string };
|
||||
}
|
||||
| undefined
|
||||
> {
|
||||
const row = await this.checkpointRepo.findActiveByResourceId(resourceId);
|
||||
if (!row?.state) return undefined;
|
||||
// `pendingToolCalls` can hold parallel tool calls from one turn, only
|
||||
// some of which suspended. Pick the suspended entry explicitly so we
|
||||
// don't try to resume a tool that ran to completion in the same batch.
|
||||
const suspendedEntry = Object.entries(row.state.pendingToolCalls ?? {}).find(
|
||||
([, call]) => call.suspended,
|
||||
);
|
||||
const persistence = row.state.persistence;
|
||||
if (!suspendedEntry || !persistence?.threadId || !persistence.resourceId) return undefined;
|
||||
return {
|
||||
runId: row.key,
|
||||
toolCallId: suspendedEntry[0],
|
||||
persistence: { threadId: persistence.threadId, resourceId: persistence.resourceId },
|
||||
};
|
||||
}
|
||||
|
||||
/** Drop expired tombstones outright once they're past the GC horizon. */
|
||||
async hardDeleteExpiredOlderThan(olderThan: Date): Promise<number> {
|
||||
const result = await this.checkpointRepo.delete({
|
||||
|
|
|
|||
|
|
@ -5768,6 +5768,8 @@
|
|||
"instanceAi.backgroundTask.failed": "Background task failed",
|
||||
"instanceAi.planReview.title": "Review plan",
|
||||
"instanceAi.planReview.titleResolved": "Plan",
|
||||
"instanceAi.planReview.titleExpired": "Plan (expired)",
|
||||
"instanceAi.planReview.expiredHint": "This plan can no longer be approved. Send a new message to continue.",
|
||||
"instanceAi.planReview.building": "Building plan...",
|
||||
"instanceAi.planReview.awaitingApproval": "Awaiting approval",
|
||||
"instanceAi.planReview.description": "Review the plan, then approve it to start building or request changes to revise it.",
|
||||
|
|
|
|||
|
|
@ -223,10 +223,22 @@ function handlePlanDeny(tc: InstanceAiToolCallState) {
|
|||
void thread.confirmAction(requestId, { kind: 'planDeny' });
|
||||
}
|
||||
|
||||
/** Find the latest plan-review confirmation from a planner child's submit-plan tool call.
|
||||
* Prefers pending (isLoading) over resolved — handles revision loops where
|
||||
* multiple submit-plan calls exist. */
|
||||
/** Find the plan-review confirmation for this turn. Two shapes coexist:
|
||||
*
|
||||
* 1. Cascade flow (this feature): the planner sub-agent's submit-plan
|
||||
* confirmation is captured-not-published, so it cascades up onto the
|
||||
* orchestrator's own `plan` tool call.
|
||||
* 2. Direct flow: the planner child's submit-plan tool call carries it.
|
||||
*
|
||||
* Check the orchestrator's own tool calls first (the cascade case), then fall
|
||||
* back to the planner child. Prefers pending (isLoading) over resolved to
|
||||
* handle revision loops where multiple submit-plan calls exist. */
|
||||
const plannerConfirmation = computed<InstanceAiToolCallState | undefined>(() => {
|
||||
const onOrchestrator = props.agentNode.toolCalls.find(
|
||||
(tc) => tc.confirmation?.inputType === 'plan-review',
|
||||
);
|
||||
if (onOrchestrator) return onOrchestrator;
|
||||
|
||||
let latest: InstanceAiToolCallState | undefined;
|
||||
for (const child of props.agentNode.children) {
|
||||
if (child.role !== 'planner') continue;
|
||||
|
|
@ -240,6 +252,17 @@ const plannerConfirmation = computed<InstanceAiToolCallState | undefined>(() =>
|
|||
return latest;
|
||||
});
|
||||
|
||||
/** True when a planner sub-agent was spawned for this orchestrator turn. The
|
||||
* cascade flow leaves the plan-review confirmation on the orchestrator's own
|
||||
* `plan` tool call AND the planner child renders its own card, so without
|
||||
* this guard the tool-call slot and the post-AgentSection slot both draw a
|
||||
* plan card (one interactive, one loading). Suppress the tool-call slot when
|
||||
* a planner child exists — the post-AgentSection slot is the canonical render
|
||||
* and shows the planner's step list above the card. */
|
||||
const hasPlannerChild = computed<boolean>(() =>
|
||||
props.agentNode.children.some((c) => c.role === 'planner'),
|
||||
);
|
||||
|
||||
/** Map simplified TaskList items to PlannedTaskArg shape for loading preview */
|
||||
function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefined {
|
||||
if (!tasks?.tasks?.length) return undefined;
|
||||
|
|
@ -291,14 +314,20 @@ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefi
|
|||
<template v-else-if="toolCallsById[entry.toolCallId].renderHint === 'eval-setup'" />
|
||||
<!-- Plan review must match before the planner renderHint suppression:
|
||||
when the plan tool attaches the confirmation to its own tool call
|
||||
(no planner child agent), that suppression would otherwise hide it. -->
|
||||
(no planner child agent), that suppression would otherwise hide it.
|
||||
When a planner child IS present, defer to the post-AgentSection
|
||||
slot so the card isn't drawn twice. -->
|
||||
<PlanReviewPanel
|
||||
v-else-if="toolCallsById[entry.toolCallId].confirmation?.inputType === 'plan-review'"
|
||||
v-else-if="
|
||||
toolCallsById[entry.toolCallId].confirmation?.inputType === 'plan-review' &&
|
||||
!hasPlannerChild
|
||||
"
|
||||
:key="toolCallsById[entry.toolCallId].confirmation?.requestId"
|
||||
:planned-tasks="getPlanTasks(toolCallsById[entry.toolCallId])"
|
||||
:status="getPlanReviewStatus(toolCallsById[entry.toolCallId])"
|
||||
:updating="isPlanReviewUpdating(toolCallsById[entry.toolCallId])"
|
||||
:read-only="isPlanCardReadOnly(toolCallsById[entry.toolCallId])"
|
||||
:expired="toolCallsById[entry.toolCallId].confirmation?.expired"
|
||||
@approve="handlePlanApprove(toolCallsById[entry.toolCallId])"
|
||||
@ask-for-edits="handlePlanAskForEdits(toolCallsById[entry.toolCallId])"
|
||||
@deny="handlePlanDeny(toolCallsById[entry.toolCallId])"
|
||||
|
|
@ -357,6 +386,7 @@ function mapTaskItemsToPlannedTasks(tasks?: TaskList): PlannedTaskArg[] | undefi
|
|||
:status="plannerConfirmation ? getPlanReviewStatus(plannerConfirmation) : 'pending'"
|
||||
:updating="!!plannerConfirmation && isPlanReviewUpdating(plannerConfirmation)"
|
||||
:read-only="!!plannerConfirmation && isPlanCardReadOnly(plannerConfirmation)"
|
||||
:expired="plannerConfirmation?.confirmation?.expired"
|
||||
@approve="plannerConfirmation && handlePlanApprove(plannerConfirmation)"
|
||||
@ask-for-edits="plannerConfirmation && handlePlanAskForEdits(plannerConfirmation)"
|
||||
@deny="plannerConfirmation && handlePlanDeny(plannerConfirmation)"
|
||||
|
|
|
|||
|
|
@ -33,6 +33,9 @@ const props = defineProps<{
|
|||
loading?: boolean;
|
||||
status?: PlanReviewStatus;
|
||||
updating?: boolean;
|
||||
/** The underlying pending confirmation is gone (TTL prune, restart, cancel)
|
||||
* — render a terminal "expired" state with no actionable footer. */
|
||||
expired?: boolean;
|
||||
}>();
|
||||
|
||||
const i18n = useI18n();
|
||||
|
|
@ -50,13 +53,13 @@ const reviewStatus = computed<PlanReviewStatus>(
|
|||
() => props.status ?? resolvedAction.value ?? 'pending',
|
||||
);
|
||||
|
||||
const isExpanded = ref(!props.readOnly);
|
||||
const isExpanded = ref(!props.readOnly && !props.expired);
|
||||
|
||||
const titleKey = computed<BaseTextKey>(() =>
|
||||
isResolved.value || props.readOnly
|
||||
? 'instanceAi.planReview.titleResolved'
|
||||
: 'instanceAi.planReview.title',
|
||||
);
|
||||
const titleKey = computed<BaseTextKey>(() => {
|
||||
if (props.expired) return 'instanceAi.planReview.titleExpired';
|
||||
if (isResolved.value || props.readOnly) return 'instanceAi.planReview.titleResolved';
|
||||
return 'instanceAi.planReview.title';
|
||||
});
|
||||
|
||||
const showActions = computed(
|
||||
() =>
|
||||
|
|
@ -64,10 +67,13 @@ const showActions = computed(
|
|||
!isResolved.value &&
|
||||
!props.readOnly &&
|
||||
!props.loading &&
|
||||
!props.expired &&
|
||||
props.plannedTasks.length > 0,
|
||||
);
|
||||
|
||||
const showChangesRequested = computed(() => reviewStatus.value === 'changes-requested');
|
||||
const showChangesRequested = computed(
|
||||
() => reviewStatus.value === 'changes-requested' && !props.expired,
|
||||
);
|
||||
|
||||
const isShimmering = computed(() => Boolean(props.loading || props.updating));
|
||||
|
||||
|
|
@ -243,6 +249,13 @@ function handleDeny() {
|
|||
{{ i18n.baseText('instanceAi.planReview.changesRequested') }}
|
||||
</N8nButton>
|
||||
</ConfirmationFooter>
|
||||
|
||||
<!-- Expired hint replaces the approval footer once the underlying state is gone. -->
|
||||
<div v-else-if="props.expired" :class="$style.expiredHint">
|
||||
<N8nText size="small" color="text-light">
|
||||
{{ i18n.baseText('instanceAi.planReview.expiredHint') }}
|
||||
</N8nText>
|
||||
</div>
|
||||
</AnimatedCollapsibleContent>
|
||||
</CollapsibleRoot>
|
||||
</template>
|
||||
|
|
@ -259,6 +272,11 @@ function handleDeny() {
|
|||
max-width: 90%;
|
||||
}
|
||||
|
||||
.expiredHint {
|
||||
padding: var(--spacing--xs) var(--spacing--sm);
|
||||
border-top: var(--border);
|
||||
}
|
||||
|
||||
.header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
|
|
|
|||
|
|
@ -79,6 +79,11 @@ function collectPendingConfirmations(
|
|||
tc.confirmationStatus !== 'approved' &&
|
||||
tc.confirmationStatus !== 'denied' &&
|
||||
!resolved.has(tc.confirmation.requestId) &&
|
||||
// Expired cards render as a terminal "this action has expired" state
|
||||
// in their inline slot; surfacing them in the floating/inline panel
|
||||
// would block the chat input on a confirmation the user can no
|
||||
// longer act on.
|
||||
!tc.confirmation.expired &&
|
||||
// Plan review renders inline in the timeline, not in the confirmation panel
|
||||
tc.confirmation.inputType !== 'plan-review'
|
||||
) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user