mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
fix(downloads): fix cancel, dismiss, speed, and retry bugs
- Speed indicator: only set prevBytesRef on first observation to prevent intermediate re-renders from inflating the calculated speed - Cancel: throw UnrecoverableError on abort to prevent BullMQ retries - Dismiss: remove stale BullMQ lock before job.remove() so cancelled jobs can actually be dismissed - Retry: add getActiveByUrl() helper that checks job state before blocking re-download, auto-cleans terminal jobs - Wikipedia: reset selection status to failed on cancel so the "downloading" state doesn't persist Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
df4fde16c9
commit
9ef73056ac
|
|
@ -1,4 +1,4 @@
|
||||||
import { Job } from 'bullmq'
|
import { Job, UnrecoverableError } from 'bullmq'
|
||||||
import { RunDownloadJobParams, DownloadProgressData } from '../../types/downloads.js'
|
import { RunDownloadJobParams, DownloadProgressData } from '../../types/downloads.js'
|
||||||
import { QueueService } from '#services/queue_service'
|
import { QueueService } from '#services/queue_service'
|
||||||
import { doResumableDownload } from '../utils/downloads.js'
|
import { doResumableDownload } from '../utils/downloads.js'
|
||||||
|
|
@ -161,6 +161,12 @@ export class RunDownloadJob {
|
||||||
url,
|
url,
|
||||||
filepath,
|
filepath,
|
||||||
}
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
// If this was a cancellation abort, don't let BullMQ retry
|
||||||
|
if (error?.message?.includes('aborted') || error?.message?.includes('cancelled')) {
|
||||||
|
throw new UnrecoverableError(`Download cancelled: ${error.message}`)
|
||||||
|
}
|
||||||
|
throw error
|
||||||
} finally {
|
} finally {
|
||||||
// Clean up abort controller
|
// Clean up abort controller
|
||||||
RunDownloadJob.abortControllers.delete(job.id!)
|
RunDownloadJob.abortControllers.delete(job.id!)
|
||||||
|
|
@ -174,6 +180,29 @@ export class RunDownloadJob {
|
||||||
return await queue.getJob(jobId)
|
return await queue.getJob(jobId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a download is actively in progress for the given URL.
|
||||||
|
* Returns the job only if it's in an active state (active, waiting, delayed).
|
||||||
|
* If the job exists in a terminal state (failed, completed), removes it and returns undefined.
|
||||||
|
*/
|
||||||
|
static async getActiveByUrl(url: string): Promise<Job | undefined> {
|
||||||
|
const job = await this.getByUrl(url)
|
||||||
|
if (!job) return undefined
|
||||||
|
|
||||||
|
const state = await job.getState()
|
||||||
|
if (state === 'active' || state === 'waiting' || state === 'delayed') {
|
||||||
|
return job
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminal state -- clean up stale job so it doesn't block re-download
|
||||||
|
try {
|
||||||
|
await job.remove()
|
||||||
|
} catch {
|
||||||
|
// May already be gone
|
||||||
|
}
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
|
||||||
static async dispatch(params: RunDownloadJobParams) {
|
static async dispatch(params: RunDownloadJobParams) {
|
||||||
const queueService = new QueueService()
|
const queueService = new QueueService()
|
||||||
const queue = queueService.getQueue(this.queue)
|
const queue = queueService.getQueue(this.queue)
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,18 @@ export class DownloadService {
|
||||||
const queue = this.queueService.getQueue(queueName)
|
const queue = this.queueService.getQueue(queueName)
|
||||||
const job = await queue.getJob(jobId)
|
const job = await queue.getJob(jobId)
|
||||||
if (job) {
|
if (job) {
|
||||||
await job.remove()
|
try {
|
||||||
|
await job.remove()
|
||||||
|
} catch {
|
||||||
|
// Job may be locked by the worker after cancel. Remove the stale lock and retry.
|
||||||
|
try {
|
||||||
|
const client = await queue.client
|
||||||
|
await client.del(`bull:${queueName}:${jobId}:lock`)
|
||||||
|
await job.remove()
|
||||||
|
} catch {
|
||||||
|
// Last resort: already removed or truly stuck
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -109,7 +120,18 @@ export class DownloadService {
|
||||||
try {
|
try {
|
||||||
await job.remove()
|
await job.remove()
|
||||||
} catch {
|
} catch {
|
||||||
// Job may still be locked by worker - it will fail on next progress check
|
// Job may still be locked by worker - try again after it reaches terminal state
|
||||||
|
try {
|
||||||
|
const updatedJob = await queue.getJob(jobId)
|
||||||
|
if (updatedJob) {
|
||||||
|
const state = await updatedJob.getState()
|
||||||
|
if (state === 'failed' || state === 'completed') {
|
||||||
|
await updatedJob.remove()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Best effort - job will be cleaned up on next dismiss attempt
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the partial file from disk
|
// Delete the partial file from disk
|
||||||
|
|
@ -123,6 +145,20 @@ export class DownloadService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this was a Wikipedia download, update selection status to failed
|
||||||
|
// (the worker's failed event may not fire if we removed the job first)
|
||||||
|
if (job.data.filetype === 'zim' && job.data.url?.includes('wikipedia_en_')) {
|
||||||
|
try {
|
||||||
|
const { DockerService } = await import('#services/docker_service')
|
||||||
|
const { ZimService } = await import('#services/zim_service')
|
||||||
|
const dockerService = new DockerService()
|
||||||
|
const zimService = new ZimService(dockerService)
|
||||||
|
await zimService.onWikipediaDownloadComplete(job.data.url, false)
|
||||||
|
} catch {
|
||||||
|
// Best effort
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return { success: true, message: 'Download cancelled and partial file deleted' }
|
return { success: true, message: 'Download cancelled and partial file deleted' }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ export class MapService implements IMapService {
|
||||||
const downloadFilenames: string[] = []
|
const downloadFilenames: string[] = []
|
||||||
|
|
||||||
for (const resource of toDownload) {
|
for (const resource of toDownload) {
|
||||||
const existing = await RunDownloadJob.getByUrl(resource.url)
|
const existing = await RunDownloadJob.getActiveByUrl(resource.url)
|
||||||
if (existing) {
|
if (existing) {
|
||||||
logger.warn(`[MapService] Download already in progress for URL ${resource.url}, skipping.`)
|
logger.warn(`[MapService] Download already in progress for URL ${resource.url}, skipping.`)
|
||||||
continue
|
continue
|
||||||
|
|
@ -180,7 +180,7 @@ export class MapService implements IMapService {
|
||||||
throw new Error(`Invalid PMTiles file URL: ${url}. URL must end with .pmtiles`)
|
throw new Error(`Invalid PMTiles file URL: ${url}. URL must end with .pmtiles`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const existing = await RunDownloadJob.getByUrl(url)
|
const existing = await RunDownloadJob.getActiveByUrl(url)
|
||||||
if (existing) {
|
if (existing) {
|
||||||
throw new Error(`Download already in progress for URL ${url}`)
|
throw new Error(`Download already in progress for URL ${url}`)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -143,7 +143,7 @@ export class ZimService {
|
||||||
throw new Error(`Invalid ZIM file URL: ${url}. URL must end with .zim`)
|
throw new Error(`Invalid ZIM file URL: ${url}. URL must end with .zim`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const existing = await RunDownloadJob.getByUrl(url)
|
const existing = await RunDownloadJob.getActiveByUrl(url)
|
||||||
if (existing) {
|
if (existing) {
|
||||||
throw new Error('A download for this URL is already in progress')
|
throw new Error('A download for this URL is already in progress')
|
||||||
}
|
}
|
||||||
|
|
@ -221,7 +221,7 @@ export class ZimService {
|
||||||
const downloadFilenames: string[] = []
|
const downloadFilenames: string[] = []
|
||||||
|
|
||||||
for (const resource of toDownload) {
|
for (const resource of toDownload) {
|
||||||
const existingJob = await RunDownloadJob.getByUrl(resource.url)
|
const existingJob = await RunDownloadJob.getActiveByUrl(resource.url)
|
||||||
if (existingJob) {
|
if (existingJob) {
|
||||||
logger.warn(`[ZimService] Download already in progress for ${resource.url}, skipping.`)
|
logger.warn(`[ZimService] Download already in progress for ${resource.url}, skipping.`)
|
||||||
continue
|
continue
|
||||||
|
|
@ -464,7 +464,7 @@ export class ZimService {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if already downloading
|
// Check if already downloading
|
||||||
const existingJob = await RunDownloadJob.getByUrl(selectedOption.url)
|
const existingJob = await RunDownloadJob.getActiveByUrl(selectedOption.url)
|
||||||
if (existingJob) {
|
if (existingJob) {
|
||||||
return { success: false, message: 'Download already in progress' }
|
return { success: false, message: 'Download already in progress' }
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,10 @@ const ActiveDownloads = ({ filetype, withHeader = false }: ActiveDownloadProps)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now })
|
// Only set initial observation; never advance timestamp when bytes unchanged
|
||||||
|
if (!prev) {
|
||||||
|
prevBytesRef.current.set(jobId, { bytes: currentBytes, time: now })
|
||||||
|
}
|
||||||
return speedRef.current.get(jobId)?.at(-1) || 0
|
return speedRef.current.get(jobId)?.at(-1) || 0
|
||||||
},
|
},
|
||||||
[]
|
[]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user