diff --git a/admin/app/jobs/download_model_job.ts b/admin/app/jobs/download_model_job.ts index 61908e4..4e27a49 100644 --- a/admin/app/jobs/download_model_job.ts +++ b/admin/app/jobs/download_model_job.ts @@ -44,7 +44,9 @@ export class DownloadModelJob { // Services are ready, initiate the download with progress tracking const result = await ollamaService.downloadModel(modelName, (progressPercent) => { if (progressPercent) { - job.updateProgress(Math.floor(progressPercent)) + job.updateProgress(Math.floor(progressPercent)).catch((err) => { + if (err?.code !== -1) throw err + }) logger.info( `[DownloadModelJob] Model ${modelName}: ${progressPercent}%` ) @@ -56,6 +58,8 @@ export class DownloadModelJob { status: 'downloading', progress: progressPercent, progress_timestamp: new Date().toISOString(), + }).catch((err) => { + if (err?.code !== -1) throw err }) }) diff --git a/admin/app/jobs/embed_file_job.ts b/admin/app/jobs/embed_file_job.ts index 83a61bf..eef45a8 100644 --- a/admin/app/jobs/embed_file_job.ts +++ b/admin/app/jobs/embed_file_job.ts @@ -31,6 +31,17 @@ export class EmbedFileJob { return createHash('sha256').update(filePath).digest('hex').slice(0, 16) } + /** Calls job.updateProgress but silently ignores "Missing key" errors (code -1), + * which occur when the job has been removed from Redis (e.g. cancelled externally) + * between the time the await was issued and the Redis write completed. */ + private async safeUpdateProgress(job: Job, progress: number): Promise<void> { + try { + await job.updateProgress(progress) + } catch (err: any) { + if (err?.code !== -1) throw err + } + } + async handle(job: Job) { const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams @@ -67,7 +78,7 @@ export class EmbedFileJob { logger.info(`[EmbedFileJob] Services ready. 
Processing file: ${fileName}`) // Update progress starting - await job.updateProgress(5) + await this.safeUpdateProgress(job, 5) await job.updateData({ ...job.data, status: 'processing', @@ -78,7 +89,7 @@ export class EmbedFileJob { // Progress callback: maps service-reported 0-100% into the 5-95% job range const onProgress = async (percent: number) => { - await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9))) + await this.safeUpdateProgress(job, Math.min(95, Math.round(5 + percent * 0.9))) } // Process and embed the file @@ -117,7 +128,7 @@ export class EmbedFileJob { ? Math.round((nextOffset / totalArticles) * 100) : 50 - await job.updateProgress(progress) + await this.safeUpdateProgress(job, progress) await job.updateData({ ...job.data, status: 'batch_completed', @@ -138,7 +149,7 @@ export class EmbedFileJob { // Final batch or non-batched file - mark as complete const totalChunks = (job.data.chunks || 0) + (result.chunks || 0) - await job.updateProgress(100) + await this.safeUpdateProgress(job, 100) await job.updateData({ ...job.data, status: 'completed', diff --git a/admin/app/jobs/run_download_job.ts b/admin/app/jobs/run_download_job.ts index 5ce3cc3..4cb7dd7 100644 --- a/admin/app/jobs/run_download_job.ts +++ b/admin/app/jobs/run_download_job.ts @@ -84,7 +84,11 @@ export class RunDownloadJob { totalBytes: progress.totalBytes, lastProgressTime: Date.now(), } - job.updateProgress(progressData) + job.updateProgress(progressData).catch((err) => { + // Job was removed from Redis (e.g. cancelled) between the callback firing + // and the Redis write completing — this is expected and safe to ignore. 
+ if (err?.code !== -1) throw err + }) lastKnownProgress = { downloadedBytes: progress.downloadedBytes, totalBytes: progress.totalBytes } }, async onComplete(url) { @@ -161,7 +165,9 @@ export class RunDownloadJob { downloadedBytes: lastKnownProgress.downloadedBytes, totalBytes: lastKnownProgress.totalBytes, lastProgressTime: Date.now(), - } as DownloadProgressData) + } as DownloadProgressData).catch((err) => { + if (err?.code !== -1) throw err + }) }, }) diff --git a/admin/commands/queue/work.ts b/admin/commands/queue/work.ts index 5fac5ec..31bb1cc 100644 --- a/admin/commands/queue/work.ts +++ b/admin/commands/queue/work.ts @@ -66,6 +66,12 @@ export default class QueueWork extends BaseCommand { } ) + // Required to prevent Node from treating BullMQ internal errors as unhandled + // EventEmitter errors that crash the process. + worker.on('error', (err) => { + this.logger.error(`[${queueName}] Worker error: ${err.message}`) + }) + worker.on('failed', async (job, err) => { this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) @@ -97,6 +103,15 @@ export default class QueueWork extends BaseCommand { await CheckUpdateJob.scheduleNightly() await CheckServiceUpdatesJob.scheduleNightly() + // Safety net: log unhandled rejections instead of crashing the worker process. + // Individual job errors are already caught by BullMQ; this catches anything that + // escapes (e.g. a fire-and-forget promise in a callback that rejects unexpectedly). + process.on('unhandledRejection', (reason) => { + this.logger.error( + `Unhandled promise rejection in worker process: ${reason instanceof Error ? reason.message : String(reason)}` + ) + }) + // Graceful shutdown for all workers process.on('SIGTERM', async () => { this.logger.info('SIGTERM received. Shutting down workers...')