mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-05-25 22:05:07 +02:00
Every static call site instantiated a fresh QueueService (24 call sites across 8 files). QueueService.getQueue() opens a BullMQ Queue per call when not cached, and each Queue opens two ioredis connections (one for commands, one blocking). Because every static call constructed a new QueueService, its internal `queues` cache was never shared: every call opened a fresh pair of connections, and none were ever closed.

In normal operation this leaked a few connections per API hit. During multi-batch ZIM ingestion after PR #872 (where EmbedFileJob.handle() dispatches the next batch every 50 articles), every batch completion opened two new connections. On NOMAD3, at a sustained rate of about one batch every 4s, that is ~900 batches and therefore ~1800 leaked connections per hour. Redis hit its 10,000 maxclients ceiling in ~5 hours, and the admin container fell into an EPIPE flood that required a restart to recover.

Fix: collapse QueueService to a true process-wide singleton with a private constructor and a getInstance() accessor. The existing per-queue Map is now shared across every dispatch / status / cleanup call, so each queue's underlying connections are opened exactly once for the lifetime of the process. close() now clears the map so the singleton can be torn down cleanly if a graceful-shutdown hook is ever wired up.

Validated on NOMAD3 (RTX 5060, v1.32.0-rc.4 + this patch hot-applied): under sustained multi-batch wikipedia_en_simple_all_nopic ingestion, connected_clients held flat at 21-22 across a 5-minute window. Pre-fix, the same scenario climbed to 10,000+ over hours.
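The leak arithmetic above can be reproduced without Redis. The following is a minimal simulation, not the project's code: `FakeQueue`, `PerCallQueueService`, `SingletonQueueService`, `countConnections`, and the queue name `embed-file` are hypothetical stand-ins, and the "two connections per Queue" figure is taken from the commit message rather than measured here.

```typescript
// Hypothetical stand-in for a BullMQ Queue: it only counts the connections
// a real Queue would open (two per Queue, per the commit message).
let openConnections = 0

class FakeQueue {
  readonly name: string
  constructor(name: string) {
    this.name = name
    openConnections += 2 // one command connection + one blocking connection
  }
}

// Pre-fix shape: the cache lives on an instance that is thrown away after
// each call, so it never gets a cache hit.
class PerCallQueueService {
  private queues = new Map<string, FakeQueue>()
  getQueue(name: string): FakeQueue {
    if (!this.queues.has(name)) this.queues.set(name, new FakeQueue(name))
    return this.queues.get(name)!
  }
}

// Post-fix shape: one shared instance, so the cache survives across calls.
class SingletonQueueService {
  private static instance: SingletonQueueService | null = null
  private queues = new Map<string, FakeQueue>()
  private constructor() {}
  static getInstance(): SingletonQueueService {
    if (!SingletonQueueService.instance) {
      SingletonQueueService.instance = new SingletonQueueService()
    }
    return SingletonQueueService.instance
  }
  getQueue(name: string): FakeQueue {
    if (!this.queues.has(name)) this.queues.set(name, new FakeQueue(name))
    return this.queues.get(name)!
  }
}

// Run `dispatches` lookups through the given accessor and report how many
// simulated connections were opened.
function countConnections(
  dispatches: number,
  getQueue: (name: string) => FakeQueue
): number {
  openConnections = 0
  for (let i = 0; i < dispatches; i++) getQueue('embed-file')
  return openConnections
}

const batchesPerHour = 3600 / 4 // one batch every 4s
const leaked = countConnections(batchesPerHour, (n) => new PerCallQueueService().getQueue(n))
const shared = countConnections(batchesPerHour, (n) => SingletonQueueService.getInstance().getQueue(n))
console.log(leaked, shared) // 1800 2
```

Under these assumptions the per-call shape opens 1800 simulated connections in an hour of dispatches, while the singleton opens 2 and then reuses them.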
40 lines
1.1 KiB
TypeScript
import { Queue } from 'bullmq'
import queueConfig from '#config/queue'

// Process-wide singleton. Each `Queue` opens two ioredis connections (one for
// commands, one blocking). Instantiating a fresh QueueService per dispatch /
// status lookup leaks both, and under sustained job churn (e.g. multi-batch ZIM
// ingestion enqueueing a continuation every few seconds) it saturates Redis's
// maxclients within hours.
export class QueueService {
  private queues: Map<string, Queue> = new Map()

  private static _instance: QueueService | null = null

  private constructor() {}

  static getInstance(): QueueService {
    if (!QueueService._instance) {
      QueueService._instance = new QueueService()
    }
    return QueueService._instance
  }

  getQueue(name: string): Queue {
    if (!this.queues.has(name)) {
      const queue = new Queue(name, {
        connection: queueConfig.connection,
      })
      this.queues.set(name, queue)
    }
    return this.queues.get(name)!
  }

  async close() {
    for (const queue of this.queues.values()) {
      await queue.close()
    }
    this.queues.clear()
  }
}
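The commit message notes that close() exists so the singleton can be torn down if a graceful-shutdown hook is ever wired up. No such hook is part of this file. The sketch below shows one possible shape for it, assuming a hypothetical `makeShutdownHandler` helper and a `Closable` interface that the QueueService above would satisfy. A stub stands in for the real service so the snippet runs without Redis.

```typescript
// Hypothetical minimal shape; QueueService above would satisfy it via close().
interface Closable {
  close(): Promise<void>
}

// Build a handler that closes the service at most once, even if several
// shutdown signals arrive back to back.
function makeShutdownHandler(service: Closable): () => Promise<void> {
  let closing: Promise<void> | null = null
  return () => {
    if (!closing) closing = service.close()
    return closing
  }
}

// Stub used only for illustration; it records how often close() was called.
let closeCalls = 0
const stubService: Closable = {
  async close() {
    closeCalls += 1
  },
}

const onShutdown = makeShutdownHandler(stubService)

// In the application this might be wired up as (assumption, not in the source):
//   process.once('SIGTERM', makeShutdownHandler(QueueService.getInstance()))
```

Caching the promise keeps a second signal from calling close() again while the first teardown is still closing queues.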