From e5e0cb97cdc192cf6e07a4b5c5d431bbf98eea1d Mon Sep 17 00:00:00 2001 From: Oleg Ivaniv Date: Tue, 5 May 2026 17:07:34 +0200 Subject: [PATCH] refactor(instance-ai): move product tracing to otel --- .../src/__tests__/agent-runtime.test.ts | 3 + .../@n8n/agents/src/runtime/agent-runtime.ts | 3 +- .../@n8n/agents/src/runtime/tool-adapter.ts | 4 +- packages/@n8n/agents/src/types/sdk/tool.ts | 4 + ...0000-AddOtelIdsToInstanceAiRunSnapshots.ts | 14 + .../db/src/migrations/postgresdb/index.ts | 2 + .../@n8n/db/src/migrations/sqlite/index.ts | 2 + packages/@n8n/instance-ai/TRACING_SPECS.md | 92 +++-- .../src/storage/agent-tree-snapshot.ts | 2 + .../src/tools/orchestration/tracing-utils.ts | 17 +- .../__tests__/langsmith-tracing.test.ts | 264 +++++++++++-- .../src/tracing/langsmith-tracing.ts | 370 +++++++++++++++++- .../instance-ai-run-snapshot.entity.ts | 6 + .../instance-ai/instance-ai.service.ts | 10 +- .../__tests__/db-snapshot-storage.test.ts | 23 +- .../storage/db-snapshot-storage.ts | 18 +- 16 files changed, 731 insertions(+), 103 deletions(-) create mode 100644 packages/@n8n/db/src/migrations/common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots.ts diff --git a/packages/@n8n/agents/src/__tests__/agent-runtime.test.ts b/packages/@n8n/agents/src/__tests__/agent-runtime.test.ts index 26fd9efb650..396f408ceaa 100644 --- a/packages/@n8n/agents/src/__tests__/agent-runtime.test.ts +++ b/packages/@n8n/agents/src/__tests__/agent-runtime.test.ts @@ -2184,6 +2184,7 @@ describe('AgentRuntime — telemetry propagation', () => { it('passes resolved telemetry to tool handlers via parentTelemetry', async () => { let capturedTelemetry: BuiltTelemetry | undefined; + let capturedToolCallId: string | undefined; const spyTool: BuiltTool = new ToolBuilder('spy') .description('captures telemetry from context') @@ -2191,6 +2192,7 @@ describe('AgentRuntime — telemetry propagation', () => { .output(z.object({ ok: z.boolean() })) .handler(async (_input, ctx) => { capturedTelemetry = ctx.parentTelemetry; + capturedToolCallId = ctx.toolCallId; return await Promise.resolve({ ok: true }); }) .build(); @@ -2211,6 +2213,7 @@ describe('AgentRuntime — telemetry propagation', () => { await runtime.generate('test'); expect(capturedTelemetry).toBe(baseTelemetry); + expect(capturedToolCallId).toBe('tc1'); }); it('emits AI SDK-compatible telemetry spans for local tool execution', async () => { diff --git a/packages/@n8n/agents/src/runtime/agent-runtime.ts b/packages/@n8n/agents/src/runtime/agent-runtime.ts index 8dc0afaa213..a56f6464714 100644 --- a/packages/@n8n/agents/src/runtime/agent-runtime.ts +++ b/packages/@n8n/agents/src/runtime/agent-runtime.ts @@ -1717,7 +1717,8 @@ export class AgentRuntime { toolName, toolInput, resolvedTelemetry, - async () => await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry), + async () => + await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry, toolCallId), ); } catch (error) { return makeToolError(error as Error); diff --git a/packages/@n8n/agents/src/runtime/tool-adapter.ts b/packages/@n8n/agents/src/runtime/tool-adapter.ts index 02635a4bf6a..7a690fc5884 100644 --- a/packages/@n8n/agents/src/runtime/tool-adapter.ts +++ b/packages/@n8n/agents/src/runtime/tool-adapter.ts @@ -142,6 +142,7 @@ export async function executeTool( builtTool: BuiltTool, resumeData?: unknown, parentTelemetry?: BuiltTelemetry, + toolCallId?: string, ): Promise { if (!builtTool.handler) { throw new Error(`No handler found for tool "${builtTool.name}"`); @@ -154,11 +155,12 @@ export async function executeTool( }, resumeData, parentTelemetry, + toolCallId, }; return await builtTool.handler(args, ctx); } - const ctx: ToolContext = { parentTelemetry }; + const ctx: ToolContext = { parentTelemetry, toolCallId }; return await builtTool.handler(args, ctx); } diff --git a/packages/@n8n/agents/src/types/sdk/tool.ts b/packages/@n8n/agents/src/types/sdk/tool.ts index 20969844e5e..9bc6ad3c4e0 100644 --- a/packages/@n8n/agents/src/types/sdk/tool.ts +++ b/packages/@n8n/agents/src/types/sdk/tool.ts @@ -6,6 +6,8 @@ import type { BuiltTelemetry } from '../telemetry'; import type { JSONObject } from '../utils/json'; export interface ToolContext { + /** AI SDK tool call ID for the current local tool execution. */ + toolCallId?: string; /** Telemetry config from the parent agent, for sub-agent propagation. */ parentTelemetry?: BuiltTelemetry; } @@ -19,6 +21,8 @@ export interface InterruptibleToolContext { suspend: (payload: S) => Promise; /** Data from the consumer after resume. Undefined on first invocation. */ resumeData: R | undefined; + /** AI SDK tool call ID for the current local tool execution. */ + toolCallId?: string; /** Telemetry config from the parent agent, for sub-agent propagation. */ parentTelemetry?: BuiltTelemetry; } diff --git a/packages/@n8n/db/src/migrations/common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots.ts b/packages/@n8n/db/src/migrations/common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots.ts new file mode 100644 index 00000000000..db22a9df685 --- /dev/null +++ b/packages/@n8n/db/src/migrations/common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots.ts @@ -0,0 +1,14 @@ +import type { MigrationContext, ReversibleMigration } from '../migration-types'; + +export class AddOtelIdsToInstanceAiRunSnapshots1778065000000 implements ReversibleMigration { + async up({ schemaBuilder: { addColumns, column } }: MigrationContext) { + await addColumns('instance_ai_run_snapshots', [ + column('traceId').varchar(32).comment('OpenTelemetry trace ID for the product root span.'), + column('spanId').varchar(16).comment('OpenTelemetry span ID for the product root span.'), + ]); + } + + async down({ schemaBuilder: { dropColumns } }: MigrationContext) { + await dropColumns('instance_ai_run_snapshots', ['traceId', 'spanId']); + } +} diff --git a/packages/@n8n/db/src/migrations/postgresdb/index.ts b/packages/@n8n/db/src/migrations/postgresdb/index.ts index 552731211b4..519541d731b 100644 --- a/packages/@n8n/db/src/migrations/postgresdb/index.ts +++ b/packages/@n8n/db/src/migrations/postgresdb/index.ts @@ -170,6 +170,7 @@ import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/17 import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey'; import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable'; import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence'; +import { AddOtelIdsToInstanceAiRunSnapshots1778065000000 } from '../common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots'; import type { Migration } from '../migration-types'; export const postgresMigrations: Migration[] = [ @@ -345,4 +346,5 @@ export const postgresMigrations: Migration[] = [ CreateAiBuilderTemporaryWorkflowTable1777281990043, ExpandVariablesValueColumnToText1777420800000, ResetInstanceAiNativePersistence1778060000000, + AddOtelIdsToInstanceAiRunSnapshots1778065000000, ]; diff --git a/packages/@n8n/db/src/migrations/sqlite/index.ts b/packages/@n8n/db/src/migrations/sqlite/index.ts index 0d7210d3489..796b381daaa 100644 --- a/packages/@n8n/db/src/migrations/sqlite/index.ts +++ b/packages/@n8n/db/src/migrations/sqlite/index.ts @@ -163,6 +163,7 @@ import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/17 import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey'; import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable'; import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence'; +import { AddOtelIdsToInstanceAiRunSnapshots1778065000000 } from '../common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots'; import type { Migration } from '../migration-types'; const sqliteMigrations: Migration[] = [ @@ -331,6 +332,7 @@ const sqliteMigrations: Migration[] = [ AddTracingContextToExecution1777045000000, CreateAiBuilderTemporaryWorkflowTable1777281990043, ResetInstanceAiNativePersistence1778060000000, + AddOtelIdsToInstanceAiRunSnapshots1778065000000, ]; export { sqliteMigrations }; diff --git a/packages/@n8n/instance-ai/TRACING_SPECS.md b/packages/@n8n/instance-ai/TRACING_SPECS.md index 2d12eb8e804..5e9300641dc 100644 --- a/packages/@n8n/instance-ai/TRACING_SPECS.md +++ b/packages/@n8n/instance-ai/TRACING_SPECS.md @@ -77,17 +77,35 @@ Implemented so far: - The LangSmith OTel processor filters noisy AI SDK wrapper spans so provider request spans such as `ai.streamText.doStream` can appear directly under the agent root. +- Instance AI product roots and child spans now use the same OTel + tracer/provider as native agent telemetry in normal execution. +- Normal foreground and detached trace creation no longer creates RunTree spans. +- Agent tree snapshots persist OTel trace/span IDs alongside derived LangSmith + IDs for feedback anchoring. Still wrong: -- `langsmith-tracing.ts` still creates RunTree product spans for live execution. -- Product message turns and native LLM spans are split across separate - LangSmith traces. -- Token usage lives on native OTel spans but does not roll up to RunTree - product roots. -- Feedback anchoring still assumes persisted RunTree IDs. -- Some product spans exist twice: once as RunTree spans and once as native - tool/agent OTel spans. +- Live LangSmith validation has proved feedback against an OTel-only product + root; full provider-span validation with a real model turn is still pending. +- Some fallback RunTree compatibility code remains for legacy/replay-only + paths and should be deleted after rollout validation. +- Detached sub-agent linking captures spawning trace/span metadata and model + tool-call IDs when a detached task is spawned from a local tool handler. + +## Hybrid Reference Notes + +The last working hybrid traces showed RunTree product nodes such as +`message_turn`, `orchestrator`, `context_compaction`, `prompt_build`, and +`subagent:planner` beside native OTel nodes such as `ai.streamText.doStream`. +This proved product semantics and native AI SDK telemetry could both be +exported, but LangSmith displayed them as split turn/root groups and did not +roll token usage up to the product roots. + +The failure mode to avoid is forcing native OTel spans under RunTree IDs. In +that shape, LangSmith can lose or separate provider spans, and the trace no +longer shows the complete system/user/tool/provider turn under a single OTel +context. Regression coverage now asserts normal Instance AI trace creation does +not create RunTree spans. ## Target Architecture @@ -471,72 +489,72 @@ must not require LangSmith to be available. 1. Document and freeze the current hybrid behavior - - [ ] Keep examples of a working hybrid trace with native LLM spans. - - [ ] Keep examples of the failure mode when OTel spans are forced under + - [x] Keep examples of a working hybrid trace with native LLM spans. + - [x] Keep examples of the failure mode when OTel spans are forced under RunTree parent IDs. - - [ ] Add a short note in tests or fixtures explaining why RunTree/OTel + - [x] Add a short note in tests or fixtures explaining why RunTree/OTel parent mixing is forbidden. 2. Add an OTel product tracing adapter - - [ ] Create an Instance AI adapter that starts active OTel spans using the + - [x] Create an Instance AI adapter that starts active OTel spans using the same tracer/provider as native agent telemetry. - - [ ] Support `withSpan`, `startSpan`, `finishSpan`, `failSpan`, and + - [x] Support `withSpan`, `startSpan`, `finishSpan`, `failSpan`, and metadata merging. - - [ ] Ensure active context propagates into `@n8n/agents` runtime calls. - - [ ] Ensure spans flush before response close, suspension persistence, and + - [x] Ensure active context propagates into `@n8n/agents` runtime calls. + - [x] Ensure spans flush before response close, suspension persistence, and detached task completion. 3. Replace RunTree message turn roots - - [ ] Create `instance-ai.message_turn` as an OTel root span. - - [ ] Persist OTel trace/span IDs in the agent tree snapshot. - - [ ] Add metadata required by LangSmith thread view. - - [ ] Remove RunTree creation from the normal foreground path. + - [x] Create `instance-ai.message_turn` as an OTel root span. + - [x] Persist OTel trace/span IDs in the agent tree snapshot. + - [x] Add metadata required by LangSmith thread view. + - [x] Remove RunTree creation from the normal foreground path. 4. Replace RunTree product child spans - - [ ] Convert `orchestrator`, `context_compaction`, and `prompt_build` to + - [x] Convert `orchestrator`, `context_compaction`, and `prompt_build` to OTel spans. - - [ ] Convert inline `subagent:*` spans to OTel spans under active context. - - [ ] Convert HITL suspend/resume spans to OTel spans. - - [ ] Convert selected side-effect-heavy tools to OTel product spans. + - [x] Convert inline `subagent:*` spans to OTel spans under active context. + - [x] Convert HITL suspend/resume spans to OTel spans. + - [x] Convert selected side-effect-heavy tools to OTel product spans. 5. Preserve detached/background sub-agent linking - - [ ] Create detached sub-agent roots as separate OTel traces when they run + - [x] Create detached sub-agent roots as separate OTel traces when they run outside the foreground context. - - [ ] Add spawning metadata: trace ID, span ID, tool call ID, task ID, and + - [x] Add spawning metadata: trace ID, span ID, tool call ID, task ID, and agent role. - [ ] Confirm thread queries show detached roots alongside foreground turns. 6. Rework feedback anchoring - - [ ] Choose explicit LangSmith IDs, derived OTel IDs, or metadata lookup. - - [ ] Prove `Client.createFeedback` works against an OTel-only product root. - - [ ] Persist the chosen IDs in the snapshot. - - [ ] Remove RunTree as a feedback dependency. + - [x] Choose explicit LangSmith IDs, derived OTel IDs, or metadata lookup. + - [x] Prove `Client.createFeedback` works against an OTel-only product root. + - [x] Persist the chosen IDs in the snapshot. + - [x] Remove RunTree as a feedback dependency. 7. Remove RunTree live tracing - - [ ] Remove normal-path `RunTree` root creation. - - [ ] Remove normal-path manual RunTree tool wrappers. + - [x] Remove normal-path `RunTree` root creation. + - [x] Remove normal-path manual RunTree tool wrappers. - [ ] Keep only temporary compatibility code behind an explicit flag, if needed for rollout. - [ ] Delete compatibility code after validation. 8. Decouple replay from tracing - - [ ] Ensure replay records stable Instance AI events, not span IDs. - - [ ] Ensure replay tests pass with LangSmith disabled. + - [x] Ensure replay records stable Instance AI events, not span IDs. + - [x] Ensure replay tests pass with LangSmith disabled. - [ ] Optionally emit replay-tagged OTel spans for debugging only. 9. Add regression coverage - - [ ] Unit test metadata construction. - - [ ] Unit test OTel product span parentage. - - [ ] Unit test feedback ID persistence. - - [ ] Unit test redaction preserving token usage. + - [x] Unit test metadata construction. + - [x] Unit test OTel product span parentage. + - [x] Unit test feedback ID persistence. + - [x] Unit test redaction preserving token usage. - [ ] Local exporter test proving one foreground message turn contains product spans, native provider spans, and local tool spans. - [ ] Live LangSmith validation behind explicit credentials. diff --git a/packages/@n8n/instance-ai/src/storage/agent-tree-snapshot.ts b/packages/@n8n/instance-ai/src/storage/agent-tree-snapshot.ts index 6e473059c23..1ec013737df 100644 --- a/packages/@n8n/instance-ai/src/storage/agent-tree-snapshot.ts +++ b/packages/@n8n/instance-ai/src/storage/agent-tree-snapshot.ts @@ -5,6 +5,8 @@ export interface AgentTreeSnapshot { runId: string; messageGroupId?: string; runIds?: string[]; + traceId?: string; + spanId?: string; langsmithRunId?: string; langsmithTraceId?: string; } diff --git a/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts b/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts index 0b7ba2b6ab1..05a2014096f 100644 --- a/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts +++ b/packages/@n8n/instance-ai/src/tools/orchestration/tracing-utils.ts @@ -1,5 +1,7 @@ import { createDetachedSubAgentTraceContext, + getCurrentOtelSpanContext, + getCurrentTraceToolCallId, mergeCurrentTraceMetadata, } from '../../tracing/langsmith-tracing'; import type { @@ -29,7 +31,7 @@ export async function startSubAgentTrace( if (!context.tracing) return undefined; return await context.tracing.startChildRun(context.tracing.actorRun, { - name: `subagent:${options.role}`, + name: `instance-ai.subagent.${options.role}.stream`, tags: ['sub-agent'], metadata: { agent_role: options.role, @@ -62,6 +64,11 @@ export async function createDetachedSubAgentTracing( typeof context.tracing.actorRun.metadata?.agent_id === 'string' ? context.tracing.actorRun.metadata.agent_id : context.orchestratorAgentId; + const spawnedByAgentRole = + typeof context.tracing.actorRun.metadata?.agent_role === 'string' + ? context.tracing.actorRun.metadata.agent_role + : undefined; + const activeSpanContext = getCurrentOtelSpanContext(); const tracing = await createDetachedSubAgentTraceContext({ projectName: context.tracing.projectName, threadId: context.threadId, @@ -79,9 +86,15 @@ export async function createDetachedSubAgentTracing( taskId: options.taskId, plannedTaskId: options.plannedTaskId, workItemId: options.workItemId, - spawnedByTraceId: context.tracing.rootRun.traceId, + spawnedByTraceId: + activeSpanContext?.traceId ?? + context.tracing.rootRun.otelTraceId ?? + context.tracing.rootRun.traceId, + spawnedBySpanId: activeSpanContext?.spanId ?? context.tracing.actorRun.otelSpanId, spawnedByRunId: context.tracing.actorRun.id, spawnedByAgentId, + spawnedByAgentRole, + spawnedByToolCallId: getCurrentTraceToolCallId(), proxyConfig: context.tracingProxyConfig, }); diff --git a/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts b/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts index 1b4f00501c3..9f3a7ea0c5d 100644 --- a/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts +++ b/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts @@ -1,6 +1,156 @@ +import type { Context } from '@opentelemetry/api'; import { jsonParse } from 'n8n-workflow'; import { executeTool } from '../../__tests__/tool-test-utils'; + +jest.mock('@n8n/agents', () => { + const actual = jest.requireActual>('@n8n/agents'); + const { context, trace } = jest.requireActual<{ + context: { + active(): Context; + with(ctx: Context, fn: () => T): T; + }; + trace: { + getSpan(ctx: Context): unknown; + setSpan(ctx: Context, span: unknown): Context; + }; + }>('@opentelemetry/api'); + + let spanCounter = 0; + const spans: Array<{ + id: string; + traceId: string; + parentSpanId?: string; + name: string; + attributes: Record; + status?: { code: number; message?: string }; + ended: boolean; + }> = []; + + function nextHex(length: number): string { + spanCounter += 1; + return spanCounter.toString(16).padStart(length, '0').slice(-length); + } + + class MockSpan { + private readonly traceId: string; + private readonly spanId: string; + private readonly record: (typeof spans)[number]; + + constructor(name: string, attributes: Record, parentSpan?: MockSpan) { + this.traceId = parentSpan?.spanContext().traceId ?? nextHex(32); + this.spanId = nextHex(16); + this.record = { + id: this.spanId, + traceId: this.traceId, + ...(parentSpan ? { parentSpanId: parentSpan.spanContext().spanId } : {}), + name, + attributes: { ...attributes }, + ended: false, + }; + spans.push(this.record); + } + + spanContext(): { traceId: string; spanId: string } { + return { traceId: this.traceId, spanId: this.spanId }; + } + + setAttributes(attributes: Record): void { + Object.assign(this.record.attributes, attributes); + } + + recordException(): void {} + + setStatus(status: { code: number; message?: string }): void { + this.record.status = status; + } + + end(): void { + this.record.ended = true; + } + } + + const tracer = { + startSpan: ( + name: string, + options?: { attributes?: Record }, + parentContext?: Context, + ) => { + const parentSpan = trace.getSpan(parentContext ?? context.active()) as MockSpan | undefined; + return new MockSpan(name, options?.attributes ?? {}, parentSpan); + }, + startActiveSpan: async ( + name: string, + options: { attributes?: Record }, + fn: (span: MockSpan) => Promise, + ): Promise => { + const span = tracer.startSpan(name, options); + const spanContext = trace.setSpan(context.active(), span as never); + return await context.with(spanContext, async () => await fn(span)); + }, + }; + + const provider = { + forceFlush: jest.fn(async () => await Promise.resolve()), + shutdown: jest.fn(async () => await Promise.resolve()), + }; + + class MockLangSmithTelemetry { + private functionIdValue?: string; + private metadataValue?: Record; + private recordInputsValue = true; + private recordOutputsValue = true; + + functionId(value: string): this { + this.functionIdValue = value; + return this; + } + + metadata(value: Record): this { + this.metadataValue = value; + return this; + } + + recordInputs(value: boolean): this { + this.recordInputsValue = value; + return this; + } + + recordOutputs(value: boolean): this { + this.recordOutputsValue = value; + return this; + } + + async build(): Promise> { + return await Promise.resolve({ + enabled: true, + functionId: this.functionIdValue, + metadata: this.metadataValue, + recordInputs: this.recordInputsValue, + recordOutputs: this.recordOutputsValue, + integrations: [], + tracer, + provider, + }); + } + } + + return { + ...actual, + LangSmithTelemetry: MockLangSmithTelemetry, + __mock: { + reset: () => { + spanCounter = 0; + spans.length = 0; + provider.forceFlush.mockClear(); + provider.shutdown.mockClear(); + }, + getSpans: () => spans, + getProvider: () => provider, + }, + }; +}); + jest.mock('langsmith', () => { let runCounter = 0; const createdRunTrees: Array<{ @@ -245,6 +395,25 @@ type LangSmithMockModule = { }; }; +type AgentsMockModule = { + __mock: { + reset: () => void; + getSpans: () => Array<{ + id: string; + traceId: string; + parentSpanId?: string; + name: string; + attributes: Record; + status?: { code: number; message?: string }; + ended: boolean; + }>; + getProvider: () => { + forceFlush: jest.Mock, []>; + shutdown: jest.Mock, []>; + }; + }; +}; + function isExecutableTool( value: unknown, ): value is { handler: (input: unknown, context: unknown) => Promise } { @@ -274,6 +443,9 @@ const { createAskUserTool } = const { __mock: langsmithMock } = // eslint-disable-next-line @typescript-eslint/no-require-imports require('langsmith') as LangSmithMockModule; +const { __mock: agentsMock } = + // eslint-disable-next-line @typescript-eslint/no-require-imports + require('@n8n/agents') as AgentsMockModule; describe('createInstanceAiTraceContext', () => { const originalLangSmithApiKey = process.env.LANGSMITH_API_KEY; @@ -282,6 +454,7 @@ describe('createInstanceAiTraceContext', () => { beforeEach(() => { langsmithMock.reset(); + agentsMock.reset(); process.env.LANGSMITH_API_KEY = 'test-key'; delete process.env.LANGSMITH_TRACING; delete process.env.LANGCHAIN_TRACING_V2; @@ -459,7 +632,7 @@ describe('createInstanceAiTraceContext', () => { }); }); - it('rehydrates child runs with their parent linkage before patching', async () => { + it('finishes OTel child spans with their parent linkage', async () => { const tracing = await createInstanceAiTraceContext({ threadId: 'thread-1', messageId: 'message-1', @@ -475,9 +648,14 @@ describe('createInstanceAiTraceContext', () => { }), ).resolves.toBeUndefined(); - const patchTarget = langsmithMock.getCreatedRunTrees().at(-1); - expect(patchTarget?.id).toBe(tracing?.orchestratorRun.id); - expect(patchTarget?.parent_run_id).toBe(tracing?.messageRun.id); + const spans = agentsMock.getSpans(); + const orchestratorSpan = spans.find((span) => span.id === tracing?.orchestratorRun.otelSpanId); + expect(orchestratorSpan?.parentSpanId).toBe(tracing?.messageRun.otelSpanId); + expect(orchestratorSpan?.ended).toBe(true); + expect(orchestratorSpan?.attributes.gen_ai_completion).toBeUndefined(); + expect(orchestratorSpan?.attributes['gen_ai.completion']).toBe( + JSON.stringify({ result: 'done' }), + ); }); it('reuses the same message root when continuing a trace for the same message group', async () => { @@ -520,8 +698,11 @@ describe('createInstanceAiTraceContext', () => { kind: 'builder', taskId: 'build-1', spawnedByTraceId: 'trace-parent-1', + spawnedBySpanId: 'span-parent-1', spawnedByRunId: 'run-parent-1', spawnedByAgentId: 'agent-001', + spawnedByAgentRole: 'orchestrator', + spawnedByToolCallId: 'toolu-1', input: { task: 'Build a workflow' }, }); @@ -529,7 +710,7 @@ describe('createInstanceAiTraceContext', () => { expect(tracing?.traceKind).toBe('detached_subagent'); expect(tracing?.rootRun.id).toBe(tracing?.actorRun.id); expect(tracing?.rootRun.parentRunId).toBeUndefined(); - expect(tracing?.rootRun.name).toBe('subagent:workflow-builder'); + expect(tracing?.rootRun.name).toBe('instance-ai.subagent.workflow-builder'); expect(tracing?.rootRun.metadata).toEqual( expect.objectContaining({ thread_id: 'thread-1', @@ -538,8 +719,11 @@ describe('createInstanceAiTraceContext', () => { task_kind: 'builder', agent_id: 'agent-builder-1', spawned_by_trace_id: 'trace-parent-1', + spawned_by_span_id: 'span-parent-1', spawned_by_run_id: 'run-parent-1', spawned_by_agent_id: 'agent-001', + spawned_by_agent_role: 'orchestrator', + spawned_by_tool_call_id: 'toolu-1', }), ); }); @@ -712,25 +896,28 @@ describe('createInstanceAiTraceContext', () => { } await tracing!.withRunTree(tracing!.orchestratorRun, async () => { - await executeTool( - wrappedAskUser, + await wrappedAskUser.handler( { questions: [{ id: 'q1', question: 'What do you want?', type: 'text' }], }, { - agent: { - suspend: async () => { - await Promise.resolve(); - return undefined; - }, + toolCallId: 'toolu-ask', + resumeData: undefined, + suspend: async () => { + await Promise.resolve(); + return undefined as never; }, }, ); }); - const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name); - expect(createdRunNames).toContain('tool:ask-user'); - expect(createdRunNames).toContain('hitl:suspend'); + const spans = agentsMock.getSpans(); + const spanNames = spans.map((span) => span.name); + expect(spanNames).toContain('instance-ai.tool.ask-user'); + expect(spanNames).toContain('instance-ai.hitl.suspend'); + expect( + spans.find((span) => span.name === 'instance-ai.tool.ask-user')?.attributes.tool_call_id, + ).toBe('toolu-ask'); }); it('does not wrap ordinary local tools for product-level LangSmith spans', async () => { @@ -776,7 +963,7 @@ describe('createInstanceAiTraceContext', () => { expect(tracing).toBeDefined(); const subAgentRun = await tracing!.startChildRun(tracing!.orchestratorRun, { - name: 'subagent:workflow-builder', + name: 'instance-ai.subagent.workflow-builder.stream', tags: ['sub-agent'], metadata: { agent_role: 'workflow-builder' }, inputs: { task: 'Build a workflow' }, @@ -795,10 +982,10 @@ describe('createInstanceAiTraceContext', () => { ); }); - const llmRun = langsmithMock - .getCreatedRunTrees() - .find((run) => run.name === 'llm:anthropic/claude-sonnet-4-6'); - expect(llmRun?.parent_run_id).toBe(subAgentRun.id); + const llmSpan = agentsMock + .getSpans() + .find((span) => span.name === 'llm:anthropic/claude-sonnet-4-6'); + expect(llmSpan?.parentSpanId).toBe(subAgentRun.otelSpanId); }); it('traces resumed suspendable tools without extra HITL child span spam', async () => { @@ -861,10 +1048,10 @@ describe('createInstanceAiTraceContext', () => { ], }); - const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name); - expect(createdRunNames).toContain('tool:ask-user:resume'); - expect(createdRunNames).not.toContain('hitl:resume'); - expect(createdRunNames).not.toContain('hitl:approval'); + const spanNames = agentsMock.getSpans().map((span) => span.name); + expect(spanNames).toContain('instance-ai.tool.ask-user'); + expect(spanNames).toContain('instance-ai.hitl.resume'); + expect(spanNames).not.toContain('instance-ai.hitl.suspend'); }); it('creates ad-hoc child spans under the current run tree', async () => { @@ -890,8 +1077,8 @@ describe('createInstanceAiTraceContext', () => { expect(result).toBe(42); }); - const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name); - expect(createdRunNames).toContain('prepare_context'); + const spanNames = agentsMock.getSpans().map((span) => span.name); + expect(spanNames).toContain('prepare_context'); }); it('creates trace context when proxyConfig is provided even without env vars', async () => { @@ -920,7 +1107,7 @@ describe('createInstanceAiTraceContext', () => { expect(tracing?.orchestratorRun).toBeDefined(); }); - it('passes client to RunTree when proxyConfig is provided', async () => { + it('creates OTel product spans when proxyConfig is provided', async () => { const tracing = await createInstanceAiTraceContext({ threadId: 'thread-client', messageId: 'message-client', @@ -936,14 +1123,15 @@ describe('createInstanceAiTraceContext', () => { expect(tracing).toBeDefined(); - const rootRunTree = langsmithMock - .getCreatedRunTrees() - .find((run) => run.name === 'message_turn' && run.client); - expect(rootRunTree).toBeDefined(); - expect(rootRunTree?.client).toBeDefined(); + const rootSpan = agentsMock.getSpans().find((span) => span.name === 'instance-ai.message_turn'); + expect(rootSpan).toBeDefined(); + expect(langsmithMock.getCreatedRunTrees()).toHaveLength(0); }); - it('does not pass client to RunTree without proxyConfig', async () => { + it('does not create RunTree spans without proxyConfig', async () => { + // Regression: normal tracing must not mix RunTree product spans with OTel + // native spans, because LangSmith treats those ingestion paths as separate + // trace hierarchies. await createInstanceAiTraceContext({ threadId: 'thread-no-proxy', messageId: 'message-no-proxy', @@ -952,12 +1140,9 @@ describe('createInstanceAiTraceContext', () => { input: { message: 'no proxy test' }, }); - const rootRunTree = langsmithMock - .getCreatedRunTrees() - .find((run) => run.name === 'message_turn'); - expect(rootRunTree).toBeDefined(); - // Without proxyConfig, the direct client is used (never undefined) - expect(rootRunTree?.client).toBeDefined(); + const rootSpan = agentsMock.getSpans().find((span) => span.name === 'instance-ai.message_turn'); + expect(rootSpan).toBeDefined(); + expect(langsmithMock.getCreatedRunTrees()).toHaveLength(0); }); it('returns undefined when tracing is explicitly disabled even with proxy', async () => { @@ -987,6 +1172,7 @@ describe('submitLangsmithUserFeedback', () => { beforeEach(() => { langsmithMock.reset(); + agentsMock.reset(); process.env.LANGSMITH_API_KEY = 'test-key'; delete process.env.LANGSMITH_TRACING; delete process.env.LANGCHAIN_TRACING_V2; diff --git a/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts b/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts index 341897e9214..d9114086a12 100644 --- a/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts +++ b/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts @@ -7,7 +7,12 @@ import { type Telemetry, type ToolContext, } from '@n8n/agents'; -import { context as otelContext, trace as otelTrace } from '@opentelemetry/api'; +import { + ROOT_CONTEXT, + SpanStatusCode, + context as otelContext, + trace as otelTrace, +} from '@opentelemetry/api'; import type { Context as OtelContext, Span as OtelApiSpan } from '@opentelemetry/api'; import { Client, RunTree } from 'langsmith'; import { getCurrentRunTree, withRunTree as withLangSmithRunTree } from 'langsmith/traceable'; @@ -50,6 +55,10 @@ const LOCAL_TOOL_TRACE_NAMES = new Set([ 'complete-checkpoint', ]); const traceParentOverrideStorage = new AsyncLocalStorage<{ current: RunTree | null }>(); +const productTraceStorage = new AsyncLocalStorage<{ + runtime: ProductOtelTraceRuntime; + currentRun: InstanceAiTraceRun; +}>(); // Per-request proxy auth headers, isolated via AsyncLocalStorage. // The proxy Client is cached per deployment URL; each concurrent request @@ -190,13 +199,19 @@ function startProductSpan( metadata?: Record; inputs?: unknown; parentRun?: InstanceAiTraceRun; + parentContext?: OtelContext; + root?: boolean; }, ): InstanceAiTraceRun { if (!isOtelTracer(runtime.telemetry.tracer)) { throw new Error('Instance AI tracing requires an OpenTelemetry tracer'); } - const parentContext = options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined; + const parentContext = options.root + ? ROOT_CONTEXT + : (options.parentContext ?? + (options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined) ?? + otelContext.active()); const span = runtime.telemetry.tracer.startSpan( options.name, { @@ -268,10 +283,10 @@ async function finishProductSpan( if (options?.error) { span.recordException(new Error(options.error)); - span.setStatus({ code: 2, message: options.error }); + span.setStatus({ code: SpanStatusCode.ERROR, message: options.error }); run.error = options.error; } else { - span.setStatus({ code: 1 }); + span.setStatus({ code: SpanStatusCode.OK }); } run.endTime = Date.now(); @@ -293,7 +308,65 @@ async function withProductSpanContext( return await fn(); } - return await otelContext.with(spanContext, fn); + return await productTraceStorage.run( + { runtime, currentRun: run }, + async () => await otelContext.with(spanContext, fn), + ); +} + +function getCurrentProductTrace(): + | { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun } + | undefined { + return productTraceStorage.getStore(); +} + +function getActiveOtelContextWithSpan(): OtelContext | undefined { + const activeContext = otelContext.active(); + return otelTrace.getSpan(activeContext) ? activeContext : undefined; +} + +function spanMetadataAttributes( + metadata: Record | undefined, +): Record { + const attributes: Record = {}; + for (const [key, value] of Object.entries(metadata ?? {})) { + const attributeValue = toTelemetryAttributeValue(value); + if (attributeValue === undefined) continue; + attributes[key] = attributeValue; + if (!key.startsWith('langsmith.metadata.')) { + attributes[`langsmith.metadata.${key}`] = attributeValue; + } + } + return attributes; +} + +function updateProductRunMetadata( + runtime: ProductOtelTraceRuntime, + run: InstanceAiTraceRun, + metadata: Record, +): void { + const mergedMetadata = mergeMetadata(run.metadata, metadata); + if (!mergedMetadata) return; + + run.metadata = mergedMetadata; + const attributes = spanMetadataAttributes(metadata); + if (Object.keys(attributes).length > 0) { + runtime.spans.get(run.id)?.setAttributes(attributes); + } +} + +function updateProductRunInputs( + runtime: ProductOtelTraceRuntime, + run: InstanceAiTraceRun, + inputs: Record, +): void { + const mergedInputs = sanitizeTracePayload(mergeRunTreeInputs(run.inputs, inputs)); + run.inputs = mergedInputs; + + const prompt = stringifyTracePayload(mergedInputs); + if (prompt !== undefined) { + runtime.spans.get(run.id)?.setAttributes({ [GEN_AI_PROMPT]: prompt }); + } } /** Get a LangSmith Client that uses gzip encoding (no brotli). */ @@ -353,8 +426,11 @@ interface CreateDetachedSubAgentTraceContextOptions extends CreateInstanceAiTrac plannedTaskId?: string; workItemId?: string; spawnedByTraceId?: string; + spawnedBySpanId?: string; spawnedByRunId?: string; spawnedByAgentId?: string; + spawnedByAgentRole?: string; + spawnedByToolCallId?: string; } interface CurrentTraceSpanOptions { @@ -945,7 +1021,38 @@ export function setTraceParentOverride(parentRun: RunTree | null | undefined): v } } +export function getCurrentOtelSpanContext(): { traceId: string; spanId: string } | undefined { + const activeSpanContext = otelTrace.getSpan(otelContext.active())?.spanContext(); + if (activeSpanContext) { + return { + traceId: activeSpanContext.traceId, + spanId: activeSpanContext.spanId, + }; + } + + const currentRun = getCurrentProductTrace()?.currentRun; + if (currentRun?.otelTraceId && currentRun.otelSpanId) { + return { + traceId: currentRun.otelTraceId, + spanId: currentRun.otelSpanId, + }; + } + + return undefined; +} + +export function getCurrentTraceToolCallId(): string | undefined { + const metadata = getCurrentProductTrace()?.currentRun.metadata; + return typeof metadata?.tool_call_id === 'string' ? metadata.tool_call_id : undefined; +} + export function mergeCurrentTraceMetadata(metadata: Record): void { + const currentProductTrace = getCurrentProductTrace(); + if (currentProductTrace) { + updateProductRunMetadata(currentProductTrace.runtime, currentProductTrace.currentRun, metadata); + return; + } + const currentRun = getTraceParentRun(); if (!currentRun) { return; @@ -968,6 +1075,12 @@ export function mergeTraceRunInputs( const mergedInputs = sanitizeTracePayload(mergeRunTreeInputs(run.inputs, inputs)); run.inputs = mergedInputs; + const currentProductTrace = getCurrentProductTrace(); + if (currentProductTrace) { + updateProductRunInputs(currentProductTrace.runtime, run, inputs); + return; + } + const currentRun = getTraceParentRun(); if (currentRun?.id === run.id) { currentRun.inputs = mergedInputs; @@ -1030,6 +1143,36 @@ export async function withCurrentTraceSpan( options: CurrentTraceSpanOptions, fn: () => Promise, ): Promise { + const currentProductTrace = getCurrentProductTrace(); + if (currentProductTrace) { + const activeParentContext = getActiveOtelContextWithSpan(); + const spanRun = startProductSpan(currentProductTrace.runtime, { + projectName: currentProductTrace.currentRun.projectName, + name: options.name, + runType: options.runType ?? 'chain', + tags: options.tags, + metadata: options.metadata, + inputs: options.inputs, + parentRun: currentProductTrace.currentRun, + ...(activeParentContext ? { parentContext: activeParentContext } : {}), + }); + + try { + const result = await withProductSpanContext(currentProductTrace.runtime, spanRun, fn); + await finishProductSpan(currentProductTrace.runtime, spanRun, { + ...(options.processOutputs ? { outputs: options.processOutputs(result) } : {}), + metadata: { final_status: 'completed' }, + }); + return result; + } catch (error) { + await finishProductSpan(currentProductTrace.runtime, spanRun, { + error: normalizeErrorMessage(error), + metadata: { final_status: 'error' }, + }); + throw error; + } + } + const parentRun = getTraceParentRun(); if (!parentRun) { return await fn(); @@ -1104,12 +1247,179 @@ function isInterruptibleToolContext( return isRecord(context) && typeof context.suspend === 'function'; } +function getToolCallId(context: NativeToolContext): string | undefined { + return isRecord(context) && typeof context.toolCallId === 'string' + ? context.toolCallId + : undefined; +} + +function getProductToolSpanName(toolName: string): string { + if (toolName.startsWith('workspace_') || toolName === 'workspace' || toolName === 'write-file') { + return 'instance-ai.tool.workspace_edit'; + } + if (toolName === 'submit-workflow') { + return 'instance-ai.tool.workflow_submit'; + } + if (toolName === 'verify-built-workflow' || toolName === 'report-verification-verdict') { + return 'instance-ai.tool.workflow_validation'; + } + if (toolName === 'build-workflow' || toolName === 'build-workflow-with-agent') { + return 'instance-ai.tool.workflow_build'; + } + if (toolName === 'complete-checkpoint' || toolName === 'task-control') { + return 'instance-ai.tool.background_task'; + } + return `instance-ai.tool.${toolName.replace(/[^a-zA-Z0-9._-]+/g, '-')}`; +} + +async function startAndFinishProductChildSpan( + currentTrace: { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun }, + options: { + name: string; + runType?: string; + tags?: string[]; + metadata?: Record; + inputs?: unknown; + outputs?: unknown; + error?: string; + }, +): Promise { + const activeParentContext = getActiveOtelContextWithSpan(); + const childRun = startProductSpan(currentTrace.runtime, { + projectName: currentTrace.currentRun.projectName, + name: options.name, + runType: options.runType ?? 'chain', + tags: options.tags, + metadata: options.metadata, + inputs: options.inputs, + parentRun: currentTrace.currentRun, + ...(activeParentContext ? { parentContext: activeParentContext } : {}), + }); + await finishProductSpan(currentTrace.runtime, childRun, { + ...(options.outputs !== undefined ? { outputs: options.outputs } : {}), + ...(options.error ? { error: options.error } : {}), + metadata: { + final_status: options.error ? 'error' : 'completed', + }, + }); +} + +async function traceProductToolExecute( + tool: TraceableNativeTool, + options: InstanceAiToolTraceOptions | undefined, + input: unknown, + context: NativeToolContext, + currentTrace: { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun }, +): Promise { + const resumeData = isInterruptibleToolContext(context) ? context.resumeData : undefined; + const isResume = resumeData !== undefined && resumeData !== null; + const activeParentContext = getActiveOtelContextWithSpan(); + const toolCallId = getToolCallId(context); + const toolRun = startProductSpan(currentTrace.runtime, { + projectName: currentTrace.currentRun.projectName, + name: getProductToolSpanName(tool.name), + runType: 'tool', + tags: normalizeTags(['tool'], options?.tags), + metadata: mergeMetadata(options?.metadata, { + tool_name: tool.name, + ...(toolCallId ? { tool_call_id: toolCallId } : {}), + ...(options?.agentRole ? { agent_role: options.agentRole } : {}), + phase: isResume ? 'resume' : 'initial', + ...(isResume + ? mergeMetadata(buildSuspendMetadata(tool.name, resumeData), { + approved: isRecord(resumeData) ? resumeData.approved : undefined, + }) + : {}), + }), + inputs: { input }, + parentRun: currentTrace.currentRun, + ...(activeParentContext ? { parentContext: activeParentContext } : {}), + }); + + let toolRunFinished = false; + const finishToolRun = async (finishOptions?: InstanceAiTraceRunFinishOptions) => { + if (toolRunFinished) return; + toolRunFinished = true; + await finishProductSpan(currentTrace.runtime, toolRun, finishOptions); + }; + + const originalSuspend = isInterruptibleToolContext(context) ? context.suspend : undefined; + const wrappedContext: NativeToolContext = + typeof originalSuspend === 'function' + ? { + ...context, + suspend: async (suspendPayload: unknown) => { + await startAndFinishProductChildSpan( + { runtime: currentTrace.runtime, currentRun: toolRun }, + { + name: 'instance-ai.hitl.suspend', + runType: 'chain', + tags: ['hitl'], + metadata: buildSuspendMetadata(tool.name, suspendPayload), + inputs: suspendPayload, + outputs: suspendPayload, + }, + ); + await finishToolRun({ + outputs: { + status: 'suspended', + suspendPayload, + }, + metadata: mergeMetadata(buildSuspendMetadata(tool.name, suspendPayload), { + final_status: 'suspended', + }), + }); + return await originalSuspend(suspendPayload); + }, + } + : context; + + try { + const result = await withProductSpanContext(currentTrace.runtime, toolRun, async () => { + if (isResume) { + await startAndFinishProductChildSpan( + { runtime: currentTrace.runtime, currentRun: toolRun }, + { + name: 'instance-ai.hitl.resume', + runType: 'chain', + tags: ['hitl', 'resume'], + metadata: mergeMetadata(buildSuspendMetadata(tool.name, resumeData), { + approved: isRecord(resumeData) ? resumeData.approved : undefined, + }), + inputs: resumeData, + outputs: { + status: 'resumed', + }, + }, + ); + } + return await tool.handler(input, wrappedContext); + }); + await finishToolRun({ + outputs: result, + metadata: { final_status: 'completed' }, + }); + return result; + } catch (error) { + await finishToolRun({ + error: normalizeErrorMessage(error), + metadata: { final_status: 'error' }, + }); + throw error; + } +} + async function traceSuspendableToolExecute( tool: TraceableNativeTool, options: InstanceAiToolTraceOptions | undefined, input: unknown, context: NativeToolContext, ): Promise { + const currentProductTrace = getCurrentProductTrace(); + if (currentProductTrace) { + return await traceProductToolExecute(tool, options, input, context, currentProductTrace); + } + const parentRun = getTraceParentRun(); if (!parentRun) { return await tool.handler(input, context); @@ -1193,6 +1503,11 @@ async function traceToolExecute( input: unknown, context: NativeToolContext, ): Promise { + const currentProductTrace = getCurrentProductTrace(); + if (currentProductTrace) { + return await traceProductToolExecute(tool, options, input, context, currentProductTrace); + } + const parentRun = getTraceParentRun(); if (!parentRun) { return await tool.handler(input, context); @@ -1252,8 +1567,9 @@ function createTraceContext( parentRun: InstanceAiTraceRun, init: InstanceAiTraceRunInit, ): Promise => - await withProxy(async () => - otelRuntime + await withProxy(async () => { + const activeParentContext = getActiveOtelContextWithSpan(); + return otelRuntime ? startProductSpan(otelRuntime, { projectName, name: init.name, @@ -1262,9 +1578,10 @@ function createTraceContext( metadata: mergeMetadata(parentRun.metadata, init.metadata), inputs: init.inputs, parentRun, + ...(activeParentContext ? { parentContext: activeParentContext } : {}), }) - : await createChildRun(parentRun, init), - ); + : await createChildRun(parentRun, init); + }); const withRunTree = async (run: InstanceAiTraceRun, fn: () => Promise): Promise => await withProxy(async () => @@ -1792,15 +2109,24 @@ export async function createInstanceAiTraceContext( name: 'instance-ai.message_turn', runType: 'chain', tags: ['message-turn'], - metadata: mergeMetadata(baseMetadata, { agent_role: 'message_turn' }), + metadata: mergeMetadata(baseMetadata, { + agent_role: 'message_turn', + execution_mode: 'foreground', + trace_kind: 'message_turn', + }), inputs: options.input, + root: true, }); const orchestratorRun = startProductSpan(otelRuntime, { projectName, - name: 'instance-ai.orchestrator', + name: 'instance-ai.orchestrator.stream', runType: 'chain', tags: ['orchestrator'], - metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }), + metadata: mergeMetadata(baseMetadata, { + agent_role: 'orchestrator', + execution_mode: 'foreground', + trace_kind: 'message_turn', + }), inputs: options.input, parentRun: messageRun, }); @@ -1848,6 +2174,7 @@ export async function continueInstanceAiTraceContext( metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator', execution_mode: 'resume', + trace_kind: 'message_turn', }), inputs: options.input, parentRun: existingContext.messageRun, @@ -1910,16 +2237,26 @@ export async function createDetachedSubAgentTraceContext( metadata: mergeMetadata(baseMetadata, { agent_role: options.role, agent_id: options.agentId, + execution_mode: 'detached_subagent', + trace_kind: 'detached_subagent', task_kind: options.kind, ...(options.taskId ? { task_id: options.taskId } : {}), ...(options.plannedTaskId ? { planned_task_id: options.plannedTaskId } : {}), ...(options.workItemId ? { work_item_id: options.workItemId } : {}), ...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}), + ...(options.spawnedBySpanId ? { spawned_by_span_id: options.spawnedBySpanId } : {}), ...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}), ...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}), + ...(options.spawnedByAgentRole + ? { spawned_by_agent_role: options.spawnedByAgentRole } + : {}), + ...(options.spawnedByToolCallId + ? { spawned_by_tool_call_id: options.spawnedByToolCallId } + : {}), subagent_role: options.role, }), inputs: options.input, + root: true, }); return createTraceContext( @@ -1937,13 +2274,22 @@ export async function createDetachedSubAgentTraceContext( mergeMetadata(baseMetadata, { agent_role: options.role, agent_id: options.agentId, + execution_mode: 'detached_subagent', + trace_kind: 'detached_subagent', task_kind: options.kind, ...(options.taskId ? { task_id: options.taskId } : {}), ...(options.plannedTaskId ? { planned_task_id: options.plannedTaskId } : {}), ...(options.workItemId ? { work_item_id: options.workItemId } : {}), ...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}), + ...(options.spawnedBySpanId ? { spawned_by_span_id: options.spawnedBySpanId } : {}), ...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}), ...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}), + ...(options.spawnedByAgentRole + ? { spawned_by_agent_role: options.spawnedByAgentRole } + : {}), + ...(options.spawnedByToolCallId + ? { spawned_by_tool_call_id: options.spawnedByToolCallId } + : {}), subagent_role: options.role, }) ?? baseMetadata, baseTelemetry: otelRuntime.telemetry, diff --git a/packages/cli/src/modules/instance-ai/entities/instance-ai-run-snapshot.entity.ts b/packages/cli/src/modules/instance-ai/entities/instance-ai-run-snapshot.entity.ts index ecc68799388..758b11732c4 100644 --- a/packages/cli/src/modules/instance-ai/entities/instance-ai-run-snapshot.entity.ts +++ b/packages/cli/src/modules/instance-ai/entities/instance-ai-run-snapshot.entity.ts @@ -25,4 +25,10 @@ export class InstanceAiRunSnapshot extends WithTimestamps { @Column({ type: 'varchar', length: 36, nullable: true }) langsmithTraceId: string | null; + + @Column({ type: 'varchar', length: 32, nullable: true }) + traceId: string | null; + + @Column({ type: 'varchar', length: 16, nullable: true }) + spanId: string | null; } diff --git a/packages/cli/src/modules/instance-ai/instance-ai.service.ts b/packages/cli/src/modules/instance-ai/instance-ai.service.ts index 68dbbbc1313..1025425a1a6 100644 --- a/packages/cli/src/modules/instance-ai/instance-ai.service.ts +++ b/packages/cli/src/modules/instance-ai/instance-ai.service.ts @@ -2162,8 +2162,8 @@ export class InstanceAiService { payload: { message: 'Recalling conversation...' }, }); const contextCompactionRun = tracing - ? await tracing.startChildRun(tracing.actorRun, { - name: 'context_compaction', + ? await tracing.startChildRun(tracing.messageRun, { + name: 'instance-ai.context_compaction', tags: ['context'], metadata: { agent_role: 'context_compaction' }, inputs: { @@ -2212,8 +2212,8 @@ export class InstanceAiService { }); const promptBuildRun = tracing - ? await tracing.startChildRun(tracing.actorRun, { - name: 'prompt_build', + ? await tracing.startChildRun(tracing.messageRun, { + name: 'instance-ai.prompt_build', tags: ['prompt'], metadata: { agent_role: 'prompt_build' }, inputs: { @@ -3508,6 +3508,8 @@ export class InstanceAiService { const saveOptions = { messageGroupId, runIds: groupRunIds, + traceId: tracing?.rootRun.otelTraceId, + spanId: tracing?.rootRun.otelSpanId, langsmithRunId: tracing?.rootRun.id, langsmithTraceId: tracing?.rootRun.traceId, }; diff --git a/packages/cli/src/modules/instance-ai/storage/__tests__/db-snapshot-storage.test.ts b/packages/cli/src/modules/instance-ai/storage/__tests__/db-snapshot-storage.test.ts index 0542494b5fc..85832c6d552 100644 --- a/packages/cli/src/modules/instance-ai/storage/__tests__/db-snapshot-storage.test.ts +++ b/packages/cli/src/modules/instance-ai/storage/__tests__/db-snapshot-storage.test.ts @@ -11,6 +11,8 @@ function makeRow(overrides: Partial = {}): InstanceAiRunS messageGroupId: null, runIds: null, tree: JSON.stringify({ agentId: 'agent-root' }), + traceId: null, + spanId: null, langsmithRunId: null, langsmithTraceId: null, createdAt: new Date(), @@ -81,10 +83,12 @@ describe('DbSnapshotStorage', () => { }); describe('save', () => { - it('persists langsmith IDs via upsert', async () => { + it('persists trace IDs via upsert', async () => { await storage.save('thread-1', { agentId: 'agent-root' } as never, 'run-1', { messageGroupId: 'mg-1', runIds: ['run-1'], + traceId: '0123456789abcdef0123456789abcdef', + spanId: '0123456789abcdef', langsmithRunId: 'ls-run-1', langsmithTraceId: 'ls-trace-1', }); @@ -95,6 +99,8 @@ describe('DbSnapshotStorage', () => { runId: 'run-1', messageGroupId: 'mg-1', runIds: ['run-1'], + traceId: '0123456789abcdef0123456789abcdef', + spanId: '0123456789abcdef', langsmithRunId: 'ls-run-1', langsmithTraceId: 'ls-trace-1', }), @@ -102,20 +108,27 @@ describe('DbSnapshotStorage', () => { ); }); - it('writes nulls when langsmith IDs are absent', async () => { + it('writes nulls when trace IDs are absent', async () => { await storage.save('thread-1', { agentId: 'agent-root' } as never, 'run-1'); expect(repo.upsert).toHaveBeenCalledWith( - expect.objectContaining({ langsmithRunId: null, langsmithTraceId: null }), + expect.objectContaining({ + traceId: null, + spanId: null, + langsmithRunId: null, + langsmithTraceId: null, + }), expect.anything(), ); }); }); describe('updateLast', () => { - it('preserves existing langsmith IDs when the caller does not supply new ones', async () => { + it('preserves existing trace IDs when the caller does not supply new ones', async () => { const existing = makeRow({ messageGroupId: 'mg-1', + traceId: 'existing-trace', + spanId: 'existing-span', langsmithRunId: 'ls-run-existing', langsmithTraceId: 'ls-trace-existing', }); @@ -128,6 +141,8 @@ describe('DbSnapshotStorage', () => { expect(repo.update).toHaveBeenCalledWith( { threadId: 'thread-1', runId: 'run-1' }, expect.objectContaining({ + traceId: 'existing-trace', + spanId: 'existing-span', langsmithRunId: 'ls-run-existing', langsmithTraceId: 'ls-trace-existing', }), diff --git a/packages/cli/src/modules/instance-ai/storage/db-snapshot-storage.ts b/packages/cli/src/modules/instance-ai/storage/db-snapshot-storage.ts index 659f5f72470..936d28a9a6c 100644 --- a/packages/cli/src/modules/instance-ai/storage/db-snapshot-storage.ts +++ b/packages/cli/src/modules/instance-ai/storage/db-snapshot-storage.ts @@ -8,6 +8,8 @@ import { InstanceAiRunSnapshotRepository } from '../repositories/instance-ai-run export interface SaveSnapshotOptions { messageGroupId?: string; runIds?: string[]; + traceId?: string; + spanId?: string; langsmithRunId?: string; langsmithTraceId?: string; } @@ -44,6 +46,8 @@ export class DbSnapshotStorage { runId: row.runId, messageGroupId: row.messageGroupId ?? undefined, runIds: row.runIds ?? undefined, + traceId: row.traceId ?? undefined, + spanId: row.spanId ?? undefined, langsmithRunId: row.langsmithRunId ?? undefined, langsmithTraceId: row.langsmithTraceId ?? undefined, }; @@ -55,7 +59,7 @@ export class DbSnapshotStorage { runId: string, options: SaveSnapshotOptions = {}, ): Promise { - const { messageGroupId, runIds, langsmithRunId, langsmithTraceId } = options; + const { messageGroupId, runIds, traceId, spanId, langsmithRunId, langsmithTraceId } = options; await this.repo.upsert( { threadId, @@ -63,6 +67,8 @@ export class DbSnapshotStorage { messageGroupId: messageGroupId ?? null, runIds: runIds ?? null, tree: JSON.stringify(agentTree), + traceId: traceId ?? null, + spanId: spanId ?? null, langsmithRunId: langsmithRunId ?? null, langsmithTraceId: langsmithTraceId ?? null, }, @@ -76,7 +82,7 @@ export class DbSnapshotStorage { runId: string, options: SaveSnapshotOptions = {}, ): Promise { - const { messageGroupId, runIds, langsmithRunId, langsmithTraceId } = options; + const { messageGroupId, runIds, traceId, spanId, langsmithRunId, langsmithTraceId } = options; // Prefer lookup by messageGroupId when available if (messageGroupId) { @@ -92,7 +98,9 @@ export class DbSnapshotStorage { tree: JSON.stringify(agentTree), messageGroupId, runIds: runIds ?? existing.runIds, - // Preserve existing LangSmith IDs if caller didn't provide new ones. + // Preserve existing trace IDs if caller didn't provide new ones. + traceId: traceId ?? existing.traceId, + spanId: spanId ?? existing.spanId, langsmithRunId: langsmithRunId ?? existing.langsmithRunId, langsmithTraceId: langsmithTraceId ?? existing.langsmithTraceId, }, @@ -110,6 +118,8 @@ export class DbSnapshotStorage { tree: JSON.stringify(agentTree), messageGroupId: messageGroupId ?? byRunId.messageGroupId, runIds: runIds ?? byRunId.runIds, + traceId: traceId ?? byRunId.traceId, + spanId: spanId ?? byRunId.spanId, langsmithRunId: langsmithRunId ?? byRunId.langsmithRunId, langsmithTraceId: langsmithTraceId ?? byRunId.langsmithTraceId, }, @@ -131,6 +141,8 @@ export class DbSnapshotStorage { runId: r.runId, messageGroupId: r.messageGroupId ?? undefined, runIds: r.runIds ?? undefined, + traceId: r.traceId ?? undefined, + spanId: r.spanId ?? undefined, langsmithRunId: r.langsmithRunId ?? undefined, langsmithTraceId: r.langsmithTraceId ?? undefined, }));