mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-04-02 14:59:26 +02:00
fix(Jobs): improved error handling and robustness
This commit is contained in:
parent
15c11c98b9
commit
a9942f1e1b
|
|
@ -44,7 +44,9 @@ export class DownloadModelJob {
|
|||
// Services are ready, initiate the download with progress tracking
|
||||
const result = await ollamaService.downloadModel(modelName, (progressPercent) => {
|
||||
if (progressPercent) {
|
||||
job.updateProgress(Math.floor(progressPercent))
|
||||
job.updateProgress(Math.floor(progressPercent)).catch((err) => {
|
||||
if (err?.code !== -1) throw err
|
||||
})
|
||||
logger.info(
|
||||
`[DownloadModelJob] Model ${modelName}: ${progressPercent}%`
|
||||
)
|
||||
|
|
@ -56,6 +58,8 @@ export class DownloadModelJob {
|
|||
status: 'downloading',
|
||||
progress: progressPercent,
|
||||
progress_timestamp: new Date().toISOString(),
|
||||
}).catch((err) => {
|
||||
if (err?.code !== -1) throw err
|
||||
})
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,17 @@ export class EmbedFileJob {
|
|||
return createHash('sha256').update(filePath).digest('hex').slice(0, 16)
|
||||
}
|
||||
|
||||
/** Calls job.updateProgress but silently ignores "Missing key" errors (code -1),
|
||||
* which occur when the job has been removed from Redis (e.g. cancelled externally)
|
||||
* between the time the await was issued and the Redis write completed. */
|
||||
private async safeUpdateProgress(job: Job, progress: number): Promise<void> {
|
||||
try {
|
||||
await job.updateProgress(progress)
|
||||
} catch (err: any) {
|
||||
if (err?.code !== -1) throw err
|
||||
}
|
||||
}
|
||||
|
||||
async handle(job: Job) {
|
||||
const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams
|
||||
|
||||
|
|
@ -67,7 +78,7 @@ export class EmbedFileJob {
|
|||
logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`)
|
||||
|
||||
// Update progress starting
|
||||
await job.updateProgress(5)
|
||||
await this.safeUpdateProgress(job, 5)
|
||||
await job.updateData({
|
||||
...job.data,
|
||||
status: 'processing',
|
||||
|
|
@ -78,7 +89,7 @@ export class EmbedFileJob {
|
|||
|
||||
// Progress callback: maps service-reported 0-100% into the 5-95% job range
|
||||
const onProgress = async (percent: number) => {
|
||||
await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9)))
|
||||
await this.safeUpdateProgress(job, Math.min(95, Math.round(5 + percent * 0.9)))
|
||||
}
|
||||
|
||||
// Process and embed the file
|
||||
|
|
@ -117,7 +128,7 @@ export class EmbedFileJob {
|
|||
? Math.round((nextOffset / totalArticles) * 100)
|
||||
: 50
|
||||
|
||||
await job.updateProgress(progress)
|
||||
await this.safeUpdateProgress(job, progress)
|
||||
await job.updateData({
|
||||
...job.data,
|
||||
status: 'batch_completed',
|
||||
|
|
@ -138,7 +149,7 @@ export class EmbedFileJob {
|
|||
|
||||
// Final batch or non-batched file - mark as complete
|
||||
const totalChunks = (job.data.chunks || 0) + (result.chunks || 0)
|
||||
await job.updateProgress(100)
|
||||
await this.safeUpdateProgress(job, 100)
|
||||
await job.updateData({
|
||||
...job.data,
|
||||
status: 'completed',
|
||||
|
|
|
|||
|
|
@ -84,7 +84,11 @@ export class RunDownloadJob {
|
|||
totalBytes: progress.totalBytes,
|
||||
lastProgressTime: Date.now(),
|
||||
}
|
||||
job.updateProgress(progressData)
|
||||
job.updateProgress(progressData).catch((err) => {
|
||||
// Job was removed from Redis (e.g. cancelled) between the callback firing
|
||||
// and the Redis write completing — this is expected and safe to ignore.
|
||||
if (err?.code !== -1) throw err
|
||||
})
|
||||
lastKnownProgress = { downloadedBytes: progress.downloadedBytes, totalBytes: progress.totalBytes }
|
||||
},
|
||||
async onComplete(url) {
|
||||
|
|
@ -161,7 +165,9 @@ export class RunDownloadJob {
|
|||
downloadedBytes: lastKnownProgress.downloadedBytes,
|
||||
totalBytes: lastKnownProgress.totalBytes,
|
||||
lastProgressTime: Date.now(),
|
||||
} as DownloadProgressData)
|
||||
} as DownloadProgressData).catch((err) => {
|
||||
if (err?.code !== -1) throw err
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -66,6 +66,12 @@ export default class QueueWork extends BaseCommand {
|
|||
}
|
||||
)
|
||||
|
||||
// Required to prevent Node from treating BullMQ internal errors as unhandled
|
||||
// EventEmitter errors that crash the process.
|
||||
worker.on('error', (err) => {
|
||||
this.logger.error(`[${queueName}] Worker error: ${err.message}`)
|
||||
})
|
||||
|
||||
worker.on('failed', async (job, err) => {
|
||||
this.logger.error(`[${queueName}] Job failed: ${job?.id}, Error: ${err.message}`)
|
||||
|
||||
|
|
@ -97,6 +103,15 @@ export default class QueueWork extends BaseCommand {
|
|||
await CheckUpdateJob.scheduleNightly()
|
||||
await CheckServiceUpdatesJob.scheduleNightly()
|
||||
|
||||
// Safety net: log unhandled rejections instead of crashing the worker process.
|
||||
// Individual job errors are already caught by BullMQ; this catches anything that
|
||||
// escapes (e.g. a fire-and-forget promise in a callback that rejects unexpectedly).
|
||||
process.on('unhandledRejection', (reason) => {
|
||||
this.logger.error(
|
||||
`Unhandled promise rejection in worker process: ${reason instanceof Error ? reason.message : String(reason)}`
|
||||
)
|
||||
})
|
||||
|
||||
// Graceful shutdown for all workers
|
||||
process.on('SIGTERM', async () => {
|
||||
this.logger.info('SIGTERM received. Shutting down workers...')
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user