From 4747863702c517d1d819eadd6978b6a7bb55af58 Mon Sep 17 00:00:00 2001 From: Jake Turner Date: Mon, 9 Feb 2026 15:15:38 -0800 Subject: [PATCH] feat(AI Assistant): allow manual scan and resync KB --- admin/app/controllers/ollama_controller.ts | 5 +- admin/app/controllers/rag_controller.ts | 11 +- admin/app/services/rag_service.ts | 153 +++++++++++++++++- admin/config/logger.ts | 2 +- .../components/chat/KnowledgeBaseModal.tsx | 58 ++++++- admin/inertia/lib/api.ts | 12 ++ admin/start/routes.ts | 1 + 7 files changed, 233 insertions(+), 9 deletions(-) diff --git a/admin/app/controllers/ollama_controller.ts b/admin/app/controllers/ollama_controller.ts index 723759d..e77ed56 100644 --- a/admin/app/controllers/ollama_controller.ts +++ b/admin/app/controllers/ollama_controller.ts @@ -27,14 +27,14 @@ export default class OllamaController { async chat({ request }: HttpContext) { const reqData = await request.validateUsing(chatSchema) - // If there are no system messages in the chat - // (i.e. first message from the user) inject system prompts + // If there are no system messages in the chat inject system prompts const hasSystemMessage = reqData.messages.some((msg) => msg.role === 'system') if (!hasSystemMessage) { const systemPrompt = { role: 'system' as const, content: SYSTEM_PROMPTS.default, } + logger.debug('[OllamaController] Injecting system prompt') reqData.messages.unshift(systemPrompt) } @@ -45,6 +45,7 @@ export default class OllamaController { reqData.model ) + logger.debug(`[OllamaController] Rewritten query for RAG: "${rewrittenQuery}"`) if (rewrittenQuery) { const relevantDocs = await this.ragService.searchSimilarDocuments( rewrittenQuery, diff --git a/admin/app/controllers/rag_controller.ts b/admin/app/controllers/rag_controller.ts index 0a25336..4d3111b 100644 --- a/admin/app/controllers/rag_controller.ts +++ b/admin/app/controllers/rag_controller.ts @@ -9,7 +9,7 @@ import { getJobStatusSchema } from '#validators/rag' @inject() export default class 
RagController { - constructor(private ragService: RagService) {} + constructor(private ragService: RagService) { } public async upload({ request, response }: HttpContext) { const uploadedFile = request.file('file') @@ -59,4 +59,13 @@ export default class RagController { const files = await this.ragService.getStoredFiles() return response.status(200).json({ files }) } + + public async scanAndSync({ response }: HttpContext) { + try { + const syncResult = await this.ragService.scanAndSyncStorage() + return response.status(200).json(syncResult) + } catch (error) { + return response.status(500).json({ error: 'Error scanning and syncing storage', details: error.message }) + } + } } diff --git a/admin/app/services/rag_service.ts b/admin/app/services/rag_service.ts index d63dedc..c2a4478 100644 --- a/admin/app/services/rag_service.ts +++ b/admin/app/services/rag_service.ts @@ -4,7 +4,7 @@ import { inject } from '@adonisjs/core' import logger from '@adonisjs/core/services/logger' import { TokenChunker } from '@chonkiejs/core' import sharp from 'sharp' -import { deleteFileIfExists, determineFileType, getFile, getFileStatsIfExists, listDirectoryContentsRecursive } from '../utils/fs.js' +import { deleteFileIfExists, determineFileType, getFile, getFileStatsIfExists, listDirectoryContentsRecursive, ZIM_STORAGE_PATH } from '../utils/fs.js' import { PDFParse } from 'pdf-parse' import { createWorker } from 'tesseract.js' import { fromBuffer } from 'pdf2pic' @@ -399,14 +399,14 @@ export class RagService { totalArticles?: number }> { const zimExtractionService = new ZIMExtractionService() - + // Process in batches to avoid lock timeout const startOffset = batchOffset || 0 - + logger.info( `[RAG] Extracting ZIM content (batch: offset=${startOffset}, size=${ZIM_BATCH_SIZE})` ) - + const zimChunks = await zimExtractionService.extractZIMContent(filepath, { startOffset, batchSize: ZIM_BATCH_SIZE, @@ -935,4 +935,149 @@ export class RagService { return { success: false, message: 'Error 
discovering Nomad docs.' } } } + + /** + * Scans the knowledge base storage directories and syncs with Qdrant. + * Identifies files that exist in storage but haven't been embedded yet, + * and dispatches EmbedFileJob for each missing file. + * + * @returns Object containing success status, message, and counts of scanned/queued files + */ + public async scanAndSyncStorage(): Promise<{ + success: boolean + message: string + filesScanned?: number + filesQueued?: number + }> { + try { + logger.info('[RAG] Starting knowledge base sync scan') + + const KB_UPLOADS_PATH = join(process.cwd(), RagService.UPLOADS_STORAGE_PATH) + const ZIM_PATH = join(process.cwd(), ZIM_STORAGE_PATH) + + const filesInStorage: string[] = [] + + // Force resync of Nomad docs + await this.discoverNomadDocs(true).catch((error) => { + logger.error('[RAG] Error during Nomad docs discovery in sync process:', error) + }) + + // Scan kb_uploads directory + try { + const kbContents = await listDirectoryContentsRecursive(KB_UPLOADS_PATH) + kbContents.forEach((entry) => { + if (entry.type === 'file') { + filesInStorage.push(entry.key) + } + }) + logger.debug(`[RAG] Found ${kbContents.length} files in ${RagService.UPLOADS_STORAGE_PATH}`) + } catch (error) { + if (error.code === 'ENOENT') { + logger.debug(`[RAG] ${RagService.UPLOADS_STORAGE_PATH} directory does not exist, skipping`) + } else { + throw error + } + } + + // Scan zim directory + try { + const zimContents = await listDirectoryContentsRecursive(ZIM_PATH) + zimContents.forEach((entry) => { + if (entry.type === 'file') { + filesInStorage.push(entry.key) + } + }) + logger.debug(`[RAG] Found ${zimContents.length} files in ${ZIM_STORAGE_PATH}`) + } catch (error) { + if (error.code === 'ENOENT') { + logger.debug(`[RAG] ${ZIM_STORAGE_PATH} directory does not exist, skipping`) + } else { + throw error + } + } + + logger.info(`[RAG] Found ${filesInStorage.length} total files in storage directories`) + + // Get all stored sources from Qdrant + await 
this._ensureCollection( + RagService.CONTENT_COLLECTION_NAME, + RagService.EMBEDDING_DIMENSION + ) + + const sourcesInQdrant = new Set() + let offset: string | number | null | Record = null + const batchSize = 100 + + // Scroll through all points to get sources + do { + const scrollResult = await this.qdrant!.scroll(RagService.CONTENT_COLLECTION_NAME, { + limit: batchSize, + offset: offset, + with_payload: ['source'], // Only fetch source field for efficiency + with_vector: false, + }) + + scrollResult.points.forEach((point) => { + const source = point.payload?.source + if (source && typeof source === 'string') { + sourcesInQdrant.add(source) + } + }) + + offset = scrollResult.next_page_offset || null + } while (offset !== null) + + logger.info(`[RAG] Found ${sourcesInQdrant.size} unique sources in Qdrant`) + + // Find files that are in storage but not in Qdrant + const filesToEmbed = filesInStorage.filter((filePath) => !sourcesInQdrant.has(filePath)) + + logger.info(`[RAG] Found ${filesToEmbed.length} files that need embedding`) + + if (filesToEmbed.length === 0) { + return { + success: true, + message: 'Knowledge base is already in sync', + filesScanned: filesInStorage.length, + filesQueued: 0, + } + } + + // Import EmbedFileJob dynamically to avoid circular dependencies + const { EmbedFileJob } = await import('#jobs/embed_file_job') + + // Dispatch jobs for files that need embedding + let queuedCount = 0 + for (const filePath of filesToEmbed) { + try { + const fileName = filePath.split(/[/\\]/).pop() || filePath + const stats = await getFileStatsIfExists(filePath) + + logger.info(`[RAG] Dispatching embed job for: ${fileName}`) + await EmbedFileJob.dispatch({ + filePath: filePath, + fileName: fileName, + fileSize: stats?.size, + }) + queuedCount++ + logger.debug(`[RAG] Successfully dispatched job for ${fileName}`) + } catch (fileError) { + logger.error(`[RAG] Error dispatching job for file ${filePath}:`, fileError) + } + } + + return { + success: true, + message: 
`Scanned ${filesInStorage.length} files, queued ${queuedCount} for embedding`, + filesScanned: filesInStorage.length, + filesQueued: queuedCount, + } + } catch (error) { + logger.error('[RAG] Error scanning and syncing knowledge base:', error) + return { + success: false, + message: 'Error scanning and syncing knowledge base', + } + } + } } diff --git a/admin/config/logger.ts b/admin/config/logger.ts index 9f7b7dc..59aa141 100644 --- a/admin/config/logger.ts +++ b/admin/config/logger.ts @@ -13,7 +13,7 @@ const loggerConfig = defineConfig({ app: { enabled: true, name: env.get('APP_NAME'), - level: env.get('LOG_LEVEL'), + level: env.get('NODE_ENV') === 'production' ? env.get('LOG_LEVEL') : 'debug', // default to 'debug' in non-production envs transport: { targets: targets() diff --git a/admin/inertia/components/chat/KnowledgeBaseModal.tsx b/admin/inertia/components/chat/KnowledgeBaseModal.tsx index cc569fa..e95370d 100644 --- a/admin/inertia/components/chat/KnowledgeBaseModal.tsx +++ b/admin/inertia/components/chat/KnowledgeBaseModal.tsx @@ -7,6 +7,8 @@ import StyledTable from '~/components/StyledTable' import { useNotifications } from '~/context/NotificationContext' import api from '~/lib/api' import { IconX } from '@tabler/icons-react' +import { useModals } from '~/context/ModalContext' +import StyledModal from '../StyledModal' interface KnowledgeBaseModalProps { onClose: () => void @@ -16,6 +18,7 @@ export default function KnowledgeBaseModal({ onClose }: KnowledgeBaseModalProps) const { addNotification } = useNotifications() const [files, setFiles] = useState([]) const fileUploaderRef = useRef>(null) + const { openModal, closeModal } = useModals() const { data: storedFiles = [], isLoading: isLoadingFiles } = useQuery({ queryKey: ['storedFiles'], @@ -43,12 +46,53 @@ export default function KnowledgeBaseModal({ onClose }: KnowledgeBaseModalProps) }, }) + const syncMutation = useMutation({ + mutationFn: () => api.syncRAGStorage(), + onSuccess: (data) => { + 
addNotification({ + type: 'success', + message: data?.message || 'Storage synced successfully. If new files were found, they have been queued for processing.', + }) + }, + onError: (error: any) => { + addNotification({ + type: 'error', + message: error?.message || 'Failed to sync storage', + }) + }, + }) + const handleUpload = () => { if (files.length > 0) { uploadMutation.mutate(files[0]) } } + const handleConfirmSync = () => { + openModal( + { + syncMutation.mutate() + closeModal( + "confirm-sync-modal" + ) + }} + onCancel={() => closeModal("confirm-sync-modal")} + open={true} + confirmText='Confirm Sync' + cancelText='Cancel' + confirmVariant='primary' + > +

+ This will scan NOMAD's storage directories for any new files and queue them for processing. This is useful if you've manually added files to the storage or want to ensure everything is up to date. + This may cause a temporary increase in resource usage if new files are found and processed. Are you sure you want to proceed?

+
, + "confirm-sync-modal" + ) + } + return (
@@ -142,7 +186,19 @@ export default function KnowledgeBaseModal({ onClose }: KnowledgeBaseModalProps)
- +
+ + + Sync Storage + +
className="font-semibold" rowLines={true} diff --git a/admin/inertia/lib/api.ts b/admin/inertia/lib/api.ts index b50e0e2..fd67b57 100644 --- a/admin/inertia/lib/api.ts +++ b/admin/inertia/lib/api.ts @@ -448,6 +448,18 @@ class API { })() } + async syncRAGStorage() { + return catchInternal(async () => { + const response = await this.client.post<{ + success: boolean + message: string + filesScanned?: number + filesQueued?: number + }>('/rag/sync') + return response.data + })() + } + // Wikipedia selector methods async getWikipediaState(): Promise { diff --git a/admin/start/routes.ts b/admin/start/routes.ts index 336ffca..2482f89 100644 --- a/admin/start/routes.ts +++ b/admin/start/routes.ts @@ -119,6 +119,7 @@ router router.post('/upload', [RagController, 'upload']) router.get('/files', [RagController, 'getStoredFiles']) router.get('/job-status', [RagController, 'getJobStatus']) + router.post('/sync', [RagController, 'scanAndSync']) }) .prefix('/api/rag')