mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-12 16:10:30 +02:00
feat(core): Add agents SDK telemetry hooks (no-changelog) (#30014)
This commit is contained in:
parent
a30772c933
commit
c94a403682
|
|
@ -2385,6 +2385,144 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
expect(expTelemetry.recordOutputs).toBe(false);
|
||||
});
|
||||
|
||||
it('wraps generate calls in a telemetry root span when the tracer supports active spans', async () => {
|
||||
generateText.mockResolvedValue(makeGenerateSuccess());
|
||||
const span = {
|
||||
end: jest.fn(),
|
||||
recordException: jest.fn(),
|
||||
setStatus: jest.fn(),
|
||||
};
|
||||
const tracer = {
|
||||
startActiveSpan: jest.fn(async (_name: string, _options: unknown, fn: unknown) => {
|
||||
if (typeof fn !== 'function') {
|
||||
throw new Error('Expected span callback');
|
||||
}
|
||||
const spanFn = fn as (spanValue: typeof span) => Promise<unknown>;
|
||||
return await spanFn(span);
|
||||
}),
|
||||
};
|
||||
const telemetry: BuiltTelemetry = { ...baseTelemetry, tracer };
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'telemetry-root-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
telemetry,
|
||||
});
|
||||
|
||||
await runtime.generate('hello');
|
||||
|
||||
expect(tracer.startActiveSpan).toHaveBeenCalledWith(
|
||||
'test-agent.generate',
|
||||
{
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
attributes: expect.objectContaining<Record<string, string>>({
|
||||
'langsmith.traceable': 'true',
|
||||
'langsmith.trace.name': 'test-agent.generate',
|
||||
'langsmith.span.kind': 'chain',
|
||||
'langsmith.metadata.agent_name': 'telemetry-root-test',
|
||||
'langsmith.metadata.env': 'test',
|
||||
}),
|
||||
},
|
||||
expect.any(Function),
|
||||
);
|
||||
expect(span.end).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('can suppress the generic runtime root span while keeping native telemetry enabled', async () => {
|
||||
generateText.mockResolvedValue(makeGenerateSuccess());
|
||||
const tracer = {
|
||||
startActiveSpan: jest.fn(),
|
||||
};
|
||||
const telemetry: BuiltTelemetry = {
|
||||
...baseTelemetry,
|
||||
runtimeRootSpanEnabled: false,
|
||||
tracer,
|
||||
};
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'telemetry-root-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
telemetry,
|
||||
});
|
||||
|
||||
await runtime.generate('hello');
|
||||
|
||||
expect(tracer.startActiveSpan).not.toHaveBeenCalled();
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
const callArgs = generateText.mock.calls[0][0] as Record<string, unknown>;
|
||||
expect(callArgs.experimental_telemetry).toEqual(
|
||||
expect.objectContaining({
|
||||
isEnabled: true,
|
||||
functionId: 'test-agent',
|
||||
tracer,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('adds a LangSmith tool catalog to telemetry root spans', async () => {
|
||||
generateText.mockResolvedValue(makeGenerateSuccess());
|
||||
const span = {
|
||||
end: jest.fn(),
|
||||
recordException: jest.fn(),
|
||||
setStatus: jest.fn(),
|
||||
};
|
||||
const tracer = {
|
||||
startActiveSpan: jest.fn(async (_name: string, _options: unknown, fn: unknown) => {
|
||||
if (typeof fn !== 'function') {
|
||||
throw new Error('Expected span callback');
|
||||
}
|
||||
const spanFn = fn as (spanValue: typeof span) => Promise<unknown>;
|
||||
return await spanFn(span);
|
||||
}),
|
||||
};
|
||||
const telemetry: BuiltTelemetry = {
|
||||
...baseTelemetry,
|
||||
metadata: {
|
||||
...baseTelemetry.metadata,
|
||||
langsmith_trace_id: 'trace-1',
|
||||
langsmith_actor_run_id: 'actor-run-1',
|
||||
},
|
||||
tracer,
|
||||
};
|
||||
const tool = new ToolBuilder('lookup')
|
||||
.description('Lookup records')
|
||||
.input(z.object({ query: z.string() }))
|
||||
.handler(async () => await Promise.resolve({ ok: true }))
|
||||
.build();
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'telemetry-root-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
tools: [tool],
|
||||
telemetry,
|
||||
});
|
||||
|
||||
await runtime.generate('hello');
|
||||
|
||||
const rootSpanOptions = tracer.startActiveSpan.mock.calls[0][1] as {
|
||||
attributes: Record<string, unknown>;
|
||||
};
|
||||
const { attributes } = rootSpanOptions;
|
||||
expect(attributes).toEqual(
|
||||
expect.objectContaining({
|
||||
'langsmith.metadata.available_tools': ['lookup'],
|
||||
}),
|
||||
);
|
||||
expect(attributes).not.toHaveProperty('langsmith.trace.id');
|
||||
expect(attributes).not.toHaveProperty('langsmith.span.parent_id');
|
||||
expect(attributes['gen_ai.prompt']).toEqual(expect.stringContaining('"name":"lookup"'));
|
||||
expect(attributes['gen_ai.prompt']).toEqual(
|
||||
expect.stringContaining('"description":"Lookup records"'),
|
||||
);
|
||||
expect(attributes['gen_ai.prompt']).toEqual(expect.stringContaining('"input_schema"'));
|
||||
});
|
||||
|
||||
it('passes telemetry config into streamText as experimental_telemetry', async () => {
|
||||
streamText.mockReturnValue(makeStreamSuccess());
|
||||
|
||||
|
|
@ -2436,6 +2574,7 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
|
||||
it('passes resolved telemetry to tool handlers via parentTelemetry', async () => {
|
||||
let capturedTelemetry: BuiltTelemetry | undefined;
|
||||
let capturedToolCallId: string | undefined;
|
||||
|
||||
const spyTool: BuiltTool = new ToolBuilder('spy')
|
||||
.description('captures telemetry from context')
|
||||
|
|
@ -2443,6 +2582,7 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
.output(z.object({ ok: z.boolean() }))
|
||||
.handler(async (_input, ctx) => {
|
||||
capturedTelemetry = ctx.parentTelemetry;
|
||||
capturedToolCallId = ctx.toolCallId;
|
||||
return await Promise.resolve({ ok: true });
|
||||
})
|
||||
.build();
|
||||
|
|
@ -2463,6 +2603,82 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
await runtime.generate('test');
|
||||
|
||||
expect(capturedTelemetry).toBe(baseTelemetry);
|
||||
expect(capturedToolCallId).toBe('tc1');
|
||||
});
|
||||
|
||||
it('emits AI SDK-compatible telemetry spans for local tool execution', async () => {
|
||||
const spans: Array<{
|
||||
name: string;
|
||||
span: {
|
||||
end: jest.Mock;
|
||||
recordException: jest.Mock;
|
||||
setAttributes: jest.Mock;
|
||||
setStatus: jest.Mock;
|
||||
};
|
||||
}> = [];
|
||||
const tracer = {
|
||||
startActiveSpan: jest.fn(async (name: string, _options: unknown, fn: unknown) => {
|
||||
if (typeof fn !== 'function') {
|
||||
throw new Error('Expected span callback');
|
||||
}
|
||||
const span = {
|
||||
end: jest.fn(),
|
||||
recordException: jest.fn(),
|
||||
setAttributes: jest.fn(),
|
||||
setStatus: jest.fn(),
|
||||
};
|
||||
spans.push({ name, span });
|
||||
const spanFn = fn as (spanValue: typeof span) => Promise<unknown>;
|
||||
return await spanFn(span);
|
||||
}),
|
||||
};
|
||||
const telemetry: BuiltTelemetry = {
|
||||
...baseTelemetry,
|
||||
recordOutputs: true,
|
||||
tracer,
|
||||
};
|
||||
const spyTool: BuiltTool = new ToolBuilder('spy')
|
||||
.description('captures telemetry from context')
|
||||
.input(z.object({ x: z.string() }))
|
||||
.output(z.object({ ok: z.boolean() }))
|
||||
.handler(async () => await Promise.resolve({ ok: true }))
|
||||
.build();
|
||||
|
||||
generateText
|
||||
.mockResolvedValueOnce(makeGenerateWithToolCall('tc1', 'spy', { x: 'test' }))
|
||||
.mockResolvedValueOnce(makeGenerateSuccess('done'));
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'tool-telemetry-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
tools: [spyTool],
|
||||
telemetry,
|
||||
});
|
||||
|
||||
await runtime.generate('test');
|
||||
|
||||
const toolCallSpan = tracer.startActiveSpan.mock.calls.find(([name]) => name === 'ai.toolCall');
|
||||
expect(toolCallSpan).toBeDefined();
|
||||
expect(toolCallSpan?.[1]).toEqual({
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
attributes: expect.objectContaining<Record<string, string>>({
|
||||
'operation.name': 'ai.toolCall test-agent',
|
||||
'resource.name': 'test-agent',
|
||||
'ai.operationId': 'ai.toolCall',
|
||||
'ai.telemetry.functionId': 'test-agent',
|
||||
'ai.telemetry.metadata.env': 'test',
|
||||
'ai.toolCall.name': 'spy',
|
||||
'ai.toolCall.id': 'tc1',
|
||||
'ai.toolCall.args': '{"x":"test"}',
|
||||
}),
|
||||
});
|
||||
const toolSpan = spans.find((span) => span.name === 'ai.toolCall')?.span;
|
||||
expect(toolSpan?.setAttributes).toHaveBeenCalledWith({
|
||||
'ai.toolCall.result': '{"ok":true}',
|
||||
});
|
||||
expect(toolSpan?.end).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('passes inherited telemetry to tool handlers for sub-agent scenarios', async () => {
|
||||
|
|
|
|||
201
packages/@n8n/agents/src/__tests__/langsmith-telemetry.test.ts
Normal file
201
packages/@n8n/agents/src/__tests__/langsmith-telemetry.test.ts
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
const mockExporterConfigs: unknown[] = [];
|
||||
const mockBatchProcessorInputs: unknown[] = [];
|
||||
const mockBatchProcessorInstances: Array<{
|
||||
forceFlush: jest.Mock<Promise<void>, []>;
|
||||
onStart: jest.Mock<void, [unknown, unknown]>;
|
||||
onEnd: jest.Mock<void, [unknown]>;
|
||||
shutdown: jest.Mock<Promise<void>, []>;
|
||||
}> = [];
|
||||
const mockProviderConfigs: unknown[] = [];
|
||||
const mockAwaitPendingTraceBatches = jest.fn(async () => await Promise.resolve());
|
||||
const mockTracer = { startSpan: jest.fn() };
|
||||
const mockProvider = {
|
||||
getTracer: jest.fn(() => mockTracer),
|
||||
register: jest.fn(),
|
||||
forceFlush: jest.fn(),
|
||||
shutdown: jest.fn(),
|
||||
};
|
||||
|
||||
jest.mock('langsmith/experimental/otel/exporter', () => ({
|
||||
LangSmithOTLPTraceExporter: jest.fn((config: unknown) => {
|
||||
mockExporterConfigs.push(config);
|
||||
return { type: 'exporter' };
|
||||
}),
|
||||
}));
|
||||
|
||||
jest.mock('@opentelemetry/sdk-trace-base', () => ({
|
||||
BatchSpanProcessor: jest.fn((exporter: unknown) => {
|
||||
mockBatchProcessorInputs.push(exporter);
|
||||
const processor = {
|
||||
forceFlush: jest.fn(async () => await Promise.resolve()),
|
||||
onStart: jest.fn(),
|
||||
onEnd: jest.fn(),
|
||||
shutdown: jest.fn(async () => await Promise.resolve()),
|
||||
};
|
||||
mockBatchProcessorInstances.push(processor);
|
||||
return processor;
|
||||
}),
|
||||
}));
|
||||
|
||||
jest.mock('langsmith', () => ({
|
||||
RunTree: {
|
||||
getSharedClient: jest.fn(() => ({
|
||||
awaitPendingTraceBatches: mockAwaitPendingTraceBatches,
|
||||
})),
|
||||
},
|
||||
}));
|
||||
|
||||
jest.mock('@opentelemetry/sdk-trace-node', () => ({
|
||||
NodeTracerProvider: jest.fn((config: unknown) => {
|
||||
mockProviderConfigs.push(config);
|
||||
return mockProvider;
|
||||
}),
|
||||
}));
|
||||
|
||||
import { LangSmithTelemetry } from '../integrations/langsmith';
|
||||
|
||||
describe('LangSmithTelemetry', () => {
|
||||
const previousTracingV2 = process.env.LANGCHAIN_TRACING_V2;
|
||||
|
||||
beforeEach(() => {
|
||||
mockExporterConfigs.length = 0;
|
||||
mockBatchProcessorInputs.length = 0;
|
||||
mockBatchProcessorInstances.length = 0;
|
||||
mockProviderConfigs.length = 0;
|
||||
mockAwaitPendingTraceBatches.mockClear();
|
||||
mockProvider.getTracer.mockClear();
|
||||
mockProvider.register.mockClear();
|
||||
mockProvider.forceFlush.mockClear();
|
||||
mockProvider.shutdown.mockClear();
|
||||
delete process.env.LANGCHAIN_TRACING_V2;
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
if (previousTracingV2 === undefined) {
|
||||
delete process.env.LANGCHAIN_TRACING_V2;
|
||||
} else {
|
||||
process.env.LANGCHAIN_TRACING_V2 = previousTracingV2;
|
||||
}
|
||||
});
|
||||
|
||||
it('passes proxy headers and derived OTLP URL to the LangSmith exporter', async () => {
|
||||
const transformExportedSpan = (span: unknown) => span;
|
||||
const getHeaders = jest.fn(async () => {
|
||||
await Promise.resolve();
|
||||
return { Authorization: 'Bearer proxy-token' } satisfies Record<string, string>;
|
||||
});
|
||||
const built = await new LangSmithTelemetry({
|
||||
apiKey: '-',
|
||||
project: 'instance-ai',
|
||||
endpoint: 'https://ai-proxy.test/langsmith',
|
||||
headers: getHeaders,
|
||||
transformExportedSpan,
|
||||
}).build();
|
||||
|
||||
expect(getHeaders).toHaveBeenCalledTimes(1);
|
||||
expect(mockExporterConfigs).toEqual([
|
||||
{
|
||||
apiKey: '-',
|
||||
projectName: 'instance-ai',
|
||||
headers: { Authorization: 'Bearer proxy-token' },
|
||||
transformExportedSpan,
|
||||
url: 'https://ai-proxy.test/langsmith/otel/v1/traces',
|
||||
},
|
||||
]);
|
||||
expect(mockBatchProcessorInputs).toEqual([{ type: 'exporter' }]);
|
||||
expect(mockProviderConfigs).toHaveLength(1);
|
||||
const providerConfig = mockProviderConfigs[0] as { spanProcessors: unknown[] };
|
||||
expect(providerConfig.spanProcessors).toHaveLength(1);
|
||||
const spanProcessor = providerConfig.spanProcessors[0] as Record<string, unknown>;
|
||||
expect(typeof spanProcessor.forceFlush).toBe('function');
|
||||
expect(typeof spanProcessor.onStart).toBe('function');
|
||||
expect(typeof spanProcessor.onEnd).toBe('function');
|
||||
expect(typeof spanProcessor.shutdown).toBe('function');
|
||||
expect(mockProvider.register).toHaveBeenCalledWith({ propagator: null });
|
||||
expect(mockProvider.getTracer).toHaveBeenCalledWith('@n8n/agents');
|
||||
expect(built.tracer).toBe(mockTracer);
|
||||
expect(built.provider).toBe(mockProvider);
|
||||
expect(process.env.LANGCHAIN_TRACING_V2).toBe('true');
|
||||
});
|
||||
|
||||
it('does not allow endpoint overrides when using an engine-resolved key', async () => {
|
||||
const telemetry = new LangSmithTelemetry({
|
||||
project: 'instance-ai',
|
||||
endpoint: 'https://should-not-be-used.test',
|
||||
});
|
||||
telemetry.resolvedApiKey = 'resolved-key';
|
||||
|
||||
await telemetry.build();
|
||||
|
||||
expect(mockExporterConfigs).toEqual([
|
||||
{
|
||||
apiKey: 'resolved-key',
|
||||
projectName: 'instance-ai',
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it('filters noisy AI SDK operation wrappers while preserving provider and tool spans', async () => {
|
||||
await new LangSmithTelemetry({
|
||||
apiKey: 'ls-test-key',
|
||||
project: 'instance-ai',
|
||||
}).build();
|
||||
|
||||
const processor = mockProviderConfigs[0] as {
|
||||
spanProcessors: Array<{
|
||||
onStart(span: unknown, parentContext: unknown): void;
|
||||
onEnd(span: unknown): void;
|
||||
}>;
|
||||
};
|
||||
const filteredProcessor = processor.spanProcessors[0];
|
||||
const delegate = mockBatchProcessorInstances[0];
|
||||
const makeSpan = (
|
||||
spanId: string,
|
||||
attributes: Record<string, unknown>,
|
||||
parentSpanId?: string,
|
||||
) => ({
|
||||
attributes,
|
||||
spanContext: () => ({ traceId: 'trace-1', spanId }),
|
||||
...(parentSpanId ? { parentSpanContext: { spanId: parentSpanId } } : {}),
|
||||
});
|
||||
|
||||
const root = makeSpan('1111111111111111', { 'langsmith.traceable': 'true' });
|
||||
const streamWrapper = makeSpan(
|
||||
'2222222222222222',
|
||||
{ 'ai.operationId': 'ai.streamText' },
|
||||
'1111111111111111',
|
||||
);
|
||||
const providerRequest = makeSpan(
|
||||
'3333333333333333',
|
||||
{ 'ai.operationId': 'ai.streamText.doStream' },
|
||||
'2222222222222222',
|
||||
);
|
||||
const toolCall = makeSpan(
|
||||
'4444444444444444',
|
||||
{ 'ai.operationId': 'ai.toolCall' },
|
||||
'2222222222222222',
|
||||
);
|
||||
|
||||
filteredProcessor.onStart(root, {});
|
||||
filteredProcessor.onStart(streamWrapper, {});
|
||||
filteredProcessor.onStart(providerRequest, {});
|
||||
filteredProcessor.onStart(toolCall, {});
|
||||
filteredProcessor.onEnd(toolCall);
|
||||
filteredProcessor.onEnd(providerRequest);
|
||||
filteredProcessor.onEnd(streamWrapper);
|
||||
filteredProcessor.onEnd(root);
|
||||
|
||||
expect(delegate.onStart).toHaveBeenCalledTimes(3);
|
||||
expect(delegate.onStart).toHaveBeenNthCalledWith(1, root, {});
|
||||
expect(delegate.onStart).toHaveBeenNthCalledWith(2, providerRequest, {});
|
||||
expect(delegate.onStart).toHaveBeenNthCalledWith(3, toolCall, {});
|
||||
expect(providerRequest.attributes).toEqual(
|
||||
expect.objectContaining({
|
||||
'langsmith.span.parent_id': '00000000-0000-0000-1111-111111111111',
|
||||
'langsmith.traceable_parent_otel_span_id': '1111111111111111',
|
||||
}),
|
||||
);
|
||||
expect(delegate.onEnd).toHaveBeenCalledTimes(3);
|
||||
expect(delegate.onEnd).not.toHaveBeenCalledWith(streamWrapper);
|
||||
});
|
||||
});
|
||||
|
|
@ -8,6 +8,7 @@ describe('Telemetry builder', () => {
|
|||
expect(built.enabled).toBe(true);
|
||||
expect(built.recordInputs).toBe(true);
|
||||
expect(built.recordOutputs).toBe(true);
|
||||
expect(built.runtimeRootSpanEnabled).toBe(true);
|
||||
expect(built.functionId).toBeUndefined();
|
||||
expect(built.metadata).toBeUndefined();
|
||||
expect(built.integrations).toEqual([]);
|
||||
|
|
@ -22,6 +23,7 @@ describe('Telemetry builder', () => {
|
|||
.metadata({ team: 'platform', version: 2 })
|
||||
.recordInputs(false)
|
||||
.recordOutputs(false)
|
||||
.runtimeRootSpan(false)
|
||||
.build();
|
||||
|
||||
expect(built.enabled).toBe(false);
|
||||
|
|
@ -29,6 +31,7 @@ describe('Telemetry builder', () => {
|
|||
expect(built.metadata).toEqual({ team: 'platform', version: 2 });
|
||||
expect(built.recordInputs).toBe(false);
|
||||
expect(built.recordOutputs).toBe(false);
|
||||
expect(built.runtimeRootSpanEnabled).toBe(false);
|
||||
});
|
||||
|
||||
it('accepts a pre-built tracer', async () => {
|
||||
|
|
|
|||
|
|
@ -2,9 +2,11 @@ import type * as AiImport from 'ai';
|
|||
import type { LanguageModel } from 'ai';
|
||||
|
||||
import { generateTitleFromMessage } from '../runtime/title-generation';
|
||||
import type { BuiltTelemetry } from '../types';
|
||||
|
||||
type GenerateTextCall = {
|
||||
messages: Array<{ role: string; content: string }>;
|
||||
experimental_telemetry?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
const mockGenerateText = jest.fn<Promise<{ text: string }>, [GenerateTextCall]>();
|
||||
|
|
@ -120,4 +122,67 @@ describe('generateTitleFromMessage', () => {
|
|||
const call = mockGenerateText.mock.calls[0][0];
|
||||
expect(call.messages[0].content).toBe('Custom system prompt');
|
||||
});
|
||||
|
||||
it('passes generic telemetry to the title LLM call', async () => {
|
||||
mockGenerateText.mockResolvedValue({ text: 'Berlin rain alert' });
|
||||
const telemetry: BuiltTelemetry = {
|
||||
enabled: true,
|
||||
functionId: 'instance-ai.thread-title',
|
||||
metadata: { thread_id: 'thread-1' },
|
||||
recordInputs: true,
|
||||
recordOutputs: false,
|
||||
runtimeRootSpanEnabled: false,
|
||||
integrations: [],
|
||||
};
|
||||
|
||||
await generateTitleFromMessage(fakeModel, 'Build a daily Berlin rain alert workflow', {
|
||||
telemetry,
|
||||
});
|
||||
|
||||
const call = mockGenerateText.mock.calls[0][0];
|
||||
expect(call.experimental_telemetry).toEqual({
|
||||
isEnabled: true,
|
||||
functionId: 'instance-ai.thread-title',
|
||||
metadata: { thread_id: 'thread-1' },
|
||||
recordInputs: true,
|
||||
recordOutputs: false,
|
||||
tracer: undefined,
|
||||
integrations: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('wraps the user message in a title-generation instruction so the model does not answer it', async () => {
|
||||
mockGenerateText.mockResolvedValue({ text: 'Berlin rain alert' });
|
||||
await generateTitleFromMessage(fakeModel, 'Build a daily Berlin rain alert workflow');
|
||||
const call = mockGenerateText.mock.calls[0][0];
|
||||
expect(call.messages[1].role).toBe('user');
|
||||
expect(call.messages[1].content).toContain('Generate a title');
|
||||
expect(call.messages[1].content).toContain('<message>');
|
||||
expect(call.messages[1].content).toContain('Build a daily Berlin rain alert workflow');
|
||||
expect(call.messages[1].content).toContain('</message>');
|
||||
});
|
||||
|
||||
it('drops a streamed code fence and everything after it', async () => {
|
||||
mockGenerateText.mockResolvedValue({
|
||||
text: 'Here\'s your chat workflow with the requested configuration:\n\n```json\n{\n "nodes": []\n}\n```',
|
||||
});
|
||||
const result = await generateTitleFromMessage(
|
||||
fakeModel,
|
||||
'build me a chat workflow with openai',
|
||||
);
|
||||
expect(result).toBe("Here's your chat workflow with the requested configuration");
|
||||
expect(result).not.toContain('```');
|
||||
expect(result).not.toContain('\n');
|
||||
});
|
||||
|
||||
it('collapses embedded newlines and stray backticks into a single-line title', async () => {
|
||||
mockGenerateText.mockResolvedValue({
|
||||
text: 'Scryfall\nrandom `card` workflow',
|
||||
});
|
||||
const result = await generateTitleFromMessage(
|
||||
fakeModel,
|
||||
'build a workflow that queries Scryfall for a random card',
|
||||
);
|
||||
expect(result).toBe('Scryfall random card workflow');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,6 +1,167 @@
|
|||
import { Telemetry } from '../sdk/telemetry';
|
||||
import type { BuiltTelemetry, OpaqueTracer, OpaqueTracerProvider } from '../types/telemetry';
|
||||
|
||||
let registeredOtelContext = false;
|
||||
|
||||
const LANGSMITH_TRACEABLE = 'langsmith.traceable';
|
||||
const LANGSMITH_IS_ROOT = 'langsmith.is_root';
|
||||
const LANGSMITH_PARENT_RUN_ID = 'langsmith.span.parent_id';
|
||||
const LANGSMITH_TRACEABLE_PARENT_OTEL_SPAN_ID = 'langsmith.traceable_parent_otel_span_id';
|
||||
const AI_OPERATION_ID = 'ai.operationId';
|
||||
const TRACEABLE_AI_SDK_OPERATIONS = new Set([
|
||||
'ai.generateText.doGenerate',
|
||||
'ai.streamText.doStream',
|
||||
'ai.generateObject.doGenerate',
|
||||
'ai.streamObject.doStream',
|
||||
'ai.toolCall',
|
||||
]);
|
||||
|
||||
interface OtelSpanLike {
|
||||
attributes: Record<string, unknown>;
|
||||
spanContext(): {
|
||||
traceId: string;
|
||||
spanId: string;
|
||||
};
|
||||
parentSpanId?: string;
|
||||
parentSpanContext?: {
|
||||
spanId?: string;
|
||||
};
|
||||
}
|
||||
|
||||
interface SpanProcessorLike {
|
||||
forceFlush(): Promise<void>;
|
||||
onStart(span: unknown, parentContext: unknown): void;
|
||||
onEnd(span: unknown): void;
|
||||
shutdown(): Promise<void>;
|
||||
}
|
||||
|
||||
interface BatchSpanProcessorConstructor {
|
||||
new (exporter: unknown): SpanProcessorLike;
|
||||
}
|
||||
|
||||
interface LangSmithRunTree {
|
||||
getSharedClient(): {
|
||||
awaitPendingTraceBatches(): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
function isOtelSpanLike(value: unknown): value is OtelSpanLike {
|
||||
return (
|
||||
value !== null &&
|
||||
typeof value === 'object' &&
|
||||
typeof Reflect.get(value, 'spanContext') === 'function' &&
|
||||
typeof Reflect.get(value, 'attributes') === 'object'
|
||||
);
|
||||
}
|
||||
|
||||
function getParentSpanId(span: OtelSpanLike): string | undefined {
|
||||
return span.parentSpanId ?? span.parentSpanContext?.spanId;
|
||||
}
|
||||
|
||||
function getUuidFromOtelSpanId(spanId: string): string {
|
||||
const paddedHex = spanId.padStart(16, '0');
|
||||
return `00000000-0000-0000-${paddedHex.substring(0, 4)}-${paddedHex.substring(4, 16)}`;
|
||||
}
|
||||
|
||||
function isTraceableSpan(span: OtelSpanLike): boolean {
|
||||
const operationId = span.attributes[AI_OPERATION_ID];
|
||||
return (
|
||||
span.attributes[LANGSMITH_TRACEABLE] === 'true' ||
|
||||
(typeof operationId === 'string' && TRACEABLE_AI_SDK_OPERATIONS.has(operationId))
|
||||
);
|
||||
}
|
||||
|
||||
function createLangSmithSpanProcessor(options: {
|
||||
exporter: unknown;
|
||||
BatchSpanProcessor: BatchSpanProcessorConstructor;
|
||||
RunTree: LangSmithRunTree;
|
||||
}): SpanProcessorLike {
|
||||
const delegate = new options.BatchSpanProcessor(options.exporter);
|
||||
const traceMap: Record<
|
||||
string,
|
||||
{
|
||||
spanCount: number;
|
||||
spanInfo: Record<string, { isTraceable: boolean; parentSpanId?: string }>;
|
||||
}
|
||||
> = {};
|
||||
|
||||
return {
|
||||
async forceFlush() {
|
||||
await delegate.forceFlush();
|
||||
},
|
||||
|
||||
onStart(span, parentContext) {
|
||||
if (!isOtelSpanLike(span)) {
|
||||
delegate.onStart(span, parentContext);
|
||||
return;
|
||||
}
|
||||
|
||||
const spanContext = span.spanContext();
|
||||
traceMap[spanContext.traceId] ??= {
|
||||
spanCount: 0,
|
||||
spanInfo: {},
|
||||
};
|
||||
|
||||
const traceInfo = traceMap[spanContext.traceId];
|
||||
traceInfo.spanCount++;
|
||||
const traceable = isTraceableSpan(span);
|
||||
const parentSpanId = getParentSpanId(span);
|
||||
traceInfo.spanInfo[spanContext.spanId] = {
|
||||
isTraceable: traceable,
|
||||
...(parentSpanId ? { parentSpanId } : {}),
|
||||
};
|
||||
|
||||
let currentCandidateParentSpanId = parentSpanId;
|
||||
let traceableParentSpanId: string | undefined;
|
||||
while (currentCandidateParentSpanId) {
|
||||
const currentSpanInfo = traceInfo.spanInfo[currentCandidateParentSpanId];
|
||||
if (currentSpanInfo?.isTraceable) {
|
||||
traceableParentSpanId = currentCandidateParentSpanId;
|
||||
break;
|
||||
}
|
||||
currentCandidateParentSpanId = currentSpanInfo?.parentSpanId;
|
||||
}
|
||||
|
||||
if (!traceableParentSpanId) {
|
||||
span.attributes[LANGSMITH_IS_ROOT] = true;
|
||||
} else {
|
||||
span.attributes[LANGSMITH_PARENT_RUN_ID] = getUuidFromOtelSpanId(traceableParentSpanId);
|
||||
span.attributes[LANGSMITH_TRACEABLE_PARENT_OTEL_SPAN_ID] = traceableParentSpanId;
|
||||
}
|
||||
|
||||
if (traceable) {
|
||||
delegate.onStart(span, parentContext);
|
||||
}
|
||||
},
|
||||
|
||||
onEnd(span) {
|
||||
if (!isOtelSpanLike(span)) {
|
||||
delegate.onEnd(span);
|
||||
return;
|
||||
}
|
||||
|
||||
const spanContext = span.spanContext();
|
||||
const traceInfo = traceMap[spanContext.traceId];
|
||||
const spanInfo = traceInfo?.spanInfo[spanContext.spanId];
|
||||
if (!traceInfo || !spanInfo) return;
|
||||
|
||||
traceInfo.spanCount--;
|
||||
if (traceInfo.spanCount <= 0) {
|
||||
delete traceMap[spanContext.traceId];
|
||||
}
|
||||
|
||||
if (spanInfo.isTraceable) {
|
||||
delegate.onEnd(span);
|
||||
}
|
||||
},
|
||||
|
||||
async shutdown() {
|
||||
await options.RunTree.getSharedClient().awaitPendingTraceBatches();
|
||||
await delegate.shutdown();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export interface LangSmithTelemetryConfig {
|
||||
/** LangSmith API key. If omitted, resolved via `.credential()` or LANGSMITH_API_KEY env var. */
|
||||
apiKey?: string;
|
||||
|
|
@ -13,6 +174,10 @@ export interface LangSmithTelemetryConfig {
|
|||
* as `${endpoint}/otel/v1/traces`. Use this for custom collectors or testing.
|
||||
*/
|
||||
url?: string;
|
||||
/** Default headers to send with LangSmith OTLP export requests. */
|
||||
headers?: Record<string, string> | (() => Promise<Record<string, string>>);
|
||||
/** Optional hook for redacting or annotating spans before LangSmith export. */
|
||||
transformExportedSpan?: (span: unknown) => unknown;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -29,6 +194,7 @@ async function createLangSmithTracer(
|
|||
spanProcessors?: unknown[];
|
||||
}) => OpaqueTracerProvider & {
|
||||
getTracer(name: string): OpaqueTracer;
|
||||
register(config?: { propagator?: null }): void;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
@ -36,14 +202,16 @@ async function createLangSmithTracer(
|
|||
LangSmithOTLPTraceExporter: new (cfg?: {
|
||||
apiKey?: string;
|
||||
projectName?: string;
|
||||
endpoint?: string;
|
||||
url?: string;
|
||||
headers?: Record<string, string>;
|
||||
transformExportedSpan?: (span: unknown) => unknown;
|
||||
}) => unknown;
|
||||
};
|
||||
|
||||
const { LangSmithOTLPSpanProcessor } = (await import(
|
||||
'langsmith/experimental/otel/processor'
|
||||
)) as {
|
||||
LangSmithOTLPSpanProcessor: new (exporter: unknown) => unknown;
|
||||
const { BatchSpanProcessor } = (await import('@opentelemetry/sdk-trace-base')) as {
|
||||
BatchSpanProcessor: BatchSpanProcessorConstructor;
|
||||
};
|
||||
const { RunTree } = (await import('langsmith')) as {
|
||||
RunTree: LangSmithRunTree;
|
||||
};
|
||||
|
||||
// SECURITY: When the engine-resolved credential is the active key (i.e. no
|
||||
|
|
@ -55,19 +223,34 @@ async function createLangSmithTracer(
|
|||
? undefined
|
||||
: (config?.url ??
|
||||
(config?.endpoint ? `${config.endpoint.replace(/\/$/, '')}/otel/v1/traces` : undefined));
|
||||
const headers = typeof config?.headers === 'function' ? await config.headers() : config?.headers;
|
||||
|
||||
const exporter = new LangSmithOTLPTraceExporter({
|
||||
apiKey,
|
||||
projectName: config?.project,
|
||||
...(headers ? { headers } : {}),
|
||||
...(config?.transformExportedSpan
|
||||
? { transformExportedSpan: config.transformExportedSpan }
|
||||
: {}),
|
||||
...(url ? { url } : {}),
|
||||
});
|
||||
|
||||
const processor = new LangSmithOTLPSpanProcessor(exporter);
|
||||
const processor = createLangSmithSpanProcessor({
|
||||
exporter,
|
||||
BatchSpanProcessor,
|
||||
RunTree,
|
||||
});
|
||||
|
||||
const provider = new NodeTracerProvider({
|
||||
spanProcessors: [processor],
|
||||
});
|
||||
// Do NOT call provider.register() — avoid polluting the global tracer provider.
|
||||
if (!registeredOtelContext) {
|
||||
// AI SDK creates nested operation/provider/tool spans through the active
|
||||
// OpenTelemetry context. Without the Node context manager these spans are
|
||||
// exported as separate root traces even when an explicit tracer is passed.
|
||||
provider.register({ propagator: null });
|
||||
registeredOtelContext = true;
|
||||
}
|
||||
|
||||
return { tracer: provider.getTracer('@n8n/agents'), provider };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import { isLlmMessage } from '../sdk/message';
|
|||
import type {
|
||||
AgentRunState,
|
||||
AnthropicThinkingConfig,
|
||||
AttributeValue,
|
||||
BuiltMemory,
|
||||
BuiltProviderTool,
|
||||
BuiltTelemetry,
|
||||
|
|
@ -30,6 +31,7 @@ import type {
|
|||
XaiThinkingConfig,
|
||||
} from '../types';
|
||||
import { AgentEventBus } from './event-bus';
|
||||
import { toJsonValue } from './json-value';
|
||||
import { saveMessagesToThread } from './memory-store';
|
||||
import { AgentMessageList, type SerializedMessageList } from './message-list';
|
||||
import { fromAiFinishReason, fromAiMessages } from './messages';
|
||||
|
|
@ -68,6 +70,89 @@ import type { JSONObject, JSONValue } from '../types/utils/json';
|
|||
import { parseWithSchema } from '../utils/parse';
|
||||
import { isZodSchema } from '../utils/zod';
|
||||
|
||||
interface TelemetrySpan {
|
||||
end(): void;
|
||||
recordException?(error: unknown): void;
|
||||
setAttributes?(attributes: Record<string, AttributeValue>): void;
|
||||
setStatus?(status: { code: number; message?: string }): void;
|
||||
}
|
||||
|
||||
interface ActiveSpanTracer {
|
||||
startActiveSpan<T>(
|
||||
name: string,
|
||||
options: { attributes?: Record<string, AttributeValue> },
|
||||
fn: (span: TelemetrySpan) => T,
|
||||
): T;
|
||||
}
|
||||
|
||||
function isActiveSpanTracer(value: unknown): value is ActiveSpanTracer {
|
||||
return (
|
||||
value !== null &&
|
||||
typeof value === 'object' &&
|
||||
typeof Reflect.get(value, 'startActiveSpan') === 'function'
|
||||
);
|
||||
}
|
||||
|
||||
function stringifyTelemetryValue(value: unknown): string | undefined {
|
||||
try {
|
||||
return JSON.stringify(value);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
function getToolInputSchema(tool: BuiltTool | BuiltProviderTool): unknown {
|
||||
if (!tool.inputSchema) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return isZodSchema(tool.inputSchema) ? zodToJsonSchema(tool.inputSchema) : tool.inputSchema;
|
||||
}
|
||||
|
||||
function summarizeToolForTelemetry(tool: BuiltTool): Record<string, unknown> {
|
||||
return {
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
type: tool.mcpTool ? 'mcp' : 'local',
|
||||
...(tool.mcpServerName ? { mcp_server: tool.mcpServerName } : {}),
|
||||
...(tool.suspendSchema || tool.resumeSchema || tool.withDefaultApproval
|
||||
? { approval: true }
|
||||
: {}),
|
||||
...(tool.inputSchema ? { input_schema: getToolInputSchema(tool) } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function summarizeProviderToolForTelemetry(tool: BuiltProviderTool): Record<string, unknown> {
|
||||
const [provider] = tool.name.split('.');
|
||||
return {
|
||||
name: tool.name,
|
||||
provider,
|
||||
type: 'provider',
|
||||
args: tool.args,
|
||||
...(tool.inputSchema ? { input_schema: getToolInputSchema(tool) } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function buildAgentRootInputAttributes(config: AgentRuntimeConfig): Record<string, AttributeValue> {
|
||||
const localTools = (config.tools ?? []).map(summarizeToolForTelemetry);
|
||||
const providerTools = (config.providerTools ?? []).map(summarizeProviderToolForTelemetry);
|
||||
const tools = [...localTools, ...providerTools];
|
||||
const toolNames = tools
|
||||
.map((tool) => (typeof tool.name === 'string' ? tool.name : undefined))
|
||||
.filter((name): name is string => name !== undefined);
|
||||
|
||||
const serialized = stringifyTelemetryValue({
|
||||
agent: config.name,
|
||||
tool_count: tools.length,
|
||||
tools,
|
||||
});
|
||||
|
||||
return {
|
||||
...(toolNames.length > 0 ? { 'langsmith.metadata.available_tools': toolNames } : {}),
|
||||
...(serialized ? { 'gen_ai.prompt': serialized } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export interface AgentRuntimeConfig {
|
||||
name: string;
|
||||
model: ModelConfig;
|
||||
|
|
@ -246,8 +331,14 @@ export class AgentRuntime {
|
|||
const runId = generateRunId();
|
||||
let list: AgentMessageList | undefined = undefined;
|
||||
try {
|
||||
list = await this.initRun(input, options);
|
||||
const rawResult = await this.runGenerateLoop({ list, options, runId });
|
||||
const initializedList = await this.initRun(input, options);
|
||||
list = initializedList;
|
||||
const rawResult = await this.withTelemetryRootSpan(
|
||||
'generate',
|
||||
options,
|
||||
runId,
|
||||
async () => await this.runGenerateLoop({ list: initializedList, options, runId }),
|
||||
);
|
||||
return this.finalizeGenerate(rawResult, list, runId);
|
||||
} catch (error) {
|
||||
await this.flushTelemetry(options);
|
||||
|
|
@ -354,12 +445,18 @@ export class AgentRuntime {
|
|||
await this.setListWorkingMemoryConfig(list, state.persistence);
|
||||
|
||||
if (method === 'generate') {
|
||||
const rawResult = await this.runGenerateLoop({
|
||||
const rawResult = await this.withTelemetryRootSpan(
|
||||
'generate',
|
||||
resumeOptions,
|
||||
options.runId,
|
||||
async () =>
|
||||
await this.runGenerateLoop({
|
||||
list,
|
||||
options: resumeOptions,
|
||||
runId: options.runId,
|
||||
pendingResume,
|
||||
});
|
||||
}),
|
||||
);
|
||||
if (!rawResult.pendingSuspend) {
|
||||
await this.cleanupRun(options.runId);
|
||||
}
|
||||
|
|
@ -621,6 +718,120 @@ export class AgentRuntime {
|
|||
};
|
||||
}
|
||||
|
||||
private buildTelemetryRootAttributes(
|
||||
t: BuiltTelemetry,
|
||||
spanName: string,
|
||||
runId: string,
|
||||
): Record<string, AttributeValue> {
|
||||
const metadataAttributes = this.buildTelemetryMetadataAttributes(t, 'langsmith.metadata');
|
||||
|
||||
return {
|
||||
'langsmith.traceable': 'true',
|
||||
'langsmith.trace.name': spanName,
|
||||
'langsmith.span.kind': 'chain',
|
||||
'langsmith.metadata.agent_name': this.config.name,
|
||||
'langsmith.metadata.agent_run_id': runId,
|
||||
...metadataAttributes,
|
||||
...buildAgentRootInputAttributes(this.config),
|
||||
};
|
||||
}
|
||||
|
||||
private buildTelemetryMetadataAttributes(
|
||||
t: BuiltTelemetry,
|
||||
prefix: string,
|
||||
): Record<string, AttributeValue> {
|
||||
return Object.fromEntries(
|
||||
Object.entries(t.metadata ?? {}).map(([key, value]) => [`${prefix}.${key}`, value]),
|
||||
);
|
||||
}
|
||||
|
||||
private buildAiSdkOperationAttributes(
|
||||
operationId: string,
|
||||
t: BuiltTelemetry,
|
||||
): Record<string, AttributeValue> {
|
||||
const functionId = t.functionId ?? this.config.name;
|
||||
return {
|
||||
'operation.name': `${operationId} ${functionId}`,
|
||||
'resource.name': functionId,
|
||||
'ai.operationId': operationId,
|
||||
'ai.telemetry.functionId': functionId,
|
||||
...this.buildTelemetryMetadataAttributes(t, 'ai.telemetry.metadata'),
|
||||
};
|
||||
}
|
||||
|
||||
private async withTelemetryRootSpan<T>(
|
||||
operation: 'generate' | 'stream',
|
||||
options: ExecutionOptions | undefined,
|
||||
runId: string,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const t = this.resolveTelemetry(options);
|
||||
if (!t?.enabled || t.runtimeRootSpanEnabled === false || !isActiveSpanTracer(t.tracer)) {
|
||||
return await fn();
|
||||
}
|
||||
|
||||
const spanName = `${t.functionId ?? this.config.name}.${operation}`;
|
||||
return await t.tracer.startActiveSpan(
|
||||
spanName,
|
||||
{ attributes: this.buildTelemetryRootAttributes(t, spanName, runId) },
|
||||
async (span) => {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (error) {
|
||||
span.recordException?.(error);
|
||||
span.setStatus?.({ code: 2, message: String(error) });
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private async withTelemetryToolSpan<T>(
|
||||
toolCallId: string,
|
||||
toolName: string,
|
||||
input: JSONValue,
|
||||
t: BuiltTelemetry | undefined,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
if (!t?.enabled || !isActiveSpanTracer(t.tracer)) {
|
||||
return await fn();
|
||||
}
|
||||
|
||||
const shouldRecordInputs = t.recordInputs ?? true;
|
||||
const inputValue = shouldRecordInputs ? stringifyTelemetryValue(input) : undefined;
|
||||
|
||||
return await t.tracer.startActiveSpan(
|
||||
'ai.toolCall',
|
||||
{
|
||||
attributes: {
|
||||
...this.buildAiSdkOperationAttributes('ai.toolCall', t),
|
||||
'ai.toolCall.name': toolName,
|
||||
'ai.toolCall.id': toolCallId,
|
||||
...(inputValue !== undefined ? { 'ai.toolCall.args': inputValue } : {}),
|
||||
},
|
||||
},
|
||||
async (span) => {
|
||||
try {
|
||||
const result = await fn();
|
||||
const shouldRecordOutputs = t.recordOutputs ?? true;
|
||||
const outputValue = shouldRecordOutputs ? stringifyTelemetryValue(result) : undefined;
|
||||
if (outputValue !== undefined) {
|
||||
span.setAttributes?.({ 'ai.toolCall.result': outputValue });
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
span.recordException?.(error);
|
||||
span.setStatus?.({ code: 2, message: String(error) });
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/** Core generate loop using generateText (non-streaming). */
|
||||
private async runGenerateLoop(ctx: LoopContext): Promise<GenerateResult> {
|
||||
const { list, options, runId, pendingResume } = ctx;
|
||||
|
|
@ -815,7 +1026,12 @@ export class AgentRuntime {
|
|||
};
|
||||
this.eventBus.on(AgentEvent.ToolExecutionStart, onToolExecutionStart);
|
||||
|
||||
this.runStreamLoop({ ...ctx, writer })
|
||||
this.withTelemetryRootSpan(
|
||||
'stream',
|
||||
options,
|
||||
runId,
|
||||
async () => await this.runStreamLoop({ ...ctx, writer }),
|
||||
)
|
||||
.catch(async (error: unknown) => {
|
||||
await this.flushTelemetry(options);
|
||||
await this.cleanupRun(runId);
|
||||
|
|
@ -1572,7 +1788,14 @@ export class AgentRuntime {
|
|||
|
||||
let toolResult: unknown;
|
||||
try {
|
||||
toolResult = await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry);
|
||||
toolResult = await this.withTelemetryToolSpan(
|
||||
toolCallId,
|
||||
toolName,
|
||||
toolInput,
|
||||
resolvedTelemetry,
|
||||
async () =>
|
||||
await executeTool(toolInput, builtTool, resumeData, resolvedTelemetry, toolCallId),
|
||||
);
|
||||
} catch (error) {
|
||||
return makeToolError(error as Error);
|
||||
}
|
||||
|
|
@ -1623,7 +1846,7 @@ export class AgentRuntime {
|
|||
? builtTool.toModelOutput(actualResult)
|
||||
: actualResult;
|
||||
|
||||
list.setToolCallResult(toolCallId, modelResult as JSONValue);
|
||||
list.setToolCallResult(toolCallId, toJsonValue(modelResult));
|
||||
|
||||
const customMessage = builtTool?.toMessage?.(actualResult);
|
||||
if (customMessage) {
|
||||
|
|
|
|||
50
packages/@n8n/agents/src/runtime/json-value.ts
Normal file
50
packages/@n8n/agents/src/runtime/json-value.ts
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
import type { JSONObject, JSONValue } from '../types/utils/json';
|
||||
|
||||
export function toJsonValue(value: unknown, seen = new WeakSet<object>()): JSONValue {
|
||||
if (value === null || typeof value === 'string' || typeof value === 'boolean') {
|
||||
return value;
|
||||
}
|
||||
|
||||
if (typeof value === 'number') {
|
||||
return Number.isFinite(value) ? value : null;
|
||||
}
|
||||
|
||||
if (value === undefined || typeof value === 'function' || typeof value === 'symbol') {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (typeof value === 'bigint') {
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
if (value instanceof Date) {
|
||||
return value.toISOString();
|
||||
}
|
||||
|
||||
if (value instanceof Error) {
|
||||
return {
|
||||
name: value.name,
|
||||
message: value.message,
|
||||
};
|
||||
}
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
return value.map((entry) => toJsonValue(entry, seen));
|
||||
}
|
||||
|
||||
if (typeof value === 'object') {
|
||||
if (seen.has(value)) {
|
||||
return '[Circular]';
|
||||
}
|
||||
|
||||
seen.add(value);
|
||||
const result: JSONObject = {};
|
||||
for (const [key, entryValue] of Object.entries(value)) {
|
||||
result[key] = toJsonValue(entryValue, seen);
|
||||
}
|
||||
seen.delete(value);
|
||||
return result;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
import { generateText, type LanguageModel } from 'ai';
|
||||
|
||||
import type { BuiltMemory, TitleGenerationConfig } from '../types';
|
||||
import type { BuiltMemory, BuiltTelemetry, TitleGenerationConfig } from '../types';
|
||||
import { createFilteredLogger } from './logger';
|
||||
import { createModel } from './model-factory';
|
||||
import type { ModelConfig } from '../types/sdk/agent';
|
||||
|
|
@ -47,6 +47,27 @@ const TRIVIAL_MESSAGE_MAX_CHARS = 15;
|
|||
const TRIVIAL_MESSAGE_MAX_WORDS = 3;
|
||||
const MAX_TITLE_LENGTH = 80;
|
||||
|
||||
interface GenerateTitleFromMessageOptions {
|
||||
instructions?: string;
|
||||
telemetry?: BuiltTelemetry;
|
||||
}
|
||||
|
||||
function buildTelemetryOptions(telemetry: BuiltTelemetry | undefined): Record<string, unknown> {
|
||||
if (!telemetry?.enabled) return {};
|
||||
|
||||
return {
|
||||
experimental_telemetry: {
|
||||
isEnabled: true,
|
||||
functionId: telemetry.functionId ?? 'title-generation',
|
||||
metadata: telemetry.metadata,
|
||||
recordInputs: telemetry.recordInputs,
|
||||
recordOutputs: telemetry.recordOutputs,
|
||||
tracer: telemetry.tracer,
|
||||
integrations: telemetry.integrations.length > 0 ? telemetry.integrations : undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether a user message has too little substance to title a conversation
|
||||
* (e.g. "hey", "hello"). For these, the LLM tends to hallucinate an
|
||||
|
|
@ -99,7 +120,7 @@ function sanitizeTitle(raw: string): string {
|
|||
export async function generateTitleFromMessage(
|
||||
model: LanguageModel,
|
||||
userMessage: string,
|
||||
opts?: { instructions?: string },
|
||||
opts?: GenerateTitleFromMessageOptions,
|
||||
): Promise<string | null> {
|
||||
const trimmed = userMessage.trim();
|
||||
if (!trimmed) return null;
|
||||
|
|
@ -117,6 +138,7 @@ export async function generateTitleFromMessage(
|
|||
content: `Generate a title for the following first message of a conversation. Do not answer the message — only produce the title.\n\n<message>\n${trimmed}\n</message>`,
|
||||
},
|
||||
],
|
||||
...buildTelemetryOptions(opts?.telemetry),
|
||||
});
|
||||
|
||||
const raw = result.text?.trim();
|
||||
|
|
|
|||
|
|
@ -142,6 +142,7 @@ export async function executeTool(
|
|||
builtTool: BuiltTool,
|
||||
resumeData?: unknown,
|
||||
parentTelemetry?: BuiltTelemetry,
|
||||
toolCallId?: string,
|
||||
): Promise<unknown> {
|
||||
if (!builtTool.handler) {
|
||||
throw new Error(`No handler found for tool "${builtTool.name}"`);
|
||||
|
|
@ -154,11 +155,12 @@ export async function executeTool(
|
|||
},
|
||||
resumeData,
|
||||
parentTelemetry,
|
||||
toolCallId,
|
||||
};
|
||||
return await builtTool.handler(args, ctx);
|
||||
}
|
||||
|
||||
const ctx: ToolContext = { parentTelemetry };
|
||||
const ctx: ToolContext = { parentTelemetry, toolCallId };
|
||||
return await builtTool.handler(args, ctx);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -153,6 +153,8 @@ export class Telemetry {
|
|||
|
||||
protected recordOutputsValue = true;
|
||||
|
||||
protected runtimeRootSpanEnabledValue = true;
|
||||
|
||||
protected redactFn?: RedactFn;
|
||||
|
||||
protected integrationsList: TelemetryIntegration[] = [];
|
||||
|
|
@ -223,6 +225,12 @@ export class Telemetry {
|
|||
return this;
|
||||
}
|
||||
|
||||
/** Enable or disable the generic AgentRuntime root span around generate/stream loops. */
|
||||
runtimeRootSpan(value: boolean): this {
|
||||
this.runtimeRootSpanEnabledValue = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a redaction callback. When set, all integration hooks will
|
||||
* have their event data passed through this function before the
|
||||
|
|
@ -287,6 +295,7 @@ export class Telemetry {
|
|||
metadata: this.metadataValue,
|
||||
recordInputs: this.recordInputsValue,
|
||||
recordOutputs: this.recordOutputsValue,
|
||||
runtimeRootSpanEnabled: this.runtimeRootSpanEnabledValue,
|
||||
integrations,
|
||||
tracer,
|
||||
provider,
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ import type { BuiltTelemetry } from '../telemetry';
|
|||
import type { JSONObject } from '../utils/json';
|
||||
|
||||
export interface ToolContext {
|
||||
/** AI SDK tool call ID for the current local tool execution. */
|
||||
toolCallId?: string;
|
||||
/** Telemetry config from the parent agent, for sub-agent propagation. */
|
||||
parentTelemetry?: BuiltTelemetry;
|
||||
}
|
||||
|
|
@ -19,6 +21,8 @@ export interface InterruptibleToolContext<S = unknown, R = unknown> {
|
|||
suspend: (payload: S) => Promise<never>;
|
||||
/** Data from the consumer after resume. Undefined on first invocation. */
|
||||
resumeData: R | undefined;
|
||||
/** AI SDK tool call ID for the current local tool execution. */
|
||||
toolCallId?: string;
|
||||
/** Telemetry config from the parent agent, for sub-agent propagation. */
|
||||
parentTelemetry?: BuiltTelemetry;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ export interface BuiltTelemetry {
|
|||
readonly metadata?: Record<string, AttributeValue>;
|
||||
readonly recordInputs: boolean;
|
||||
readonly recordOutputs: boolean;
|
||||
/** Whether AgentRuntime should add a generic chain span around generate/stream loops. */
|
||||
readonly runtimeRootSpanEnabled?: boolean;
|
||||
/** Integrations are pre-wrapped with redaction if .redact() was set at build time. */
|
||||
readonly integrations: TelemetryIntegration[];
|
||||
readonly tracer?: OpaqueTracer;
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user