diff --git a/admin/app/controllers/rag_controller.ts b/admin/app/controllers/rag_controller.ts index 4d3111b..9af82f2 100644 --- a/admin/app/controllers/rag_controller.ts +++ b/admin/app/controllers/rag_controller.ts @@ -42,6 +42,11 @@ export default class RagController { }) } + public async getActiveJobs({ response }: HttpContext) { + const jobs = await EmbedFileJob.listActiveJobs() + return response.status(200).json(jobs) + } + public async getJobStatus({ request, response }: HttpContext) { const reqData = await request.validateUsing(getJobStatusSchema) diff --git a/admin/app/jobs/embed_file_job.ts b/admin/app/jobs/embed_file_job.ts index 24dd289..0c59b32 100644 --- a/admin/app/jobs/embed_file_job.ts +++ b/admin/app/jobs/embed_file_job.ts @@ -1,5 +1,6 @@ import { Job } from 'bullmq' import { QueueService } from '#services/queue_service' +import { EmbedJobWithProgress } from '../../types/rag.js' import { RagService } from '#services/rag_service' import { DockerService } from '#services/docker_service' import { OllamaService } from '#services/ollama_service' @@ -57,7 +58,7 @@ export class EmbedFileJob { logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`) // Update progress starting - await job.updateProgress(0) + await job.updateProgress(5) await job.updateData({ ...job.data, status: 'processing', @@ -66,13 +67,19 @@ export class EmbedFileJob { logger.info(`[EmbedFileJob] Processing file: ${filePath}`) + // Progress callback: maps service-reported 0-100% into the 5-95% job range + const onProgress = async (percent: number) => { + await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9))) + } + // Process and embed the file // Only allow deletion if explicitly marked as final batch const allowDeletion = job.data.isFinalBatch === true const result = await ragService.processAndEmbedFile( filePath, allowDeletion, - batchOffset + batchOffset, + onProgress ) if (!result.success) { @@ -156,6 +163,20 @@ export class EmbedFileJob { } } + static async listActiveJobs(): Promise { + const queueService = new QueueService() + const queue = queueService.getQueue(this.queue) + const jobs = await queue.getJobs(['waiting', 'active', 'delayed']) + + return jobs.map((job) => ({ + jobId: job.id!.toString(), + fileName: (job.data as EmbedFileJobParams).fileName, + filePath: (job.data as EmbedFileJobParams).filePath, + progress: typeof job.progress === 'number' ? job.progress : 0, + status: ((job.data as any).status as string) ?? 'waiting', + })) + } + static async getByFilePath(filePath: string): Promise { const queueService = new QueueService() const queue = queueService.getQueue(this.queue) diff --git a/admin/app/services/rag_service.ts b/admin/app/services/rag_service.ts index 302cbbd..967234e 100644 --- a/admin/app/services/rag_service.ts +++ b/admin/app/services/rag_service.ts @@ -178,7 +178,8 @@ export class RagService { public async embedAndStoreText( text: string, - metadata: Record = {} + metadata: Record = {}, + onProgress?: (percent: number) => Promise ): Promise<{ chunks: number } | null> { try { await this._ensureCollection( @@ -253,6 +254,10 @@ export class RagService { }) embeddings.push(response.embedding) + + if (onProgress) { + await onProgress(((i + 1) / chunks.length) * 100) + } } const timestamp = Date.now() @@ -388,7 +393,8 @@ export class RagService { private async processZIMFile( filepath: string, deleteAfterEmbedding: boolean, - batchOffset?: number + batchOffset?: number, + onProgress?: (percent: number) => Promise ): Promise<{ success: boolean message: string @@ -417,7 +423,8 @@ export class RagService { // Process each chunk individually with its metadata let totalChunks = 0 - for (const zimChunk of zimChunks) { + for (let i = 0; i < zimChunks.length; i++) { + const zimChunk = zimChunks[i] const result = await this.embedAndStoreText(zimChunk.text, { source: filepath, content_type: 'zim_article', @@ -450,6 +457,10 @@ export class RagService { if (result) { totalChunks += result.chunks } + + if (onProgress) { + await onProgress(((i + 1) / zimChunks.length) * 100) + } } // Count unique articles processed in this batch @@ -490,7 +501,8 @@ export class RagService { private async embedTextAndCleanup( extractedText: string, filepath: string, - deleteAfterEmbedding: boolean = false + deleteAfterEmbedding: boolean = false, + onProgress?: (percent: number) => Promise ): Promise<{ success: boolean; message: string; chunks?: number }> { if (!extractedText || extractedText.trim().length === 0) { return { success: false, message: 'Process completed succesfully, but no text was found to embed.' } @@ -498,7 +510,7 @@ export class RagService { const embedResult = await this.embedAndStoreText(extractedText, { source: filepath - }) + }, onProgress) if (!embedResult) { return { success: false, message: 'Failed to embed and store the extracted text.' } @@ -526,7 +538,8 @@ export class RagService { public async processAndEmbedFile( filepath: string, deleteAfterEmbedding: boolean = false, - batchOffset?: number + batchOffset?: number, + onProgress?: (percent: number) => Promise ): Promise<{ success: boolean message: string @@ -552,10 +565,12 @@ export class RagService { // Process based on file type // ZIM files are handled specially since they have their own embedding workflow if (fileType === 'zim') { - return await this.processZIMFile(filepath, deleteAfterEmbedding, batchOffset) + return await this.processZIMFile(filepath, deleteAfterEmbedding, batchOffset, onProgress) } // Extract text based on file type + // Report ~10% when extraction begins; actual embedding progress follows via callback + if (onProgress) await onProgress(10) let extractedText: string switch (fileType) { case 'image': @@ -570,8 +585,14 @@ export class RagService { break } + // Extraction done — scale remaining embedding progress from 15% to 100% + if (onProgress) await onProgress(15) + const scaledProgress = onProgress + ? (p: number) => onProgress(15 + p * 0.85) + : undefined + // Embed extracted text and cleanup - return await this.embedTextAndCleanup(extractedText, filepath, deleteAfterEmbedding) + return await this.embedTextAndCleanup(extractedText, filepath, deleteAfterEmbedding, scaledProgress) } catch (error) { logger.error('[RAG] Error processing and embedding file:', error) return { success: false, message: 'Error processing and embedding file.' } diff --git a/admin/inertia/components/ActiveEmbedJobs.tsx b/admin/inertia/components/ActiveEmbedJobs.tsx new file mode 100644 index 0000000..5e6914e --- /dev/null +++ b/admin/inertia/components/ActiveEmbedJobs.tsx @@ -0,0 +1,45 @@ +import useEmbedJobs from '~/hooks/useEmbedJobs' +import HorizontalBarChart from './HorizontalBarChart' +import StyledSectionHeader from './StyledSectionHeader' + +interface ActiveEmbedJobsProps { + withHeader?: boolean +} + +const ActiveEmbedJobs = ({ withHeader = false }: ActiveEmbedJobsProps) => { + const { data: jobs } = useEmbedJobs() + + return ( + <> + {withHeader && ( + + )} +
+ {jobs && jobs.length > 0 ? ( + jobs.map((job) => ( +
+ +
+ )) + ) : ( +

No files are currently being processed

+ )} +
+ + ) +} + +export default ActiveEmbedJobs diff --git a/admin/inertia/components/chat/KnowledgeBaseModal.tsx b/admin/inertia/components/chat/KnowledgeBaseModal.tsx index 195fcb1..b9b2b31 100644 --- a/admin/inertia/components/chat/KnowledgeBaseModal.tsx +++ b/admin/inertia/components/chat/KnowledgeBaseModal.tsx @@ -9,6 +9,7 @@ import api from '~/lib/api' import { IconX } from '@tabler/icons-react' import { useModals } from '~/context/ModalContext' import StyledModal from '../StyledModal' +import ActiveEmbedJobs from '~/components/ActiveEmbedJobs' interface KnowledgeBaseModalProps { aiAssistantName?: string @@ -185,6 +186,10 @@ export default function KnowledgeBaseModal({ aiAssistantName = "AI Assistant", o +
+ +
+
diff --git a/admin/inertia/hooks/useEmbedJobs.ts b/admin/inertia/hooks/useEmbedJobs.ts new file mode 100644 index 0000000..c5a577a --- /dev/null +++ b/admin/inertia/hooks/useEmbedJobs.ts @@ -0,0 +1,21 @@ +import { useQuery, useQueryClient } from '@tanstack/react-query' +import api from '~/lib/api' + +const useEmbedJobs = (props: { enabled?: boolean } = {}) => { + const queryClient = useQueryClient() + + const queryData = useQuery({ + queryKey: ['embed-jobs'], + queryFn: () => api.getActiveEmbedJobs().then((data) => data ?? []), + refetchInterval: 2000, + enabled: props.enabled ?? true, + }) + + const invalidate = () => { + queryClient.invalidateQueries({ queryKey: ['embed-jobs'] }) + } + + return { ...queryData, invalidate } +} + +export default useEmbedJobs diff --git a/admin/inertia/lib/api.ts b/admin/inertia/lib/api.ts index 8964a40..f25fffc 100644 --- a/admin/inertia/lib/api.ts +++ b/admin/inertia/lib/api.ts @@ -4,6 +4,7 @@ import { ServiceSlim } from '../../types/services' import { FileEntry } from '../../types/files' import { CheckLatestVersionResult, SystemInformationResponse, SystemUpdateStatus } from '../../types/system' import { DownloadJobWithProgress, WikipediaState } from '../../types/downloads' +import { EmbedJobWithProgress } from '../../types/rag' import type { CategoryWithStatus, CollectionWithStatus, ContentUpdateCheckResult, ResourceUpdateInfo } from '../../types/collections' import { catchInternal } from './util' import { NomadOllamaModel, OllamaChatRequest } from '../../types/ollama' @@ -364,6 +365,13 @@ class API { })() } + async getActiveEmbedJobs(): Promise { + return catchInternal(async () => { + const response = await this.client.get('/rag/active-jobs') + return response.data + })() + } + async getStoredRAGFiles() { return catchInternal(async () => { const response = await this.client.get<{ files: string[] }>('/rag/files') diff --git a/admin/package-lock.json b/admin/package-lock.json index e95764f..c50a5b8 100644 --- a/admin/package-lock.json +++ b/admin/package-lock.json @@ -4379,6 +4379,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4395,6 +4396,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4411,6 +4413,7 @@ "cpu": [ "arm" ], + "dev": true, "license": "Apache-2.0", "optional": true, "os": [ @@ -4427,6 +4430,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4443,6 +4447,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4459,6 +4464,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4475,6 +4481,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4491,6 +4498,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4507,6 +4515,7 @@ "cpu": [ "ia32" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ @@ -4523,6 +4532,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "Apache-2.0 AND MIT", "optional": true, "os": [ diff --git a/admin/start/routes.ts b/admin/start/routes.ts index dc9e501..325a1be 100644 --- a/admin/start/routes.ts +++ b/admin/start/routes.ts @@ -126,6 +126,7 @@ router .group(() => { router.post('/upload', [RagController, 'upload']) router.get('/files', [RagController, 'getStoredFiles']) + router.get('/active-jobs', [RagController, 'getActiveJobs']) router.get('/job-status', [RagController, 'getJobStatus']) router.post('/sync', [RagController, 'scanAndSync']) }) diff --git a/admin/types/rag.ts b/admin/types/rag.ts new file mode 100644 index 0000000..f44dca9 --- /dev/null +++ b/admin/types/rag.ts @@ -0,0 +1,7 @@ +export type EmbedJobWithProgress = { + jobId: string + fileName: string + filePath: string + progress: number + status: string +} diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..1ee61ec --- /dev/null +++ b/package-lock.json @@ -0,0 +1,13 @@ +{ + "name": "project-nomad", + "version": "1.27.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "project-nomad", + "version": "1.27.0", + "license": "ISC" + } + } +}