diff --git a/admin/app/jobs/download_model_job.ts b/admin/app/jobs/download_model_job.ts
index 2706887..ccc5207 100644
--- a/admin/app/jobs/download_model_job.ts
+++ b/admin/app/jobs/download_model_job.ts
@@ -42,33 +42,20 @@ export class DownloadModelJob {
     )
 
     // Services are ready, initiate the download with progress tracking
-    const result = await ollamaService._downloadModel(modelName, (progress) => {
-      // Update job progress in BullMQ
-      const progressData = {
-        status: progress.status,
-        percent: progress.percent,
-        completed: progress.completed,
-        total: progress.total,
-      }
-
-      // Update the job progress (0-100 scale for BullMQ)
-      if (progress.percent !== undefined) {
-        job.updateProgress(progress.percent)
-      }
-
-      // Log progress with job context
-      if (progress.percent !== undefined) {
+    const result = await ollamaService.downloadModel(modelName, (progressPercent) => {
+      if (progressPercent) {
+        job.updateProgress(Math.floor(progressPercent))
         logger.info(
-          `[DownloadModelJob] Model ${modelName}: ${progress.status} - ${progress.percent}% (${progress.completed}/${progress.total} bytes)`
+          `[DownloadModelJob] Model ${modelName}: ${progressPercent}%`
         )
-      } else {
-        logger.info(`[DownloadModelJob] Model ${modelName}: ${progress.status}`)
       }
 
       // Store detailed progress in job data for clients to query
       job.updateData({
         ...job.data,
-        progress: progressData,
+        status: 'downloading',
+        progress: progressPercent,
+        progress_timestamp: new Date().toISOString(),
       })
     })
 
diff --git a/admin/app/services/benchmark_service.ts b/admin/app/services/benchmark_service.ts
index bdb5f31..7f8d810 100644
--- a/admin/app/services/benchmark_service.ts
+++ b/admin/app/services/benchmark_service.ts
@@ -437,7 +437,7 @@ export class BenchmarkService {
 
     // Check if the benchmark model is available, pull if not
     const ollamaService = new (await import('./ollama_service.js')).OllamaService()
-    const modelResponse = await ollamaService.downloadModelSync(AI_BENCHMARK_MODEL)
+    const modelResponse = await ollamaService.downloadModel(AI_BENCHMARK_MODEL)
     if (!modelResponse.success) {
       throw new Error(`Model does not exist and failed to download: ${modelResponse.message}`)
     }
diff --git a/admin/app/services/ollama_service.ts b/admin/app/services/ollama_service.ts
index 080d724..03f8420 100644
--- a/admin/app/services/ollama_service.ts
+++ b/admin/app/services/ollama_service.ts
@@ -7,8 +7,8 @@ import path from 'node:path'
 import logger from '@adonisjs/core/services/logger'
 import axios from 'axios'
 import { DownloadModelJob } from '#jobs/download_model_job'
-import { PassThrough } from 'node:stream'
 import { SERVICE_NAMES } from '../../constants/service_names.js'
+import transmit from '@adonisjs/transmit/services/main'
 
 const NOMAD_MODELS_API_BASE_URL = 'https://api.projectnomad.us/api/v1/ollama/models'
 const MODELS_CACHE_FILE = path.join(process.cwd(), 'storage', 'ollama-models-cache.json')
@@ -19,7 +19,7 @@ export class OllamaService {
   private ollama: Ollama | null = null
   private ollamaInitPromise: Promise | null = null
 
-  constructor() {}
+  constructor() { }
 
   private async _initializeOllamaClient() {
     if (!this.ollamaInitPromise) {
@@ -41,198 +41,19 @@ export class OllamaService {
     }
   }
 
-  /** We need to call this in the DownloadModelJob, so it can't be private,
-   * but shouldn't be called directly (dispatch job instead)
-   */
-  async _downloadModel(
-    model: string,
-    onProgress?: (progress: {
-      status: string
-      completed?: number
-      total?: number
-      percent?: number
-    }) => void
-  ): Promise<{ success: boolean; message: string }> {
-    return new Promise(async (resolve) => {
-      try {
-        const dockerService = new (await import('./docker_service.js')).DockerService()
-        const container = dockerService.docker.getContainer(SERVICE_NAMES.OLLAMA)
-        if (!container) {
-          logger.warn('[OllamaService] Ollama container is not running. Cannot download model.')
-          resolve({
-            success: false,
-            message: 'Ollama is not running. Please start Ollama and try again.',
-          })
-          return
-        }
-
-        container.exec(
-          {
-            Cmd: ['ollama', 'pull', model],
-            AttachStdout: true,
-            AttachStderr: true,
-          },
-          (err, exec) => {
-            if (err) {
-              logger.error(
-                `[OllamaService] Failed to execute model download command: ${
-                  err instanceof Error ? err.message : err
-                }`
-              )
-              resolve({ success: false, message: 'Failed to execute download command.' })
-              return
-            }
-
-            if (!exec) {
-              logger.error('[OllamaService] No exec instance returned from exec command')
-              resolve({ success: false, message: 'Failed to create exec instance.' })
-              return
-            }
-
-            exec.start(
-              {
-                hijack: true,
-                stdin: false,
-              },
-              (startErr, stream) => {
-                if (startErr) {
-                  logger.error(
-                    `[OllamaService] Failed to start exec stream: ${
-                      startErr instanceof Error ? startErr.message : startErr
-                    }`
-                  )
-                  resolve({ success: false, message: 'Failed to start download stream.' })
-                  return
-                }
-
-                if (!stream) {
-                  logger.error('[OllamaService] No stream returned when starting exec')
-                  resolve({ success: false, message: 'No stream available.' })
-                  return
-                }
-
-                // Create PassThrough streams to capture output
-                const stdout = new PassThrough()
-                const stderr = new PassThrough()
-
-                // Demultiplex the Docker stream
-                dockerService.docker.modem.demuxStream(stream, stdout, stderr)
-
-                // Capture and parse stdout (if any)
-                stdout.on('data', (chunk) => {
-                  const output = chunk.toString()
-                  logger.info(`[OllamaService] Model download (stdout): ${output}`)
-                })
-
-                // Capture stderr - ollama sends progress/status here (not necessarily errors)
-                stderr.on('data', (chunk) => {
-                  const output = chunk.toString()
-
-                  // Check if this is an actual error message
-                  if (
-                    output.toLowerCase().includes('error') ||
-                    output.toLowerCase().includes('failed')
-                  ) {
-                    logger.error(`[OllamaService] Model download error: ${output}`)
-                  } else {
-                    // This is normal progress/status output from ollama
-                    logger.info(`[OllamaService] Model download progress: ${output}`)
-
-                    // Parse JSON progress if available
-                    try {
-                      const lines = output
-                        .split('\n')
-                        .filter(
-                          (line: any) => typeof line.trim() === 'string' && line.trim().length > 0
-                        )
-                      for (const line of lines) {
-                        const parsed = JSON.parse(line)
-                        if (parsed.status) {
-                          const progressData: {
-                            status: string
-                            completed?: number
-                            total?: number
-                            percent?: number
-                          } = {
-                            status: parsed.status,
-                          }
-
-                          // Extract byte progress if available
-                          if (parsed.completed !== undefined && parsed.total !== undefined) {
-                            progressData.completed = parsed.completed
-                            progressData.total = parsed.total
-                            progressData.percent = Math.round(
-                              (parsed.completed / parsed.total) * 100
-                            )
-                          }
-
-                          // Call progress callback
-                          if (onProgress) {
-                            onProgress(progressData)
-                          }
-
-                          // Log structured progress
-                          if (progressData.percent !== undefined) {
-                            logger.info(
-                              `[OllamaService] ${progressData.status}: ${progressData.percent}% (${progressData.completed}/${progressData.total} bytes)`
-                            )
-                          } else {
-                            logger.info(`[OllamaService] ${progressData.status}`)
-                          }
-                        }
-                      }
-                    } catch {
-                      // Not JSON, already logged above
-                    }
-                  }
-                })
-
-                // Handle stream end
-                stream.on('end', () => {
-                  logger.info(
-                    `[OllamaService] Model download process ended for model "${model}"`
-                  )
-                  resolve({
-                    success: true,
-                    message: 'Model download completed successfully.',
-                  })
-                })
-
-                // Handle stream errors
-                stream.on('error', (streamErr) => {
-                  logger.error(
-                    `[OllamaService] Error during model download stream: ${
-                      streamErr instanceof Error ? streamErr.message : streamErr
-                    }`
-                  )
-                  resolve({
-                    success: false,
-                    message: 'Error occurred during model download.',
-                  })
-                })
-              }
-            )
-          }
-        )
-      } catch (error) {
-        logger.error(
-          `[OllamaService] Failed to download model "${model}": ${
-            error instanceof Error ? error.message : error
-          }`
-        )
-        resolve({ success: false, message: 'Failed to download model.' })
-      }
-    })
-  }
-
   /**
    * Synchronous version of model download (waits for completion). Should only be used for
    * small models or in contexts where a background job is incompatible.
    * @param model Model name to download
    * @returns Success status and message
    */
-  async downloadModelSync(model: string): Promise<{ success: boolean; message: string }> {
+  async downloadModel(model: string, progressCallback?: (percent: number) => void): Promise<{ success: boolean; message: string }> {
     try {
+      await this._ensureDependencies()
+      if (!this.ollama) {
+        throw new Error('Ollama client is not initialized.')
+      }
+
       // See if model is already installed
       const installedModels = await this.getModels()
       if (installedModels && installedModels.some((m) => m.name === model)) {
@@ -240,26 +61,29 @@ export class OllamaService {
         return { success: true, message: 'Model is already installed.' }
       }
 
-      const dockerService = new (await import('./docker_service.js')).DockerService()
-
-      const ollamAPIURL = await dockerService.getServiceURL(SERVICE_NAMES.OLLAMA)
-      if (!ollamAPIURL) {
-        logger.warn('[OllamaService] Ollama service is not running. Cannot download model.')
-        return {
-          success: false,
-          message: 'Ollama is not running. Please start Ollama and try again.',
+      // Returns AbortableAsyncIterator
+      const downloadStream = await this.ollama.pull({
+        model,
+        stream: true,
+      })
+
+      for await (const chunk of downloadStream) {
+        if (chunk.completed && chunk.total) {
+          const percent = ((chunk.completed / chunk.total) * 100).toFixed(2)
+          const percentNum = parseFloat(percent)
+
+          this.broadcastDownloadProgress(model, percentNum)
+          if (progressCallback) {
+            progressCallback(percentNum)
+          }
        }
       }
 
-      // 10 minutes timeout for large model downloads
-      await axios.post(`${ollamAPIURL}/api/pull`, { name: model }, { timeout: 600000 })
-
-      logger.info(`[OllamaService] Model "${model}" downloaded via API.`)
+      logger.info(`[OllamaService] Model "${model}" downloaded successfully.`)
       return { success: true, message: 'Model downloaded successfully.' }
     } catch (error) {
       logger.error(
-        `[OllamaService] Failed to download model "${model}": ${
-          error instanceof Error ? error.message : error
+        `[OllamaService] Failed to download model "${model}": ${error instanceof Error ? error.message : error
        }`
       )
       return { success: false, message: 'Failed to download model.' }
@@ -395,8 +219,7 @@ export class OllamaService {
       return this.sortModels(models, sort)
     } catch (error) {
       logger.error(
-        `[OllamaService] Failed to retrieve models from Nomad API: ${
-          error instanceof Error ? error.message : error
+        `[OllamaService] Failed to retrieve models from Nomad API: ${error instanceof Error ? error.message : error
        }`
       )
       return null
@@ -489,4 +312,13 @@ export class OllamaService {
 
     return models
   }
+
+  private broadcastDownloadProgress(model: string, percent: number) {
+    transmit.broadcast('model-download', {
+      model,
+      percent,
+      timestamp: new Date().toISOString(),
+    })
+    logger.info(`[OllamaService] Download progress for model "${model}": ${percent}%`)
+  }
 }
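
Note: this diff collapses the Docker exec-based `_downloadModel` and the old `downloadModelSync` into a single `downloadModel` that streams `ollama.pull()` progress to an optional callback and broadcasts it on the `model-download` transmit channel via `broadcastDownloadProgress`. A minimal sketch of how a caller might consume the new API follows; the model tag and the `#services/ollama_service` import alias are illustrative assumptions, not part of this change.

// Server side: pull a model and observe percent updates through the new callback.
// 'llama3.2:1b' and the '#services/ollama_service' alias are assumptions for illustration.
import { OllamaService } from '#services/ollama_service'

const ollamaService = new OllamaService()
const result = await ollamaService.downloadModel('llama3.2:1b', (percent) => {
  // percent is a number between 0 and 100, rounded to two decimals by downloadModel
  console.log(`pull progress: ${percent}%`)
})
if (!result.success) {
  console.error(result.message)
}

On the frontend, one way to pick up the broadcasts (assuming `@adonisjs/transmit-client` is used, which this diff does not show) would be a subscription to the same channel:

// Browser side (assumption): subscribe to the 'model-download' events emitted
// by broadcastDownloadProgress and render the percentage somewhere in the UI.
import { Transmit } from '@adonisjs/transmit-client'

const transmit = new Transmit({ baseUrl: window.location.origin })
const subscription = transmit.subscription('model-download')
await subscription.create()
subscription.onMessage((event: { model: string; percent: number; timestamp: string }) => {
  console.log(`${event.model}: ${event.percent}% (${event.timestamp})`)
})

Large pulls should still be dispatched through `DownloadModelJob` so they run in the background; the direct `downloadModel` call covers the synchronous path used by `BenchmarkService` for its small benchmark model.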