diff --git a/admin/commands/queue/work.ts b/admin/commands/queue/work.ts index 9dcdb9b..2ee93b5 100644 --- a/admin/commands/queue/work.ts +++ b/admin/commands/queue/work.ts @@ -2,70 +2,114 @@ import { BaseCommand, flags } from '@adonisjs/core/ace' import type { CommandOptions } from '@adonisjs/core/types/ace' import { Worker } from 'bullmq' import queueConfig from '#config/queue' +import { RunDownloadJob } from '#jobs/run_download_job' +import { DownloadModelJob } from '#jobs/download_model_job' +import { RunBenchmarkJob } from '#jobs/run_benchmark_job' export default class QueueWork extends BaseCommand { static commandName = 'queue:work' static description = 'Start processing jobs from the queue' - @flags.string({ description: 'Queue name to process', required: true }) + @flags.string({ description: 'Queue name to process' }) declare queue: string + @flags.boolean({ description: 'Process all queues automatically' }) + declare all: boolean + static options: CommandOptions = { startApp: true, staysAlive: true, } async run() { - const queueName = this.queue || 'default' + // Validate that either --queue or --all is provided + if (!this.queue && !this.all) { + this.logger.error('You must specify either --queue= or --all') + process.exit(1) + } - const jobHandlers = await this.loadJobHandlers() + if (this.queue && this.all) { + this.logger.error('Cannot specify both --queue and --all flags') + process.exit(1) + } - const worker = new Worker( - queueName, - async (job) => { - this.logger.info(`Processing job: ${job.id} of type: ${job.name}`) - const jobHandler = jobHandlers.get(job.name) - if (!jobHandler) { - throw new Error(`No handler found for job: ${job.name}`) + const [jobHandlers, allQueues] = await this.loadJobHandlers() + + // Determine which queues to process + const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue] + + this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`) + + const workers: Worker[] = [] + + // Create a worker for each queue + for (const queueName of queuesToProcess) { + const worker = new Worker( + queueName, + async (job) => { + this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`) + const jobHandler = jobHandlers.get(job.name) + if (!jobHandler) { + throw new Error(`No handler found for job: ${job.name}`) + } + + return await jobHandler.handle(job) + }, + { + connection: queueConfig.connection, + concurrency: this.getConcurrencyForQueue(queueName), + autorun: true, } + ) - return await jobHandler.handle(job) - }, - { - connection: queueConfig.connection, - concurrency: 3, - autorun: true, - } - ) + worker.on('failed', (job, err) => { + this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) + }) - worker.on('failed', (job, err) => { - this.logger.error(`Job failed: ${job?.id}, Error: ${err.message}`) - }) + worker.on('completed', (job) => { + this.logger.info(`[${queueName}] Job completed: ${job.id}`) + }) - worker.on('completed', (job) => { - this.logger.info(`Job completed: ${job.id}`) - }) - - this.logger.info(`Worker started for queue: ${queueName}`) + workers.push(worker) + this.logger.info(`Worker started for queue: ${queueName}`) + } + // Graceful shutdown for all workers process.on('SIGTERM', async () => { - this.logger.info('SIGTERM received. Shutting down worker...') - await worker.close() - this.logger.info('Worker shut down gracefully.') + this.logger.info('SIGTERM received. Shutting down workers...') + await Promise.all(workers.map((worker) => worker.close())) + this.logger.info('All workers shut down gracefully.') process.exit(0) }) } - private async loadJobHandlers() { + private async loadJobHandlers(): Promise<[Map, Map]> { const handlers = new Map() + const queues = new Map() - const { RunDownloadJob } = await import('#jobs/run_download_job') - const { DownloadModelJob } = await import('#jobs/download_model_job') - const { RunBenchmarkJob } = await import('#jobs/run_benchmark_job') handlers.set(RunDownloadJob.key, new RunDownloadJob()) handlers.set(DownloadModelJob.key, new DownloadModelJob()) handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob()) - return handlers + queues.set(RunDownloadJob.key, RunDownloadJob.queue) + queues.set(DownloadModelJob.key, DownloadModelJob.queue) + queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue) + + return [handlers, queues] + } + + /** + * Get concurrency setting for a specific queue + * Can be customized per queue based on workload characteristics + */ + private getConcurrencyForQueue(queueName: string): number { + const concurrencyMap: Record = { + [RunDownloadJob.queue]: 3, + [DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads + [RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results + default: 3, + } + + return concurrencyMap[queueName] || concurrencyMap.default } } diff --git a/admin/package.json b/admin/package.json index b09fe6b..3f29eaf 100644 --- a/admin/package.json +++ b/admin/package.json @@ -14,7 +14,9 @@ "format": "prettier --write .", "typecheck": "tsc --noEmit", "work:downloads": "node ace queue:work --queue=downloads", - "work:model-downloads": "node ace queue:work --queue=model-downloads" + "work:model-downloads": "node ace queue:work --queue=model-downloads", + "work:benchmarks": "node ace queue:work --queue=benchmarks", + "work:all": "node ace queue:work --all" }, "imports": { "#controllers/*": "./app/controllers/*.js", diff --git a/install/entrypoint.sh b/install/entrypoint.sh index 71f40db..17bdc95 100644 --- a/install/entrypoint.sh +++ b/install/entrypoint.sh @@ -17,11 +17,9 @@ node ace migration:run --force echo "Seeding the database..." node ace db:seed -# Start background workers for queues -echo "Starting background workers for queues..." -node ace queue:work --queue=downloads & -node ace queue:work --queue=model-downloads & -node ace queue:work --queue +# Start background workers for all queues +echo "Starting background workers for all queues..." +node ace queue:work --all & # Start the AdonisJS application echo "Starting AdonisJS application..."