project-nomad/admin/app/utils/downloads.ts
Chris Sherwood b0b8f07661 fix: improve download reliability with stall detection, failure visibility, and Wikipedia status tracking
Three bugs caused downloads to hang, disappear, or leave stuck spinners:
1. Wikipedia downloads that failed never updated the DB status from 'downloading',
   leaving the spinner stuck forever. Now the worker's failed handler marks them as failed.
2. No stall detection on streaming downloads - if data stopped flowing mid-download,
   the job hung indefinitely. Added a 5-minute stall timer that triggers retry.
3. Failed jobs were invisible to users since only waiting/active/delayed states were
   queried. Now failed jobs appear with error indicators in the download list.

Closes #364, closes #216

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 11:46:10 -07:00

242 lines
6.5 KiB
TypeScript

import {
DoResumableDownloadParams,
DoResumableDownloadWithRetryParams,
} from '../../types/downloads.js'
import axios from 'axios'
import { Transform } from 'stream'
import { deleteFileIfExists, ensureDirectoryExists, getFileStatsIfExists } from './fs.js'
import { createWriteStream } from 'fs'
import path from 'path'
/**
 * Perform a resumable download with progress tracking.
 *
 * If a file already exists at `filepath` (and `forceNew` is false) its size is
 * used as the resume offset and the remainder is requested with an HTTP `Range`
 * header. The download restarts from byte 0 when the server does not advertise
 * range support, when it ignores the `Range` header and replies with a full
 * `200` body, or when the local file is larger than the remote one.
 *
 * @param param0 - Download parameters. Leave allowedMimeTypes empty to skip mime type checking.
 * Otherwise, mime types should be in the format "application/pdf", "image/png", etc.
 * `timeout` (default 30000 ms) is passed to both the HEAD and GET requests;
 * `signal` aborts the requests and the in-flight stream; `onProgress` is called
 * at most every 500 ms while data flows and once more when the file is written;
 * `onComplete` is awaited after the file is written and before the promise resolves.
 * @returns Path to the downloaded file
 * @throws Error if the MIME type is not allowed, the response status is neither
 * 200 nor 206, the download is aborted, no data arrives for 5 minutes, a stream
 * fails, or `onProgress`/`onComplete` throws after the file has been written.
 */
export async function doResumableDownload({
  url,
  filepath,
  timeout = 30000,
  signal,
  onProgress,
  onComplete,
  forceNew = false,
  allowedMimeTypes,
}: DoResumableDownloadParams): Promise<string> {
  const dirname = path.dirname(filepath)
  await ensureDirectoryExists(dirname)

  // Check if partial file exists for resume
  let startByte = 0
  let appendMode = false
  const existingStats = await getFileStatsIfExists(filepath)
  if (existingStats && !forceNew) {
    startByte = existingStats.size
    appendMode = true
  }

  // Get file info with HEAD request first
  const headResponse = await axios.head(url, {
    signal,
    timeout,
  })
  const contentType = headResponse.headers['content-type'] || ''
  const parsedLength = parseInt(headResponse.headers['content-length'] || '0', 10)
  // Treat a missing or malformed content-length as "unknown" (0)
  const totalBytes = Number.isFinite(parsedLength) ? parsedLength : 0
  const supportsRangeRequests = headResponse.headers['accept-ranges'] === 'bytes'

  // If allowedMimeTypes is provided, check content type
  if (allowedMimeTypes && allowedMimeTypes.length > 0) {
    const isMimeTypeAllowed = allowedMimeTypes.some((mimeType) => contentType.includes(mimeType))
    if (!isMimeTypeAllowed) {
      throw new Error(`MIME type ${contentType} is not allowed`)
    }
  }

  // If file is already complete and not forcing overwrite just return filepath
  if (startByte === totalBytes && totalBytes > 0 && !forceNew) {
    return filepath
  }

  // A partial file cannot be resumed when the server lacks range support, or when
  // the local file is already larger than the remote one (it cannot be a valid
  // prefix, and the range request would be unsatisfiable). Start over instead.
  const localFileTooLarge = totalBytes > 0 && startByte > totalBytes
  if (startByte > 0 && (!supportsRangeRequests || localFileTooLarge)) {
    await deleteFileIfExists(filepath)
    startByte = 0
    appendMode = false
  }

  const headers: Record<string, string> = {}
  if (supportsRangeRequests && startByte > 0) {
    headers.Range = `bytes=${startByte}-`
  }

  const response = await axios.get(url, {
    responseType: 'stream',
    headers,
    signal,
    timeout,
  })
  if (response.status !== 200 && response.status !== 206) {
    throw new Error(`Failed to download: HTTP ${response.status}`)
  }

  // startByte > 0 here means a Range header was sent. A 200 (instead of 206) means
  // the server ignored it and is sending the whole file, so appending would corrupt
  // the partial file. Overwrite from the beginning instead.
  if (response.status === 200 && startByte > 0) {
    startByte = 0
    appendMode = false
  }

  return new Promise<string>((resolve, reject) => {
    let downloadedBytes = startByte
    let lastProgressTime = Date.now()
    let lastDownloadedBytes = startByte

    // Stall detection: if no data arrives for 5 minutes, abort the download
    const STALL_TIMEOUT_MS = 5 * 60 * 1000
    let stallTimer: ReturnType<typeof setTimeout> | null = null

    const clearStallTimer = () => {
      if (stallTimer) {
        clearTimeout(stallTimer)
        stallTimer = null
      }
    }

    const resetStallTimer = () => {
      clearStallTimer()
      stallTimer = setTimeout(() => {
        cleanup(new Error('Download stalled - no data received for 5 minutes'))
      }, STALL_TIMEOUT_MS)
    }

    // Progress tracking stream to monitor data flow
    const progressStream = new Transform({
      transform(chunk: Buffer, _encoding: string, callback: (error?: Error | null) => void) {
        downloadedBytes += chunk.length
        resetStallTimer()

        // Update progress tracking (throttled to one callback per 500 ms)
        const now = Date.now()
        if (onProgress && now - lastProgressTime >= 500) {
          lastProgressTime = now
          lastDownloadedBytes = downloadedBytes
          onProgress({
            downloadedBytes,
            totalBytes,
            lastProgressTime,
            lastDownloadedBytes,
            url,
          })
        }

        this.push(chunk)
        callback()
      },
    })

    const writeStream = createWriteStream(filepath, {
      flags: appendMode ? 'a' : 'w',
    })

    // Declared as a function so cleanup() can detach it from the signal
    function handleAbort() {
      cleanup(new Error('Download aborted'))
    }

    // Handle errors and cleanup: stop the timer, detach from the caller's signal,
    // tear down all three streams, and reject if an error was supplied.
    const cleanup = (error?: Error) => {
      clearStallTimer()
      signal?.removeEventListener('abort', handleAbort)
      progressStream.destroy()
      response.data.destroy()
      writeStream.destroy()
      if (error) {
        reject(error)
      }
    }

    response.data.on('error', cleanup)
    progressStream.on('error', cleanup)
    writeStream.on('error', cleanup)

    // The signal may already be aborted, in which case no 'abort' event will fire
    if (signal?.aborted) {
      handleAbort()
      return
    }
    signal?.addEventListener('abort', handleAbort, { once: true })

    writeStream.on('finish', async () => {
      clearStallTimer()
      signal?.removeEventListener('abort', handleAbort)
      try {
        if (onProgress) {
          onProgress({
            downloadedBytes,
            totalBytes,
            lastProgressTime: Date.now(),
            lastDownloadedBytes: downloadedBytes,
            url,
          })
        }
        if (onComplete) {
          await onComplete(url, filepath)
        }
        resolve(filepath)
      } catch (error) {
        // Without this, a throwing callback would leave the promise pending forever
        reject(error instanceof Error ? error : new Error(String(error)))
      }
    })

    // Start stall timer and pipe: response -> progressStream -> writeStream
    resetStallTimer()
    response.data.pipe(progressStream).pipe(writeStream)
  })
}
/**
 * Run `doResumableDownload` and retry it when it fails with a transient network error.
 *
 * Only errors whose `code` is `ECONNRESET`, `ENOTFOUND` or `ETIMEDOUT` are retried;
 * any other error is rethrown immediately. Each failed attempt is reported through
 * `onAttemptError` before the retry/abort decision is made.
 *
 * @param param0 - Download parameters. `max_retries` (default 3) is the total number
 * of attempts; `retry_delay` (default 2000 ms) is the wait between attempts. The
 * remaining fields are forwarded to `doResumableDownload`.
 * @returns Path to the downloaded file
 * @throws Error with message `Download aborted for URL: <url>` when the download was
 * cancelled; otherwise the error from the last attempt.
 */
export async function doResumableDownloadWithRetry({
  url,
  filepath,
  signal,
  timeout = 30000,
  onProgress,
  max_retries = 3,
  retry_delay = 2000,
  onAttemptError,
  allowedMimeTypes,
}: DoResumableDownloadWithRetryParams): Promise<string> {
  const dirname = path.dirname(filepath)
  await ensureDirectoryExists(dirname)

  let attempt = 0
  let lastError: Error | null = null

  while (attempt < max_retries) {
    try {
      return await doResumableDownload({
        url,
        filepath,
        signal,
        timeout,
        allowedMimeTypes,
        onProgress,
      })
    } catch (caught: unknown) {
      attempt++
      const error = caught instanceof Error ? caught : new Error(String(caught))
      lastError = error
      const code = (error as Error & { code?: unknown }).code

      // Cancellation surfaces in several shapes: axios rejects with CanceledError /
      // ERR_CANCELED, Node APIs use AbortError / ABORT_ERR, and the streaming phase
      // of doResumableDownload throws a plain Error. Checking the signal itself
      // covers all of them.
      const isAborted =
        signal?.aborted === true ||
        error.name === 'AbortError' ||
        error.name === 'CanceledError' ||
        code === 'ABORT_ERR' ||
        code === 'ERR_CANCELED'
      const isNetworkError = code === 'ECONNRESET' || code === 'ENOTFOUND' || code === 'ETIMEDOUT'

      onAttemptError?.(error, attempt)

      if (isAborted) {
        throw new Error(`Download aborted for URL: ${url}`)
      }

      // If max retries reached or non-retriable error, rethrow the original value
      if (attempt >= max_retries || !isNetworkError) {
        throw caught
      }

      await delay(retry_delay)
    }
  }

  // Only reachable when max_retries <= 0 (the loop body never ran)
  throw lastError || new Error('Unknown error during download')
}
/**
 * Pause for the given number of milliseconds.
 * @param ms - How long to wait before the returned promise resolves.
 */
async function delay(ms: number): Promise<void> {
  await new Promise<void>((done) => {
    setTimeout(done, ms)
  })
}