feat(RAG): display embedding queue and improve progress tracking

This commit is contained in:
Jake Turner 2026-03-04 23:23:50 +00:00
parent 61df6175c4
commit 5513a3a452
No known key found for this signature in database
GPG Key ID: 6DCBBAE4FEAB53EB
11 changed files with 167 additions and 10 deletions

View File

@ -42,6 +42,11 @@ export default class RagController {
})
}
/**
 * GET handler: returns the list of embedding jobs currently queued or
 * running, as reported by the job queue.
 */
public async getActiveJobs({ response }: HttpContext) {
  const activeJobs = await EmbedFileJob.listActiveJobs()
  return response.status(200).json(activeJobs)
}
public async getJobStatus({ request, response }: HttpContext) {
const reqData = await request.validateUsing(getJobStatusSchema)

View File

@ -1,5 +1,6 @@
import { Job } from 'bullmq'
import { QueueService } from '#services/queue_service'
import { EmbedJobWithProgress } from '../../types/rag.js'
import { RagService } from '#services/rag_service'
import { DockerService } from '#services/docker_service'
import { OllamaService } from '#services/ollama_service'
@ -57,7 +58,7 @@ export class EmbedFileJob {
logger.info(`[EmbedFileJob] Services ready. Processing file: ${fileName}`)
// Update progress starting
await job.updateProgress(0)
await job.updateProgress(5)
await job.updateData({
...job.data,
status: 'processing',
@ -66,13 +67,19 @@ export class EmbedFileJob {
logger.info(`[EmbedFileJob] Processing file: ${filePath}`)
// Progress callback: maps service-reported 0-100% into the 5-95% job range
const onProgress = async (percent: number) => {
await job.updateProgress(Math.min(95, Math.round(5 + percent * 0.9)))
}
// Process and embed the file
// Only allow deletion if explicitly marked as final batch
const allowDeletion = job.data.isFinalBatch === true
const result = await ragService.processAndEmbedFile(
filePath,
allowDeletion,
batchOffset
batchOffset,
onProgress
)
if (!result.success) {
@ -156,6 +163,20 @@ export class EmbedFileJob {
}
}
/**
 * Snapshot every embedding job that is waiting, active, or delayed,
 * flattened into the shape the admin UI renders.
 *
 * @returns One entry per queued/running job with its numeric progress
 *          (0-100) and lifecycle status.
 */
static async listActiveJobs(): Promise<EmbedJobWithProgress[]> {
  const queueService = new QueueService()
  const queue = queueService.getQueue(this.queue)
  const jobs = await queue.getJobs(['waiting', 'active', 'delayed'])
  return jobs.map((job) => {
    // Cast once instead of per-field; the worker also stashes a transient
    // `status` string on job.data (see updateData in process()), so model
    // that explicitly rather than reaching through `any`.
    const data = job.data as EmbedFileJobParams & { status?: string }
    return {
      // job.id should always be set for fetched jobs, but avoid the
      // non-null assertion so a missing id cannot throw here.
      jobId: String(job.id ?? ''),
      fileName: data.fileName,
      filePath: data.filePath,
      // BullMQ progress may be an object payload; only surface numbers.
      progress: typeof job.progress === 'number' ? job.progress : 0,
      status: data.status ?? 'waiting',
    }
  })
}
static async getByFilePath(filePath: string): Promise<Job | undefined> {
const queueService = new QueueService()
const queue = queueService.getQueue(this.queue)

View File

@ -178,7 +178,8 @@ export class RagService {
public async embedAndStoreText(
text: string,
metadata: Record<string, any> = {}
metadata: Record<string, any> = {},
onProgress?: (percent: number) => Promise<void>
): Promise<{ chunks: number } | null> {
try {
await this._ensureCollection(
@ -253,6 +254,10 @@ export class RagService {
})
embeddings.push(response.embedding)
if (onProgress) {
await onProgress(((i + 1) / chunks.length) * 100)
}
}
const timestamp = Date.now()
@ -388,7 +393,8 @@ export class RagService {
private async processZIMFile(
filepath: string,
deleteAfterEmbedding: boolean,
batchOffset?: number
batchOffset?: number,
onProgress?: (percent: number) => Promise<void>
): Promise<{
success: boolean
message: string
@ -417,7 +423,8 @@ export class RagService {
// Process each chunk individually with its metadata
let totalChunks = 0
for (const zimChunk of zimChunks) {
for (let i = 0; i < zimChunks.length; i++) {
const zimChunk = zimChunks[i]
const result = await this.embedAndStoreText(zimChunk.text, {
source: filepath,
content_type: 'zim_article',
@ -450,6 +457,10 @@ export class RagService {
if (result) {
totalChunks += result.chunks
}
if (onProgress) {
await onProgress(((i + 1) / zimChunks.length) * 100)
}
}
// Count unique articles processed in this batch
@ -490,7 +501,8 @@ export class RagService {
private async embedTextAndCleanup(
extractedText: string,
filepath: string,
deleteAfterEmbedding: boolean = false
deleteAfterEmbedding: boolean = false,
onProgress?: (percent: number) => Promise<void>
): Promise<{ success: boolean; message: string; chunks?: number }> {
if (!extractedText || extractedText.trim().length === 0) {
return { success: false, message: 'Process completed succesfully, but no text was found to embed.' }
@ -498,7 +510,7 @@ export class RagService {
const embedResult = await this.embedAndStoreText(extractedText, {
source: filepath
})
}, onProgress)
if (!embedResult) {
return { success: false, message: 'Failed to embed and store the extracted text.' }
@ -526,7 +538,8 @@ export class RagService {
public async processAndEmbedFile(
filepath: string,
deleteAfterEmbedding: boolean = false,
batchOffset?: number
batchOffset?: number,
onProgress?: (percent: number) => Promise<void>
): Promise<{
success: boolean
message: string
@ -552,10 +565,12 @@ export class RagService {
// Process based on file type
// ZIM files are handled specially since they have their own embedding workflow
if (fileType === 'zim') {
return await this.processZIMFile(filepath, deleteAfterEmbedding, batchOffset)
return await this.processZIMFile(filepath, deleteAfterEmbedding, batchOffset, onProgress)
}
// Extract text based on file type
// Report ~10% when extraction begins; actual embedding progress follows via callback
if (onProgress) await onProgress(10)
let extractedText: string
switch (fileType) {
case 'image':
@ -570,8 +585,14 @@ export class RagService {
break
}
// Extraction done — scale remaining embedding progress from 15% to 100%
if (onProgress) await onProgress(15)
const scaledProgress = onProgress
? (p: number) => onProgress(15 + p * 0.85)
: undefined
// Embed extracted text and cleanup
return await this.embedTextAndCleanup(extractedText, filepath, deleteAfterEmbedding)
return await this.embedTextAndCleanup(extractedText, filepath, deleteAfterEmbedding, scaledProgress)
} catch (error) {
logger.error('[RAG] Error processing and embedding file:', error)
return { success: false, message: 'Error processing and embedding file.' }

View File

@ -0,0 +1,45 @@
import useEmbedJobs from '~/hooks/useEmbedJobs'
import HorizontalBarChart from './HorizontalBarChart'
import StyledSectionHeader from './StyledSectionHeader'
interface ActiveEmbedJobsProps {
  withHeader?: boolean
}

/**
 * Renders the live file-embedding queue: one progress bar per active job,
 * or a placeholder message when the queue is empty. Data is polled via
 * the useEmbedJobs hook.
 */
const ActiveEmbedJobs = ({ withHeader = false }: ActiveEmbedJobsProps) => {
  const { data: jobs } = useEmbedJobs()
  // Treat "not loaded yet" the same as an empty queue.
  const jobList = jobs ?? []

  return (
    <>
      {withHeader && (
        <StyledSectionHeader title="Processing Queue" className="mt-12 mb-4" />
      )}
      <div className="space-y-4">
        {jobList.length > 0 ? (
          jobList.map((job) => {
            const barItem = {
              label: job.fileName,
              value: job.progress,
              total: '100%',
              used: `${job.progress}%`,
              type: job.status,
            }
            return (
              <div
                key={job.jobId}
                className="bg-desert-white rounded-lg p-4 border border-desert-stone-light shadow-sm hover:shadow-lg transition-shadow"
              >
                <HorizontalBarChart items={[barItem]} />
              </div>
            )
          })
        ) : (
          <p className="text-gray-500">No files are currently being processed</p>
        )}
      </div>
    </>
  )
}

export default ActiveEmbedJobs

View File

@ -9,6 +9,7 @@ import api from '~/lib/api'
import { IconX } from '@tabler/icons-react'
import { useModals } from '~/context/ModalContext'
import StyledModal from '../StyledModal'
import ActiveEmbedJobs from '~/components/ActiveEmbedJobs'
interface KnowledgeBaseModalProps {
aiAssistantName?: string
@ -185,6 +186,10 @@ export default function KnowledgeBaseModal({ aiAssistantName = "AI Assistant", o
</div>
</div>
</div>
<div className="my-8">
<ActiveEmbedJobs withHeader={true} />
</div>
<div className="my-12">
<div className='flex items-center justify-between mb-6'>
<StyledSectionHeader title="Stored Knowledge Base Files" className='!mb-0' />

View File

@ -0,0 +1,21 @@
import { useQuery, useQueryClient } from '@tanstack/react-query'
import api from '~/lib/api'
// Single source of truth for the query cache key used below.
const EMBED_JOBS_KEY = ['embed-jobs']

/**
 * React Query hook for the active embedding-jobs list.
 * Polls the API every 2s while enabled and exposes invalidate()
 * so callers can force an immediate refetch.
 */
const useEmbedJobs = ({ enabled = true }: { enabled?: boolean } = {}) => {
  const queryClient = useQueryClient()

  const query = useQuery({
    queryKey: EMBED_JOBS_KEY,
    // An undefined API result (e.g. request failure) is shown as an empty queue.
    queryFn: async () => (await api.getActiveEmbedJobs()) ?? [],
    refetchInterval: 2000,
    enabled,
  })

  const invalidate = () => {
    queryClient.invalidateQueries({ queryKey: EMBED_JOBS_KEY })
  }

  return { ...query, invalidate }
}

export default useEmbedJobs

View File

@ -4,6 +4,7 @@ import { ServiceSlim } from '../../types/services'
import { FileEntry } from '../../types/files'
import { CheckLatestVersionResult, SystemInformationResponse, SystemUpdateStatus } from '../../types/system'
import { DownloadJobWithProgress, WikipediaState } from '../../types/downloads'
import { EmbedJobWithProgress } from '../../types/rag'
import type { CategoryWithStatus, CollectionWithStatus, ContentUpdateCheckResult, ResourceUpdateInfo } from '../../types/collections'
import { catchInternal } from './util'
import { NomadOllamaModel, OllamaChatRequest } from '../../types/ollama'
@ -364,6 +365,13 @@ class API {
})()
}
/**
 * Fetch the embedding jobs currently queued or running.
 * Returns undefined when the request fails (via catchInternal).
 */
async getActiveEmbedJobs(): Promise<EmbedJobWithProgress[] | undefined> {
  return catchInternal(async () => {
    const { data } = await this.client.get<EmbedJobWithProgress[]>('/rag/active-jobs')
    return data
  })()
}
async getStoredRAGFiles() {
return catchInternal(async () => {
const response = await this.client.get<{ files: string[] }>('/rag/files')

View File

@ -4379,6 +4379,7 @@
"cpu": [
"arm64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4395,6 +4396,7 @@
"cpu": [
"x64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4411,6 +4413,7 @@
"cpu": [
"arm"
],
"dev": true,
"license": "Apache-2.0",
"optional": true,
"os": [
@ -4427,6 +4430,7 @@
"cpu": [
"arm64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4443,6 +4447,7 @@
"cpu": [
"arm64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4459,6 +4464,7 @@
"cpu": [
"x64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4475,6 +4481,7 @@
"cpu": [
"x64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4491,6 +4498,7 @@
"cpu": [
"arm64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4507,6 +4515,7 @@
"cpu": [
"ia32"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [
@ -4523,6 +4532,7 @@
"cpu": [
"x64"
],
"dev": true,
"license": "Apache-2.0 AND MIT",
"optional": true,
"os": [

View File

@ -126,6 +126,7 @@ router
.group(() => {
router.post('/upload', [RagController, 'upload'])
router.get('/files', [RagController, 'getStoredFiles'])
router.get('/active-jobs', [RagController, 'getActiveJobs'])
router.get('/job-status', [RagController, 'getJobStatus'])
router.post('/sync', [RagController, 'scanAndSync'])
})

7
admin/types/rag.ts Normal file
View File

@ -0,0 +1,7 @@
/**
 * Snapshot of a queued or running file-embedding job, as produced by
 * EmbedFileJob.listActiveJobs() and served by the RAG active-jobs endpoint.
 */
export type EmbedJobWithProgress = {
  // BullMQ job identifier (stringified).
  jobId: string
  // Display name of the file being embedded.
  fileName: string
  // Absolute/relative path of the file on the server.
  filePath: string
  // Completion percentage in the 0-100 range.
  progress: number
  // Lifecycle state, e.g. 'waiting' or 'processing'.
  status: string
}

13
package-lock.json generated Normal file
View File

@ -0,0 +1,13 @@
{
"name": "project-nomad",
"version": "1.27.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "project-nomad",
"version": "1.27.0",
"license": "ISC"
}
}
}