From 790afbcf3dfc8c10169ca7ae3e54d1f242d76108 Mon Sep 17 00:00:00 2001 From: Riqwan Thamir Date: Thu, 4 Jun 2026 12:28:40 +0200 Subject: [PATCH] feat(core): Add smooth stream options to agents sdk (#31715) Co-authored-by: bjorger <50590409+bjorger@users.noreply.github.com> --- packages/@n8n/agents/src/index.ts | 1 + .../runtime/__tests__/agent-runtime.test.ts | 60 +++++++++++++++++++ .../@n8n/agents/src/runtime/agent-runtime.ts | 14 ++++- packages/@n8n/agents/src/types/index.ts | 1 + packages/@n8n/agents/src/types/sdk/agent.ts | 6 +- 5 files changed, 80 insertions(+), 2 deletions(-) diff --git a/packages/@n8n/agents/src/index.ts b/packages/@n8n/agents/src/index.ts index 2aecfc8db15..0999112cd61 100644 --- a/packages/@n8n/agents/src/index.ts +++ b/packages/@n8n/agents/src/index.ts @@ -67,6 +67,7 @@ export type { McpVerifyResult, ModelConfig, ExecutionOptions, + SmoothStreamOptions, AgentExecutionCounter, PersistedExecutionOptions, BuiltTelemetry, diff --git a/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts b/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts index 39d9ad1246b..98ed512ae99 100644 --- a/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts +++ b/packages/@n8n/agents/src/runtime/__tests__/agent-runtime.test.ts @@ -3635,6 +3635,66 @@ describe('AgentRuntime — telemetry propagation', () => { expect(expTelemetry.tracer).toBe(baseTelemetry.tracer); }); + it('enables smoothStream by default on streamText', async () => { + streamText.mockReturnValue(makeStreamSuccess()); + const smoothStreamSpy = vi.spyOn(aiModule, 'smoothStream'); + + const runtime = new AgentRuntime({ + name: 'smooth-stream-default-test', + model: 'openai/gpt-4o-mini', + instructions: 'test', + eventBus: new AgentEventBus(), + }); + + const { stream } = await runtime.stream('hello'); + await collectChunks(stream); + + const callArgs = streamText.mock.calls[0][0] as Record; + expect(callArgs.experimental_transform).toEqual(expect.any(Function)); + expect(smoothStreamSpy).toHaveBeenCalledWith({}); + + smoothStreamSpy.mockRestore(); + }); + + it('omits smoothStream when explicitly disabled', async () => { + streamText.mockReturnValue(makeStreamSuccess()); + + const runtime = new AgentRuntime({ + name: 'smooth-stream-disabled-test', + model: 'openai/gpt-4o-mini', + instructions: 'test', + eventBus: new AgentEventBus(), + }); + + const { stream } = await runtime.stream('hello', { smoothStream: false }); + await collectChunks(stream); + + const callArgs = streamText.mock.calls[0][0] as Record; + expect(callArgs.experimental_transform).toBeUndefined(); + }); + + it('forwards non-default smoothStream options to the AI SDK', async () => { + streamText.mockReturnValue(makeStreamSuccess()); + + const smoothStreamSpy = vi.spyOn(aiModule, 'smoothStream'); + + const runtime = new AgentRuntime({ + name: 'smooth-stream-options-test', + model: 'openai/gpt-4o-mini', + instructions: 'test', + eventBus: new AgentEventBus(), + }); + + const smoothStreamOptions = { delayInMs: 25, chunking: 'line' as const }; + const { stream } = await runtime.stream('hello', { smoothStream: smoothStreamOptions }); + + await collectChunks(stream); + + expect(smoothStreamSpy).toHaveBeenCalledWith(smoothStreamOptions); + + smoothStreamSpy.mockRestore(); + }); + it('inherits telemetry from ExecutionOptions when no own telemetry is set', async () => { generateText.mockResolvedValue(makeGenerateSuccess()); diff --git a/packages/@n8n/agents/src/runtime/agent-runtime.ts b/packages/@n8n/agents/src/runtime/agent-runtime.ts index d259717b94e..2ba8cf72794 100644 --- a/packages/@n8n/agents/src/runtime/agent-runtime.ts +++ b/packages/@n8n/agents/src/runtime/agent-runtime.ts @@ -1,5 +1,5 @@ import type { ProviderOptions } from '@ai-sdk/provider-utils'; -import type { TelemetrySettings, ToolCallRepairFunction, ToolSet } from 'ai'; +import type { StreamTextTransform, TelemetrySettings, ToolCallRepairFunction, ToolSet } from 'ai'; import type { z } from 'zod'; import { zodToJsonSchema, type JsonSchema7Type } from 'zod-to-json-schema'; @@ -748,6 +748,17 @@ export class AgentRuntime { }, }; } + + private buildSmoothStreamTransformOptions(options?: ExecutionOptions): { + experimental_transform?: StreamTextTransform; + } { + if (options?.smoothStream === false) return {}; + + const { smoothStream } = loadAi(); + + return { experimental_transform: smoothStream(options?.smoothStream ?? {}) }; + } + /** Map resolved telemetry to AI SDK's experimental_telemetry shape. */ private buildTelemetryOptions(options?: ExecutionOptions): { experimental_telemetry?: TelemetrySettings; @@ -1315,6 +1326,7 @@ export class AgentRuntime { : {}), ...(staticLoopContext.outputSpec ? { output: staticLoopContext.outputSpec } : {}), ...this.buildAiSdkOptions(toolMap, options), + ...this.buildSmoothStreamTransformOptions(options), }); // Consume the stream. When the AbortSignal fires mid-stream the diff --git a/packages/@n8n/agents/src/types/index.ts b/packages/@n8n/agents/src/types/index.ts index 0cb8953d35f..0099e79aee7 100644 --- a/packages/@n8n/agents/src/types/index.ts +++ b/packages/@n8n/agents/src/types/index.ts @@ -36,6 +36,7 @@ export type { ModelConfig, RunOptions, ExecutionOptions, + SmoothStreamOptions, AgentExecutionCounter, PersistedExecutionOptions, ResumeOptions, diff --git a/packages/@n8n/agents/src/types/sdk/agent.ts b/packages/@n8n/agents/src/types/sdk/agent.ts index e19e25aaf29..cfe14bc035c 100644 --- a/packages/@n8n/agents/src/types/sdk/agent.ts +++ b/packages/@n8n/agents/src/types/sdk/agent.ts @@ -1,5 +1,5 @@ import type { ProviderOptions } from '@ai-sdk/provider-utils'; -import type { LanguageModel } from 'ai'; +import type { LanguageModel, smoothStream } from 'ai'; import type { JsonSchema7Type } from 'zod-to-json-schema'; import type { AgentMessage, ContentMetadata } from './message'; @@ -14,6 +14,8 @@ import type { SerializedMessageList } from '../runtime/message-list'; import type { BuiltTelemetry } from '../telemetry'; import type { JSONValue } from '../utils/json'; +export type SmoothStreamOptions = NonNullable[0]>; + export type FinishReason = | 'stop' | 'max-iterations' @@ -157,6 +159,8 @@ export interface ExecutionOptions { maxIterations?: number; abortSignal?: AbortSignal; providerOptions?: ProviderOptions; + /** AI SDK `smoothStream` transform. Enabled by default; pass `false` to disable. */ + smoothStream?: SmoothStreamOptions | false; /** Inherited telemetry from a host runtime. */ telemetry?: BuiltTelemetry; /** Inherited execution counter from the host runtime. Used for aggregate heartbeat telemetry. */