mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
When Ollama isn't installed, every ZIM download dispatches embedding jobs that fail and retry 30x with 60s backoff. With many ZIM files downloading in parallel, this exhausts Redis connections with EPIPE/ECONNRESET errors. Two changes: 1. Don't dispatch embedding jobs when Ollama isn't installed (belt) 2. Use BullMQ UnrecoverableError for "not installed" so jobs fail immediately without retrying (suspenders) Closes #351 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
260 lines
8.5 KiB
TypeScript
260 lines
8.5 KiB
TypeScript
import { Job, UnrecoverableError } from 'bullmq'
|
|
import { QueueService } from '#services/queue_service'
|
|
import { EmbedJobWithProgress } from '../../types/rag.js'
|
|
import { RagService } from '#services/rag_service'
|
|
import { DockerService } from '#services/docker_service'
|
|
import { OllamaService } from '#services/ollama_service'
|
|
import { createHash } from 'crypto'
|
|
import logger from '@adonisjs/core/services/logger'
|
|
|
|
export interface EmbedFileJobParams {
|
|
filePath: string
|
|
fileName: string
|
|
fileSize?: number
|
|
// Batch processing for large ZIM files
|
|
batchOffset?: number // Current batch offset (for ZIM files)
|
|
totalArticles?: number // Total articles in ZIM (for progress tracking)
|
|
isFinalBatch?: boolean // Whether this is the last batch (prevents premature deletion)
|
|
}
|
|
|
|
export class EmbedFileJob {
|
|
static get queue() {
|
|
return 'file-embeddings'
|
|
}
|
|
|
|
static get key() {
|
|
return 'embed-file'
|
|
}
|
|
|
|
static getJobId(filePath: string): string {
|
|
return createHash('sha256').update(filePath).digest('hex').slice(0, 16)
|
|
}
|
|
|
|
async handle(job: Job) {
|
|
const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams
|
|
|
|
const isZimBatch = batchOffset !== undefined
|
|
const batchInfo = isZimBatch ? ` (batch offset: ${batchOffset})` : ''
|
|
logger.info(`[EmbedFileJob] Starting embedding process for: ${fileName}${batchInfo}`)
|
|
|
|
const dockerService = new DockerService()
|
|
const ollamaService = new OllamaService()
|
|
const ragService = new RagService(dockerService, ollamaService)
|
|
|
|
try {
|
|
// Check if Ollama and Qdrant services are installed and ready
|
|
// Use UnrecoverableError for "not installed" so BullMQ won't retry —
|
|
// retrying 30x when the service doesn't exist just wastes Redis connections
|
|
const ollamaUrl = await dockerService.getServiceURL('nomad_ollama')
|
|
if (!ollamaUrl) {
|
|
logger.warn('[EmbedFileJob] Ollama is not installed. Skipping embedding for: %s', fileName)
|
|
throw new UnrecoverableError('Ollama service is not installed. Install AI Assistant to enable file embeddings.')
|
|
}
|
|
|
|
const existingModels = await ollamaService.getModels()
|
|
if (!existingModels) {
|
|
logger.warn('[EmbedFileJob] Ollama service not ready yet. Will retry...')
|
|
throw new Error('Ollama service not ready yet')
|
|
}
|
|
|
|
const qdrantUrl = await dockerService.getServiceURL('nomad_qdrant')
|
|
if (!qdrantUrl) {
|
|
logger.warn('[EmbedFileJob] Qdrant is not installed. Skipping embedding for: %s', fileName)
|
|
throw new UnrecoverableError('Qdrant service is not installed. Install AI Assistant to enable file embeddings.')
|
|
}
|
|
|
|
logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`)
|
|
|
|
// Update progress starting
|
|
await job.updateProgress(5)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'processing',
|
|
startedAt: job.data.startedAt || Date.now(),
|
|
})
|
|
|
|
logger.info(`[EmbedFileJob] Processing file: ${filePath}`)
|
|
|
|
// Progress callback: maps service-reported 0-100% into the 5-95% job range
|
|
const onProgress = async (percent: number) => {
|
|
await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9)))
|
|
}
|
|
|
|
// Process and embed the file
|
|
// Only allow deletion if explicitly marked as final batch
|
|
const allowDeletion = job.data.isFinalBatch === true
|
|
const result = await ragService.processAndEmbedFile(
|
|
filePath,
|
|
allowDeletion,
|
|
batchOffset,
|
|
onProgress
|
|
)
|
|
|
|
if (!result.success) {
|
|
logger.error(`[EmbedFileJob] Failed to process file ${fileName}: ${result.message}`)
|
|
throw new Error(result.message)
|
|
}
|
|
|
|
// For ZIM files with batching, check if more batches are needed
|
|
if (result.hasMoreBatches) {
|
|
const nextOffset = (batchOffset || 0) + (result.articlesProcessed || 0)
|
|
logger.info(
|
|
`[EmbedFileJob] Batch complete. Dispatching next batch at offset ${nextOffset}`
|
|
)
|
|
|
|
// Dispatch next batch (not final yet)
|
|
await EmbedFileJob.dispatch({
|
|
filePath,
|
|
fileName,
|
|
batchOffset: nextOffset,
|
|
totalArticles: totalArticles || result.totalArticles,
|
|
isFinalBatch: false, // Explicitly not final
|
|
})
|
|
|
|
// Calculate progress based on articles processed
|
|
const progress = totalArticles
|
|
? Math.round((nextOffset / totalArticles) * 100)
|
|
: 50
|
|
|
|
await job.updateProgress(progress)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'batch_completed',
|
|
lastBatchAt: Date.now(),
|
|
chunks: (job.data.chunks || 0) + (result.chunks || 0),
|
|
})
|
|
|
|
return {
|
|
success: true,
|
|
fileName,
|
|
filePath,
|
|
chunks: result.chunks,
|
|
hasMoreBatches: true,
|
|
nextOffset,
|
|
message: `Batch embedded ${result.chunks} chunks, next batch queued`,
|
|
}
|
|
}
|
|
|
|
// Final batch or non-batched file - mark as complete
|
|
const totalChunks = (job.data.chunks || 0) + (result.chunks || 0)
|
|
await job.updateProgress(100)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'completed',
|
|
completedAt: Date.now(),
|
|
chunks: totalChunks,
|
|
})
|
|
|
|
const batchMsg = isZimBatch ? ` (final batch, total chunks: ${totalChunks})` : ''
|
|
logger.info(
|
|
`[EmbedFileJob] Successfully embedded ${result.chunks} chunks from file: ${fileName}${batchMsg}`
|
|
)
|
|
|
|
return {
|
|
success: true,
|
|
fileName,
|
|
filePath,
|
|
chunks: result.chunks,
|
|
message: `Successfully embedded ${result.chunks} chunks`,
|
|
}
|
|
} catch (error) {
|
|
logger.error(`[EmbedFileJob] Error embedding file ${fileName}:`, error)
|
|
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'failed',
|
|
failedAt: Date.now(),
|
|
error: error instanceof Error ? error.message : 'Unknown error',
|
|
})
|
|
|
|
throw error
|
|
}
|
|
}
|
|
|
|
static async listActiveJobs(): Promise<EmbedJobWithProgress[]> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobs = await queue.getJobs(['waiting', 'active', 'delayed'])
|
|
|
|
return jobs.map((job) => ({
|
|
jobId: job.id!.toString(),
|
|
fileName: (job.data as EmbedFileJobParams).fileName,
|
|
filePath: (job.data as EmbedFileJobParams).filePath,
|
|
progress: typeof job.progress === 'number' ? job.progress : 0,
|
|
status: ((job.data as any).status as string) ?? 'waiting',
|
|
}))
|
|
}
|
|
|
|
static async getByFilePath(filePath: string): Promise<Job | undefined> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobId = this.getJobId(filePath)
|
|
return await queue.getJob(jobId)
|
|
}
|
|
|
|
static async dispatch(params: EmbedFileJobParams) {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobId = this.getJobId(params.filePath)
|
|
|
|
try {
|
|
const job = await queue.add(this.key, params, {
|
|
jobId,
|
|
attempts: 30,
|
|
backoff: {
|
|
type: 'fixed',
|
|
delay: 60000, // Check every 60 seconds for service readiness
|
|
},
|
|
removeOnComplete: { count: 50 }, // Keep last 50 completed jobs for history
|
|
removeOnFail: { count: 20 } // Keep last 20 failed jobs for debugging
|
|
})
|
|
|
|
logger.info(`[EmbedFileJob] Dispatched embedding job for file: ${params.fileName}`)
|
|
|
|
return {
|
|
job,
|
|
created: true,
|
|
jobId,
|
|
message: `File queued for embedding: ${params.fileName}`,
|
|
}
|
|
} catch (error) {
|
|
if (error.message && error.message.includes('job already exists')) {
|
|
const existing = await queue.getJob(jobId)
|
|
logger.info(`[EmbedFileJob] Job already exists for file: ${params.fileName}`)
|
|
return {
|
|
job: existing,
|
|
created: false,
|
|
jobId,
|
|
message: `Embedding job already exists for: ${params.fileName}`,
|
|
}
|
|
}
|
|
throw error
|
|
}
|
|
}
|
|
|
|
static async getStatus(filePath: string): Promise<{
|
|
exists: boolean
|
|
status?: string
|
|
progress?: number
|
|
chunks?: number
|
|
error?: string
|
|
}> {
|
|
const job = await this.getByFilePath(filePath)
|
|
|
|
if (!job) {
|
|
return { exists: false }
|
|
}
|
|
|
|
const state = await job.getState()
|
|
const data = job.data
|
|
|
|
return {
|
|
exists: true,
|
|
status: data.status || state,
|
|
progress: typeof job.progress === 'number' ? job.progress : undefined,
|
|
chunks: data.chunks,
|
|
error: data.error,
|
|
}
|
|
}
|
|
}
|