project-nomad/admin/app/jobs/run_download_job.ts
Chris Sherwood 34076b107b fix: prevent embedding retry storm when Ollama is not installed
When Ollama isn't installed, every ZIM download dispatches embedding jobs
that fail and retry 30x with 60s backoff. With many ZIM files downloading
in parallel, this exhausts Redis connections with EPIPE/ECONNRESET errors.

Two changes:
1. Don't dispatch embedding jobs when Ollama isn't installed (belt)
2. Use BullMQ UnrecoverableError for "not installed" so jobs fail
   immediately without retrying (suspenders)

Closes #351

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 11:46:10 -07:00

155 lines
5.2 KiB
TypeScript

import { Job } from 'bullmq'
import { RunDownloadJobParams } from '../../types/downloads.js'
import { QueueService } from '#services/queue_service'
import { doResumableDownload } from '../utils/downloads.js'
import { createHash } from 'crypto'
import { DockerService } from '#services/docker_service'
import { ZimService } from '#services/zim_service'
import { MapService } from '#services/map_service'
import { EmbedFileJob } from './embed_file_job.js'
export class RunDownloadJob {
static get queue() {
return 'downloads'
}
static get key() {
return 'run-download'
}
static getJobId(url: string): string {
return createHash('sha256').update(url).digest('hex').slice(0, 16)
}
async handle(job: Job) {
const { url, filepath, timeout, allowedMimeTypes, forceNew, filetype, resourceMetadata } =
job.data as RunDownloadJobParams
await doResumableDownload({
url,
filepath,
timeout,
allowedMimeTypes,
forceNew,
onProgress(progress) {
const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100
job.updateProgress(Math.floor(progressPercent))
},
async onComplete(url) {
try {
// Create InstalledResource entry if metadata was provided
if (resourceMetadata) {
const { default: InstalledResource } = await import('#models/installed_resource')
const { DateTime } = await import('luxon')
const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js')
const stats = await getFileStatsIfExists(filepath)
// Look up the old entry so we can clean up the previous file after updating
const oldEntry = await InstalledResource.query()
.where('resource_id', resourceMetadata.resource_id)
.where('resource_type', filetype as 'zim' | 'map')
.first()
const oldFilePath = oldEntry?.file_path ?? null
await InstalledResource.updateOrCreate(
{ resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' },
{
version: resourceMetadata.version,
collection_ref: resourceMetadata.collection_ref,
url: url,
file_path: filepath,
file_size_bytes: stats ? Number(stats.size) : null,
installed_at: DateTime.now(),
}
)
// Delete the old file if it differs from the new one
if (oldFilePath && oldFilePath !== filepath) {
try {
await deleteFileIfExists(oldFilePath)
console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`)
} catch (deleteError) {
console.warn(
`[RunDownloadJob] Failed to delete old file ${oldFilePath}:`,
deleteError
)
}
}
}
if (filetype === 'zim') {
const dockerService = new DockerService()
const zimService = new ZimService(dockerService)
await zimService.downloadRemoteSuccessCallback([url], true)
// Only dispatch embedding job if AI Assistant (Ollama) is installed
const ollamaUrl = await dockerService.getServiceURL('nomad_ollama')
if (ollamaUrl) {
try {
await EmbedFileJob.dispatch({
fileName: url.split('/').pop() || '',
filePath: filepath,
})
} catch (error) {
console.error(`[RunDownloadJob] Error dispatching EmbedFileJob for URL ${url}:`, error)
}
}
} else if (filetype === 'map') {
const mapsService = new MapService()
await mapsService.downloadRemoteSuccessCallback([url], false)
}
} catch (error) {
console.error(
`[RunDownloadJob] Error in download success callback for URL ${url}:`,
error
)
}
job.updateProgress(100)
},
})
return {
url,
filepath,
}
}
static async getByUrl(url: string): Promise<Job | undefined> {
const queueService = new QueueService()
const queue = queueService.getQueue(this.queue)
const jobId = this.getJobId(url)
return await queue.getJob(jobId)
}
static async dispatch(params: RunDownloadJobParams) {
const queueService = new QueueService()
const queue = queueService.getQueue(this.queue)
const jobId = this.getJobId(params.url)
try {
const job = await queue.add(this.key, params, {
jobId,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: true,
})
return {
job,
created: true,
message: `Dispatched download job for URL ${params.url}`,
}
} catch (error) {
if (error.message.includes('job already exists')) {
const existing = await queue.getJob(jobId)
return {
job: existing,
created: false,
message: `Job already exists for URL ${params.url}`,
}
}
throw error
}
}
}