import type { InstanceAiEvent } from '@n8n/api-types'; import type { InstanceAiEventBus } from '../event-bus'; import type { Logger } from '../logger'; import { createLlmStepTraceHooks, executeResumableStream, type LlmStepTraceHooks, type ResumableStreamSource, type TraceStatus, } from './resumable-stream-executor'; import type { WorkSummary } from '../stream/work-summary-accumulator'; import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing'; import { asResumable } from '../utils/stream-helpers'; import type { SuspensionInfo } from '../utils/stream-helpers'; export interface StreamableAgent { stream: (input: unknown, options: Record) => Promise; } export interface StreamRunOptions { threadId: string; runId: string; agentId: string; signal: AbortSignal; eventBus: InstanceAiEventBus; logger: Logger; onActivity?: () => void; } export interface StreamRunResult { status: TraceStatus; mastraRunId: string; text?: Promise; workSummary: WorkSummary; suspension?: SuspensionInfo; confirmationEvent?: Extract; } export async function streamAgentRun( agent: StreamableAgent, input: unknown, streamOptions: Record, options: StreamRunOptions, ): Promise { const traceParent = getTraceParentRun(); return await withTraceParentContext(traceParent, async () => { const llmStepTraceHooks = createLlmStepTraceHooks(traceParent); const result = await agent.stream(input, { ...streamOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); const mastraRunId = typeof result.runId === 'string' ? result.runId : ''; return await consumeStream(agent, result, { ...options, mastraRunId, llmStepTraceHooks }); }); } export async function resumeAgentRun( agent: unknown, resumeData: Record, resumeOptions: Record, options: StreamRunOptions & { mastraRunId: string }, ): Promise { const resumeTraceParent = getTraceParentRun(); return await withTraceParentContext(resumeTraceParent, async () => { const llmStepTraceHooks = createLlmStepTraceHooks(resumeTraceParent); const resumed = await asResumable(agent).resumeStream(resumeData, { ...resumeOptions, ...(llmStepTraceHooks?.executionOptions ?? {}), }); const mastraRunId = (typeof resumed.runId === 'string' && resumed.runId) || options.mastraRunId; return await consumeStream(agent, resumed, { ...options, mastraRunId, llmStepTraceHooks }); }); } async function consumeStream( agent: unknown, stream: ResumableStreamSource, options: StreamRunOptions & { mastraRunId: string; llmStepTraceHooks?: LlmStepTraceHooks }, ): Promise { const result = await executeResumableStream({ agent, stream, context: { threadId: options.threadId, runId: options.runId, agentId: options.agentId, eventBus: options.eventBus, signal: options.signal, logger: options.logger, onActivity: options.onActivity, }, control: { mode: 'manual' }, initialMastraRunId: options.mastraRunId, llmStepTraceHooks: options.llmStepTraceHooks, }); if (result.status === 'suspended' && result.suspension) { return { status: 'suspended', mastraRunId: result.mastraRunId, text: result.text, workSummary: result.workSummary, suspension: result.suspension, ...(result.confirmationEvent ? { confirmationEvent: result.confirmationEvent } : {}), }; } return { status: result.status === 'cancelled' ? 'cancelled' : result.status === 'errored' ? 'errored' : 'completed', mastraRunId: result.mastraRunId, text: result.text, workSummary: result.workSummary, }; }