mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-26 22:35:18 +02:00
121 lines
3.7 KiB
TypeScript
121 lines
3.7 KiB
TypeScript
import type { InstanceAiEvent } from '@n8n/api-types';
|
|
|
|
import type { InstanceAiEventBus } from '../event-bus';
|
|
import type { Logger } from '../logger';
|
|
import {
|
|
createLlmStepTraceHooks,
|
|
executeResumableStream,
|
|
type LlmStepTraceHooks,
|
|
type ResumableStreamSource,
|
|
type TraceStatus,
|
|
} from './resumable-stream-executor';
|
|
import type { WorkSummary } from '../stream/work-summary-accumulator';
|
|
import { getTraceParentRun, withTraceParentContext } from '../tracing/langsmith-tracing';
|
|
import { asResumable } from '../utils/stream-helpers';
|
|
import type { SuspensionInfo } from '../utils/stream-helpers';
|
|
|
|
export interface StreamableAgent {
|
|
stream: (input: unknown, options: Record<string, unknown>) => Promise<ResumableStreamSource>;
|
|
}
|
|
|
|
export interface StreamRunOptions {
|
|
threadId: string;
|
|
runId: string;
|
|
agentId: string;
|
|
signal: AbortSignal;
|
|
eventBus: InstanceAiEventBus;
|
|
logger: Logger;
|
|
onActivity?: () => void;
|
|
}
|
|
|
|
export interface StreamRunResult {
|
|
status: TraceStatus;
|
|
mastraRunId: string;
|
|
text?: Promise<string>;
|
|
workSummary: WorkSummary;
|
|
suspension?: SuspensionInfo;
|
|
confirmationEvent?: Extract<InstanceAiEvent, { type: 'confirmation-request' }>;
|
|
}
|
|
|
|
export async function streamAgentRun(
|
|
agent: StreamableAgent,
|
|
input: unknown,
|
|
streamOptions: Record<string, unknown>,
|
|
options: StreamRunOptions,
|
|
): Promise<StreamRunResult> {
|
|
const traceParent = getTraceParentRun();
|
|
return await withTraceParentContext(traceParent, async () => {
|
|
const llmStepTraceHooks = createLlmStepTraceHooks(traceParent);
|
|
const result = await agent.stream(input, {
|
|
...streamOptions,
|
|
...(llmStepTraceHooks?.executionOptions ?? {}),
|
|
});
|
|
const mastraRunId = typeof result.runId === 'string' ? result.runId : '';
|
|
return await consumeStream(agent, result, { ...options, mastraRunId, llmStepTraceHooks });
|
|
});
|
|
}
|
|
|
|
export async function resumeAgentRun(
|
|
agent: unknown,
|
|
resumeData: Record<string, unknown>,
|
|
resumeOptions: Record<string, unknown>,
|
|
options: StreamRunOptions & { mastraRunId: string },
|
|
): Promise<StreamRunResult> {
|
|
const resumeTraceParent = getTraceParentRun();
|
|
return await withTraceParentContext(resumeTraceParent, async () => {
|
|
const llmStepTraceHooks = createLlmStepTraceHooks(resumeTraceParent);
|
|
const resumed = await asResumable(agent).resumeStream(resumeData, {
|
|
...resumeOptions,
|
|
...(llmStepTraceHooks?.executionOptions ?? {}),
|
|
});
|
|
const mastraRunId = (typeof resumed.runId === 'string' && resumed.runId) || options.mastraRunId;
|
|
return await consumeStream(agent, resumed, { ...options, mastraRunId, llmStepTraceHooks });
|
|
});
|
|
}
|
|
|
|
async function consumeStream(
|
|
agent: unknown,
|
|
stream: ResumableStreamSource,
|
|
options: StreamRunOptions & { mastraRunId: string; llmStepTraceHooks?: LlmStepTraceHooks },
|
|
): Promise<StreamRunResult> {
|
|
const result = await executeResumableStream({
|
|
agent,
|
|
stream,
|
|
context: {
|
|
threadId: options.threadId,
|
|
runId: options.runId,
|
|
agentId: options.agentId,
|
|
eventBus: options.eventBus,
|
|
signal: options.signal,
|
|
logger: options.logger,
|
|
onActivity: options.onActivity,
|
|
},
|
|
control: { mode: 'manual' },
|
|
initialMastraRunId: options.mastraRunId,
|
|
llmStepTraceHooks: options.llmStepTraceHooks,
|
|
});
|
|
|
|
if (result.status === 'suspended' && result.suspension) {
|
|
return {
|
|
status: 'suspended',
|
|
mastraRunId: result.mastraRunId,
|
|
text: result.text,
|
|
workSummary: result.workSummary,
|
|
suspension: result.suspension,
|
|
...(result.confirmationEvent ? { confirmationEvent: result.confirmationEvent } : {}),
|
|
};
|
|
}
|
|
|
|
return {
|
|
status:
|
|
result.status === 'cancelled'
|
|
? 'cancelled'
|
|
: result.status === 'errored'
|
|
? 'errored'
|
|
: 'completed',
|
|
mastraRunId: result.mastraRunId,
|
|
text: result.text,
|
|
workSummary: result.workSummary,
|
|
};
|
|
}
|