refactor(instance-ai): route product tracing through otel

This commit is contained in:
Oleg Ivaniv 2026-05-05 16:42:02 +02:00
parent e8e96df636
commit 868be3deee
No known key found for this signature in database
5 changed files with 359 additions and 69 deletions

View File

@ -34,6 +34,7 @@
"@daytonaio/sdk": "0.149.0",
"@joplin/turndown-plugin-gfm": "^1.0.12",
"langsmith": "catalog:",
"@opentelemetry/api": "^1.9.0",
"@mozilla/readability": "^0.6.0",
"@n8n/api-types": "workspace:*",
"@n8n/utils": "workspace:*",

View File

@ -327,12 +327,14 @@ describe('createInstanceAiTraceContext', () => {
expect(tracing?.getTelemetry).toBeDefined();
const telemetry = await tracing!.getTelemetry!({
const telemetryOrBuilder = tracing!.getTelemetry!({
agentRole: 'orchestrator',
functionId: 'instance-ai.orchestrator',
executionMode: 'foreground',
metadata: { custom_flag: true },
}).build();
});
const telemetry =
'build' in telemetryOrBuilder ? await telemetryOrBuilder.build() : telemetryOrBuilder;
expect(telemetry.functionId).toBe('instance-ai.orchestrator');
expect(telemetry.recordInputs).toBe(true);

View File

@ -1,11 +1,14 @@
import {
LangSmithTelemetry,
type AttributeValue,
type BuiltTelemetry,
type BuiltTool,
type InterruptibleToolContext,
type Telemetry,
type ToolContext,
} from '@n8n/agents';
import { context as otelContext, trace as otelTrace } from '@opentelemetry/api';
import type { Context as OtelContext, Span as OtelApiSpan } from '@opentelemetry/api';
import { Client, RunTree } from 'langsmith';
import { getCurrentRunTree, withRunTree as withLangSmithRunTree } from 'langsmith/traceable';
import { AsyncLocalStorage } from 'node:async_hooks';
@ -59,6 +62,7 @@ const proxyHeaderStore = new AsyncLocalStorage<Record<string, string>>();
// hydrateRunTree() (which reconstructs RunTree from serialized state)
// can use the correct proxy client for its HTTP calls.
const traceClients = new Map<string, Client>();
const otelTraceRuntimes = new Map<string, ProductOtelTraceRuntime>();
/**
* Fetch wrapper for LangSmith clients:
@ -82,6 +86,216 @@ const gzipFetch: typeof globalThis.fetch = async (input, init) => {
let cachedProxyClient: { client: Client; apiUrl: string } | null = null;
let cachedDirectClient: Client | null = null;
const OTEL_TRACE_VERSION = 'otel-v2';
const LANGSMITH_TRACEABLE = 'langsmith.traceable';
const LANGSMITH_TRACE_NAME = 'langsmith.trace.name';
const LANGSMITH_SPAN_KIND = 'langsmith.span.kind';
const LANGSMITH_SPAN_TAGS = 'langsmith.span.tags';
const GEN_AI_PROMPT = 'gen_ai.prompt';
const GEN_AI_COMPLETION = 'gen_ai.completion';
interface ProductOtelTraceRuntime {
telemetry: BuiltTelemetry;
spans: Map<string, OtelApiSpan>;
contexts: Map<string, OtelContext>;
}
interface OTelTracer {
startSpan(
name: string,
options?: { attributes?: Record<string, AttributeValue> },
context?: OtelContext,
): OtelApiSpan;
}
function isOtelTracer(value: unknown): value is OTelTracer {
return (
value !== null &&
typeof value === 'object' &&
typeof Reflect.get(value, 'startSpan') === 'function'
);
}
function langsmithTraceIdFromOtelTraceId(traceId: string): string {
return `${traceId.substring(0, 8)}-${traceId.substring(8, 12)}-${traceId.substring(
12,
16,
)}-${traceId.substring(16, 20)}-${traceId.substring(20, 32)}`;
}
function langsmithRunIdFromOtelSpanId(spanId: string): string {
const paddedHex = spanId.padStart(16, '0');
return `00000000-0000-0000-${paddedHex.substring(0, 4)}-${paddedHex.substring(4, 16)}`;
}
function stableDottedOrder(parentRun: InstanceAiTraceRun | undefined, runId: string): string {
return parentRun?.dottedOrder ? `${parentRun.dottedOrder}.${runId}` : runId;
}
function buildProductSpanAttributes(options: {
name: string;
runType?: string;
tags?: string[];
metadata?: Record<string, unknown>;
inputs?: unknown;
}): Record<string, AttributeValue> {
const attributes: Record<string, AttributeValue> = {
[LANGSMITH_TRACEABLE]: 'true',
[LANGSMITH_TRACE_NAME]: options.name,
[LANGSMITH_SPAN_KIND]: options.runType ?? 'chain',
'instance_ai.trace_version': OTEL_TRACE_VERSION,
};
const tags = normalizeTags(DEFAULT_TAGS, options.tags);
if (tags?.length) {
attributes[LANGSMITH_SPAN_TAGS] = tags;
}
const metadata = mergeMetadata(options.metadata, {
trace_version: OTEL_TRACE_VERSION,
'instance_ai.trace_version': OTEL_TRACE_VERSION,
});
for (const [key, value] of Object.entries(metadata ?? {})) {
const attributeValue = toTelemetryAttributeValue(value);
if (attributeValue === undefined) continue;
attributes[key] = attributeValue;
if (!key.startsWith('langsmith.metadata.')) {
attributes[`langsmith.metadata.${key}`] = attributeValue;
}
}
const inputs = options.inputs === undefined ? undefined : stringifyTracePayload(options.inputs);
if (inputs !== undefined) {
attributes[GEN_AI_PROMPT] = inputs;
}
return attributes;
}
function stringifyTracePayload(value: unknown): string | undefined {
try {
return JSON.stringify(sanitizeTracePayload(value));
} catch {
return undefined;
}
}
function startProductSpan(
runtime: ProductOtelTraceRuntime,
options: {
projectName: string;
name: string;
runType?: string;
tags?: string[];
metadata?: Record<string, unknown>;
inputs?: unknown;
parentRun?: InstanceAiTraceRun;
},
): InstanceAiTraceRun {
if (!isOtelTracer(runtime.telemetry.tracer)) {
throw new Error('Instance AI tracing requires an OpenTelemetry tracer');
}
const parentContext = options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined;
const span = runtime.telemetry.tracer.startSpan(
options.name,
{
attributes: buildProductSpanAttributes(options),
},
parentContext,
);
const spanContext = span.spanContext();
const traceId = langsmithTraceIdFromOtelTraceId(spanContext.traceId);
const runId = langsmithRunIdFromOtelSpanId(spanContext.spanId);
const spanContextWithSpan = otelTrace.setSpan(parentContext ?? otelContext.active(), span);
const parentRun = options.parentRun;
const run: InstanceAiTraceRun = {
id: runId,
name: options.name,
runType: options.runType ?? 'chain',
projectName: options.projectName,
startTime: Date.now(),
traceId,
otelTraceId: spanContext.traceId,
otelSpanId: spanContext.spanId,
dottedOrder: stableDottedOrder(parentRun, runId),
executionOrder: parentRun ? parentRun.childExecutionOrder + 1 : 0,
childExecutionOrder: 0,
...(parentRun ? { parentRunId: parentRun.id } : {}),
...(options.tags ? { tags: normalizeTags(DEFAULT_TAGS, parentRun?.tags, options.tags) } : {}),
...(options.metadata ? { metadata: mergeMetadata(parentRun?.metadata, options.metadata) } : {}),
...(options.inputs !== undefined ? { inputs: sanitizeTracePayload(options.inputs) } : {}),
};
if (parentRun) {
parentRun.childExecutionOrder += 1;
}
runtime.spans.set(run.id, span);
runtime.contexts.set(run.id, spanContextWithSpan);
return run;
}
async function finishProductSpan(
runtime: ProductOtelTraceRuntime,
run: InstanceAiTraceRun,
options?: InstanceAiTraceRunFinishOptions,
): Promise<void> {
const span = runtime.spans.get(run.id);
if (!span) return;
const metadata = mergeMetadata(options?.metadata);
const attributes: Record<string, AttributeValue> = {};
for (const [key, value] of Object.entries(metadata ?? {})) {
const attributeValue = toTelemetryAttributeValue(value);
if (attributeValue === undefined) continue;
attributes[key] = attributeValue;
attributes[`langsmith.metadata.${key}`] = attributeValue;
}
if (options?.outputs !== undefined) {
const completion = stringifyTracePayload(options.outputs);
if (completion !== undefined) {
attributes[GEN_AI_COMPLETION] = completion;
}
run.outputs = sanitizeTracePayload(options.outputs);
}
if (Object.keys(attributes).length > 0) {
span.setAttributes(attributes);
}
if (options?.error) {
span.recordException(new Error(options.error));
span.setStatus({ code: 2, message: options.error });
run.error = options.error;
} else {
span.setStatus({ code: 1 });
}
run.endTime = Date.now();
run.metadata = mergeMetadata(run.metadata, metadata);
span.end();
runtime.spans.delete(run.id);
runtime.contexts.delete(run.id);
await runtime.telemetry.provider?.forceFlush();
}
async function withProductSpanContext<T>(
runtime: ProductOtelTraceRuntime,
run: InstanceAiTraceRun,
fn: () => Promise<T>,
): Promise<T> {
const spanContext = runtime.contexts.get(run.id);
if (!spanContext) {
return await fn();
}
return await otelContext.with(spanContext, fn);
}
/** Get a LangSmith Client that uses gzip encoding (no brotli). */
function getOrCreateDirectClient(): Client {
if (cachedDirectClient) return cachedDirectClient;
@ -654,6 +868,7 @@ function mergeRunTreeInputs(
*/
export function releaseTraceClient(traceId: string): void {
traceClients.delete(traceId);
otelTraceRuntimes.delete(traceId);
}
export interface SubmitLangsmithUserFeedbackOptions {
@ -1020,8 +1235,13 @@ function createTraceContext(
rootRun: InstanceAiTraceRun,
actorRun: InstanceAiTraceRun,
getProxyHeaders?: () => Promise<Record<string, string>>,
telemetryFactory?: (options: InstanceAiTelemetryOptions) => Telemetry,
telemetryFactory?: (options: InstanceAiTelemetryOptions) => Telemetry | BuiltTelemetry,
otelRuntime?: ProductOtelTraceRuntime,
): InstanceAiTraceContext {
if (otelRuntime) {
otelTraceRuntimes.set(rootRun.traceId, otelRuntime);
}
const withProxy = async <T>(fn: () => Promise<T>): Promise<T> => {
if (!getProxyHeaders) return await fn();
const headers = await getProxyHeaders();
@ -1032,19 +1252,40 @@ function createTraceContext(
parentRun: InstanceAiTraceRun,
init: InstanceAiTraceRunInit,
): Promise<InstanceAiTraceRun> =>
await withProxy(async () => await createChildRun(parentRun, init));
await withProxy(async () =>
otelRuntime
? startProductSpan(otelRuntime, {
projectName,
name: init.name,
runType: init.runType,
tags: init.tags,
metadata: mergeMetadata(parentRun.metadata, init.metadata),
inputs: init.inputs,
parentRun,
})
: await createChildRun(parentRun, init),
);
const withRunTree = async <T>(run: InstanceAiTraceRun, fn: () => Promise<T>): Promise<T> =>
await withProxy(async () => await withSerializedRunTree(run, fn));
await withProxy(async () =>
otelRuntime
? await withProductSpanContext(otelRuntime, run, fn)
: await withSerializedRunTree(run, fn),
);
const finishRun = async (
run: InstanceAiTraceRun,
finishOptions?: InstanceAiTraceRunFinishOptions,
): Promise<void> => {
await withProxy(async () => await finishTraceRun(run, finishOptions));
await withProxy(async () =>
otelRuntime
? await finishProductSpan(otelRuntime, run, finishOptions)
: await finishTraceRun(run, finishOptions),
);
// Clean up traceClients when root run finishes
if (!run.parentRunId) {
traceClients.delete(run.traceId);
otelTraceRuntimes.delete(run.traceId);
}
};
@ -1053,15 +1294,20 @@ function createTraceContext(
error: unknown,
metadata?: Record<string, unknown>,
): Promise<void> => {
await withProxy(
async () =>
await finishTraceRun(run, {
error: normalizeErrorMessage(error),
metadata,
}),
await withProxy(async () =>
otelRuntime
? await finishProductSpan(otelRuntime, run, {
error: normalizeErrorMessage(error),
metadata,
})
: await finishTraceRun(run, {
error: normalizeErrorMessage(error),
metadata,
}),
);
if (!run.parentRunId) {
traceClients.delete(run.traceId);
otelTraceRuntimes.delete(run.traceId);
}
};
@ -1076,7 +1322,7 @@ function createTraceContext(
withRunTree,
finishRun,
failRun,
toHeaders: (run) => hydrateRunTree(run).toHeaders(),
toHeaders: (run) => (otelRuntime ? {} : hydrateRunTree(run).toHeaders()),
...(telemetryFactory ? { getTelemetry: telemetryFactory } : {}),
wrapTools: (tools, traceOptions) => {
if (ctx.replayMode === 'replay' && ctx.traceIndex && ctx.idRemapper) {
@ -1386,33 +1632,6 @@ export function createTraceReplayOnlyContext(): InstanceAiTraceContext {
return ctx;
}
async function createRun(options: {
projectName: string;
name: string;
runType?: string;
tags?: string[];
metadata?: Record<string, unknown>;
inputs?: unknown;
client?: Client;
}): Promise<InstanceAiTraceRun> {
const runTree = new RunTree({
name: options.name,
run_type: options.runType ?? 'chain',
project_name: options.projectName,
tags: normalizeTags(DEFAULT_TAGS, options.tags),
metadata: mergeMetadata(options.metadata),
inputs: sanitizeTracePayload(options.inputs),
client: options.client ?? getOrCreateDirectClient(),
});
await runTree.postRun();
if (options.client) {
traceClients.set(runTree.trace_id, options.client);
}
return createRunStateFromTree(runTree);
}
async function createChildRun(
parentState: InstanceAiTraceRun,
options: InstanceAiTraceRunInit,
@ -1469,6 +1688,7 @@ function buildBaseMetadata(options: CreateInstanceAiTraceContextOptions): Record
message_id: options.messageId,
run_id: options.runId,
user_id: options.userId,
'instance_ai.trace_version': OTEL_TRACE_VERSION,
...(options.modelId !== undefined
? { model_id: serializeModelIdForTrace(options.modelId) }
: {}),
@ -1483,7 +1703,8 @@ function createTelemetryFactory(options: {
actorRun: InstanceAiTraceRun;
baseMetadata: Record<string, unknown>;
proxyConfig?: ServiceProxyConfig;
}): (telemetryOptions: InstanceAiTelemetryOptions) => Telemetry {
baseTelemetry?: BuiltTelemetry;
}): (telemetryOptions: InstanceAiTelemetryOptions) => BuiltTelemetry | Telemetry {
return (telemetryOptions) => {
const agentRole = telemetryOptions.agentRole;
const executionMode =
@ -1499,17 +1720,17 @@ function createTelemetryFactory(options: {
});
const functionId = telemetryOptions.functionId ?? formatTelemetryFunctionId(agentRole);
return new LangSmithTelemetry({
project: options.projectName,
transformExportedSpan: redactLangSmithTelemetrySpan,
...(options.proxyConfig
? {
apiKey: '-',
endpoint: options.proxyConfig.apiUrl,
headers: options.proxyConfig.getAuthHeaders,
}
: {}),
})
if (options.baseTelemetry) {
return {
...options.baseTelemetry,
functionId,
metadata,
recordInputs: true,
recordOutputs: true,
};
}
return createLangSmithTelemetryBuilder(options.projectName, options.proxyConfig)
.functionId(functionId)
.metadata(metadata)
.recordInputs(true)
@ -1517,6 +1738,41 @@ function createTelemetryFactory(options: {
};
}
function createLangSmithTelemetryBuilder(
projectName: string,
proxyConfig?: ServiceProxyConfig,
): LangSmithTelemetry {
return new LangSmithTelemetry({
project: projectName,
transformExportedSpan: redactLangSmithTelemetrySpan,
...(proxyConfig
? {
apiKey: '-',
endpoint: proxyConfig.apiUrl,
headers: proxyConfig.getAuthHeaders,
}
: {}),
});
}
async function createProductOtelRuntime(
projectName: string,
proxyConfig?: ServiceProxyConfig,
): Promise<ProductOtelTraceRuntime> {
const telemetry = await createLangSmithTelemetryBuilder(projectName, proxyConfig)
.functionId('instance-ai.product')
.metadata({})
.recordInputs(true)
.recordOutputs(true)
.build();
return {
telemetry,
spans: new Map(),
contexts: new Map(),
};
}
export async function createInstanceAiTraceContext(
options: CreateInstanceAiTraceContextOptions,
): Promise<InstanceAiTraceContext | undefined> {
@ -1526,24 +1782,27 @@ export async function createInstanceAiTraceContext(
ensureLangSmithTracingEnv();
const client = options.proxyConfig ? getOrCreateProxyClient(options.proxyConfig) : undefined;
const projectName = options.projectName ?? DEFAULT_PROJECT_NAME;
const baseMetadata = buildBaseMetadata(options);
const createTraceRuns = async () => {
const messageRun = await createRun({
const otelRuntime = await createProductOtelRuntime(projectName, options.proxyConfig);
const messageRun = startProductSpan(otelRuntime, {
projectName,
name: 'message_turn',
name: 'instance-ai.message_turn',
runType: 'chain',
tags: ['message-turn'],
metadata: mergeMetadata(baseMetadata, { agent_role: 'message_turn' }),
inputs: options.input,
client,
});
const orchestratorRun = await createChildRun(messageRun, {
name: 'orchestrator',
const orchestratorRun = startProductSpan(otelRuntime, {
projectName,
name: 'instance-ai.orchestrator',
runType: 'chain',
tags: ['orchestrator'],
metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }),
inputs: options.input,
parentRun: messageRun,
});
return createTraceContext(
@ -1558,8 +1817,10 @@ export async function createInstanceAiTraceContext(
rootRun: messageRun,
actorRun: orchestratorRun,
baseMetadata,
baseTelemetry: otelRuntime.telemetry,
...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}),
}),
otelRuntime,
);
};
@ -1575,14 +1836,28 @@ export async function continueInstanceAiTraceContext(
options: CreateInstanceAiTraceContextOptions,
): Promise<InstanceAiTraceContext> {
const baseMetadata = buildBaseMetadata(options);
const otelRuntime = otelTraceRuntimes.get(existingContext.rootRun.traceId);
const createContinuation = async () => {
const orchestratorRun = await createChildRun(existingContext.messageRun, {
name: 'orchestrator',
tags: ['orchestrator'],
metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }),
inputs: options.input,
});
const orchestratorRun = otelRuntime
? startProductSpan(otelRuntime, {
projectName: existingContext.projectName,
name: 'instance-ai.orchestrator.resume',
runType: 'chain',
tags: ['orchestrator', 'resume'],
metadata: mergeMetadata(baseMetadata, {
agent_role: 'orchestrator',
execution_mode: 'resume',
}),
inputs: options.input,
parentRun: existingContext.messageRun,
})
: await createChildRun(existingContext.messageRun, {
name: 'orchestrator',
tags: ['orchestrator'],
metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }),
inputs: options.input,
});
return createTraceContext(
existingContext.projectName,
@ -1596,8 +1871,10 @@ export async function continueInstanceAiTraceContext(
rootRun: existingContext.rootRun,
actorRun: orchestratorRun,
baseMetadata,
...(otelRuntime ? { baseTelemetry: otelRuntime.telemetry } : {}),
...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}),
}),
otelRuntime,
);
};
@ -1617,14 +1894,15 @@ export async function createDetachedSubAgentTraceContext(
ensureLangSmithTracingEnv();
const client = options.proxyConfig ? getOrCreateProxyClient(options.proxyConfig) : undefined;
const projectName = options.projectName ?? DEFAULT_PROJECT_NAME;
const baseMetadata = buildBaseMetadata(options);
const createDetachedRuns = async () => {
const rootRun = await createRun({
const otelRuntime = await createProductOtelRuntime(projectName, options.proxyConfig);
const rootRun = startProductSpan(otelRuntime, {
projectName,
name: `subagent:${options.role}`,
name: `instance-ai.subagent.${options.role}`,
runType: 'chain',
tags: normalizeTags(
['sub-agent', 'background'],
options.plannedTaskId ? ['planned'] : undefined,
@ -1639,9 +1917,9 @@ export async function createDetachedSubAgentTraceContext(
...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}),
...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}),
...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}),
subagent_role: options.role,
}),
inputs: options.input,
client,
});
return createTraceContext(
@ -1666,9 +1944,12 @@ export async function createDetachedSubAgentTraceContext(
...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}),
...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}),
...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}),
subagent_role: options.role,
}) ?? baseMetadata,
baseTelemetry: otelRuntime.telemetry,
...(options.proxyConfig ? { proxyConfig: options.proxyConfig } : {}),
}),
otelRuntime,
);
};

View File

@ -1,6 +1,7 @@
import type { LanguageModelV2 } from '@ai-sdk/provider-v5';
import type {
AttributeValue,
BuiltTelemetry,
BuiltMemory,
BuiltTool,
CheckpointStore,
@ -803,6 +804,8 @@ export interface InstanceAiTraceRun {
startTime: number;
endTime?: number;
traceId: string;
otelTraceId?: string;
otelSpanId?: string;
dottedOrder: string;
executionOrder: number;
childExecutionOrder: number;
@ -868,7 +871,7 @@ export interface InstanceAiTraceContext {
tools: InstanceAiToolRegistry,
options?: InstanceAiToolTraceOptions,
) => InstanceAiToolRegistry;
getTelemetry?: (options: InstanceAiTelemetryOptions) => Telemetry;
getTelemetry?: (options: InstanceAiTelemetryOptions) => Telemetry | BuiltTelemetry;
/** Trace replay mode: 'record' captures tool I/O, 'replay' remaps IDs, 'off' disables. */
replayMode: TraceReplayMode;
/** Shared ID remapper instance — available in 'replay' mode. */

View File

@ -1615,6 +1615,9 @@ importers:
'@n8n/workflow-sdk':
specifier: workspace:*
version: link:../workflow-sdk
'@opentelemetry/api':
specifier: ^1.9.0
version: 1.9.0
csv-parse:
specifier: 5.5.0
version: 5.5.0