mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
Merge ff9d1f3614 into 0ce820de73
This commit is contained in:
commit
1607f50a91
|
|
@ -1,6 +1,18 @@
|
|||
import { getSystemPrompt } from '../system-prompt';
|
||||
|
||||
describe('getSystemPrompt', () => {
|
||||
describe('first visible turn guidance', () => {
|
||||
it('instructs the agent to send a concise sentence before the first tool call', () => {
|
||||
const prompt = getSystemPrompt({});
|
||||
|
||||
expect(prompt).toContain('before your first tool call');
|
||||
expect(prompt).toContain('write one short sentence');
|
||||
expect(prompt).toContain("Keep it tied to the user's goal, not the tool name");
|
||||
expect(prompt).toContain('Never let an empty assistant message');
|
||||
expect(prompt).toContain('[Calling tools: ...]');
|
||||
});
|
||||
});
|
||||
|
||||
describe('license hints', () => {
|
||||
it('includes License Limitations section when hints are provided', () => {
|
||||
const prompt = getSystemPrompt({
|
||||
|
|
|
|||
|
|
@ -166,6 +166,8 @@ Examples: search "credential" for the credentials tool, search "file" for filesy
|
|||
|
||||
- Be concise. Ask for clarification when intent is ambiguous.
|
||||
- No emojis unless the user explicitly requests them.
|
||||
- At the beginning of a normal user-visible turn, before your first tool call, write one short sentence explaining what you are about to do or what decision you need. Keep it tied to the user's goal, not the tool name. For system-generated background or checkpoint follow-up turns, follow the follow-up instructions.
|
||||
- Never let an empty assistant message or a \`[Calling tools: ...]\` placeholder be the first visible response.
|
||||
- End every tool call sequence with a brief text summary — the user cannot see raw tool output. Do not end your turn silently after tool calls. Exception: after spawning a background agent (\`build-workflow-with-agent\`, \`plan\`, \`create-tasks\`, \`delegate\`, \`research-with-agent\`, \`manage-data-tables-with-agent\`) the task card replaces your reply — do not write text.
|
||||
|
||||
## Safety
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export type {
|
|||
BackgroundTaskStatusSnapshot,
|
||||
ConfirmationData,
|
||||
PendingConfirmation,
|
||||
RunStateTimeoutDetails,
|
||||
StartedRunState,
|
||||
SuspendedRunState,
|
||||
} from './runtime/run-state-registry';
|
||||
|
|
|
|||
|
|
@ -1088,6 +1088,12 @@ describe('RunStateRegistry', () => {
|
|||
const result = registry.sweepTimedOut(policy, 30_000);
|
||||
|
||||
expect(result.activeThreadIds).toEqual(['thread-old']);
|
||||
expect(result.activeTimeouts['thread-old']).toMatchObject({
|
||||
reason: 'idle_timeout',
|
||||
surface: 'active-run',
|
||||
timeoutMs: 30_000,
|
||||
idleMs: 30_000,
|
||||
});
|
||||
});
|
||||
|
||||
it('identifies suspended runs older than maxAgeMs', () => {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,11 @@ import type { InstanceAiThreadStatusResponse } from '@n8n/api-types';
|
|||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type { InstanceAiTraceContext } from '../types';
|
||||
import type { InstanceAiLivenessPolicy } from './liveness-policy';
|
||||
import type {
|
||||
InstanceAiLivenessPolicy,
|
||||
InstanceAiLivenessSurface,
|
||||
InstanceAiLivenessTimeoutReason,
|
||||
} from './liveness-policy';
|
||||
|
||||
export interface ActiveRunState {
|
||||
runId: string;
|
||||
|
|
@ -72,6 +76,14 @@ export interface BackgroundTaskStatusSnapshot {
|
|||
threadId: string;
|
||||
}
|
||||
|
||||
export interface RunStateTimeoutDetails {
|
||||
reason: InstanceAiLivenessTimeoutReason;
|
||||
surface: InstanceAiLivenessSurface;
|
||||
timeoutMs: number;
|
||||
elapsedMs: number;
|
||||
idleMs: number;
|
||||
}
|
||||
|
||||
export interface StartRunOptions<TUser> {
|
||||
threadId: string;
|
||||
user: TUser;
|
||||
|
|
@ -372,8 +384,12 @@ export class RunStateRegistry<TUser = unknown> {
|
|||
activeThreadIds: string[];
|
||||
suspendedThreadIds: string[];
|
||||
confirmationRequestIds: string[];
|
||||
activeTimeouts: Record<string, RunStateTimeoutDetails>;
|
||||
suspendedTimeouts: Record<string, RunStateTimeoutDetails>;
|
||||
confirmationTimeouts: Record<string, RunStateTimeoutDetails>;
|
||||
} {
|
||||
const activeThreadIds: string[] = [];
|
||||
const activeTimeouts: Record<string, RunStateTimeoutDetails> = {};
|
||||
for (const [threadId, run] of this.activeRuns) {
|
||||
if (this.hasPendingConfirmationForThread(threadId)) continue;
|
||||
|
||||
|
|
@ -387,10 +403,12 @@ export class RunStateRegistry<TUser = unknown> {
|
|||
});
|
||||
if (decision.action === 'timeout') {
|
||||
activeThreadIds.push(threadId);
|
||||
activeTimeouts[threadId] = decision;
|
||||
}
|
||||
}
|
||||
|
||||
const suspendedThreadIds: string[] = [];
|
||||
const suspendedTimeouts: Record<string, RunStateTimeoutDetails> = {};
|
||||
for (const [threadId, run] of this.suspendedRuns) {
|
||||
const startedAt = run.startedAt ?? run.createdAt;
|
||||
const lastActivityAt = run.lastActivityAt ?? run.createdAt;
|
||||
|
|
@ -402,9 +420,11 @@ export class RunStateRegistry<TUser = unknown> {
|
|||
});
|
||||
if (decision.action === 'timeout') {
|
||||
suspendedThreadIds.push(threadId);
|
||||
suspendedTimeouts[threadId] = decision;
|
||||
}
|
||||
}
|
||||
const confirmationRequestIds: string[] = [];
|
||||
const confirmationTimeouts: Record<string, RunStateTimeoutDetails> = {};
|
||||
for (const [reqId, pending] of this.pendingConfirmations) {
|
||||
const startedAt = pending.startedAt ?? pending.createdAt;
|
||||
const lastActivityAt = pending.lastActivityAt ?? pending.createdAt;
|
||||
|
|
@ -416,9 +436,17 @@ export class RunStateRegistry<TUser = unknown> {
|
|||
});
|
||||
if (decision.action === 'timeout') {
|
||||
confirmationRequestIds.push(reqId);
|
||||
confirmationTimeouts[reqId] = decision;
|
||||
}
|
||||
}
|
||||
return { activeThreadIds, suspendedThreadIds, confirmationRequestIds };
|
||||
return {
|
||||
activeThreadIds,
|
||||
suspendedThreadIds,
|
||||
confirmationRequestIds,
|
||||
activeTimeouts,
|
||||
suspendedTimeouts,
|
||||
confirmationTimeouts,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -10,7 +10,12 @@ import {
|
|||
type InstanceAiLivenessTimeoutReason,
|
||||
} from '@n8n/instance-ai';
|
||||
|
||||
import { INSTANCE_AI_RUN_TIMEOUT_REASON, InstanceAiLivenessService } from '../liveness';
|
||||
import {
|
||||
INSTANCE_AI_RUN_TIMEOUT_REASON,
|
||||
InstanceAiLivenessService,
|
||||
type InstanceAiLivenessSweepResult,
|
||||
} from '../liveness';
|
||||
import type { InstanceAiRunTimeoutDetails } from '../run-timeout-details';
|
||||
|
||||
type TestSuspendedRun = {
|
||||
runId: string;
|
||||
|
|
@ -22,11 +27,13 @@ function createLivenessService() {
|
|||
const policyConfig = createInstanceAiLivenessPolicyConfig();
|
||||
const policy = new InstanceAiLivenessPolicy(policyConfig);
|
||||
const runState = {
|
||||
sweepTimedOut: jest.fn((_policy: InstanceAiLivenessPolicy, _now?: number) => ({
|
||||
activeThreadIds: [] as string[],
|
||||
suspendedThreadIds: [] as string[],
|
||||
confirmationRequestIds: [] as string[],
|
||||
})),
|
||||
sweepTimedOut: jest.fn(
|
||||
(_policy: InstanceAiLivenessPolicy, _now?: number): InstanceAiLivenessSweepResult => ({
|
||||
activeThreadIds: [] as string[],
|
||||
suspendedThreadIds: [] as string[],
|
||||
confirmationRequestIds: [] as string[],
|
||||
}),
|
||||
),
|
||||
cancelActiveRun: jest.fn(
|
||||
(_threadId: string): { runId: string; abortController: AbortController } | undefined =>
|
||||
undefined,
|
||||
|
|
@ -93,11 +100,27 @@ describe('InstanceAiLivenessService', () => {
|
|||
threadId: 'thread-suspended',
|
||||
abortController: suspendedAbortController,
|
||||
};
|
||||
const activeTimeout: InstanceAiRunTimeoutDetails = {
|
||||
reason: 'idle_timeout',
|
||||
surface: 'active-run',
|
||||
timeoutMs: 600_000,
|
||||
elapsedMs: 650_000,
|
||||
idleMs: 606_000,
|
||||
};
|
||||
const suspendedTimeout: InstanceAiRunTimeoutDetails = {
|
||||
reason: 'max_lifetime',
|
||||
surface: 'suspended-run',
|
||||
timeoutMs: 1_800_000,
|
||||
elapsedMs: 1_801_000,
|
||||
idleMs: 900_000,
|
||||
};
|
||||
|
||||
runState.sweepTimedOut.mockReturnValue({
|
||||
activeThreadIds: ['thread-active'],
|
||||
suspendedThreadIds: ['thread-suspended'],
|
||||
confirmationRequestIds: ['request-1'],
|
||||
activeTimeouts: { 'thread-active': activeTimeout },
|
||||
suspendedTimeouts: { 'thread-suspended': suspendedTimeout },
|
||||
});
|
||||
runState.cancelActiveRun.mockReturnValue({
|
||||
runId: 'run-active',
|
||||
|
|
@ -136,7 +159,14 @@ describe('InstanceAiLivenessService', () => {
|
|||
'thread-confirmation',
|
||||
expect.objectContaining({ responseId: 'run-timeout:run-confirmation' }),
|
||||
);
|
||||
expect(service.consumeRunTimedOut('run-active')).toBe(true);
|
||||
expect(service.consumeRunTimeout('run-active')).toEqual({
|
||||
timedOut: true,
|
||||
details: activeTimeout,
|
||||
});
|
||||
expect(service.consumeRunTimeout('run-suspended')).toEqual({
|
||||
timedOut: true,
|
||||
details: suspendedTimeout,
|
||||
});
|
||||
expect(service.hasTimedOutActiveRunThread('thread-active')).toBe(true);
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -533,6 +533,7 @@ type TerminalGuardOrderServiceInternals = {
|
|||
getEventsForRuns: jest.Mock;
|
||||
publish: jest.Mock;
|
||||
};
|
||||
liveness: { consumeRunTimeout: jest.Mock };
|
||||
telemetry: { track: jest.Mock };
|
||||
logger: { warn: jest.Mock; error: jest.Mock };
|
||||
traceContextsByRunId: Map<string, { threadId: string; messageGroupId?: string }>;
|
||||
|
|
@ -602,6 +603,7 @@ function createTerminalGuardOrderService(): TerminalGuardOrderServiceInternals {
|
|||
events.push(event);
|
||||
}),
|
||||
};
|
||||
service.liveness = { consumeRunTimeout: jest.fn(() => ({ timedOut: false })) };
|
||||
service.telemetry = { track: jest.fn() };
|
||||
service.logger = { warn: jest.fn(), error: jest.fn() };
|
||||
service.traceContextsByRunId = new Map([
|
||||
|
|
|
|||
|
|
@ -0,0 +1,86 @@
|
|||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
|
||||
import { buildInstanceAiRunTraceMetadata } from '../run-trace-metadata';
|
||||
|
||||
const baseEvent = {
|
||||
runId: 'run-1',
|
||||
agentId: 'agent-1',
|
||||
};
|
||||
|
||||
describe('buildInstanceAiRunTraceMetadata', () => {
|
||||
it('classifies a tool call followed by HITL as a contextless HITL first state', () => {
|
||||
const events: InstanceAiEvent[] = [
|
||||
{
|
||||
type: 'run-start',
|
||||
...baseEvent,
|
||||
payload: { messageId: 'message-1' },
|
||||
},
|
||||
{
|
||||
type: 'tool-call',
|
||||
...baseEvent,
|
||||
payload: { toolCallId: 'tool-1', toolName: 'credentials', args: { action: 'list' } },
|
||||
},
|
||||
{
|
||||
type: 'confirmation-request',
|
||||
...baseEvent,
|
||||
payload: {
|
||||
requestId: 'request-1',
|
||||
toolCallId: 'tool-1',
|
||||
toolName: 'credentials',
|
||||
args: {},
|
||||
severity: 'info',
|
||||
message: 'Pick a credential',
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
expect(buildInstanceAiRunTraceMetadata(events, { status: 'cancelled' })).toMatchObject({
|
||||
first_visible_state: 'contextless_hitl',
|
||||
first_tool_name: 'credentials',
|
||||
cancellation_type: 'explicit',
|
||||
});
|
||||
});
|
||||
|
||||
it('keeps the first tool name when assistant text appears first', () => {
|
||||
const events: InstanceAiEvent[] = [
|
||||
{
|
||||
type: 'text-delta',
|
||||
...baseEvent,
|
||||
payload: { text: 'I will check the workflow first.' },
|
||||
},
|
||||
{
|
||||
type: 'tool-call',
|
||||
...baseEvent,
|
||||
payload: { toolCallId: 'tool-1', toolName: 'workflows', args: { action: 'get' } },
|
||||
},
|
||||
];
|
||||
|
||||
expect(buildInstanceAiRunTraceMetadata(events, { status: 'completed' })).toEqual({
|
||||
first_visible_state: 'assistant_text',
|
||||
first_tool_name: 'workflows',
|
||||
});
|
||||
});
|
||||
|
||||
it('records timeout cancellation type and idle tail without extra fields', () => {
|
||||
const metadata = buildInstanceAiRunTraceMetadata([], {
|
||||
status: 'cancelled',
|
||||
cancellationReason: 'timeout',
|
||||
runTimeout: {
|
||||
timedOut: true,
|
||||
details: {
|
||||
reason: 'idle_timeout',
|
||||
surface: 'active-run',
|
||||
timeoutMs: 600_000,
|
||||
elapsedMs: 650_200,
|
||||
idleMs: 606_400.4,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(metadata).toEqual({
|
||||
first_visible_state: 'empty',
|
||||
cancellation_type: 'idle_timeout',
|
||||
idle_tail_ms: 606_400,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -111,7 +111,12 @@ import { InstanceAiCompactionService } from './compaction.service';
|
|||
import { ProxyTokenManager } from '@/services/proxy-token-manager';
|
||||
import { InstanceAiThreadRepository } from './repositories/instance-ai-thread.repository';
|
||||
import { TraceReplayState } from './trace-replay-state';
|
||||
import { INSTANCE_AI_RUN_TIMEOUT_REASON, InstanceAiLivenessService } from './liveness';
|
||||
import {
|
||||
INSTANCE_AI_RUN_TIMEOUT_REASON,
|
||||
InstanceAiLivenessService,
|
||||
type InstanceAiConsumedRunTimeout,
|
||||
} from './liveness';
|
||||
import { buildInstanceAiRunTraceMetadata } from './run-trace-metadata';
|
||||
|
||||
function getErrorMessage(error: unknown): string {
|
||||
return error instanceof Error ? error.message : String(error);
|
||||
|
|
@ -902,6 +907,32 @@ export class InstanceAiService {
|
|||
await this.finalizeMessageTraceRoot(runId, tracing, options);
|
||||
}
|
||||
|
||||
private buildMessageTraceMetadata(
|
||||
threadId: string,
|
||||
runId: string,
|
||||
options: {
|
||||
status: MessageTraceFinalization['status'];
|
||||
cancellationReason?: string;
|
||||
runTimeout?: InstanceAiConsumedRunTimeout;
|
||||
},
|
||||
): Record<string, unknown> {
|
||||
const traceOptions = {
|
||||
status: options.status,
|
||||
...(options.cancellationReason !== undefined
|
||||
? { cancellationReason: options.cancellationReason }
|
||||
: {}),
|
||||
...(options.runTimeout !== undefined ? { runTimeout: options.runTimeout } : {}),
|
||||
};
|
||||
|
||||
return {
|
||||
completion_source: 'orchestrator',
|
||||
...buildInstanceAiRunTraceMetadata(
|
||||
this.eventBus.getEventsForRun(threadId, runId),
|
||||
traceOptions,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
private async finalizeRemainingMessageTraceRoots(
|
||||
threadId: string,
|
||||
options: MessageTraceFinalization,
|
||||
|
|
@ -1854,7 +1885,9 @@ export class InstanceAiService {
|
|||
return {
|
||||
status: 'error',
|
||||
reason: 'invalid_confirmation_payload',
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(args.threadId, args.runId, {
|
||||
status: 'error',
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -3123,7 +3156,7 @@ export class InstanceAiService {
|
|||
status: finalStatus,
|
||||
outputText,
|
||||
modelId,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(threadId, runId, { status: finalStatus }),
|
||||
};
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
threadId,
|
||||
|
|
@ -3149,7 +3182,8 @@ export class InstanceAiService {
|
|||
}
|
||||
} catch (error) {
|
||||
if (signal.aborted) {
|
||||
const cancellationReason = this.liveness.consumeRunTimedOut(runId)
|
||||
const runTimeout = this.liveness.consumeRunTimeout(runId);
|
||||
const cancellationReason = runTimeout.timedOut
|
||||
? INSTANCE_AI_RUN_TIMEOUT_REASON
|
||||
: getAbortReason(signal);
|
||||
if (cancellationReason === INSTANCE_AI_RUN_TIMEOUT_REASON) {
|
||||
|
|
@ -3166,7 +3200,11 @@ export class InstanceAiService {
|
|||
messageTraceFinalization = {
|
||||
status: 'cancelled',
|
||||
reason: cancellationReason,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(threadId, runId, {
|
||||
status: 'cancelled',
|
||||
cancellationReason,
|
||||
runTimeout,
|
||||
}),
|
||||
};
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
threadId,
|
||||
|
|
@ -3206,7 +3244,7 @@ export class InstanceAiService {
|
|||
messageTraceFinalization = {
|
||||
status: 'error',
|
||||
reason: errorMessage,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(threadId, runId, { status: 'error' }),
|
||||
};
|
||||
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
|
|
@ -3237,6 +3275,9 @@ export class InstanceAiService {
|
|||
this.domainAccessTrackersByThread.get(threadId)?.clearRun(runId);
|
||||
if (messageTraceFinalization) {
|
||||
await this.maybeFinalizeRunTraceRoot(runId, messageTraceFinalization);
|
||||
if (messageTraceFinalization.status !== 'cancelled') {
|
||||
this.liveness.consumeRunTimeout(runId);
|
||||
}
|
||||
}
|
||||
// Clean up Mastra workflow snapshots unless the run is suspended (needed for resume).
|
||||
// Mastra only persists snapshots on suspension and never deletes them on completion.
|
||||
|
|
@ -3700,7 +3741,9 @@ export class InstanceAiService {
|
|||
messageTraceFinalization = {
|
||||
status: finalStatus,
|
||||
outputText,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(opts.threadId, opts.runId, {
|
||||
status: finalStatus,
|
||||
}),
|
||||
};
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
opts.threadId,
|
||||
|
|
@ -3723,17 +3766,28 @@ export class InstanceAiService {
|
|||
} catch (error) {
|
||||
if (opts.signal.aborted) {
|
||||
const messageGroupId = this.traceContextsByRunId.get(opts.runId)?.messageGroupId;
|
||||
const runTimeout = this.liveness.consumeRunTimeout(opts.runId);
|
||||
const cancellationReason = runTimeout.timedOut
|
||||
? INSTANCE_AI_RUN_TIMEOUT_REASON
|
||||
: getAbortReason(opts.signal);
|
||||
if (cancellationReason === INSTANCE_AI_RUN_TIMEOUT_REASON) {
|
||||
this.liveness.publishRunTimeoutNotice(opts.threadId, opts.runId);
|
||||
}
|
||||
this.evaluateTerminalResponse(opts.threadId, opts.runId, 'cancelled', {
|
||||
messageGroupId,
|
||||
});
|
||||
await this.finalizeRunTracing(opts.runId, opts.tracing, {
|
||||
status: 'cancelled',
|
||||
reason: 'user_cancelled',
|
||||
reason: cancellationReason,
|
||||
});
|
||||
messageTraceFinalization = {
|
||||
status: 'cancelled',
|
||||
reason: 'user_cancelled',
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
reason: cancellationReason,
|
||||
metadata: this.buildMessageTraceMetadata(opts.threadId, opts.runId, {
|
||||
status: 'cancelled',
|
||||
cancellationReason,
|
||||
runTimeout,
|
||||
}),
|
||||
};
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
opts.threadId,
|
||||
|
|
@ -3744,7 +3798,7 @@ export class InstanceAiService {
|
|||
opts.threadId,
|
||||
opts.runId,
|
||||
'cancelled',
|
||||
'user_cancelled',
|
||||
cancellationReason,
|
||||
archivedWorkflowIds,
|
||||
);
|
||||
await this.saveAgentTreeSnapshot(opts.threadId, opts.runId, opts.snapshotStorage);
|
||||
|
|
@ -3771,7 +3825,9 @@ export class InstanceAiService {
|
|||
messageTraceFinalization = {
|
||||
status: 'error',
|
||||
reason: errorMessage,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(opts.threadId, opts.runId, {
|
||||
status: 'error',
|
||||
}),
|
||||
};
|
||||
|
||||
const archivedWorkflowIds = await this.reapAiTemporaryFromRun(
|
||||
|
|
@ -3796,6 +3852,9 @@ export class InstanceAiService {
|
|||
// post-run planned-task dispatch.
|
||||
if (messageTraceFinalization) {
|
||||
await this.maybeFinalizeRunTraceRoot(opts.runId, messageTraceFinalization);
|
||||
if (messageTraceFinalization.status !== 'cancelled') {
|
||||
this.liveness.consumeRunTimeout(opts.runId);
|
||||
}
|
||||
}
|
||||
// Post-run planned-task wiring — mirror the executeRun finally.
|
||||
// Resumed ordinary-chat runs also need to drive the scheduler in case
|
||||
|
|
@ -4137,6 +4196,10 @@ export class InstanceAiService {
|
|||
suspended: SuspendedRunState<User>,
|
||||
reason = 'user_cancelled',
|
||||
): Promise<void> {
|
||||
const runTimeout =
|
||||
reason === INSTANCE_AI_RUN_TIMEOUT_REASON
|
||||
? this.liveness.consumeRunTimeout(suspended.runId)
|
||||
: undefined;
|
||||
if (reason === INSTANCE_AI_RUN_TIMEOUT_REASON) {
|
||||
this.liveness.publishRunTimeoutNotice(suspended.threadId, suspended.runId);
|
||||
}
|
||||
|
|
@ -4172,7 +4235,11 @@ export class InstanceAiService {
|
|||
await this.maybeFinalizeRunTraceRoot(suspended.runId, {
|
||||
status: 'cancelled',
|
||||
reason,
|
||||
metadata: { completion_source: 'orchestrator' },
|
||||
metadata: this.buildMessageTraceMetadata(suspended.threadId, suspended.runId, {
|
||||
status: 'cancelled',
|
||||
cancellationReason: reason,
|
||||
...(runTimeout ? { runTimeout } : {}),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ export {
|
|||
INSTANCE_AI_RUN_TIMEOUT_REASON,
|
||||
InstanceAiLivenessService,
|
||||
type InstanceAiLivenessBackgroundTasks,
|
||||
type InstanceAiConsumedRunTimeout,
|
||||
type InstanceAiLivenessEventBus,
|
||||
type InstanceAiLivenessLogger,
|
||||
type InstanceAiLivenessPendingConfirmation,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ import type { InstanceAiEvent } from '@n8n/api-types';
|
|||
import { Time } from '@n8n/constants';
|
||||
import type { InstanceAiLivenessPolicy, InstanceAiLivenessTimeoutReason } from '@n8n/instance-ai';
|
||||
|
||||
import type { InstanceAiRunTimeoutDetails } from '../run-timeout-details';
|
||||
|
||||
const ORCHESTRATOR_AGENT_ID = 'agent-001';
|
||||
|
||||
export const INSTANCE_AI_RUN_TIMEOUT_REASON = 'timeout';
|
||||
|
|
@ -39,6 +41,14 @@ export type InstanceAiLivenessSweepResult = {
|
|||
activeThreadIds: string[];
|
||||
suspendedThreadIds: string[];
|
||||
confirmationRequestIds: string[];
|
||||
activeTimeouts?: Record<string, InstanceAiRunTimeoutDetails>;
|
||||
suspendedTimeouts?: Record<string, InstanceAiRunTimeoutDetails>;
|
||||
confirmationTimeouts?: Record<string, InstanceAiRunTimeoutDetails>;
|
||||
};
|
||||
|
||||
export type InstanceAiConsumedRunTimeout = {
|
||||
timedOut: boolean;
|
||||
details?: InstanceAiRunTimeoutDetails;
|
||||
};
|
||||
|
||||
export type InstanceAiLivenessRunState<
|
||||
|
|
@ -86,7 +96,7 @@ export class InstanceAiLivenessService<
|
|||
> {
|
||||
private timeoutInterval?: NodeJS.Timeout;
|
||||
|
||||
private readonly timedOutRunIds = new Set<string>();
|
||||
private readonly timedOutRunIds = new Map<string, InstanceAiRunTimeoutDetails | undefined>();
|
||||
|
||||
private readonly timedOutActiveRunThreads = new Set<string>();
|
||||
|
||||
|
|
@ -117,8 +127,8 @@ export class InstanceAiLivenessService<
|
|||
this.timedOutActiveRunThreads.delete(threadId);
|
||||
}
|
||||
|
||||
markRunTimedOut(runId: string): void {
|
||||
this.timedOutRunIds.add(runId);
|
||||
markRunTimedOut(runId: string, details?: InstanceAiRunTimeoutDetails): void {
|
||||
this.timedOutRunIds.set(runId, details);
|
||||
}
|
||||
|
||||
consumeRunTimedOut(runId: string): boolean {
|
||||
|
|
@ -127,22 +137,37 @@ export class InstanceAiLivenessService<
|
|||
return timedOut;
|
||||
}
|
||||
|
||||
consumeRunTimeout(runId: string): InstanceAiConsumedRunTimeout {
|
||||
const timedOut = this.timedOutRunIds.has(runId);
|
||||
if (!timedOut) return { timedOut: false };
|
||||
|
||||
const details = this.timedOutRunIds.get(runId);
|
||||
this.timedOutRunIds.delete(runId);
|
||||
return details ? { timedOut: true, details } : { timedOut: true };
|
||||
}
|
||||
|
||||
hasTimedOutActiveRunThread(threadId: string): boolean {
|
||||
return this.timedOutActiveRunThreads.has(threadId);
|
||||
}
|
||||
|
||||
async sweepTimedOutWork(now = Date.now()): Promise<void> {
|
||||
const { activeThreadIds, suspendedThreadIds, confirmationRequestIds } =
|
||||
this.options.runState.sweepTimedOut(this.options.policy, now);
|
||||
const {
|
||||
activeThreadIds,
|
||||
suspendedThreadIds,
|
||||
confirmationRequestIds,
|
||||
activeTimeouts,
|
||||
suspendedTimeouts,
|
||||
confirmationTimeouts,
|
||||
} = this.options.runState.sweepTimedOut(this.options.policy, now);
|
||||
|
||||
for (const threadId of activeThreadIds) {
|
||||
this.options.logger.debug('Cancelling timed-out active run', { threadId });
|
||||
this.cancelTimedOutActiveRun(threadId);
|
||||
this.cancelTimedOutActiveRun(threadId, activeTimeouts?.[threadId]);
|
||||
}
|
||||
|
||||
for (const threadId of suspendedThreadIds) {
|
||||
this.options.logger.debug('Auto-rejecting timed-out suspended run', { threadId });
|
||||
this.cancelTimedOutSuspendedRun(threadId);
|
||||
this.cancelTimedOutSuspendedRun(threadId, suspendedTimeouts?.[threadId]);
|
||||
}
|
||||
|
||||
for (const reqId of confirmationRequestIds) {
|
||||
|
|
@ -152,7 +177,10 @@ export class InstanceAiLivenessService<
|
|||
const pending = this.options.runState.getPendingConfirmation(reqId);
|
||||
if (pending) {
|
||||
const runId = this.options.runState.getActiveRunId(pending.threadId);
|
||||
if (runId) this.publishRunTimeoutNotice(pending.threadId, runId);
|
||||
if (runId) {
|
||||
this.markRunTimedOut(runId, confirmationTimeouts?.[reqId]);
|
||||
this.publishRunTimeoutNotice(pending.threadId, runId);
|
||||
}
|
||||
}
|
||||
this.options.runState.rejectPendingConfirmation(reqId);
|
||||
}
|
||||
|
|
@ -177,21 +205,21 @@ export class InstanceAiLivenessService<
|
|||
}
|
||||
}
|
||||
|
||||
cancelTimedOutActiveRun(threadId: string): void {
|
||||
cancelTimedOutActiveRun(threadId: string, details?: InstanceAiRunTimeoutDetails): void {
|
||||
const active = this.options.runState.cancelActiveRun(threadId);
|
||||
if (!active) return;
|
||||
|
||||
this.markRunTimedOut(active.runId);
|
||||
this.markRunTimedOut(active.runId, details);
|
||||
this.timedOutActiveRunThreads.add(threadId);
|
||||
this.publishRunTimeoutNotice(threadId, active.runId);
|
||||
active.abortController.abort();
|
||||
}
|
||||
|
||||
cancelTimedOutSuspendedRun(threadId: string): void {
|
||||
cancelTimedOutSuspendedRun(threadId: string, details?: InstanceAiRunTimeoutDetails): void {
|
||||
const suspended = this.options.runState.cancelSuspendedRun(threadId);
|
||||
if (!suspended) return;
|
||||
|
||||
this.markRunTimedOut(suspended.runId);
|
||||
this.markRunTimedOut(suspended.runId, details);
|
||||
suspended.abortController.abort();
|
||||
this.options.finalizeCancelledSuspendedRun(suspended, INSTANCE_AI_RUN_TIMEOUT_REASON);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,9 @@
|
|||
import type { InstanceAiLivenessSurface, InstanceAiLivenessTimeoutReason } from '@n8n/instance-ai';
|
||||
|
||||
export type InstanceAiRunTimeoutDetails = {
|
||||
reason: InstanceAiLivenessTimeoutReason;
|
||||
surface: InstanceAiLivenessSurface;
|
||||
timeoutMs: number;
|
||||
elapsedMs: number;
|
||||
idleMs: number;
|
||||
};
|
||||
112
packages/cli/src/modules/instance-ai/run-trace-metadata.ts
Normal file
112
packages/cli/src/modules/instance-ai/run-trace-metadata.ts
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
import type { InstanceAiEvent } from '@n8n/api-types';
|
||||
import type { InstanceAiLivenessTimeoutReason } from '@n8n/instance-ai';
|
||||
|
||||
import type { InstanceAiRunTimeoutDetails } from './run-timeout-details';
|
||||
|
||||
const RUN_TIMEOUT_REASON = 'timeout';
|
||||
|
||||
export type InstanceAiFirstVisibleState =
|
||||
| 'assistant_text'
|
||||
| 'contextless_hitl'
|
||||
| 'tool_call'
|
||||
| 'task_card'
|
||||
| 'empty';
|
||||
|
||||
export type InstanceAiCancellationType = 'explicit' | InstanceAiLivenessTimeoutReason;
|
||||
|
||||
export type InstanceAiRunTimeoutTraceContext = {
|
||||
timedOut: boolean;
|
||||
details?: InstanceAiRunTimeoutDetails;
|
||||
};
|
||||
|
||||
export type InstanceAiRunTraceMetadataOptions = {
|
||||
status: 'completed' | 'cancelled' | 'error';
|
||||
cancellationReason?: string;
|
||||
runTimeout?: InstanceAiRunTimeoutTraceContext;
|
||||
};
|
||||
|
||||
type FirstVisibleSummary = {
|
||||
state: InstanceAiFirstVisibleState;
|
||||
firstToolName?: string;
|
||||
};
|
||||
|
||||
function withFirstToolName(
|
||||
state: InstanceAiFirstVisibleState,
|
||||
firstToolName: string | undefined,
|
||||
): FirstVisibleSummary {
|
||||
return firstToolName ? { state, firstToolName } : { state };
|
||||
}
|
||||
|
||||
function getFirstToolName(events: InstanceAiEvent[]): string | undefined {
|
||||
for (const event of events) {
|
||||
if (event.type === 'tool-call') return event.payload.toolName;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function getFirstVisibleSummary(events: InstanceAiEvent[]): FirstVisibleSummary {
|
||||
const firstToolName = getFirstToolName(events);
|
||||
let sawToolCall = false;
|
||||
|
||||
for (const event of events) {
|
||||
if (event.type === 'tool-call') {
|
||||
sawToolCall = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (event.type === 'confirmation-request') {
|
||||
return {
|
||||
state: 'contextless_hitl',
|
||||
firstToolName: firstToolName ?? event.payload.toolName,
|
||||
};
|
||||
}
|
||||
|
||||
if (event.type === 'text-delta' && event.payload.text.trim().length > 0) {
|
||||
return withFirstToolName('assistant_text', firstToolName);
|
||||
}
|
||||
|
||||
if (event.type === 'agent-spawned' || event.type === 'tasks-update') {
|
||||
return withFirstToolName('task_card', firstToolName);
|
||||
}
|
||||
}
|
||||
|
||||
return withFirstToolName(sawToolCall ? 'tool_call' : 'empty', firstToolName);
|
||||
}
|
||||
|
||||
function getCancellationType(
|
||||
options: InstanceAiRunTraceMetadataOptions,
|
||||
): InstanceAiCancellationType | undefined {
|
||||
if (options.status !== 'cancelled') return undefined;
|
||||
|
||||
if (options.runTimeout?.timedOut || options.cancellationReason === RUN_TIMEOUT_REASON) {
|
||||
return options.runTimeout?.details?.reason ?? 'idle_timeout';
|
||||
}
|
||||
|
||||
return 'explicit';
|
||||
}
|
||||
|
||||
export function buildInstanceAiRunTraceMetadata(
|
||||
events: InstanceAiEvent[],
|
||||
options: InstanceAiRunTraceMetadataOptions,
|
||||
): Record<string, unknown> {
|
||||
const firstVisible = getFirstVisibleSummary(events);
|
||||
const metadata: Record<string, unknown> = {
|
||||
first_visible_state: firstVisible.state,
|
||||
};
|
||||
|
||||
if (firstVisible.firstToolName) {
|
||||
metadata.first_tool_name = firstVisible.firstToolName;
|
||||
}
|
||||
|
||||
const cancellationType = getCancellationType(options);
|
||||
if (cancellationType) {
|
||||
metadata.cancellation_type = cancellationType;
|
||||
}
|
||||
|
||||
if (options.runTimeout?.details?.idleMs !== undefined) {
|
||||
metadata.idle_tail_ms = Math.round(options.runTimeout.details.idleMs);
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user