feat(core): Monitor and recover Postgres connection pool (#31008)

Co-authored-by: Danny Martini <danny@n8n.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Lorent Lempereur 2026-06-02 21:20:13 +02:00 committed by GitHub
parent ff1a7aeb19
commit 957fdecabc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1017 additions and 158 deletions

View File

@ -90,6 +90,18 @@ class PostgresConfig {
@Env('DB_POSTGRESDB_STATEMENT_TIMEOUT')
statementTimeoutMs: number = 5 * 60 * 1000; // 5 minutes
/** Maximum lifetime in milliseconds of a pooled Postgres connection before it is recycled. Set to 0 to disable. */
@Env('DB_POSTGRESDB_MAX_CONNECTION_LIFETIME_MS')
maxConnectionLifetimeMs: number = 60 * 60 * 1000;
/** Whether to enable TCP keepalive on Postgres connections so dead peers are detected without waiting for a query. */
@Env('DB_POSTGRESDB_KEEP_ALIVE')
keepAlive: boolean = true;
/** Initial delay in milliseconds before the first TCP keepalive probe is sent. */
@Env('DB_POSTGRESDB_KEEP_ALIVE_INITIAL_DELAY_MS')
keepAliveInitialDelayMs: number = 10_000;
@Nested
ssl: PostgresSSLConfig;
}
@ -118,6 +130,23 @@ export class SqliteConfig {
const dbTypeSchema = z.enum(['sqlite', 'postgresdb']);
type DbType = z.infer<typeof dbTypeSchema>;
const DEFAULT_PING_TIMEOUT_MS = 5_000;
function readLegacyPingTimeoutMs(): number {
const raw = process.env.N8N_DB_PING_TIMEOUT;
if (!raw) {
return DEFAULT_PING_TIMEOUT_MS;
}
const parsed = Number.parseInt(raw, 10);
if (Number.isNaN(parsed) || parsed <= 0) {
console.warn(
`Invalid N8N_DB_PING_TIMEOUT="${raw}", falling back to ${DEFAULT_PING_TIMEOUT_MS}ms. Prefer the supported DB_PING_TIMEOUT_MS env var.`,
);
return DEFAULT_PING_TIMEOUT_MS;
}
return parsed;
}
@Config
export class DatabaseConfig {
/** Database type: `sqlite` or `postgresdb`. */
@ -132,6 +161,15 @@ export class DatabaseConfig {
@Env('DB_PING_INTERVAL_SECONDS')
pingIntervalSeconds: number = 2;
/**
* Timeout in milliseconds for an individual database health-check ping.
*
* Falls back to the legacy `N8N_DB_PING_TIMEOUT` env var if set. The legacy
* name is deprecated and may be removed in a future release.
*/
@Env('DB_PING_TIMEOUT_MS')
pingTimeoutMs: number = readLegacyPingTimeoutMs();
@Nested
logging: LoggingConfig;

View File

@ -96,6 +96,9 @@ describe('GlobalConfig', () => {
connectionTimeoutMs: 20_000,
idleTimeoutMs: 30_000,
statementTimeoutMs: 5 * 60 * 1000,
maxConnectionLifetimeMs: 60 * 60 * 1000,
keepAlive: true,
keepAliveInitialDelayMs: 10_000,
ssl: {
ca: '',
cert: '',
@ -113,6 +116,7 @@ describe('GlobalConfig', () => {
tablePrefix: '',
type: 'sqlite',
pingIntervalSeconds: 2,
pingTimeoutMs: 5_000,
} as DatabaseConfig,
credentials: {
defaultName: 'My credentials',
@ -609,6 +613,7 @@ describe('GlobalConfig', () => {
tablePrefix: 'test_',
type: 'sqlite',
pingIntervalSeconds: 2,
pingTimeoutMs: 5_000,
},
endpoints: {
...defaultConfig.endpoints,

View File

@ -0,0 +1,655 @@
/* eslint-disable @typescript-eslint/unbound-method */
import type { Logger } from '@n8n/backend-common';
import type { DatabaseConfig } from '@n8n/config';
import type { DataSource } from '@n8n/typeorm';
import { mock, mockDeep } from 'jest-mock-extended';
import type { ErrorReporter } from 'n8n-core';
import type TimersPromises from 'timers/promises';
import { setTimeout as setTimeoutP } from 'timers/promises';
import { DbConnectionMonitor } from '../db-connection-monitor';
// The monitor uses `setTimeout` from `timers/promises` for recovery backoff.
// Mocking it lets us drive the recovery loop deterministically without juggling
// jest fake timers against async/await microtask ordering.
jest.mock('timers/promises', () => {
const actual = jest.requireActual<typeof TimersPromises>('timers/promises');
return { ...actual, setTimeout: jest.fn() };
});
const mockedSetTimeoutP = setTimeoutP as jest.MockedFunction<typeof setTimeoutP>;
const flushMicrotasks = async () => await new Promise((resolve) => setImmediate(resolve));
describe('DbConnectionMonitor', () => {
let monitor: DbConnectionMonitor;
let onConnectedChange: jest.MockedFunction<(connected: boolean) => void>;
const errorReporter = mock<ErrorReporter>();
const databaseConfig = mock<DatabaseConfig>({ pingTimeoutMs: 5_000 });
const logger = mock<Logger>();
const dataSource = mockDeep<DataSource>({ options: { type: 'postgres' } });
beforeEach(() => {
jest.resetAllMocks();
// Default: never resolves, so query wins the ping timeout race and
// recovery backoff stays suspended unless a test overrides it.
mockedSetTimeoutP.mockImplementation(async () => await new Promise(() => {}));
onConnectedChange = jest.fn();
monitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
databaseConfig,
logger,
errorReporter,
);
});
describe('ping', () => {
it('should update connection state on successful ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// eslint-disable-next-line @typescript-eslint/naming-convention
dataSource.query.mockResolvedValue([{ '1': 1 }]);
// @ts-expect-error private property
monitor.connected = false;
// @ts-expect-error private property
await monitor.ping();
expect(dataSource.query).toHaveBeenCalledWith('SELECT 1');
expect(onConnectedChange).toHaveBeenLastCalledWith(true);
});
it('should mark connection as disconnected when a ping fails', async () => {
// The owner's connectionState.connected drives the /healthz/readiness endpoint and
// the 503-fast-fail middleware in abstract-server. If a failed ping doesn't propagate
// disconnected=false, in-flight requests keep hitting a poisoned pool instead of bailing.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// @ts-expect-error private property
monitor.connected = true;
dataSource.query.mockRejectedValue(new Error('pool dead'));
// @ts-expect-error private property
await monitor.ping();
expect(onConnectedChange).toHaveBeenLastCalledWith(false);
});
it('should report errors on failed ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
const error = new Error('Connection error');
dataSource.query.mockRejectedValue(error);
// @ts-expect-error private property
await monitor.ping();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should not report OperationalError (ping timeout) to error reporter', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// Query never resolves; the timeout race wins and throws an OperationalError.
dataSource.query.mockReturnValue(new Promise(() => {}));
// Force the timeout side of the Promise.race to resolve immediately.
mockedSetTimeoutP.mockResolvedValueOnce(undefined);
// @ts-expect-error private property
await monitor.ping();
expect(errorReporter.error).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('Database connection timed out'),
);
});
it('should schedule next ping after execution', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// eslint-disable-next-line @typescript-eslint/naming-convention
dataSource.query.mockResolvedValue([{ '1': 1 }]);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const scheduleNextPingSpy = jest.spyOn(monitor as any, 'scheduleNextPing');
// @ts-expect-error private property
await monitor.ping();
expect(scheduleNextPingSpy).toHaveBeenCalled();
});
it('should not query if data source is not initialized', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = false;
// @ts-expect-error private property
await monitor.ping();
expect(dataSource.query).not.toHaveBeenCalled();
});
it('should not query if monitor is stopped', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
monitor.stop();
// @ts-expect-error private property
await monitor.ping();
expect(dataSource.query).not.toHaveBeenCalled();
});
it('should reset failure counter on successful ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query
.mockRejectedValueOnce(new Error('Connection terminated unexpectedly'))
// eslint-disable-next-line @typescript-eslint/naming-convention
.mockResolvedValueOnce([{ '1': 1 }])
.mockRejectedValueOnce(new Error('read ECONNRESET'))
.mockRejectedValueOnce(
new Error('Client has encountered a connection error and is not queryable'),
);
const recoverSpy = jest
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.spyOn(monitor as any, 'recoverDataSource')
.mockResolvedValue(undefined);
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
expect(recoverSpy).not.toHaveBeenCalled();
});
it('should trigger recovery after consecutive failures', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query.mockRejectedValue(new Error('pool poisoned'));
const recoverSpy = jest
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.spyOn(monitor as any, 'recoverDataSource')
.mockResolvedValue(undefined);
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
expect(recoverSpy).toHaveBeenCalledTimes(1);
});
it('should report and recover from an unexpected throw inside recoverDataSource', async () => {
// If something throws between `this.recovering = true` and the inner try/catch
// inside recoverDataSource (e.g. a broken logger), the outer try/catch/finally
// must (a) surface the error via errorReporter and (b) clear the `recovering`
// flag so subsequent pings can keep probing.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query.mockRejectedValue(new Error('pool poisoned'));
// Throw from the "Attempting to recover" warn — this fires after recovering=true
// but before the inner try/catch that protects destroy/initialize.
const loggerError = new Error('logger broke');
logger.warn.mockImplementation((msg: unknown) => {
if (typeof msg === 'string' && msg.includes('Attempting to recover')) {
throw loggerError;
}
});
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
// @ts-expect-error private property
await monitor.ping();
// Let the fire-and-forget recoverDataSource() promise settle.
await flushMicrotasks();
// @ts-expect-error private property
await monitor.ping();
// Outer catch surfaces the unexpected throw to Sentry.
expect(errorReporter.error).toHaveBeenCalledWith(loggerError);
// Finally clears `recovering` so the 4th ping runs instead of early-returning.
expect(dataSource.query).toHaveBeenCalledTimes(4);
});
it('should skip query while recovery is in progress', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// @ts-expect-error private property
monitor.recovering = true;
// @ts-expect-error private property
await monitor.ping();
expect(dataSource.query).not.toHaveBeenCalled();
});
it('should execute ping on schedule', () => {
jest.useFakeTimers();
try {
const scheduledMonitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
mock<DatabaseConfig>({ pingIntervalSeconds: 1, pingTimeoutMs: 5_000 }),
logger,
errorReporter,
);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pingSpy = jest.spyOn(scheduledMonitor as any, 'ping');
// @ts-expect-error private property
scheduledMonitor.scheduleNextPing();
jest.advanceTimersByTime(1000);
expect(pingSpy).toHaveBeenCalled();
} finally {
jest.useRealTimers();
}
});
it('should not schedule another ping after stop', () => {
jest.useFakeTimers();
try {
const scheduledMonitor = new DbConnectionMonitor(
dataSource,
onConnectedChange,
mock<DatabaseConfig>({ pingIntervalSeconds: 1, pingTimeoutMs: 5_000 }),
logger,
errorReporter,
);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pingSpy = jest.spyOn(scheduledMonitor as any, 'ping');
scheduledMonitor.stop();
// @ts-expect-error private property
scheduledMonitor.scheduleNextPing();
jest.advanceTimersByTime(1000);
expect(pingSpy).not.toHaveBeenCalled();
} finally {
jest.useRealTimers();
}
});
});
describe('recoverDataSource', () => {
it('should destroy and reinitialize the DataSource', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockResolvedValue(dataSource);
// @ts-expect-error private property
monitor.connected = false;
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(dataSource.destroy).toHaveBeenCalled();
expect(dataSource.initialize).toHaveBeenCalled();
expect(onConnectedChange).toHaveBeenLastCalledWith(true);
// @ts-expect-error private property
expect(monitor.consecutiveFailures).toBe(0);
// @ts-expect-error private property
expect(monitor.recovering).toBe(false);
});
it('should be a no-op when already recovering', async () => {
// @ts-expect-error private property
monitor.recovering = true;
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(dataSource.destroy).not.toHaveBeenCalled();
expect(dataSource.initialize).not.toHaveBeenCalled();
});
it('should be a no-op when monitor is stopped', async () => {
monitor.stop();
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(dataSource.destroy).not.toHaveBeenCalled();
expect(dataSource.initialize).not.toHaveBeenCalled();
});
it('should back off between failed recovery attempts and eventually succeed', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize
.mockRejectedValueOnce(new Error('still down'))
.mockRejectedValueOnce(new Error('still down'))
.mockResolvedValueOnce(dataSource);
// Three iterations: two backoffs (1s, 2s) then success.
mockedSetTimeoutP.mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined);
// Start from a known-disconnected state so the success transition fires.
// @ts-expect-error private property
monitor.connected = false;
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(dataSource.initialize).toHaveBeenCalledTimes(3);
expect(onConnectedChange).toHaveBeenLastCalledWith(true);
// First backoff = 1000ms (1s * 2^0); second = 2000ms (1s * 2^1).
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
1,
1_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
2,
2_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
// @ts-expect-error private property
expect(monitor.recovering).toBe(false);
});
it('should report each failed recovery attempt to errorReporter', async () => {
// Recovery is a fire-and-forget background loop; without Sentry visibility on each
// attempt, a database that's hard-down for hours produces silence in the dashboards.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
const firstError = new Error('still down 1');
const secondError = new Error('still down 2');
dataSource.initialize
.mockRejectedValueOnce(firstError)
.mockRejectedValueOnce(secondError)
.mockResolvedValueOnce(dataSource);
mockedSetTimeoutP.mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined);
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(errorReporter.error).toHaveBeenCalledWith(firstError);
expect(errorReporter.error).toHaveBeenCalledWith(secondError);
});
it('should cap exponential backoff at the configured maximum', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
// Fail enough times to push past the 30s ceiling.
dataSource.initialize
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockRejectedValueOnce(new Error('down'))
.mockResolvedValueOnce(dataSource);
mockedSetTimeoutP.mockResolvedValue(undefined);
// @ts-expect-error private property
await monitor.recoverDataSource();
// Backoffs: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
5,
16_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
expect(mockedSetTimeoutP).toHaveBeenNthCalledWith(
6,
30_000,
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
});
it('should exit the recovery loop when stop() is called during backoff', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockRejectedValue(new Error('db is gone forever'));
// Suspend on the first backoff so we can call stop() in between.
let resolveBackoff!: () => void;
mockedSetTimeoutP.mockImplementationOnce(
async () => await new Promise<void>((resolve) => (resolveBackoff = resolve)),
);
// @ts-expect-error private property
const recoveryPromise = monitor.recoverDataSource();
// Let attempt 1 run: destroy → initialize (rejects) → enter backoff.
await flushMicrotasks();
expect(dataSource.initialize).toHaveBeenCalledTimes(1);
monitor.stop();
resolveBackoff();
await recoveryPromise;
expect(dataSource.initialize).toHaveBeenCalledTimes(1);
// @ts-expect-error private property
expect(monitor.recovering).toBe(false);
});
it('should pass an AbortSignal to the backoff sleep so stop() interrupts it', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockRejectedValueOnce(new Error('still down'));
// Honor the abort signal the way `timers/promises` does in production:
// reject with an AbortError as soon as the signal aborts.
mockedSetTimeoutP.mockImplementationOnce(
async (_ms, _value, options) =>
await new Promise<undefined>((_resolve, reject) => {
const signal = options?.signal;
signal?.addEventListener('abort', () => {
const abortError = new Error('The operation was aborted');
abortError.name = 'AbortError';
reject(abortError);
});
}),
);
// @ts-expect-error private property
const recoveryPromise = monitor.recoverDataSource();
await flushMicrotasks();
expect(mockedSetTimeoutP).toHaveBeenCalledWith(
expect.any(Number),
undefined,
expect.objectContaining({ signal: expect.any(AbortSignal) }),
);
monitor.stop();
await recoveryPromise;
// AbortError is swallowed; the loop exits on the next iteration without retrying.
expect(dataSource.initialize).toHaveBeenCalledTimes(1);
// @ts-expect-error private property
expect(monitor.recovering).toBe(false);
});
it('should re-attach the pool error listener after a successful recovery', async () => {
// The driver is swapped on destroy+initialize, so the listener attached at start()
// is tied to the old driver instance. Without re-attaching, idle-client errors
// on the new pool become unhandled — which is exactly the crash this PR prevents.
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.destroy.mockResolvedValue();
dataSource.initialize.mockResolvedValue(dataSource);
const on = jest.fn();
(dataSource as unknown as { driver: { master: { on: jest.Mock } } }).driver = {
master: { on },
};
// @ts-expect-error private property
await monitor.recoverDataSource();
expect(on).toHaveBeenCalledWith('error', expect.any(Function));
});
it('should not reinitialize if stop() runs while destroy() is in flight', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// destroy resolves on next tick — gives us a window to call stop().
dataSource.destroy.mockImplementation(
async () => await new Promise((resolve) => setImmediate(resolve)),
);
// @ts-expect-error private property
const recoveryPromise = monitor.recoverDataSource();
// Stop while destroy is still pending.
monitor.stop();
await recoveryPromise;
expect(dataSource.destroy).toHaveBeenCalled();
expect(dataSource.initialize).not.toHaveBeenCalled();
});
});
describe('attachPoolErrorHandler', () => {
// DataSource['driver'] is a complex union of every driver TypeORM supports;
// going through this minimal shape avoids TS2590 ("union type too complex")
// and matches the unsafe cast the production code uses to reach driver.master.
type DriverShape = {
master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
};
const setDriver = (driver: DriverShape) => {
(dataSource as unknown as { driver: DriverShape }).driver = driver;
};
it('should attach an error listener to the Postgres driver pool', () => {
const on = jest.fn();
setDriver({ master: { on } });
monitor.start();
expect(on).toHaveBeenCalledWith('error', expect.any(Function));
});
it('should mark the connection unhealthy when the pool emits an error', () => {
let handler: ((cause: unknown) => void) | undefined;
const on = jest.fn((_event: string, h: (cause: unknown) => void) => {
handler = h;
});
setDriver({ master: { on } });
// @ts-expect-error private property
monitor.connected = true;
monitor.start();
expect(handler).toBeDefined();
handler?.(new Error('terminating connection due to administrator command'));
expect(onConnectedChange).toHaveBeenLastCalledWith(false);
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('Postgres pool client error'),
);
});
it('should skip attaching when the driver is not Postgres', () => {
const sqliteDataSource = mockDeep<DataSource>({ options: { type: 'sqlite-pooled' } });
const sqliteMonitor = new DbConnectionMonitor(
sqliteDataSource,
onConnectedChange,
databaseConfig,
logger,
errorReporter,
);
sqliteMonitor.start();
// driver.master is never read for non-postgres datasources.
expect(logger.debug).not.toHaveBeenCalledWith(
expect.stringContaining('Attached pool error listener'),
);
});
it('should log a warning when driver.master is unavailable on a Postgres datasource', () => {
setDriver({});
monitor.start();
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('driver.master is unavailable'),
);
});
});
describe('stop', () => {
it('should clear the ping timer', () => {
const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout');
// @ts-expect-error private property
monitor.pingTimer = setTimeout(() => {}, 1000);
monitor.stop();
expect(clearTimeoutSpy).toHaveBeenCalled();
// @ts-expect-error private property
expect(monitor.pingTimer).toBeUndefined();
});
it('should latch `stopped` so future scheduling is skipped', () => {
monitor.stop();
// @ts-expect-error private property
expect(monitor.stopped).toBe(true);
});
});
describe('constructor', () => {
it('should default `initialConnected` to true so the first observed failure transitions', async () => {
// DbConnection only creates the monitor after a successful init(), so the assumed
// initial state is "connected". If the default flipped to false, the first failed
// ping would be a no-op transition (false → false) and the owner's state machine
// would stay stuck at the manually-set `true` while reality is `false`.
const freshOnConnectedChange = jest.fn();
const freshMonitor = new DbConnectionMonitor(
dataSource,
freshOnConnectedChange,
databaseConfig,
logger,
errorReporter,
);
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query.mockRejectedValue(new Error('pool dead'));
// @ts-expect-error private property
await freshMonitor.ping();
expect(freshOnConnectedChange).toHaveBeenCalledWith(false);
});
});
describe('setConnected', () => {
it('should only fire onConnectedChange on a transition', () => {
// @ts-expect-error private property
monitor.connected = true;
// @ts-expect-error private property
monitor.setConnected(true);
expect(onConnectedChange).not.toHaveBeenCalled();
// @ts-expect-error private property
monitor.setConnected(false);
expect(onConnectedChange).toHaveBeenCalledWith(false);
// @ts-expect-error private property
monitor.setConnected(false);
expect(onConnectedChange).toHaveBeenCalledTimes(1);
});
});
});

View File

@ -78,15 +78,18 @@ describe('DbConnectionOptions', () => {
dbConfig.type = 'postgresdb';
dbConfig.postgresdb = {
database: 'test_db',
host: 'localhost',
port: 5432,
user: 'postgres',
password: 'password',
schema: 'public',
poolSize: 2,
connectionTimeoutMs: 20000,
idleTimeoutMs: 30000,
statementTimeoutMs: 300000,
host: 'pg.test.example.com',
port: 5433,
user: 'test_user',
password: 'test_password',
schema: 'test_schema',
poolSize: 5,
connectionTimeoutMs: 15_000,
idleTimeoutMs: 25_000,
statementTimeoutMs: 60_000,
maxConnectionLifetimeMs: 1_800_000,
keepAlive: false,
keepAliveInitialDelayMs: 5_000,
ssl: {
enabled: false,
ca: '',
@ -104,18 +107,21 @@ describe('DbConnectionOptions', () => {
type: 'postgres',
...commonOptions,
database: 'test_db',
host: 'localhost',
port: 5432,
username: 'postgres',
password: 'password',
schema: 'public',
poolSize: 2,
host: 'pg.test.example.com',
port: 5433,
username: 'test_user',
password: 'test_password',
schema: 'test_schema',
poolSize: 5,
migrations: postgresMigrations,
connectTimeoutMS: 20000,
statementTimeout: 300_000,
connectTimeoutMS: 15_000,
statementTimeout: 60_000,
ssl: false,
extra: {
idleTimeoutMillis: 30000,
idleTimeoutMillis: 25_000,
keepAlive: false,
keepAliveInitialDelayMillis: 5_000,
maxLifetimeSeconds: 1800,
},
});
});
@ -133,6 +139,23 @@ describe('DbConnectionOptions', () => {
expect(result).toMatchObject({ ssl });
});
it('should omit maxLifetimeSeconds when maxConnectionLifetimeMs is 0', () => {
dbConfig.postgresdb.maxConnectionLifetimeMs = 0;
const result = dbConnectionOptions.getOptions();
expect(result.extra).not.toHaveProperty('maxLifetimeSeconds');
});
it('should clamp sub-second maxConnectionLifetimeMs values to 1 second', () => {
// Guards against silent rounding-to-0, which would unintentionally disable the lifetime cap.
dbConfig.postgresdb.maxConnectionLifetimeMs = 500;
const result = dbConnectionOptions.getOptions();
expect(result.extra).toMatchObject({ maxLifetimeSeconds: 1 });
});
});
describe('logging', () => {

View File

@ -9,6 +9,7 @@ import { DbConnectionTimeoutError } from 'n8n-workflow';
import * as migrationHelper from '../../migrations/migration-helpers';
import type { Migration } from '../../migrations/migration-types';
import { DbConnection } from '../db-connection';
import { DbConnectionMonitor } from '../db-connection-monitor';
import type { DbConnectionOptions } from '../db-connection-options';
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
@ -18,6 +19,8 @@ jest.mock('@n8n/typeorm', () => ({
...jest.requireActual('@n8n/typeorm'),
}));
jest.mock('../db-connection-monitor');
describe('DbConnection', () => {
let dbConnection: DbConnection;
const migrations = [{ name: 'TestMigration1' }, { name: 'TestMigration2' }] as Migration[];
@ -36,11 +39,14 @@ describe('DbConnection', () => {
migrations,
};
const monitor = mock<DbConnectionMonitor>();
beforeEach(() => {
jest.resetAllMocks();
connectionOptions.getOptions.mockReturnValue(postgresOptions);
(DataSource as jest.Mock) = jest.fn().mockImplementation(() => dataSource);
jest.mocked(DbConnectionMonitor).mockImplementation(() => monitor);
dbConnection = new DbConnection(errorReporter, connectionOptions, databaseConfig, logger);
});
@ -55,6 +61,14 @@ describe('DbConnection', () => {
expect(dbConnection.connectionState.connected).toBe(true);
});
it('should start the monitor after a successful init', async () => {
dataSource.initialize.mockResolvedValue(dataSource);
await dbConnection.init();
expect(monitor.start).toHaveBeenCalled();
});
it('should not reinitialize if already connected', async () => {
dataSource.initialize.mockResolvedValue(dataSource);
dbConnection.connectionState.connected = true;
@ -76,11 +90,9 @@ describe('DbConnection', () => {
});
it('should rethrow other errors', async () => {
// Arrange
const error = new Error('Some other error');
dataSource.initialize.mockRejectedValue(error);
// Act & Assert
await expect(dbConnection.init()).rejects.toThrow('Some other error');
});
});
@ -103,16 +115,13 @@ describe('DbConnection', () => {
});
describe('close', () => {
it('should clear the ping timer', async () => {
const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout');
// @ts-expect-error private property
dbConnection.pingTimer = setTimeout(() => {}, 1000);
it('should stop the monitor', async () => {
dataSource.initialize.mockResolvedValue(dataSource);
await dbConnection.init();
await dbConnection.close();
expect(clearTimeoutSpy).toHaveBeenCalled();
// @ts-expect-error private property
expect(dbConnection.pingTimer).toBeUndefined();
expect(monitor.stop).toHaveBeenCalled();
});
it('should destroy the data source if initialized', async () => {
@ -133,82 +142,4 @@ describe('DbConnection', () => {
expect(dataSource.destroy).not.toHaveBeenCalled();
});
});
describe('ping', () => {
it('should update connection state on successful ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// eslint-disable-next-line @typescript-eslint/naming-convention
dataSource.query.mockResolvedValue([{ '1': 1 }]);
dbConnection.connectionState.connected = false;
// @ts-expect-error private property
await dbConnection.ping();
expect(dataSource.query).toHaveBeenCalledWith('SELECT 1');
expect(dbConnection.connectionState.connected).toBe(true);
});
it('should report errors on failed ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
const error = new Error('Connection error');
dataSource.query.mockRejectedValue(error);
// @ts-expect-error private property
await dbConnection.ping();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should schedule next ping after execution', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
// eslint-disable-next-line @typescript-eslint/naming-convention
dataSource.query.mockResolvedValue([{ '1': 1 }]);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const scheduleNextPingSpy = jest.spyOn(dbConnection as any, 'scheduleNextPing');
// @ts-expect-error private property
await dbConnection.ping();
expect(scheduleNextPingSpy).toHaveBeenCalled();
});
it('should not query if data source is not initialized', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = false;
// @ts-expect-error private property
await dbConnection.ping();
expect(dataSource.query).not.toHaveBeenCalled();
});
it('should execute ping on schedule', () => {
jest.useFakeTimers();
try {
// ARRANGE
dbConnection = new DbConnection(
errorReporter,
connectionOptions,
mock<DatabaseConfig>({
pingIntervalSeconds: 1,
}),
logger,
);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pingSpy = jest.spyOn(dbConnection as any, 'ping');
// @ts-expect-error private property
dbConnection.scheduleNextPing();
jest.advanceTimersByTime(1000);
expect(pingSpy).toHaveBeenCalled();
} finally {
jest.useRealTimers();
}
});
});
});

View File

@ -0,0 +1,225 @@
import { inTest, type Logger } from '@n8n/backend-common';
import type { DatabaseConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import type { DataSource } from '@n8n/typeorm';
import type { ErrorReporter } from 'n8n-core';
import { ensureError, OperationalError } from 'n8n-workflow';
import { setTimeout as setTimeoutP } from 'timers/promises';
const MAX_PING_FAILURES_BEFORE_RECOVERY = 3;
const MIN_RECOVERY_BACKOFF_MS = 1_000;
const MAX_RECOVERY_BACKOFF_MS = 30_000;
/**
* Watches a DataSource and recovers it when the connection goes bad.
* - Pings on `databaseConfig.pingIntervalSeconds`, races against `databaseConfig.pingTimeoutMs`.
* - After `MAX_PING_FAILURES_BEFORE_RECOVERY` consecutive failures, destroys
* and reinitializes the DataSource with exponential backoff.
* - Attaches an error listener to the pg pool (Postgres only) so terminated
* idle clients are caught instead of crashing the process.
*
* Notifies the owner of connection transitions via `onConnectedChange`.
* The owner is responsible for supplying the initial state via `initialConnected`,
* because `onConnectedChange` only fires on state *transitions*.
*/
export class DbConnectionMonitor {
private pingTimer: NodeJS.Timeout | undefined;
private consecutiveFailures = 0;
private recovering = false;
private connected: boolean;
private stopped = false; // Latched on stop()
private stopAbortController = new AbortController();
constructor(
private readonly dataSource: DataSource,
private readonly onConnectedChange: (connected: boolean) => void,
private readonly databaseConfig: DatabaseConfig,
private readonly logger: Logger,
private readonly errorReporter: ErrorReporter,
initialConnected = true,
) {
this.connected = initialConnected;
}
start() {
this.attachPoolErrorHandler();
this.logger.debug(
`Database connection monitor started (pingIntervalSeconds=${this.databaseConfig.pingIntervalSeconds}, pingTimeoutMs=${this.databaseConfig.pingTimeoutMs}, recoveryThreshold=${MAX_PING_FAILURES_BEFORE_RECOVERY})`,
);
if (!inTest) {
this.scheduleNextPing();
}
}
stop() {
this.stopped = true;
this.recovering = false;
this.stopAbortController.abort();
if (this.pingTimer) {
clearTimeout(this.pingTimer);
this.pingTimer = undefined;
}
this.logger.debug('Database connection monitor stopped');
}
private scheduleNextPing() {
if (!this.stopped) {
this.pingTimer = setTimeout(
async () => await this.ping(),
this.databaseConfig.pingIntervalSeconds * Time.seconds.toMilliseconds,
);
}
}
private async ping() {
if (this.stopped || !this.dataSource.isInitialized) {
return;
}
if (this.recovering) {
this.scheduleNextPing();
return;
}
const abortController = new AbortController();
try {
await Promise.race([
this.dataSource.query('SELECT 1'),
setTimeoutP(this.databaseConfig.pingTimeoutMs, undefined, {
signal: abortController.signal,
}).then(() => {
throw new OperationalError('Database connection timed out');
}),
]);
if (!this.connected) {
this.logger.info('Database connection recovered');
}
this.setConnected(true);
this.consecutiveFailures = 0;
return;
} catch (error) {
this.setConnected(false);
this.consecutiveFailures += 1;
this.logger.warn(
`Database ping failed (${this.consecutiveFailures}/${MAX_PING_FAILURES_BEFORE_RECOVERY}): ${ensureError(error).message}`,
);
if (!(error instanceof OperationalError)) {
this.errorReporter.error(error);
}
if (this.consecutiveFailures >= MAX_PING_FAILURES_BEFORE_RECOVERY) {
this.logger.warn(
`Triggering database connection recovery after ${this.consecutiveFailures} consecutive ping failures`,
);
// Fire-and-forget; recoverDataSource owns its own try/catch/finally and never rejects.
void this.recoverDataSource();
}
} finally {
abortController.abort();
this.scheduleNextPing();
}
}
private async recoverDataSource() {
if (this.recovering || this.stopped) {
return;
}
this.recovering = true;
try {
const recoveryStart = Date.now();
let attempt = 0;
let recovered = false;
while (!recovered && !this.stopped) {
attempt += 1;
this.logger.warn(`Attempting to recover database connection (attempt ${attempt})`);
try {
if (this.dataSource.isInitialized) {
await this.dataSource.destroy();
}
if (this.stopped) {
break;
}
await this.dataSource.initialize();
this.attachPoolErrorHandler();
this.setConnected(true);
this.consecutiveFailures = 0;
recovered = true;
} catch (error) {
const wrapped = ensureError(error);
this.errorReporter.error(wrapped);
const backoff = Math.min(
MIN_RECOVERY_BACKOFF_MS * 2 ** (attempt - 1),
MAX_RECOVERY_BACKOFF_MS,
);
this.logger.warn(
`Recovery attempt ${attempt} failed: ${wrapped.message}. Retrying in ${backoff}ms`,
);
try {
await setTimeoutP(backoff, undefined, {
signal: this.stopAbortController.signal,
});
} catch {
// AbortError from stop() — the while loop's `!this.stopped` guard exits next iteration.
}
}
}
if (recovered) {
this.logger.info(
`Database connection recovered after ${attempt} attempt(s) in ${Date.now() - recoveryStart}ms`,
);
} else {
this.logger.warn(
`Database connection recovery aborted after ${attempt} attempt(s) (monitor stopped)`,
);
}
} catch (error) {
this.errorReporter.error(ensureError(error));
} finally {
this.recovering = false;
}
}
// pg-pool emits 'error' for idle clients that fail (e.g. server-side pg_terminate_backend or RDS failover).
// Without a listener Node treats these as unhandled and crashes the process.
private attachPoolErrorHandler() {
// Postgres-only: the other supported driver (sqlite-pooled) does not expose a pool-level error stream.
if (this.dataSource.options.type !== 'postgres') {
return;
}
const driver = this.dataSource.driver as unknown as {
master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
};
const pool = driver.master;
if (!pool || typeof pool.on !== 'function') {
// Defensive: TypeORM may have renamed `driver.master` in a future release.
this.logger.warn(
'Skipping Postgres pool error listener: driver.master is unavailable (TypeORM internals may have changed)',
);
return;
}
pool.on('error', (cause: unknown) => {
this.setConnected(false);
this.logger.warn(`Postgres pool client error: ${ensureError(cause).message}`);
});
this.logger.debug('Attached pool error listener to Postgres driver');
}
private setConnected(connected: boolean) {
if (this.connected === connected) {
return;
}
this.connected = connected;
this.onConnectedChange(connected);
}
}

View File

@ -112,6 +112,18 @@ export class DbConnectionOptions {
ssl,
extra: {
idleTimeoutMillis: postgresConfig.idleTimeoutMs,
keepAlive: postgresConfig.keepAlive,
keepAliveInitialDelayMillis: postgresConfig.keepAliveInitialDelayMs,
// pg-pool's `maxLifetimeSeconds` is the upstream knob; we accept ms in config for unit consistency.
// Clamp to >= 1s so values like 500ms don't silently round down to 0 (which disables it).
...(postgresConfig.maxConnectionLifetimeMs > 0
? {
maxLifetimeSeconds: Math.max(
1,
Math.round(postgresConfig.maxConnectionLifetimeMs / 1000),
),
}
: {}),
},
};
}

View File

@ -1,13 +1,12 @@
import { inTest, Logger } from '@n8n/backend-common';
import { Logger } from '@n8n/backend-common';
import { DatabaseConfig } from '@n8n/config';
import { Time } from '@n8n/constants';
import { Memoized } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { DataSource } from '@n8n/typeorm';
import { ErrorReporter } from 'n8n-core';
import { DbConnectionTimeoutError, ensureError, OperationalError } from 'n8n-workflow';
import { setTimeout as setTimeoutP } from 'timers/promises';
import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow';
import { DbConnectionMonitor } from './db-connection-monitor';
import { DbConnectionOptions } from './db-connection-options';
import { wrapMigration } from '../migrations/migration-helpers';
import type { Migration } from '../migrations/migration-types';
@ -21,8 +20,7 @@ type ConnectionState = {
export class DbConnection {
private dataSource: DataSource;
private pingTimer: NodeJS.Timeout | undefined;
timeout: number;
private monitor: DbConnectionMonitor | undefined;
readonly connectionState: ConnectionState = {
connected: false,
@ -37,9 +35,6 @@ export class DbConnection {
) {
this.dataSource = new DataSource(this.options);
Container.set(DataSource, this.dataSource);
this.timeout = process.env.N8N_DB_PING_TIMEOUT
? Number.parseInt(process.env.N8N_DB_PING_TIMEOUT)
: 5000;
}
@Memoized
@ -50,6 +45,14 @@ export class DbConnection {
async init(): Promise<void> {
const { connectionState, options } = this;
if (connectionState.connected) return;
// TODO(CAT-3314): Remove N8N_DB_PING_TIMEOUT fallback in v3.
if (process.env.N8N_DB_PING_TIMEOUT) {
this.logger.warn(
'N8N_DB_PING_TIMEOUT is deprecated, use DB_PING_TIMEOUT_MS instead. The legacy variable will be removed in a future release.',
);
}
try {
await this.dataSource.initialize();
} catch (e) {
@ -67,7 +70,15 @@ export class DbConnection {
}
connectionState.connected = true;
if (!inTest) this.scheduleNextPing();
this.monitor = new DbConnectionMonitor(
this.dataSource,
(connected) => (this.connectionState.connected = connected),
this.databaseConfig,
this.logger,
this.errorReporter,
connectionState.connected,
);
this.monitor.start();
}
async migrate() {
@ -78,53 +89,12 @@ export class DbConnection {
}
async close() {
if (this.pingTimer) {
clearTimeout(this.pingTimer);
this.pingTimer = undefined;
}
this.monitor?.stop();
this.monitor = undefined;
if (this.dataSource.isInitialized) {
await this.dataSource.destroy();
this.connectionState.connected = false;
}
}
/** Ping DB connection every `pingIntervalSeconds` seconds to check if it is still alive. */
private scheduleNextPing() {
this.pingTimer = setTimeout(
async () => await this.ping(),
this.databaseConfig.pingIntervalSeconds * Time.seconds.toMilliseconds,
);
}
private async ping() {
if (!this.dataSource.isInitialized) return;
const abortController = new AbortController();
try {
await Promise.race([
this.dataSource.query('SELECT 1'),
setTimeoutP(this.timeout, undefined, { signal: abortController.signal }).then(() => {
throw new OperationalError('Database connection timed out');
}),
]);
if (!this.connectionState.connected) {
this.logger.info('Database connection recovered');
}
this.connectionState.connected = true;
return;
} catch (error) {
this.connectionState.connected = false;
if (error instanceof OperationalError) {
this.logger.warn(error.message);
} else {
this.errorReporter.error(error);
}
} finally {
abortController.abort();
this.scheduleNextPing();
}
}
}