n8n/packages/cli/src/push/websocket.push.ts
oleg 629826ca1d
Some checks are pending
Build: Benchmark Image / build (push) Waiting to run
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Util: Sync API Docs / sync-public-api (push) Waiting to run
feat: Instance AI and local gateway modules (no-changelog) (#27206)
Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Albert Alises <albert.alises@gmail.com>
Co-authored-by: Jaakko Husso <jaakko@n8n.io>
Co-authored-by: Dimitri Lavrenük <20122620+dlavrenuek@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Co-authored-by: Tuukka Kantola <Tuukkaa@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Mutasem Aldmour <4711238+mutdmour@users.noreply.github.com>
Co-authored-by: Raúl Gómez Morales <raul00gm@gmail.com>
Co-authored-by: Elias Meire <elias@meire.dev>
Co-authored-by: Dimitri Lavrenük <dimitri.lavrenuek@n8n.io>
Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
Co-authored-by: Mutasem Aldmour <mutasem@n8n.io>
2026-04-01 21:33:38 +03:00

94 lines
2.5 KiB
TypeScript

import { heartbeatMessageSchema } from '@n8n/api-types';
import type { User } from '@n8n/db';
import { Service } from '@n8n/di';
import { UnexpectedError } from 'n8n-workflow';
import type WebSocket from 'ws';
import { AbstractPush } from './abstract.push';
function heartbeat(this: WebSocket) {
this.isAlive = true;
}
@Service()
export class WebSocketPush extends AbstractPush<WebSocket> {
add(pushRef: string, userId: User['id'], connection: WebSocket) {
connection.isAlive = true;
connection.on('pong', heartbeat);
super.add(pushRef, userId, connection);
const onMessage = async (data: WebSocket.RawData) => {
try {
const buffer = Array.isArray(data)
? Buffer.concat(data)
: data instanceof ArrayBuffer
? Buffer.from(data)
: data;
const msg: unknown = JSON.parse(buffer.toString('utf8'));
// Client sends application level heartbeat messages to react
// to connection issues. This is in addition to the protocol
// level ping/pong mechanism used by the server.
if (await this.isClientHeartbeat(msg)) {
return;
}
this.onMessageReceived(pushRef, msg);
} catch (error) {
this.errorReporter.error(
new UnexpectedError('Error parsing push message', {
extra: {
userId,
data,
},
cause: error,
}),
);
this.logger.error("Couldn't parse message from editor-UI", {
error: error as unknown,
pushRef,
data,
});
}
};
// Makes sure to remove the session if the connection is closed.
// Only remove if this connection is still the active one for this pushRef —
// a newer connection may have already replaced it via add().
connection.once('close', () => {
connection.off('pong', heartbeat);
connection.off('message', onMessage);
if (this.getConnection(pushRef) === connection) {
this.remove(pushRef);
}
});
connection.on('message', onMessage);
}
protected close(connection: WebSocket): void {
connection.close();
}
protected sendToOneConnection(connection: WebSocket, data: string, asBinary: boolean): void {
connection.send(data, { binary: asBinary });
}
protected ping(connection: WebSocket): void {
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
if (!connection.isAlive) {
return connection.terminate();
}
connection.isAlive = false;
connection.ping();
}
private async isClientHeartbeat(msg: unknown) {
const result = await heartbeatMessageSchema.safeParseAsync(msg);
return result.success;
}
}