diff --git a/admin/app/controllers/downloads_controller.ts b/admin/app/controllers/downloads_controller.ts index 023806b..0a9b1e3 100644 --- a/admin/app/controllers/downloads_controller.ts +++ b/admin/app/controllers/downloads_controller.ts @@ -20,4 +20,8 @@ export default class DownloadsController { await this.downloadService.removeFailedJob(params.jobId) return { success: true } } + + async cancelJob({ params }: HttpContext) { + return this.downloadService.cancelJob(params.jobId) + } } diff --git a/admin/app/controllers/zim_controller.ts b/admin/app/controllers/zim_controller.ts index 4bb00e3..96adf63 100644 --- a/admin/app/controllers/zim_controller.ts +++ b/admin/app/controllers/zim_controller.ts @@ -27,7 +27,7 @@ export default class ZimController { async downloadRemote({ request }: HttpContext) { const payload = await request.validateUsing(remoteDownloadWithMetadataValidator) assertNotPrivateUrl(payload.url) - const { filename, jobId } = await this.zimService.downloadRemote(payload.url) + const { filename, jobId } = await this.zimService.downloadRemote(payload.url, payload.metadata) return { message: 'Download started successfully', diff --git a/admin/app/jobs/run_download_job.ts b/admin/app/jobs/run_download_job.ts index c7f672e..5ce3cc3 100644 --- a/admin/app/jobs/run_download_job.ts +++ b/admin/app/jobs/run_download_job.ts @@ -1,5 +1,5 @@ -import { Job } from 'bullmq' -import { RunDownloadJobParams } from '../../types/downloads.js' +import { Job, UnrecoverableError } from 'bullmq' +import { RunDownloadJobParams, DownloadProgressData } from '../../types/downloads.js' import { QueueService } from '#services/queue_service' import { doResumableDownload } from '../utils/downloads.js' import { createHash } from 'crypto' @@ -17,100 +17,170 @@ export class RunDownloadJob { return 'run-download' } + /** In-memory registry of abort controllers for active download jobs */ + static abortControllers: Map = new Map() + static getJobId(url: string): string { return createHash('sha256').update(url).digest('hex').slice(0, 16) } + /** Redis key used to signal cancellation across processes */ + static cancelKey(jobId: string): string { + return `nomad:download:cancel:${jobId}` + } + + /** Signal cancellation via Redis so the worker process can pick it up */ + static async signalCancel(jobId: string): Promise { + const queueService = new QueueService() + const queue = queueService.getQueue(this.queue) + const client = await queue.client + await client.set(this.cancelKey(jobId), '1', 'EX', 300) // 5 min TTL + } + async handle(job: Job) { const { url, filepath, timeout, allowedMimeTypes, forceNew, filetype, resourceMetadata } = job.data as RunDownloadJobParams - await doResumableDownload({ - url, - filepath, - timeout, - allowedMimeTypes, - forceNew, - onProgress(progress) { - const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100 - job.updateProgress(Math.floor(progressPercent)) - }, - async onComplete(url) { - try { - // Create InstalledResource entry if metadata was provided - if (resourceMetadata) { - const { default: InstalledResource } = await import('#models/installed_resource') - const { DateTime } = await import('luxon') - const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js') - const stats = await getFileStatsIfExists(filepath) + // Register abort controller for this job + const abortController = new AbortController() + RunDownloadJob.abortControllers.set(job.id!, abortController) - // Look up the old entry so we can clean up the previous file after updating - const oldEntry = await InstalledResource.query() - .where('resource_id', resourceMetadata.resource_id) - .where('resource_type', filetype as 'zim' | 'map') - .first() - const oldFilePath = oldEntry?.file_path ?? null + // Get Redis client for checking cancel signals from the API process + const queueService = new QueueService() + const cancelRedis = await queueService.getQueue(RunDownloadJob.queue).client - await InstalledResource.updateOrCreate( - { resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' }, - { - version: resourceMetadata.version, - collection_ref: resourceMetadata.collection_ref, - url: url, - file_path: filepath, - file_size_bytes: stats ? Number(stats.size) : null, - installed_at: DateTime.now(), - } - ) + let lastKnownProgress: Pick = { + downloadedBytes: 0, + totalBytes: 0, + } - // Delete the old file if it differs from the new one - if (oldFilePath && oldFilePath !== filepath) { - try { - await deleteFileIfExists(oldFilePath) - console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`) - } catch (deleteError) { - console.warn( - `[RunDownloadJob] Failed to delete old file ${oldFilePath}:`, - deleteError - ) - } - } - } - - if (filetype === 'zim') { - const dockerService = new DockerService() - const zimService = new ZimService(dockerService) - await zimService.downloadRemoteSuccessCallback([url], true) - - // Only dispatch embedding job if AI Assistant (Ollama) is installed - const ollamaUrl = await dockerService.getServiceURL('nomad_ollama') - if (ollamaUrl) { - try { - await EmbedFileJob.dispatch({ - fileName: url.split('/').pop() || '', - filePath: filepath, - }) - } catch (error) { - console.error(`[RunDownloadJob] Error dispatching EmbedFileJob for URL ${url}:`, error) - } - } - } else if (filetype === 'map') { - const mapsService = new MapService() - await mapsService.downloadRemoteSuccessCallback([url], false) - } - } catch (error) { - console.error( - `[RunDownloadJob] Error in download success callback for URL ${url}:`, - error - ) + // Poll Redis for cancel signal every 2s — independent of progress events so cancellation + // works even when the stream is stalled and no onProgress ticks are firing. + let cancelPollInterval: ReturnType | null = setInterval(async () => { + try { + const val = await cancelRedis.get(RunDownloadJob.cancelKey(job.id!)) + if (val) { + await cancelRedis.del(RunDownloadJob.cancelKey(job.id!)) + abortController.abort() } - job.updateProgress(100) - }, - }) + } catch { + // Redis errors are non-fatal; in-process AbortController covers same-process cancels + } + }, 2000) - return { - url, - filepath, + try { + await doResumableDownload({ + url, + filepath, + timeout, + allowedMimeTypes, + forceNew, + signal: abortController.signal, + onProgress(progress) { + const progressPercent = (progress.downloadedBytes / (progress.totalBytes || 1)) * 100 + const progressData: DownloadProgressData = { + percent: Math.floor(progressPercent), + downloadedBytes: progress.downloadedBytes, + totalBytes: progress.totalBytes, + lastProgressTime: Date.now(), + } + job.updateProgress(progressData) + lastKnownProgress = { downloadedBytes: progress.downloadedBytes, totalBytes: progress.totalBytes } + }, + async onComplete(url) { + try { + // Create InstalledResource entry if metadata was provided + if (resourceMetadata) { + const { default: InstalledResource } = await import('#models/installed_resource') + const { DateTime } = await import('luxon') + const { getFileStatsIfExists, deleteFileIfExists } = await import('../utils/fs.js') + const stats = await getFileStatsIfExists(filepath) + + // Look up the old entry so we can clean up the previous file after updating + const oldEntry = await InstalledResource.query() + .where('resource_id', resourceMetadata.resource_id) + .where('resource_type', filetype as 'zim' | 'map') + .first() + const oldFilePath = oldEntry?.file_path ?? null + + await InstalledResource.updateOrCreate( + { resource_id: resourceMetadata.resource_id, resource_type: filetype as 'zim' | 'map' }, + { + version: resourceMetadata.version, + collection_ref: resourceMetadata.collection_ref, + url: url, + file_path: filepath, + file_size_bytes: stats ? Number(stats.size) : null, + installed_at: DateTime.now(), + } + ) + + // Delete the old file if it differs from the new one + if (oldFilePath && oldFilePath !== filepath) { + try { + await deleteFileIfExists(oldFilePath) + console.log(`[RunDownloadJob] Deleted old file: ${oldFilePath}`) + } catch (deleteError) { + console.warn( + `[RunDownloadJob] Failed to delete old file ${oldFilePath}:`, + deleteError + ) + } + } + } + + if (filetype === 'zim') { + const dockerService = new DockerService() + const zimService = new ZimService(dockerService) + await zimService.downloadRemoteSuccessCallback([url], true) + + // Only dispatch embedding job if AI Assistant (Ollama) is installed + const ollamaUrl = await dockerService.getServiceURL('nomad_ollama') + if (ollamaUrl) { + try { + await EmbedFileJob.dispatch({ + fileName: url.split('/').pop() || '', + filePath: filepath, + }) + } catch (error) { + console.error(`[RunDownloadJob] Error dispatching EmbedFileJob for URL ${url}:`, error) + } + } + } else if (filetype === 'map') { + const mapsService = new MapService() + await mapsService.downloadRemoteSuccessCallback([url], false) + } + } catch (error) { + console.error( + `[RunDownloadJob] Error in download success callback for URL ${url}:`, + error + ) + } + job.updateProgress({ + percent: 100, + downloadedBytes: lastKnownProgress.downloadedBytes, + totalBytes: lastKnownProgress.totalBytes, + lastProgressTime: Date.now(), + } as DownloadProgressData) + }, + }) + + return { + url, + filepath, + } + } catch (error: any) { + // If this was a cancellation abort, don't let BullMQ retry + if (error?.message?.includes('aborted') || error?.message?.includes('cancelled')) { + throw new UnrecoverableError(`Download cancelled: ${error.message}`) + } + throw error + } finally { + if (cancelPollInterval !== null) { + clearInterval(cancelPollInterval) + cancelPollInterval = null + } + RunDownloadJob.abortControllers.delete(job.id!) } } @@ -121,6 +191,29 @@ export class RunDownloadJob { return await queue.getJob(jobId) } + /** + * Check if a download is actively in progress for the given URL. + * Returns the job only if it's in an active state (active, waiting, delayed). + * If the job exists in a terminal state (failed, completed), removes it and returns undefined. + */ + static async getActiveByUrl(url: string): Promise { + const job = await this.getByUrl(url) + if (!job) return undefined + + const state = await job.getState() + if (state === 'active' || state === 'waiting' || state === 'delayed') { + return job + } + + // Terminal state -- clean up stale job so it doesn't block re-download + try { + await job.remove() + } catch { + // May already be gone + } + return undefined + } + static async dispatch(params: RunDownloadJobParams) { const queueService = new QueueService() const queue = queueService.getQueue(this.queue) diff --git a/admin/app/services/download_service.ts b/admin/app/services/download_service.ts index a2b7faf..40cdca3 100644 --- a/admin/app/services/download_service.ts +++ b/admin/app/services/download_service.ts @@ -2,27 +2,64 @@ import { inject } from '@adonisjs/core' import { QueueService } from './queue_service.js' import { RunDownloadJob } from '#jobs/run_download_job' import { DownloadModelJob } from '#jobs/download_model_job' -import { DownloadJobWithProgress } from '../../types/downloads.js' +import { DownloadJobWithProgress, DownloadProgressData } from '../../types/downloads.js' import { normalize } from 'path' +import { deleteFileIfExists } from '../utils/fs.js' @inject() export class DownloadService { constructor(private queueService: QueueService) {} - async listDownloadJobs(filetype?: string): Promise { - // Get regular file download jobs (zim, map, etc.) - const queue = this.queueService.getQueue(RunDownloadJob.queue) - const fileJobs = await queue.getJobs(['waiting', 'active', 'delayed', 'failed']) + private parseProgress(progress: any): { percent: number; downloadedBytes?: number; totalBytes?: number; lastProgressTime?: number } { + if (typeof progress === 'object' && progress !== null && 'percent' in progress) { + const p = progress as DownloadProgressData + return { + percent: p.percent, + downloadedBytes: p.downloadedBytes, + totalBytes: p.totalBytes, + lastProgressTime: p.lastProgressTime, + } + } + // Backward compat: plain integer from in-flight jobs during upgrade + return { percent: parseInt(String(progress), 10) || 0 } + } - const fileDownloads = fileJobs.map((job) => ({ - jobId: job.id!.toString(), - url: job.data.url, - progress: parseInt(job.progress.toString(), 10), - filepath: normalize(job.data.filepath), - filetype: job.data.filetype, - status: (job.failedReason ? 'failed' : 'active') as 'active' | 'failed', - failedReason: job.failedReason || undefined, - })) + async listDownloadJobs(filetype?: string): Promise { + // Get regular file download jobs (zim, map, etc.) — query each state separately so we can + // tag each job with its actual BullMQ state rather than guessing from progress data. + const queue = this.queueService.getQueue(RunDownloadJob.queue) + type FileJobState = 'waiting' | 'active' | 'delayed' | 'failed' + + const [waitingJobs, activeJobs, delayedJobs, failedJobs] = await Promise.all([ + queue.getJobs(['waiting']), + queue.getJobs(['active']), + queue.getJobs(['delayed']), + queue.getJobs(['failed']), + ]) + + const taggedFileJobs: Array<{ job: (typeof waitingJobs)[0]; state: FileJobState }> = [ + ...waitingJobs.map((j) => ({ job: j, state: 'waiting' as const })), + ...activeJobs.map((j) => ({ job: j, state: 'active' as const })), + ...delayedJobs.map((j) => ({ job: j, state: 'delayed' as const })), + ...failedJobs.map((j) => ({ job: j, state: 'failed' as const })), + ] + + const fileDownloads = taggedFileJobs.map(({ job, state }) => { + const parsed = this.parseProgress(job.progress) + return { + jobId: job.id!.toString(), + url: job.data.url, + progress: parsed.percent, + filepath: normalize(job.data.filepath), + filetype: job.data.filetype, + title: job.data.title || undefined, + downloadedBytes: parsed.downloadedBytes, + totalBytes: parsed.totalBytes || job.data.totalBytes || undefined, + lastProgressTime: parsed.lastProgressTime, + status: state, + failedReason: job.failedReason || undefined, + } + }) // Get Ollama model download jobs const modelQueue = this.queueService.getQueue(DownloadModelJob.queue) @@ -56,9 +93,106 @@ export class DownloadService { const queue = this.queueService.getQueue(queueName) const job = await queue.getJob(jobId) if (job) { - await job.remove() + try { + await job.remove() + } catch { + // Job may be locked by the worker after cancel. Remove the stale lock and retry. + try { + const client = await queue.client + await client.del(`bull:${queueName}:${jobId}:lock`) + await job.remove() + } catch { + // Last resort: already removed or truly stuck + } + } return } } } + + async cancelJob(jobId: string): Promise<{ success: boolean; message: string }> { + const queue = this.queueService.getQueue(RunDownloadJob.queue) + const job = await queue.getJob(jobId) + + if (!job) { + // Job already completed (removeOnComplete: true) or doesn't exist + return { success: true, message: 'Job not found (may have already completed)' } + } + + const filepath = job.data.filepath + + // Signal the worker process to abort the download via Redis + await RunDownloadJob.signalCancel(jobId) + + // Also try in-memory abort (works if worker is in same process) + RunDownloadJob.abortControllers.get(jobId)?.abort() + RunDownloadJob.abortControllers.delete(jobId) + + // Poll for terminal state (up to 4s at 250ms intervals) — cooperates with BullMQ's lifecycle + // instead of force-removing an active job and losing the worker's failure/cleanup path. + const POLL_INTERVAL_MS = 250 + const POLL_TIMEOUT_MS = 4000 + const deadline = Date.now() + POLL_TIMEOUT_MS + let reachedTerminal = false + + while (Date.now() < deadline) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) + try { + const state = await job.getState() + if (state === 'failed' || state === 'completed' || state === 'unknown') { + reachedTerminal = true + break + } + } catch { + reachedTerminal = true // getState() throws if job is already gone + break + } + } + + if (!reachedTerminal) { + console.warn(`[DownloadService] cancelJob: job ${jobId} did not reach terminal state within timeout, removing anyway`) + } + + // Remove the BullMQ job + try { + await job.remove() + } catch { + // Lock contention fallback: clear lock and retry once + try { + const client = await queue.client + await client.del(`bull:${RunDownloadJob.queue}:${jobId}:lock`) + const updatedJob = await queue.getJob(jobId) + if (updatedJob) await updatedJob.remove() + } catch { + // Best effort - job will be cleaned up on next dismiss attempt + } + } + + // Delete the partial file from disk + if (filepath) { + try { + await deleteFileIfExists(filepath) + // Also try .tmp in case PR #448 staging is merged + await deleteFileIfExists(filepath + '.tmp') + } catch { + // File may not exist yet (waiting job) + } + } + + // If this was a Wikipedia download, update selection status to failed + // (the worker's failed event may not fire if we removed the job first) + if (job.data.filetype === 'zim' && job.data.url?.includes('wikipedia_en_')) { + try { + const { DockerService } = await import('#services/docker_service') + const { ZimService } = await import('#services/zim_service') + const dockerService = new DockerService() + const zimService = new ZimService(dockerService) + await zimService.onWikipediaDownloadComplete(job.data.url, false) + } catch { + // Best effort + } + } + + return { success: true, message: 'Download cancelled and partial file deleted' } + } } diff --git a/admin/app/services/map_service.ts b/admin/app/services/map_service.ts index 483b215..395ff71 100644 --- a/admin/app/services/map_service.ts +++ b/admin/app/services/map_service.ts @@ -119,7 +119,7 @@ export class MapService implements IMapService { const downloadFilenames: string[] = [] for (const resource of toDownload) { - const existing = await RunDownloadJob.getByUrl(resource.url) + const existing = await RunDownloadJob.getActiveByUrl(resource.url) if (existing) { logger.warn(`[MapService] Download already in progress for URL ${resource.url}, skipping.`) continue @@ -141,6 +141,7 @@ export class MapService implements IMapService { allowedMimeTypes: PMTILES_MIME_TYPES, forceNew: true, filetype: 'map', + title: (resource as any).title || undefined, resourceMetadata: { resource_id: resource.id, version: resource.version, @@ -189,7 +190,7 @@ export class MapService implements IMapService { throw new Error(`Invalid PMTiles file URL: ${url}. URL must end with .pmtiles`) } - const existing = await RunDownloadJob.getByUrl(url) + const existing = await RunDownloadJob.getActiveByUrl(url) if (existing) { throw new Error(`Download already in progress for URL ${url}`) } diff --git a/admin/app/services/zim_service.ts b/admin/app/services/zim_service.ts index 3eee1cb..bc587aa 100644 --- a/admin/app/services/zim_service.ts +++ b/admin/app/services/zim_service.ts @@ -137,13 +137,13 @@ export class ZimService { } } - async downloadRemote(url: string): Promise<{ filename: string; jobId?: string }> { + async downloadRemote(url: string, metadata?: { title?: string; summary?: string; author?: string; size_bytes?: number }): Promise<{ filename: string; jobId?: string }> { const parsed = new URL(url) if (!parsed.pathname.endsWith('.zim')) { throw new Error(`Invalid ZIM file URL: ${url}. URL must end with .zim`) } - const existing = await RunDownloadJob.getByUrl(url) + const existing = await RunDownloadJob.getActiveByUrl(url) if (existing) { throw new Error('A download for this URL is already in progress') } @@ -170,6 +170,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: metadata?.title, + totalBytes: metadata?.size_bytes, resourceMetadata, }) @@ -219,7 +221,7 @@ export class ZimService { const downloadFilenames: string[] = [] for (const resource of toDownload) { - const existingJob = await RunDownloadJob.getByUrl(resource.url) + const existingJob = await RunDownloadJob.getActiveByUrl(resource.url) if (existingJob) { logger.warn(`[ZimService] Download already in progress for ${resource.url}, skipping.`) continue @@ -238,6 +240,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: (resource as any).title || undefined, + totalBytes: (resource as any).size_mb ? (resource as any).size_mb * 1024 * 1024 : undefined, resourceMetadata: { resource_id: resource.id, version: resource.version, @@ -272,7 +276,9 @@ export class ZimService { // Filter out completed jobs (progress === 100) to avoid race condition // where this job itself is still in the active queue const activeIncompleteJobs = activeJobs.filter((job) => { - const progress = typeof job.progress === 'number' ? job.progress : 0 + const progress = typeof job.progress === 'object' && job.progress !== null + ? (job.progress as any).percent + : typeof job.progress === 'number' ? job.progress : 0 return progress < 100 }) @@ -458,7 +464,7 @@ export class ZimService { } // Check if already downloading - const existingJob = await RunDownloadJob.getByUrl(selectedOption.url) + const existingJob = await RunDownloadJob.getActiveByUrl(selectedOption.url) if (existingJob) { return { success: false, message: 'Download already in progress' } } @@ -497,6 +503,8 @@ export class ZimService { allowedMimeTypes: ZIM_MIME_TYPES, forceNew: true, filetype: 'zim', + title: selectedOption.name, + totalBytes: selectedOption.size_mb ? selectedOption.size_mb * 1024 * 1024 : undefined, }) if (!result || !result.job) { diff --git a/admin/inertia/components/ActiveDownloads.tsx b/admin/inertia/components/ActiveDownloads.tsx index 9661f22..b4a53ca 100644 --- a/admin/inertia/components/ActiveDownloads.tsx +++ b/admin/inertia/components/ActiveDownloads.tsx @@ -1,8 +1,8 @@ +import { useRef, useState, useCallback } from 'react' import useDownloads, { useDownloadsProps } from '~/hooks/useDownloads' -import HorizontalBarChart from './HorizontalBarChart' -import { extractFileName } from '~/lib/util' +import { extractFileName, formatBytes } from '~/lib/util' import StyledSectionHeader from './StyledSectionHeader' -import { IconAlertTriangle, IconX } from '@tabler/icons-react' +import { IconAlertTriangle, IconX, IconLoader2 } from '@tabler/icons-react' import api from '~/lib/api' interface ActiveDownloadProps { @@ -10,62 +10,251 @@ interface ActiveDownloadProps { withHeader?: boolean } +function formatSpeed(bytesPerSec: number): string { + if (bytesPerSec <= 0) return '0 B/s' + if (bytesPerSec < 1024) return `${Math.round(bytesPerSec)} B/s` + if (bytesPerSec < 1024 * 1024) return `${(bytesPerSec / 1024).toFixed(1)} KB/s` + return `${(bytesPerSec / (1024 * 1024)).toFixed(1)} MB/s` +} + +type DownloadStatus = 'queued' | 'active' | 'stalled' | 'failed' + +function getDownloadStatus(download: { + progress: number + lastProgressTime?: number + status?: string +}): DownloadStatus { + if (download.status === 'failed') return 'failed' + if (download.status === 'waiting' || download.status === 'delayed') return 'queued' + // Fallback heuristic for model jobs and in-flight jobs from before this deploy + if (download.progress === 0 && !download.lastProgressTime) return 'queued' + if (download.lastProgressTime) { + const elapsed = Date.now() - download.lastProgressTime + if (elapsed > 60_000) return 'stalled' + } + return 'active' +} + const ActiveDownloads = ({ filetype, withHeader = false }: ActiveDownloadProps) => { const { data: downloads, invalidate } = useDownloads({ filetype }) + const [cancellingJobs, setCancellingJobs] = useState>(new Set()) + const [confirmingCancel, setConfirmingCancel] = useState(null) + + // Track previous downloadedBytes for speed calculation + const prevBytesRef = useRef>(new Map()) + const speedRef = useRef>(new Map()) + + const getSpeed = useCallback( + (jobId: string, currentBytes?: number): number => { + if (!currentBytes || currentBytes <= 0) return 0 + + const prev = prevBytesRef.current.get(jobId) + const now = Date.now() + + if (prev && prev.bytes > 0 && currentBytes > prev.bytes) { + const deltaBytes = currentBytes - prev.bytes + const deltaSec = (now - prev.time) / 1000 + if (deltaSec > 0) { + const instantSpeed = deltaBytes / deltaSec + + // Simple moving average (last 5 samples) + const samples = speedRef.current.get(jobId) || [] + samples.push(instantSpeed) + if (samples.length > 5) samples.shift() + speedRef.current.set(jobId, samples) + + const avg = samples.reduce((a, b) => a + b, 0) / samples.length + prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now }) + return avg + } + } + + // Only set initial observation; never advance timestamp when bytes unchanged + if (!prev) { + prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now }) + } + return speedRef.current.get(jobId)?.at(-1) || 0 + }, + [] + ) const handleDismiss = async (jobId: string) => { await api.removeDownloadJob(jobId) invalidate() } + const handleCancel = async (jobId: string) => { + setCancellingJobs((prev) => new Set(prev).add(jobId)) + setConfirmingCancel(null) + try { + await api.cancelDownloadJob(jobId) + // Clean up speed tracking refs + prevBytesRef.current.delete(jobId) + speedRef.current.delete(jobId) + } finally { + setCancellingJobs((prev) => { + const next = new Set(prev) + next.delete(jobId) + return next + }) + invalidate() + } + } + return ( <> {withHeader && }
{downloads && downloads.length > 0 ? ( - downloads.map((download) => ( -
- {download.status === 'failed' ? ( -
- -
-

- {extractFileName(download.filepath) || download.url} -

-

- Download failed{download.failedReason ? `: ${download.failedReason}` : ''} -

+ downloads.map((download) => { + const filename = extractFileName(download.filepath) || download.url + const status = getDownloadStatus(download) + const speed = getSpeed(download.jobId, download.downloadedBytes) + const isCancelling = cancellingJobs.has(download.jobId) + const isConfirming = confirmingCancel === download.jobId + + return ( +
+ {status === 'failed' ? ( +
+ +
+

+ {download.title || filename} +

+ {download.title && ( +

{filename}

+ )} +

+ Download failed{download.failedReason ? `: ${download.failedReason}` : ''} +

+
+
- -
- ) : ( - - )} -
- )) + ) : ( +
+ {/* Title + Cancel button row */} +
+
+

+ {download.title || filename} +

+ {download.title && ( +
+ + {filename} + + + {download.filetype} + +
+ )} + {!download.title && download.filetype && ( + + {download.filetype} + + )} +
+ {isConfirming ? ( +
+ + +
+ ) : isCancelling ? ( + + ) : ( + + )} +
+ + {/* Size info */} +
+ + {download.downloadedBytes && download.totalBytes + ? `${formatBytes(download.downloadedBytes, 1)} / ${formatBytes(download.totalBytes, 1)}` + : `${download.progress}% / 100%`} + +
+ + {/* Progress bar */} +
+
+
+
+
15 + ? 'left-2 text-white drop-shadow-md' + : 'right-2 text-desert-green' + }`} + > + {Math.round(download.progress)}% +
+
+ + {/* Status indicator */} +
+ {status === 'queued' && ( + <> +
+ Waiting... + + )} + {status === 'active' && ( + <> +
+ + Downloading...{speed > 0 ? ` ${formatSpeed(speed)}` : ''} + + + )} + {status === 'stalled' && download.lastProgressTime && ( + <> +
+ + No data received for{' '} + {Math.floor((Date.now() - download.lastProgressTime) / 60_000)}m... + + + )} +
+
+ )} +
+ ) + }) ) : (

No active downloads

)} diff --git a/admin/inertia/lib/api.ts b/admin/inertia/lib/api.ts index c405875..a37e40f 100644 --- a/admin/inertia/lib/api.ts +++ b/admin/inertia/lib/api.ts @@ -618,6 +618,15 @@ class API { })() } + async cancelDownloadJob(jobId: string): Promise<{ success: boolean; message: string } | undefined> { + return catchInternal(async () => { + const response = await this.client.post<{ success: boolean; message: string }>( + `/downloads/jobs/${jobId}/cancel` + ) + return response.data + })() + } + async runBenchmark(type: BenchmarkType, sync: boolean = false) { return catchInternal(async () => { const response = await this.client.post( diff --git a/admin/start/routes.ts b/admin/start/routes.ts index dab2192..06ec524 100644 --- a/admin/start/routes.ts +++ b/admin/start/routes.ts @@ -95,6 +95,7 @@ router router.get('/jobs', [DownloadsController, 'index']) router.get('/jobs/:filetype', [DownloadsController, 'filetype']) router.delete('/jobs/:jobId', [DownloadsController, 'removeJob']) + router.post('/jobs/:jobId/cancel', [DownloadsController, 'cancelJob']) }) .prefix('/api/downloads') diff --git a/admin/types/downloads.ts b/admin/types/downloads.ts index b552acf..71fca26 100644 --- a/admin/types/downloads.ts +++ b/admin/types/downloads.ts @@ -23,11 +23,20 @@ export type DoResumableDownloadProgress = { url: string } +export type DownloadProgressData = { + percent: number + downloadedBytes: number + totalBytes: number + lastProgressTime: number +} + export type RunDownloadJobParams = Omit< DoResumableDownloadParams, 'onProgress' | 'onComplete' | 'signal' > & { filetype: string + title?: string + totalBytes?: number resourceMetadata?: { resource_id: string version: string @@ -41,7 +50,11 @@ export type DownloadJobWithProgress = { progress: number filepath: string filetype: string - status?: 'active' | 'failed' + title?: string + downloadedBytes?: number + totalBytes?: number + lastProgressTime?: number + status?: 'active' | 'waiting' | 'delayed' | 'failed' failedReason?: string }