diff --git a/packages/@n8n/instance-ai/src/runtime/__tests__/background-task-manager.test.ts b/packages/@n8n/instance-ai/src/runtime/__tests__/background-task-manager.test.ts index 7e9470cc6e5..17dad139dde 100644 --- a/packages/@n8n/instance-ai/src/runtime/__tests__/background-task-manager.test.ts +++ b/packages/@n8n/instance-ai/src/runtime/__tests__/background-task-manager.test.ts @@ -37,6 +37,7 @@ describe('BackgroundTaskManager', () => { it('rejects spawn when concurrent limit is reached', () => { const onLimitReached = jest.fn(); + const createTraceContext = jest.fn(); manager.spawn( makeSpawnOptions({ taskId: 't1', run: async () => await new Promise(() => {}) }), @@ -48,10 +49,30 @@ describe('BackgroundTaskManager', () => { makeSpawnOptions({ taskId: 't3', run: async () => await new Promise(() => {}) }), ); - const result = manager.spawn(makeSpawnOptions({ taskId: 't4', onLimitReached })); + const result = manager.spawn( + makeSpawnOptions({ taskId: 't4', onLimitReached, createTraceContext }), + ); expect(result.status).toBe('limit-reached'); expect(onLimitReached).toHaveBeenCalledWith(expect.stringContaining('limit of 3')); + expect(createTraceContext).not.toHaveBeenCalled(); + }); + + it('creates lazy trace context only after a task is accepted', async () => { + const traceContext = { projectName: 'instance-ai' } as never; + const createTraceContext = jest.fn().mockResolvedValue(traceContext); + const run = jest.fn().mockResolvedValue('done'); + + manager.spawn(makeSpawnOptions({ createTraceContext, run })); + await flushPromises(); + + expect(createTraceContext).toHaveBeenCalledTimes(1); + expect(run).toHaveBeenCalledWith( + expect.any(AbortSignal), + expect.any(Function), + expect.any(Function), + { traceContext }, + ); }); it('calls onCompleted and onSettled when run resolves with string', async () => { @@ -198,6 +219,28 @@ describe('BackgroundTaskManager', () => { expect(manager.getRunningTasks('thread-1')).toHaveLength(1); }); + it('does not create lazy trace context for duplicate spawns', () => { + manager.spawn( + makeSpawnOptions({ + taskId: 'first', + run: async () => await new Promise(() => {}), + dedupeKey: { role: 'workflow-builder', plannedTaskId: 'planned-trace' }, + }), + ); + const createTraceContext = jest.fn(); + + const second = manager.spawn( + makeSpawnOptions({ + taskId: 'second', + createTraceContext, + dedupeKey: { role: 'workflow-builder', plannedTaskId: 'planned-trace' }, + }), + ); + + expect(second.status).toBe('duplicate'); + expect(createTraceContext).not.toHaveBeenCalled(); + }); + it('allows a new spawn once the first planned-task settles', async () => { const { promise, resolve } = createDeferred(); manager.spawn( diff --git a/packages/@n8n/instance-ai/src/runtime/background-task-manager.ts b/packages/@n8n/instance-ai/src/runtime/background-task-manager.ts index c8f22f6bd77..0b1d4cf824f 100644 --- a/packages/@n8n/instance-ai/src/runtime/background-task-manager.ts +++ b/packages/@n8n/instance-ai/src/runtime/background-task-manager.ts @@ -21,6 +21,7 @@ export interface ManagedBackgroundTask { plannedTaskId?: string; workItemId?: string; traceContext?: InstanceAiTraceContext; + createTraceContext?: () => Promise; /** Identity used for single-flight dedupe lookups; copied from the spawn options. */ dedupeKey?: BackgroundTaskDedupeKey; /** @@ -52,6 +53,7 @@ export interface SpawnManagedBackgroundTaskOptions { plannedTaskId?: string; workItemId?: string; traceContext?: InstanceAiTraceContext; + createTraceContext?: () => Promise; /** * Identity for single-flight dedupe. When supplied, a spawn with the same `plannedTaskId` * (primary) or `role + workflowId` (fallback) as a currently-running task returns @@ -72,6 +74,7 @@ export interface SpawnManagedBackgroundTaskOptions { signal: AbortSignal, drainCorrections: () => string[], waitForCorrection: () => Promise, + taskContext: { traceContext?: InstanceAiTraceContext }, ) => Promise; onLimitReached?: (errorMessage: string) => void; onCompleted?: (task: ManagedBackgroundTask) => void | Promise; @@ -300,10 +303,14 @@ export class BackgroundTaskManager { }); try { + if (!task.traceContext && options.createTraceContext) { + task.traceContext = await options.createTraceContext(); + } const raw = await options.run( task.abortController.signal, drainCorrections, waitForCorrection, + { traceContext: task.traceContext }, ); task.status = 'completed'; task.result = typeof raw === 'string' ? raw : raw.text; diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/build-workflow-agent.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/build-workflow-agent.tool.ts index 0d77ce7c76c..e94ed47d0d1 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/build-workflow-agent.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/build-workflow-agent.tool.ts @@ -20,7 +20,7 @@ import { import { compactBuilderMemoryThread } from './builder-memory-compaction'; import { truncateLabel } from './display-utils'; import { - createDetachedSubAgentTracing, + createDetachedSubAgentTraceFactory, traceSubAgentTools, withTraceContextActor, } from './tracing-utils'; @@ -684,30 +684,32 @@ export async function startBuildWorkflowAgentTask( : undefined, runningTasks: runningTaskSummaries, }); - let traceContext: Awaited>; - try { - traceContext = await createDetachedSubAgentTracing(context, { - agentId: subAgentId, - role: 'workflow-builder', - kind: 'builder', - taskId, - plannedTaskId: input.plannedTaskId, - workItemId, - inputs: { - task: input.task, - workflowId: input.workflowId, - conversationContext: input.conversationContext, - }, - }); - } catch (error) { - if (reusedBuilderSession) { - void context.builderSandboxSessionRegistry?.release(reusedBuilderSession.sessionId, { - keep: true, - reason: 'trace_setup_failed', - }); + const detachedTraceFactory = createDetachedSubAgentTraceFactory(context, { + agentId: subAgentId, + role: 'workflow-builder', + kind: 'builder', + taskId, + plannedTaskId: input.plannedTaskId, + workItemId, + inputs: { + task: input.task, + workflowId: input.workflowId, + conversationContext: input.conversationContext, + }, + }); + const createTraceContext = async () => { + try { + return await detachedTraceFactory(); + } catch (error) { + if (reusedBuilderSession) { + void context.builderSandboxSessionRegistry?.release(reusedBuilderSession.sessionId, { + keep: true, + reason: 'trace_setup_failed', + }); + } + throw error; } - throw error; - } + }; let spawnOutcome: ReturnType; try { @@ -716,7 +718,7 @@ export async function startBuildWorkflowAgentTask( threadId: context.threadId, agentId: subAgentId, role: 'workflow-builder', - traceContext, + createTraceContext, plannedTaskId: input.plannedTaskId, workItemId, dedupeKey: { @@ -731,7 +733,12 @@ export async function startBuildWorkflowAgentTask( // bare background-task-completed shell. parentCheckpointId: context.isCheckpointFollowUp === true ? context.checkpointTaskId : undefined, - run: async (signal, drainCorrections, waitForCorrection): Promise => + run: async ( + signal, + drainCorrections, + waitForCorrection, + { traceContext }, + ): Promise => await withTraceContextActor(traceContext, async () => { let builderWs: BuilderWorkspace | undefined; let activeBuilderSession: BuilderSandboxSession | undefined = reusedBuilderSession; diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/data-table-agent.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/data-table-agent.tool.ts index cf061787591..b39433670e3 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/data-table-agent.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/data-table-agent.tool.ts @@ -13,7 +13,7 @@ import { z } from 'zod'; import { DATA_TABLE_AGENT_PROMPT } from './data-table-agent.prompt'; import { truncateLabel } from './display-utils'; import { - createDetachedSubAgentTracing, + createDetachedSubAgentTraceFactory, traceSubAgentTools, withTraceContextActor, } from './tracing-utils'; @@ -63,7 +63,7 @@ export async function startDataTableAgentTask( const subAgentId = input.agentId ?? `agent-datatable-${nanoid(6)}`; const taskId = input.taskId ?? `datatable-${nanoid(8)}`; - const traceContext = await createDetachedSubAgentTracing(context, { + const createTraceContext = createDetachedSubAgentTraceFactory(context, { agentId: subAgentId, role: 'data-table-manager', kind: 'data-table', @@ -81,12 +81,12 @@ export async function startDataTableAgentTask( threadId: context.threadId, agentId: subAgentId, role: 'data-table-manager', - traceContext, + createTraceContext, plannedTaskId: input.plannedTaskId, dedupeKey: { role: 'data-table-manager', plannedTaskId: input.plannedTaskId }, parentCheckpointId: context.isCheckpointFollowUp === true ? context.checkpointTaskId : undefined, - run: async (signal, _drainCorrections, _waitForCorrection) => { + run: async (signal, _drainCorrections, _waitForCorrection, { traceContext }) => { return await withTraceContextActor(traceContext, async () => { const subAgent = new Agent('Data Table Agent') .model(context.modelId) diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/delegate.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/delegate.tool.ts index 957fd036ef4..fafefe15b69 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/delegate.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/delegate.tool.ts @@ -4,7 +4,7 @@ import { nanoid } from 'nanoid'; import { delegateInputSchema, delegateOutputSchema, type DelegateInput } from './delegate.schemas'; import { truncateLabel } from './display-utils'; import { - createDetachedSubAgentTracing, + createDetachedSubAgentTraceFactory, failTraceRun, finishTraceRun, startSubAgentTrace, @@ -132,7 +132,7 @@ export async function startDetachedDelegateTask( input.artifacts, input.conversationContext, ); - const traceContext = await createDetachedSubAgentTracing(context, { + const createTraceContext = createDetachedSubAgentTraceFactory(context, { agentId: subAgentId, role, kind: 'delegate', @@ -152,12 +152,12 @@ export async function startDetachedDelegateTask( threadId: context.threadId, agentId: subAgentId, role, - traceContext, + createTraceContext, plannedTaskId: input.plannedTaskId, dedupeKey: { role, plannedTaskId: input.plannedTaskId }, parentCheckpointId: context.isCheckpointFollowUp === true ? context.checkpointTaskId : undefined, - run: async (signal, drainCorrections, waitForCorrection) => { + run: async (signal, drainCorrections, waitForCorrection, { traceContext }) => { return await withTraceContextActor(traceContext, async () => { const subAgent = createSubAgent({ agentId: subAgentId, diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/research-with-agent.tool.ts b/packages/@n8n/instance-ai/src/tools/orchestration/research-with-agent.tool.ts index ebac4899158..ef12296e977 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/research-with-agent.tool.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/research-with-agent.tool.ts @@ -12,7 +12,7 @@ import { z } from 'zod'; import { truncateLabel } from './display-utils'; import { RESEARCH_AGENT_PROMPT } from './research-agent-prompt'; import { - createDetachedSubAgentTracing, + createDetachedSubAgentTraceFactory, traceSubAgentTools, withTraceContextActor, } from './tracing-utils'; @@ -63,7 +63,7 @@ export async function startResearchAgentTask( additionalContext: input.constraints ? `Constraints: ${input.constraints}` : undefined, runningTasks: context.getRunningTaskSummaries?.(), }); - const traceContext = await createDetachedSubAgentTracing(context, { + const createTraceContext = createDetachedSubAgentTraceFactory(context, { agentId: subAgentId, role: 'web-researcher', kind: 'research', @@ -82,12 +82,12 @@ export async function startResearchAgentTask( threadId: context.threadId, agentId: subAgentId, role: 'web-researcher', - traceContext, + createTraceContext, plannedTaskId: input.plannedTaskId, dedupeKey: { role: 'web-researcher', plannedTaskId: input.plannedTaskId }, parentCheckpointId: context.isCheckpointFollowUp === true ? context.checkpointTaskId : undefined, - run: async (signal, drainCorrections, waitForCorrection) => { + run: async (signal, drainCorrections, waitForCorrection, { traceContext }) => { return await withTraceContextActor(traceContext, async () => { const subAgent = new Agent('Web Research Agent') .model(context.modelId) diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts b/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts index a4b0dd52f00..cc78d976406 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts @@ -50,7 +50,14 @@ export async function createDetachedSubAgentTracing( context: OrchestrationContext, options: StartSubAgentTraceOptions, ): Promise { - if (!context.tracing) return undefined; + return await createDetachedSubAgentTraceFactory(context, options)(); +} + +export function createDetachedSubAgentTraceFactory( + context: OrchestrationContext, + options: StartSubAgentTraceOptions, +): () => Promise { + if (!context.tracing) return async () => undefined; const messageId = typeof context.tracing.actorRun.metadata?.message_id === 'string' @@ -69,46 +76,51 @@ export async function createDetachedSubAgentTracing( ? context.tracing.actorRun.metadata.agent_role : undefined; const activeSpanContext = getCurrentOtelSpanContext(); - const tracing = await createDetachedSubAgentTraceContext({ - projectName: context.tracing.projectName, - threadId: context.threadId, - conversationId, - messageGroupId: context.messageGroupId, - messageId, - runId: context.runId, - userId: context.userId, - modelId: context.modelId, - input: options.inputs, - metadata: options.metadata, - agentId: options.agentId, - role: options.role, - kind: options.kind, - taskId: options.taskId, - plannedTaskId: options.plannedTaskId, - workItemId: options.workItemId, - spawnedByTraceId: - activeSpanContext?.traceId ?? - context.tracing.rootRun.otelTraceId ?? - context.tracing.rootRun.traceId, - spawnedBySpanId: activeSpanContext?.spanId ?? context.tracing.actorRun.otelSpanId, - spawnedByRunId: context.tracing.actorRun.id, - spawnedByAgentId, - spawnedByAgentRole, - spawnedByToolCallId: getCurrentTraceToolCallId(), - proxyConfig: context.tracingProxyConfig, - }); + const spawnedByToolCallId = getCurrentTraceToolCallId(); - if (tracing) { - mergeCurrentTraceMetadata({ - detached_trace: true, - spawned_role: options.role, - ...(options.taskId ? { spawned_task_id: options.taskId } : {}), - spawned_trace_id: tracing.rootRun.traceId, - spawned_root_run_id: tracing.rootRun.id, + return async () => { + if (!context.tracing) return undefined; + const tracing = await createDetachedSubAgentTraceContext({ + projectName: context.tracing.projectName, + threadId: context.threadId, + conversationId, + messageGroupId: context.messageGroupId, + messageId, + runId: context.runId, + userId: context.userId, + modelId: context.modelId, + input: options.inputs, + metadata: options.metadata, + agentId: options.agentId, + role: options.role, + kind: options.kind, + taskId: options.taskId, + plannedTaskId: options.plannedTaskId, + workItemId: options.workItemId, + spawnedByTraceId: + activeSpanContext?.traceId ?? + context.tracing.rootRun.otelTraceId ?? + context.tracing.rootRun.traceId, + spawnedBySpanId: activeSpanContext?.spanId ?? context.tracing.actorRun.otelSpanId, + spawnedByRunId: context.tracing.actorRun.id, + spawnedByAgentId, + spawnedByAgentRole, + spawnedByToolCallId, + proxyConfig: context.tracingProxyConfig, }); - } - return tracing; + if (tracing) { + mergeCurrentTraceMetadata({ + detached_trace: true, + spawned_role: options.role, + ...(options.taskId ? { spawned_task_id: options.taskId } : {}), + spawned_trace_id: tracing.rootRun.traceId, + spawned_root_run_id: tracing.rootRun.id, + }); + } + + return tracing; + }; } export function traceSubAgentTools( diff --git a/packages/@n8n/instance-ai/src/types.ts b/packages/@n8n/instance-ai/src/types.ts index f0f702434ac..b0c4ca94dd1 100644 --- a/packages/@n8n/instance-ai/src/types.ts +++ b/packages/@n8n/instance-ai/src/types.ts @@ -896,7 +896,10 @@ export interface SpawnBackgroundTaskOptions { threadId: string; agentId: string; role: string; + /** Existing trace context for legacy callers. Prefer createTraceContext for new background tasks. */ traceContext?: InstanceAiTraceContext; + /** Lazily creates the background trace only after the task is accepted and starts executing. */ + createTraceContext?: () => Promise; /** When set, links the background task back to a planned task in the scheduler. */ plannedTaskId?: string; /** Unique work item ID for workflow loop tracking. When set, the service @@ -927,6 +930,7 @@ export interface SpawnBackgroundTaskOptions { signal: AbortSignal, drainCorrections: () => string[], waitForCorrection: () => Promise, + taskContext: { traceContext?: InstanceAiTraceContext }, ) => Promise; } diff --git a/packages/cli/src/modules/instance-ai/instance-ai.service.ts b/packages/cli/src/modules/instance-ai/instance-ai.service.ts index da5e831063d..50b696e2d02 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.service.ts @@ -2983,6 +2983,7 @@ export class InstanceAiService { plannedTaskId: opts.plannedTaskId, workItemId: opts.workItemId, traceContext: opts.traceContext, + createTraceContext: opts.createTraceContext, dedupeKey: opts.dedupeKey, parentCheckpointId: opts.parentCheckpointId, run: opts.run,