mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
When Ollama isn't installed, every ZIM download dispatches embedding jobs that fail and retry 30 times with a 60-second backoff. With many ZIM files downloading in parallel, this exhausts Redis connections and produces EPIPE/ECONNRESET errors.

Two changes:
1. Don't dispatch embedding jobs when Ollama isn't installed (belt).
2. Use BullMQ's UnrecoverableError for "not installed" so jobs fail immediately without retrying (suspenders).

Closes #351

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
155 lines
5.2 KiB
TypeScript
155 lines
5.2 KiB
TypeScript
import { Job } from 'bullmq'
|
|
import { RunDownloadJobParams } from '../../types/downloads.js'
|
|
import { QueueService } from '#services/queue_service'
|
|
import { doResumableDownload } from '../utils/downloads.js'
|
|
import { createHash } from 'crypto'
|
|
import { DockerService } from '#services/docker_service'
|
|
import { ZimService } from '#services/zim_service'
|
|
import { MapService } from '#services/map_service'
|
|
import { EmbedFileJob } from './embed_file_job.js'
|
|
|
|
/**
 * Converts raw byte counters into a whole-number percentage suitable for job progress.
 *
 * Returns 0 when the total size is missing, zero, negative or not finite, because no
 * meaningful ratio exists in that case. The result is always clamped to the 0..100 range.
 *
 * @param downloadedBytes - Number of bytes received so far.
 * @param totalBytes - Expected total size in bytes, if known.
 * @returns An integer between 0 and 100 inclusive.
 */
function toProgressPercent(downloadedBytes: number, totalBytes?: number | null): number {
  if (!totalBytes || !Number.isFinite(totalBytes) || totalBytes < 0) {
    return 0
  }

  const percent = Math.floor((downloadedBytes / totalBytes) * 100)
  if (!Number.isFinite(percent)) {
    return 0
  }

  return Math.min(100, Math.max(0, percent))
}

/**
 * Queue job that downloads a remote file and then runs the post-download bookkeeping
 * for ZIM and map resources.
 */
export class RunDownloadJob {
  /** Name of the queue this job is added to. */
  static get queue() {
    return 'downloads'
  }

  /** Job name passed to the queue when a download is dispatched. */
  static get key() {
    return 'run-download'
  }

  /**
   * Derives the job id for a URL: the first 16 hex characters of the URL's SHA-256 digest.
   * The same URL therefore always maps to the same job id.
   */
  static getJobId(url: string): string {
    return createHash('sha256').update(url).digest('hex').slice(0, 16)
  }

  /**
   * Runs the download described by `job.data` and, once it completes, performs the
   * follow-up work for the downloaded resource.
   *
   * Failures inside the completion callback are logged and not rethrown.
   *
   * @param job - Queue job whose `data` is read as `RunDownloadJobParams`.
   * @returns The `url` and `filepath` taken from the job data.
   */
  async handle(job: Job) {
    const { url, filepath, timeout, allowedMimeTypes, forceNew, filetype, resourceMetadata } =
      job.data as RunDownloadJobParams

    // updateProgress is asynchronous. Progress reporting is best-effort: a failed write
    // must not become an unhandled rejection, so the rejection is caught and logged here.
    const reportProgress = (percent: number): Promise<void> =>
      job.updateProgress(percent).catch((error: unknown) => {
        console.warn(`[RunDownloadJob] Failed to update progress for URL ${url}:`, error)
      })

    await doResumableDownload({
      url,
      filepath,
      timeout,
      allowedMimeTypes,
      forceNew,
      onProgress(progress) {
        void reportProgress(toProgressPercent(progress.downloadedBytes, progress.totalBytes))
      },
      async onComplete(downloadedUrl) {
        try {
          // Create InstalledResource entry if metadata was provided
          if (resourceMetadata) {
            const { default: InstalledResource } = await import('#models/installed_resource')
            const { DateTime } = await import('luxon')
            const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js')
            const stats = await getFileStatsIfExists(filepath)

            // Look up the old entry so we can clean up the previous file after updating
            const oldEntry = await InstalledResource.query()
              .where('resource_id', resourceMetadata.resource_id)
              .where('resource_type', filetype as 'zim' | 'map')
              .first()
            const oldFilePath = oldEntry?.file_path ?? null

            await InstalledResource.updateOrCreate(
              { resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' },
              {
                version: resourceMetadata.version,
                collection_ref: resourceMetadata.collection_ref,
                url: downloadedUrl,
                file_path: filepath,
                file_size_bytes: stats ? Number(stats.size) : null,
                installed_at: DateTime.now(),
              }
            )

            // Delete the old file if it differs from the new one
            if (oldFilePath && oldFilePath !== filepath) {
              try {
                await deleteFileIfExists(oldFilePath)
                console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`)
              } catch (deleteError) {
                // A leftover old file is not fatal; the new resource is already recorded.
                console.warn(
                  `[RunDownloadJob] Failed to delete old file ${oldFilePath}:`,
                  deleteError
                )
              }
            }
          }

          if (filetype === 'zim') {
            const dockerService = new DockerService()
            const zimService = new ZimService(dockerService)
            await zimService.downloadRemoteSuccessCallback([downloadedUrl], true)

            // Only dispatch embedding job if AI Assistant (Ollama) is installed
            const ollamaUrl = await dockerService.getServiceURL('nomad_ollama')
            if (ollamaUrl) {
              try {
                await EmbedFileJob.dispatch({
                  fileName: downloadedUrl.split('/').pop() || '',
                  filePath: filepath,
                })
              } catch (error) {
                console.error(
                  `[RunDownloadJob] Error dispatching EmbedFileJob for URL ${downloadedUrl}:`,
                  error
                )
              }
            }
          } else if (filetype === 'map') {
            const mapsService = new MapService()
            await mapsService.downloadRemoteSuccessCallback([downloadedUrl], false)
          }
        } catch (error) {
          console.error(
            `[RunDownloadJob] Error in download success callback for URL ${downloadedUrl}:`,
            error
          )
        }

        // Mark the job as fully done even if the bookkeeping above logged an error.
        await reportProgress(100)
      },
    })

    return {
      url,
      filepath,
    }
  }

  /**
   * Looks up the queued download job for a URL, using the id from `getJobId`.
   *
   * @param url - URL the job was dispatched for.
   * @returns Whatever the queue's `getJob` lookup yields for that id.
   */
  static async getByUrl(url: string): Promise<Job | undefined> {
    const queueService = new QueueService()
    const queue = queueService.getQueue(this.queue)
    const jobId = this.getJobId(url)
    return await queue.getJob(jobId)
  }

  /**
   * Adds a download job for `params.url` to the queue, keyed by the id from `getJobId`.
   * The job is added with 3 attempts, exponential backoff starting at 2000 ms, and
   * removal on completion.
   *
   * @param params - Download parameters stored as the job's data.
   * @returns The job, a `created` flag and a human-readable message. If adding fails with
   *   a "job already exists" error, the existing job is returned with `created: false`.
   * @throws Rethrows any other error raised while adding the job.
   */
  static async dispatch(params: RunDownloadJobParams) {
    const queueService = new QueueService()
    const queue = queueService.getQueue(this.queue)
    const jobId = this.getJobId(params.url)

    try {
      const job = await queue.add(this.key, params, {
        jobId,
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: true,
      })

      return {
        job,
        created: true,
        message: `Dispatched download job for URL ${params.url}`,
      }
    } catch (error: unknown) {
      // Narrow before reading `.message`: a non-Error throw must be rethrown as-is
      // rather than replaced by a TypeError raised inside this handler.
      const errorMessage = error instanceof Error ? error.message : ''
      if (errorMessage.includes('job already exists')) {
        const existing = await queue.getJob(jobId)
        return {
          job: existing,
          created: false,
          message: `Job already exists for URL ${params.url}`,
        }
      }
      throw error
    }
  }
}
|