mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
SQL query constructed using JavaScript template literals with dynamic input Resolves utils.custom.sql-injection-template-literal
154 lines
5.9 KiB
TypeScript
154 lines
5.9 KiB
TypeScript
import { BaseCommand, flags } from '@adonisjs/core/ace'
|
|
import type { CommandOptions } from '@adonisjs/core/types/ace'
|
|
import { Worker } from 'bullmq'
|
|
import queueConfig from '#config/queue'
|
|
import { RunDownloadJob } from '#jobs/run_download_job'
|
|
import { DownloadModelJob } from '#jobs/download_model_job'
|
|
import { RunBenchmarkJob } from '#jobs/run_benchmark_job'
|
|
import { EmbedFileJob } from '#jobs/embed_file_job'
|
|
import { CheckUpdateJob } from '#jobs/check_update_job'
|
|
import { CheckServiceUpdatesJob } from '#jobs/check_service_updates_job'
|
|
|
|
export default class QueueWork extends BaseCommand {
|
|
static commandName = 'queue:work'
|
|
static description = 'Start processing jobs from the queue'
|
|
|
|
@flags.string({ description: 'Queue name to process' })
|
|
declare queue: string
|
|
|
|
@flags.boolean({ description: 'Process all queues automatically' })
|
|
declare all: boolean
|
|
|
|
static options: CommandOptions = {
|
|
startApp: true,
|
|
staysAlive: true,
|
|
}
|
|
|
|
async run() {
|
|
// Validate that either --queue or --all is provided
|
|
if (!this.queue && !this.all) {
|
|
this.logger.error('You must specify either --queue=<name> or --all')
|
|
process.exit(1)
|
|
}
|
|
|
|
if (this.queue && this.all) {
|
|
this.logger.error('Cannot specify both --queue and --all flags')
|
|
process.exit(1)
|
|
}
|
|
|
|
const [jobHandlers, allQueues] = await this.loadJobHandlers()
|
|
|
|
// Determine which queues to process
|
|
const queuesToProcess = this.all ? Array.from(allQueues.values()) : [this.queue]
|
|
|
|
this.logger.info(`Starting workers for queues: ${queuesToProcess.join(', ')}`)
|
|
|
|
const workers: Worker[] = []
|
|
|
|
// Create a worker for each queue
|
|
for (const queueName of queuesToProcess) {
|
|
const worker = new Worker(
|
|
queueName,
|
|
async (job) => {
|
|
this.logger.info(`[${queueName}] Processing job: ${job.id} of type: ${job.name}`)
|
|
const jobHandler = jobHandlers.get(job.name)
|
|
if (!jobHandler) {
|
|
throw new Error(`No handler found for job: ${job.name}`)
|
|
}
|
|
|
|
return await jobHandler.handle(job)
|
|
},
|
|
{
|
|
connection: queueConfig.connection,
|
|
concurrency: this.getConcurrencyForQueue(queueName),
|
|
autorun: true,
|
|
}
|
|
)
|
|
|
|
worker.on('failed', async (job, err) => {
|
|
this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`)
|
|
|
|
// If this was a Wikipedia download, mark it as failed in the DB
|
|
// Use safe database operations with ORM to prevent SQL injection
|
|
if (job?.data?.filetype === 'zim' && job?.data?.filename && typeof job.data.filename === 'string') {
|
|
try {
|
|
const WikipediaSelection = (await import('#models/wikipedia_selection')).default
|
|
const db = (await import('@adonisjs/lucid/services/db')).default
|
|
|
|
// Validate filename to prevent SQL injection - only allow safe characters
|
|
const safeFilename = String(job.data.filename).replace(/[^a-zA-Z0-9._-]/g, '')
|
|
if (safeFilename && safeFilename.length > 0) {
|
|
// Using explicit parameterized query to prevent SQL injection
|
|
// The ? placeholders ensure user input is treated as data, not code
|
|
await db.rawQuery(
|
|
'UPDATE wikipedia_selections SET status = ? WHERE filename = ? AND status = ?',
|
|
['failed', safeFilename, 'downloading']
|
|
)
|
|
}
|
|
} catch (e: any) {
|
|
const errorMsg = String(e.message)
|
|
this.logger.error('[' + queueName + '] Failed to update Wikipedia status: ' + errorMsg)
|
|
}
|
|
}
|
|
})
|
|
|
|
worker.on('completed', (job) => {
|
|
this.logger.info(`[${queueName}] Job completed: ${job.id}`)
|
|
})
|
|
|
|
workers.push(worker)
|
|
this.logger.info(`Worker started for queue: ${queueName}`)
|
|
}
|
|
|
|
// Schedule nightly update checks (idempotent, will persist over restarts)
|
|
await CheckUpdateJob.scheduleNightly()
|
|
await CheckServiceUpdatesJob.scheduleNightly()
|
|
|
|
// Graceful shutdown for all workers
|
|
process.on('SIGTERM', async () => {
|
|
this.logger.info('SIGTERM received. Shutting down workers...')
|
|
await Promise.all(workers.map((worker) => worker.close()))
|
|
this.logger.info('All workers shut down gracefully.')
|
|
process.exit(0)
|
|
})
|
|
}
|
|
|
|
private async loadJobHandlers(): Promise<[Map<string, any>, Map<string, string>]> {
|
|
const handlers = new Map<string, any>()
|
|
const queues = new Map<string, string>()
|
|
|
|
handlers.set(RunDownloadJob.key, new RunDownloadJob())
|
|
handlers.set(DownloadModelJob.key, new DownloadModelJob())
|
|
handlers.set(RunBenchmarkJob.key, new RunBenchmarkJob())
|
|
handlers.set(EmbedFileJob.key, new EmbedFileJob())
|
|
handlers.set(CheckUpdateJob.key, new CheckUpdateJob())
|
|
handlers.set(CheckServiceUpdatesJob.key, new CheckServiceUpdatesJob())
|
|
|
|
queues.set(RunDownloadJob.key, RunDownloadJob.queue)
|
|
queues.set(DownloadModelJob.key, DownloadModelJob.queue)
|
|
queues.set(RunBenchmarkJob.key, RunBenchmarkJob.queue)
|
|
queues.set(EmbedFileJob.key, EmbedFileJob.queue)
|
|
queues.set(CheckUpdateJob.key, CheckUpdateJob.queue)
|
|
queues.set(CheckServiceUpdatesJob.key, CheckServiceUpdatesJob.queue)
|
|
|
|
return [handlers, queues]
|
|
}
|
|
|
|
/**
|
|
* Get concurrency setting for a specific queue
|
|
* Can be customized per queue based on workload characteristics
|
|
*/
|
|
private getConcurrencyForQueue(queueName: string): number {
|
|
const concurrencyMap: Record<string, number> = {
|
|
[RunDownloadJob.queue]: 3,
|
|
[DownloadModelJob.queue]: 2, // Lower concurrency for resource-intensive model downloads
|
|
[RunBenchmarkJob.queue]: 1, // Run benchmarks one at a time for accurate results
|
|
[EmbedFileJob.queue]: 2, // Lower concurrency for embedding jobs, can be resource intensive
|
|
[CheckUpdateJob.queue]: 1, // No need to run more than one update check at a time
|
|
default: 3,
|
|
}
|
|
|
|
return concurrencyMap[queueName] || concurrencyMap.default
|
|
}
|
|
}
|