project-nomad/admin/commands/queue/work.ts
Chris Sherwood 755807f95e feat: Add system benchmark feature with NOMAD Score
Add comprehensive benchmarking capability to measure server performance:

Backend:
- BenchmarkService with CPU, memory, disk, and AI benchmarks using sysbench
- Database migrations for benchmark_results and benchmark_settings tables
- REST API endpoints for running benchmarks and retrieving results
- CLI commands: benchmark:run, benchmark:results, benchmark:submit
- BullMQ job for async benchmark execution with SSE progress updates
- Synchronous mode option (?sync=true) for simpler local dev setup

Frontend:
- Benchmark settings page with circular gauges for scores
- NOMAD Score display with weighted composite calculation
- System Performance section (CPU, Memory, Disk Read/Write)
- AI Performance section (tokens/sec, time to first token)
- Hardware Information display
- Expandable Benchmark Details section
- Progress simulation during sync benchmark execution

Easy Setup Integration:
- Added System Benchmark to Additional Tools section
- Built-in capability pattern for non-Docker features
- Click-to-navigate behavior for built-in tools

Fixes:
- Docker log multiplexing issue (Tty: true) for proper output parsing
- Consolidated disk benchmarks into single container execution

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 21:48:12 -08:00

72 lines
2.1 KiB
TypeScript

import { BaseCommand, flags } from '@adonisjs/core/ace'
import type { CommandOptions } from '@adonisjs/core/types/ace'
import { Worker } from 'bullmq'
import queueConfig from '#config/queue'
export default class QueueWork extends BaseCommand {
static commandName = 'queue:work'
static description = 'Start processing jobs from the queue'
@flags.string({ description: 'Queue name to process', required: true })
declare queue: string
static options: CommandOptions = {
startApp: true,
staysAlive: true,
}
async run() {
const queueName = this.queue || 'default'
const jobHandlers = await this.loadJobHandlers()
const worker = new Worker(
queueName,
async (job) => {
this.logger.info(`Processing job: ${job.id} of type: ${job.name}`)
const jobHandler = jobHandlers.get(job.name)
if (!jobHandler) {
throw new Error(`No handler found for job: ${job.name}`)
}
return await jobHandler.handle(job)
},
{
connection: queueConfig.connection,
concurrency: 3,
autorun: true,
}
)
worker.on('failed', (job, err) => {
this.logger.error(`Job failed: ${job?.id}, Error: ${err.message}`)
})
worker.on('completed', (job) => {
this.logger.info(`Job completed: ${job.id}`)
})
this.logger.info(`Worker started for queue: ${queueName}`)
process.on('SIGTERM', async () => {
this.logger.info('SIGTERM received. Shutting down worker...')
await worker.close()
this.logger.info('Worker shut down gracefully.')
process.exit(0)
})
}
private async loadJobHandlers() {
const handlers = new Map<string, any>()
const { RunDownloadJob } = await import('#jobs/run_download_job')
const { DownloadModelJob } = await import('#jobs/download_model_job')
const { RunBenchmarkJob } = await import('#jobs/run_benchmark_job')
handlers.set(RunDownloadJob.key, new RunDownloadJob())
handlers.set(DownloadModelJob.key, new DownloadModelJob())
handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob())
return handlers
}
}