mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-05 02:59:27 +02:00
feat(core): Add smooth stream options to agents sdk (#31715)
Co-authored-by: bjorger <50590409+bjorger@users.noreply.github.com>
This commit is contained in:
parent
8eb25b78af
commit
790afbcf3d
|
|
@ -67,6 +67,7 @@ export type {
|
|||
McpVerifyResult,
|
||||
ModelConfig,
|
||||
ExecutionOptions,
|
||||
SmoothStreamOptions,
|
||||
AgentExecutionCounter,
|
||||
PersistedExecutionOptions,
|
||||
BuiltTelemetry,
|
||||
|
|
|
|||
|
|
@ -3635,6 +3635,66 @@ describe('AgentRuntime — telemetry propagation', () => {
|
|||
expect(expTelemetry.tracer).toBe(baseTelemetry.tracer);
|
||||
});
|
||||
|
||||
it('enables smoothStream by default on streamText', async () => {
|
||||
streamText.mockReturnValue(makeStreamSuccess());
|
||||
const smoothStreamSpy = vi.spyOn(aiModule, 'smoothStream');
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'smooth-stream-default-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
});
|
||||
|
||||
const { stream } = await runtime.stream('hello');
|
||||
await collectChunks(stream);
|
||||
|
||||
const callArgs = streamText.mock.calls[0][0] as Record<string, unknown>;
|
||||
expect(callArgs.experimental_transform).toEqual(expect.any(Function));
|
||||
expect(smoothStreamSpy).toHaveBeenCalledWith({});
|
||||
|
||||
smoothStreamSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('omits smoothStream when explicitly disabled', async () => {
|
||||
streamText.mockReturnValue(makeStreamSuccess());
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'smooth-stream-disabled-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
});
|
||||
|
||||
const { stream } = await runtime.stream('hello', { smoothStream: false });
|
||||
await collectChunks(stream);
|
||||
|
||||
const callArgs = streamText.mock.calls[0][0] as Record<string, unknown>;
|
||||
expect(callArgs.experimental_transform).toBeUndefined();
|
||||
});
|
||||
|
||||
it('forwards non-default smoothStream options to the AI SDK', async () => {
|
||||
streamText.mockReturnValue(makeStreamSuccess());
|
||||
|
||||
const smoothStreamSpy = vi.spyOn(aiModule, 'smoothStream');
|
||||
|
||||
const runtime = new AgentRuntime({
|
||||
name: 'smooth-stream-options-test',
|
||||
model: 'openai/gpt-4o-mini',
|
||||
instructions: 'test',
|
||||
eventBus: new AgentEventBus(),
|
||||
});
|
||||
|
||||
const smoothStreamOptions = { delayInMs: 25, chunking: 'line' as const };
|
||||
const { stream } = await runtime.stream('hello', { smoothStream: smoothStreamOptions });
|
||||
|
||||
await collectChunks(stream);
|
||||
|
||||
expect(smoothStreamSpy).toHaveBeenCalledWith(smoothStreamOptions);
|
||||
|
||||
smoothStreamSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('inherits telemetry from ExecutionOptions when no own telemetry is set', async () => {
|
||||
generateText.mockResolvedValue(makeGenerateSuccess());
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import type { ProviderOptions } from '@ai-sdk/provider-utils';
|
||||
import type { TelemetrySettings, ToolCallRepairFunction, ToolSet } from 'ai';
|
||||
import type { StreamTextTransform, TelemetrySettings, ToolCallRepairFunction, ToolSet } from 'ai';
|
||||
import type { z } from 'zod';
|
||||
import { zodToJsonSchema, type JsonSchema7Type } from 'zod-to-json-schema';
|
||||
|
||||
|
|
@ -748,6 +748,17 @@ export class AgentRuntime {
|
|||
},
|
||||
};
|
||||
}
|
||||
|
||||
private buildSmoothStreamTransformOptions(options?: ExecutionOptions): {
|
||||
experimental_transform?: StreamTextTransform<ToolSet>;
|
||||
} {
|
||||
if (options?.smoothStream === false) return {};
|
||||
|
||||
const { smoothStream } = loadAi();
|
||||
|
||||
return { experimental_transform: smoothStream(options?.smoothStream ?? {}) };
|
||||
}
|
||||
|
||||
/** Map resolved telemetry to AI SDK's experimental_telemetry shape. */
|
||||
private buildTelemetryOptions(options?: ExecutionOptions): {
|
||||
experimental_telemetry?: TelemetrySettings;
|
||||
|
|
@ -1315,6 +1326,7 @@ export class AgentRuntime {
|
|||
: {}),
|
||||
...(staticLoopContext.outputSpec ? { output: staticLoopContext.outputSpec } : {}),
|
||||
...this.buildAiSdkOptions(toolMap, options),
|
||||
...this.buildSmoothStreamTransformOptions(options),
|
||||
});
|
||||
|
||||
// Consume the stream. When the AbortSignal fires mid-stream the
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ export type {
|
|||
ModelConfig,
|
||||
RunOptions,
|
||||
ExecutionOptions,
|
||||
SmoothStreamOptions,
|
||||
AgentExecutionCounter,
|
||||
PersistedExecutionOptions,
|
||||
ResumeOptions,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import type { ProviderOptions } from '@ai-sdk/provider-utils';
|
||||
import type { LanguageModel } from 'ai';
|
||||
import type { LanguageModel, smoothStream } from 'ai';
|
||||
import type { JsonSchema7Type } from 'zod-to-json-schema';
|
||||
|
||||
import type { AgentMessage, ContentMetadata } from './message';
|
||||
|
|
@ -14,6 +14,8 @@ import type { SerializedMessageList } from '../runtime/message-list';
|
|||
import type { BuiltTelemetry } from '../telemetry';
|
||||
import type { JSONValue } from '../utils/json';
|
||||
|
||||
export type SmoothStreamOptions = NonNullable<Parameters<typeof smoothStream>[0]>;
|
||||
|
||||
export type FinishReason =
|
||||
| 'stop'
|
||||
| 'max-iterations'
|
||||
|
|
@ -157,6 +159,8 @@ export interface ExecutionOptions {
|
|||
maxIterations?: number;
|
||||
abortSignal?: AbortSignal;
|
||||
providerOptions?: ProviderOptions;
|
||||
/** AI SDK `smoothStream` transform. Enabled by default; pass `false` to disable. */
|
||||
smoothStream?: SmoothStreamOptions | false;
|
||||
/** Inherited telemetry from a host runtime. */
|
||||
telemetry?: BuiltTelemetry;
|
||||
/** Inherited execution counter from the host runtime. Used for aggregate heartbeat telemetry. */
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user