import { BaseCommand, flags } from '@adonisjs/core/ace' import type { CommandOptions } from '@adonisjs/core/types/ace' import { Worker } from 'bullmq' import queueConfig from '#config/queue' import { RunDownloadJob } from '#jobs/run_download_job' import { DownloadModelJob } from '#jobs/download_model_job' import { RunBenchmarkJob } from '#jobs/run_benchmark_job' import { EmbedFileJob } from '#jobs/embed_file_job' import { CheckUpdateJob } from '#jobs/check_update_job' import { CheckServiceUpdatesJob } from '#jobs/check_service_updates_job' export default class QueueWork extends BaseCommand { static commandName = 'queue:work' static description = 'Start processing jobs from the queue' @flags.string({ description: 'Queue name to process' }) declare queue: string @flags.boolean({ description: 'Process all queues automatically' }) declare all: boolean static options: CommandOptions = { startApp: true, staysAlive: true, } async run() { // Validate that either --queue or --all is provided if (!this.queue && !this.all) { this.logger.error('You must specify either --queue= or --all') process.exit(1) } if (this.queue && this.all) { this.logger.error('Cannot specify both --queue and --all flags') process.exit(1) } const [jobHandlers, allQueues] = await this.loadJobHandlers() // Determine which queues to process const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue] this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`) const workers: Worker[] = [] // Create a worker for each queue for (const queueName of queuesToProcess) { const worker = new Worker( queueName, async (job) => { this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`) const jobHandler = jobHandlers.get(job.name) if (!jobHandler) { throw new Error(`No handler found for job: ${job.name}`) } return await jobHandler.handle(job) }, { connection: queueConfig.connection, concurrency: this.getConcurrencyForQueue(queueName), autorun: true, } ) worker.on('failed', async (job, err) => { this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) // If this was a Wikipedia download, mark it as failed in the DB // Use safe database operations with ORM to prevent SQL injection if (job?.data?.filetype === 'zim' && job?.data?.filename && typeof job.data.filename === 'string') { try { const WikipediaSelection = (await import('#models/wikipedia_selection')).default const db = (await import('@adonisjs/lucid/services/db')).default // Validate filename to prevent SQL injection - only allow safe characters const safeFilename = String(job.data.filename).replace(/[^a-zA-Z0-9._-]/g, '') if (safeFilename && safeFilename.length > 0) { // Using explicit parameterized query to prevent SQL injection // The ? placeholders ensure user input is treated as data, not code await db.rawQuery( 'UPDATE wikipedia_selections SET status = ? WHERE filename = ? AND status = ?', ['failed', safeFilename, 'downloading'] ) } } catch (e: any) { const errorMsg = String(e.message) this.logger.error('[' + queueName + '] Failed to update Wikipedia status: ' + errorMsg) } } }) worker.on('completed', (job) => { this.logger.info(`[${queueName}] Job completed: ${job.id}`) }) workers.push(worker) this.logger.info(`Worker started for queue: ${queueName}`) } // Schedule nightly update checks (idempotent, will persist over restarts) await CheckUpdateJob.scheduleNightly() await CheckServiceUpdatesJob.scheduleNightly() // Graceful shutdown for all workers process.on('SIGTERM', async () => { this.logger.info('SIGTERM received. Shutting down workers...') await Promise.all(workers.map((worker) => worker.close())) this.logger.info('All workers shut down gracefully.') process.exit(0) }) } private async loadJobHandlers(): Promise<[Map, Map]> { const handlers = new Map() const queues = new Map() handlers.set(RunDownloadJob.key, new RunDownloadJob()) handlers.set(DownloadModelJob.key, new DownloadModelJob()) handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob()) handlers.set(EmbedFileJob.key, new EmbedFileJob()) handlers.set(CheckUpdateJob.key, new CheckUpdateJob()) handlers.set(CheckServiceUpdatesJob.key, new CheckServiceUpdatesJob()) queues.set(RunDownloadJob.key, RunDownloadJob.queue) queues.set(DownloadModelJob.key, DownloadModelJob.queue) queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue) queues.set(EmbedFileJob.key, EmbedFileJob.queue) queues.set(CheckUpdateJob.key, CheckUpdateJob.queue) queues.set(CheckServiceUpdatesJob.key, CheckServiceUpdatesJob.queue) return [handlers, queues] } /** * Get concurrency setting for a specific queue * Can be customized per queue based on workload characteristics */ private getConcurrencyForQueue(queueName: string): number { const concurrencyMap: Record = { [RunDownloadJob.queue]: 3, [DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads [RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results [EmbedFileJob.queue]: 2, // Lower concurrency for embedding jobs, can be resource intensive [CheckUpdateJob.queue]: 1, // No need to run more than one update check at a time default: 3, } return concurrencyMap[queueName] || concurrencyMap.default } }