diff --git a/admin/app/controllers/downloads_controller.ts b/admin/app/controllers/downloads_controller.ts index 023806b..0a9b1e3 100644 --- a/admin/app/controllers/downloads_controller.ts +++ b/admin/app/controllers/downloads_controller.ts @@ -20,4 +20,8 @@ export default class DownloadsController { await this.downloadService.removeFailedJob(params.jobId) return { success: true } } + + async cancelJob({ params }: HttpContext) { + return this.downloadService.cancelJob(params.jobId) + } } diff --git a/admin/app/controllers/zim_controller.ts b/admin/app/controllers/zim_controller.ts index 4bb00e3..96adf63 100644 --- a/admin/app/controllers/zim_controller.ts +++ b/admin/app/controllers/zim_controller.ts @@ -27,7 +27,7 @@ export default class ZimController { async downloadRemote({ request }: HttpContext) { const payload = await request.validateUsing(remoteDownloadWithMetadataValidator) assertNotPrivateUrl(payload.url) - const { filename, jobId } = await this.zimService.downloadRemote(payload.url) + const { filename, jobId } = await this.zimService.downloadRemote(payload.url, payload.metadata) return { message: 'Download started successfully', diff --git a/admin/app/jobs/run_download_job.ts b/admin/app/jobs/run_download_job.ts index c7f672e..19e85bf 100644 --- a/admin/app/jobs/run_download_job.ts +++ b/admin/app/jobs/run_download_job.ts @@ -1,5 +1,5 @@ import { Job } from 'bullmq' -import { RunDownloadJobParams } from '../../types/downloads.js' +import { RunDownloadJobParams, DownloadProgressData } from '../../types/downloads.js' import { QueueService } from '#services/queue_service' import { doResumableDownload } from '../utils/downloads.js' import { createHash } from 'crypto' @@ -17,100 +17,153 @@ export class RunDownloadJob { return 'run-download' } + /** In-memory registry of abort controllers for active download jobs */ + static abortControllers: Map = new Map() + static getJobId(url: string): string { return createHash('sha256').update(url).digest('hex').slice(0, 16) } + /** Redis key used to signal cancellation across processes */ + static cancelKey(jobId: string): string { + return `nomad:download:cancel:${jobId}` + } + + /** Signal cancellation via Redis so the worker process can pick it up */ + static async signalCancel(jobId: string): Promise { + const queueService = new QueueService() + const queue = queueService.getQueue(this.queue) + const client = await queue.client + await client.set(this.cancelKey(jobId), '1', 'EX', 300) // 5 min TTL + } + async handle(job: Job) { const { url, filepath, timeout, allowedMimeTypes, forceNew, filetype, resourceMetadata } = job.data as RunDownloadJobParams - await doResumableDownload({ - url, - filepath, - timeout, - allowedMimeTypes, - forceNew, - onProgress(progress) { - const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100 - job.updateProgress(Math.floor(progressPercent)) - }, - async onComplete(url) { - try { - // Create InstalledResource entry if metadata was provided - if (resourceMetadata) { - const { default: InstalledResource } = await import('#models/installed_resource') - const { DateTime } = await import('luxon') - const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js') - const stats = await getFileStatsIfExists(filepath) + // Register abort controller for this job + const abortController = new AbortController() + RunDownloadJob.abortControllers.set(job.id!, abortController) - // Look up the old entry so we can clean up the previous file after updating - const oldEntry = await InstalledResource.query() - .where('resource_id', resourceMetadata.resource_id) - .where('resource_type', filetype as 'zim' | 'map') - .first() - const oldFilePath = oldEntry?.file_path ?? null + // Get Redis client for checking cancel signals from the API process + const queueService = new QueueService() + const cancelRedis = await queueService.getQueue(RunDownloadJob.queue).client + let progressCount = 0 - await InstalledResource.updateOrCreate( - { resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' }, - { - version: resourceMetadata.version, - collection_ref: resourceMetadata.collection_ref, - url: url, - file_path: filepath, - file_size_bytes: stats ? Number(stats.size) : null, - installed_at: DateTime.now(), + try { + await doResumableDownload({ + url, + filepath, + timeout, + allowedMimeTypes, + forceNew, + signal: abortController.signal, + onProgress(progress) { + const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100 + const progressData: DownloadProgressData = { + percent: Math.floor(progressPercent), + downloadedBytes: progress.downloadedBytes, + totalBytes: progress.totalBytes, + lastProgressTime: Date.now(), + } + job.updateProgress(progressData) + + // Check for cancel signal every ~10 progress ticks to avoid hammering Redis + progressCount++ + if (progressCount % 10 === 0) { + cancelRedis.get(RunDownloadJob.cancelKey(job.id!)).then((val: string | null) => { + if (val) { + cancelRedis.del(RunDownloadJob.cancelKey(job.id!)) + abortController.abort() } + }).catch(() => {}) + } + }, + async onComplete(url) { + try { + // Create InstalledResource entry if metadata was provided + if (resourceMetadata) { + const { default: InstalledResource } = await import('#models/installed_resource') + const { DateTime } = await import('luxon') + const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js') + const stats = await getFileStatsIfExists(filepath) + + // Look up the old entry so we can clean up the previous file after updating + const oldEntry = await InstalledResource.query() + .where('resource_id', resourceMetadata.resource_id) + .where('resource_type', filetype as 'zim' | 'map') + .first() + const oldFilePath = oldEntry?.file_path ?? null + + await InstalledResource.updateOrCreate( + { resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' }, + { + version: resourceMetadata.version, + collection_ref: resourceMetadata.collection_ref, + url: url, + file_path: filepath, + file_size_bytes: stats ? Number(stats.size) : null, + installed_at: DateTime.now(), + } + ) + + // Delete the old file if it differs from the new one + if (oldFilePath && oldFilePath !== filepath) { + try { + await deleteFileIfExists(oldFilePath) + console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`) + } catch (deleteError) { + console.warn( + `[RunDownloadJob] Failed to delete old file ${oldFilePath}:`, + deleteError + ) + } + } + } + + if (filetype === 'zim') { + const dockerService = new DockerService() + const zimService = new ZimService(dockerService) + await zimService.downloadRemoteSuccessCallback([url], true) + + // Only dispatch embedding job if AI Assistant (Ollama) is installed + const ollamaUrl = await dockerService.getServiceURL('nomad_ollama') + if (ollamaUrl) { + try { + await EmbedFileJob.dispatch({ + fileName: url.split('/').pop() || '', + filePath: filepath, + }) + } catch (error) { + console.error(`[RunDownloadJob] Error dispatching EmbedFileJob for URL ${url}:`, error) + } + } + } else if (filetype === 'map') { + const mapsService = new MapService() + await mapsService.downloadRemoteSuccessCallback([url], false) + } + } catch (error) { + console.error( + `[RunDownloadJob] Error in download success callback for URL ${url}:`, + error ) - - // Delete the old file if it differs from the new one - if (oldFilePath && oldFilePath !== filepath) { - try { - await deleteFileIfExists(oldFilePath) - console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`) - } catch (deleteError) { - console.warn( - `[RunDownloadJob] Failed to delete old file ${oldFilePath}:`, - deleteError - ) - } - } } + job.updateProgress({ + percent: 100, + downloadedBytes: 0, + totalBytes: 0, + lastProgressTime: Date.now(), + } as DownloadProgressData) + }, + }) - if (filetype === 'zim') { - const dockerService = new DockerService() - const zimService = new ZimService(dockerService) - await zimService.downloadRemoteSuccessCallback([url], true) - - // Only dispatch embedding job if AI Assistant (Ollama) is installed - const ollamaUrl = await dockerService.getServiceURL('nomad_ollama') - if (ollamaUrl) { - try { - await EmbedFileJob.dispatch({ - fileName: url.split('/').pop() || '', - filePath: filepath, - }) - } catch (error) { - console.error(`[RunDownloadJob] Error dispatching EmbedFileJob for URL ${url}:`, error) - } - } - } else if (filetype === 'map') { - const mapsService = new MapService() - await mapsService.downloadRemoteSuccessCallback([url], false) - } - } catch (error) { - console.error( - `[RunDownloadJob] Error in download success callback for URL ${url}:`, - error - ) - } - job.updateProgress(100) - }, - }) - - return { - url, - filepath, + return { + url, + filepath, + } + } finally { + // Clean up abort controller + RunDownloadJob.abortControllers.delete(job.id!) } } diff --git a/admin/app/services/download_service.ts b/admin/app/services/download_service.ts index a2b7faf..7f24409 100644 --- a/admin/app/services/download_service.ts +++ b/admin/app/services/download_service.ts @@ -2,27 +2,49 @@ import { inject } from '@adonisjs/core' import { QueueService } from './queue_service.js' import { RunDownloadJob } from '#jobs/run_download_job' import { DownloadModelJob } from '#jobs/download_model_job' -import { DownloadJobWithProgress } from '../../types/downloads.js' +import { DownloadJobWithProgress, DownloadProgressData } from '../../types/downloads.js' import { normalize } from 'path' +import { deleteFileIfExists } from '../utils/fs.js' @inject() export class DownloadService { constructor(private queueService: QueueService) {} + private parseProgress(progress: any): { percent: number; downloadedBytes?: number; totalBytes?: number; lastProgressTime?: number } { + if (typeof progress === 'object' && progress !== null && 'percent' in progress) { + const p = progress as DownloadProgressData + return { + percent: p.percent, + downloadedBytes: p.downloadedBytes, + totalBytes: p.totalBytes, + lastProgressTime: p.lastProgressTime, + } + } + // Backward compat: plain integer from in-flight jobs during upgrade + return { percent: parseInt(String(progress), 10) || 0 } + } + async listDownloadJobs(filetype?: string): Promise { // Get regular file download jobs (zim, map, etc.) const queue = this.queueService.getQueue(RunDownloadJob.queue) const fileJobs = await queue.getJobs(['waiting', 'active', 'delayed', 'failed']) - const fileDownloads = fileJobs.map((job) => ({ - jobId: job.id!.toString(), - url: job.data.url, - progress: parseInt(job.progress.toString(), 10), - filepath: normalize(job.data.filepath), - filetype: job.data.filetype, - status: (job.failedReason ? 'failed' : 'active') as 'active' | 'failed', - failedReason: job.failedReason || undefined, - })) + const fileDownloads = fileJobs.map((job) => { + const parsed = this.parseProgress(job.progress) + return { + jobId: job.id!.toString(), + url: job.data.url, + progress: parsed.percent, + filepath: normalize(job.data.filepath), + filetype: job.data.filetype, + title: job.data.title || undefined, + downloadedBytes: parsed.downloadedBytes, + totalBytes: parsed.totalBytes || job.data.totalBytes || undefined, + lastProgressTime: parsed.lastProgressTime, + status: (job.failedReason ? 'failed' : 'active') as 'active' | 'failed', + failedReason: job.failedReason || undefined, + } + }) // Get Ollama model download jobs const modelQueue = this.queueService.getQueue(DownloadModelJob.queue) @@ -61,4 +83,46 @@ export class DownloadService { } } } + + async cancelJob(jobId: string): Promise<{ success: boolean; message: string }> { + const queue = this.queueService.getQueue(RunDownloadJob.queue) + const job = await queue.getJob(jobId) + + if (!job) { + // Job already completed (removeOnComplete: true) or doesn't exist + return { success: true, message: 'Job not found (may have already completed)' } + } + + const filepath = job.data.filepath + + // Signal the worker process to abort the download via Redis + await RunDownloadJob.signalCancel(jobId) + + // Also try in-memory abort (works if worker is in same process) + RunDownloadJob.abortControllers.get(jobId)?.abort() + RunDownloadJob.abortControllers.delete(jobId) + + // Give the worker a moment to pick up the cancel signal and release the job lock + await new Promise((resolve) => setTimeout(resolve, 1000)) + + // Remove the BullMQ job + try { + await job.remove() + } catch { + // Job may still be locked by worker - it will fail on next progress check + } + + // Delete the partial file from disk + if (filepath) { + try { + await deleteFileIfExists(filepath) + // Also try .tmp in case PR #448 staging is merged + await deleteFileIfExists(filepath + '.tmp') + } catch { + // File may not exist yet (waiting job) + } + } + + return { success: true, message: 'Download cancelled and partial file deleted' } + } } diff --git a/admin/app/services/map_service.ts b/admin/app/services/map_service.ts index beb74b2..6fca386 100644 --- a/admin/app/services/map_service.ts +++ b/admin/app/services/map_service.ts @@ -131,6 +131,7 @@ export class MapService implements IMapService { allowedMimeTypes: PMTILES_MIME_TYPES, forceNew: true, filetype: 'map', + title: (resource as any).title || undefined, resourceMetadata: { resource_id: resource.id, version: resource.version, diff --git a/admin/app/services/zim_service.ts b/admin/app/services/zim_service.ts index 3eee1cb..f9a8b94 100644 --- a/admin/app/services/zim_service.ts +++ b/admin/app/services/zim_service.ts @@ -137,7 +137,7 @@ export class ZimService { } } - async downloadRemote(url: string): Promise<{ filename: string; jobId?: string }> { + async downloadRemote(url: string, metadata?: { title?: string; summary?: string; author?: string; size_bytes?: number }): Promise<{ filename: string; jobId?: string }> { const parsed = new URL(url) if (!parsed.pathname.endsWith('.zim')) { throw new Error(`Invalid ZIM file URL: ${url}. URL must end with .zim`) @@ -170,6 +170,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: metadata?.title, + totalBytes: metadata?.size_bytes, resourceMetadata, }) @@ -238,6 +240,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: (resource as any).title || undefined, + totalBytes: (resource as any).size_mb ? (resource as any).size_mb * 1024 * 1024 : undefined, resourceMetadata: { resource_id: resource.id, version: resource.version, @@ -272,7 +276,9 @@ export class ZimService { // Filter out completed jobs (progress === 100) to avoid race condition // where this job itself is still in the active queue const activeIncompleteJobs = activeJobs.filter((job) => { - const progress = typeof job.progress === 'number' ? job.progress : 0 + const progress = typeof job.progress === 'object' && job.progress !== null + ? (job.progress as any).percent + : typeof job.progress === 'number' ? job.progress : 0 return progress < 100 }) @@ -497,6 +503,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: selectedOption.name, + totalBytes: selectedOption.size_mb ? selectedOption.size_mb * 1024 * 1024 : undefined, }) if (!result || !result.job) { diff --git a/admin/inertia/components/ActiveDownloads.tsx b/admin/inertia/components/ActiveDownloads.tsx index 9661f22..2b1332f 100644 --- a/admin/inertia/components/ActiveDownloads.tsx +++ b/admin/inertia/components/ActiveDownloads.tsx @@ -1,8 +1,8 @@ +import { useRef, useState, useCallback } from 'react' import useDownloads, { useDownloadsProps } from '~/hooks/useDownloads' -import HorizontalBarChart from './HorizontalBarChart' -import { extractFileName } from '~/lib/util' +import { extractFileName, formatBytes } from '~/lib/util' import StyledSectionHeader from './StyledSectionHeader' -import { IconAlertTriangle, IconX } from '@tabler/icons-react' +import { IconAlertTriangle, IconX, IconLoader2 } from '@tabler/icons-react' import api from '~/lib/api' interface ActiveDownloadProps { @@ -10,62 +10,246 @@ interface ActiveDownloadProps { withHeader?: boolean } +function formatSpeed(bytesPerSec: number): string { + if (bytesPerSec <= 0) return '0 B/s' + if (bytesPerSec < 1024) return `${Math.round(bytesPerSec)} B/s` + if (bytesPerSec < 1024 * 1024) return `${(bytesPerSec / 1024).toFixed(1)} KB/s` + return `${(bytesPerSec / (1024 * 1024)).toFixed(1)} MB/s` +} + +type DownloadStatus = 'queued' | 'active' | 'stalled' | 'failed' + +function getDownloadStatus(download: { + progress: number + lastProgressTime?: number + status?: string +}): DownloadStatus { + if (download.status === 'failed') return 'failed' + if (download.progress === 0 && !download.lastProgressTime) return 'queued' + if (download.lastProgressTime) { + const elapsed = Date.now() - download.lastProgressTime + if (elapsed > 60_000) return 'stalled' + } + return 'active' +} + const ActiveDownloads = ({ filetype, withHeader = false }: ActiveDownloadProps) => { const { data: downloads, invalidate } = useDownloads({ filetype }) + const [cancellingJobs, setCancellingJobs] = useState>(new Set()) + const [confirmingCancel, setConfirmingCancel] = useState(null) + + // Track previous downloadedBytes for speed calculation + const prevBytesRef = useRef>(new Map()) + const speedRef = useRef>(new Map()) + + const getSpeed = useCallback( + (jobId: string, currentBytes?: number): number => { + if (!currentBytes || currentBytes <= 0) return 0 + + const prev = prevBytesRef.current.get(jobId) + const now = Date.now() + + if (prev && prev.bytes > 0 && currentBytes > prev.bytes) { + const deltaBytes = currentBytes - prev.bytes + const deltaSec = (now - prev.time) / 1000 + if (deltaSec > 0) { + const instantSpeed = deltaBytes / deltaSec + + // Simple moving average (last 5 samples) + const samples = speedRef.current.get(jobId) || [] + samples.push(instantSpeed) + if (samples.length > 5) samples.shift() + speedRef.current.set(jobId, samples) + + const avg = samples.reduce((a, b) => a + b, 0) / samples.length + prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now }) + return avg + } + } + + prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now }) + return speedRef.current.get(jobId)?.at(-1) || 0 + }, + [] + ) const handleDismiss = async (jobId: string) => { await api.removeDownloadJob(jobId) invalidate() } + const handleCancel = async (jobId: string) => { + setCancellingJobs((prev) => new Set(prev).add(jobId)) + setConfirmingCancel(null) + try { + await api.cancelDownloadJob(jobId) + // Clean up speed tracking refs + prevBytesRef.current.delete(jobId) + speedRef.current.delete(jobId) + } finally { + setCancellingJobs((prev) => { + const next = new Set(prev) + next.delete(jobId) + return next + }) + invalidate() + } + } + return ( <> {withHeader && }
{downloads && downloads.length > 0 ? ( - downloads.map((download) => ( -
- {download.status === 'failed' ? ( -
- -
-

- {extractFileName(download.filepath) || download.url} -

-

- Download failed{download.failedReason ? `: ${download.failedReason}` : ''} -

+ downloads.map((download) => { + const filename = extractFileName(download.filepath) || download.url + const status = getDownloadStatus(download) + const speed = getSpeed(download.jobId, download.downloadedBytes) + const isCancelling = cancellingJobs.has(download.jobId) + const isConfirming = confirmingCancel === download.jobId + + return ( +
+ {status === 'failed' ? ( +
+ +
+

+ {download.title || filename} +

+ {download.title && ( +

{filename}

+ )} +

+ Download failed{download.failedReason ? `: ${download.failedReason}` : ''} +

+
+
- -
- ) : ( - - )} -
- )) + ) : ( +
+ {/* Title + Cancel button row */} +
+
+

+ {download.title || filename} +

+ {download.title && ( +
+ + {filename} + + + {download.filetype} + +
+ )} + {!download.title && download.filetype && ( + + {download.filetype} + + )} +
+ {isConfirming ? ( +
+ + +
+ ) : isCancelling ? ( + + ) : ( + + )} +
+ + {/* Size info */} +
+ + {download.downloadedBytes && download.totalBytes + ? `${formatBytes(download.downloadedBytes, 1)} / ${formatBytes(download.totalBytes, 1)}` + : `${download.progress}% / 100%`} + +
+ + {/* Progress bar */} +
+
+
+
+
15 + ? 'left-2 text-white drop-shadow-md' + : 'right-2 text-desert-green' + }`} + > + {Math.round(download.progress)}% +
+
+ + {/* Status indicator */} +
+ {status === 'queued' && ( + <> +
+ Waiting... + + )} + {status === 'active' && ( + <> +
+ + Downloading...{speed > 0 ? ` ${formatSpeed(speed)}` : ''} + + + )} + {status === 'stalled' && download.lastProgressTime && ( + <> +
+ + No data received for{' '} + {Math.floor((Date.now() - download.lastProgressTime) / 60_000)}m... + + + )} +
+
+ )} +
+ ) + }) ) : (

No active downloads

)} diff --git a/admin/inertia/lib/api.ts b/admin/inertia/lib/api.ts index a47f326..ec18f68 100644 --- a/admin/inertia/lib/api.ts +++ b/admin/inertia/lib/api.ts @@ -563,6 +563,15 @@ class API { })() } + async cancelDownloadJob(jobId: string): Promise<{ success: boolean; message: string } | undefined> { + return catchInternal(async () => { + const response = await this.client.post<{ success: boolean; message: string }>( + `/downloads/jobs/${jobId}/cancel` + ) + return response.data + })() + } + async runBenchmark(type: BenchmarkType, sync: boolean = false) { return catchInternal(async () => { const response = await this.client.post( diff --git a/admin/start/routes.ts b/admin/start/routes.ts index 631c528..33215e1 100644 --- a/admin/start/routes.ts +++ b/admin/start/routes.ts @@ -93,6 +93,7 @@ router router.get('/jobs', [DownloadsController, 'index']) router.get('/jobs/:filetype', [DownloadsController, 'filetype']) router.delete('/jobs/:jobId', [DownloadsController, 'removeJob']) + router.post('/jobs/:jobId/cancel', [DownloadsController, 'cancelJob']) }) .prefix('/api/downloads') diff --git a/admin/types/downloads.ts b/admin/types/downloads.ts index b552acf..f280bf6 100644 --- a/admin/types/downloads.ts +++ b/admin/types/downloads.ts @@ -23,11 +23,20 @@ export type DoResumableDownloadProgress = { url: string } +export type DownloadProgressData = { + percent: number + downloadedBytes: number + totalBytes: number + lastProgressTime: number +} + export type RunDownloadJobParams = Omit< DoResumableDownloadParams, 'onProgress' | 'onComplete' | 'signal' > & { filetype: string + title?: string + totalBytes?: number resourceMetadata?: { resource_id: string version: string @@ -41,6 +50,10 @@ export type DownloadJobWithProgress = { progress: number filepath: string filetype: string + title?: string + downloadedBytes?: number + totalBytes?: number + lastProgressTime?: number status?: 'active' | 'failed' failedReason?: string }