import { BaseCommand, flags } from '@adonisjs/core/ace' import type { CommandOptions } from '@adonisjs/core/types/ace' import { Worker } from 'bullmq' import queueConfig from '#config/queue' import { RunDownloadJob } from '#jobs/run_download_job' import { DownloadModelJob } from '#jobs/download_model_job' import { RunBenchmarkJob } from '#jobs/run_benchmark_job' import { EmbedFileJob } from '#jobs/embed_file_job' import { CheckUpdateJob } from '#jobs/check_update_job' import { CheckServiceUpdatesJob } from '#jobs/check_service_updates_job' export default class QueueWork extends BaseCommand { static commandName = 'queue:work' static description = 'Start processing jobs from the queue' @flags.string({ description: 'Queue name to process' }) declare queue: string @flags.boolean({ description: 'Process all queues automatically' }) declare all: boolean static options: CommandOptions = { startApp: true, staysAlive: true, } async run() { // Validate that either --queue or --all is provided if (!this.queue && !this.all) { this.logger.error('You must specify either --queue= or --all') process.exit(1) } if (this.queue && this.all) { this.logger.error('Cannot specify both --queue and --all flags') process.exit(1) } const [jobHandlers, allQueues] = await this.loadJobHandlers() // Determine which queues to process const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue] this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`) const workers: Worker[] = [] // Create a worker for each queue for (const queueName of queuesToProcess) { const worker = new Worker( queueName, async (job) => { this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`) const jobHandler = jobHandlers.get(job.name) if (!jobHandler) { throw new Error(`No handler found for job: ${job.name}`) } return await jobHandler.handle(job) }, { connection: queueConfig.connection, concurrency: this.getConcurrencyForQueue(queueName), autorun: true, } ) worker.on('failed', async (job, err) => { this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) // If this was a Wikipedia download, mark it as failed in the DB if (job?.data?.filetype === 'zim' && job?.data?.url?.includes('wikipedia_en_')) { try { const { DockerService } = await import('#services/docker_service') const { ZimService } = await import('#services/zim_service') const dockerService = new DockerService() const zimService = new ZimService(dockerService) await zimService.onWikipediaDownloadComplete(job.data.url, false) } catch (e: any) { this.logger.error( `[${queueName}] Failed to update Wikipedia status: ${e.message}` ) } } }) worker.on('completed', (job) => { this.logger.info(`[${queueName}] Job completed: ${job.id}`) }) workers.push(worker) this.logger.info(`Worker started for queue: ${queueName}`) } // Schedule nightly update checks (idempotent, will persist over restarts) await CheckUpdateJob.scheduleNightly() await CheckServiceUpdatesJob.scheduleNightly() // Graceful shutdown for all workers process.on('SIGTERM', async () => { this.logger.info('SIGTERM received. Shutting down workers...') await Promise.all(workers.map((worker) => worker.close())) this.logger.info('All workers shut down gracefully.') process.exit(0) }) } private async loadJobHandlers(): Promise<[Map, Map]> { const handlers = new Map() const queues = new Map() handlers.set(RunDownloadJob.key, new RunDownloadJob()) handlers.set(DownloadModelJob.key, new DownloadModelJob()) handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob()) handlers.set(EmbedFileJob.key, new EmbedFileJob()) handlers.set(CheckUpdateJob.key, new CheckUpdateJob()) handlers.set(CheckServiceUpdatesJob.key, new CheckServiceUpdatesJob()) queues.set(RunDownloadJob.key, RunDownloadJob.queue) queues.set(DownloadModelJob.key, DownloadModelJob.queue) queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue) queues.set(EmbedFileJob.key, EmbedFileJob.queue) queues.set(CheckUpdateJob.key, CheckUpdateJob.queue) queues.set(CheckServiceUpdatesJob.key, CheckServiceUpdatesJob.queue) return [handlers, queues] } /** * Get concurrency setting for a specific queue * Can be customized per queue based on workload characteristics */ private getConcurrencyForQueue(queueName: string): number { const concurrencyMap: Record = { [RunDownloadJob.queue]: 3, [DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads [RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results [EmbedFileJob.queue]: 2, // Lower concurrency for embedding jobs, can be resource intensive [CheckUpdateJob.queue]: 1, // No need to run more than one update check at a time default: 3, } return concurrencyMap[queueName] || concurrencyMap.default } }