n8n/packages/@n8n/instance-ai/docs/streaming-protocol.md
oleg 629826ca1d
feat: Instance AI and local gateway modules (no-changelog) (#27206)
Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Albert Alises <albert.alises@gmail.com>
Co-authored-by: Jaakko Husso <jaakko@n8n.io>
Co-authored-by: Dimitri Lavrenük <20122620+dlavrenuek@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Co-authored-by: Tuukka Kantola <Tuukkaa@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Mutasem Aldmour <4711238+mutdmour@users.noreply.github.com>
Co-authored-by: Raúl Gómez Morales <raul00gm@gmail.com>
Co-authored-by: Elias Meire <elias@meire.dev>
Co-authored-by: Dimitri Lavrenük <dimitri.lavrenuek@n8n.io>
Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
Co-authored-by: Mutasem Aldmour <mutasem@n8n.io>
2026-04-01 21:33:38 +03:00


# Streaming Protocol
## Overview
Instance AI uses a pub/sub event bus to deliver agent events to the frontend
in real-time. All agents — the orchestrator and dynamically spawned sub-agents —
publish events to a per-thread channel. The frontend subscribes independently
via SSE.

The protocol is designed for minimal time-to-first-token, progressive rendering
of multi-agent activity, and resilient reconnection.
## Transport
### Sending Messages
- **Endpoint**: `POST /instance-ai/chat/:threadId`
- **Request body**: `{ "message": "user's message" }`
- **Response**: `{ "runId": "run_abc123" }`
- **Concurrency**: One active run per thread. A second POST for the same thread
while a run is active is rejected (`409 Conflict`).

The POST kicks off the orchestrator. Events are delivered via the SSE endpoint,
not the POST response.
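
A minimal sketch of how a client might build the request and interpret the response, assuming only what is stated above (a `runId` on success, `409 Conflict` while a run is active). The helper names `buildChatRequest` and `interpretChatResponse` and the `ChatOutcome` type are hypothetical and not part of n8n.

```typescript
// Hypothetical helpers; only the endpoint, body shape, and 409 rule come from this document.
type ChatOutcome =
  | { kind: 'started'; runId: string }
  | { kind: 'busy' } // 409: a run is already active on this thread
  | { kind: 'failed'; status: number };

function buildChatRequest(threadId: string, message: string) {
  return {
    method: 'POST' as const,
    path: `/instance-ai/chat/${encodeURIComponent(threadId)}`,
    body: JSON.stringify({ message }),
  };
}

function interpretChatResponse(status: number, body: unknown): ChatOutcome {
  if (status === 409) return { kind: 'busy' };
  if (status >= 200 && status < 300 && typeof body === 'object' && body !== null) {
    const runId = (body as { runId?: unknown }).runId;
    if (typeof runId === 'string') return { kind: 'started', runId };
  }
  return { kind: 'failed', status };
}
```

A caller would keep the returned `runId` to correlate the events that later arrive on the SSE connection.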
### Receiving Events
- **Endpoint**: `GET /instance-ai/events/:threadId`
- **Format**: Server-Sent Events (SSE)
- **Reconnect**: The server replays missed events from storage, starting after
the cursor given in the `Last-Event-ID` header (automatic reconnect) or the
`?lastEventId` query parameter (manual reconnect)
### SSE Headers
```
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
```
`X-Accel-Buffering: no` disables nginx/reverse proxy buffering so events are
delivered immediately.
### SSE Event IDs
Each SSE frame includes an `id:` field generated by the server:
```text
id: 42
data: {"type":"text-delta","runId":"run_abc","agentId":"agent-001","payload":{"text":"..."}}
```
Event IDs are monotonically increasing integers per thread channel and unique
within that thread.
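
To illustrate the frame layout, here is a small parser sketch for a single frame containing an `id:` line and a `data:` line. The function `parseSseFrame` and the `ParsedFrame` type are hypothetical; in a browser, `EventSource` already performs this parsing and exposes the ID as `lastEventId`.

```typescript
// Hypothetical parser for one SSE frame as shown above (an `id:` line and a `data:` line).
interface ParsedFrame {
  id: number;
  event: { type: string; runId: string; agentId: string; payload?: unknown };
}

function parseSseFrame(frame: string): ParsedFrame | null {
  let id: number | null = null;
  const dataLines: string[] = [];
  for (const line of frame.split(/\r?\n/)) {
    if (line.startsWith('id:')) id = Number(line.slice(3).trim());
    else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim());
  }
  if (id === null || !Number.isInteger(id) || dataLines.length === 0) return null;
  try {
    // Per the SSE format, multiple data lines are joined with newlines.
    return { id, event: JSON.parse(dataLines.join('\n')) };
  } catch {
    return null; // malformed JSON payload
  }
}
```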
## Event Schema
Every event follows this schema:
```typescript
{
type: string; // event type
runId: string; // correlates all events in a single message → response cycle
agentId: string; // agent this event is attributed to in the UI
payload?: object; // event-specific data
}
```
The `runId` correlates all events belonging to one user message → assistant
response cycle. It is returned by the POST endpoint and carried on every event.

The `agentId` identifies which agent branch (orchestrator or sub-agent) the
event belongs to. The frontend uses this to render an agent activity tree.

For the full TypeScript type definitions, see `instanceAiEventSchema` in
`@n8n/api-types` (`schemas/instance-ai.schema.ts`).
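
As a rough illustration of the envelope, the sketch below checks the three required fields at runtime and groups events by `runId`. It is not the Zod schema referenced above; `InstanceAiEventEnvelope`, `isEventEnvelope`, and `groupByRun` are hypothetical names used only for this example.

```typescript
// The envelope fields mirror the schema above; the helpers themselves are hypothetical.
interface InstanceAiEventEnvelope {
  type: string;
  runId: string;
  agentId: string;
  payload?: Record<string, unknown>;
}

function isEventEnvelope(value: unknown): value is InstanceAiEventEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  const payloadOk =
    v.payload === undefined || (typeof v.payload === 'object' && v.payload !== null);
  return (
    typeof v.type === 'string' &&
    typeof v.runId === 'string' &&
    typeof v.agentId === 'string' &&
    payloadOk
  );
}

// Group events by run so that one message/response cycle can be rendered together.
function groupByRun(
  events: InstanceAiEventEnvelope[],
): Record<string, InstanceAiEventEnvelope[]> {
  const runs: Record<string, InstanceAiEventEnvelope[]> = {};
  for (const e of events) (runs[e.runId] ??= []).push(e);
  return runs;
}
```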
## Event Types
### `run-start`
The orchestrator has started processing a user message. Always the first
event in a run.
```json
{
"type": "run-start",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"messageId": "msg_xyz"
}
}
```
The `agentId` on this event identifies the orchestrator — the frontend uses
this as the root of the agent activity tree.
### `text-delta`
Incremental text from an agent's response.
```json
{"type":"text-delta","runId":"run_abc123","agentId":"agent-001","payload":{"text":"You have 3 active workflows."}}
```
The frontend appends `payload.text` to the agent's current message content.
### `reasoning-delta`
Incremental reasoning/thinking from an agent. Always streamed to the frontend
when the model produces it — this gives users visibility into the agent's
decision-making and supports faster iteration.
```json
{"type":"reasoning-delta","runId":"run_abc123","agentId":"agent-001","payload":{"text":"Let me check the workflow list..."}}
```
**Policy**: Reasoning is always shown to the user (ADR-012). Not all models emit
reasoning tokens; when a model doesn't support it, no `reasoning-delta` events
are sent. The frontend should handle the absence gracefully.
### `tool-call`
An agent is invoking a tool. Sent before the tool executes.
```json
{
"type": "tool-call",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"toolCallId": "tc_abc123",
"toolName": "list-workflows",
"args": {"limit": 10}
}
}
```
The frontend adds a new entry to the agent's `toolCalls` with `isLoading: true`.
### `tool-result`
A tool has completed successfully.
```json
{
"type": "tool-result",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"toolCallId": "tc_abc123",
"result": {"workflows": [{"id": "1", "name": "My Workflow", "active": true}]}
}
}
```
The frontend updates the matching `toolCall` entry: sets `result` and
`isLoading: false`.
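
The frontend behavior described for `text-delta`, `tool-call`, and `tool-result` can be sketched as a pure reducer. This is an illustration under assumptions, not the actual frontend store: `AgentView`, `ToolCallView`, and `applyEvent` are hypothetical names, and only the `toolCalls`, `isLoading`, and `result` fields are taken from the text above.

```typescript
// Hypothetical per-agent view state; field names follow the frontend behavior described above.
interface ToolCallView {
  toolCallId: string;
  toolName: string;
  args: unknown;
  isLoading: boolean;
  result?: unknown;
}
interface AgentView {
  content: string;
  toolCalls: ToolCallView[];
}

type StreamEvent =
  | { type: 'text-delta'; payload: { text: string } }
  | { type: 'tool-call'; payload: { toolCallId: string; toolName: string; args: unknown } }
  | { type: 'tool-result'; payload: { toolCallId: string; result: unknown } };

function applyEvent(agent: AgentView, event: StreamEvent): AgentView {
  switch (event.type) {
    case 'text-delta':
      // Append the incremental text to the agent's current message.
      return { ...agent, content: agent.content + event.payload.text };
    case 'tool-call':
      // A new tool call starts in the loading state.
      return { ...agent, toolCalls: [...agent.toolCalls, { ...event.payload, isLoading: true }] };
    case 'tool-result': {
      // Match by toolCallId, store the result, and clear the loading flag.
      const { toolCallId, result } = event.payload;
      return {
        ...agent,
        toolCalls: agent.toolCalls.map((tc) =>
          tc.toolCallId === toolCallId ? { ...tc, result, isLoading: false } : tc,
        ),
      };
    }
  }
}
```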
### `tool-error`
A tool has failed.
```json
{
"type": "tool-error",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"toolCallId": "tc_abc123",
"error": "Workflow not found"
}
}
```
### `agent-spawned`
The orchestrator has created a new sub-agent via the `delegate` tool.
```json
{
"type": "agent-spawned",
"runId": "run_abc123",
"agentId": "agent-002",
"payload": {
"parentId": "agent-001",
"role": "workflow builder",
"tools": ["create-workflow", "update-workflow", "list-nodes", "get-node-description"]
}
}
```
The frontend adds a new node to the agent activity tree under the parent.
For this event type, `agentId` is the spawned sub-agent ID; `payload.parentId`
links it to the orchestrator.
### `agent-completed`
A sub-agent has finished its work.
```json
{
"type": "agent-completed",
"runId": "run_abc123",
"agentId": "agent-002",
"payload": {
"role": "workflow builder",
"result": "Created workflow wf-123 with 3 nodes"
}
}
```
The frontend marks the sub-agent node as completed.
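
A minimal sketch of how `run-start`, `agent-spawned`, and `agent-completed` could be folded into an activity tree. This is not the backend's `buildAgentTreeFromEvents()`; the names `AgentNode`, `TreeEvent`, and `buildTree` are hypothetical, and the `running`/`completed` status values are an assumption made for the example.

```typescript
// Hypothetical tree builder; only the event fields come from this document.
interface AgentNode {
  agentId: string;
  role?: string;
  status: 'running' | 'completed';
  children: AgentNode[];
}

type TreeEvent =
  | { type: 'run-start'; agentId: string }
  | { type: 'agent-spawned'; agentId: string; payload: { parentId: string; role: string } }
  | { type: 'agent-completed'; agentId: string };

function buildTree(events: TreeEvent[]): AgentNode | null {
  const nodes = new Map<string, AgentNode>();
  let root: AgentNode | null = null;
  for (const e of events) {
    if (e.type === 'run-start') {
      // The agentId on run-start identifies the orchestrator, the root of the tree.
      root = { agentId: e.agentId, status: 'running', children: [] };
      nodes.set(e.agentId, root);
    } else if (e.type === 'agent-spawned') {
      const node: AgentNode = {
        agentId: e.agentId,
        role: e.payload.role,
        status: 'running',
        children: [],
      };
      nodes.set(e.agentId, node);
      // Attach under the parent named in payload.parentId; skipped if the parent is unknown.
      nodes.get(e.payload.parentId)?.children.push(node);
    } else {
      const node = nodes.get(e.agentId);
      if (node) node.status = 'completed';
    }
  }
  return root;
}
```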
### `confirmation-request`
A tool requires user approval before execution (HITL confirmation protocol).
The tool's execution is paused until the user responds.
```json
{
"type": "confirmation-request",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"requestId": "cr_xyz",
"toolCallId": "tc_abc123",
"toolName": "delete-workflow",
"args": {"workflowId": "wf-123"},
"severity": "warning",
"message": "Archive workflow 'My Workflow'?"
}
}
```
The frontend renders an approval card on the matching tool call (matched by
`toolCallId`). The user responds via `POST /instance-ai/confirm/:requestId`
with `{ approved: boolean }`. On approval, normal `tool-result` follows. On
denial, `tool-error` follows.
**Rich payload fields** (all optional, extend the base confirmation):
| Field | Type | When used |
|-------|------|-----------|
| `inputType` | `'approval'` \| `'text'` \| `'questions'` \| `'plan-review'` | Controls which UI component renders. Default: `approval` |
| `questions` | `[{id, question, type, options?}]` | Structured Q&A wizard (`inputType=questions`) |
| `tasks` | `TaskList` | Plan approval checklist (`inputType=plan-review`) |
| `introMessage` | string | Intro text shown above questions or plan review |
| `credentialRequests` | array | Credential setup requests |
| `credentialFlow` | `{stage: 'generic' \| 'finalize'}` | Controls credential picker UX |
| `setupRequests` | `WorkflowSetupNode[]` | Per-node setup cards for workflow credential/parameter config |
| `workflowId` | string | Workflow being set up (for `setup-workflow` tool) |
| `projectId` | string | Scopes actions to a project (e.g., credential creation) |
| `domainAccess` | `{url, host}` | Renders domain-access approval UI instead of generic confirm |
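
The confirmation round trip might be sketched as follows. The endpoint, the `{ approved }` body, and the `approval` default come from this section; the helper names `resolveInputType`, `buildConfirmRequest`, and `expectedFollowUp` are hypothetical.

```typescript
// Hypothetical helpers; the endpoint, body shape, and `inputType` default come from this document.
type InputType = 'approval' | 'text' | 'questions' | 'plan-review';

// Pick the UI component to render; a missing inputType falls back to `approval`.
function resolveInputType(payload: { inputType?: InputType }): InputType {
  return payload.inputType ?? 'approval';
}

function buildConfirmRequest(requestId: string, approved: boolean) {
  return {
    method: 'POST' as const,
    path: `/instance-ai/confirm/${encodeURIComponent(requestId)}`,
    body: JSON.stringify({ approved }),
  };
}

// After the user responds, the matching tool call resolves with one of these events.
function expectedFollowUp(approved: boolean): 'tool-result' | 'tool-error' {
  return approved ? 'tool-result' : 'tool-error';
}
```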
### `tasks-update`
A task checklist has been created or updated. The frontend renders a live
progress indicator from this data.
```json
{
"type": "tasks-update",
"runId": "run_abc123",
"agentId": "agent-001",
"payload": {
"tasks": [
{"id": "t1", "description": "Build weather workflow", "status": "completed"},
{"id": "t2", "description": "Set up Slack credential", "status": "in_progress"},
{"id": "t3", "description": "Test end-to-end", "status": "pending"}
]
}
}
```
### `status`
A transient status message. Empty string clears the indicator.
```json
{"type":"status","runId":"run_abc123","agentId":"agent-001","payload":{"message":"Searching nodes..."}}
```
### `thread-title-updated`
The thread title has been updated (e.g., auto-generated from conversation).
```json
{"type":"thread-title-updated","runId":"run_abc123","agentId":"agent-001","payload":{"title":"Weather to Slack workflow"}}
```
### `error`
A system-level error occurred.
```json
{"type":"error","runId":"run_abc123","agentId":"agent-001","payload":{"content":"An error occurred"}}
```
### `run-finish`
The orchestrator has finished processing the user's message. Always the last
event in a run.
```json
{"type":"run-finish","runId":"run_abc123","agentId":"agent-001","payload":{"status":"completed"}}
```
The frontend sets `isStreaming: false` and re-enables input.
When a run is cancelled:
```json
{"type":"run-finish","runId":"run_abc123","agentId":"agent-001","payload":{"status":"cancelled","reason":"user_cancelled"}}
```
When a run errors:
```json
{"type":"run-finish","runId":"run_abc123","agentId":"agent-001","payload":{"status":"error","reason":"LLM provider unavailable"}}
```
## Typical Event Sequence
### Simple Query (No Sub-Agents)
```
← run-start {runId: "r1", agentId: "a1", payload: {messageId: "m1"}}
← reasoning-delta {runId: "r1", agentId: "a1", payload: {text: "Let me look up..."}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "list-workflows"}}
← tool-result {runId: "r1", agentId: "a1", payload: {result: [...]}}
← text-delta {runId: "r1", agentId: "a1", payload: {text: "You have 3 workflows:\n"}}
← run-finish {runId: "r1", agentId: "a1", payload: {status: "completed"}}
```
### Autonomous Loop (With Sub-Agents)
```
← run-start {runId: "r1", agentId: "a1", payload: {messageId: "m1"}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "plan", ...}}
← tool-result {runId: "r1", agentId: "a1", payload: {result: {goal: "Weather to Slack"}}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "delegate", toolCallId: "tc2"}}
← agent-spawned {runId: "r1", agentId: "a2", payload: {parentId: "a1", role: "workflow builder"}}
← tool-call {runId: "r1", agentId: "a2", payload: {toolName: "create-workflow"}}
← tool-result {runId: "r1", agentId: "a2", payload: {result: {id: "wf-123"}}}
← agent-completed {runId: "r1", agentId: "a2", payload: {result: "Created wf-123"}}
← tool-result {runId: "r1", agentId: "a1", payload: {toolCallId: "tc2", result: "Created wf-123"}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "run-workflow"}}
← tool-result {runId: "r1", agentId: "a1", payload: {result: {executionId: "exec-456"}}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "get-execution"}}
← tool-result {runId: "r1", agentId: "a1", payload: {result: {status: "error"}}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "delegate", toolCallId: "tc5"}}
← agent-spawned {runId: "r1", agentId: "a3", payload: {parentId: "a1", role: "execution debugger"}}
← tool-call {runId: "r1", agentId: "a3", payload: {toolName: "get-execution"}}
← reasoning-delta {runId: "r1", agentId: "a3", payload: {text: "The HTTP node returned 401..."}}
← agent-completed {runId: "r1", agentId: "a3", payload: {result: "Missing API key header"}}
← tool-result {runId: "r1", agentId: "a1", payload: {toolCallId: "tc5", result: "Missing API key"}}
← tool-call {runId: "r1", agentId: "a1", payload: {toolName: "plan", args: {action: "update"}}}
← ...loop continues...
← text-delta {runId: "r1", agentId: "a1", payload: {text: "Done! I created a workflow..."}}
← run-finish {runId: "r1", agentId: "a1", payload: {status: "completed"}}
```
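
The ordering rules visible in these sequences can be expressed as a small checker, which may be handy in tests. `validateRun` and `SeqEvent` are hypothetical names. The first/last rules come from the `run-start` and `run-finish` descriptions; the rule that a sub-agent only emits events after its `agent-spawned` event is an assumption drawn from the examples above.

```typescript
// Hypothetical checker for the ordering rules of a single run.
interface SeqEvent {
  type: string;
  runId: string;
  agentId: string;
  payload?: { parentId?: string };
}

function validateRun(events: SeqEvent[]): string[] {
  if (events.length === 0) return ['run has no events'];
  const problems: string[] = [];
  if (events[0].type !== 'run-start') problems.push('first event is not run-start');
  if (events[events.length - 1].type !== 'run-finish') {
    problems.push('last event is not run-finish');
  }

  const runId = events[0].runId;
  const known = new Set<string>([events[0].agentId]); // treated as the orchestrator
  for (const e of events) {
    if (e.runId !== runId) problems.push(`${e.type} has a different runId`);
    if (e.type === 'agent-spawned') known.add(e.agentId);
    else if (!known.has(e.agentId)) problems.push(`${e.type} from unknown agent ${e.agentId}`);
  }
  return problems;
}
```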
## Event Bus
### Architecture
```mermaid
graph LR
subgraph Agents
O[Orchestrator] -->|publish| Bus[Event Bus]
S1[Sub-Agent A] -->|publish| Bus
S2[Sub-Agent B] -->|publish| Bus
end
Bus --> Store[Thread Storage]
Bus --> SSE[SSE Endpoint]
SSE --> FE[Frontend]
```
All events are published to a per-thread channel on the event bus. Events are
simultaneously persisted to thread storage and delivered to connected SSE clients.
### Implementations
| Deployment | Transport | Why |
|---|---|---|
| Single instance | In-process `EventEmitter` | Zero infrastructure |
| Queue mode | Redis Pub/Sub | n8n already uses Redis |
Event persistence uses thread storage regardless of transport — this provides
replay capability for reconnection.
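
A toy in-process bus showing the persist-then-deliver idea for the single-instance case. This is a sketch under assumptions, not the n8n implementation: `InProcessEventBus` is a hypothetical class, an in-memory `Map` stands in for thread storage, and IDs are assumed to start at 1 for each thread.

```typescript
// Hypothetical in-process bus: persists each event, then fans it out to live subscribers.
interface BusEvent { type: string; runId: string; agentId: string; payload?: unknown }
interface StoredEvent { id: number; event: BusEvent }
type Listener = (stored: StoredEvent) => void;

class InProcessEventBus {
  private store = new Map<string, StoredEvent[]>(); // stands in for thread storage
  private listeners = new Map<string, Set<Listener>>();

  publish(threadId: string, event: BusEvent): StoredEvent {
    const log = this.store.get(threadId) ?? [];
    const stored: StoredEvent = { id: log.length + 1, event }; // IDs increase per thread
    log.push(stored);
    this.store.set(threadId, log);
    this.listeners.get(threadId)?.forEach((listener) => listener(stored));
    return stored;
  }

  // Returns an unsubscribe function.
  subscribe(threadId: string, listener: Listener): () => void {
    const set = this.listeners.get(threadId) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(threadId, set);
    return () => {
      set.delete(listener);
    };
  }

  // Replay source: every stored event with an ID greater than the cursor.
  eventsAfter(threadId: string, cursor: number): StoredEvent[] {
    return (this.store.get(threadId) ?? []).filter((s) => s.id > cursor);
  }
}
```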
### Reconnection & Replay (Canonical Rule)
On connect, the SSE endpoint replays every stored event that satisfies
`event.id > cursor`, then switches to live delivery. The client supplies the
cursor through one of two mechanisms (header or query parameter). Server
behavior is identical for both; only the source of the cursor differs.
This yields three scenarios:
| Scenario | Cursor source | Server behavior |
|---|---|---|
| **Auto-reconnect** (connection drop) | `Last-Event-ID` header, set by the browser automatically | Replay events after cursor, then switch to live |
| **Page reload** (same thread) | `?lastEventId=N` query parameter, from the frontend's per-thread stored cursor | Replay events after cursor, then switch to live |
| **Thread switch** (or first open) | No cursor (neither header nor query param) | Replay full event history from the beginning |
The backend must accept the cursor from both `Last-Event-ID` header and
`?lastEventId` query parameter. If neither is present, replay starts from
event ID 0 (full history).

Because IDs are monotonically increasing integers per thread, replaying
`event.id > cursor` never re-delivers an event the client has already seen,
so replay does not require dedup.
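
The cursor rule can be sketched as two small server-side helpers. `resolveCursor` and `eventsToReplay` are hypothetical names. Two choices here are assumptions rather than documented behavior: the header takes precedence when both sources are present, and an unparseable cursor falls back to 0.

```typescript
// Hypothetical helpers; the header name, query parameter, and `id > cursor` rule
// come from this document.
function resolveCursor(lastEventIdHeader?: string, lastEventIdQuery?: string): number {
  // Assumption: the header wins if both are supplied.
  const raw = lastEventIdHeader ?? lastEventIdQuery;
  if (raw === undefined) return 0; // no cursor: replay the full history
  const parsed = Number(raw);
  // Assumption: an invalid cursor is treated like a missing one.
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : 0;
}

function eventsToReplay<T extends { id: number }>(history: T[], cursor: number): T[] {
  return history.filter((e) => e.id > cursor);
}
```

For a thread with stored events 1 to 3, a cursor of 2 replays only event 3, and a missing cursor replays all three.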
## Abort Support
The frontend can abort a running agent by sending:
- **Endpoint**: `POST /instance-ai/chat/:threadId/cancel`
- **Semantics**: Idempotent. Cancels the active run for the thread (if any).
- **Behavior**: Stops orchestrator and active sub-agents, then emits final
`run-finish` with `payload.status = "cancelled"`.
- **Race behavior**: If the run already completed, cancel is a no-op.
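
The idempotent cancel semantics can be illustrated with a small registry of active runs. `RunRegistry` is a hypothetical class; stopping the orchestrator and sub-agents is out of scope for the sketch, and the `user_cancelled` reason is borrowed from the `run-finish` example earlier in this document.

```typescript
// Hypothetical run registry illustrating the idempotent cancel semantics described above.
interface CancelledFinish {
  type: 'run-finish';
  runId: string;
  agentId: string;
  payload: { status: 'cancelled'; reason: string };
}

class RunRegistry {
  // threadId -> active run (at most one per thread)
  private activeRuns = new Map<string, { runId: string; agentId: string }>();

  start(threadId: string, runId: string, agentId: string): void {
    this.activeRuns.set(threadId, { runId, agentId });
  }

  // Returns the final event to emit, or null when there is nothing to cancel (no-op).
  cancel(threadId: string): CancelledFinish | null {
    const run = this.activeRuns.get(threadId);
    if (run === undefined) return null;
    this.activeRuns.delete(threadId);
    return {
      type: 'run-finish',
      runId: run.runId,
      agentId: run.agentId,
      payload: { status: 'cancelled', reason: 'user_cancelled' },
    };
  }
}
```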
## Frontend Rendering
### Agent Activity Tree
The frontend renders events as a collapsible tree grouped by `agentId`:
```
🤖 Orchestrator
├── 💭 "Let me check what credentials are available..."
├── 🔧 list-credentials → [slack-bot, weather-api]
├── 📋 plan: build → execute → inspect
├── 🤖 Sub-Agent A (workflow builder)
│   ├── 🔧 list-nodes → [scheduleTrigger, httpRequest, slack]
│   ├── 🔧 create-workflow → wf-123
│   └── ✅ "Created wf-123 with 3 nodes"
├── 🔧 run-workflow wf-123
├── 🔧 get-execution → error (401)
├── 🤖 Sub-Agent B (execution debugger)
│   ├── 🔧 get-execution → {error details}
│   ├── 💭 "HTTP node returned 401..."
│   └── ✅ "Missing API key in query params"
└── 💬 "Done! Your workflow runs daily at 8am..."
Sub-agent sections are collapsible — users can drill into details or just see
the summary.
## Session Restore
When the user refreshes the page or navigates back to a thread, the frontend
restores the full session state (messages, tool calls, agent trees) without
replaying all SSE events.
### Endpoints
- **`GET /instance-ai/threads/:threadId/messages`** — returns rich
`InstanceAiMessage[]` with full agent trees, tool calls, and reasoning.
Includes a `nextEventId` field indicating the SSE cursor position at the
time of response.
- **`GET /instance-ai/threads/:threadId/status`** — returns the thread's
current activity state:
```json
{
"hasActiveRun": false,
"isSuspended": false,
"backgroundTasks": [
{ "taskId": "t1", "role": "workflow builder", "agentId": "agent-002", "status": "running", "startedAt": 1709300000 }
]
}
```
### How It Works
1. **Mastra V2 messages** — Mastra persists tool invocations, reasoning, and
text in its V2 message format. The backend parses these into rich
`InstanceAiMessage[]` objects with tool calls and flat agent trees.
2. **Agent tree snapshots** — after each `run-finish`, the backend replays
events through `buildAgentTreeFromEvents()` and stores the resulting tree
in thread metadata. This preserves the full sub-agent hierarchy (tool
calls, text, reasoning) that the V2 message format alone cannot capture.
3. **SSE cursor** — the messages response includes `nextEventId`. The frontend
sets its SSE cursor to `nextEventId - 1` so the SSE connection only receives
events that arrived after the historical snapshot. This prevents duplicate
messages on refresh.
### Frontend Flow
```
1. Load historical messages (GET /threads/:threadId/messages)
   └── Sets messages[], sets SSE cursor to nextEventId - 1
2. Load thread status (GET /threads/:threadId/status)
   └── Sets activeRunId if run is active, injects background tasks
3. Connect SSE (GET /events/:threadId?lastEventId=<cursor>)
   └── Only receives live events going forward
```
The steps run sequentially: historical messages and thread status load first,
and only then does the SSE connection open. This eliminates the race condition
in which SSE events and the HTTP response would overlap and create duplicate
messages.
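
The cursor arithmetic in step 3 can be made concrete with a one-line helper. `sseUrlAfterRestore` is a hypothetical name; the `nextEventId - 1` rule and the URL shape come from this document.

```typescript
// Hypothetical helper for building the SSE URL after a session restore.
function sseUrlAfterRestore(threadId: string, nextEventId: number): string {
  // The server replays events with id > cursor, which here means id >= nextEventId.
  const cursor = nextEventId - 1;
  return `/instance-ai/events/${encodeURIComponent(threadId)}?lastEventId=${cursor}`;
}
```

For example, if the messages response reports `nextEventId: 43`, the frontend connects with `lastEventId=42` and the first event it can receive is event 43.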
## Complete Event Type Reference
| Event Type | Payload Key Fields | Purpose |
|------------|-------------------|---------|
| `run-start` | `messageId` | First event in a run |
| `run-finish` | `status`, `reason?` | Last event in a run |
| `text-delta` | `text` | Incremental agent text |
| `reasoning-delta` | `text` | Incremental agent reasoning |
| `tool-call` | `toolCallId`, `toolName`, `args` | Tool invocation (before execution) |
| `tool-result` | `toolCallId`, `result` | Successful tool completion |
| `tool-error` | `toolCallId`, `error` | Failed tool execution |
| `agent-spawned` | `parentId`, `role`, `tools` | Sub-agent created |
| `agent-completed` | `role`, `result` | Sub-agent finished |
| `confirmation-request` | `requestId`, `toolCallId`, `severity`, `message`, ... | HITL approval gate |
| `tasks-update` | `tasks` | Task checklist created/updated |
| `status` | `message` | Transient status indicator |
| `error` | `content`, `statusCode?`, `provider?` | System-level error |
| `thread-title-updated` | `title` | Thread title changed |
| `filesystem-request` | `requestId`, `operation`, `args` | Gateway filesystem operation (internal) |
All event types are defined as a Zod discriminated union in
`@n8n/api-types/src/schemas/instance-ai.schema.ts`.