import type { InstanceAiEvent } from '@n8n/api-types';
import type { RunTree } from 'langsmith';

import type { InstanceAiEventBus } from '../event-bus';
import type { Logger } from '../logger';
import { mapMastraChunkToEvent } from '../stream/map-chunk';
import { WorkSummaryAccumulator, type WorkSummary } from '../stream/work-summary-accumulator';
import { getTraceParentRun, setTraceParentOverride } from '../tracing/langsmith-tracing';
import { asResumable, parseSuspension } from '../utils/stream-helpers';
import type { SuspensionInfo } from '../utils/stream-helpers';

// NOTE(review): generic type arguments in the declarations below look like they were
// stripped (e.g. `Extract;`, `AsyncIterable;`, `Promise;`, `Promise>`, `Record;`,
// `Array>`). Confirm against the upstream file before relying on these types.
type ConfirmationRequestEvent = Extract;

/** Stream handle consumed by this module; `fullStream` is the only required member. */
export interface ResumableStreamSource {
	runId?: string;
	fullStream: AsyncIterable;
	text?: Promise;
	steps?: Promise;
	response?: Promise;
	finishReason?: Promise;
	request?: Promise;
	usage?: Promise;
	totalUsage?: Promise;
}

/** Identifiers and collaborators (event bus, abort signal, logger) passed alongside a stream. */
export interface ResumableStreamContext {
	threadId: string;
	runId: string;
	agentId: string;
	eventBus: InstanceAiEventBus;
	signal: AbortSignal;
	logger: Logger;
	onActivity?: () => void;
}

/** Control variant tagged `mode: 'manual'`; it carries no callbacks. */
export interface ManualSuspensionControl {
	mode: 'manual';
}

/** Control variant tagged `mode: 'auto'`; it carries the confirmation/correction callbacks. */
export interface AutoResumeControl {
	mode: 'auto';
	waitForConfirmation: (requestId: string) => Promise>;
	drainCorrections?: () => string[];
	/**
	 * Returns a promise that resolves when a new user correction is queued. Used to unblock
	 * HITL suspensions so the builder can incorporate the correction instead of waiting forever.
	 */
	waitForCorrection?: () => Promise;
	onSuspension?: (suspension: SuspensionInfo) => void;
	buildResumeOptions?: (input: {
		mastraRunId: string;
		suspension: SuspensionInfo;
	}) => Record;
}

/** Union of the two control variants, discriminated by `mode`. */
export type ResumableStreamControl = ManualSuspensionControl | AutoResumeControl;

export interface ExecuteResumableStreamOptions {
	agent: unknown;
	stream: ResumableStreamSource;
	context: ResumableStreamContext;
	control: ResumableStreamControl;
	initialMastraRunId?: string;
	llmStepTraceHooks?: LlmStepTraceHooks;
}

/** Final status label attached to a trace or to a stream execution result. */
export type TraceStatus = 'completed' | 'cancelled' | 'suspended' | 'errored';

export interface ExecuteResumableStreamResult {
	status: TraceStatus;
	mastraRunId: string;
	text?: Promise;
	suspension?: SuspensionInfo;
	confirmationEvent?: ConfirmationRequestEvent;
	/** Accumulated tool call outcomes observed during stream consumption. */
	workSummary: WorkSummary;
}

/** Hook bundle for per-step LLM tracing: execution options plus chunk/segment/finalize callbacks. */
export interface LlmStepTraceHooks {
	executionOptions: {
		prepareStep?: (options: unknown) => Promise;
		experimental_prepareStep?: (options: unknown) => Promise;
		experimental_onStepStart?: (options: unknown) => Promise;
		onStepFinish: (stepResult: unknown) => Promise;
		experimental_telemetry?: { isEnabled: boolean };
	};
	onStreamChunk: (chunk: unknown) => void;
	startSegment: () => void;
	finalize: (
		source: ResumableStreamSource,
		options?: {
			status?: TraceStatus;
			error?: string;
		},
	) => Promise;
}

/** Provider / model-name pair; both parts are optional. */
interface NormalizedModelMetadata {
	provider?: string;
	modelName?: string;
}

/** Mutable per-step trace state: the child run tree plus everything accumulated for that step. */
interface LlmStepTraceRecord {
	messageId: string;
	stepNumber?: number;
	runTree: RunTree;
	model: NormalizedModelMetadata;
	inputs: Record;
	textParts: string[];
	reasoningParts: string[];
	toolCalls: Array>;
	toolResults: Array>;
	finishReason?: string;
	usage?: unknown;
	response?: unknown;
	request?: unknown;
	providerMetadata?: unknown;
	sourceUsage?: unknown;
	sourceTotalUsage?: unknown;
	warnings?: unknown;
	isContinued?: boolean;
	recordedFirstToken: boolean;
	finished: boolean;
}

/** Loosely-typed view of a step result; every field is optional. */
interface StepResultLike {
	stepNumber?: number;
	text?: string;
	reasoning?: unknown;
toolCalls?: unknown[];
	toolResults?: unknown[];
	finishReason?: string;
	usage?: unknown;
	request?: { body?: unknown };
	response?: {
		messages?: unknown[];
		headers?: Record<string, string>;
		id?: string;
		timestamp?: Date;
		modelId?: string;
		[key: string]: unknown;
	};
	providerMetadata?: unknown;
}

/** Loosely-typed view of a step-start payload; every field is optional. */
interface StepStartLike {
	stepNumber?: number;
	messages?: unknown[];
	model?: {
		provider?: string;
		modelId?: string;
	};
}

/** Bookkeeping for a tool run created by this module, keyed elsewhere by `toolCallId`. */
interface SyntheticToolTraceRecord {
	toolCallId: string;
	toolName: string;
	runTree: RunTree;
	finished: boolean;
}

/** Strings longer than this are cut by `truncateTraceString`. */
const MAX_TRACE_STRING_LENGTH = 2_000;
/** Arrays are sliced to this many items by `sanitizeTraceValue`. */
const MAX_TRACE_ARRAY_ITEMS = 20;
/** Nested objects keep at most this many keys in `sanitizeTraceValue`. */
const MAX_TRACE_OBJECT_KEYS = 20;
/** Extra tool names that always get a synthetic tool trace (empty by default). */
const SYNTHETIC_TOOL_TRACE_NAMES = new Set<string>();

/** Type guard: `value` is a non-null, non-array object. */
function isRecord(value: unknown): value is Record<string, unknown> {
	return value !== null && typeof value === 'object' && !Array.isArray(value);
}

/** Returns `value` when it is a finite number, otherwise `undefined`. */
function getFiniteNumber(value: unknown): number | undefined {
	return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}

/**
 * Drops falsy entries and duplicates from `tags`.
 *
 * @returns the unique tags in first-seen order, or `undefined` when none remain.
 */
function dedupeTags(tags: Array<string | undefined>): string[] | undefined {
	const values = tags.filter((tag): tag is string => Boolean(tag));
	return values.length > 0 ?
[...new Set(values)] : undefined;
}

/** Cuts `value` to MAX_TRACE_STRING_LENGTH characters and appends `…` when it was longer. */
function truncateTraceString(value: string): string {
	if (value.length <= MAX_TRACE_STRING_LENGTH) {
		return value;
	}
	return `${value.slice(0, MAX_TRACE_STRING_LENGTH)}…`;
}

/**
 * Produces a bounded, JSON-friendly copy of `value` for trace payloads.
 *
 * - strings are truncated, bigints stringified, dates converted to ISO strings;
 * - arrays keep the first MAX_TRACE_ARRAY_ITEMS entries;
 * - objects keep the first MAX_TRACE_OBJECT_KEYS keys and report the rest in `__truncatedKeys`;
 * - arrays/objects at depth 3 or deeper are replaced by a short placeholder string;
 * - anything else (functions, symbols, …) becomes its `Object.prototype.toString` tag.
 *
 * @param value arbitrary value to sanitize
 * @param depth current nesting depth (callers normally omit it)
 */
function sanitizeTraceValue(value: unknown, depth = 0): unknown {
	if (value === null || value === undefined) {
		return value;
	}
	if (typeof value === 'string') {
		return truncateTraceString(value);
	}
	if (typeof value === 'number' || typeof value === 'boolean') {
		return value;
	}
	if (typeof value === 'bigint') {
		return value.toString();
	}
	if (value instanceof Date) {
		// toISOString() throws a RangeError for invalid dates; sanitizing must never throw.
		return Number.isNaN(value.getTime()) ? 'Invalid Date' : value.toISOString();
	}
	if (Array.isArray(value)) {
		if (depth >= 3) {
			return `[array(${value.length})]`;
		}
		return value
			.slice(0, MAX_TRACE_ARRAY_ITEMS)
			.map((entry) => sanitizeTraceValue(entry, depth + 1));
	}
	if (isRecord(value)) {
		const keyCount = Object.keys(value).length;
		if (depth >= 3) {
			return `[object ${keyCount} keys]`;
		}
		const entries = Object.entries(value).slice(0, MAX_TRACE_OBJECT_KEYS);
		const sanitized: Record<string, unknown> = {};
		for (const [key, entryValue] of entries) {
			sanitized[key] = sanitizeTraceValue(entryValue, depth + 1);
		}
		if (keyCount > entries.length) {
			sanitized.__truncatedKeys = keyCount - entries.length;
		}
		return sanitized;
	}
	return truncateTraceString(Object.prototype.toString.call(value));
}

/**
 * Sanitizes a top-level payload. Non-object input is wrapped as `{ value }`; for objects every
 * top-level key is kept (no key cap at this level) and each value is sanitized.
 */
function sanitizeTracePayload(value: unknown): Record<string, unknown> {
	if (!isRecord(value)) {
		return { value: sanitizeTraceValue(value) };
	}
	const sanitized: Record<string, unknown> = {};
	for (const [key, entryValue] of Object.entries(value)) {
		sanitized[key] = sanitizeTraceValue(entryValue);
	}
	return sanitized;
}

/**
 * Splits a model id of the form `provider/model` into its parts. A string without `/` becomes
 * the model name only; an object with a string `id` is normalized via that id; anything else
 * yields an empty object.
 */
function normalizeModelMetadata(modelId: unknown): NormalizedModelMetadata {
	if (typeof modelId === 'string' && modelId.length > 0) {
		const [provider, ...modelParts] = modelId.split('/');
		return modelParts.length > 0 ?
{ provider, modelName: modelParts.join('/') } : { modelName: modelId };
	}
	if (isRecord(modelId) && typeof modelId.id === 'string') {
		return normalizeModelMetadata(modelId.id);
	}
	return {};
}

/** Builds the run name: `llm:<provider>/<model>`, `llm:<model>`, or plain `llm`. */
function formatLlmRunName(model: NormalizedModelMetadata): string {
	if (model.provider && model.modelName) {
		return `llm:${model.provider}/${model.modelName}`;
	}
	if (model.modelName) {
		return `llm:${model.modelName}`;
	}
	return 'llm';
}

/** Parses `value` as JSON when it is a string; returns it unchanged if it is not a string or not valid JSON. */
function parseMaybeJson(value: unknown): unknown {
	if (typeof value !== 'string') {
		return value;
	}
	try {
		return JSON.parse(value) as unknown;
	} catch {
		return value;
	}
}

/**
 * Derives trace inputs from an LLM request.
 *
 * When `request` is an object with a `body`, the body (JSON-parsed if it is a string) is used;
 * otherwise `request` itself. From that object only a fixed allow-list of keys is copied, with
 * `messages` arrays passed through `normalizeTraceMessages`. If nothing is selected, or the
 * parsed request is not an object, the result is `{ request: <parsed request> }`.
 */
function buildLlmInputPayload(request: unknown): Record<string, unknown> {
	const parsedRequest =
		isRecord(request) && 'body' in request ? parseMaybeJson(request.body) : request;
	if (!isRecord(parsedRequest)) {
		return { request: parsedRequest };
	}

	const selected: Record<string, unknown> = {};
	for (const key of [
		'messages',
		'prompt',
		'input',
		'system',
		'model',
		'temperature',
		'tools',
		'tool_choice',
		'toolChoice',
		'stop',
		'stop_sequences',
		'stopSequences',
		'max_tokens',
		'max_output_tokens',
		'max_completion_tokens',
	]) {
		// Read once into a local so the Array.isArray narrowing applies to the value we pass on.
		const entry = parsedRequest[key];
		if (entry === undefined) {
			continue;
		}
		selected[key] =
			key === 'messages' && Array.isArray(entry) ? (normalizeTraceMessages(entry) ?? []) : entry;
	}

	if (Object.keys(selected).length > 0) {
		return selected;
	}
	return { request: parsedRequest };
}

/**
 * Reads the input-token count from a usage object. An `inputTokenDetails` object takes
 * precedence (no-cache count only), then an `inputTokens` object, then the numeric
 * `inputTokens` / `promptTokens` fields.
 */
function extractInputTokenCount(usage: Record<string, unknown>): number | undefined {
	if (isRecord(usage.inputTokenDetails)) {
		return (
			getFiniteNumber(usage.inputTokenDetails.noCacheTokens) ??
			getFiniteNumber(usage.inputTokenDetails.no_cache_tokens) ??
			undefined
		);
	}
	if (isRecord(usage.inputTokens)) {
		return (
			getFiniteNumber(usage.inputTokens.noCache) ??
			getFiniteNumber(usage.inputTokens.no_cache) ??
			getFiniteNumber(usage.inputTokens.total) ??
			undefined
		);
	}
	return getFiniteNumber(usage.inputTokens) ?? getFiniteNumber(usage.promptTokens) ??
undefined;
}

/**
 * Reads the output-token count: `outputTokens.total` / `.text` when `outputTokens` is an
 * object, otherwise the numeric `outputTokens` / `completionTokens` fields.
 */
function extractOutputTokenCount(usage: Record<string, unknown>): number | undefined {
	if (isRecord(usage.outputTokens)) {
		return getFiniteNumber(usage.outputTokens.total) ?? getFiniteNumber(usage.outputTokens.text);
	}
	return getFiniteNumber(usage.outputTokens) ?? getFiniteNumber(usage.completionTokens);
}

/** Reads the reasoning-token count from `outputTokens.reasoning` or the flat `reasoningTokens`. */
function extractReasoningTokenCount(usage: Record<string, unknown>): number | undefined {
	if (isRecord(usage.outputTokens)) {
		return getFiniteNumber(usage.outputTokens.reasoning);
	}
	return getFiniteNumber(usage.reasoningTokens);
}

/**
 * Reads the cache-write (cache creation) token count from a usage-like value, trying the
 * `inputTokenDetails` object first, then an `inputTokens` object, then flat provider fields.
 * Returns `undefined` for non-object input.
 */
function extractCacheCreationTokens(raw: unknown): number | undefined {
	if (!isRecord(raw)) return undefined;
	if (isRecord(raw.inputTokenDetails)) {
		return (
			getFiniteNumber(raw.inputTokenDetails.cacheWriteTokens) ??
			getFiniteNumber(raw.inputTokenDetails.cache_write_tokens)
		);
	}
	if (isRecord(raw.inputTokens)) {
		return (
			getFiniteNumber(raw.inputTokens.cacheWrite) ??
			getFiniteNumber(raw.inputTokens.cache_write) ??
			getFiniteNumber(raw.inputTokens.cacheWriteTokens) ??
			getFiniteNumber(raw.inputTokens.cache_write_tokens)
		);
	}
	return (
		getFiniteNumber(raw.cacheCreationInputTokens) ??
		getFiniteNumber(raw.cache_creation_input_tokens) ??
		getFiniteNumber(raw.input_cache_write)
	);
}

/**
 * Reads the cache-read token count from a usage-like value, trying the `inputTokenDetails`
 * object first, then an `inputTokens` object, then flat provider fields.
 * Returns `undefined` for non-object input.
 */
function extractCacheReadTokens(raw: unknown): number | undefined {
	if (!isRecord(raw)) return undefined;
	if (isRecord(raw.inputTokenDetails)) {
		return (
			getFiniteNumber(raw.inputTokenDetails.cacheReadTokens) ??
			getFiniteNumber(raw.inputTokenDetails.cache_read_tokens) ??
			undefined
		);
	}
	if (isRecord(raw.inputTokens)) {
		return (
			getFiniteNumber(raw.inputTokens.cacheRead) ??
			getFiniteNumber(raw.inputTokens.cache_read) ??
			getFiniteNumber(raw.inputTokens.cacheReadTokens) ??
			getFiniteNumber(raw.inputTokens.cache_read_tokens) ??
			undefined
		);
	}
	return (
		getFiniteNumber(raw.cachedInputTokens) ??
		getFiniteNumber(raw.cacheRead) ??
		getFiniteNumber(raw.cache_read_input_tokens) ??
getFiniteNumber(raw.input_cache_read) ??
		undefined
	);
}

/**
 * Finds a `usage` object in provider metadata: either directly on the metadata object or on
 * the first nested object value that has one.
 */
function extractUsageFromProviderMetadata(
	providerMetadata: unknown,
): Record<string, unknown> | undefined {
	if (!isRecord(providerMetadata)) {
		return undefined;
	}
	if (isRecord(providerMetadata.usage)) {
		return providerMetadata.usage;
	}
	for (const value of Object.values(providerMetadata)) {
		if (isRecord(value) && isRecord(value.usage)) {
			return value.usage;
		}
	}
	return undefined;
}

/**
 * Merges two usage-metadata objects. `primary` wins; `fallback` only fills token totals that
 * `primary` lacks. The `input_token_details` / `output_token_details` objects are merged
 * key-by-key with `primary` taking precedence.
 */
function mergeUsageMetadata(
	primary: Record<string, unknown> | undefined,
	fallback: Record<string, unknown> | undefined,
): Record<string, unknown> | undefined {
	if (!primary) {
		return fallback;
	}
	if (!fallback) {
		return primary;
	}

	const merged: Record<string, unknown> = { ...primary };
	for (const key of ['input_tokens', 'output_tokens', 'total_tokens']) {
		if (merged[key] === undefined && fallback[key] !== undefined) {
			merged[key] = fallback[key];
		}
	}

	for (const detailsKey of ['input_token_details', 'output_token_details']) {
		const fallbackDetails = fallback[detailsKey];
		const primaryDetails = primary[detailsKey];
		const mergedDetails: Record<string, unknown> = {};
		// Fallback first so that primary's values overwrite it.
		if (isRecord(fallbackDetails)) {
			Object.assign(mergedDetails, fallbackDetails);
		}
		if (isRecord(primaryDetails)) {
			Object.assign(mergedDetails, primaryDetails);
		}
		if (Object.keys(mergedDetails).length > 0) {
			merged[detailsKey] = mergedDetails;
		}
	}

	return merged;
}

/**
 * Builds usage metadata in snake_case form (`input_tokens`, `output_tokens`, `total_tokens`,
 * `input_token_details`, `output_token_details`) from a usage object and/or provider metadata.
 *
 * Values from `usage` take precedence over values found in the provider metadata's `usage`.
 * `total_tokens` is the sum of input and output when either is known; otherwise the reported
 * `totalTokens` is used.
 *
 * @returns the metadata, or `undefined` when nothing usable was found.
 */
function buildUsageMetadata(
	usage: unknown,
	providerMetadata?: unknown,
): Record<string, unknown> | undefined {
	const usageRecord = isRecord(usage) ? usage : undefined;
	const providerUsage = extractUsageFromProviderMetadata(providerMetadata);
	if (!usageRecord && !providerUsage) {
		return undefined;
	}

	const inputTokens =
		(usageRecord ? extractInputTokenCount(usageRecord) : undefined) ??
		(providerUsage ? extractInputTokenCount(providerUsage) : undefined);
	const outputTokens =
		(usageRecord ? extractOutputTokenCount(usageRecord) : undefined) ??
		(providerUsage ? extractOutputTokenCount(providerUsage) : undefined);
	const totalTokens =
		inputTokens !== undefined || outputTokens !== undefined
			? (inputTokens ?? 0) + (outputTokens ?? 0)
			: ((usageRecord ? getFiniteNumber(usageRecord.totalTokens) : undefined) ??
				(providerUsage ? getFiniteNumber(providerUsage.totalTokens) : undefined));
	const cachedInputTokens =
		(usageRecord ? getFiniteNumber(usageRecord.cachedInputTokens) : undefined) ??
		(usageRecord ? extractCacheReadTokens(usageRecord) : undefined) ??
		(usageRecord ? extractCacheReadTokens(usageRecord.raw) : undefined) ??
		(providerUsage ? extractCacheReadTokens(providerUsage) : undefined) ??
		(providerUsage ? extractCacheReadTokens(providerUsage.raw) : undefined);
	const cacheCreationTokens =
		(usageRecord ? extractCacheCreationTokens(usageRecord) : undefined) ??
		(usageRecord ? extractCacheCreationTokens(usageRecord.raw) : undefined) ??
		(providerUsage ? extractCacheCreationTokens(providerUsage) : undefined) ??
		(providerUsage ? extractCacheCreationTokens(providerUsage.raw) : undefined);
	const reasoningTokens =
		(usageRecord ? extractReasoningTokenCount(usageRecord) : undefined) ??
		(providerUsage ? extractReasoningTokenCount(providerUsage) : undefined);

	const usageMetadata: Record<string, unknown> = {};
	if (inputTokens !== undefined) {
		usageMetadata.input_tokens = inputTokens;
	}
	if (outputTokens !== undefined) {
		usageMetadata.output_tokens = outputTokens;
	}
	if (totalTokens !== undefined) {
		usageMetadata.total_tokens = totalTokens;
	}

	const inputTokenDetails: Record<string, unknown> = {};
	if (cachedInputTokens !== undefined) {
		inputTokenDetails.cache_read = cachedInputTokens;
	}
	if (cacheCreationTokens !== undefined) {
		inputTokenDetails.cache_creation = cacheCreationTokens;
	}
	if (Object.keys(inputTokenDetails).length > 0) {
		usageMetadata.input_token_details = inputTokenDetails;
	}
	if (reasoningTokens !== undefined) {
		usageMetadata.output_token_details = { reasoning: reasoningTokens };
	}

	// The recursive call passes no provider metadata, so it cannot recurse further.
	const providerFallback = providerUsage ? buildUsageMetadata(providerUsage) : undefined;
	return Object.keys(usageMetadata).length > 0
		? mergeUsageMetadata(usageMetadata, providerFallback)
		: providerFallback;
}

/**
 * Extracts a sanitized summary of the usage-related keys of `value`, recursing into the
 * `usage`, `raw`, `rawUsage`, `anthropic` and `openai` sub-objects.
 *
 * @returns the summary, or `undefined` when `value` is not an object or has no such keys.
 */
function summarizeUsageLikeValue(value: unknown): Record<string, unknown> | undefined {
	if (!isRecord(value)) {
		return undefined;
	}
	const summary: Record<string, unknown> = {};
	for (const key of [
		'promptTokens',
		'completionTokens',
		'totalTokens',
		'cachedInputTokens',
		'reasoningTokens',
		'inputTokens',
		'outputTokens',
		'inputTokenDetails',
		'outputTokenDetails',
		'input_tokens',
		'output_tokens',
		'cache_creation_input_tokens',
		'cache_read_input_tokens',
		'cacheCreationInputTokens',
		'cacheReadInputTokens',
		'iterations',
	]) {
		if (value[key] !== undefined) {
			summary[key] = sanitizeTraceValue(value[key]);
		}
	}
	for (const nestedKey of ['usage', 'raw', 'rawUsage', 'anthropic', 'openai']) {
		const nestedSummary = summarizeUsageLikeValue(value[nestedKey]);
		if (nestedSummary) {
			summary[nestedKey] = nestedSummary;
		}
	}
	return Object.keys(summary).length > 0 ?
summary : undefined;
}

/**
 * Collects usage summaries from every place usage may be reported for a step (step result,
 * trace record, stream-level usage, provider metadata, response) under labelled keys.
 *
 * @returns the sanitized debug object, or `undefined` when no source yielded a summary.
 */
function buildLlmUsageDebug(
	record: LlmStepTraceRecord,
	stepResult?: StepResultLike,
): Record<string, unknown> | undefined {
	const usageDebug: Record<string, unknown> = {};

	// Order determines key order in the emitted object.
	const sources: Array<[string, unknown]> = [
		['step_usage', stepResult?.usage],
		['record_usage', record.usage],
		['stream_usage', record.sourceUsage],
		['stream_total_usage', record.sourceTotalUsage],
		['step_provider_metadata', stepResult?.providerMetadata],
		['provider_metadata', record.providerMetadata],
		['step_response', stepResult?.response],
		['response', record.response],
	];
	for (const [label, source] of sources) {
		const sourceSummary = summarizeUsageLikeValue(source);
		if (sourceSummary) {
			usageDebug[label] = sourceSummary;
		}
	}

	return Object.keys(usageDebug).length > 0 ?
sanitizeTracePayload(usageDebug) : undefined;
}

/** Awaits a usage promise, mapping both "no promise" and rejection to `undefined`. */
async function resolveUsagePromise(usage: Promise<unknown> | undefined): Promise<unknown> {
	if (!usage) {
		return undefined;
	}
	try {
		return await usage;
	} catch {
		// Usage is best-effort trace data; a rejected promise means "unknown".
		return undefined;
	}
}

/** Resolves the stream's `usage` and `totalUsage` promises in parallel. */
async function resolveSegmentUsage(source: ResumableStreamSource): Promise<{
	lastStepUsage?: unknown;
	totalUsage?: unknown;
}> {
	const [lastStepUsage, totalUsage] = await Promise.all([
		resolveUsagePromise(source.usage),
		resolveUsagePromise(source.totalUsage),
	]);
	return { lastStepUsage, totalUsage };
}

/**
 * Attaches segment-level usage to a step record.
 *
 * `sourceUsage` / `sourceTotalUsage` are always overwritten when the segment provides them.
 * `record.usage` is only filled when it is still unset: the last record of the segment gets
 * the last-step usage, and a segment with a single record falls back to the total usage.
 */
function maybeBackfillRecordUsageFromSegment(
	record: LlmStepTraceRecord,
	records: LlmStepTraceRecord[],
	usage: {
		lastStepUsage?: unknown;
		totalUsage?: unknown;
	},
): void {
	if (usage.lastStepUsage !== undefined) {
		record.sourceUsage = usage.lastStepUsage;
	}
	if (usage.totalUsage !== undefined) {
		record.sourceTotalUsage = usage.totalUsage;
	}
	if (record.usage !== undefined) {
		return;
	}

	const isLastRecord = records[records.length - 1] === record;
	if (isLastRecord && usage.lastStepUsage !== undefined) {
		record.usage = usage.lastStepUsage;
		return;
	}
	if (records.length === 1 && usage.totalUsage !== undefined) {
		record.usage = usage.totalUsage;
	}
}

/** Returns `value` when it is an object, otherwise wraps it as `{ value }`. */
function toTraceObject(value: unknown): Record<string, unknown> {
	if (isRecord(value)) {
		return value;
	}
	return { value };
}

/** Returns `value.messages` when `value` is an object with an array `messages` field. */
function extractResponseMessages(value: unknown): unknown[] | undefined {
	return isRecord(value) && Array.isArray(value.messages) ?
value.messages : undefined;
}

/**
 * Collects text fragments from message content. Handles plain strings, arrays (recursively),
 * `{ type: 'text', text }` parts (where `text` is a string or `{ value }`), objects with a
 * `content` string or array, and objects with a `parts` array. Anything else yields `[]`.
 */
function extractTextParts(value: unknown): string[] {
	if (typeof value === 'string') {
		return [value];
	}
	if (Array.isArray(value)) {
		return value.flatMap((entry) => extractTextParts(entry));
	}
	if (!isRecord(value)) {
		return [];
	}

	if (value.type === 'text') {
		if (typeof value.text === 'string') {
			return [value.text];
		}
		if (isRecord(value.text) && typeof value.text.value === 'string') {
			return [value.text.value];
		}
	}

	if ('content' in value) {
		if (typeof value.content === 'string') {
			return [value.content];
		}
		if (Array.isArray(value.content)) {
			return value.content.flatMap((entry) => extractTextParts(entry));
		}
	}

	if (Array.isArray(value.parts)) {
		return value.parts.flatMap((entry) => extractTextParts(entry));
	}
	return [];
}

/**
 * Extracts `{ toolCallId?, toolName? }` entries from a message. Looks at `message.tool_calls`,
 * at `tool-call` parts of an array `content`, and at `tool-invocation` parts of
 * `content.parts`. Entries lacking both fields are still pushed (as empty objects).
 */
function extractTraceToolCallsFromMessage(message: unknown): Array<Record<string, unknown>> {
	if (!isRecord(message)) {
		return [];
	}

	const toolCalls: Array<Record<string, unknown>> = [];
	const pushToolCall = (value: unknown) => {
		if (!isRecord(value)) {
			return;
		}
		toolCalls.push({
			...(typeof value.toolCallId === 'string' ? { toolCallId: value.toolCallId } : {}),
			...(typeof value.toolName === 'string' ? { toolName: value.toolName } : {}),
		});
	};

	if (Array.isArray(message.tool_calls)) {
		for (const toolCall of message.tool_calls) {
			pushToolCall(toolCall);
		}
	}

	const content = message.content;
	if (Array.isArray(content)) {
		for (const part of content) {
			if (isRecord(part) && part.type === 'tool-call') {
				pushToolCall(part);
			}
		}
	}
	if (isRecord(content) && Array.isArray(content.parts)) {
		for (const part of content.parts) {
			if (isRecord(part) && part.type === 'tool-invocation' && isRecord(part.toolInvocation)) {
				pushToolCall(part.toolInvocation);
			}
		}
	}

	return toolCalls;
}

/**
 * Reduces tool calls to their `toolCallId` / `toolName` fields, dropping entries with neither.
 *
 * @returns the summaries, or `undefined` when none remain.
 */
function summarizeRequestedTools(
	toolCalls: Array<Record<string, unknown>>,
): Array<Record<string, unknown>> | undefined {
	const summaries = toolCalls
		.map((toolCall) => ({
			...(typeof toolCall.toolCallId === 'string' ? { toolCallId: toolCall.toolCallId } : {}),
			...(typeof toolCall.toolName === 'string' ?
{ toolName: toolCall.toolName } : {}),
		}))
		.filter((toolCall) => Object.keys(toolCall).length > 0);
	return summaries.length > 0 ? summaries : undefined;
}

/**
 * Builds a `Calling tools: a, b` line from the unique tool names in `toolCalls`.
 *
 * @returns the line, or `undefined` when no entry has a string `toolName`.
 */
function buildToolSummaryText(toolCalls: Array<Record<string, unknown>>): string | undefined {
	const toolNames = [
		...new Set(
			toolCalls
				.map((toolCall) => (typeof toolCall.toolName === 'string' ? toolCall.toolName : undefined))
				.filter((toolName): toolName is string => toolName !== undefined),
		),
	];
	if (toolNames.length === 0) {
		return undefined;
	}
	return `Calling tools: ${toolNames.join(', ')}`;
}

/**
 * Converts a message into a compact `{ id?, role, content }` trace entry.
 *
 * Messages without a string `role`, messages with role `tool`, and messages that yield neither
 * text nor a tool summary are dropped (`undefined`). When both text and tool calls exist the
 * tool summary is appended in square brackets after a blank line.
 */
function normalizeTraceMessage(message: unknown): Record<string, unknown> | undefined {
	if (!isRecord(message) || typeof message.role !== 'string') {
		return undefined;
	}
	if (message.role === 'tool') {
		return undefined;
	}

	const textContent = extractTextParts(message.content).join('').trim();
	const toolSummaryText = buildToolSummaryText(extractTraceToolCallsFromMessage(message));
	const content =
		textContent && toolSummaryText
			? `${textContent}\n\n[${toolSummaryText}]`
			: textContent || (toolSummaryText ? `[${toolSummaryText}]` : undefined);
	if (!content) {
		return undefined;
	}

	return {
		...(typeof message.id === 'string' ? { id: message.id } : {}),
		role: message.role,
		content,
	};
}

/**
 * Normalizes a list of messages with `normalizeTraceMessage`, dropping the ones it rejects.
 *
 * @returns the normalized list, or `undefined` for non-array input or an empty result.
 */
function normalizeTraceMessages(
	messages: unknown[] | undefined,
): Array<Record<string, unknown>> | undefined {
	if (!Array.isArray(messages)) {
		return undefined;
	}
	const normalized = messages
		.map((message) => normalizeTraceMessage(message))
		.filter((message): message is Record<string, unknown> => message !== undefined);
	return normalized.length > 0 ?
normalized : undefined;
}

/**
 * Builds the `{ message }` choice for a step's outputs.
 *
 * Prefers the first assistant message among `responseMessages`; otherwise synthesizes an
 * assistant message from the record's accumulated text and a bracketed tool summary.
 *
 * @returns the choice, or `undefined` when there is neither text nor a tool summary.
 */
function buildAssistantChoice(
	responseMessages: unknown[] | undefined,
	record: LlmStepTraceRecord,
): Record<string, unknown> | undefined {
	const assistantMessage = responseMessages?.find(
		(message): message is Record<string, unknown> =>
			isRecord(message) && message.role === 'assistant',
	);
	if (assistantMessage) {
		return { message: assistantMessage };
	}

	const toolSummaryText = buildToolSummaryText(record.toolCalls);
	const content = record.textParts.join('');
	if (!content && !toolSummaryText) {
		return undefined;
	}

	const message: Record<string, unknown> = { role: 'assistant' };
	if (content && toolSummaryText) {
		message.content = `${content}\n\n[${toolSummaryText}]`;
	} else if (content) {
		message.content = content;
	} else if (toolSummaryText) {
		message.content = `[${toolSummaryText}]`;
	}
	return { message };
}

/**
 * Builds the outputs object for an LLM step run: choices, messages, requested tools,
 * reasoning text, finish reason, usage metadata and usage debug info. Values from
 * `stepResult` take precedence over those stored on `record` where both exist.
 */
function buildLlmOutputs(
	record: LlmStepTraceRecord,
	stepResult?: StepResultLike,
): Record<string, unknown> {
	const rawResponseMessages =
		extractResponseMessages(stepResult?.response) ?? extractResponseMessages(record.response);
	const responseMessages = normalizeTraceMessages(rawResponseMessages);
	const usageMetadata = buildUsageMetadata(
		stepResult?.usage ?? record.usage,
		stepResult?.providerMetadata ?? record.providerMetadata,
	);
	const usageDebug = buildLlmUsageDebug(record, stepResult);
	const outputs: Record<string, unknown> = {};
	const choice = buildAssistantChoice(responseMessages, record);
	const messages =
		responseMessages ?? (choice && isRecord(choice.message) ? [choice.message] : undefined);

	if (choice) {
		outputs.choices = [choice];
	}
	if (messages) {
		outputs.messages = messages;
	}

	const requestedTools = summarizeRequestedTools(record.toolCalls);
	if (requestedTools) {
		outputs.requested_tools = requestedTools;
	}

	// Streamed reasoning wins; otherwise fall back to the step result's string or part list.
	const reasoningText =
		record.reasoningParts.join('') ||
		(typeof stepResult?.reasoning === 'string'
			? stepResult.reasoning
			: Array.isArray(stepResult?.reasoning)
				? stepResult.reasoning
						.map((entry) =>
							isRecord(entry) && typeof entry.text === 'string' ?
entry.text : undefined,
						)
						.filter((entry): entry is string => entry !== undefined)
						.join('')
				: undefined);
	if (reasoningText) {
		outputs.reasoning = reasoningText;
	}
	if (record.finishReason || stepResult?.finishReason) {
		outputs.finish_reason = stepResult?.finishReason ?? record.finishReason;
	}
	if (usageMetadata) {
		outputs.usage_metadata = usageMetadata;
	}
	if (usageDebug) {
		outputs.usage_debug = usageDebug;
	}
	return outputs;
}

/**
 * Builds the end-of-run metadata for an LLM step: message id, a fixed `final_status` of
 * `completed`, provider/model labels, finish reason and usage metadata when available.
 */
function buildLlmMetadata(
	record: LlmStepTraceRecord,
	stepResult?: StepResultLike,
): Record<string, unknown> {
	const metadata: Record<string, unknown> = {
		step_message_id: record.messageId,
		final_status: 'completed',
		...(record.model.provider ? { ls_provider: record.model.provider } : {}),
		...(record.model.modelName ? { ls_model_name: record.model.modelName } : {}),
		...(record.finishReason || stepResult?.finishReason
			? { finish_reason: stepResult?.finishReason ?? record.finishReason }
			: {}),
	};

	const usageMetadata = buildUsageMetadata(
		stepResult?.usage ?? record.usage,
		stepResult?.providerMetadata ?? record.providerMetadata,
	);
	if (usageMetadata) {
		metadata.usage_metadata = usageMetadata;
	}
	return metadata;
}

/**
 * Ends a run tree with the given outputs / error / metadata and patches it.
 *
 * @param options.endTime end timestamp; defaults to `Date.now()` at call time
 */
async function finishRunTree(
	runTree: RunTree,
	options: {
		outputs?: Record<string, unknown>;
		metadata?: Record<string, unknown>;
		error?: string;
		endTime?: number;
	},
): Promise<void> {
	await runTree.end(
		options.outputs,
		options.error,
		options.endTime ?? Date.now(),
		options.metadata,
	);
	await runTree.patchRun();
}

/** Returns `chunk.payload` when it is an object, otherwise the chunk itself; `undefined` for non-objects. */
function getChunkPayload(chunk: unknown): Record<string, unknown> | undefined {
	if (!isRecord(chunk)) {
		return undefined;
	}
	return isRecord(chunk.payload) ? chunk.payload : chunk;
}

/** Builds sanitized inputs for a synthetic tool run. `_toolName` is accepted but not used. */
function buildSyntheticToolInputs(
	toolCallId: string,
	_toolName: string,
	args: unknown,
): Record<string, unknown> {
	return sanitizeTracePayload({
		toolCallId,
		args,
	});
}

/**
 * Decides whether a tool chunk gets a synthetic tool run: tool names starting with `mastra_`,
 * names listed in SYNTHETIC_TOOL_TRACE_NAMES, provider-executed tools and dynamic tools.
 */
function shouldCreateSyntheticToolTrace(payload: Record<string, unknown>): boolean {
	const toolName = typeof payload.toolName === 'string' ?
payload.toolName : '';
	return (
		toolName.startsWith('mastra_') ||
		SYNTHETIC_TOOL_TRACE_NAMES.has(toolName) ||
		payload.providerExecuted === true ||
		payload.dynamic === true
	);
}

/**
 * Opens a synthetic `tool` child run for a qualifying `tool-call` chunk and stores it in
 * `records` under its tool call id.
 *
 * Does nothing when the chunk is not a `tool-call`, does not qualify for a synthetic trace,
 * lacks an id or name, is already tracked, or when no parent run is available.
 */
async function startSyntheticToolTrace(
	chunk: unknown,
	records: Map<string, SyntheticToolTraceRecord>,
): Promise<void> {
	if (!isRecord(chunk) || chunk.type !== 'tool-call') {
		return;
	}

	const payload = getChunkPayload(chunk);
	if (!payload || !shouldCreateSyntheticToolTrace(payload)) {
		return;
	}

	const toolCallId = typeof payload.toolCallId === 'string' ? payload.toolCallId : '';
	const toolName = typeof payload.toolName === 'string' ? payload.toolName : '';
	if (!toolCallId || !toolName || records.has(toolCallId)) {
		return;
	}

	const parentRun = getTraceParentRun();
	if (!parentRun) {
		return;
	}

	const runTree = parentRun.createChild({
		name: `tool:${toolName}`,
		run_type: 'tool',
		tags: dedupeTags([
			...(parentRun.tags ?? []),
			'tool',
			...(toolName.startsWith('mastra_') ? ['native-tool'] : []),
		]),
		metadata: {
			...(parentRun.metadata ?? {}),
			tool_name: toolName,
			synthetic_tool_trace: true,
			...(payload.providerExecuted === true ? { provider_executed: true } : {}),
			...(payload.dynamic === true ? { dynamic_tool: true } : {}),
		},
		inputs: buildSyntheticToolInputs(toolCallId, toolName, payload.args),
	});

	await runTree.postRun();
	records.set(toolCallId, {
		toolCallId,
		toolName,
		runTree,
		finished: false,
	});
}

/**
 * Closes the synthetic tool run matching a `tool-result` / `tool-error` chunk. If no run was
 * opened for the tool call yet and the payload qualifies, one is opened first.
 */
async function finishSyntheticToolTrace(
	chunk: unknown,
	records: Map<string, SyntheticToolTraceRecord>,
): Promise<void> {
	if (!isRecord(chunk) || (chunk.type !== 'tool-result' && chunk.type !== 'tool-error')) {
		return;
	}

	const payload = getChunkPayload(chunk);
	if (!payload) {
		return;
	}

	const toolCallId = typeof payload.toolCallId === 'string' ?
payload.toolCallId : '';
	if (!toolCallId) {
		return;
	}

	if (!records.has(toolCallId)) {
		if (!shouldCreateSyntheticToolTrace(payload)) {
			return;
		}
		// No matching tool-call was seen: open the run now from the result payload.
		await startSyntheticToolTrace(
			{
				type: 'tool-call',
				payload,
			},
			records,
		);
	}

	const record = records.get(toolCallId);
	if (!record || record.finished) {
		return;
	}

	// A `tool-error` chunk is a failure even when the payload carries no `isError` flag;
	// otherwise the run would be closed as `completed` with no error attached.
	const failed = payload.isError === true || chunk.type === 'tool-error';
	const errorMessage =
		typeof payload.result === 'string'
			? payload.result
			: typeof payload.error === 'string'
				? payload.error
				: payload.error instanceof Error
					? payload.error.message
					: 'Tool execution failed';

	// Mark finished before awaiting so a concurrent finalize does not close the run twice.
	record.finished = true;
	await finishRunTree(record.runTree, {
		outputs: sanitizeTracePayload({
			result: payload.result,
		}),
		...(failed ? { error: errorMessage } : {}),
		metadata: {
			final_status: failed ? 'error' : 'completed',
		},
	});
}

/**
 * Closes every synthetic tool run that is still open, recording `options.status`
 * (default `completed`) and, when given, `options.error`. Runs are closed sequentially.
 */
async function finalizeSyntheticToolTraces(
	records: Map<string, SyntheticToolTraceRecord>,
	options?: { status?: TraceStatus; error?: string },
): Promise<void> {
	for (const record of records.values()) {
		if (record.finished) {
			continue;
		}
		record.finished = true;
		await finishRunTree(record.runTree, {
			outputs: sanitizeTracePayload({
				status: options?.status ?? 'completed',
			}),
			...(options?.error ? { error: options.error } : {}),
			metadata: {
				final_status: options?.status ?? 'completed',
			},
		});
	}
}

/**
 * Opens an `llm` child run for a step and returns a fresh trace record for it.
 *
 * @param parentRun explicit parent; falls back to `getTraceParentRun()` when omitted
 * @param messageId id stored on the record and in the run metadata
 * @param request raw request used to derive the run inputs
 * @param stepNumber zero-based step index; stored one-based as `step_number` in metadata
 * @returns the record, or `undefined` when no parent run is available
 */
async function startLlmStepTrace(
	parentRun: RunTree | undefined,
	messageId: string,
	request: unknown,
	stepNumber?: number,
): Promise<LlmStepTraceRecord | undefined> {
	const resolvedParentRun = parentRun ?? getTraceParentRun();
	if (!resolvedParentRun) {
		return undefined;
	}

	const inputs = buildLlmInputPayload(request);
	const model = normalizeModelMetadata(resolvedParentRun.metadata?.model_id);
	const runTree = resolvedParentRun.createChild({
		name: formatLlmRunName(model),
		run_type: 'llm',
		tags: dedupeTags([...(resolvedParentRun.tags ?? []), 'llm']),
		metadata: {
			...(resolvedParentRun.metadata ?? {}),
			step_message_id: messageId,
			...(typeof stepNumber === 'number' ? { step_number: stepNumber + 1 } : {}),
			...(model.provider ? { ls_provider: model.provider } : {}),
			...(model.modelName ?
{ ls_model_name: model.modelName } : {}),
		},
		inputs,
	});

	// Post the run before handing out the record.
	await runTree.postRun();
	return {
		messageId,
		stepNumber,
		runTree,
		model,
		inputs,
		textParts: [],
		reasoningParts: [],
		toolCalls: [],
		toolResults: [],
		request,
		recordedFirstToken: false,
		finished: false,
	};
}

/**
 * Finds the step record that should receive the next update.
 *
 * With a `messageId`, returns the first unfinished record with that id (no fallback to other
 * records). Without one, returns the most recently added unfinished record.
 */
function findActiveStepRecord(
	records: LlmStepTraceRecord[],
	messageId?: string,
): LlmStepTraceRecord | undefined {
	if (messageId) {
		return records.find((record) => record.messageId === messageId && !record.finished);
	}
	// Scan from the end so the newest unfinished record wins.
	for (let index = records.length - 1; index >= 0; index--) {
		if (!records[index].finished) {
			return records[index];
		}
	}
	return undefined;
}

/** Adds a `new_token` event to the record's run tree once; later calls are no-ops. */
function recordFirstTokenEvent(record: LlmStepTraceRecord): void {
	if (record.recordedFirstToken) {
		return;
	}
	record.runTree.addEvent({ name: 'new_token', time: new Date().toISOString() });
	record.recordedFirstToken = true;
}

/**
 * Folds a stream chunk into the currently active step record.
 *
 * Text chunks are appended to `textParts` (and trigger the first-token event), reasoning
 * chunks to `reasoningParts`, tool-call chunks to `toolCalls`, and tool-result / tool-error
 * chunks to `toolResults`.
 *
 * @returns the record that was updated, or `undefined` when the chunk is not an object with a
 *   string `type`, is a `step-start` chunk carrying a string `messageId`, or no record is active.
 */
function updateStepRecordFromChunk(
	chunk: unknown,
	records: LlmStepTraceRecord[],
): LlmStepTraceRecord | undefined {
	if (!isRecord(chunk) || typeof chunk.type !== 'string') {
		return undefined;
	}
	if (chunk.type === 'step-start' && typeof chunk.messageId === 'string') {
		return undefined;
	}

	// The active record is looked up without a message id, i.e. the newest unfinished one.
	const record = findActiveStepRecord(records);
	if (!record) {
		return undefined;
	}

	// Chunks may carry their data under `payload` or directly on the chunk.
	const payload = isRecord(chunk.payload) ?
chunk.payload : chunk;
	if ((chunk.type === 'text-delta' || chunk.type === 'text') && typeof payload.text === 'string') {
		record.textParts.push(payload.text);
		recordFirstTokenEvent(record);
	}
	if (
		(chunk.type === 'reasoning-delta' || chunk.type === 'reasoning') &&
		typeof payload.text === 'string'
	) {
		record.reasoningParts.push(payload.text);
	}
	if (chunk.type === 'tool-call' && isRecord(payload)) {
		record.toolCalls.push(toTraceObject(payload));
	}
	if ((chunk.type === 'tool-result' || chunk.type === 'tool-error') && isRecord(payload)) {
		record.toolResults.push(toTraceObject(payload));
	}
	return record;
}

/**
 * Applies a `step-finish` chunk to the matching step record, copying finish reason, usage,
 * response, request, provider metadata and warnings from the chunk.
 *
 * @returns the updated record, or `undefined` when the chunk is not a `step-finish` chunk or
 *   no unfinished record matches.
 */
function applyStepFinishChunk(
	chunk: unknown,
	records: LlmStepTraceRecord[],
): LlmStepTraceRecord | undefined {
	if (!isRecord(chunk) || chunk.type !== 'step-finish') {
		return undefined;
	}

	const payload = getChunkPayload(chunk);
	const messageId = typeof payload?.messageId === 'string' ? payload.messageId : undefined;
	const record = findActiveStepRecord(records, messageId);
	if (!record) {
		return undefined;
	}

	const output = isRecord(payload?.output) ? payload.output : undefined;
	const stepResult = isRecord(payload?.stepResult) ? payload.stepResult : undefined;
	const metadata = isRecord(payload?.metadata) ? payload.metadata : undefined;

	// Precedence: stepResult.reason, then payload.finishReason, then the existing value.
	record.finishReason =
		(stepResult && typeof stepResult.reason === 'string' ? stepResult.reason : undefined) ??
		(payload && typeof payload.finishReason === 'string' ? payload.finishReason : undefined) ??
		record.finishReason;

	// Each field below is only overwritten when the chunk actually provides a value.
	const usage = output?.usage ?? payload?.usage;
	if (usage !== undefined) {
		record.usage = usage;
	}
	if (payload?.response !== undefined) {
		record.response = payload.response;
	}
	const request = metadata?.request ?? payload?.request;
	if (request !== undefined) {
		record.request = request;
	}
	const providerMetadata = metadata?.providerMetadata ?? payload?.providerMetadata;
	if (providerMetadata !== undefined) {
		record.providerMetadata = providerMetadata;
	}
	record.warnings =
		(stepResult && Array.isArray(stepResult.warnings) ?
stepResult.warnings : undefined) ?? payload?.warnings ?? record.warnings; record.isContinued = stepResult?.isContinued === true ? true : payload?.isContinued === true || record.isContinued; if (output && typeof output.text === 'string') { record.textParts = output.text.length > 0 ? [output.text] : []; } if (output && Array.isArray(output.toolCalls)) { record.toolCalls = output.toolCalls.map((entry) => toTraceObject(entry)); } if (output && Array.isArray(output.toolResults)) { record.toolResults = output.toolResults.map((entry) => toTraceObject(entry)); } const responseModelId = payload && isRecord(payload.response) && typeof payload.response.modelId === 'string' ? payload.response.modelId : undefined; if (responseModelId) { const model = normalizeModelMetadata(responseModelId); record.model = { provider: model.provider ?? record.model.provider, modelName: model.modelName ?? responseModelId, }; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = { ...(record.runTree.metadata ?? {}), ...(record.model.provider ? { ls_provider: record.model.provider } : {}), ...(record.model.modelName ? { ls_model_name: record.model.modelName } : {}), }; } return record; } function isStepResultLike(value: unknown): value is StepResultLike { return isRecord(value); } function toStepResultLike(value: unknown): StepResultLike | undefined { return isStepResultLike(value) ? value : undefined; } function toStepStartLike(value: unknown): StepStartLike | undefined { return isRecord(value) ? value : undefined; } function getSyntheticStepMessageId(stepResult: StepResultLike, index: number): string { const messages = extractResponseMessages(stepResult.response); const messageId = messages?.find( (message): message is Record => isRecord(message) && typeof message.id === 'string', )?.id; return typeof messageId === 'string' ? 
messageId : `step-${index + 1}`; } async function createFallbackStepTrace( parentRun: RunTree | undefined, stepResult: StepResultLike, index: number, ): Promise { const record = await startLlmStepTrace( parentRun, getSyntheticStepMessageId(stepResult, index), stepResult.request, stepResult.stepNumber ?? index, ); if (!record) { return undefined; } if (typeof stepResult.text === 'string' && stepResult.text.length > 0) { record.textParts.push(stepResult.text); recordFirstTokenEvent(record); } if (typeof stepResult.reasoning === 'string' && stepResult.reasoning.length > 0) { record.reasoningParts.push(stepResult.reasoning); } else if (Array.isArray(stepResult.reasoning)) { record.reasoningParts.push( ...stepResult.reasoning .map((entry) => isRecord(entry) && typeof entry.text === 'string' ? entry.text : undefined, ) .filter((entry): entry is string => entry !== undefined), ); } if (Array.isArray(stepResult.toolCalls)) { record.toolCalls.push(...stepResult.toolCalls.map((entry) => toTraceObject(entry))); } if (Array.isArray(stepResult.toolResults)) { record.toolResults.push(...stepResult.toolResults.map((entry) => toTraceObject(entry))); } record.finishReason = stepResult.finishReason; if (stepResult.usage !== undefined) { record.usage = stepResult.usage; } if (stepResult.request !== undefined) { record.request = stepResult.request; } if (stepResult.response !== undefined) { record.response = stepResult.response; } if (stepResult.providerMetadata !== undefined) { record.providerMetadata = stepResult.providerMetadata; } const responseModelId = isRecord(stepResult.response) && typeof stepResult.response.modelId === 'string' ? stepResult.response.modelId : undefined; if (responseModelId) { const model = normalizeModelMetadata(responseModelId); record.model = { provider: model.provider ?? record.model.provider, modelName: model.modelName ?? 
responseModelId, }; record.runTree.name = formatLlmRunName(record.model); } return record; } function applyStepResultToRecord(record: LlmStepTraceRecord, stepResult: StepResultLike): void { if (typeof stepResult.text === 'string') { record.textParts = stepResult.text.length > 0 ? [stepResult.text] : []; } if (typeof stepResult.reasoning === 'string') { record.reasoningParts = stepResult.reasoning.length > 0 ? [stepResult.reasoning] : []; } else if (Array.isArray(stepResult.reasoning)) { record.reasoningParts = stepResult.reasoning .map((entry) => (isRecord(entry) && typeof entry.text === 'string' ? entry.text : undefined)) .filter((entry): entry is string => entry !== undefined); } if (Array.isArray(stepResult.toolCalls)) { record.toolCalls = stepResult.toolCalls.map((entry) => toTraceObject(entry)); } if (Array.isArray(stepResult.toolResults)) { record.toolResults = stepResult.toolResults.map((entry) => toTraceObject(entry)); } if (typeof stepResult.finishReason === 'string') { record.finishReason = stepResult.finishReason; } if (stepResult.usage !== undefined) { record.usage = stepResult.usage; } if (stepResult.request !== undefined) { record.request = stepResult.request; } if (stepResult.response !== undefined) { record.response = stepResult.response; } if (stepResult.providerMetadata !== undefined) { record.providerMetadata = stepResult.providerMetadata; } const responseModelId = isRecord(stepResult.response) && typeof stepResult.response.modelId === 'string' ? stepResult.response.modelId : undefined; if (responseModelId) { const model = normalizeModelMetadata(responseModelId); record.model = { provider: model.provider ?? record.model.provider, modelName: model.modelName ?? responseModelId, }; record.runTree.name = formatLlmRunName(record.model); } } export function createLlmStepTraceHooks( explicitParentRun?: RunTree, ): LlmStepTraceHooks | undefined { const activeParentRun = explicitParentRun ?? 
getTraceParentRun(); if (!activeParentRun) { return undefined; } const records: LlmStepTraceRecord[] = []; const recordsByStepNumber = new Map(); const getActiveRecord = (): LlmStepTraceRecord | undefined => { for (let index = records.length - 1; index >= 0; index--) { const record = records[index]; if (!record.finished) { return record; } } return undefined; }; const restoreTraceParent = () => { setTraceParentOverride(activeParentRun); }; restoreTraceParent(); const patchFinishedRecordIfNeeded = async ( record: LlmStepTraceRecord, stepResult: StepResultLike | undefined, options?: { status?: TraceStatus; error?: string }, ): Promise => { const metadata = { ...(record.runTree.metadata ?? {}), ...buildLlmMetadata(record, stepResult), ...(options?.status && options.status !== 'completed' ? { final_status: options.status } : {}), }; record.runTree.inputs = record.inputs; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = metadata; const endTimeValue = Reflect.get(record.runTree, 'end_time'); const endTime = typeof endTimeValue === 'number' ? endTimeValue : undefined; await finishRunTree(record.runTree, { outputs: buildLlmOutputs(record, stepResult), metadata, ...(options?.error ? { error: options.error } : {}), ...(endTime !== undefined ? { endTime } : {}), }); }; const startStepTrace = async (options: unknown): Promise => { const stepStart = toStepStartLike(options); if (typeof stepStart?.stepNumber !== 'number') { return undefined; } const existingRecord = recordsByStepNumber.get(stepStart.stepNumber); if (existingRecord && !existingRecord.finished) { setTraceParentOverride(existingRecord.runTree); return existingRecord; } const record = await startLlmStepTrace( activeParentRun, `step-${stepStart.stepNumber + 1}`, { messages: Array.isArray(stepStart.messages) ? 
stepStart.messages : [], }, stepStart.stepNumber, ); if (!record) { return undefined; } const stepModelId = stepStart.model?.modelId; if (typeof stepModelId === 'string' && stepModelId.length > 0) { record.model = { provider: stepStart.model?.provider ?? record.model.provider, modelName: normalizeModelMetadata(stepModelId).modelName ?? stepModelId, }; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = { ...(record.runTree.metadata ?? {}), ...(record.model.provider ? { ls_provider: record.model.provider } : {}), ...(record.model.modelName ? { ls_model_name: record.model.modelName } : {}), }; } recordsByStepNumber.set(stepStart.stepNumber, record); records.push(record); setTraceParentOverride(record.runTree); return record; }; const prepareStep = async (options: unknown): Promise => { await startStepTrace(options); return undefined; }; const onStepStart = async (options: unknown): Promise => { await startStepTrace(options); }; const onStepFinish = async (stepResultValue: unknown): Promise => { const stepResult = toStepResultLike(stepResultValue); if (!stepResult) { return; } const stepNumber = typeof stepResult.stepNumber === 'number' ? stepResult.stepNumber : undefined; const record = stepNumber !== undefined ? recordsByStepNumber.get(stepNumber) : undefined; if (!record || record.finished) { // Resumed streams can replay already-finished step results before the next real // step starts. Those events do not represent a new LLM invocation, so ignore them // instead of creating a synthetic 1ms fallback span. return; } applyStepResultToRecord(record, stepResult); record.runTree.inputs = record.inputs; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = { ...(record.runTree.metadata ?? 
{}), ...buildLlmMetadata(record, stepResult), }; await finishRunTree(record.runTree, { outputs: buildLlmOutputs(record, stepResult), metadata: record.runTree.metadata, }); record.finished = true; if (stepNumber !== undefined) { recordsByStepNumber.delete(stepNumber); } restoreTraceParent(); }; return { executionOptions: { prepareStep, experimental_prepareStep: prepareStep, experimental_onStepStart: onStepStart, onStepFinish, // Disable Vercel AI SDK's built-in LangSmith tracing — we manage traces ourselves experimental_telemetry: { isEnabled: false }, }, onStreamChunk: (chunk) => { updateStepRecordFromChunk(chunk, records); applyStepFinishChunk(chunk, records); }, startSegment: () => { for (const [stepNumber, record] of recordsByStepNumber.entries()) { if (record.finished) { recordsByStepNumber.delete(stepNumber); } } const activeRecord = getActiveRecord(); if (activeRecord) { setTraceParentOverride(activeRecord.runTree); return; } restoreTraceParent(); }, finalize: async (source, options) => { const resolvedSteps = await source.steps?.then( (steps) => steps, () => undefined, ); const segmentUsage = await resolveSegmentUsage(source); const stepResults = Array.isArray(resolvedSteps) ? resolvedSteps .map((stepValue) => toStepResultLike(stepValue)) .filter((stepResult): stepResult is StepResultLike => stepResult !== undefined) : []; const stepResultsByStepNumber = new Map(); for (const stepResult of stepResults) { if (typeof stepResult.stepNumber === 'number') { stepResultsByStepNumber.set(stepResult.stepNumber, stepResult); } } for (const [index, record] of records.entries()) { const stepResult = (typeof record.stepNumber === 'number' ? stepResultsByStepNumber.get(record.stepNumber) : undefined) ?? stepResults[index]; const hadUsageMetadata = buildUsageMetadata(record.usage, record.providerMetadata); const hadUsageMetadataJson = hadUsageMetadata ? 
JSON.stringify(hadUsageMetadata) : undefined; const hadResponse = record.response !== undefined; const hadFinishReason = record.finishReason !== undefined; if (stepResult) { applyStepResultToRecord(record, stepResult); } maybeBackfillRecordUsageFromSegment(record, records, segmentUsage); if (record.finished) { const hasUsageMetadata = buildUsageMetadata(record.usage, record.providerMetadata); const hasUsageMetadataJson = hasUsageMetadata ? JSON.stringify(hasUsageMetadata) : undefined; const hasResponse = record.response !== undefined; const hasFinishReason = record.finishReason !== undefined; if ( hadUsageMetadataJson !== hasUsageMetadataJson || (!hadResponse && hasResponse) || (!hadFinishReason && hasFinishReason) ) { await patchFinishedRecordIfNeeded(record, stepResult, options); } continue; } record.runTree.inputs = record.inputs; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = { ...(record.runTree.metadata ?? {}), ...buildLlmMetadata(record, stepResult), ...(options?.status && options.status !== 'completed' ? { final_status: options.status } : {}), }; await finishRunTree(record.runTree, { outputs: buildLlmOutputs(record, stepResult), metadata: record.runTree.metadata, ...(options?.error ? { error: options.error } : {}), }); record.finished = true; } restoreTraceParent(); recordsByStepNumber.clear(); }, }; } async function finalizeLlmStepTraces( source: ResumableStreamSource, records: LlmStepTraceRecord[], options?: { status?: TraceStatus; error?: string }, ): Promise { const parentRun = getTraceParentRun(); const resolvedSteps = await source.steps?.then( (steps) => steps, () => undefined, ); const segmentUsage = await resolveSegmentUsage(source); const stepResults = Array.isArray(resolvedSteps) ? 
resolvedSteps .map((stepValue) => toStepResultLike(stepValue)) .filter((stepResult): stepResult is StepResultLike => stepResult !== undefined) : []; const stepResultsByStepNumber = new Map(); for (const stepResult of stepResults) { if (typeof stepResult.stepNumber === 'number') { stepResultsByStepNumber.set(stepResult.stepNumber, stepResult); } } if (records.length === 0 && stepResults.length > 0) { for (const [index, stepResult] of stepResults.entries()) { const fallbackRecord = await createFallbackStepTrace(parentRun, stepResult, index); if (fallbackRecord) { records.push(fallbackRecord); } } } if (records.length === 0) { return; } for (const [index, record] of records.entries()) { if (record.finished) { continue; } const stepResult = (typeof record.stepNumber === 'number' ? stepResultsByStepNumber.get(record.stepNumber) : undefined) ?? stepResults[index]; if (stepResult) { applyStepResultToRecord(record, stepResult); } maybeBackfillRecordUsageFromSegment(record, records, segmentUsage); record.runTree.inputs = record.inputs; record.runTree.name = formatLlmRunName(record.model); record.runTree.metadata = { ...(record.runTree.metadata ?? {}), ...buildLlmMetadata(record, stepResult), ...(options?.status && options.status !== 'completed' ? { final_status: options.status } : {}), }; await finishRunTree(record.runTree, { outputs: buildLlmOutputs(record, stepResult), metadata: record.runTree.metadata, ...(options?.error ? { error: options.error } : {}), }); record.finished = true; } } export async function executeResumableStream( options: ExecuteResumableStreamOptions, ): Promise { let activeSource = options.stream; let activeStream = options.stream.fullStream; let activeMastraRunId = options.stream.runId ?? options.initialMastraRunId ?? 
''; let text = options.stream.text; const workSummaryAccumulator = new WorkSummaryAccumulator(); let currentResponseId: string | undefined; while (true) { let suspension: SuspensionInfo | undefined; let hasError = false; let pendingConfirmation: Promise> | undefined; let confirmationEvent: ConfirmationRequestEvent | undefined; let confirmationEventPublished = false; const llmStepRecords: LlmStepTraceRecord[] = []; const syntheticToolRecords = new Map(); options.llmStepTraceHooks?.startSegment(); for await (const chunk of activeStream) { options.context.onActivity?.(); if (options.context.signal.aborted) { if (options.llmStepTraceHooks) { await options.llmStepTraceHooks.finalize(activeSource, { status: 'cancelled', error: 'Run cancelled while streaming', }); } else { await finalizeLlmStepTraces(activeSource, llmStepRecords, { status: 'cancelled', error: 'Run cancelled while streaming', }); } await finalizeSyntheticToolTraces(syntheticToolRecords, { status: 'cancelled', error: 'Run cancelled while streaming', }); return { status: 'cancelled', mastraRunId: activeMastraRunId, text, workSummary: workSummaryAccumulator.toSummary(), }; } await startSyntheticToolTrace(chunk, syntheticToolRecords); await finishSyntheticToolTrace(chunk, syntheticToolRecords); options.llmStepTraceHooks?.onStreamChunk(chunk); // Always capture responseId from step-start, regardless of trace hook path. if (isRecord(chunk) && chunk.type === 'step-start') { const stepPayload = getChunkPayload(chunk); const stepMessageId = typeof stepPayload?.messageId === 'string' ? stepPayload.messageId : undefined; currentResponseId = stepMessageId; } if (options.llmStepTraceHooks) { // Step lifecycle is handled by prepareStep/onStepFinish callbacks. } else if (isRecord(chunk) && chunk.type === 'step-start') { const payload = getChunkPayload(chunk); const messageId = typeof payload?.messageId === 'string' ? 
payload.messageId : undefined; const request = payload?.request; const stepTrace = typeof messageId === 'string' ? await startLlmStepTrace(undefined, messageId, request) : undefined; if (stepTrace) { llmStepRecords.push(stepTrace); } } else { updateStepRecordFromChunk(chunk, llmStepRecords); applyStepFinishChunk(chunk, llmStepRecords); } const parsedSuspension = parseSuspension(chunk); if (parsedSuspension) { if (!suspension) { suspension = parsedSuspension; if (options.control.mode === 'auto') { options.control.onSuspension?.(parsedSuspension); pendingConfirmation = options.control.waitForConfirmation(parsedSuspension.requestId); } } else if (!isSameSuspension(parsedSuspension, suspension)) { options.context.logger.warn( 'Additional HITL suspension encountered before resume; deferring', { threadId: options.context.threadId, runId: options.context.runId, activeRequestId: suspension.requestId, deferredRequestId: parsedSuspension.requestId, activeToolCallId: suspension.toolCallId, deferredToolCallId: parsedSuspension.toolCallId, }, ); } } if (isErrorChunk(chunk)) { hasError = true; } const event = mapMastraChunkToEvent( options.context.runId, options.context.agentId, chunk, currentResponseId, ); if (event) { workSummaryAccumulator.observe(event); let shouldPublishEvent = true; if (event.type === 'confirmation-request') { const isPrimarySuspension = suspension !== undefined && event.payload.requestId === suspension.requestId && event.payload.toolCallId === suspension.toolCallId; if (!isPrimarySuspension || confirmationEventPublished || confirmationEvent) { shouldPublishEvent = false; } if (shouldPublishEvent && options.control.mode === 'manual') { confirmationEvent = event; shouldPublishEvent = false; } if (shouldPublishEvent) { confirmationEventPublished = true; } } if (shouldPublishEvent) { options.context.eventBus.publish(options.context.threadId, event); } } if (options.control.mode === 'auto' && options.control.drainCorrections) { 
publishCorrections(options.context, options.control.drainCorrections()); } } const traceStatus = suspension ? 'suspended' : hasError ? 'errored' : 'completed'; if (options.llmStepTraceHooks) { await options.llmStepTraceHooks.finalize(activeSource, { status: traceStatus, }); } else { await finalizeLlmStepTraces(activeSource, llmStepRecords, { status: traceStatus, }); } await finalizeSyntheticToolTraces(syntheticToolRecords, { status: traceStatus, }); if (options.context.signal.aborted) { return { status: 'cancelled', mastraRunId: activeMastraRunId, text, workSummary: workSummaryAccumulator.toSummary(), }; } if (!suspension) { return { status: hasError ? 'errored' : 'completed', mastraRunId: activeMastraRunId, text, workSummary: workSummaryAccumulator.toSummary(), }; } if (options.control.mode === 'manual') { return { status: 'suspended', mastraRunId: activeMastraRunId, text, suspension, workSummary: workSummaryAccumulator.toSummary(), ...(confirmationEvent ? { confirmationEvent } : {}), }; } const confirmationPromise = pendingConfirmation ?? options.control.waitForConfirmation(suspension.requestId); const resumeData = await waitForConfirmationOrCorrection( options.context.signal, confirmationPromise, options.control, options.context, ); const resumeOptions = options.control.buildResumeOptions?.({ mastraRunId: activeMastraRunId, suspension, }) ?? { runId: activeMastraRunId, toolCallId: suspension.toolCallId, }; const resumed = await asResumable(options.agent).resumeStream(resumeData, { ...resumeOptions, ...(options.llmStepTraceHooks?.executionOptions ?? {}), }); activeMastraRunId = (typeof resumed.runId === 'string' ? 
resumed.runId : '') || activeMastraRunId; activeSource = resumed; activeStream = resumed.fullStream; text = resumed.text; } } function publishCorrections(context: ResumableStreamContext, corrections: string[]): void { for (const correction of corrections) { context.eventBus.publish(context.threadId, { type: 'text-delta', runId: context.runId, agentId: context.agentId, payload: { text: `\n[USER CORRECTION]: ${correction}\n` }, }); } } function isErrorChunk(chunk: unknown): boolean { return ( chunk !== null && typeof chunk === 'object' && (chunk as Record).type === 'error' ); } async function waitForConfirmation( signal: AbortSignal, confirmationPromise: Promise>, ): Promise> { if (signal.aborted) { throw new Error('Run cancelled while waiting for confirmation'); } let abortHandler: (() => void) | undefined; try { return await Promise.race([ confirmationPromise, new Promise((_, reject) => { abortHandler = () => reject(new Error('Run cancelled while waiting for confirmation')); signal.addEventListener('abort', abortHandler, { once: true }); }), ]); } finally { if (abortHandler) { signal.removeEventListener('abort', abortHandler); } } } /** * Race the HITL confirmation promise against an incoming user correction. * When a correction arrives first, auto-confirm the suspended tool call with * override data so the builder can resume and incorporate the correction. 
*/ async function waitForConfirmationOrCorrection( signal: AbortSignal, confirmationPromise: Promise>, control: AutoResumeControl, context: ResumableStreamContext, ): Promise> { if (!control.waitForCorrection) { return await waitForConfirmation(signal, confirmationPromise); } const correctionSentinel = Object.freeze({ __correctionOverride: true }); const raced = Promise.race([ confirmationPromise, control.waitForCorrection().then(() => correctionSentinel as Record), ]); const result = await waitForConfirmation(signal, raced); if (result === correctionSentinel) { const corrections = control.drainCorrections?.() ?? []; publishCorrections(context, corrections); return { __correctionOverride: true, message: 'The user sent a correction while this tool was waiting for confirmation. ' + 'The confirmation has been skipped. Apply the correction and continue.', corrections, }; } return result; } function isSameSuspension(left: SuspensionInfo, right: SuspensionInfo): boolean { return left.requestId === right.requestId && left.toolCallId === right.toolCallId; }