mirror of
https://github.com/n8n-io/n8n.git
synced 2026-06-03 10:17:00 +02:00
Some checks are pending
Build: Benchmark Image / build (push) Waiting to run
CI: Master (Build, Test, Lint) / Build for Github Cache (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (22.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (24.14.1) (push) Waiting to run
CI: Master (Build, Test, Lint) / Unit tests (25.x) (push) Waiting to run
CI: Master (Build, Test, Lint) / Lint (push) Waiting to run
CI: Master (Build, Test, Lint) / Performance (push) Waiting to run
CI: Master (Build, Test, Lint) / Notify Slack on failure (push) Blocked by required conditions
Util: Sync API Docs / sync-public-api (push) Waiting to run
Signed-off-by: Oleg Ivaniv <me@olegivaniv.com> Co-authored-by: Albert Alises <albert.alises@gmail.com> Co-authored-by: Jaakko Husso <jaakko@n8n.io> Co-authored-by: Dimitri Lavrenük <20122620+dlavrenuek@users.noreply.github.com> Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com> Co-authored-by: Tuukka Kantola <Tuukkaa@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Mutasem Aldmour <4711238+mutdmour@users.noreply.github.com> Co-authored-by: Raúl Gómez Morales <raul00gm@gmail.com> Co-authored-by: Elias Meire <elias@meire.dev> Co-authored-by: Dimitri Lavrenük <dimitri.lavrenuek@n8n.io> Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Co-authored-by: Mutasem Aldmour <mutasem@n8n.io>
94 lines
2.5 KiB
TypeScript
94 lines
2.5 KiB
TypeScript
import { heartbeatMessageSchema } from '@n8n/api-types';
|
|
import type { User } from '@n8n/db';
|
|
import { Service } from '@n8n/di';
|
|
import { UnexpectedError } from 'n8n-workflow';
|
|
import type WebSocket from 'ws';
|
|
|
|
import { AbstractPush } from './abstract.push';
|
|
|
|
function heartbeat(this: WebSocket) {
|
|
this.isAlive = true;
|
|
}
|
|
|
|
@Service()
|
|
export class WebSocketPush extends AbstractPush<WebSocket> {
|
|
add(pushRef: string, userId: User['id'], connection: WebSocket) {
|
|
connection.isAlive = true;
|
|
connection.on('pong', heartbeat);
|
|
|
|
super.add(pushRef, userId, connection);
|
|
|
|
const onMessage = async (data: WebSocket.RawData) => {
|
|
try {
|
|
const buffer = Array.isArray(data)
|
|
? Buffer.concat(data)
|
|
: data instanceof ArrayBuffer
|
|
? Buffer.from(data)
|
|
: data;
|
|
|
|
const msg: unknown = JSON.parse(buffer.toString('utf8'));
|
|
|
|
// Client sends application level heartbeat messages to react
|
|
// to connection issues. This is in addition to the protocol
|
|
// level ping/pong mechanism used by the server.
|
|
if (await this.isClientHeartbeat(msg)) {
|
|
return;
|
|
}
|
|
|
|
this.onMessageReceived(pushRef, msg);
|
|
} catch (error) {
|
|
this.errorReporter.error(
|
|
new UnexpectedError('Error parsing push message', {
|
|
extra: {
|
|
userId,
|
|
data,
|
|
},
|
|
cause: error,
|
|
}),
|
|
);
|
|
this.logger.error("Couldn't parse message from editor-UI", {
|
|
error: error as unknown,
|
|
pushRef,
|
|
data,
|
|
});
|
|
}
|
|
};
|
|
|
|
// Makes sure to remove the session if the connection is closed.
|
|
// Only remove if this connection is still the active one for this pushRef —
|
|
// a newer connection may have already replaced it via add().
|
|
connection.once('close', () => {
|
|
connection.off('pong', heartbeat);
|
|
connection.off('message', onMessage);
|
|
if (this.getConnection(pushRef) === connection) {
|
|
this.remove(pushRef);
|
|
}
|
|
});
|
|
|
|
connection.on('message', onMessage);
|
|
}
|
|
|
|
protected close(connection: WebSocket): void {
|
|
connection.close();
|
|
}
|
|
|
|
protected sendToOneConnection(connection: WebSocket, data: string, asBinary: boolean): void {
|
|
connection.send(data, { binary: asBinary });
|
|
}
|
|
|
|
protected ping(connection: WebSocket): void {
|
|
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
|
|
if (!connection.isAlive) {
|
|
return connection.terminate();
|
|
}
|
|
connection.isAlive = false;
|
|
connection.ping();
|
|
}
|
|
|
|
private async isClientHeartbeat(msg: unknown) {
|
|
const result = await heartbeatMessageSchema.safeParseAsync(msg);
|
|
|
|
return result.success;
|
|
}
|
|
}
|