mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-04-05 16:26:15 +02:00
feat(Ollama): cleanup model download logic and improve progress tracking
This commit is contained in:
parent
3b31be66f9
commit
119aea42ae
|
|
@ -42,33 +42,20 @@ export class DownloadModelJob {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Services are ready, initiate the download with progress tracking
|
// Services are ready, initiate the download with progress tracking
|
||||||
const result = await ollamaService._downloadModel(modelName, (progress) => {
|
const result = await ollamaService.downloadModel(modelName, (progressPercent) => {
|
||||||
// Update job progress in BullMQ
|
if (progressPercent) {
|
||||||
const progressData = {
|
job.updateProgress(Math.floor(progressPercent))
|
||||||
status: progress.status,
|
|
||||||
percent: progress.percent,
|
|
||||||
completed: progress.completed,
|
|
||||||
total: progress.total,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the job progress (0-100 scale for BullMQ)
|
|
||||||
if (progress.percent !== undefined) {
|
|
||||||
job.updateProgress(progress.percent)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log progress with job context
|
|
||||||
if (progress.percent !== undefined) {
|
|
||||||
logger.info(
|
logger.info(
|
||||||
`[DownloadModelJob] Model ${modelName}: ${progress.status} - ${progress.percent}% (${progress.completed}/${progress.total} bytes)`
|
`[DownloadModelJob] Model ${modelName}: ${progressPercent}%`
|
||||||
)
|
)
|
||||||
} else {
|
|
||||||
logger.info(`[DownloadModelJob] Model ${modelName}: ${progress.status}`)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store detailed progress in job data for clients to query
|
// Store detailed progress in job data for clients to query
|
||||||
job.updateData({
|
job.updateData({
|
||||||
...job.data,
|
...job.data,
|
||||||
progress: progressData,
|
status: 'downloading',
|
||||||
|
progress: progressPercent,
|
||||||
|
progress_timestamp: new Date().toISOString(),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -437,7 +437,7 @@ export class BenchmarkService {
|
||||||
|
|
||||||
// Check if the benchmark model is available, pull if not
|
// Check if the benchmark model is available, pull if not
|
||||||
const ollamaService = new (await import('./ollama_service.js')).OllamaService()
|
const ollamaService = new (await import('./ollama_service.js')).OllamaService()
|
||||||
const modelResponse = await ollamaService.downloadModelSync(AI_BENCHMARK_MODEL)
|
const modelResponse = await ollamaService.downloadModel(AI_BENCHMARK_MODEL)
|
||||||
if (!modelResponse.success) {
|
if (!modelResponse.success) {
|
||||||
throw new Error(`Model does not exist and failed to download: ${modelResponse.message}`)
|
throw new Error(`Model does not exist and failed to download: ${modelResponse.message}`)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,8 @@ import path from 'node:path'
|
||||||
import logger from '@adonisjs/core/services/logger'
|
import logger from '@adonisjs/core/services/logger'
|
||||||
import axios from 'axios'
|
import axios from 'axios'
|
||||||
import { DownloadModelJob } from '#jobs/download_model_job'
|
import { DownloadModelJob } from '#jobs/download_model_job'
|
||||||
import { PassThrough } from 'node:stream'
|
|
||||||
import { SERVICE_NAMES } from '../../constants/service_names.js'
|
import { SERVICE_NAMES } from '../../constants/service_names.js'
|
||||||
|
import transmit from '@adonisjs/transmit/services/main'
|
||||||
|
|
||||||
const NOMAD_MODELS_API_BASE_URL = 'https://api.projectnomad.us/api/v1/ollama/models'
|
const NOMAD_MODELS_API_BASE_URL = 'https://api.projectnomad.us/api/v1/ollama/models'
|
||||||
const MODELS_CACHE_FILE = path.join(process.cwd(), 'storage', 'ollama-models-cache.json')
|
const MODELS_CACHE_FILE = path.join(process.cwd(), 'storage', 'ollama-models-cache.json')
|
||||||
|
|
@ -19,7 +19,7 @@ export class OllamaService {
|
||||||
private ollama: Ollama | null = null
|
private ollama: Ollama | null = null
|
||||||
private ollamaInitPromise: Promise<void> | null = null
|
private ollamaInitPromise: Promise<void> | null = null
|
||||||
|
|
||||||
constructor() {}
|
constructor() { }
|
||||||
|
|
||||||
private async _initializeOllamaClient() {
|
private async _initializeOllamaClient() {
|
||||||
if (!this.ollamaInitPromise) {
|
if (!this.ollamaInitPromise) {
|
||||||
|
|
@ -41,198 +41,19 @@ export class OllamaService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** We need to call this in the DownloadModelJob, so it can't be private,
|
|
||||||
* but shouldn't be called directly (dispatch job instead)
|
|
||||||
*/
|
|
||||||
async _downloadModel(
|
|
||||||
model: string,
|
|
||||||
onProgress?: (progress: {
|
|
||||||
status: string
|
|
||||||
completed?: number
|
|
||||||
total?: number
|
|
||||||
percent?: number
|
|
||||||
}) => void
|
|
||||||
): Promise<{ success: boolean; message: string }> {
|
|
||||||
return new Promise(async (resolve) => {
|
|
||||||
try {
|
|
||||||
const dockerService = new (await import('./docker_service.js')).DockerService()
|
|
||||||
const container = dockerService.docker.getContainer(SERVICE_NAMES.OLLAMA)
|
|
||||||
if (!container) {
|
|
||||||
logger.warn('[OllamaService] Ollama container is not running. Cannot download model.')
|
|
||||||
resolve({
|
|
||||||
success: false,
|
|
||||||
message: 'Ollama is not running. Please start Ollama and try again.',
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
container.exec(
|
|
||||||
{
|
|
||||||
Cmd: ['ollama', 'pull', model],
|
|
||||||
AttachStdout: true,
|
|
||||||
AttachStderr: true,
|
|
||||||
},
|
|
||||||
(err, exec) => {
|
|
||||||
if (err) {
|
|
||||||
logger.error(
|
|
||||||
`[OllamaService] Failed to execute model download command: ${
|
|
||||||
err instanceof Error ? err.message : err
|
|
||||||
}`
|
|
||||||
)
|
|
||||||
resolve({ success: false, message: 'Failed to execute download command.' })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!exec) {
|
|
||||||
logger.error('[OllamaService] No exec instance returned from exec command')
|
|
||||||
resolve({ success: false, message: 'Failed to create exec instance.' })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
exec.start(
|
|
||||||
{
|
|
||||||
hijack: true,
|
|
||||||
stdin: false,
|
|
||||||
},
|
|
||||||
(startErr, stream) => {
|
|
||||||
if (startErr) {
|
|
||||||
logger.error(
|
|
||||||
`[OllamaService] Failed to start exec stream: ${
|
|
||||||
startErr instanceof Error ? startErr.message : startErr
|
|
||||||
}`
|
|
||||||
)
|
|
||||||
resolve({ success: false, message: 'Failed to start download stream.' })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!stream) {
|
|
||||||
logger.error('[OllamaService] No stream returned when starting exec')
|
|
||||||
resolve({ success: false, message: 'No stream available.' })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create PassThrough streams to capture output
|
|
||||||
const stdout = new PassThrough()
|
|
||||||
const stderr = new PassThrough()
|
|
||||||
|
|
||||||
// Demultiplex the Docker stream
|
|
||||||
dockerService.docker.modem.demuxStream(stream, stdout, stderr)
|
|
||||||
|
|
||||||
// Capture and parse stdout (if any)
|
|
||||||
stdout.on('data', (chunk) => {
|
|
||||||
const output = chunk.toString()
|
|
||||||
logger.info(`[OllamaService] Model download (stdout): ${output}`)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Capture stderr - ollama sends progress/status here (not necessarily errors)
|
|
||||||
stderr.on('data', (chunk) => {
|
|
||||||
const output = chunk.toString()
|
|
||||||
|
|
||||||
// Check if this is an actual error message
|
|
||||||
if (
|
|
||||||
output.toLowerCase().includes('error') ||
|
|
||||||
output.toLowerCase().includes('failed')
|
|
||||||
) {
|
|
||||||
logger.error(`[OllamaService] Model download error: ${output}`)
|
|
||||||
} else {
|
|
||||||
// This is normal progress/status output from ollama
|
|
||||||
logger.info(`[OllamaService] Model download progress: ${output}`)
|
|
||||||
|
|
||||||
// Parse JSON progress if available
|
|
||||||
try {
|
|
||||||
const lines = output
|
|
||||||
.split('\n')
|
|
||||||
.filter(
|
|
||||||
(line: any) => typeof line.trim() === 'string' && line.trim().length > 0
|
|
||||||
)
|
|
||||||
for (const line of lines) {
|
|
||||||
const parsed = JSON.parse(line)
|
|
||||||
if (parsed.status) {
|
|
||||||
const progressData: {
|
|
||||||
status: string
|
|
||||||
completed?: number
|
|
||||||
total?: number
|
|
||||||
percent?: number
|
|
||||||
} = {
|
|
||||||
status: parsed.status,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract byte progress if available
|
|
||||||
if (parsed.completed !== undefined && parsed.total !== undefined) {
|
|
||||||
progressData.completed = parsed.completed
|
|
||||||
progressData.total = parsed.total
|
|
||||||
progressData.percent = Math.round(
|
|
||||||
(parsed.completed / parsed.total) * 100
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call progress callback
|
|
||||||
if (onProgress) {
|
|
||||||
onProgress(progressData)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log structured progress
|
|
||||||
if (progressData.percent !== undefined) {
|
|
||||||
logger.info(
|
|
||||||
`[OllamaService] ${progressData.status}: ${progressData.percent}% (${progressData.completed}/${progressData.total} bytes)`
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
logger.info(`[OllamaService] ${progressData.status}`)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// Not JSON, already logged above
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Handle stream end
|
|
||||||
stream.on('end', () => {
|
|
||||||
logger.info(
|
|
||||||
`[OllamaService] Model download process ended for model "${model}"`
|
|
||||||
)
|
|
||||||
resolve({
|
|
||||||
success: true,
|
|
||||||
message: 'Model download completed successfully.',
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
// Handle stream errors
|
|
||||||
stream.on('error', (streamErr) => {
|
|
||||||
logger.error(
|
|
||||||
`[OllamaService] Error during model download stream: ${
|
|
||||||
streamErr instanceof Error ? streamErr.message : streamErr
|
|
||||||
}`
|
|
||||||
)
|
|
||||||
resolve({
|
|
||||||
success: false,
|
|
||||||
message: 'Error occurred during model download.',
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`[OllamaService] Failed to download model "${model}": ${
|
|
||||||
error instanceof Error ? error.message : error
|
|
||||||
}`
|
|
||||||
)
|
|
||||||
resolve({ success: false, message: 'Failed to download model.' })
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Synchronous version of model download (waits for completion). Should only be used for
|
* Synchronous version of model download (waits for completion). Should only be used for
|
||||||
* small models or in contexts where a background job is incompatible.
|
* small models or in contexts where a background job is incompatible.
|
||||||
* @param model Model name to download
|
* @param model Model name to download
|
||||||
* @returns Success status and message
|
* @returns Success status and message
|
||||||
*/
|
*/
|
||||||
async downloadModelSync(model: string): Promise<{ success: boolean; message: string }> {
|
async downloadModel(model: string, progressCallback?: (percent: number) => void): Promise<{ success: boolean; message: string }> {
|
||||||
try {
|
try {
|
||||||
|
await this._ensureDependencies()
|
||||||
|
if (!this.ollama) {
|
||||||
|
throw new Error('Ollama client is not initialized.')
|
||||||
|
}
|
||||||
|
|
||||||
// See if model is already installed
|
// See if model is already installed
|
||||||
const installedModels = await this.getModels()
|
const installedModels = await this.getModels()
|
||||||
if (installedModels && installedModels.some((m) => m.name === model)) {
|
if (installedModels && installedModels.some((m) => m.name === model)) {
|
||||||
|
|
@ -240,26 +61,29 @@ export class OllamaService {
|
||||||
return { success: true, message: 'Model is already installed.' }
|
return { success: true, message: 'Model is already installed.' }
|
||||||
}
|
}
|
||||||
|
|
||||||
const dockerService = new (await import('./docker_service.js')).DockerService()
|
// Returns AbortableAsyncIterator<ProgressResponse>
|
||||||
|
const downloadStream = await this.ollama.pull({
|
||||||
|
model,
|
||||||
|
stream: true,
|
||||||
|
})
|
||||||
|
|
||||||
const ollamAPIURL = await dockerService.getServiceURL(SERVICE_NAMES.OLLAMA)
|
for await (const chunk of downloadStream) {
|
||||||
if (!ollamAPIURL) {
|
if (chunk.completed && chunk.total) {
|
||||||
logger.warn('[OllamaService] Ollama service is not running. Cannot download model.')
|
const percent = ((chunk.completed / chunk.total) * 100).toFixed(2)
|
||||||
return {
|
const percentNum = parseFloat(percent)
|
||||||
success: false,
|
|
||||||
message: 'Ollama is not running. Please start Ollama and try again.',
|
this.broadcastDownloadProgress(model, percentNum)
|
||||||
|
if (progressCallback) {
|
||||||
|
progressCallback(percentNum)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 10 minutes timeout for large model downloads
|
logger.info(`[OllamaService] Model "${model}" downloaded successfully.`)
|
||||||
await axios.post(`${ollamAPIURL}/api/pull`, { name: model }, { timeout: 600000 })
|
|
||||||
|
|
||||||
logger.info(`[OllamaService] Model "${model}" downloaded via API.`)
|
|
||||||
return { success: true, message: 'Model downloaded successfully.' }
|
return { success: true, message: 'Model downloaded successfully.' }
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
`[OllamaService] Failed to download model "${model}": ${
|
`[OllamaService] Failed to download model "${model}": ${error instanceof Error ? error.message : error
|
||||||
error instanceof Error ? error.message : error
|
|
||||||
}`
|
}`
|
||||||
)
|
)
|
||||||
return { success: false, message: 'Failed to download model.' }
|
return { success: false, message: 'Failed to download model.' }
|
||||||
|
|
@ -395,8 +219,7 @@ export class OllamaService {
|
||||||
return this.sortModels(models, sort)
|
return this.sortModels(models, sort)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
`[OllamaService] Failed to retrieve models from Nomad API: ${
|
`[OllamaService] Failed to retrieve models from Nomad API: ${error instanceof Error ? error.message : error
|
||||||
error instanceof Error ? error.message : error
|
|
||||||
}`
|
}`
|
||||||
)
|
)
|
||||||
return null
|
return null
|
||||||
|
|
@ -489,4 +312,13 @@ export class OllamaService {
|
||||||
|
|
||||||
return models
|
return models
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private broadcastDownloadProgress(model: string, percent: number) {
|
||||||
|
transmit.broadcast('model-download', {
|
||||||
|
model,
|
||||||
|
percent,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
})
|
||||||
|
logger.info(`[OllamaService] Download progress for model "${model}": ${percent}%`)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user