diff --git a/packages/@n8n/instance-ai/package.json b/packages/@n8n/instance-ai/package.json index 1f524c624c2..6f08960e7c5 100644 --- a/packages/@n8n/instance-ai/package.json +++ b/packages/@n8n/instance-ai/package.json @@ -34,6 +34,7 @@ "@daytonaio/sdk": "0.149.0", "@joplin/turndown-plugin-gfm": "^1.0.12", "langsmith": "catalog:", + "@opentelemetry/api": "^1.9.0", "@mozilla/readability": "^0.6.0", "@n8n/api-types": "workspace:*", "@n8n/utils": "workspace:*", diff --git a/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts b/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts index 497695998d3..1b4f00501c3 100644 --- a/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts +++ b/packages/@n8n/instance-ai/src/tracing/__tests__/langsmith-tracing.test.ts @@ -327,12 +327,14 @@ describe('createInstanceAiTraceContext', () => { expect(tracing?.getTelemetry).toBeDefined(); - const telemetry = await tracing!.getTelemetry!({ + const telemetryOrBuilder = tracing!.getTelemetry!({ agentRole: 'orchestrator', functionId: 'instance-ai.orchestrator', executionMode: 'foreground', metadata: { custom_flag: true }, - }).build(); + }); + const telemetry = + 'build' in telemetryOrBuilder ? await telemetryOrBuilder.build() : telemetryOrBuilder; expect(telemetry.functionId).toBe('instance-ai.orchestrator'); expect(telemetry.recordInputs).toBe(true); diff --git a/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts b/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts index 800c49a8ffa..341897e9214 100644 --- a/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts +++ b/packages/@n8n/instance-ai/src/tracing/langsmith-tracing.ts @@ -1,11 +1,14 @@ import { LangSmithTelemetry, type AttributeValue, + type BuiltTelemetry, type BuiltTool, type InterruptibleToolContext, type Telemetry, type ToolContext, } from '@n8n/agents'; +import { context as otelContext, trace as otelTrace } from '@opentelemetry/api'; +import type { Context as OtelContext, Span as OtelApiSpan } from '@opentelemetry/api'; import { Client, RunTree } from 'langsmith'; import { getCurrentRunTree, withRunTree as withLangSmithRunTree } from 'langsmith/traceable'; import { AsyncLocalStorage } from 'node:async_hooks'; @@ -59,6 +62,7 @@ const proxyHeaderStore = new AsyncLocalStorage>(); // hydrateRunTree() (which reconstructs RunTree from serialized state) // can use the correct proxy client for its HTTP calls. const traceClients = new Map(); +const otelTraceRuntimes = new Map(); /** * Fetch wrapper for LangSmith clients: @@ -82,6 +86,216 @@ const gzipFetch: typeof globalThis.fetch = async (input, init) => { let cachedProxyClient: { client: Client; apiUrl: string } | null = null; let cachedDirectClient: Client | null = null; +const OTEL_TRACE_VERSION = 'otel-v2'; +const LANGSMITH_TRACEABLE = 'langsmith.traceable'; +const LANGSMITH_TRACE_NAME = 'langsmith.trace.name'; +const LANGSMITH_SPAN_KIND = 'langsmith.span.kind'; +const LANGSMITH_SPAN_TAGS = 'langsmith.span.tags'; +const GEN_AI_PROMPT = 'gen_ai.prompt'; +const GEN_AI_COMPLETION = 'gen_ai.completion'; + +interface ProductOtelTraceRuntime { + telemetry: BuiltTelemetry; + spans: Map; + contexts: Map; +} + +interface OTelTracer { + startSpan( + name: string, + options?: { attributes?: Record }, + context?: OtelContext, + ): OtelApiSpan; +} + +function isOtelTracer(value: unknown): value is OTelTracer { + return ( + value !== null && + typeof value === 'object' && + typeof Reflect.get(value, 'startSpan') === 'function' + ); +} + +function langsmithTraceIdFromOtelTraceId(traceId: string): string { + return `${traceId.substring(0, 8)}-${traceId.substring(8, 12)}-${traceId.substring( + 12, + 16, + )}-${traceId.substring(16, 20)}-${traceId.substring(20, 32)}`; +} + +function langsmithRunIdFromOtelSpanId(spanId: string): string { + const paddedHex = spanId.padStart(16, '0'); + return `00000000-0000-0000-${paddedHex.substring(0, 4)}-${paddedHex.substring(4, 16)}`; +} + +function stableDottedOrder(parentRun: InstanceAiTraceRun | undefined, runId: string): string { + return parentRun?.dottedOrder ? `${parentRun.dottedOrder}.${runId}` : runId; +} + +function buildProductSpanAttributes(options: { + name: string; + runType?: string; + tags?: string[]; + metadata?: Record; + inputs?: unknown; +}): Record { + const attributes: Record = { + [LANGSMITH_TRACEABLE]: 'true', + [LANGSMITH_TRACE_NAME]: options.name, + [LANGSMITH_SPAN_KIND]: options.runType ?? 'chain', + 'instance_ai.trace_version': OTEL_TRACE_VERSION, + }; + + const tags = normalizeTags(DEFAULT_TAGS, options.tags); + if (tags?.length) { + attributes[LANGSMITH_SPAN_TAGS] = tags; + } + + const metadata = mergeMetadata(options.metadata, { + trace_version: OTEL_TRACE_VERSION, + 'instance_ai.trace_version': OTEL_TRACE_VERSION, + }); + for (const [key, value] of Object.entries(metadata ?? {})) { + const attributeValue = toTelemetryAttributeValue(value); + if (attributeValue === undefined) continue; + attributes[key] = attributeValue; + if (!key.startsWith('langsmith.metadata.')) { + attributes[`langsmith.metadata.${key}`] = attributeValue; + } + } + + const inputs = options.inputs === undefined ? undefined : stringifyTracePayload(options.inputs); + if (inputs !== undefined) { + attributes[GEN_AI_PROMPT] = inputs; + } + + return attributes; +} + +function stringifyTracePayload(value: unknown): string | undefined { + try { + return JSON.stringify(sanitizeTracePayload(value)); + } catch { + return undefined; + } +} + +function startProductSpan( + runtime: ProductOtelTraceRuntime, + options: { + projectName: string; + name: string; + runType?: string; + tags?: string[]; + metadata?: Record; + inputs?: unknown; + parentRun?: InstanceAiTraceRun; + }, +): InstanceAiTraceRun { + if (!isOtelTracer(runtime.telemetry.tracer)) { + throw new Error('Instance AI tracing requires an OpenTelemetry tracer'); + } + + const parentContext = options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined; + const span = runtime.telemetry.tracer.startSpan( + options.name, + { + attributes: buildProductSpanAttributes(options), + }, + parentContext, + ); + const spanContext = span.spanContext(); + const traceId = langsmithTraceIdFromOtelTraceId(spanContext.traceId); + const runId = langsmithRunIdFromOtelSpanId(spanContext.spanId); + const spanContextWithSpan = otelTrace.setSpan(parentContext ?? otelContext.active(), span); + + const parentRun = options.parentRun; + const run: InstanceAiTraceRun = { + id: runId, + name: options.name, + runType: options.runType ?? 'chain', + projectName: options.projectName, + startTime: Date.now(), + traceId, + otelTraceId: spanContext.traceId, + otelSpanId: spanContext.spanId, + dottedOrder: stableDottedOrder(parentRun, runId), + executionOrder: parentRun ? parentRun.childExecutionOrder + 1 : 0, + childExecutionOrder: 0, + ...(parentRun ? { parentRunId: parentRun.id } : {}), + ...(options.tags ? { tags: normalizeTags(DEFAULT_TAGS, parentRun?.tags, options.tags) } : {}), + ...(options.metadata ? { metadata: mergeMetadata(parentRun?.metadata, options.metadata) } : {}), + ...(options.inputs !== undefined ? { inputs: sanitizeTracePayload(options.inputs) } : {}), + }; + + if (parentRun) { + parentRun.childExecutionOrder += 1; + } + + runtime.spans.set(run.id, span); + runtime.contexts.set(run.id, spanContextWithSpan); + return run; +} + +async function finishProductSpan( + runtime: ProductOtelTraceRuntime, + run: InstanceAiTraceRun, + options?: InstanceAiTraceRunFinishOptions, +): Promise { + const span = runtime.spans.get(run.id); + if (!span) return; + + const metadata = mergeMetadata(options?.metadata); + const attributes: Record = {}; + for (const [key, value] of Object.entries(metadata ?? {})) { + const attributeValue = toTelemetryAttributeValue(value); + if (attributeValue === undefined) continue; + attributes[key] = attributeValue; + attributes[`langsmith.metadata.${key}`] = attributeValue; + } + + if (options?.outputs !== undefined) { + const completion = stringifyTracePayload(options.outputs); + if (completion !== undefined) { + attributes[GEN_AI_COMPLETION] = completion; + } + run.outputs = sanitizeTracePayload(options.outputs); + } + + if (Object.keys(attributes).length > 0) { + span.setAttributes(attributes); + } + + if (options?.error) { + span.recordException(new Error(options.error)); + span.setStatus({ code: 2, message: options.error }); + run.error = options.error; + } else { + span.setStatus({ code: 1 }); + } + + run.endTime = Date.now(); + run.metadata = mergeMetadata(run.metadata, metadata); + span.end(); + runtime.spans.delete(run.id); + runtime.contexts.delete(run.id); + + await runtime.telemetry.provider?.forceFlush(); +} + +async function withProductSpanContext( + runtime: ProductOtelTraceRuntime, + run: InstanceAiTraceRun, + fn: () => Promise, +): Promise { + const spanContext = runtime.contexts.get(run.id); + if (!spanContext) { + return await fn(); + } + + return await otelContext.with(spanContext, fn); +} + /** Get a LangSmith Client that uses gzip encoding (no brotli). */ function getOrCreateDirectClient(): Client { if (cachedDirectClient) return cachedDirectClient; @@ -654,6 +868,7 @@ function mergeRunTreeInputs( */ export function releaseTraceClient(traceId: string): void { traceClients.delete(traceId); + otelTraceRuntimes.delete(traceId); } export interface SubmitLangsmithUserFeedbackOptions { @@ -1020,8 +1235,13 @@ function createTraceContext( rootRun: InstanceAiTraceRun, actorRun: InstanceAiTraceRun, getProxyHeaders?: () => Promise>, - telemetryFactory?: (options: InstanceAiTelemetryOptions) => Telemetry, + telemetryFactory?: (options: InstanceAiTelemetryOptions) => Telemetry | BuiltTelemetry, + otelRuntime?: ProductOtelTraceRuntime, ): InstanceAiTraceContext { + if (otelRuntime) { + otelTraceRuntimes.set(rootRun.traceId, otelRuntime); + } + const withProxy = async (fn: () => Promise): Promise => { if (!getProxyHeaders) return await fn(); const headers = await getProxyHeaders(); @@ -1032,19 +1252,40 @@ function createTraceContext( parentRun: InstanceAiTraceRun, init: InstanceAiTraceRunInit, ): Promise => - await withProxy(async () => await createChildRun(parentRun, init)); + await withProxy(async () => + otelRuntime + ? startProductSpan(otelRuntime, { + projectName, + name: init.name, + runType: init.runType, + tags: init.tags, + metadata: mergeMetadata(parentRun.metadata, init.metadata), + inputs: init.inputs, + parentRun, + }) + : await createChildRun(parentRun, init), + ); const withRunTree = async (run: InstanceAiTraceRun, fn: () => Promise): Promise => - await withProxy(async () => await withSerializedRunTree(run, fn)); + await withProxy(async () => + otelRuntime + ? await withProductSpanContext(otelRuntime, run, fn) + : await withSerializedRunTree(run, fn), + ); const finishRun = async ( run: InstanceAiTraceRun, finishOptions?: InstanceAiTraceRunFinishOptions, ): Promise => { - await withProxy(async () => await finishTraceRun(run, finishOptions)); + await withProxy(async () => + otelRuntime + ? await finishProductSpan(otelRuntime, run, finishOptions) + : await finishTraceRun(run, finishOptions), + ); // Clean up traceClients when root run finishes if (!run.parentRunId) { traceClients.delete(run.traceId); + otelTraceRuntimes.delete(run.traceId); } }; @@ -1053,15 +1294,20 @@ function createTraceContext( error: unknown, metadata?: Record, ): Promise => { - await withProxy( - async () => - await finishTraceRun(run, { - error: normalizeErrorMessage(error), - metadata, - }), + await withProxy(async () => + otelRuntime + ? await finishProductSpan(otelRuntime, run, { + error: normalizeErrorMessage(error), + metadata, + }) + : await finishTraceRun(run, { + error: normalizeErrorMessage(error), + metadata, + }), ); if (!run.parentRunId) { traceClients.delete(run.traceId); + otelTraceRuntimes.delete(run.traceId); } }; @@ -1076,7 +1322,7 @@ function createTraceContext( withRunTree, finishRun, failRun, - toHeaders: (run) => hydrateRunTree(run).toHeaders(), + toHeaders: (run) => (otelRuntime ? {} : hydrateRunTree(run).toHeaders()), ...(telemetryFactory ? { getTelemetry: telemetryFactory } : {}), wrapTools: (tools, traceOptions) => { if (ctx.replayMode === 'replay' && ctx.traceIndex && ctx.idRemapper) { @@ -1386,33 +1632,6 @@ export function createTraceReplayOnlyContext(): InstanceAiTraceContext { return ctx; } -async function createRun(options: { - projectName: string; - name: string; - runType?: string; - tags?: string[]; - metadata?: Record; - inputs?: unknown; - client?: Client; -}): Promise { - const runTree = new RunTree({ - name: options.name, - run_type: options.runType ?? 'chain', - project_name: options.projectName, - tags: normalizeTags(DEFAULT_TAGS, options.tags), - metadata: mergeMetadata(options.metadata), - inputs: sanitizeTracePayload(options.inputs), - client: options.client ?? getOrCreateDirectClient(), - }); - await runTree.postRun(); - - if (options.client) { - traceClients.set(runTree.trace_id, options.client); - } - - return createRunStateFromTree(runTree); -} - async function createChildRun( parentState: InstanceAiTraceRun, options: InstanceAiTraceRunInit, @@ -1469,6 +1688,7 @@ function buildBaseMetadata(options: CreateInstanceAiTraceContextOptions): Record message_id: options.messageId, run_id: options.runId, user_id: options.userId, + 'instance_ai.trace_version': OTEL_TRACE_VERSION, ...(options.modelId !== undefined ? { model_id: serializeModelIdForTrace(options.modelId) } : {}), @@ -1483,7 +1703,8 @@ function createTelemetryFactory(options: { actorRun: InstanceAiTraceRun; baseMetadata: Record; proxyConfig?: ServiceProxyConfig; -}): (telemetryOptions: InstanceAiTelemetryOptions) => Telemetry { + baseTelemetry?: BuiltTelemetry; +}): (telemetryOptions: InstanceAiTelemetryOptions) => BuiltTelemetry | Telemetry { return (telemetryOptions) => { const agentRole = telemetryOptions.agentRole; const executionMode = @@ -1499,17 +1720,17 @@ function createTelemetryFactory(options: { }); const functionId = telemetryOptions.functionId ?? formatTelemetryFunctionId(agentRole); - return new LangSmithTelemetry({ - project: options.projectName, - transformExportedSpan: redactLangSmithTelemetrySpan, - ...(options.proxyConfig - ? { - apiKey: '-', - endpoint: options.proxyConfig.apiUrl, - headers: options.proxyConfig.getAuthHeaders, - } - : {}), - }) + if (options.baseTelemetry) { + return { + ...options.baseTelemetry, + functionId, + metadata, + recordInputs: true, + recordOutputs: true, + }; + } + + return createLangSmithTelemetryBuilder(options.projectName, options.proxyConfig) .functionId(functionId) .metadata(metadata) .recordInputs(true) @@ -1517,6 +1738,41 @@ function createTelemetryFactory(options: { }; } +function createLangSmithTelemetryBuilder( + projectName: string, + proxyConfig?: ServiceProxyConfig, +): LangSmithTelemetry { + return new LangSmithTelemetry({ + project: projectName, + transformExportedSpan: redactLangSmithTelemetrySpan, + ...(proxyConfig + ? { + apiKey: '-', + endpoint: proxyConfig.apiUrl, + headers: proxyConfig.getAuthHeaders, + } + : {}), + }); +} + +async function createProductOtelRuntime( + projectName: string, + proxyConfig?: ServiceProxyConfig, +): Promise { + const telemetry = await createLangSmithTelemetryBuilder(projectName, proxyConfig) + .functionId('instance-ai.product') + .metadata({}) + .recordInputs(true) + .recordOutputs(true) + .build(); + + return { + telemetry, + spans: new Map(), + contexts: new Map(), + }; +} + export async function createInstanceAiTraceContext( options: CreateInstanceAiTraceContextOptions, ): Promise { @@ -1526,24 +1782,27 @@ export async function createInstanceAiTraceContext( ensureLangSmithTracingEnv(); - const client = options.proxyConfig ? getOrCreateProxyClient(options.proxyConfig) : undefined; const projectName = options.projectName ?? DEFAULT_PROJECT_NAME; const baseMetadata = buildBaseMetadata(options); const createTraceRuns = async () => { - const messageRun = await createRun({ + const otelRuntime = await createProductOtelRuntime(projectName, options.proxyConfig); + const messageRun = startProductSpan(otelRuntime, { projectName, - name: 'message_turn', + name: 'instance-ai.message_turn', + runType: 'chain', tags: ['message-turn'], metadata: mergeMetadata(baseMetadata, { agent_role: 'message_turn' }), inputs: options.input, - client, }); - const orchestratorRun = await createChildRun(messageRun, { - name: 'orchestrator', + const orchestratorRun = startProductSpan(otelRuntime, { + projectName, + name: 'instance-ai.orchestrator', + runType: 'chain', tags: ['orchestrator'], metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }), inputs: options.input, + parentRun: messageRun, }); return createTraceContext( @@ -1558,8 +1817,10 @@ export async function createInstanceAiTraceContext( rootRun: messageRun, actorRun: orchestratorRun, baseMetadata, + baseTelemetry: otelRuntime.telemetry, ...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}), }), + otelRuntime, ); }; @@ -1575,14 +1836,28 @@ export async function continueInstanceAiTraceContext( options: CreateInstanceAiTraceContextOptions, ): Promise { const baseMetadata = buildBaseMetadata(options); + const otelRuntime = otelTraceRuntimes.get(existingContext.rootRun.traceId); const createContinuation = async () => { - const orchestratorRun = await createChildRun(existingContext.messageRun, { - name: 'orchestrator', - tags: ['orchestrator'], - metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }), - inputs: options.input, - }); + const orchestratorRun = otelRuntime + ? startProductSpan(otelRuntime, { + projectName: existingContext.projectName, + name: 'instance-ai.orchestrator.resume', + runType: 'chain', + tags: ['orchestrator', 'resume'], + metadata: mergeMetadata(baseMetadata, { + agent_role: 'orchestrator', + execution_mode: 'resume', + }), + inputs: options.input, + parentRun: existingContext.messageRun, + }) + : await createChildRun(existingContext.messageRun, { + name: 'orchestrator', + tags: ['orchestrator'], + metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }), + inputs: options.input, + }); return createTraceContext( existingContext.projectName, @@ -1596,8 +1871,10 @@ export async function continueInstanceAiTraceContext( rootRun: existingContext.rootRun, actorRun: orchestratorRun, baseMetadata, + ...(otelRuntime ? { baseTelemetry: otelRuntime.telemetry } : {}), ...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}), }), + otelRuntime, ); }; @@ -1617,14 +1894,15 @@ export async function createDetachedSubAgentTraceContext( ensureLangSmithTracingEnv(); - const client = options.proxyConfig ? getOrCreateProxyClient(options.proxyConfig) : undefined; const projectName = options.projectName ?? DEFAULT_PROJECT_NAME; const baseMetadata = buildBaseMetadata(options); const createDetachedRuns = async () => { - const rootRun = await createRun({ + const otelRuntime = await createProductOtelRuntime(projectName, options.proxyConfig); + const rootRun = startProductSpan(otelRuntime, { projectName, - name: `subagent:${options.role}`, + name: `instance-ai.subagent.${options.role}`, + runType: 'chain', tags: normalizeTags( ['sub-agent', 'background'], options.plannedTaskId ? ['planned'] : undefined, @@ -1639,9 +1917,9 @@ export async function createDetachedSubAgentTraceContext( ...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}), ...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}), ...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}), + subagent_role: options.role, }), inputs: options.input, - client, }); return createTraceContext( @@ -1666,9 +1944,12 @@ export async function createDetachedSubAgentTraceContext( ...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}), ...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}), ...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}), + subagent_role: options.role, }) ?? baseMetadata, + baseTelemetry: otelRuntime.telemetry, ...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}), }), + otelRuntime, ); }; diff --git a/packages/@n8n/instance-ai/src/types.ts b/packages/@n8n/instance-ai/src/types.ts index 7c643f2ee0b..6b083bf6f4d 100644 --- a/packages/@n8n/instance-ai/src/types.ts +++ b/packages/@n8n/instance-ai/src/types.ts @@ -1,6 +1,7 @@ import type { LanguageModelV2 } from '@ai-sdk/provider-v5'; import type { AttributeValue, + BuiltTelemetry, BuiltMemory, BuiltTool, CheckpointStore, @@ -803,6 +804,8 @@ export interface InstanceAiTraceRun { startTime: number; endTime?: number; traceId: string; + otelTraceId?: string; + otelSpanId?: string; dottedOrder: string; executionOrder: number; childExecutionOrder: number; @@ -868,7 +871,7 @@ export interface InstanceAiTraceContext { tools: InstanceAiToolRegistry, options?: InstanceAiToolTraceOptions, ) => InstanceAiToolRegistry; - getTelemetry?: (options: InstanceAiTelemetryOptions) => Telemetry; + getTelemetry?: (options: InstanceAiTelemetryOptions) => Telemetry | BuiltTelemetry; /** Trace replay mode: 'record' captures tool I/O, 'replay' remaps IDs, 'off' disables. */ replayMode: TraceReplayMode; /** Shared ID remapper instance — available in 'replay' mode. */ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 796023a966c..8c14594f44b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1615,6 +1615,9 @@ importers: '@n8n/workflow-sdk': specifier: workspace:* version: link:../workflow-sdk + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 csv-parse: specifier: 5.5.0 version: 5.5.0