n8n/packages/@n8n/crdt/src/transports/websocket.ts
Iván Ovejero df6ee78638
Some checks are pending
Build: Benchmark Image / build (push) Waiting to run
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.13.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Util: Sync API Docs / sync-public-api (push) Waiting to run
refactor: Upgrade to TypeScript 6.0.2 (#27673)
2026-04-01 11:03:37 +00:00

268 lines
6.6 KiB
TypeScript

import type { Unsubscribe } from '../types';
import type { SyncTransport } from './types';
type ReceiveHandler = (data: Uint8Array) => void;
type ConnectionHandler = (connected: boolean) => void;
type ErrorHandler = (error: Error) => void;
export interface WebSocketTransportConfig {
/** WebSocket URL (ws:// or wss://) */
url: string;
/** Enable automatic reconnection (default: true) */
reconnect?: boolean;
/** Maximum reconnection attempts (default: Infinity) */
maxReconnectAttempts?: number;
/** Initial reconnection delay in ms (default: 1000) */
reconnectDelay?: number;
/** Maximum reconnection delay in ms (default: 30000) */
maxReconnectDelay?: number;
/** Reconnection backoff multiplier (default: 2) */
reconnectBackoff?: number;
/** Connection timeout in ms (default: 10000) */
connectionTimeout?: number;
}
/**
* WebSocketTransport - Transport using WebSocket for server communication.
*
* Features:
* - Automatic reconnection with exponential backoff
* - Connection timeout handling
* - Binary message support (Uint8Array)
*
* Usage:
* ```typescript
* const transport = new WebSocketTransport({
* url: 'wss://server/sync',
* reconnect: true,
* });
*
* transport.onConnectionChange((connected) => {
* console.log('Connection state:', connected);
* });
*
* await transport.connect();
* ```
*/
export class WebSocketTransport implements SyncTransport {
private ws: WebSocket | null = null;
private receiveHandlers = new Set<ReceiveHandler>();
private connectionHandlers = new Set<ConnectionHandler>();
private errorHandlers = new Set<ErrorHandler>();
private _connected = false;
private reconnectAttempts = 0;
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private shouldReconnect = false;
private isConnecting = false;
private connectionPromise: Promise<void> | null = null;
private readonly config: Required<WebSocketTransportConfig>;
constructor(config: WebSocketTransportConfig) {
this.config = {
url: config.url,
reconnect: config.reconnect ?? true,
maxReconnectAttempts: config.maxReconnectAttempts ?? Infinity,
reconnectDelay: config.reconnectDelay ?? 1000,
maxReconnectDelay: config.maxReconnectDelay ?? 30000,
reconnectBackoff: config.reconnectBackoff ?? 2,
connectionTimeout: config.connectionTimeout ?? 10000,
};
}
get connected(): boolean {
return this._connected;
}
send(data: Uint8Array): void {
if (!this._connected || !this.ws) {
throw new Error('Transport not connected');
}
// TODO: narrow SyncTransport.send to Uint8Array<ArrayBuffer> and propagate through Y.js call sites
this.ws.send(data as unknown as Uint8Array<ArrayBuffer>);
}
onReceive(handler: ReceiveHandler): Unsubscribe {
this.receiveHandlers.add(handler);
return () => {
this.receiveHandlers.delete(handler);
};
}
/**
* Subscribe to connection state changes.
*/
onConnectionChange(handler: ConnectionHandler): Unsubscribe {
this.connectionHandlers.add(handler);
return () => {
this.connectionHandlers.delete(handler);
};
}
/**
* Subscribe to connection errors.
*/
onError(handler: ErrorHandler): Unsubscribe {
this.errorHandlers.add(handler);
return () => {
this.errorHandlers.delete(handler);
};
}
async connect(): Promise<void> {
// Clear any pending reconnect to avoid parallel WebSocket connections
this.clearReconnectTimeout();
if (this._connected) {
return await Promise.resolve();
}
if (this.isConnecting && this.connectionPromise) {
return await this.connectionPromise;
}
this.shouldReconnect = this.config.reconnect;
this.connectionPromise = this.doConnect();
return await this.connectionPromise;
}
disconnect(): void {
this.shouldReconnect = false;
this.clearReconnectTimeout();
if (this.ws) {
this.ws.onclose = null;
this.ws.onerror = null;
this.ws.onmessage = null;
this.ws.onopen = null;
this.ws.close();
this.ws = null;
}
if (this._connected) {
this._connected = false;
this.notifyConnectionChange(false);
}
this.isConnecting = false;
this.connectionPromise = null;
}
private async doConnect(): Promise<void> {
this.isConnecting = true;
return await new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
if (this.ws) {
this.ws.close();
}
reject(new Error('Connection timeout'));
}, this.config.connectionTimeout);
try {
this.ws = new WebSocket(this.config.url);
this.ws.binaryType = 'arraybuffer';
this.ws.onopen = () => {
clearTimeout(timeoutId);
this._connected = true;
this.isConnecting = false;
this.reconnectAttempts = 0;
this.notifyConnectionChange(true);
resolve();
};
this.ws.onclose = () => {
clearTimeout(timeoutId);
const wasConnected = this._connected;
this._connected = false;
this.isConnecting = false;
if (wasConnected) {
this.notifyConnectionChange(false);
}
if (this.shouldReconnect) {
this.scheduleReconnect();
}
};
this.ws.onerror = () => {
clearTimeout(timeoutId);
const error = new Error('WebSocket error');
this.notifyError(error);
if (this.isConnecting) {
this.isConnecting = false;
reject(error);
}
};
this.ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
const data = new Uint8Array(event.data);
for (const handler of this.receiveHandlers) {
handler(data);
}
}
};
} catch (error) {
clearTimeout(timeoutId);
this.isConnecting = false;
reject(error instanceof Error ? error : new Error(String(error)));
}
});
}
private scheduleReconnect(): void {
if (!this.shouldReconnect) {
return;
}
if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {
this.notifyError(new Error('Max reconnection attempts reached'));
return;
}
const delay = Math.min(
this.config.reconnectDelay * Math.pow(this.config.reconnectBackoff, this.reconnectAttempts),
this.config.maxReconnectDelay,
);
this.reconnectAttempts++;
this.reconnectTimeout = setTimeout(() => {
this.reconnectTimeout = null;
this.doConnect().catch((error: Error) => {
this.notifyError(error);
});
}, delay);
}
private clearReconnectTimeout(): void {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
}
private notifyConnectionChange(connected: boolean): void {
for (const handler of this.connectionHandlers) {
handler(connected);
}
}
private notifyError(error: Error): void {
for (const handler of this.errorHandlers) {
handler(error);
}
}
}