diff --git a/packages/@n8n/config/src/configs/database.config.ts b/packages/@n8n/config/src/configs/database.config.ts index efc8af33d41..64c3b1141e6 100644 --- a/packages/@n8n/config/src/configs/database.config.ts +++ b/packages/@n8n/config/src/configs/database.config.ts @@ -90,6 +90,18 @@ class PostgresConfig { @Env('DB_POSTGRESDB_STATEMENT_TIMEOUT') statementTimeoutMs: number = 5 * 60 * 1000; // 5 minutes + /** Maximum lifetime in milliseconds of a pooled Postgres connection before it is recycled. Set to 0 to disable. */ + @Env('DB_POSTGRESDB_MAX_CONNECTION_LIFETIME_MS') + maxConnectionLifetimeMs: number = 60 * 60 * 1000; + + /** Whether to enable TCP keepalive on Postgres connections so dead peers are detected without waiting for a query. */ + @Env('DB_POSTGRESDB_KEEP_ALIVE') + keepAlive: boolean = true; + + /** Initial delay in milliseconds before the first TCP keepalive probe is sent. */ + @Env('DB_POSTGRESDB_KEEP_ALIVE_INITIAL_DELAY_MS') + keepAliveInitialDelayMs: number = 10_000; + @Nested ssl: PostgresSSLConfig; } @@ -118,6 +130,32 @@ export class SqliteConfig { const dbTypeSchema = z.enum(['sqlite', 'postgresdb']); type DbType = z.infer; +const DEFAULT_PING_TIMEOUT_MS = 5_000; + +/** + * Reads the deprecated `N8N_DB_PING_TIMEOUT` env var, used as the default for `pingTimeoutMs`. + * + * Only a plain positive decimal integer (milliseconds) is accepted. Values such as `5s` or `1.5` + * are rejected with a warning instead of being truncated by `parseInt` to a near-zero timeout. + * + * @returns The legacy timeout in milliseconds, or `DEFAULT_PING_TIMEOUT_MS` when unset or invalid. + */ +function readLegacyPingTimeoutMs(): number { + const raw = process.env.N8N_DB_PING_TIMEOUT; + if (!raw) { + return DEFAULT_PING_TIMEOUT_MS; + } + const trimmed = raw.trim(); + const parsed = /^\d+$/.test(trimmed) ? Number(trimmed) : Number.NaN; + if (!Number.isSafeInteger(parsed) || parsed <= 0) { + console.warn( + `Invalid N8N_DB_PING_TIMEOUT="${raw}", falling back to ${DEFAULT_PING_TIMEOUT_MS}ms. Prefer the supported DB_PING_TIMEOUT_MS env var.`, + ); + return DEFAULT_PING_TIMEOUT_MS; + } + return parsed; +} + @Config export class DatabaseConfig { /** Database type: `sqlite` or `postgresdb`. */ @@ -132,6 +170,15 @@ export class DatabaseConfig { @Env('DB_PING_INTERVAL_SECONDS') pingIntervalSeconds: number = 2; + /** + * Timeout in milliseconds for an individual database health-check ping.
+ * + * Falls back to the legacy `N8N_DB_PING_TIMEOUT` env var if set. The legacy + * name is deprecated and may be removed in a future release. + */ + @Env('DB_PING_TIMEOUT_MS') + pingTimeoutMs: number = readLegacyPingTimeoutMs(); + @Nested logging: LoggingConfig; diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index b20ec8bc495..9818ad5ac97 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -96,6 +96,9 @@ describe('GlobalConfig', () => { connectionTimeoutMs: 20_000, idleTimeoutMs: 30_000, statementTimeoutMs: 5 * 60 * 1000, + maxConnectionLifetimeMs: 60 * 60 * 1000, + keepAlive: true, + keepAliveInitialDelayMs: 10_000, ssl: { ca: '', cert: '', @@ -113,6 +116,7 @@ describe('GlobalConfig', () => { tablePrefix: '', type: 'sqlite', pingIntervalSeconds: 2, + pingTimeoutMs: 5_000, } as DatabaseConfig, credentials: { defaultName: 'My credentials', @@ -609,6 +613,7 @@ describe('GlobalConfig', () => { tablePrefix: 'test_', type: 'sqlite', pingIntervalSeconds: 2, + pingTimeoutMs: 5_000, }, endpoints: { ...defaultConfig.endpoints, diff --git a/packages/@n8n/db/src/connection/__tests__/db-connection-monitor.test.ts b/packages/@n8n/db/src/connection/__tests__/db-connection-monitor.test.ts new file mode 100644 index 00000000000..a1b67241720 --- /dev/null +++ b/packages/@n8n/db/src/connection/__tests__/db-connection-monitor.test.ts @@ -0,0 +1,655 @@ +/* eslint-disable @typescript-eslint/unbound-method */ +import type { Logger } from '@n8n/backend-common'; +import type { DatabaseConfig } from '@n8n/config'; +import type { DataSource } from '@n8n/typeorm'; +import { mock, mockDeep } from 'jest-mock-extended'; +import type { ErrorReporter } from 'n8n-core'; +import type TimersPromises from 'timers/promises'; +import { setTimeout as setTimeoutP } from 'timers/promises'; + +import { DbConnectionMonitor } from '../db-connection-monitor'; + +// The monitor uses `setTimeout` from 
`timers/promises` for recovery backoff. +// Mocking it lets us drive the recovery loop deterministically without juggling +// jest fake timers against async/await microtask ordering. +jest.mock('timers/promises', () => { + const actual = jest.requireActual('timers/promises'); + return { ...actual, setTimeout: jest.fn() }; +}); +const mockedSetTimeoutP = setTimeoutP as jest.MockedFunction; + +const flushMicrotasks = async () => await new Promise((resolve) => setImmediate(resolve)); + +describe('DbConnectionMonitor', () => { + let monitor: DbConnectionMonitor; + let onConnectedChange: jest.MockedFunction<(connected: boolean) => void>; + const errorReporter = mock(); + const databaseConfig = mock({ pingTimeoutMs: 5_000 }); + const logger = mock(); + const dataSource = mockDeep({ options: { type: 'postgres' } }); + + beforeEach(() => { + jest.resetAllMocks(); + // Default: never resolves, so query wins the ping timeout race and + // recovery backoff stays suspended unless a test overrides it. 
+ mockedSetTimeoutP.mockImplementation(async () => await new Promise(() => {})); + onConnectedChange = jest.fn(); + monitor = new DbConnectionMonitor( + dataSource, + onConnectedChange, + databaseConfig, + logger, + errorReporter, + ); + }); + + describe('ping', () => { + it('should update connection state on successful ping', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + // eslint-disable-next-line @typescript-eslint/naming-convention + dataSource.query.mockResolvedValue([{ '1': 1 }]); + // @ts-expect-error private property + monitor.connected = false; + + // @ts-expect-error private property + await monitor.ping(); + + expect(dataSource.query).toHaveBeenCalledWith('SELECT 1'); + expect(onConnectedChange).toHaveBeenLastCalledWith(true); + }); + + it('should mark connection as disconnected when a ping fails', async () => { + // The owner's connectionState.connected drives the /healthz/readiness endpoint and + // the 503-fast-fail middleware in abstract-server. If a failed ping doesn't propagate + // connected=false, in-flight requests keep hitting a poisoned pool instead of bailing.
+ // @ts-expect-error readonly property + dataSource.isInitialized = true; + // @ts-expect-error private property + monitor.connected = true; + dataSource.query.mockRejectedValue(new Error('pool dead')); + + // @ts-expect-error private property + await monitor.ping(); + + expect(onConnectedChange).toHaveBeenLastCalledWith(false); + }); + + it('should report errors on failed ping', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + const error = new Error('Connection error'); + dataSource.query.mockRejectedValue(error); + + // @ts-expect-error private property + await monitor.ping(); + + expect(errorReporter.error).toHaveBeenCalledWith(error); + }); + + it('should not report OperationalError (ping timeout) to error reporter', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + // Query never resolves; the timeout race wins and throws an OperationalError. + dataSource.query.mockReturnValue(new Promise(() => {})); + // Force the timeout side of the Promise.race to resolve immediately. 
+ mockedSetTimeoutP.mockResolvedValueOnce(undefined); + + // @ts-expect-error private property + await monitor.ping(); + + expect(errorReporter.error).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Database connection timed out'), + ); + }); + + it('should schedule next ping after execution', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + // eslint-disable-next-line @typescript-eslint/naming-convention + dataSource.query.mockResolvedValue([{ '1': 1 }]); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const scheduleNextPingSpy = jest.spyOn(monitor as any, 'scheduleNextPing'); + + // @ts-expect-error private property + await monitor.ping(); + + expect(scheduleNextPingSpy).toHaveBeenCalled(); + }); + + it('should not query if data source is not initialized', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = false; + + // @ts-expect-error private property + await monitor.ping(); + + expect(dataSource.query).not.toHaveBeenCalled(); + }); + + it('should not query if monitor is stopped', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + monitor.stop(); + + // @ts-expect-error private property + await monitor.ping(); + + expect(dataSource.query).not.toHaveBeenCalled(); + }); + + it('should reset failure counter on successful ping', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.query + .mockRejectedValueOnce(new Error('Connection terminated unexpectedly')) + // eslint-disable-next-line @typescript-eslint/naming-convention + .mockResolvedValueOnce([{ '1': 1 }]) + .mockRejectedValueOnce(new Error('read ECONNRESET')) + .mockRejectedValueOnce( + new Error('Client has encountered a connection error and is not queryable'), + ); + const recoverSpy = jest + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .spyOn(monitor as any, 
'recoverDataSource') + .mockResolvedValue(undefined); + + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + + expect(recoverSpy).not.toHaveBeenCalled(); + }); + + it('should trigger recovery after consecutive failures', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.query.mockRejectedValue(new Error('pool poisoned')); + const recoverSpy = jest + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .spyOn(monitor as any, 'recoverDataSource') + .mockResolvedValue(undefined); + + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + + expect(recoverSpy).toHaveBeenCalledTimes(1); + }); + + it('should report and recover from an unexpected throw inside recoverDataSource', async () => { + // If something throws between `this.recovering = true` and the inner try/catch + // inside recoverDataSource (e.g. a broken logger), the outer try/catch/finally + // must (a) surface the error via errorReporter and (b) clear the `recovering` + // flag so subsequent pings can keep probing. + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.query.mockRejectedValue(new Error('pool poisoned')); + // Throw from the "Attempting to recover" warn — this fires after recovering=true + // but before the inner try/catch that protects destroy/initialize. 
+ const loggerError = new Error('logger broke'); + logger.warn.mockImplementation((msg: unknown) => { + if (typeof msg === 'string' && msg.includes('Attempting to recover')) { + throw loggerError; + } + }); + + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + // @ts-expect-error private property + await monitor.ping(); + // Let the fire-and-forget recoverDataSource() promise settle. + await flushMicrotasks(); + + // @ts-expect-error private property + await monitor.ping(); + + // Outer catch surfaces the unexpected throw to Sentry. + expect(errorReporter.error).toHaveBeenCalledWith(loggerError); + // Finally clears `recovering` so the 4th ping runs instead of early-returning. + expect(dataSource.query).toHaveBeenCalledTimes(4); + }); + + it('should skip query while recovery is in progress', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + // @ts-expect-error private property + monitor.recovering = true; + + // @ts-expect-error private property + await monitor.ping(); + + expect(dataSource.query).not.toHaveBeenCalled(); + }); + + it('should execute ping on schedule', () => { + jest.useFakeTimers(); + try { + const scheduledMonitor = new DbConnectionMonitor( + dataSource, + onConnectedChange, + mock({ pingIntervalSeconds: 1, pingTimeoutMs: 5_000 }), + logger, + errorReporter, + ); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const pingSpy = jest.spyOn(scheduledMonitor as any, 'ping'); + + // @ts-expect-error private property + scheduledMonitor.scheduleNextPing(); + jest.advanceTimersByTime(1000); + + expect(pingSpy).toHaveBeenCalled(); + } finally { + jest.useRealTimers(); + } + }); + + it('should not schedule another ping after stop', () => { + jest.useFakeTimers(); + try { + const scheduledMonitor = new DbConnectionMonitor( + dataSource, + onConnectedChange, + mock({ pingIntervalSeconds: 1, pingTimeoutMs: 5_000 }), + logger, + 
errorReporter, + ); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const pingSpy = jest.spyOn(scheduledMonitor as any, 'ping'); + + scheduledMonitor.stop(); + // @ts-expect-error private property + scheduledMonitor.scheduleNextPing(); + jest.advanceTimersByTime(1000); + + expect(pingSpy).not.toHaveBeenCalled(); + } finally { + jest.useRealTimers(); + } + }); + }); + + describe('recoverDataSource', () => { + it('should destroy and reinitialize the DataSource', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + dataSource.initialize.mockResolvedValue(dataSource); + // @ts-expect-error private property + monitor.connected = false; + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(dataSource.destroy).toHaveBeenCalled(); + expect(dataSource.initialize).toHaveBeenCalled(); + expect(onConnectedChange).toHaveBeenLastCalledWith(true); + // @ts-expect-error private property + expect(monitor.consecutiveFailures).toBe(0); + // @ts-expect-error private property + expect(monitor.recovering).toBe(false); + }); + + it('should be a no-op when already recovering', async () => { + // @ts-expect-error private property + monitor.recovering = true; + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(dataSource.destroy).not.toHaveBeenCalled(); + expect(dataSource.initialize).not.toHaveBeenCalled(); + }); + + it('should be a no-op when monitor is stopped', async () => { + monitor.stop(); + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(dataSource.destroy).not.toHaveBeenCalled(); + expect(dataSource.initialize).not.toHaveBeenCalled(); + }); + + it('should back off between failed recovery attempts and eventually succeed', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + dataSource.initialize + 
.mockRejectedValueOnce(new Error('still down')) + .mockRejectedValueOnce(new Error('still down')) + .mockResolvedValueOnce(dataSource); + // Three iterations: two backoffs (1s, 2s) then success. + mockedSetTimeoutP.mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined); + // Start from a known-disconnected state so the success transition fires. + // @ts-expect-error private property + monitor.connected = false; + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(dataSource.initialize).toHaveBeenCalledTimes(3); + expect(onConnectedChange).toHaveBeenLastCalledWith(true); + // First backoff = 1000ms (1s * 2^0); second = 2000ms (1s * 2^1). + expect(mockedSetTimeoutP).toHaveBeenNthCalledWith( + 1, + 1_000, + undefined, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(mockedSetTimeoutP).toHaveBeenNthCalledWith( + 2, + 2_000, + undefined, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + // @ts-expect-error private property + expect(monitor.recovering).toBe(false); + }); + + it('should report each failed recovery attempt to errorReporter', async () => { + // Recovery is a fire-and-forget background loop; without Sentry visibility on each + // attempt, a database that's hard-down for hours produces silence in the dashboards. 
+ // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + const firstError = new Error('still down 1'); + const secondError = new Error('still down 2'); + dataSource.initialize + .mockRejectedValueOnce(firstError) + .mockRejectedValueOnce(secondError) + .mockResolvedValueOnce(dataSource); + mockedSetTimeoutP.mockResolvedValueOnce(undefined).mockResolvedValueOnce(undefined); + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(errorReporter.error).toHaveBeenCalledWith(firstError); + expect(errorReporter.error).toHaveBeenCalledWith(secondError); + }); + + it('should cap exponential backoff at the configured maximum', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + // Fail enough times to push past the 30s ceiling. + dataSource.initialize + .mockRejectedValueOnce(new Error('down')) + .mockRejectedValueOnce(new Error('down')) + .mockRejectedValueOnce(new Error('down')) + .mockRejectedValueOnce(new Error('down')) + .mockRejectedValueOnce(new Error('down')) + .mockRejectedValueOnce(new Error('down')) + .mockResolvedValueOnce(dataSource); + mockedSetTimeoutP.mockResolvedValue(undefined); + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + // Backoffs: 1s, 2s, 4s, 8s, 16s, then capped at 30s. 
+ expect(mockedSetTimeoutP).toHaveBeenNthCalledWith( + 5, + 16_000, + undefined, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + expect(mockedSetTimeoutP).toHaveBeenNthCalledWith( + 6, + 30_000, + undefined, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('should exit the recovery loop when stop() is called during backoff', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + dataSource.initialize.mockRejectedValue(new Error('db is gone forever')); + + // Suspend on the first backoff so we can call stop() in between. + let resolveBackoff!: () => void; + mockedSetTimeoutP.mockImplementationOnce( + async () => await new Promise((resolve) => (resolveBackoff = resolve)), + ); + + // @ts-expect-error private property + const recoveryPromise = monitor.recoverDataSource(); + + // Let attempt 1 run: destroy → initialize (rejects) → enter backoff. + await flushMicrotasks(); + expect(dataSource.initialize).toHaveBeenCalledTimes(1); + + monitor.stop(); + resolveBackoff(); + await recoveryPromise; + + expect(dataSource.initialize).toHaveBeenCalledTimes(1); + // @ts-expect-error private property + expect(monitor.recovering).toBe(false); + }); + + it('should pass an AbortSignal to the backoff sleep so stop() interrupts it', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + dataSource.initialize.mockRejectedValueOnce(new Error('still down')); + // Honor the abort signal the way `timers/promises` does in production: + // reject with an AbortError as soon as the signal aborts. 
+ mockedSetTimeoutP.mockImplementationOnce( + async (_ms, _value, options) => + await new Promise((_resolve, reject) => { + const signal = options?.signal; + signal?.addEventListener('abort', () => { + const abortError = new Error('The operation was aborted'); + abortError.name = 'AbortError'; + reject(abortError); + }); + }), + ); + + // @ts-expect-error private property + const recoveryPromise = monitor.recoverDataSource(); + + await flushMicrotasks(); + expect(mockedSetTimeoutP).toHaveBeenCalledWith( + expect.any(Number), + undefined, + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + + monitor.stop(); + await recoveryPromise; + + // AbortError is swallowed; the loop exits on the next iteration without retrying. + expect(dataSource.initialize).toHaveBeenCalledTimes(1); + // @ts-expect-error private property + expect(monitor.recovering).toBe(false); + }); + + it('should re-attach the pool error listener after a successful recovery', async () => { + // The driver is swapped on destroy+initialize, so the listener attached at start() + // is tied to the old driver instance. Without re-attaching, idle-client errors + // on the new pool become unhandled — which is exactly the crash this PR prevents. + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.destroy.mockResolvedValue(); + dataSource.initialize.mockResolvedValue(dataSource); + const on = jest.fn(); + (dataSource as unknown as { driver: { master: { on: jest.Mock } } }).driver = { + master: { on }, + }; + + // @ts-expect-error private property + await monitor.recoverDataSource(); + + expect(on).toHaveBeenCalledWith('error', expect.any(Function)); + }); + + it('should not reinitialize if stop() runs while destroy() is in flight', async () => { + // @ts-expect-error readonly property + dataSource.isInitialized = true; + // destroy resolves on next tick — gives us a window to call stop(). 
+ dataSource.destroy.mockImplementation( + async () => await new Promise((resolve) => setImmediate(resolve)), + ); + + // @ts-expect-error private property + const recoveryPromise = monitor.recoverDataSource(); + + // Stop while destroy is still pending. + monitor.stop(); + + await recoveryPromise; + + expect(dataSource.destroy).toHaveBeenCalled(); + expect(dataSource.initialize).not.toHaveBeenCalled(); + }); + }); + + describe('attachPoolErrorHandler', () => { + // DataSource['driver'] is a complex union of every driver TypeORM supports; + // going through this minimal shape avoids TS2590 ("union type too complex") + // and matches the unsafe cast the production code uses to reach driver.master. + type DriverShape = { + master?: { on?: (event: string, handler: (cause: unknown) => void) => void }; + }; + const setDriver = (driver: DriverShape) => { + (dataSource as unknown as { driver: DriverShape }).driver = driver; + }; + + it('should attach an error listener to the Postgres driver pool', () => { + const on = jest.fn(); + setDriver({ master: { on } }); + + monitor.start(); + + expect(on).toHaveBeenCalledWith('error', expect.any(Function)); + }); + + it('should mark the connection unhealthy when the pool emits an error', () => { + let handler: ((cause: unknown) => void) | undefined; + const on = jest.fn((_event: string, h: (cause: unknown) => void) => { + handler = h; + }); + setDriver({ master: { on } }); + // @ts-expect-error private property + monitor.connected = true; + + monitor.start(); + expect(handler).toBeDefined(); + + handler?.(new Error('terminating connection due to administrator command')); + + expect(onConnectedChange).toHaveBeenLastCalledWith(false); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Postgres pool client error'), + ); + }); + + it('should skip attaching when the driver is not Postgres', () => { + const sqliteDataSource = mockDeep({ options: { type: 'sqlite-pooled' } }); + const sqliteMonitor = new 
DbConnectionMonitor( + sqliteDataSource, + onConnectedChange, + databaseConfig, + logger, + errorReporter, + ); + + sqliteMonitor.start(); + + // driver.master is never read for non-postgres datasources. + expect(logger.debug).not.toHaveBeenCalledWith( + expect.stringContaining('Attached pool error listener'), + ); + }); + + it('should log a warning when driver.master is unavailable on a Postgres datasource', () => { + setDriver({}); + + monitor.start(); + + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('driver.master is unavailable'), + ); + }); + }); + + describe('stop', () => { + it('should clear the ping timer', () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + // @ts-expect-error private property + monitor.pingTimer = setTimeout(() => {}, 1000); + + monitor.stop(); + + expect(clearTimeoutSpy).toHaveBeenCalled(); + // @ts-expect-error private property + expect(monitor.pingTimer).toBeUndefined(); + }); + + it('should latch `stopped` so future scheduling is skipped', () => { + monitor.stop(); + + // @ts-expect-error private property + expect(monitor.stopped).toBe(true); + }); + }); + + describe('constructor', () => { + it('should default `initialConnected` to true so the first observed failure transitions', async () => { + // DbConnection only creates the monitor after a successful init(), so the assumed + // initial state is "connected". If the default flipped to false, the first failed + // ping would be a no-op transition (false → false) and the owner's state machine + // would stay stuck at the manually-set `true` while reality is `false`. 
+ const freshOnConnectedChange = jest.fn(); + const freshMonitor = new DbConnectionMonitor( + dataSource, + freshOnConnectedChange, + databaseConfig, + logger, + errorReporter, + ); + // @ts-expect-error readonly property + dataSource.isInitialized = true; + dataSource.query.mockRejectedValue(new Error('pool dead')); + + // @ts-expect-error private property + await freshMonitor.ping(); + + expect(freshOnConnectedChange).toHaveBeenCalledWith(false); + }); + }); + + describe('setConnected', () => { + it('should only fire onConnectedChange on a transition', () => { + // @ts-expect-error private property + monitor.connected = true; + + // @ts-expect-error private property + monitor.setConnected(true); + expect(onConnectedChange).not.toHaveBeenCalled(); + + // @ts-expect-error private property + monitor.setConnected(false); + expect(onConnectedChange).toHaveBeenCalledWith(false); + + // @ts-expect-error private property + monitor.setConnected(false); + expect(onConnectedChange).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/packages/@n8n/db/src/connection/__tests__/db-connection-options.test.ts b/packages/@n8n/db/src/connection/__tests__/db-connection-options.test.ts index 9c278c78500..4db9bf35131 100644 --- a/packages/@n8n/db/src/connection/__tests__/db-connection-options.test.ts +++ b/packages/@n8n/db/src/connection/__tests__/db-connection-options.test.ts @@ -78,15 +78,18 @@ describe('DbConnectionOptions', () => { dbConfig.type = 'postgresdb'; dbConfig.postgresdb = { database: 'test_db', - host: 'localhost', - port: 5432, - user: 'postgres', - password: 'password', - schema: 'public', - poolSize: 2, - connectionTimeoutMs: 20000, - idleTimeoutMs: 30000, - statementTimeoutMs: 300000, + host: 'pg.test.example.com', + port: 5433, + user: 'test_user', + password: 'test_password', + schema: 'test_schema', + poolSize: 5, + connectionTimeoutMs: 15_000, + idleTimeoutMs: 25_000, + statementTimeoutMs: 60_000, + maxConnectionLifetimeMs: 1_800_000, + keepAlive: false, + 
keepAliveInitialDelayMs: 5_000, ssl: { enabled: false, ca: '', @@ -104,18 +107,21 @@ describe('DbConnectionOptions', () => { type: 'postgres', ...commonOptions, database: 'test_db', - host: 'localhost', - port: 5432, - username: 'postgres', - password: 'password', - schema: 'public', - poolSize: 2, + host: 'pg.test.example.com', + port: 5433, + username: 'test_user', + password: 'test_password', + schema: 'test_schema', + poolSize: 5, migrations: postgresMigrations, - connectTimeoutMS: 20000, - statementTimeout: 300_000, + connectTimeoutMS: 15_000, + statementTimeout: 60_000, ssl: false, extra: { - idleTimeoutMillis: 30000, + idleTimeoutMillis: 25_000, + keepAlive: false, + keepAliveInitialDelayMillis: 5_000, + maxLifetimeSeconds: 1800, }, }); }); @@ -133,6 +139,23 @@ describe('DbConnectionOptions', () => { expect(result).toMatchObject({ ssl }); }); + + it('should omit maxLifetimeSeconds when maxConnectionLifetimeMs is 0', () => { + dbConfig.postgresdb.maxConnectionLifetimeMs = 0; + + const result = dbConnectionOptions.getOptions(); + + expect(result.extra).not.toHaveProperty('maxLifetimeSeconds'); + }); + + it('should clamp sub-second maxConnectionLifetimeMs values to 1 second', () => { + // Guards against silent rounding-to-0, which would unintentionally disable the lifetime cap. 
+ dbConfig.postgresdb.maxConnectionLifetimeMs = 500; + + const result = dbConnectionOptions.getOptions(); + + expect(result.extra).toMatchObject({ maxLifetimeSeconds: 1 }); + }); }); describe('logging', () => { diff --git a/packages/@n8n/db/src/connection/__tests__/db-connection.test.ts b/packages/@n8n/db/src/connection/__tests__/db-connection.test.ts index 2addb2c7b29..10549e77651 100644 --- a/packages/@n8n/db/src/connection/__tests__/db-connection.test.ts +++ b/packages/@n8n/db/src/connection/__tests__/db-connection.test.ts @@ -9,6 +9,7 @@ import { DbConnectionTimeoutError } from 'n8n-workflow'; import * as migrationHelper from '../../migrations/migration-helpers'; import type { Migration } from '../../migrations/migration-types'; import { DbConnection } from '../db-connection'; +import { DbConnectionMonitor } from '../db-connection-monitor'; import type { DbConnectionOptions } from '../db-connection-options'; // eslint-disable-next-line @typescript-eslint/no-unsafe-return @@ -18,6 +19,8 @@ jest.mock('@n8n/typeorm', () => ({ ...jest.requireActual('@n8n/typeorm'), })); +jest.mock('../db-connection-monitor'); + describe('DbConnection', () => { let dbConnection: DbConnection; const migrations = [{ name: 'TestMigration1' }, { name: 'TestMigration2' }] as Migration[]; @@ -36,11 +39,14 @@ describe('DbConnection', () => { migrations, }; + const monitor = mock(); + beforeEach(() => { jest.resetAllMocks(); connectionOptions.getOptions.mockReturnValue(postgresOptions); (DataSource as jest.Mock) = jest.fn().mockImplementation(() => dataSource); + jest.mocked(DbConnectionMonitor).mockImplementation(() => monitor); dbConnection = new DbConnection(errorReporter, connectionOptions, databaseConfig, logger); }); @@ -55,6 +61,14 @@ describe('DbConnection', () => { expect(dbConnection.connectionState.connected).toBe(true); }); + it('should start the monitor after a successful init', async () => { + dataSource.initialize.mockResolvedValue(dataSource); + + await dbConnection.init(); 
+ + expect(monitor.start).toHaveBeenCalled(); + }); + it('should not reinitialize if already connected', async () => { dataSource.initialize.mockResolvedValue(dataSource); dbConnection.connectionState.connected = true; @@ -76,11 +90,9 @@ describe('DbConnection', () => { }); it('should rethrow other errors', async () => { - // Arrange const error = new Error('Some other error'); dataSource.initialize.mockRejectedValue(error); - // Act & Assert await expect(dbConnection.init()).rejects.toThrow('Some other error'); }); }); @@ -103,16 +115,13 @@ describe('DbConnection', () => { }); describe('close', () => { - it('should clear the ping timer', async () => { - const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); - // @ts-expect-error private property - dbConnection.pingTimer = setTimeout(() => {}, 1000); + it('should stop the monitor', async () => { + dataSource.initialize.mockResolvedValue(dataSource); + await dbConnection.init(); await dbConnection.close(); - expect(clearTimeoutSpy).toHaveBeenCalled(); - // @ts-expect-error private property - expect(dbConnection.pingTimer).toBeUndefined(); + expect(monitor.stop).toHaveBeenCalled(); }); it('should destroy the data source if initialized', async () => { @@ -133,82 +142,4 @@ describe('DbConnection', () => { expect(dataSource.destroy).not.toHaveBeenCalled(); }); }); - - describe('ping', () => { - it('should update connection state on successful ping', async () => { - // @ts-expect-error readonly property - dataSource.isInitialized = true; - // eslint-disable-next-line @typescript-eslint/naming-convention - dataSource.query.mockResolvedValue([{ '1': 1 }]); - dbConnection.connectionState.connected = false; - - // @ts-expect-error private property - await dbConnection.ping(); - - expect(dataSource.query).toHaveBeenCalledWith('SELECT 1'); - expect(dbConnection.connectionState.connected).toBe(true); - }); - - it('should report errors on failed ping', async () => { - // @ts-expect-error readonly property - 
dataSource.isInitialized = true; - const error = new Error('Connection error'); - dataSource.query.mockRejectedValue(error); - - // @ts-expect-error private property - await dbConnection.ping(); - - expect(errorReporter.error).toHaveBeenCalledWith(error); - }); - - it('should schedule next ping after execution', async () => { - // @ts-expect-error readonly property - dataSource.isInitialized = true; - // eslint-disable-next-line @typescript-eslint/naming-convention - dataSource.query.mockResolvedValue([{ '1': 1 }]); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const scheduleNextPingSpy = jest.spyOn(dbConnection as any, 'scheduleNextPing'); - - // @ts-expect-error private property - await dbConnection.ping(); - - expect(scheduleNextPingSpy).toHaveBeenCalled(); - }); - - it('should not query if data source is not initialized', async () => { - // @ts-expect-error readonly property - dataSource.isInitialized = false; - - // @ts-expect-error private property - await dbConnection.ping(); - - expect(dataSource.query).not.toHaveBeenCalled(); - }); - - it('should execute ping on schedule', () => { - jest.useFakeTimers(); - try { - // ARRANGE - dbConnection = new DbConnection( - errorReporter, - connectionOptions, - mock({ - pingIntervalSeconds: 1, - }), - logger, - ); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const pingSpy = jest.spyOn(dbConnection as any, 'ping'); - - // @ts-expect-error private property - dbConnection.scheduleNextPing(); - jest.advanceTimersByTime(1000); - - expect(pingSpy).toHaveBeenCalled(); - } finally { - jest.useRealTimers(); - } - }); - }); }); diff --git a/packages/@n8n/db/src/connection/db-connection-monitor.ts b/packages/@n8n/db/src/connection/db-connection-monitor.ts new file mode 100644 index 00000000000..14383005c3d --- /dev/null +++ b/packages/@n8n/db/src/connection/db-connection-monitor.ts @@ -0,0 +1,225 @@ +import { inTest, type Logger } from '@n8n/backend-common'; +import type { 
DatabaseConfig } from '@n8n/config'; +import { Time } from '@n8n/constants'; +import type { DataSource } from '@n8n/typeorm'; +import type { ErrorReporter } from 'n8n-core'; +import { ensureError, OperationalError } from 'n8n-workflow'; +import { setTimeout as setTimeoutP } from 'timers/promises'; + +const MAX_PING_FAILURES_BEFORE_RECOVERY = 3; +const MIN_RECOVERY_BACKOFF_MS = 1_000; +const MAX_RECOVERY_BACKOFF_MS = 30_000; + +/** + * Watches a DataSource and recovers it when the connection goes bad. + * - Pings on `databaseConfig.pingIntervalSeconds`, races against `databaseConfig.pingTimeoutMs`. + * - After `MAX_PING_FAILURES_BEFORE_RECOVERY` consecutive failures, destroys + * and reinitializes the DataSource with exponential backoff. + * - Attaches an error listener to the pg pool (Postgres only) so terminated + * idle clients are caught instead of crashing the process. + * + * Notifies the owner of connection transitions via `onConnectedChange`. + * The owner is responsible for supplying the initial state via `initialConnected`, + * because `onConnectedChange` only fires on state *transitions*. 
+ */
+export class DbConnectionMonitor {
+	/** Handle of the pending ping timer, if one is scheduled. */
+	private pingTimer: NodeJS.Timeout | undefined;
+	/** Number of failed pings since the last successful ping or recovery. */
+	private consecutiveFailures = 0;
+	/** True while `recoverDataSource()` is running; pings are skipped meanwhile. */
+	private recovering = false;
+	/** Last connection state reported through `onConnectedChange`. */
+	private connected: boolean;
+	private stopped = false; // Latched on stop()
+	/** Aborted by `stop()` to cut short a pending recovery backoff wait. */
+	private stopAbortController = new AbortController();
+
+	/**
+	 * @param dataSource DataSource that is pinged and, on repeated failure, destroyed and reinitialized.
+	 * @param onConnectedChange Invoked only when the connected state flips.
+	 * @param databaseConfig Supplies `pingIntervalSeconds` and `pingTimeoutMs`.
+	 * @param logger Receives debug/info/warn messages about pings and recovery.
+	 * @param errorReporter Receives non-timeout ping errors and recovery errors.
+	 * @param initialConnected State assumed before the first transition.
+	 */
+	constructor(
+		private readonly dataSource: DataSource,
+		private readonly onConnectedChange: (connected: boolean) => void,
+		private readonly databaseConfig: DatabaseConfig,
+		private readonly logger: Logger,
+		private readonly errorReporter: ErrorReporter,
+		initialConnected = true,
+	) {
+		this.connected = initialConnected;
+	}
+
+	/** Attaches the pool error listener and, outside of tests, schedules the first ping. */
+	start() {
+		this.attachPoolErrorHandler();
+		this.logger.debug(
+			`Database connection monitor started (pingIntervalSeconds=${this.databaseConfig.pingIntervalSeconds}, pingTimeoutMs=${this.databaseConfig.pingTimeoutMs}, recoveryThreshold=${MAX_PING_FAILURES_BEFORE_RECOVERY})`,
+		);
+		if (!inTest) {
+			this.scheduleNextPing();
+		}
+	}
+
+	/**
+	 * Permanently stops the monitor: no further pings are scheduled, a pending
+	 * ping timer is cleared, and any recovery backoff wait is aborted.
+	 */
+	stop() {
+		this.stopped = true;
+		this.recovering = false;
+		this.stopAbortController.abort();
+		if (this.pingTimer) {
+			clearTimeout(this.pingTimer);
+			this.pingTimer = undefined;
+		}
+		this.logger.debug('Database connection monitor stopped');
+	}
+
+	/** Schedules the next ping after `pingIntervalSeconds`, unless the monitor was stopped. */
+	private scheduleNextPing() {
+		if (!this.stopped) {
+			this.pingTimer = setTimeout(
+				async () => await this.ping(),
+				this.databaseConfig.pingIntervalSeconds * Time.seconds.toMilliseconds,
+			);
+		}
+	}
+
+	/**
+	 * Runs `SELECT 1` raced against `pingTimeoutMs`, updates the connected state,
+	 * and starts recovery once the consecutive-failure threshold is reached.
+	 */
+	private async ping() {
+		if (this.stopped) {
+			return;
+		}
+
+		// This check must come before the `isInitialized` check. Recovery destroys the
+		// DataSource, so `isInitialized` is false for most of a recovery run. Returning
+		// without rescheduling in that window would end the ping loop for good, and the
+		// monitor would stay silent after the connection is restored.
+		if (this.recovering) {
+			this.scheduleNextPing();
+			return;
+		}
+
+		if (!this.dataSource.isInitialized) {
+			return;
+		}
+
+		const abortController = new AbortController();
+
+		try {
+			await Promise.race([
+				this.dataSource.query('SELECT 1'),
+				setTimeoutP(this.databaseConfig.pingTimeoutMs, undefined, {
+					signal: abortController.signal,
+				}).then(() => {
+					throw new OperationalError('Database connection timed out');
+				}),
+			]);
+
+			if (!this.connected) {
+				this.logger.info('Database connection recovered');
+			}
+
+			this.setConnected(true);
+			this.consecutiveFailures = 0;
+			return;
+		} catch (error) {
+			this.setConnected(false);
+			this.consecutiveFailures += 1;
+			this.logger.warn(
+				`Database ping failed (${this.consecutiveFailures}/${MAX_PING_FAILURES_BEFORE_RECOVERY}): ${ensureError(error).message}`,
+			);
+			// Timeouts are raised as OperationalError above and are only logged, not reported.
+			if (!(error instanceof OperationalError)) {
+				this.errorReporter.error(error);
+			}
+
+			if (this.consecutiveFailures >= MAX_PING_FAILURES_BEFORE_RECOVERY) {
+				this.logger.warn(
+					`Triggering database connection recovery after ${this.consecutiveFailures} consecutive ping failures`,
+				);
+				// Fire-and-forget; recoverDataSource owns its own try/catch/finally and never rejects.
+				void this.recoverDataSource();
+			}
+		} finally {
+			// Cancels the timeout timer when the query settled first.
+			abortController.abort();
+			this.scheduleNextPing();
+		}
+	}
+
+	/**
+	 * Destroys and reinitializes the DataSource until it succeeds or the monitor
+	 * is stopped, waiting with exponential backoff between failed attempts.
+	 * Never rejects; errors are passed to the error reporter.
+	 */
+	private async recoverDataSource() {
+		if (this.recovering || this.stopped) {
+			return;
+		}
+		this.recovering = true;
+
+		try {
+			const recoveryStart = Date.now();
+			let attempt = 0;
+			let recovered = false;
+
+			while (!recovered && !this.stopped) {
+				attempt += 1;
+				this.logger.warn(`Attempting to recover database connection (attempt ${attempt})`);
+
+				try {
+					if (this.dataSource.isInitialized) {
+						await this.dataSource.destroy();
+					}
+
+					if (this.stopped) {
+						break;
+					}
+					await this.dataSource.initialize();
+
+					if (this.stopped) {
+						// stop() arrived while initialize() was in flight. Nobody else will tear
+						// down this freshly created DataSource, so release it here instead of
+						// leaking it and reporting a connected state after shutdown.
+						if (this.dataSource.isInitialized) {
+							await this.dataSource.destroy();
+						}
+						break;
+					}
+
+					// A reinitialized DataSource needs the pool error listener attached again.
+					this.attachPoolErrorHandler();
+					this.setConnected(true);
+					this.consecutiveFailures = 0;
+					recovered = true;
+				} catch (error) {
+					const wrapped = ensureError(error);
+					this.errorReporter.error(wrapped);
+					// Doubles per attempt starting at MIN_RECOVERY_BACKOFF_MS, capped at MAX_RECOVERY_BACKOFF_MS.
+					const backoff = Math.min(
+						MIN_RECOVERY_BACKOFF_MS * 2 ** (attempt - 1),
+						MAX_RECOVERY_BACKOFF_MS,
+					);
+					this.logger.warn(
+						`Recovery attempt ${attempt} failed: ${wrapped.message}. Retrying in ${backoff}ms`,
+					);
+					try {
+						await setTimeoutP(backoff, undefined, {
+							signal: this.stopAbortController.signal,
+						});
+					} catch {
+						// AbortError from stop() — the while loop's `!this.stopped` guard exits next iteration.
+					}
+				}
+			}
+
+			if (recovered) {
+				this.logger.info(
+					`Database connection recovered after ${attempt} attempt(s) in ${Date.now() - recoveryStart}ms`,
+				);
+			} else {
+				this.logger.warn(
+					`Database connection recovery aborted after ${attempt} attempt(s) (monitor stopped)`,
+				);
+			}
+		} catch (error) {
+			this.errorReporter.error(ensureError(error));
+		} finally {
+			this.recovering = false;
+		}
+	}
+
+	// pg-pool emits 'error' for idle clients that fail (e.g. server-side pg_terminate_backend or RDS failover).
+	// Without a listener Node treats these as unhandled and crashes the process.
+	private attachPoolErrorHandler() {
+		// Postgres-only: the other supported driver (sqlite-pooled) does not expose a pool-level error stream.
+		if (this.dataSource.options.type !== 'postgres') {
+			return;
+		}
+
+		const driver = this.dataSource.driver as unknown as {
+			master?: { on?: (event: string, handler: (cause: unknown) => void) => void };
+		};
+
+		const pool = driver.master;
+		if (!pool || typeof pool.on !== 'function') {
+			// Defensive: TypeORM may have renamed `driver.master` in a future release.
+			this.logger.warn(
+				'Skipping Postgres pool error listener: driver.master is unavailable (TypeORM internals may have changed)',
+			);
+			return;
+		}
+
+		pool.on('error', (cause: unknown) => {
+			this.setConnected(false);
+			this.logger.warn(`Postgres pool client error: ${ensureError(cause).message}`);
+		});
+		this.logger.debug('Attached pool error listener to Postgres driver');
+	}
+
+	/** Records the new state and notifies the owner, but only when the state actually changes. */
+	private setConnected(connected: boolean) {
+		if (this.connected === connected) {
+			return;
+		}
+		this.connected = connected;
+		this.onConnectedChange(connected);
+	}
+}
diff --git a/packages/@n8n/db/src/connection/db-connection-options.ts b/packages/@n8n/db/src/connection/db-connection-options.ts index 856b8cb0340..d7bdffc6c07 100644 --- a/packages/@n8n/db/src/connection/db-connection-options.ts +++ b/packages/@n8n/db/src/connection/db-connection-options.ts @@ -112,6 +112,18 @@ export class DbConnectionOptions { ssl, extra: { idleTimeoutMillis: postgresConfig.idleTimeoutMs, + keepAlive: postgresConfig.keepAlive, + keepAliveInitialDelayMillis: postgresConfig.keepAliveInitialDelayMs, + // pg-pool's `maxLifetimeSeconds` is the upstream knob; we accept ms in config for unit consistency. + // Clamp to >= 1s so values like 500ms don't silently round down to 0 (which disables it). + ...(postgresConfig.maxConnectionLifetimeMs > 0 + ? 
{ + maxLifetimeSeconds: Math.max( + 1, + Math.round(postgresConfig.maxConnectionLifetimeMs / 1000), + ), + } + : {}), }, }; } diff --git a/packages/@n8n/db/src/connection/db-connection.ts b/packages/@n8n/db/src/connection/db-connection.ts index e215aa456c4..d5dd2d07544 100644 --- a/packages/@n8n/db/src/connection/db-connection.ts +++ b/packages/@n8n/db/src/connection/db-connection.ts @@ -1,13 +1,12 @@ -import { inTest, Logger } from '@n8n/backend-common'; +import { Logger } from '@n8n/backend-common'; import { DatabaseConfig } from '@n8n/config'; -import { Time } from '@n8n/constants'; import { Memoized } from '@n8n/decorators'; import { Container, Service } from '@n8n/di'; import { DataSource } from '@n8n/typeorm'; import { ErrorReporter } from 'n8n-core'; -import { DbConnectionTimeoutError, ensureError, OperationalError } from 'n8n-workflow'; -import { setTimeout as setTimeoutP } from 'timers/promises'; +import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow'; +import { DbConnectionMonitor } from './db-connection-monitor'; import { DbConnectionOptions } from './db-connection-options'; import { wrapMigration } from '../migrations/migration-helpers'; import type { Migration } from '../migrations/migration-types'; @@ -21,8 +20,7 @@ type ConnectionState = { export class DbConnection { private dataSource: DataSource; - private pingTimer: NodeJS.Timeout | undefined; - timeout: number; + private monitor: DbConnectionMonitor | undefined; readonly connectionState: ConnectionState = { connected: false, @@ -37,9 +35,6 @@ export class DbConnection { ) { this.dataSource = new DataSource(this.options); Container.set(DataSource, this.dataSource); - this.timeout = process.env.N8N_DB_PING_TIMEOUT - ? 
Number.parseInt(process.env.N8N_DB_PING_TIMEOUT) - : 5000; } @Memoized @@ -50,6 +45,14 @@ export class DbConnection { async init(): Promise { const { connectionState, options } = this; if (connectionState.connected) return; + + // TODO(CAT-3314): Remove N8N_DB_PING_TIMEOUT fallback in v3. + if (process.env.N8N_DB_PING_TIMEOUT) { + this.logger.warn( + 'N8N_DB_PING_TIMEOUT is deprecated, use DB_PING_TIMEOUT_MS instead. The legacy variable will be removed in a future release.', + ); + } + try { await this.dataSource.initialize(); } catch (e) { @@ -67,7 +70,15 @@ export class DbConnection { } connectionState.connected = true; - if (!inTest) this.scheduleNextPing(); + this.monitor = new DbConnectionMonitor( + this.dataSource, + (connected) => (this.connectionState.connected = connected), + this.databaseConfig, + this.logger, + this.errorReporter, + connectionState.connected, + ); + this.monitor.start(); } async migrate() { @@ -78,53 +89,12 @@ export class DbConnection { } async close() { - if (this.pingTimer) { - clearTimeout(this.pingTimer); - this.pingTimer = undefined; - } + this.monitor?.stop(); + this.monitor = undefined; if (this.dataSource.isInitialized) { await this.dataSource.destroy(); this.connectionState.connected = false; } } - - /** Ping DB connection every `pingIntervalSeconds` seconds to check if it is still alive. 
*/ - private scheduleNextPing() { - this.pingTimer = setTimeout( - async () => await this.ping(), - this.databaseConfig.pingIntervalSeconds * Time.seconds.toMilliseconds, - ); - } - - private async ping() { - if (!this.dataSource.isInitialized) return; - const abortController = new AbortController(); - - try { - await Promise.race([ - this.dataSource.query('SELECT 1'), - setTimeoutP(this.timeout, undefined, { signal: abortController.signal }).then(() => { - throw new OperationalError('Database connection timed out'); - }), - ]); - - if (!this.connectionState.connected) { - this.logger.info('Database connection recovered'); - } - - this.connectionState.connected = true; - return; - } catch (error) { - this.connectionState.connected = false; - if (error instanceof OperationalError) { - this.logger.warn(error.message); - } else { - this.errorReporter.error(error); - } - } finally { - abortController.abort(); - this.scheduleNextPing(); - } - } }