import { BaseCommand, flags } from '@adonisjs/core/ace' import type { CommandOptions } from '@adonisjs/core/types/ace' import { Worker } from 'bullmq' import queueConfig from '#config/queue' import { RunDownloadJob } from '#jobs/run_download_job' import { DownloadModelJob } from '#jobs/download_model_job' import { RunBenchmarkJob } from '#jobs/run_benchmark_job' import { EmbedFileJob } from '#jobs/embed_file_job' import { CheckUpdateJob } from '#jobs/check_update_job' import { CheckServiceUpdatesJob } from '#jobs/check_service_updates_job' export default class QueueWork extends BaseCommand { static commandName = 'queue:work' static description = 'Start processing jobs from the queue' @flags.string({ description: 'Queue name to process' }) declare queue: string @flags.boolean({ description: 'Process all queues automatically' }) declare all: boolean static options: CommandOptions = { startApp: true, staysAlive: true, } async run() { // Validate that either --queue or --all is provided if (!this.queue && !this.all) { this.logger.error('You must specify either --queue= or --all') process.exit(1) } if (this.queue && this.all) { this.logger.error('Cannot specify both --queue and --all flags') process.exit(1) } const [jobHandlers, allQueues] = await this.loadJobHandlers() // Determine which queues to process const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue] this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`) const workers: Worker[] = [] // Create a worker for each queue for (const queueName of queuesToProcess) { const worker = new Worker( queueName, async (job) => { this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`) const jobHandler = jobHandlers.get(job.name) if (!jobHandler) { throw new Error(`No handler found for job: ${job.name}`) } return await jobHandler.handle(job) }, { connection: queueConfig.connection, concurrency: this.getConcurrencyForQueue(queueName), lockDuration: 300000, autorun: true, } ) // Required to prevent Node from treating BullMQ internal errors as unhandled // EventEmitter errors that crash the process. worker.on('error', (err) => { this.logger.error(`[${queueName}] Worker error: ${err.message}`) }) worker.on('failed', async (job, err) => { this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) // If this was a Wikipedia download, mark it as failed in the DB if (job?.data?.filetype === 'zim' && job?.data?.url?.includes('wikipedia_en_')) { try { const { DockerService } = await import('#services/docker_service') const { ZimService } = await import('#services/zim_service') const dockerService = new DockerService() const zimService = new ZimService(dockerService) await zimService.onWikipediaDownloadComplete(job.data.url, false) } catch (e: any) { this.logger.error( `[${queueName}] Failed to update Wikipedia status: ${e.message}` ) } } }) worker.on('completed', (job) => { this.logger.info(`[${queueName}] Job completed: ${job.id}`) }) workers.push(worker) this.logger.info(`Worker started for queue: ${queueName}`) } // Schedule nightly update checks (idempotent, will persist over restarts) await CheckUpdateJob.scheduleNightly() await CheckServiceUpdatesJob.scheduleNightly() // Safety net: log unhandled rejections instead of crashing the worker process. // Individual job errors are already caught by BullMQ; this catches anything that // escapes (e.g. a fire-and-forget promise in a callback that rejects unexpectedly). process.on('unhandledRejection', (reason) => { this.logger.error( `Unhandled promise rejection in worker process: ${reason instanceof Error ? reason.message : String(reason)}` ) }) // Graceful shutdown for all workers process.on('SIGTERM', async () => { this.logger.info('SIGTERM received. Shutting down workers...') await Promise.all(workers.map((worker) => worker.close())) this.logger.info('All workers shut down gracefully.') process.exit(0) }) } private async loadJobHandlers(): Promise<[Map, Map]> { const handlers = new Map() const queues = new Map() handlers.set(RunDownloadJob.key, new RunDownloadJob()) handlers.set(DownloadModelJob.key, new DownloadModelJob()) handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob()) handlers.set(EmbedFileJob.key, new EmbedFileJob()) handlers.set(CheckUpdateJob.key, new CheckUpdateJob()) handlers.set(CheckServiceUpdatesJob.key, new CheckServiceUpdatesJob()) queues.set(RunDownloadJob.key, RunDownloadJob.queue) queues.set(DownloadModelJob.key, DownloadModelJob.queue) queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue) queues.set(EmbedFileJob.key, EmbedFileJob.queue) queues.set(CheckUpdateJob.key, CheckUpdateJob.queue) queues.set(CheckServiceUpdatesJob.key, CheckServiceUpdatesJob.queue) return [handlers, queues] } /** * Get concurrency setting for a specific queue * Can be customized per queue based on workload characteristics */ private getConcurrencyForQueue(queueName: string): number { const concurrencyMap: Record = { [RunDownloadJob.queue]: 3, [DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads [RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results [EmbedFileJob.queue]: 2, // Lower concurrency for embedding jobs, can be resource intensive [CheckUpdateJob.queue]: 1, // No need to run more than one update check at a time default: 3, } return concurrencyMap[queueName] || concurrencyMap.default } }