fix(Jobs): improve error handling and robustness

This commit is contained in:
Jake Turner 2026-04-02 02:49:33 +00:00 committed by Jake Turner
parent ed4c2afc73
commit 7ea52c2c4f
4 changed files with 43 additions and 7 deletions

View File

@@ -44,7 +44,9 @@ export class DownloadModelJob {
// Services are ready, initiate the download with progress tracking // Services are ready, initiate the download with progress tracking
const result = await ollamaService.downloadModel(modelName, (progressPercent) => { const result = await ollamaService.downloadModel(modelName, (progressPercent) => {
if (progressPercent) { if (progressPercent) {
job.updateProgress(Math.floor(progressPercent)) job.updateProgress(Math.floor(progressPercent)).catch((err) => {
if (err?.code !== -1) throw err
})
logger.info( logger.info(
`[DownloadModelJob] Model ${modelName}: ${progressPercent}%` `[DownloadModelJob] Model ${modelName}: ${progressPercent}%`
) )
@@ -56,6 +58,8 @@
status: 'downloading', status: 'downloading',
progress: progressPercent, progress: progressPercent,
progress_timestamp: new Date().toISOString(), progress_timestamp: new Date().toISOString(),
}).catch((err) => {
if (err?.code !== -1) throw err
}) })
}) })

View File

@@ -31,6 +31,17 @@ export class EmbedFileJob {
return createHash('sha256').update(filePath).digest('hex').slice(0, 16) return createHash('sha256').update(filePath).digest('hex').slice(0, 16)
} }
/**
 * Calls job.updateProgress but silently ignores "Missing key" errors (code -1),
 * which occur when the job has been removed from Redis (e.g. cancelled externally)
 * between the time the await was issued and the Redis write completed.
 *
 * @param job      BullMQ job whose progress record is being updated
 * @param progress progress value to persist (callers pass 0-100 percentages)
 * @throws rethrows any updateProgress failure whose code is not -1
 */
private async safeUpdateProgress(job: Job, progress: number): Promise<void> {
  try {
    await job.updateProgress(progress)
  } catch (err: unknown) {
    // Strict-mode TS: catch variables are 'unknown' — narrow before reading .code
    // instead of disabling checking with 'any'.
    const code = (err as { code?: number } | null)?.code
    if (code !== -1) throw err
  }
}
async handle(job: Job) { async handle(job: Job) {
const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams
@@ -67,7 +78,7 @@
logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`) logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`)
// Update progress starting // Update progress starting
await job.updateProgress(5) await this.safeUpdateProgress(job, 5)
await job.updateData({ await job.updateData({
...job.data, ...job.data,
status: 'processing', status: 'processing',
@@ -78,7 +89,7 @@
// Progress callback: maps service-reported 0-100% into the 5-95% job range // Progress callback: maps service-reported 0-100% into the 5-95% job range
const onProgress = async (percent: number) => { const onProgress = async (percent: number) => {
await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9))) await this.safeUpdateProgress(job, Math.min(95, Math.round(5 + percent * 0.9)))
} }
// Process and embed the file // Process and embed the file
@@ -117,7 +128,7 @@
? Math.round((nextOffset / totalArticles) * 100) ? Math.round((nextOffset / totalArticles) * 100)
: 50 : 50
await job.updateProgress(progress) await this.safeUpdateProgress(job, progress)
await job.updateData({ await job.updateData({
...job.data, ...job.data,
status: 'batch_completed', status: 'batch_completed',
@@ -138,7 +149,7 @@
// Final batch or non-batched file - mark as complete // Final batch or non-batched file - mark as complete
const totalChunks = (job.data.chunks || 0) + (result.chunks || 0) const totalChunks = (job.data.chunks || 0) + (result.chunks || 0)
await job.updateProgress(100) await this.safeUpdateProgress(job, 100)
await job.updateData({ await job.updateData({
...job.data, ...job.data,
status: 'completed', status: 'completed',

View File

@@ -84,7 +84,11 @@ export class RunDownloadJob {
totalBytes: progress.totalBytes, totalBytes: progress.totalBytes,
lastProgressTime: Date.now(), lastProgressTime: Date.now(),
} }
job.updateProgress(progressData) job.updateProgress(progressData).catch((err) => {
// Job was removed from Redis (e.g. cancelled) between the callback firing
// and the Redis write completing — this is expected and safe to ignore.
if (err?.code !== -1) throw err
})
lastKnownProgress = { downloadedBytes: progress.downloadedBytes, totalBytes: progress.totalBytes } lastKnownProgress = { downloadedBytes: progress.downloadedBytes, totalBytes: progress.totalBytes }
}, },
async onComplete(url) { async onComplete(url) {
@@ -161,7 +165,9 @@
downloadedBytes: lastKnownProgress.downloadedBytes, downloadedBytes: lastKnownProgress.downloadedBytes,
totalBytes: lastKnownProgress.totalBytes, totalBytes: lastKnownProgress.totalBytes,
lastProgressTime: Date.now(), lastProgressTime: Date.now(),
} as DownloadProgressData) } as DownloadProgressData).catch((err) => {
if (err?.code !== -1) throw err
})
}, },
}) })

View File

@@ -66,6 +66,12 @@ export default class QueueWork extends BaseCommand {
} }
) )
// Required to prevent Node from treating BullMQ internal errors as unhandled
// EventEmitter errors that crash the process.
worker.on('error', (err) => {
this.logger.error(`[${queueName}] Worker error: ${err.message}`)
})
worker.on('failed', async (job, err) => { worker.on('failed', async (job, err) => {
this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`) this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`)
@@ -97,6 +103,15 @@
await CheckUpdateJob.scheduleNightly() await CheckUpdateJob.scheduleNightly()
await CheckServiceUpdatesJob.scheduleNightly() await CheckServiceUpdatesJob.scheduleNightly()
// Safety net: log unhandled rejections instead of crashing the worker process.
// Individual job errors are already caught by BullMQ; this catches anything that
// escapes (e.g. a fire-and-forget promise in a callback that rejects unexpectedly).
process.on('unhandledRejection', (reason) => {
this.logger.error(
`Unhandled promise rejection in worker process: ${reason instanceof Error ? reason.message : String(reason)}`
)
})
// Graceful shutdown for all workers // Graceful shutdown for all workers
process.on('SIGTERM', async () => { process.on('SIGTERM', async () => {
this.logger.info('SIGTERM received. Shutting down workers...') this.logger.info('SIGTERM received. Shutting down workers...')