mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 19:49:25 +01:00
70 lines
1.9 KiB
TypeScript
70 lines
1.9 KiB
TypeScript
import { BaseCommand, flags } from '@adonisjs/core/ace'
|
|
import type { CommandOptions } from '@adonisjs/core/types/ace'
|
|
import { Worker } from 'bullmq'
|
|
import queueConfig from '#config/queue'
|
|
|
|
export default class QueueWork extends BaseCommand {
|
|
static commandName = 'queue:work'
|
|
static description = 'Start processing jobs from the queue'
|
|
|
|
@flags.string({ description: 'Queue name to process', required: true })
|
|
declare queue: string
|
|
|
|
static options: CommandOptions = {
|
|
startApp: true,
|
|
staysAlive: true,
|
|
}
|
|
|
|
async run() {
|
|
const queueName = this.queue || 'default'
|
|
|
|
const jobHandlers = await this.loadJobHandlers()
|
|
|
|
const worker = new Worker(
|
|
queueName,
|
|
async (job) => {
|
|
this.logger.info(`Processing job: ${job.id} of type: ${job.name}`)
|
|
const jobHandler = jobHandlers.get(job.name)
|
|
if (!jobHandler) {
|
|
throw new Error(`No handler found for job: ${job.name}`)
|
|
}
|
|
|
|
return await jobHandler.handle(job)
|
|
},
|
|
{
|
|
connection: queueConfig.connection,
|
|
concurrency: 3,
|
|
autorun: true,
|
|
}
|
|
)
|
|
|
|
worker.on('failed', (job, err) => {
|
|
this.logger.error(`Job failed: ${job?.id}, Error: ${err.message}`)
|
|
})
|
|
|
|
worker.on('completed', (job) => {
|
|
this.logger.info(`Job completed: ${job.id}`)
|
|
})
|
|
|
|
this.logger.info(`Worker started for queue: ${queueName}`)
|
|
|
|
process.on('SIGTERM', async () => {
|
|
this.logger.info('SIGTERM received. Shutting down worker...')
|
|
await worker.close()
|
|
this.logger.info('Worker shut down gracefully.')
|
|
process.exit(0)
|
|
})
|
|
}
|
|
|
|
private async loadJobHandlers() {
|
|
const handlers = new Map<string, any>()
|
|
|
|
const { RunDownloadJob } = await import('#jobs/run_download_job')
|
|
const { DownloadModelJob } = await import('#jobs/download_model_job')
|
|
handlers.set(RunDownloadJob.key, new RunDownloadJob())
|
|
handlers.set(DownloadModelJob.key, new DownloadModelJob())
|
|
|
|
return handlers
|
|
}
|
|
}
|