mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-04-03 23:36:17 +02:00
Added a cleanup failed button for Processing Queue in the Knowledge Base since documents that fail to process tend to get stuck and then can't be cleared. Fixed the ingestion of documents for OpenAI servers. Updated some text in the chat and chat settings since user will need to manually download models when using a non-ollama remote gpu server.
307 lines
10 KiB
TypeScript
307 lines
10 KiB
TypeScript
import { Job, UnrecoverableError } from 'bullmq'
|
|
import { QueueService } from '#services/queue_service'
|
|
import { EmbedJobWithProgress } from '../../types/rag.js'
|
|
import { RagService } from '#services/rag_service'
|
|
import { DockerService } from '#services/docker_service'
|
|
import { OllamaService } from '#services/ollama_service'
|
|
import { createHash } from 'crypto'
|
|
import logger from '@adonisjs/core/services/logger'
|
|
import fs from 'node:fs/promises'
|
|
|
|
export interface EmbedFileJobParams {
|
|
filePath: string
|
|
fileName: string
|
|
fileSize?: number
|
|
// Batch processing for large ZIM files
|
|
batchOffset?: number // Current batch offset (for ZIM files)
|
|
totalArticles?: number // Total articles in ZIM (for progress tracking)
|
|
isFinalBatch?: boolean // Whether this is the last batch (prevents premature deletion)
|
|
}
|
|
|
|
export class EmbedFileJob {
|
|
static get queue() {
|
|
return 'file-embeddings'
|
|
}
|
|
|
|
static get key() {
|
|
return 'embed-file'
|
|
}
|
|
|
|
static getJobId(filePath: string): string {
|
|
return createHash('sha256').update(filePath).digest('hex').slice(0, 16)
|
|
}
|
|
|
|
async handle(job: Job) {
|
|
const { filePath, fileName, batchOffset, totalArticles } = job.data as EmbedFileJobParams
|
|
|
|
const isZimBatch = batchOffset !== undefined
|
|
const batchInfo = isZimBatch ? ` (batch offset: ${batchOffset})` : ''
|
|
logger.info(`[EmbedFileJob] Starting embedding process for: ${fileName}${batchInfo}`)
|
|
|
|
const dockerService = new DockerService()
|
|
const ollamaService = new OllamaService()
|
|
const ragService = new RagService(dockerService, ollamaService)
|
|
|
|
try {
|
|
// Check if Ollama and Qdrant services are installed and ready
|
|
// Use UnrecoverableError for "not installed" so BullMQ won't retry —
|
|
// retrying 30x when the service doesn't exist just wastes Redis connections
|
|
const ollamaUrl = await dockerService.getServiceURL('nomad_ollama')
|
|
if (!ollamaUrl) {
|
|
logger.warn('[EmbedFileJob] Ollama is not installed. Skipping embedding for: %s', fileName)
|
|
throw new UnrecoverableError('Ollama service is not installed. Install AI Assistant to enable file embeddings.')
|
|
}
|
|
|
|
const existingModels = await ollamaService.getModels()
|
|
if (!existingModels) {
|
|
logger.warn('[EmbedFileJob] Ollama service not ready yet. Will retry...')
|
|
throw new Error('Ollama service not ready yet')
|
|
}
|
|
|
|
const qdrantUrl = await dockerService.getServiceURL('nomad_qdrant')
|
|
if (!qdrantUrl) {
|
|
logger.warn('[EmbedFileJob] Qdrant is not installed. Skipping embedding for: %s', fileName)
|
|
throw new UnrecoverableError('Qdrant service is not installed. Install AI Assistant to enable file embeddings.')
|
|
}
|
|
|
|
logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`)
|
|
|
|
// Update progress starting
|
|
await job.updateProgress(5)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'processing',
|
|
startedAt: job.data.startedAt || Date.now(),
|
|
})
|
|
|
|
logger.info(`[EmbedFileJob] Processing file: ${filePath}`)
|
|
|
|
// Progress callback: maps service-reported 0-100% into the 5-95% job range
|
|
const onProgress = async (percent: number) => {
|
|
await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9)))
|
|
}
|
|
|
|
// Process and embed the file
|
|
// Only allow deletion if explicitly marked as final batch
|
|
const allowDeletion = job.data.isFinalBatch === true
|
|
const result = await ragService.processAndEmbedFile(
|
|
filePath,
|
|
allowDeletion,
|
|
batchOffset,
|
|
onProgress
|
|
)
|
|
|
|
if (!result.success) {
|
|
logger.error(`[EmbedFileJob] Failed to process file ${fileName}: ${result.message}`)
|
|
throw new Error(result.message)
|
|
}
|
|
|
|
// For ZIM files with batching, check if more batches are needed
|
|
if (result.hasMoreBatches) {
|
|
const nextOffset = (batchOffset || 0) + (result.articlesProcessed || 0)
|
|
logger.info(
|
|
`[EmbedFileJob] Batch complete. Dispatching next batch at offset ${nextOffset}`
|
|
)
|
|
|
|
// Dispatch next batch (not final yet)
|
|
await EmbedFileJob.dispatch({
|
|
filePath,
|
|
fileName,
|
|
batchOffset: nextOffset,
|
|
totalArticles: totalArticles || result.totalArticles,
|
|
isFinalBatch: false, // Explicitly not final
|
|
})
|
|
|
|
// Calculate progress based on articles processed
|
|
const progress = totalArticles
|
|
? Math.round((nextOffset / totalArticles) * 100)
|
|
: 50
|
|
|
|
await job.updateProgress(progress)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'batch_completed',
|
|
lastBatchAt: Date.now(),
|
|
chunks: (job.data.chunks || 0) + (result.chunks || 0),
|
|
})
|
|
|
|
return {
|
|
success: true,
|
|
fileName,
|
|
filePath,
|
|
chunks: result.chunks,
|
|
hasMoreBatches: true,
|
|
nextOffset,
|
|
message: `Batch embedded ${result.chunks} chunks, next batch queued`,
|
|
}
|
|
}
|
|
|
|
// Final batch or non-batched file - mark as complete
|
|
const totalChunks = (job.data.chunks || 0) + (result.chunks || 0)
|
|
await job.updateProgress(100)
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'completed',
|
|
completedAt: Date.now(),
|
|
chunks: totalChunks,
|
|
})
|
|
|
|
const batchMsg = isZimBatch ? ` (final batch, total chunks: ${totalChunks})` : ''
|
|
logger.info(
|
|
`[EmbedFileJob] Successfully embedded ${result.chunks} chunks from file: ${fileName}${batchMsg}`
|
|
)
|
|
|
|
return {
|
|
success: true,
|
|
fileName,
|
|
filePath,
|
|
chunks: result.chunks,
|
|
message: `Successfully embedded ${result.chunks} chunks`,
|
|
}
|
|
} catch (error) {
|
|
logger.error(`[EmbedFileJob] Error embedding file ${fileName}:`, error)
|
|
|
|
await job.updateData({
|
|
...job.data,
|
|
status: 'failed',
|
|
failedAt: Date.now(),
|
|
error: error instanceof Error ? error.message : 'Unknown error',
|
|
})
|
|
|
|
throw error
|
|
}
|
|
}
|
|
|
|
static async listActiveJobs(): Promise<EmbedJobWithProgress[]> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobs = await queue.getJobs(['waiting', 'active', 'delayed'])
|
|
|
|
return jobs.map((job) => ({
|
|
jobId: job.id!.toString(),
|
|
fileName: (job.data as EmbedFileJobParams).fileName,
|
|
filePath: (job.data as EmbedFileJobParams).filePath,
|
|
progress: typeof job.progress === 'number' ? job.progress : 0,
|
|
status: ((job.data as any).status as string) ?? 'waiting',
|
|
}))
|
|
}
|
|
|
|
static async getByFilePath(filePath: string): Promise<Job | undefined> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobId = this.getJobId(filePath)
|
|
return await queue.getJob(jobId)
|
|
}
|
|
|
|
static async dispatch(params: EmbedFileJobParams) {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const jobId = this.getJobId(params.filePath)
|
|
|
|
try {
|
|
const job = await queue.add(this.key, params, {
|
|
jobId,
|
|
attempts: 30,
|
|
backoff: {
|
|
type: 'fixed',
|
|
delay: 60000, // Check every 60 seconds for service readiness
|
|
},
|
|
removeOnComplete: { count: 50 }, // Keep last 50 completed jobs for history
|
|
removeOnFail: { count: 20 } // Keep last 20 failed jobs for debugging
|
|
})
|
|
|
|
logger.info(`[EmbedFileJob] Dispatched embedding job for file: ${params.fileName}`)
|
|
|
|
return {
|
|
job,
|
|
created: true,
|
|
jobId,
|
|
message: `File queued for embedding: ${params.fileName}`,
|
|
}
|
|
} catch (error) {
|
|
if (error.message && error.message.includes('job already exists')) {
|
|
const existing = await queue.getJob(jobId)
|
|
logger.info(`[EmbedFileJob] Job already exists for file: ${params.fileName}`)
|
|
return {
|
|
job: existing,
|
|
created: false,
|
|
jobId,
|
|
message: `Embedding job already exists for: ${params.fileName}`,
|
|
}
|
|
}
|
|
throw error
|
|
}
|
|
}
|
|
|
|
static async listFailedJobs(): Promise<EmbedJobWithProgress[]> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
// Jobs that have failed at least once are in 'delayed' (retrying) or terminal 'failed' state.
|
|
// We identify them by job.data.status === 'failed' set in the catch block of handle().
|
|
const jobs = await queue.getJobs(['waiting', 'delayed', 'failed'])
|
|
|
|
return jobs
|
|
.filter((job) => (job.data as any).status === 'failed')
|
|
.map((job) => ({
|
|
jobId: job.id!.toString(),
|
|
fileName: (job.data as EmbedFileJobParams).fileName,
|
|
filePath: (job.data as EmbedFileJobParams).filePath,
|
|
progress: 0,
|
|
status: 'failed',
|
|
error: (job.data as any).error,
|
|
}))
|
|
}
|
|
|
|
static async cleanupFailedJobs(): Promise<{ cleaned: number; filesDeleted: number }> {
|
|
const queueService = new QueueService()
|
|
const queue = queueService.getQueue(this.queue)
|
|
const allJobs = await queue.getJobs(['waiting', 'delayed', 'failed'])
|
|
const failedJobs = allJobs.filter((job) => (job.data as any).status === 'failed')
|
|
|
|
let cleaned = 0
|
|
let filesDeleted = 0
|
|
|
|
for (const job of failedJobs) {
|
|
const filePath = (job.data as EmbedFileJobParams).filePath
|
|
if (filePath && filePath.includes(RagService.UPLOADS_STORAGE_PATH)) {
|
|
try {
|
|
await fs.unlink(filePath)
|
|
filesDeleted++
|
|
} catch {
|
|
// File may already be deleted — that's fine
|
|
}
|
|
}
|
|
await job.remove()
|
|
cleaned++
|
|
}
|
|
|
|
logger.info(`[EmbedFileJob] Cleaned up ${cleaned} failed jobs, deleted ${filesDeleted} files`)
|
|
return { cleaned, filesDeleted }
|
|
}
|
|
|
|
static async getStatus(filePath: string): Promise<{
|
|
exists: boolean
|
|
status?: string
|
|
progress?: number
|
|
chunks?: number
|
|
error?: string
|
|
}> {
|
|
const job = await this.getByFilePath(filePath)
|
|
|
|
if (!job) {
|
|
return { exists: false }
|
|
}
|
|
|
|
const state = await job.getState()
|
|
const data = job.data
|
|
|
|
return {
|
|
exists: true,
|
|
status: data.status || state,
|
|
progress: typeof job.progress === 'number' ? job.progress : undefined,
|
|
chunks: data.chunks,
|
|
error: data.error,
|
|
}
|
|
}
|
|
}
|