mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-30 16:26:59 +02:00
refactor(instance-ai): move product tracing to otel
This commit is contained in:
parent
868be3deee
commit
e5e0cb97cd
|
|
@ -2184,6 +2184,7 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
|
||||
it('passes resolved telemetry to tool handlers via parentTelemetry', async () => {
|
||||
let capturedTelemetry: BuiltTelemetry | undefined;
|
||||
let capturedToolCallId: string | undefined;
|
||||
|
||||
const spyTool: BuiltTool = new ToolBuilder('spy')
|
||||
.description('captures telemetry from context')
|
||||
|
|
@ -2191,6 +2192,7 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
.output(z.object({ ok: z.boolean() }))
|
||||
.handler(async (_input, ctx) => {
|
||||
capturedTelemetry = ctx.parentTelemetry;
|
||||
capturedToolCallId = ctx.toolCallId;
|
||||
return await Promise.resolve({ ok: true });
|
||||
})
|
||||
.build();
|
||||
|
|
@ -2211,6 +2213,7 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
await runtime.generate('test');
|
||||
|
||||
expect(capturedTelemetry).toBe(baseTelemetry);
|
||||
expect(capturedToolCallId).toBe('tc1');
|
||||
});
|
||||
|
||||
it('emits AI SDK-compatible telemetry spans for local tool execution', async () => {
|
||||
|
|
|
|||
|
|
@ -1717,7 +1717,8 @@ export class AgentRuntime {
|
|||
toolName,
|
||||
toolInput,
|
||||
resolvedTelemetry,
|
||||
async () => await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry),
|
||||
async () =>
|
||||
await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry, toolCallId),
|
||||
);
|
||||
} catch (error) {
|
||||
return makeToolError(error as Error);
|
||||
|
|
|
|||
|
|
@ -142,6 +142,7 @@ export async function executeTool(
|
|||
builtTool: BuiltTool,
|
||||
resumeData?: unknown,
|
||||
parentTelemetry?: BuiltTelemetry,
|
||||
toolCallId?: string,
|
||||
): Promise<unknown> {
|
||||
if (!builtTool.handler) {
|
||||
throw new Error(`No handler found for tool "${builtTool.name}"`);
|
||||
|
|
@ -154,11 +155,12 @@ export async function executeTool(
|
|||
},
|
||||
resumeData,
|
||||
parentTelemetry,
|
||||
toolCallId,
|
||||
};
|
||||
return await builtTool.handler(args, ctx);
|
||||
}
|
||||
|
||||
const ctx: ToolContext = { parentTelemetry };
|
||||
const ctx: ToolContext = { parentTelemetry, toolCallId };
|
||||
return await builtTool.handler(args, ctx);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ import type { BuiltTelemetry } from '../telemetry';
|
|||
import type { JSONObject } from '../utils/json';
|
||||
|
||||
/**
 * Context object passed as the second argument to a tool handler.
 *
 * Both members are optional. `executeTool` fills them from its own
 * `parentTelemetry` and `toolCallId` arguments before invoking the handler.
 */
export interface ToolContext {
	/** AI SDK tool call ID for the current local tool execution. */
	toolCallId?: string;
	/** Telemetry config from the parent agent, for sub-agent propagation. */
	parentTelemetry?: BuiltTelemetry;
}
|
||||
|
|
@ -19,6 +21,8 @@ export interface InterruptibleToolContext<S = unknown, R = unknown> {
|
|||
suspend: (payload: S) => Promise<never>;
|
||||
/** Data from the consumer after resume. Undefined on first invocation. */
|
||||
resumeData: R | undefined;
|
||||
/** AI SDK tool call ID for the current local tool execution. */
|
||||
toolCallId?: string;
|
||||
/** Telemetry config from the parent agent, for sub-agent propagation. */
|
||||
parentTelemetry?: BuiltTelemetry;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
import type { MigrationContext, ReversibleMigration } from '../migration-types';
|
||||
|
||||
/**
 * Adds the `traceId` and `spanId` columns to `instance_ai_run_snapshots` so a
 * snapshot can record the OpenTelemetry identifiers of its product root span.
 *
 * Column widths match the hex encoding of OpenTelemetry IDs: 32 characters for
 * a trace ID and 16 characters for a span ID.
 */
export class AddOtelIdsToInstanceAiRunSnapshots1778065000000 implements ReversibleMigration {
	/** Adds both OTel ID columns in a single `addColumns` call. */
	async up({ schemaBuilder: { addColumns, column } }: MigrationContext) {
		await addColumns('instance_ai_run_snapshots', [
			column('traceId').varchar(32).comment('OpenTelemetry trace ID for the product root span.'),
			column('spanId').varchar(16).comment('OpenTelemetry span ID for the product root span.'),
		]);
	}

	/** Reverts `up` by dropping both OTel ID columns. */
	async down({ schemaBuilder: { dropColumns } }: MigrationContext) {
		await dropColumns('instance_ai_run_snapshots', ['traceId', 'spanId']);
	}
}
|
||||
|
|
@ -170,6 +170,7 @@ import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/17
|
|||
import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey';
|
||||
import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable';
|
||||
import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence';
|
||||
import { AddOtelIdsToInstanceAiRunSnapshots1778065000000 } from '../common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots';
|
||||
import type { Migration } from '../migration-types';
|
||||
|
||||
export const postgresMigrations: Migration[] = [
|
||||
|
|
@ -345,4 +346,5 @@ export const postgresMigrations: Migration[] = [
|
|||
CreateAiBuilderTemporaryWorkflowTable1777281990043,
|
||||
ExpandVariablesValueColumnToText1777420800000,
|
||||
ResetInstanceAiNativePersistence1778060000000,
|
||||
AddOtelIdsToInstanceAiRunSnapshots1778065000000,
|
||||
];
|
||||
|
|
|
|||
|
|
@ -163,6 +163,7 @@ import { CreateAiBuilderTemporaryWorkflowTable1777281990043 } from '../common/17
|
|||
import { AddExecutionDeduplicationKey1778000000000 } from '../common/1778000000000-AddExecutionDeduplicationKey';
|
||||
import { CreateInstanceAiCheckpointTable1778050000000 } from '../common/1778050000000-CreateInstanceAiCheckpointTable';
|
||||
import { ResetInstanceAiNativePersistence1778060000000 } from '../common/1778060000000-ResetInstanceAiNativePersistence';
|
||||
import { AddOtelIdsToInstanceAiRunSnapshots1778065000000 } from '../common/1778065000000-AddOtelIdsToInstanceAiRunSnapshots';
|
||||
import type { Migration } from '../migration-types';
|
||||
|
||||
const sqliteMigrations: Migration[] = [
|
||||
|
|
@ -331,6 +332,7 @@ const sqliteMigrations: Migration[] = [
|
|||
AddTracingContextToExecution1777045000000,
|
||||
CreateAiBuilderTemporaryWorkflowTable1777281990043,
|
||||
ResetInstanceAiNativePersistence1778060000000,
|
||||
AddOtelIdsToInstanceAiRunSnapshots1778065000000,
|
||||
];
|
||||
|
||||
export { sqliteMigrations };
|
||||
|
|
|
|||
|
|
@ -77,17 +77,35 @@ Implemented so far:
|
|||
- The LangSmith OTel processor filters noisy AI SDK wrapper spans so provider
|
||||
request spans such as `ai.streamText.doStream` can appear directly under the
|
||||
agent root.
|
||||
- Instance AI product roots and child spans now use the same OTel
|
||||
tracer/provider as native agent telemetry in normal execution.
|
||||
- Normal foreground and detached trace creation no longer creates RunTree spans.
|
||||
- Agent tree snapshots persist OTel trace/span IDs alongside derived LangSmith
|
||||
IDs for feedback anchoring.
|
||||
|
||||
Still wrong:
|
||||
|
||||
- `langsmith-tracing.ts` still creates RunTree product spans for live execution.
|
||||
- Product message turns and native LLM spans are split across separate
|
||||
LangSmith traces.
|
||||
- Token usage lives on native OTel spans but does not roll up to RunTree
|
||||
product roots.
|
||||
- Feedback anchoring still assumes persisted RunTree IDs.
|
||||
- Some product spans exist twice: once as RunTree spans and once as native
|
||||
tool/agent OTel spans.
|
||||
- Live LangSmith validation has proved feedback against an OTel-only product
|
||||
root; full provider-span validation with a real model turn is still pending.
|
||||
- Some fallback RunTree compatibility code remains for legacy/replay-only
|
||||
paths and should be deleted after rollout validation.
|
||||
- Detached sub-agent linking captures spawning trace/span metadata and model
|
||||
tool-call IDs when a detached task is spawned from a local tool handler.
|
||||
|
||||
## Hybrid Reference Notes
|
||||
|
||||
The last working hybrid traces showed RunTree product nodes such as
|
||||
`message_turn`, `orchestrator`, `context_compaction`, `prompt_build`, and
|
||||
`subagent:planner` beside native OTel nodes such as `ai.streamText.doStream`.
|
||||
This proved product semantics and native AI SDK telemetry could both be
|
||||
exported, but LangSmith displayed them as split turn/root groups and did not
|
||||
roll token usage up to the product roots.
|
||||
|
||||
The failure mode to avoid is forcing native OTel spans under RunTree IDs. In
|
||||
that shape, LangSmith can lose or separate provider spans, and the trace no
|
||||
longer shows the complete system/user/tool/provider turn under a single OTel
|
||||
context. Regression coverage now asserts normal Instance AI trace creation does
|
||||
not create RunTree spans.
|
||||
|
||||
## Target Architecture
|
||||
|
||||
|
|
@ -471,72 +489,72 @@ must not require LangSmith to be available.
|
|||
|
||||
1. Document and freeze the current hybrid behavior
|
||||
|
||||
- [ ] Keep examples of a working hybrid trace with native LLM spans.
|
||||
- [ ] Keep examples of the failure mode when OTel spans are forced under
|
||||
- [x] Keep examples of a working hybrid trace with native LLM spans.
|
||||
- [x] Keep examples of the failure mode when OTel spans are forced under
|
||||
RunTree parent IDs.
|
||||
- [ ] Add a short note in tests or fixtures explaining why RunTree/OTel
|
||||
- [x] Add a short note in tests or fixtures explaining why RunTree/OTel
|
||||
parent mixing is forbidden.
|
||||
|
||||
2. Add an OTel product tracing adapter
|
||||
|
||||
- [ ] Create an Instance AI adapter that starts active OTel spans using the
|
||||
- [x] Create an Instance AI adapter that starts active OTel spans using the
|
||||
same tracer/provider as native agent telemetry.
|
||||
- [ ] Support `withSpan`, `startSpan`, `finishSpan`, `failSpan`, and
|
||||
- [x] Support `withSpan`, `startSpan`, `finishSpan`, `failSpan`, and
|
||||
metadata merging.
|
||||
- [ ] Ensure active context propagates into `@n8n/agents` runtime calls.
|
||||
- [ ] Ensure spans flush before response close, suspension persistence, and
|
||||
- [x] Ensure active context propagates into `@n8n/agents` runtime calls.
|
||||
- [x] Ensure spans flush before response close, suspension persistence, and
|
||||
detached task completion.
|
||||
|
||||
3. Replace RunTree message turn roots
|
||||
|
||||
- [ ] Create `instance-ai.message_turn` as an OTel root span.
|
||||
- [ ] Persist OTel trace/span IDs in the agent tree snapshot.
|
||||
- [ ] Add metadata required by LangSmith thread view.
|
||||
- [ ] Remove RunTree creation from the normal foreground path.
|
||||
- [x] Create `instance-ai.message_turn` as an OTel root span.
|
||||
- [x] Persist OTel trace/span IDs in the agent tree snapshot.
|
||||
- [x] Add metadata required by LangSmith thread view.
|
||||
- [x] Remove RunTree creation from the normal foreground path.
|
||||
|
||||
4. Replace RunTree product child spans
|
||||
|
||||
- [ ] Convert `orchestrator`, `context_compaction`, and `prompt_build` to
|
||||
- [x] Convert `orchestrator`, `context_compaction`, and `prompt_build` to
|
||||
OTel spans.
|
||||
- [ ] Convert inline `subagent:*` spans to OTel spans under active context.
|
||||
- [ ] Convert HITL suspend/resume spans to OTel spans.
|
||||
- [ ] Convert selected side-effect-heavy tools to OTel product spans.
|
||||
- [x] Convert inline `subagent:*` spans to OTel spans under active context.
|
||||
- [x] Convert HITL suspend/resume spans to OTel spans.
|
||||
- [x] Convert selected side-effect-heavy tools to OTel product spans.
|
||||
|
||||
5. Preserve detached/background sub-agent linking
|
||||
|
||||
- [ ] Create detached sub-agent roots as separate OTel traces when they run
|
||||
- [x] Create detached sub-agent roots as separate OTel traces when they run
|
||||
outside the foreground context.
|
||||
- [ ] Add spawning metadata: trace ID, span ID, tool call ID, task ID, and
|
||||
- [x] Add spawning metadata: trace ID, span ID, tool call ID, task ID, and
|
||||
agent role.
|
||||
- [ ] Confirm thread queries show detached roots alongside foreground turns.
|
||||
|
||||
6. Rework feedback anchoring
|
||||
|
||||
- [ ] Choose explicit LangSmith IDs, derived OTel IDs, or metadata lookup.
|
||||
- [ ] Prove `Client.createFeedback` works against an OTel-only product root.
|
||||
- [ ] Persist the chosen IDs in the snapshot.
|
||||
- [ ] Remove RunTree as a feedback dependency.
|
||||
- [x] Choose explicit LangSmith IDs, derived OTel IDs, or metadata lookup.
|
||||
- [x] Prove `Client.createFeedback` works against an OTel-only product root.
|
||||
- [x] Persist the chosen IDs in the snapshot.
|
||||
- [x] Remove RunTree as a feedback dependency.
|
||||
|
||||
7. Remove RunTree live tracing
|
||||
|
||||
- [ ] Remove normal-path `RunTree` root creation.
|
||||
- [ ] Remove normal-path manual RunTree tool wrappers.
|
||||
- [x] Remove normal-path `RunTree` root creation.
|
||||
- [x] Remove normal-path manual RunTree tool wrappers.
|
||||
- [ ] Keep only temporary compatibility code behind an explicit flag, if
|
||||
needed for rollout.
|
||||
- [ ] Delete compatibility code after validation.
|
||||
|
||||
8. Decouple replay from tracing
|
||||
|
||||
- [ ] Ensure replay records stable Instance AI events, not span IDs.
|
||||
- [ ] Ensure replay tests pass with LangSmith disabled.
|
||||
- [x] Ensure replay records stable Instance AI events, not span IDs.
|
||||
- [x] Ensure replay tests pass with LangSmith disabled.
|
||||
- [ ] Optionally emit replay-tagged OTel spans for debugging only.
|
||||
|
||||
9. Add regression coverage
|
||||
|
||||
- [ ] Unit test metadata construction.
|
||||
- [ ] Unit test OTel product span parentage.
|
||||
- [ ] Unit test feedback ID persistence.
|
||||
- [ ] Unit test redaction preserving token usage.
|
||||
- [x] Unit test metadata construction.
|
||||
- [x] Unit test OTel product span parentage.
|
||||
- [x] Unit test feedback ID persistence.
|
||||
- [x] Unit test redaction preserving token usage.
|
||||
- [ ] Local exporter test proving one foreground message turn contains
|
||||
product spans, native provider spans, and local tool spans.
|
||||
- [ ] Live LangSmith validation behind explicit credentials.
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ export interface AgentTreeSnapshot {
|
|||
runId: string;
|
||||
messageGroupId?: string;
|
||||
runIds?: string[];
|
||||
traceId?: string;
|
||||
spanId?: string;
|
||||
langsmithRunId?: string;
|
||||
langsmithTraceId?: string;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
import {
|
||||
createDetachedSubAgentTraceContext,
|
||||
getCurrentOtelSpanContext,
|
||||
getCurrentTraceToolCallId,
|
||||
mergeCurrentTraceMetadata,
|
||||
} from '../../tracing/langsmith-tracing';
|
||||
import type {
|
||||
|
|
@ -29,7 +31,7 @@ export async function startSubAgentTrace(
|
|||
if (!context.tracing) return undefined;
|
||||
|
||||
return await context.tracing.startChildRun(context.tracing.actorRun, {
|
||||
name: `subagent:${options.role}`,
|
||||
name: `instance-ai.subagent.${options.role}.stream`,
|
||||
tags: ['sub-agent'],
|
||||
metadata: {
|
||||
agent_role: options.role,
|
||||
|
|
@ -62,6 +64,11 @@ export async function createDetachedSubAgentTracing(
|
|||
typeof context.tracing.actorRun.metadata?.agent_id === 'string'
|
||||
? context.tracing.actorRun.metadata.agent_id
|
||||
: context.orchestratorAgentId;
|
||||
const spawnedByAgentRole =
|
||||
typeof context.tracing.actorRun.metadata?.agent_role === 'string'
|
||||
? context.tracing.actorRun.metadata.agent_role
|
||||
: undefined;
|
||||
const activeSpanContext = getCurrentOtelSpanContext();
|
||||
const tracing = await createDetachedSubAgentTraceContext({
|
||||
projectName: context.tracing.projectName,
|
||||
threadId: context.threadId,
|
||||
|
|
@ -79,9 +86,15 @@ export async function createDetachedSubAgentTracing(
|
|||
taskId: options.taskId,
|
||||
plannedTaskId: options.plannedTaskId,
|
||||
workItemId: options.workItemId,
|
||||
spawnedByTraceId: context.tracing.rootRun.traceId,
|
||||
spawnedByTraceId:
|
||||
activeSpanContext?.traceId ??
|
||||
context.tracing.rootRun.otelTraceId ??
|
||||
context.tracing.rootRun.traceId,
|
||||
spawnedBySpanId: activeSpanContext?.spanId ?? context.tracing.actorRun.otelSpanId,
|
||||
spawnedByRunId: context.tracing.actorRun.id,
|
||||
spawnedByAgentId,
|
||||
spawnedByAgentRole,
|
||||
spawnedByToolCallId: getCurrentTraceToolCallId(),
|
||||
proxyConfig: context.tracingProxyConfig,
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,156 @@
|
|||
import type { Context } from '@opentelemetry/api';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
|
||||
import { executeTool } from '../../__tests__/tool-test-utils';
|
||||
|
||||
jest.mock('@n8n/agents', () => {
|
||||
const actual = jest.requireActual<Record<string, unknown>>('@n8n/agents');
|
||||
const { context, trace } = jest.requireActual<{
|
||||
context: {
|
||||
active(): Context;
|
||||
with<T>(ctx: Context, fn: () => T): T;
|
||||
};
|
||||
trace: {
|
||||
getSpan(ctx: Context): unknown;
|
||||
setSpan(ctx: Context, span: unknown): Context;
|
||||
};
|
||||
}>('@opentelemetry/api');
|
||||
|
||||
// Monotonic counter backing every generated trace/span ID. Reset to 0 by
// `__mock.reset()` so IDs are deterministic within each test.
let spanCounter = 0;

// Every span created through the mock tracer, in creation order. Tests read
// this through `__mock.getSpans()`.
const spans: Array<{
	id: string;
	traceId: string;
	parentSpanId?: string;
	name: string;
	attributes: Record<string, unknown>;
	status?: { code: number; message?: string };
	ended: boolean;
}> = [];

/**
 * Returns the next deterministic hex identifier.
 *
 * Increments the shared counter, renders it in base 16, left-pads it with
 * zeros to `length` characters, and keeps only the last `length` characters
 * so the result always has exactly the requested width.
 *
 * @param length Number of hex characters to return (32 for trace IDs, 16 for
 *   span IDs in this mock).
 */
function nextHex(length: number): string {
	spanCounter += 1;
	return spanCounter.toString(16).padStart(length, '0').slice(-length);
}
|
||||
|
||||
/**
 * Minimal in-memory stand-in for an OpenTelemetry span.
 *
 * Each instance appends one record to the shared `spans` array on
 * construction and mutates that same record afterwards, so tests can inspect
 * name, parentage, attributes, status, and whether the span was ended.
 */
class MockSpan {
	private readonly traceId: string;

	private readonly spanId: string;

	private readonly record: (typeof spans)[number];

	/**
	 * @param name Span name stored on the record.
	 * @param attributes Initial attributes; copied so later mutation of the
	 *   caller's object does not leak into the record.
	 * @param parentSpan When given, the new span joins the parent's trace and
	 *   records the parent's span ID; otherwise a fresh trace ID is generated.
	 */
	constructor(name: string, attributes: Record<string, unknown>, parentSpan?: MockSpan) {
		this.traceId = parentSpan?.spanContext().traceId ?? nextHex(32);
		this.spanId = nextHex(16);
		this.record = {
			id: this.spanId,
			traceId: this.traceId,
			// Root spans omit the key entirely instead of storing `undefined`.
			...(parentSpan ? { parentSpanId: parentSpan.spanContext().spanId } : {}),
			name,
			attributes: { ...attributes },
			ended: false,
		};
		spans.push(this.record);
	}

	/** Returns the trace/span ID pair of this span. */
	spanContext(): { traceId: string; spanId: string } {
		return { traceId: this.traceId, spanId: this.spanId };
	}

	/** Merges `attributes` into the recorded attributes, overwriting existing keys. */
	setAttributes(attributes: Record<string, unknown>): void {
		Object.assign(this.record.attributes, attributes);
	}

	/** Intentionally a no-op: exceptions are not captured on the record. */
	recordException(): void {}

	/** Stores the most recently set status on the record. */
	setStatus(status: { code: number; message?: string }): void {
		this.record.status = status;
	}

	/** Marks the recorded span as ended. */
	end(): void {
		this.record.ended = true;
	}
}
|
||||
|
||||
/**
 * Mock tracer that creates `MockSpan`s while relying on the real
 * `@opentelemetry/api` context helpers (`context`, `trace`) for parent lookup.
 */
const tracer = {
	/**
	 * Creates a span whose parent is the span stored in `parentContext`, or in
	 * the currently active context when no explicit context is given.
	 */
	startSpan: (
		name: string,
		options?: { attributes?: Record<string, unknown> },
		parentContext?: Context,
	) => {
		const parentSpan = trace.getSpan(parentContext ?? context.active()) as MockSpan | undefined;
		return new MockSpan(name, options?.attributes ?? {}, parentSpan);
	},

	/**
	 * Starts a span under the active context, installs it as the active span,
	 * and runs `fn` inside that context. The span is not ended here; `fn` is
	 * expected to do that.
	 */
	startActiveSpan: async <T>(
		name: string,
		options: { attributes?: Record<string, unknown> },
		fn: (span: MockSpan) => Promise<T>,
	): Promise<T> => {
		const span = tracer.startSpan(name, options);
		const spanContext = trace.setSpan(context.active(), span as never);
		return await context.with(spanContext, async () => await fn(span));
	},
};
|
||||
|
||||
// Mock tracer provider. Both methods are jest mocks that resolve immediately;
// `__mock.reset()` clears their call history and tests reach them through
// `__mock.getProvider()`.
const provider = {
	forceFlush: jest.fn(async () => await Promise.resolve()),
	shutdown: jest.fn(async () => await Promise.resolve()),
};
|
||||
|
||||
/**
 * Replacement for the `LangSmithTelemetry` export of `@n8n/agents`.
 *
 * Each setter stores its value and returns `this` for chaining. `build()`
 * resolves to a telemetry object wired to the shared mock `tracer` and
 * `provider`, so spans created through it land in the `spans` array.
 */
class MockLangSmithTelemetry {
	private functionIdValue?: string;

	private metadataValue?: Record<string, unknown>;

	// Input/output recording defaults to enabled until explicitly overridden.
	private recordInputsValue = true;

	private recordOutputsValue = true;

	/** Stores the function ID reported by `build()`. */
	functionId(value: string): this {
		this.functionIdValue = value;
		return this;
	}

	/** Stores the metadata reported by `build()`; replaces any previous value. */
	metadata(value: Record<string, unknown>): this {
		this.metadataValue = value;
		return this;
	}

	/** Overrides the `recordInputs` flag reported by `build()`. */
	recordInputs(value: boolean): this {
		this.recordInputsValue = value;
		return this;
	}

	/** Overrides the `recordOutputs` flag reported by `build()`. */
	recordOutputs(value: boolean): this {
		this.recordOutputsValue = value;
		return this;
	}

	/**
	 * Resolves to the built telemetry object: always enabled, with no
	 * integrations, carrying the stored settings plus the mock tracer/provider.
	 */
	async build(): Promise<Record<string, unknown>> {
		return await Promise.resolve({
			enabled: true,
			functionId: this.functionIdValue,
			metadata: this.metadataValue,
			recordInputs: this.recordInputsValue,
			recordOutputs: this.recordOutputsValue,
			integrations: [],
			tracer,
			provider,
		});
	}
}
|
||||
|
||||
return {
|
||||
...actual,
|
||||
LangSmithTelemetry: MockLangSmithTelemetry,
|
||||
__mock: {
|
||||
reset: () => {
|
||||
spanCounter = 0;
|
||||
spans.length = 0;
|
||||
provider.forceFlush.mockClear();
|
||||
provider.shutdown.mockClear();
|
||||
},
|
||||
getSpans: () => spans,
|
||||
getProvider: () => provider,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
jest.mock('langsmith', () => {
|
||||
let runCounter = 0;
|
||||
const createdRunTrees: Array<{
|
||||
|
|
@ -245,6 +395,25 @@ type LangSmithMockModule = {
|
|||
};
|
||||
};
|
||||
|
||||
/**
 * Shape of the test-only `__mock` handle exposed by the mocked `@n8n/agents`
 * module. Used to type the `require('@n8n/agents')` result in this test file.
 */
type AgentsMockModule = {
	__mock: {
		/** Clears recorded spans, the ID counter, and provider call history. */
		reset: () => void;
		/** Returns every span recorded by the mock tracer, in creation order. */
		getSpans: () => Array<{
			id: string;
			traceId: string;
			parentSpanId?: string;
			name: string;
			attributes: Record<string, unknown>;
			status?: { code: number; message?: string };
			ended: boolean;
		}>;
		/** Returns the mock provider so tests can assert on flush/shutdown calls. */
		getProvider: () => {
			forceFlush: jest.Mock<Promise<void>, []>;
			shutdown: jest.Mock<Promise<void>, []>;
		};
	};
};
|
||||
|
||||
function isExecutableTool(
|
||||
value: unknown,
|
||||
): value is { handler: (input: unknown, context: unknown) => Promise<unknown> } {
|
||||
|
|
@ -274,6 +443,9 @@ const { createAskUserTool } =
|
|||
const { __mock: langsmithMock } =
|
||||
// eslint-disable-next-line @typescript-eslint/no-require-imports
|
||||
require('langsmith') as LangSmithMockModule;
|
||||
const { __mock: agentsMock } =
|
||||
// eslint-disable-next-line @typescript-eslint/no-require-imports
|
||||
require('@n8n/agents') as AgentsMockModule;
|
||||
|
||||
describe('createInstanceAiTraceContext', () => {
|
||||
const originalLangSmithApiKey = process.env.LANGSMITH_API_KEY;
|
||||
|
|
@ -282,6 +454,7 @@ describe('createInstanceAiTraceContext', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
langsmithMock.reset();
|
||||
agentsMock.reset();
|
||||
process.env.LANGSMITH_API_KEY = 'test-key';
|
||||
delete process.env.LANGSMITH_TRACING;
|
||||
delete process.env.LANGCHAIN_TRACING_V2;
|
||||
|
|
@ -459,7 +632,7 @@ describe('createInstanceAiTraceContext', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('rehydrates child runs with their parent linkage before patching', async () => {
|
||||
it('finishes OTel child spans with their parent linkage', async () => {
|
||||
const tracing = await createInstanceAiTraceContext({
|
||||
threadId: 'thread-1',
|
||||
messageId: 'message-1',
|
||||
|
|
@ -475,9 +648,14 @@ describe('createInstanceAiTraceContext', () => {
|
|||
}),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
const patchTarget = langsmithMock.getCreatedRunTrees().at(-1);
|
||||
expect(patchTarget?.id).toBe(tracing?.orchestratorRun.id);
|
||||
expect(patchTarget?.parent_run_id).toBe(tracing?.messageRun.id);
|
||||
const spans = agentsMock.getSpans();
|
||||
const orchestratorSpan = spans.find((span) => span.id === tracing?.orchestratorRun.otelSpanId);
|
||||
expect(orchestratorSpan?.parentSpanId).toBe(tracing?.messageRun.otelSpanId);
|
||||
expect(orchestratorSpan?.ended).toBe(true);
|
||||
expect(orchestratorSpan?.attributes.gen_ai_completion).toBeUndefined();
|
||||
expect(orchestratorSpan?.attributes['gen_ai.completion']).toBe(
|
||||
JSON.stringify({ result: 'done' }),
|
||||
);
|
||||
});
|
||||
|
||||
it('reuses the same message root when continuing a trace for the same message group', async () => {
|
||||
|
|
@ -520,8 +698,11 @@ describe('createInstanceAiTraceContext', () => {
|
|||
kind: 'builder',
|
||||
taskId: 'build-1',
|
||||
spawnedByTraceId: 'trace-parent-1',
|
||||
spawnedBySpanId: 'span-parent-1',
|
||||
spawnedByRunId: 'run-parent-1',
|
||||
spawnedByAgentId: 'agent-001',
|
||||
spawnedByAgentRole: 'orchestrator',
|
||||
spawnedByToolCallId: 'toolu-1',
|
||||
input: { task: 'Build a workflow' },
|
||||
});
|
||||
|
||||
|
|
@ -529,7 +710,7 @@ describe('createInstanceAiTraceContext', () => {
|
|||
expect(tracing?.traceKind).toBe('detached_subagent');
|
||||
expect(tracing?.rootRun.id).toBe(tracing?.actorRun.id);
|
||||
expect(tracing?.rootRun.parentRunId).toBeUndefined();
|
||||
expect(tracing?.rootRun.name).toBe('subagent:workflow-builder');
|
||||
expect(tracing?.rootRun.name).toBe('instance-ai.subagent.workflow-builder');
|
||||
expect(tracing?.rootRun.metadata).toEqual(
|
||||
expect.objectContaining({
|
||||
thread_id: 'thread-1',
|
||||
|
|
@ -538,8 +719,11 @@ describe('createInstanceAiTraceContext', () => {
|
|||
task_kind: 'builder',
|
||||
agent_id: 'agent-builder-1',
|
||||
spawned_by_trace_id: 'trace-parent-1',
|
||||
spawned_by_span_id: 'span-parent-1',
|
||||
spawned_by_run_id: 'run-parent-1',
|
||||
spawned_by_agent_id: 'agent-001',
|
||||
spawned_by_agent_role: 'orchestrator',
|
||||
spawned_by_tool_call_id: 'toolu-1',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
|
@ -712,25 +896,28 @@ describe('createInstanceAiTraceContext', () => {
|
|||
}
|
||||
|
||||
await tracing!.withRunTree(tracing!.orchestratorRun, async () => {
|
||||
await executeTool(
|
||||
wrappedAskUser,
|
||||
await wrappedAskUser.handler(
|
||||
{
|
||||
questions: [{ id: 'q1', question: 'What do you want?', type: 'text' }],
|
||||
},
|
||||
{
|
||||
agent: {
|
||||
suspend: async () => {
|
||||
await Promise.resolve();
|
||||
return undefined;
|
||||
},
|
||||
toolCallId: 'toolu-ask',
|
||||
resumeData: undefined,
|
||||
suspend: async () => {
|
||||
await Promise.resolve();
|
||||
return undefined as never;
|
||||
},
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name);
|
||||
expect(createdRunNames).toContain('tool:ask-user');
|
||||
expect(createdRunNames).toContain('hitl:suspend');
|
||||
const spans = agentsMock.getSpans();
|
||||
const spanNames = spans.map((span) => span.name);
|
||||
expect(spanNames).toContain('instance-ai.tool.ask-user');
|
||||
expect(spanNames).toContain('instance-ai.hitl.suspend');
|
||||
expect(
|
||||
spans.find((span) => span.name === 'instance-ai.tool.ask-user')?.attributes.tool_call_id,
|
||||
).toBe('toolu-ask');
|
||||
});
|
||||
|
||||
it('does not wrap ordinary local tools for product-level LangSmith spans', async () => {
|
||||
|
|
@ -776,7 +963,7 @@ describe('createInstanceAiTraceContext', () => {
|
|||
expect(tracing).toBeDefined();
|
||||
|
||||
const subAgentRun = await tracing!.startChildRun(tracing!.orchestratorRun, {
|
||||
name: 'subagent:workflow-builder',
|
||||
name: 'instance-ai.subagent.workflow-builder.stream',
|
||||
tags: ['sub-agent'],
|
||||
metadata: { agent_role: 'workflow-builder' },
|
||||
inputs: { task: 'Build a workflow' },
|
||||
|
|
@ -795,10 +982,10 @@ describe('createInstanceAiTraceContext', () => {
|
|||
);
|
||||
});
|
||||
|
||||
const llmRun = langsmithMock
|
||||
.getCreatedRunTrees()
|
||||
.find((run) => run.name === 'llm:anthropic/claude-sonnet-4-6');
|
||||
expect(llmRun?.parent_run_id).toBe(subAgentRun.id);
|
||||
const llmSpan = agentsMock
|
||||
.getSpans()
|
||||
.find((span) => span.name === 'llm:anthropic/claude-sonnet-4-6');
|
||||
expect(llmSpan?.parentSpanId).toBe(subAgentRun.otelSpanId);
|
||||
});
|
||||
|
||||
it('traces resumed suspendable tools without extra HITL child span spam', async () => {
|
||||
|
|
@ -861,10 +1048,10 @@ describe('createInstanceAiTraceContext', () => {
|
|||
],
|
||||
});
|
||||
|
||||
const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name);
|
||||
expect(createdRunNames).toContain('tool:ask-user:resume');
|
||||
expect(createdRunNames).not.toContain('hitl:resume');
|
||||
expect(createdRunNames).not.toContain('hitl:approval');
|
||||
const spanNames = agentsMock.getSpans().map((span) => span.name);
|
||||
expect(spanNames).toContain('instance-ai.tool.ask-user');
|
||||
expect(spanNames).toContain('instance-ai.hitl.resume');
|
||||
expect(spanNames).not.toContain('instance-ai.hitl.suspend');
|
||||
});
|
||||
|
||||
it('creates ad-hoc child spans under the current run tree', async () => {
|
||||
|
|
@ -890,8 +1077,8 @@ describe('createInstanceAiTraceContext', () => {
|
|||
expect(result).toBe(42);
|
||||
});
|
||||
|
||||
const createdRunNames = langsmithMock.getCreatedRunTrees().map((run) => run.name);
|
||||
expect(createdRunNames).toContain('prepare_context');
|
||||
const spanNames = agentsMock.getSpans().map((span) => span.name);
|
||||
expect(spanNames).toContain('prepare_context');
|
||||
});
|
||||
|
||||
it('creates trace context when proxyConfig is provided even without env vars', async () => {
|
||||
|
|
@ -920,7 +1107,7 @@ describe('createInstanceAiTraceContext', () => {
|
|||
expect(tracing?.orchestratorRun).toBeDefined();
|
||||
});
|
||||
|
||||
it('passes client to RunTree when proxyConfig is provided', async () => {
|
||||
it('creates OTel product spans when proxyConfig is provided', async () => {
|
||||
const tracing = await createInstanceAiTraceContext({
|
||||
threadId: 'thread-client',
|
||||
messageId: 'message-client',
|
||||
|
|
@ -936,14 +1123,15 @@ describe('createInstanceAiTraceContext', () => {
|
|||
|
||||
expect(tracing).toBeDefined();
|
||||
|
||||
const rootRunTree = langsmithMock
|
||||
.getCreatedRunTrees()
|
||||
.find((run) => run.name === 'message_turn' && run.client);
|
||||
expect(rootRunTree).toBeDefined();
|
||||
expect(rootRunTree?.client).toBeDefined();
|
||||
const rootSpan = agentsMock.getSpans().find((span) => span.name === 'instance-ai.message_turn');
|
||||
expect(rootSpan).toBeDefined();
|
||||
expect(langsmithMock.getCreatedRunTrees()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('does not pass client to RunTree without proxyConfig', async () => {
|
||||
it('does not create RunTree spans without proxyConfig', async () => {
|
||||
// Regression: normal tracing must not mix RunTree product spans with OTel
|
||||
// native spans, because LangSmith treats those ingestion paths as separate
|
||||
// trace hierarchies.
|
||||
await createInstanceAiTraceContext({
|
||||
threadId: 'thread-no-proxy',
|
||||
messageId: 'message-no-proxy',
|
||||
|
|
@ -952,12 +1140,9 @@ describe('createInstanceAiTraceContext', () => {
|
|||
input: { message: 'no proxy test' },
|
||||
});
|
||||
|
||||
const rootRunTree = langsmithMock
|
||||
.getCreatedRunTrees()
|
||||
.find((run) => run.name === 'message_turn');
|
||||
expect(rootRunTree).toBeDefined();
|
||||
// Without proxyConfig, the direct client is used (never undefined)
|
||||
expect(rootRunTree?.client).toBeDefined();
|
||||
const rootSpan = agentsMock.getSpans().find((span) => span.name === 'instance-ai.message_turn');
|
||||
expect(rootSpan).toBeDefined();
|
||||
expect(langsmithMock.getCreatedRunTrees()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('returns undefined when tracing is explicitly disabled even with proxy', async () => {
|
||||
|
|
@ -987,6 +1172,7 @@ describe('submitLangsmithUserFeedback', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
langsmithMock.reset();
|
||||
agentsMock.reset();
|
||||
process.env.LANGSMITH_API_KEY = 'test-key';
|
||||
delete process.env.LANGSMITH_TRACING;
|
||||
delete process.env.LANGCHAIN_TRACING_V2;
|
||||
|
|
|
|||
|
|
@ -7,7 +7,12 @@ import {
|
|||
type Telemetry,
|
||||
type ToolContext,
|
||||
} from '@n8n/agents';
|
||||
import { context as otelContext, trace as otelTrace } from '@opentelemetry/api';
|
||||
import {
|
||||
ROOT_CONTEXT,
|
||||
SpanStatusCode,
|
||||
context as otelContext,
|
||||
trace as otelTrace,
|
||||
} from '@opentelemetry/api';
|
||||
import type { Context as OtelContext, Span as OtelApiSpan } from '@opentelemetry/api';
|
||||
import { Client, RunTree } from 'langsmith';
|
||||
import { getCurrentRunTree, withRunTree as withLangSmithRunTree } from 'langsmith/traceable';
|
||||
|
|
@ -50,6 +55,10 @@ const LOCAL_TOOL_TRACE_NAMES = new Set([
|
|||
'complete-checkpoint',
|
||||
]);
|
||||
const traceParentOverrideStorage = new AsyncLocalStorage<{ current: RunTree | null }>();
|
||||
const productTraceStorage = new AsyncLocalStorage<{
|
||||
runtime: ProductOtelTraceRuntime;
|
||||
currentRun: InstanceAiTraceRun;
|
||||
}>();
|
||||
|
||||
// Per-request proxy auth headers, isolated via AsyncLocalStorage.
|
||||
// The proxy Client is cached per deployment URL; each concurrent request
|
||||
|
|
@ -190,13 +199,19 @@ function startProductSpan(
|
|||
metadata?: Record<string, unknown>;
|
||||
inputs?: unknown;
|
||||
parentRun?: InstanceAiTraceRun;
|
||||
parentContext?: OtelContext;
|
||||
root?: boolean;
|
||||
},
|
||||
): InstanceAiTraceRun {
|
||||
if (!isOtelTracer(runtime.telemetry.tracer)) {
|
||||
throw new Error('Instance AI tracing requires an OpenTelemetry tracer');
|
||||
}
|
||||
|
||||
const parentContext = options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined;
|
||||
const parentContext = options.root
|
||||
? ROOT_CONTEXT
|
||||
: (options.parentContext ??
|
||||
(options.parentRun ? runtime.contexts.get(options.parentRun.id) : undefined) ??
|
||||
otelContext.active());
|
||||
const span = runtime.telemetry.tracer.startSpan(
|
||||
options.name,
|
||||
{
|
||||
|
|
@ -268,10 +283,10 @@ async function finishProductSpan(
|
|||
|
||||
if (options?.error) {
|
||||
span.recordException(new Error(options.error));
|
||||
span.setStatus({ code: 2, message: options.error });
|
||||
span.setStatus({ code: SpanStatusCode.ERROR, message: options.error });
|
||||
run.error = options.error;
|
||||
} else {
|
||||
span.setStatus({ code: 1 });
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
}
|
||||
|
||||
run.endTime = Date.now();
|
||||
|
|
@ -293,7 +308,65 @@ async function withProductSpanContext<T>(
|
|||
return await fn();
|
||||
}
|
||||
|
||||
return await otelContext.with(spanContext, fn);
|
||||
return await productTraceStorage.run(
|
||||
{ runtime, currentRun: run },
|
||||
async () => await otelContext.with(spanContext, fn),
|
||||
);
|
||||
}
|
||||
|
||||
function getCurrentProductTrace():
|
||||
| { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun }
|
||||
| undefined {
|
||||
return productTraceStorage.getStore();
|
||||
}
|
||||
|
||||
function getActiveOtelContextWithSpan(): OtelContext | undefined {
|
||||
const activeContext = otelContext.active();
|
||||
return otelTrace.getSpan(activeContext) ? activeContext : undefined;
|
||||
}
|
||||
|
||||
function spanMetadataAttributes(
|
||||
metadata: Record<string, unknown> | undefined,
|
||||
): Record<string, AttributeValue> {
|
||||
const attributes: Record<string, AttributeValue> = {};
|
||||
for (const [key, value] of Object.entries(metadata ?? {})) {
|
||||
const attributeValue = toTelemetryAttributeValue(value);
|
||||
if (attributeValue === undefined) continue;
|
||||
attributes[key] = attributeValue;
|
||||
if (!key.startsWith('langsmith.metadata.')) {
|
||||
attributes[`langsmith.metadata.${key}`] = attributeValue;
|
||||
}
|
||||
}
|
||||
return attributes;
|
||||
}
|
||||
|
||||
function updateProductRunMetadata(
|
||||
runtime: ProductOtelTraceRuntime,
|
||||
run: InstanceAiTraceRun,
|
||||
metadata: Record<string, unknown>,
|
||||
): void {
|
||||
const mergedMetadata = mergeMetadata(run.metadata, metadata);
|
||||
if (!mergedMetadata) return;
|
||||
|
||||
run.metadata = mergedMetadata;
|
||||
const attributes = spanMetadataAttributes(metadata);
|
||||
if (Object.keys(attributes).length > 0) {
|
||||
runtime.spans.get(run.id)?.setAttributes(attributes);
|
||||
}
|
||||
}
|
||||
|
||||
function updateProductRunInputs(
|
||||
runtime: ProductOtelTraceRuntime,
|
||||
run: InstanceAiTraceRun,
|
||||
inputs: Record<string, unknown>,
|
||||
): void {
|
||||
const mergedInputs = sanitizeTracePayload(mergeRunTreeInputs(run.inputs, inputs));
|
||||
run.inputs = mergedInputs;
|
||||
|
||||
const prompt = stringifyTracePayload(mergedInputs);
|
||||
if (prompt !== undefined) {
|
||||
runtime.spans.get(run.id)?.setAttributes({ [GEN_AI_PROMPT]: prompt });
|
||||
}
|
||||
}
|
||||
|
||||
/** Get a LangSmith Client that uses gzip encoding (no brotli). */
|
||||
|
|
@ -353,8 +426,11 @@ interface CreateDetachedSubAgentTraceContextOptions extends CreateInstanceAiTrac
|
|||
plannedTaskId?: string;
|
||||
workItemId?: string;
|
||||
spawnedByTraceId?: string;
|
||||
spawnedBySpanId?: string;
|
||||
spawnedByRunId?: string;
|
||||
spawnedByAgentId?: string;
|
||||
spawnedByAgentRole?: string;
|
||||
spawnedByToolCallId?: string;
|
||||
}
|
||||
|
||||
interface CurrentTraceSpanOptions<T = unknown> {
|
||||
|
|
@ -945,7 +1021,38 @@ export function setTraceParentOverride(parentRun: RunTree | null | undefined): v
|
|||
}
|
||||
}
|
||||
|
||||
export function getCurrentOtelSpanContext(): { traceId: string; spanId: string } | undefined {
|
||||
const activeSpanContext = otelTrace.getSpan(otelContext.active())?.spanContext();
|
||||
if (activeSpanContext) {
|
||||
return {
|
||||
traceId: activeSpanContext.traceId,
|
||||
spanId: activeSpanContext.spanId,
|
||||
};
|
||||
}
|
||||
|
||||
const currentRun = getCurrentProductTrace()?.currentRun;
|
||||
if (currentRun?.otelTraceId && currentRun.otelSpanId) {
|
||||
return {
|
||||
traceId: currentRun.otelTraceId,
|
||||
spanId: currentRun.otelSpanId,
|
||||
};
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function getCurrentTraceToolCallId(): string | undefined {
|
||||
const metadata = getCurrentProductTrace()?.currentRun.metadata;
|
||||
return typeof metadata?.tool_call_id === 'string' ? metadata.tool_call_id : undefined;
|
||||
}
|
||||
|
||||
export function mergeCurrentTraceMetadata(metadata: Record<string, unknown>): void {
|
||||
const currentProductTrace = getCurrentProductTrace();
|
||||
if (currentProductTrace) {
|
||||
updateProductRunMetadata(currentProductTrace.runtime, currentProductTrace.currentRun, metadata);
|
||||
return;
|
||||
}
|
||||
|
||||
const currentRun = getTraceParentRun();
|
||||
if (!currentRun) {
|
||||
return;
|
||||
|
|
@ -968,6 +1075,12 @@ export function mergeTraceRunInputs(
|
|||
const mergedInputs = sanitizeTracePayload(mergeRunTreeInputs(run.inputs, inputs));
|
||||
run.inputs = mergedInputs;
|
||||
|
||||
const currentProductTrace = getCurrentProductTrace();
|
||||
if (currentProductTrace) {
|
||||
updateProductRunInputs(currentProductTrace.runtime, run, inputs);
|
||||
return;
|
||||
}
|
||||
|
||||
const currentRun = getTraceParentRun();
|
||||
if (currentRun?.id === run.id) {
|
||||
currentRun.inputs = mergedInputs;
|
||||
|
|
@ -1030,6 +1143,36 @@ export async function withCurrentTraceSpan<T>(
|
|||
options: CurrentTraceSpanOptions<T>,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const currentProductTrace = getCurrentProductTrace();
|
||||
if (currentProductTrace) {
|
||||
const activeParentContext = getActiveOtelContextWithSpan();
|
||||
const spanRun = startProductSpan(currentProductTrace.runtime, {
|
||||
projectName: currentProductTrace.currentRun.projectName,
|
||||
name: options.name,
|
||||
runType: options.runType ?? 'chain',
|
||||
tags: options.tags,
|
||||
metadata: options.metadata,
|
||||
inputs: options.inputs,
|
||||
parentRun: currentProductTrace.currentRun,
|
||||
...(activeParentContext ? { parentContext: activeParentContext } : {}),
|
||||
});
|
||||
|
||||
try {
|
||||
const result = await withProductSpanContext(currentProductTrace.runtime, spanRun, fn);
|
||||
await finishProductSpan(currentProductTrace.runtime, spanRun, {
|
||||
...(options.processOutputs ? { outputs: options.processOutputs(result) } : {}),
|
||||
metadata: { final_status: 'completed' },
|
||||
});
|
||||
return result;
|
||||
} catch (error) {
|
||||
await finishProductSpan(currentProductTrace.runtime, spanRun, {
|
||||
error: normalizeErrorMessage(error),
|
||||
metadata: { final_status: 'error' },
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
const parentRun = getTraceParentRun();
|
||||
if (!parentRun) {
|
||||
return await fn();
|
||||
|
|
@ -1104,12 +1247,179 @@ function isInterruptibleToolContext(
|
|||
return isRecord(context) && typeof context.suspend === 'function';
|
||||
}
|
||||
|
||||
function getToolCallId(context: NativeToolContext): string | undefined {
|
||||
return isRecord(context) && typeof context.toolCallId === 'string'
|
||||
? context.toolCallId
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function getProductToolSpanName(toolName: string): string {
|
||||
if (toolName.startsWith('workspace_') || toolName === 'workspace' || toolName === 'write-file') {
|
||||
return 'instance-ai.tool.workspace_edit';
|
||||
}
|
||||
if (toolName === 'submit-workflow') {
|
||||
return 'instance-ai.tool.workflow_submit';
|
||||
}
|
||||
if (toolName === 'verify-built-workflow' || toolName === 'report-verification-verdict') {
|
||||
return 'instance-ai.tool.workflow_validation';
|
||||
}
|
||||
if (toolName === 'build-workflow' || toolName === 'build-workflow-with-agent') {
|
||||
return 'instance-ai.tool.workflow_build';
|
||||
}
|
||||
if (toolName === 'complete-checkpoint' || toolName === 'task-control') {
|
||||
return 'instance-ai.tool.background_task';
|
||||
}
|
||||
return `instance-ai.tool.${toolName.replace(/[^a-zA-Z0-9._-]+/g, '-')}`;
|
||||
}
|
||||
|
||||
async function startAndFinishProductChildSpan(
|
||||
currentTrace: { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun },
|
||||
options: {
|
||||
name: string;
|
||||
runType?: string;
|
||||
tags?: string[];
|
||||
metadata?: Record<string, unknown>;
|
||||
inputs?: unknown;
|
||||
outputs?: unknown;
|
||||
error?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
const activeParentContext = getActiveOtelContextWithSpan();
|
||||
const childRun = startProductSpan(currentTrace.runtime, {
|
||||
projectName: currentTrace.currentRun.projectName,
|
||||
name: options.name,
|
||||
runType: options.runType ?? 'chain',
|
||||
tags: options.tags,
|
||||
metadata: options.metadata,
|
||||
inputs: options.inputs,
|
||||
parentRun: currentTrace.currentRun,
|
||||
...(activeParentContext ? { parentContext: activeParentContext } : {}),
|
||||
});
|
||||
await finishProductSpan(currentTrace.runtime, childRun, {
|
||||
...(options.outputs !== undefined ? { outputs: options.outputs } : {}),
|
||||
...(options.error ? { error: options.error } : {}),
|
||||
metadata: {
|
||||
final_status: options.error ? 'error' : 'completed',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function traceProductToolExecute(
|
||||
tool: TraceableNativeTool,
|
||||
options: InstanceAiToolTraceOptions | undefined,
|
||||
input: unknown,
|
||||
context: NativeToolContext,
|
||||
currentTrace: { runtime: ProductOtelTraceRuntime; currentRun: InstanceAiTraceRun },
|
||||
): Promise<unknown> {
|
||||
const resumeData = isInterruptibleToolContext(context) ? context.resumeData : undefined;
|
||||
const isResume = resumeData !== undefined && resumeData !== null;
|
||||
const activeParentContext = getActiveOtelContextWithSpan();
|
||||
const toolCallId = getToolCallId(context);
|
||||
const toolRun = startProductSpan(currentTrace.runtime, {
|
||||
projectName: currentTrace.currentRun.projectName,
|
||||
name: getProductToolSpanName(tool.name),
|
||||
runType: 'tool',
|
||||
tags: normalizeTags(['tool'], options?.tags),
|
||||
metadata: mergeMetadata(options?.metadata, {
|
||||
tool_name: tool.name,
|
||||
...(toolCallId ? { tool_call_id: toolCallId } : {}),
|
||||
...(options?.agentRole ? { agent_role: options.agentRole } : {}),
|
||||
phase: isResume ? 'resume' : 'initial',
|
||||
...(isResume
|
||||
? mergeMetadata(buildSuspendMetadata(tool.name, resumeData), {
|
||||
approved: isRecord(resumeData) ? resumeData.approved : undefined,
|
||||
})
|
||||
: {}),
|
||||
}),
|
||||
inputs: { input },
|
||||
parentRun: currentTrace.currentRun,
|
||||
...(activeParentContext ? { parentContext: activeParentContext } : {}),
|
||||
});
|
||||
|
||||
let toolRunFinished = false;
|
||||
const finishToolRun = async (finishOptions?: InstanceAiTraceRunFinishOptions) => {
|
||||
if (toolRunFinished) return;
|
||||
toolRunFinished = true;
|
||||
await finishProductSpan(currentTrace.runtime, toolRun, finishOptions);
|
||||
};
|
||||
|
||||
const originalSuspend = isInterruptibleToolContext(context) ? context.suspend : undefined;
|
||||
const wrappedContext: NativeToolContext =
|
||||
typeof originalSuspend === 'function'
|
||||
? {
|
||||
...context,
|
||||
suspend: async (suspendPayload: unknown) => {
|
||||
await startAndFinishProductChildSpan(
|
||||
{ runtime: currentTrace.runtime, currentRun: toolRun },
|
||||
{
|
||||
name: 'instance-ai.hitl.suspend',
|
||||
runType: 'chain',
|
||||
tags: ['hitl'],
|
||||
metadata: buildSuspendMetadata(tool.name, suspendPayload),
|
||||
inputs: suspendPayload,
|
||||
outputs: suspendPayload,
|
||||
},
|
||||
);
|
||||
await finishToolRun({
|
||||
outputs: {
|
||||
status: 'suspended',
|
||||
suspendPayload,
|
||||
},
|
||||
metadata: mergeMetadata(buildSuspendMetadata(tool.name, suspendPayload), {
|
||||
final_status: 'suspended',
|
||||
}),
|
||||
});
|
||||
return await originalSuspend(suspendPayload);
|
||||
},
|
||||
}
|
||||
: context;
|
||||
|
||||
try {
|
||||
const result = await withProductSpanContext(currentTrace.runtime, toolRun, async () => {
|
||||
if (isResume) {
|
||||
await startAndFinishProductChildSpan(
|
||||
{ runtime: currentTrace.runtime, currentRun: toolRun },
|
||||
{
|
||||
name: 'instance-ai.hitl.resume',
|
||||
runType: 'chain',
|
||||
tags: ['hitl', 'resume'],
|
||||
metadata: mergeMetadata(buildSuspendMetadata(tool.name, resumeData), {
|
||||
approved: isRecord(resumeData) ? resumeData.approved : undefined,
|
||||
}),
|
||||
inputs: resumeData,
|
||||
outputs: {
|
||||
status: 'resumed',
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
return await tool.handler(input, wrappedContext);
|
||||
});
|
||||
await finishToolRun({
|
||||
outputs: result,
|
||||
metadata: { final_status: 'completed' },
|
||||
});
|
||||
return result;
|
||||
} catch (error) {
|
||||
await finishToolRun({
|
||||
error: normalizeErrorMessage(error),
|
||||
metadata: { final_status: 'error' },
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function traceSuspendableToolExecute(
|
||||
tool: TraceableNativeTool,
|
||||
options: InstanceAiToolTraceOptions | undefined,
|
||||
input: unknown,
|
||||
context: NativeToolContext,
|
||||
): Promise<unknown> {
|
||||
const currentProductTrace = getCurrentProductTrace();
|
||||
if (currentProductTrace) {
|
||||
return await traceProductToolExecute(tool, options, input, context, currentProductTrace);
|
||||
}
|
||||
|
||||
const parentRun = getTraceParentRun();
|
||||
if (!parentRun) {
|
||||
return await tool.handler(input, context);
|
||||
|
|
@ -1193,6 +1503,11 @@ async function traceToolExecute(
|
|||
input: unknown,
|
||||
context: NativeToolContext,
|
||||
): Promise<unknown> {
|
||||
const currentProductTrace = getCurrentProductTrace();
|
||||
if (currentProductTrace) {
|
||||
return await traceProductToolExecute(tool, options, input, context, currentProductTrace);
|
||||
}
|
||||
|
||||
const parentRun = getTraceParentRun();
|
||||
if (!parentRun) {
|
||||
return await tool.handler(input, context);
|
||||
|
|
@ -1252,8 +1567,9 @@ function createTraceContext(
|
|||
parentRun: InstanceAiTraceRun,
|
||||
init: InstanceAiTraceRunInit,
|
||||
): Promise<InstanceAiTraceRun> =>
|
||||
await withProxy(async () =>
|
||||
otelRuntime
|
||||
await withProxy(async () => {
|
||||
const activeParentContext = getActiveOtelContextWithSpan();
|
||||
return otelRuntime
|
||||
? startProductSpan(otelRuntime, {
|
||||
projectName,
|
||||
name: init.name,
|
||||
|
|
@ -1262,9 +1578,10 @@ function createTraceContext(
|
|||
metadata: mergeMetadata(parentRun.metadata, init.metadata),
|
||||
inputs: init.inputs,
|
||||
parentRun,
|
||||
...(activeParentContext ? { parentContext: activeParentContext } : {}),
|
||||
})
|
||||
: await createChildRun(parentRun, init),
|
||||
);
|
||||
: await createChildRun(parentRun, init);
|
||||
});
|
||||
|
||||
const withRunTree = async <T>(run: InstanceAiTraceRun, fn: () => Promise<T>): Promise<T> =>
|
||||
await withProxy(async () =>
|
||||
|
|
@ -1792,15 +2109,24 @@ export async function createInstanceAiTraceContext(
|
|||
name: 'instance-ai.message_turn',
|
||||
runType: 'chain',
|
||||
tags: ['message-turn'],
|
||||
metadata: mergeMetadata(baseMetadata, { agent_role: 'message_turn' }),
|
||||
metadata: mergeMetadata(baseMetadata, {
|
||||
agent_role: 'message_turn',
|
||||
execution_mode: 'foreground',
|
||||
trace_kind: 'message_turn',
|
||||
}),
|
||||
inputs: options.input,
|
||||
root: true,
|
||||
});
|
||||
const orchestratorRun = startProductSpan(otelRuntime, {
|
||||
projectName,
|
||||
name: 'instance-ai.orchestrator',
|
||||
name: 'instance-ai.orchestrator.stream',
|
||||
runType: 'chain',
|
||||
tags: ['orchestrator'],
|
||||
metadata: mergeMetadata(baseMetadata, { agent_role: 'orchestrator' }),
|
||||
metadata: mergeMetadata(baseMetadata, {
|
||||
agent_role: 'orchestrator',
|
||||
execution_mode: 'foreground',
|
||||
trace_kind: 'message_turn',
|
||||
}),
|
||||
inputs: options.input,
|
||||
parentRun: messageRun,
|
||||
});
|
||||
|
|
@ -1848,6 +2174,7 @@ export async function continueInstanceAiTraceContext(
|
|||
metadata: mergeMetadata(baseMetadata, {
|
||||
agent_role: 'orchestrator',
|
||||
execution_mode: 'resume',
|
||||
trace_kind: 'message_turn',
|
||||
}),
|
||||
inputs: options.input,
|
||||
parentRun: existingContext.messageRun,
|
||||
|
|
@ -1910,16 +2237,26 @@ export async function createDetachedSubAgentTraceContext(
|
|||
metadata: mergeMetadata(baseMetadata, {
|
||||
agent_role: options.role,
|
||||
agent_id: options.agentId,
|
||||
execution_mode: 'detached_subagent',
|
||||
trace_kind: 'detached_subagent',
|
||||
task_kind: options.kind,
|
||||
...(options.taskId ? { task_id: options.taskId } : {}),
|
||||
...(options.plannedTaskId ? { planned_task_id: options.plannedTaskId } : {}),
|
||||
...(options.workItemId ? { work_item_id: options.workItemId } : {}),
|
||||
...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}),
|
||||
...(options.spawnedBySpanId ? { spawned_by_span_id: options.spawnedBySpanId } : {}),
|
||||
...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}),
|
||||
...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}),
|
||||
...(options.spawnedByAgentRole
|
||||
? { spawned_by_agent_role: options.spawnedByAgentRole }
|
||||
: {}),
|
||||
...(options.spawnedByToolCallId
|
||||
? { spawned_by_tool_call_id: options.spawnedByToolCallId }
|
||||
: {}),
|
||||
subagent_role: options.role,
|
||||
}),
|
||||
inputs: options.input,
|
||||
root: true,
|
||||
});
|
||||
|
||||
return createTraceContext(
|
||||
|
|
@ -1937,13 +2274,22 @@ export async function createDetachedSubAgentTraceContext(
|
|||
mergeMetadata(baseMetadata, {
|
||||
agent_role: options.role,
|
||||
agent_id: options.agentId,
|
||||
execution_mode: 'detached_subagent',
|
||||
trace_kind: 'detached_subagent',
|
||||
task_kind: options.kind,
|
||||
...(options.taskId ? { task_id: options.taskId } : {}),
|
||||
...(options.plannedTaskId ? { planned_task_id: options.plannedTaskId } : {}),
|
||||
...(options.workItemId ? { work_item_id: options.workItemId } : {}),
|
||||
...(options.spawnedByTraceId ? { spawned_by_trace_id: options.spawnedByTraceId } : {}),
|
||||
...(options.spawnedBySpanId ? { spawned_by_span_id: options.spawnedBySpanId } : {}),
|
||||
...(options.spawnedByRunId ? { spawned_by_run_id: options.spawnedByRunId } : {}),
|
||||
...(options.spawnedByAgentId ? { spawned_by_agent_id: options.spawnedByAgentId } : {}),
|
||||
...(options.spawnedByAgentRole
|
||||
? { spawned_by_agent_role: options.spawnedByAgentRole }
|
||||
: {}),
|
||||
...(options.spawnedByToolCallId
|
||||
? { spawned_by_tool_call_id: options.spawnedByToolCallId }
|
||||
: {}),
|
||||
subagent_role: options.role,
|
||||
}) ?? baseMetadata,
|
||||
baseTelemetry: otelRuntime.telemetry,
|
||||
|
|
|
|||
|
|
@ -25,4 +25,10 @@ export class InstanceAiRunSnapshot extends WithTimestamps {
|
|||
|
||||
@Column({ type: 'varchar', length: 36, nullable: true })
|
||||
langsmithTraceId: string | null;
|
||||
|
||||
@Column({ type: 'varchar', length: 32, nullable: true })
|
||||
traceId: string | null;
|
||||
|
||||
@Column({ type: 'varchar', length: 16, nullable: true })
|
||||
spanId: string | null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2162,8 +2162,8 @@ export class InstanceAiService {
|
|||
payload: { message: 'Recalling conversation...' },
|
||||
});
|
||||
const contextCompactionRun = tracing
|
||||
? await tracing.startChildRun(tracing.actorRun, {
|
||||
name: 'context_compaction',
|
||||
? await tracing.startChildRun(tracing.messageRun, {
|
||||
name: 'instance-ai.context_compaction',
|
||||
tags: ['context'],
|
||||
metadata: { agent_role: 'context_compaction' },
|
||||
inputs: {
|
||||
|
|
@ -2212,8 +2212,8 @@ export class InstanceAiService {
|
|||
});
|
||||
|
||||
const promptBuildRun = tracing
|
||||
? await tracing.startChildRun(tracing.actorRun, {
|
||||
name: 'prompt_build',
|
||||
? await tracing.startChildRun(tracing.messageRun, {
|
||||
name: 'instance-ai.prompt_build',
|
||||
tags: ['prompt'],
|
||||
metadata: { agent_role: 'prompt_build' },
|
||||
inputs: {
|
||||
|
|
@ -3508,6 +3508,8 @@ export class InstanceAiService {
|
|||
const saveOptions = {
|
||||
messageGroupId,
|
||||
runIds: groupRunIds,
|
||||
traceId: tracing?.rootRun.otelTraceId,
|
||||
spanId: tracing?.rootRun.otelSpanId,
|
||||
langsmithRunId: tracing?.rootRun.id,
|
||||
langsmithTraceId: tracing?.rootRun.traceId,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ function makeRow(overrides: Partial<InstanceAiRunSnapshot> = {}): InstanceAiRunS
|
|||
messageGroupId: null,
|
||||
runIds: null,
|
||||
tree: JSON.stringify({ agentId: 'agent-root' }),
|
||||
traceId: null,
|
||||
spanId: null,
|
||||
langsmithRunId: null,
|
||||
langsmithTraceId: null,
|
||||
createdAt: new Date(),
|
||||
|
|
@ -81,10 +83,12 @@ describe('DbSnapshotStorage', () => {
|
|||
});
|
||||
|
||||
describe('save', () => {
|
||||
it('persists langsmith IDs via upsert', async () => {
|
||||
it('persists trace IDs via upsert', async () => {
|
||||
await storage.save('thread-1', { agentId: 'agent-root' } as never, 'run-1', {
|
||||
messageGroupId: 'mg-1',
|
||||
runIds: ['run-1'],
|
||||
traceId: '0123456789abcdef0123456789abcdef',
|
||||
spanId: '0123456789abcdef',
|
||||
langsmithRunId: 'ls-run-1',
|
||||
langsmithTraceId: 'ls-trace-1',
|
||||
});
|
||||
|
|
@ -95,6 +99,8 @@ describe('DbSnapshotStorage', () => {
|
|||
runId: 'run-1',
|
||||
messageGroupId: 'mg-1',
|
||||
runIds: ['run-1'],
|
||||
traceId: '0123456789abcdef0123456789abcdef',
|
||||
spanId: '0123456789abcdef',
|
||||
langsmithRunId: 'ls-run-1',
|
||||
langsmithTraceId: 'ls-trace-1',
|
||||
}),
|
||||
|
|
@ -102,20 +108,27 @@ describe('DbSnapshotStorage', () => {
|
|||
);
|
||||
});
|
||||
|
||||
it('writes nulls when langsmith IDs are absent', async () => {
|
||||
it('writes nulls when trace IDs are absent', async () => {
|
||||
await storage.save('thread-1', { agentId: 'agent-root' } as never, 'run-1');
|
||||
|
||||
expect(repo.upsert).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ langsmithRunId: null, langsmithTraceId: null }),
|
||||
expect.objectContaining({
|
||||
traceId: null,
|
||||
spanId: null,
|
||||
langsmithRunId: null,
|
||||
langsmithTraceId: null,
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('updateLast', () => {
|
||||
it('preserves existing langsmith IDs when the caller does not supply new ones', async () => {
|
||||
it('preserves existing trace IDs when the caller does not supply new ones', async () => {
|
||||
const existing = makeRow({
|
||||
messageGroupId: 'mg-1',
|
||||
traceId: 'existing-trace',
|
||||
spanId: 'existing-span',
|
||||
langsmithRunId: 'ls-run-existing',
|
||||
langsmithTraceId: 'ls-trace-existing',
|
||||
});
|
||||
|
|
@ -128,6 +141,8 @@ describe('DbSnapshotStorage', () => {
|
|||
expect(repo.update).toHaveBeenCalledWith(
|
||||
{ threadId: 'thread-1', runId: 'run-1' },
|
||||
expect.objectContaining({
|
||||
traceId: 'existing-trace',
|
||||
spanId: 'existing-span',
|
||||
langsmithRunId: 'ls-run-existing',
|
||||
langsmithTraceId: 'ls-trace-existing',
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@ import { InstanceAiRunSnapshotRepository } from '../repositories/instance-ai-run
|
|||
export interface SaveSnapshotOptions {
|
||||
messageGroupId?: string;
|
||||
runIds?: string[];
|
||||
traceId?: string;
|
||||
spanId?: string;
|
||||
langsmithRunId?: string;
|
||||
langsmithTraceId?: string;
|
||||
}
|
||||
|
|
@ -44,6 +46,8 @@ export class DbSnapshotStorage {
|
|||
runId: row.runId,
|
||||
messageGroupId: row.messageGroupId ?? undefined,
|
||||
runIds: row.runIds ?? undefined,
|
||||
traceId: row.traceId ?? undefined,
|
||||
spanId: row.spanId ?? undefined,
|
||||
langsmithRunId: row.langsmithRunId ?? undefined,
|
||||
langsmithTraceId: row.langsmithTraceId ?? undefined,
|
||||
};
|
||||
|
|
@ -55,7 +59,7 @@ export class DbSnapshotStorage {
|
|||
runId: string,
|
||||
options: SaveSnapshotOptions = {},
|
||||
): Promise<void> {
|
||||
const { messageGroupId, runIds, langsmithRunId, langsmithTraceId } = options;
|
||||
const { messageGroupId, runIds, traceId, spanId, langsmithRunId, langsmithTraceId } = options;
|
||||
await this.repo.upsert(
|
||||
{
|
||||
threadId,
|
||||
|
|
@ -63,6 +67,8 @@ export class DbSnapshotStorage {
|
|||
messageGroupId: messageGroupId ?? null,
|
||||
runIds: runIds ?? null,
|
||||
tree: JSON.stringify(agentTree),
|
||||
traceId: traceId ?? null,
|
||||
spanId: spanId ?? null,
|
||||
langsmithRunId: langsmithRunId ?? null,
|
||||
langsmithTraceId: langsmithTraceId ?? null,
|
||||
},
|
||||
|
|
@ -76,7 +82,7 @@ export class DbSnapshotStorage {
|
|||
runId: string,
|
||||
options: SaveSnapshotOptions = {},
|
||||
): Promise<void> {
|
||||
const { messageGroupId, runIds, langsmithRunId, langsmithTraceId } = options;
|
||||
const { messageGroupId, runIds, traceId, spanId, langsmithRunId, langsmithTraceId } = options;
|
||||
|
||||
// Prefer lookup by messageGroupId when available
|
||||
if (messageGroupId) {
|
||||
|
|
@ -92,7 +98,9 @@ export class DbSnapshotStorage {
|
|||
tree: JSON.stringify(agentTree),
|
||||
messageGroupId,
|
||||
runIds: runIds ?? existing.runIds,
|
||||
// Preserve existing LangSmith IDs if caller didn't provide new ones.
|
||||
// Preserve existing trace IDs if caller didn't provide new ones.
|
||||
traceId: traceId ?? existing.traceId,
|
||||
spanId: spanId ?? existing.spanId,
|
||||
langsmithRunId: langsmithRunId ?? existing.langsmithRunId,
|
||||
langsmithTraceId: langsmithTraceId ?? existing.langsmithTraceId,
|
||||
},
|
||||
|
|
@ -110,6 +118,8 @@ export class DbSnapshotStorage {
|
|||
tree: JSON.stringify(agentTree),
|
||||
messageGroupId: messageGroupId ?? byRunId.messageGroupId,
|
||||
runIds: runIds ?? byRunId.runIds,
|
||||
traceId: traceId ?? byRunId.traceId,
|
||||
spanId: spanId ?? byRunId.spanId,
|
||||
langsmithRunId: langsmithRunId ?? byRunId.langsmithRunId,
|
||||
langsmithTraceId: langsmithTraceId ?? byRunId.langsmithTraceId,
|
||||
},
|
||||
|
|
@ -131,6 +141,8 @@ export class DbSnapshotStorage {
|
|||
runId: r.runId,
|
||||
messageGroupId: r.messageGroupId ?? undefined,
|
||||
runIds: r.runIds ?? undefined,
|
||||
traceId: r.traceId ?? undefined,
|
||||
spanId: r.spanId ?? undefined,
|
||||
langsmithRunId: r.langsmithRunId ?? undefined,
|
||||
langsmithTraceId: r.langsmithTraceId ?? undefined,
|
||||
}));
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user