mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
131 lines
4.7 KiB
TypeScript
131 lines
4.7 KiB
TypeScript
import { BaseCommand, flags } from '@adonisjs/core/ace'
|
|
import type { CommandOptions } from '@adonisjs/core/types/ace'
|
|
import { Worker } from 'bullmq'
|
|
import queueConfig from '#config/queue'
|
|
import { RunDownloadJob } from '#jobs/run_download_job'
|
|
import { DownloadModelJob } from '#jobs/download_model_job'
|
|
import { RunBenchmarkJob } from '#jobs/run_benchmark_job'
|
|
import { EmbedFileJob } from '#jobs/embed_file_job'
|
|
import { CheckUpdateJob } from '#jobs/check_update_job'
|
|
import { CheckServiceUpdatesJob } from '#jobs/check_service_updates_job'
|
|
|
|
export default class QueueWork extends BaseCommand {
|
|
static commandName = 'queue:work'
|
|
static description = 'Start processing jobs from the queue'
|
|
|
|
@flags.string({ description: 'Queue name to process' })
|
|
declare queue: string
|
|
|
|
@flags.boolean({ description: 'Process all queues automatically' })
|
|
declare all: boolean
|
|
|
|
static options: CommandOptions = {
|
|
startApp: true,
|
|
staysAlive: true,
|
|
}
|
|
|
|
async run() {
|
|
// Validate that either --queue or --all is provided
|
|
if (!this.queue && !this.all) {
|
|
this.logger.error('You must specify either --queue=<name> or --all')
|
|
process.exit(1)
|
|
}
|
|
|
|
if (this.queue && this.all) {
|
|
this.logger.error('Cannot specify both --queue and --all flags')
|
|
process.exit(1)
|
|
}
|
|
|
|
const [jobHandlers, allQueues] = await this.loadJobHandlers()
|
|
|
|
// Determine which queues to process
|
|
const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue]
|
|
|
|
this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`)
|
|
|
|
const workers: Worker[] = []
|
|
|
|
// Create a worker for each queue
|
|
for (const queueName of queuesToProcess) {
|
|
const worker = new Worker(
|
|
queueName,
|
|
async (job) => {
|
|
this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`)
|
|
const jobHandler = jobHandlers.get(job.name)
|
|
if (!jobHandler) {
|
|
throw new Error(`No handler found for job: ${job.name}`)
|
|
}
|
|
|
|
return await jobHandler.handle(job)
|
|
},
|
|
{
|
|
connection: queueConfig.connection,
|
|
concurrency: this.getConcurrencyForQueue(queueName),
|
|
autorun: true,
|
|
}
|
|
)
|
|
|
|
worker.on('failed', (job, err) => {
|
|
this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`)
|
|
})
|
|
|
|
worker.on('completed', (job) => {
|
|
this.logger.info(`[${queueName}] Job completed: ${job.id}`)
|
|
})
|
|
|
|
workers.push(worker)
|
|
this.logger.info(`Worker started for queue: ${queueName}`)
|
|
}
|
|
|
|
// Schedule nightly update checks (idempotent, will persist over restarts)
|
|
await CheckUpdateJob.scheduleNightly()
|
|
await CheckServiceUpdatesJob.scheduleNightly()
|
|
|
|
// Graceful shutdown for all workers
|
|
process.on('SIGTERM', async () => {
|
|
this.logger.info('SIGTERM received. Shutting down workers...')
|
|
await Promise.all(workers.map((worker) => worker.close()))
|
|
this.logger.info('All workers shut down gracefully.')
|
|
process.exit(0)
|
|
})
|
|
}
|
|
|
|
private async loadJobHandlers(): Promise<[Map<string, any>, Map<string, string>]> {
|
|
const handlers = new Map<string, any>()
|
|
const queues = new Map<string, string>()
|
|
|
|
handlers.set(RunDownloadJob.key, new RunDownloadJob())
|
|
handlers.set(DownloadModelJob.key, new DownloadModelJob())
|
|
handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob())
|
|
handlers.set(EmbedFileJob.key, new EmbedFileJob())
|
|
handlers.set(CheckUpdateJob.key, new CheckUpdateJob())
|
|
handlers.set(CheckServiceUpdatesJob.key, new CheckServiceUpdatesJob())
|
|
|
|
queues.set(RunDownloadJob.key, RunDownloadJob.queue)
|
|
queues.set(DownloadModelJob.key, DownloadModelJob.queue)
|
|
queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue)
|
|
queues.set(EmbedFileJob.key, EmbedFileJob.queue)
|
|
queues.set(CheckUpdateJob.key, CheckUpdateJob.queue)
|
|
queues.set(CheckServiceUpdatesJob.key, CheckServiceUpdatesJob.queue)
|
|
|
|
return [handlers, queues]
|
|
}
|
|
|
|
/**
|
|
* Get concurrency setting for a specific queue
|
|
* Can be customized per queue based on workload characteristics
|
|
*/
|
|
private getConcurrencyForQueue(queueName: string): number {
|
|
const concurrencyMap: Record<string, number> = {
|
|
[RunDownloadJob.queue]: 3,
|
|
[DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads
|
|
[RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results
|
|
[EmbedFileJob.queue]: 2, // Lower concurrency for embedding jobs, can be resource intensive
|
|
[CheckUpdateJob.queue]: 1, // No need to run more than one update check at a time
|
|
default: 3,
|
|
}
|
|
|
|
return concurrencyMap[queueName] || concurrencyMap.default
|
|
}
|
|
}
|