mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-03-28 03:29:25 +01:00
Three bugs caused downloads to hang, disappear, or leave stuck spinners: 1. Wikipedia downloads that failed never updated the DB status from 'downloading', leaving the spinner stuck forever. Now the worker's failed handler marks them as failed. 2. No stall detection on streaming downloads - if data stopped flowing mid-download, the job hung indefinitely. Added a 5-minute stall timer that triggers retry. 3. Failed jobs were invisible to users since only waiting/active/delayed states were queried. Now failed jobs appear with error indicators in the download list. Closes #364, closes #216 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
242 lines
6.5 KiB
TypeScript
242 lines
6.5 KiB
TypeScript
import {
|
|
DoResumableDownloadParams,
|
|
DoResumableDownloadWithRetryParams,
|
|
} from '../../types/downloads.js'
|
|
import axios from 'axios'
|
|
import { Transform } from 'stream'
|
|
import { deleteFileIfExists, ensureDirectoryExists, getFileStatsIfExists } from './fs.js'
|
|
import { createWriteStream } from 'fs'
|
|
import path from 'path'
|
|
|
|
/**
 * Perform a resumable download with progress tracking.
 *
 * If a file already exists at `filepath` (and `forceNew` is false) its size is used as the resume
 * offset. The resume uses an HTTP `Range` request when the HEAD response advertises
 * `Accept-Ranges: bytes`. The partial file is discarded and the download restarts from byte 0 when:
 * - the server does not advertise range support,
 * - the local file is larger than the remote one, or
 * - the server ignores the `Range` header and replies 200.
 *
 * If no data arrives for 5 minutes the transfer is aborted. The resulting error carries
 * `code = 'ETIMEDOUT'`, so retry logic that retries on network error codes treats a stall as transient.
 *
 * @param param0 - Download parameters. Leave allowedMimeTypes empty to skip mime type checking.
 * Otherwise, mime types should be in the format "application/pdf", "image/png", etc.
 * @returns Path to the downloaded file
 * @throws Error when any of the following happens:
 * - the MIME type is not allowed,
 * - the GET response status is neither 200 nor 206,
 * - a stream fails or stalls,
 * - `signal` is aborted,
 * - `onComplete` throws.
 */
export async function doResumableDownload({
  url,
  filepath,
  timeout = 30000,
  signal,
  onProgress,
  onComplete,
  forceNew = false,
  allowedMimeTypes,
}: DoResumableDownloadParams): Promise<string> {
  const dirname = path.dirname(filepath)
  await ensureDirectoryExists(dirname)

  // Check if partial file exists for resume
  let startByte = 0
  let appendMode = false

  const existingStats = await getFileStatsIfExists(filepath)
  if (existingStats && !forceNew) {
    startByte = existingStats.size
    appendMode = true
  }

  // Get file info with HEAD request first
  const headResponse = await axios.head(url, {
    signal,
    timeout,
  })

  const contentType = headResponse.headers['content-type'] || ''
  // Guard against a missing or malformed Content-Length (parseInt would yield NaN).
  const totalBytes = Number.parseInt(headResponse.headers['content-length'] || '0', 10) || 0
  const supportsRangeRequests = headResponse.headers['accept-ranges'] === 'bytes'

  // If allowedMimeTypes is provided, check content type
  if (allowedMimeTypes && allowedMimeTypes.length > 0) {
    const isMimeTypeAllowed = allowedMimeTypes.some((mimeType) => contentType.includes(mimeType))
    if (!isMimeTypeAllowed) {
      throw new Error(`MIME type ${contentType} is not allowed`)
    }
  }

  // If file is already complete and not forcing overwrite just return filepath
  if (startByte === totalBytes && totalBytes > 0 && !forceNew) {
    return filepath
  }

  // A local file larger than the remote one cannot be a prefix of it (the remote file most likely
  // changed). Resuming would request an unsatisfiable range on every attempt, so start over.
  const partialIsLargerThanRemote = totalBytes > 0 && startByte > totalBytes

  // If we cannot resume (no range support, or a bogus partial file), delete the partial file
  if ((!supportsRangeRequests || partialIsLargerThanRemote) && startByte > 0) {
    await deleteFileIfExists(filepath)
    startByte = 0
    appendMode = false
  }

  const headers: Record<string, string> = {}
  if (supportsRangeRequests && startByte > 0) {
    headers.Range = `bytes=${startByte}-`
  }

  const response = await axios.get(url, {
    responseType: 'stream',
    headers,
    signal,
    timeout,
  })

  if (response.status !== 200 && response.status !== 206) {
    // Release the open response stream before bailing out.
    response.data.destroy()
    throw new Error(`Failed to download: HTTP ${response.status}`)
  }

  // A server may ignore the Range header and send the whole body with 200. Appending that to the
  // partial file would corrupt it, so truncate and count from byte 0 instead.
  if (response.status === 200 && startByte > 0) {
    startByte = 0
    appendMode = false
  }

  return new Promise<string>((resolve, reject) => {
    let downloadedBytes = startByte
    let lastProgressTime = Date.now()
    let lastDownloadedBytes = startByte
    // Ensures the promise is settled by exactly one code path.
    let settled = false

    // Stall detection: if no data arrives for 5 minutes, abort the download
    const STALL_TIMEOUT_MS = 5 * 60 * 1000
    let stallTimer: ReturnType<typeof setTimeout> | null = null

    const clearStallTimer = () => {
      if (stallTimer) {
        clearTimeout(stallTimer)
        stallTimer = null
      }
    }

    const writeStream = createWriteStream(filepath, {
      flags: appendMode ? 'a' : 'w',
    })

    // Progress tracking stream to monitor data flow
    const progressStream = new Transform({
      transform(
        chunk: Buffer,
        _encoding: BufferEncoding,
        callback: (error?: Error | null, data?: Buffer) => void
      ) {
        downloadedBytes += chunk.length
        resetStallTimer()

        // Report progress at most every 500 ms
        const now = Date.now()
        if (onProgress && now - lastProgressTime >= 500) {
          lastProgressTime = now
          lastDownloadedBytes = downloadedBytes
          onProgress({
            downloadedBytes,
            totalBytes,
            lastProgressTime,
            lastDownloadedBytes,
            url,
          })
        }

        callback(null, chunk)
      },
    })

    // Tear down every stream and (on error) reject once.
    const cleanup = (error?: Error) => {
      clearStallTimer()
      signal?.removeEventListener('abort', onAbort)
      progressStream.destroy()
      response.data.destroy()
      writeStream.destroy()
      if (error && !settled) {
        settled = true
        reject(error)
      }
    }

    const onAbort = () => {
      cleanup(new Error('Download aborted'))
    }

    const resetStallTimer = () => {
      clearStallTimer()
      stallTimer = setTimeout(() => {
        // Tag the stall like a socket timeout so network-error retry logic treats it as transient.
        cleanup(
          Object.assign(new Error('Download stalled - no data received for 5 minutes'), {
            code: 'ETIMEDOUT',
          })
        )
      }, STALL_TIMEOUT_MS)
    }

    response.data.on('error', cleanup)
    progressStream.on('error', cleanup)
    writeStream.on('error', cleanup)

    writeStream.on('finish', async () => {
      clearStallTimer()
      signal?.removeEventListener('abort', onAbort)
      if (settled) {
        return
      }
      try {
        if (onProgress) {
          onProgress({
            downloadedBytes,
            totalBytes,
            lastProgressTime: Date.now(),
            lastDownloadedBytes: downloadedBytes,
            url,
          })
        }
        if (onComplete) {
          await onComplete(url, filepath)
        }
        settled = true
        resolve(filepath)
      } catch (error) {
        // Without this, a throwing callback would leave the promise pending forever.
        settled = true
        reject(error)
      }
    })

    // `once` keeps the listener from accumulating across retries. An abort that happened
    // before this point would never fire the event, so handle it explicitly.
    signal?.addEventListener('abort', onAbort, { once: true })
    if (signal?.aborted) {
      onAbort()
      return
    }

    // Start stall timer and pipe: response -> progressStream -> writeStream
    resetStallTimer()
    response.data.pipe(progressStream).pipe(writeStream)
  })
}
|
|
|
|
/**
 * Error codes treated as transient and therefore worth retrying.
 * `ECONNABORTED` is what axios reports for a request timeout in Node (unless
 * `clarifyTimeoutError` turns it into `ETIMEDOUT`), so both spellings are retried.
 */
const RETRIABLE_ERROR_CODES: ReadonlySet<string> = new Set([
  'ECONNRESET',
  'ENOTFOUND',
  'ETIMEDOUT',
  'ECONNABORTED',
])

/** Read a property such as `name` or `code` from a thrown value without assuming it is an object. */
function readErrorField(error: unknown, field: 'name' | 'code'): unknown {
  if (typeof error !== 'object' || error === null) {
    return undefined
  }
  return (error as { [key: string]: unknown })[field]
}

/**
 * Download `url` to `filepath` with `doResumableDownload`, retrying transient network failures.
 *
 * Each attempt resumes from whatever partial data the previous attempt left on disk. An attempt is
 * retried only when it failed with a code in `RETRIABLE_ERROR_CODES`, fewer than `max_retries`
 * attempts have been made, and the caller's `signal` has not been aborted. Attempts are spaced
 * `retry_delay` ms apart.
 *
 * @returns Path to the downloaded file
 * @throws Error(`Download aborted for URL: ...`) when an attempt fails with an AbortError/ABORT_ERR;
 *   otherwise rethrows the last attempt's error unchanged.
 */
export async function doResumableDownloadWithRetry({
  url,
  filepath,
  signal,
  timeout = 30000,
  onProgress,
  max_retries = 3,
  retry_delay = 2000,
  onAttemptError,
  allowedMimeTypes,
}: DoResumableDownloadWithRetryParams): Promise<string> {
  const dirname = path.dirname(filepath)
  await ensureDirectoryExists(dirname)

  let attempt = 0
  let lastError: Error | null = null

  while (attempt < max_retries) {
    try {
      return await doResumableDownload({
        url,
        filepath,
        signal,
        timeout,
        allowedMimeTypes,
        onProgress,
      })
    } catch (error) {
      attempt++
      const err = error as Error
      lastError = err

      const name = readErrorField(error, 'name')
      const code = readErrorField(error, 'code')
      const isAborted = name === 'AbortError' || code === 'ABORT_ERR'
      const isNetworkError = typeof code === 'string' && RETRIABLE_ERROR_CODES.has(code)

      onAttemptError?.(err, attempt)

      if (isAborted) {
        throw new Error(`Download aborted for URL: ${url}`)
      }

      // Retry only transient failures, and never after the caller has asked us to stop.
      if (attempt < max_retries && isNetworkError && !signal?.aborted) {
        await delay(retry_delay)
        continue
      }

      // Max retries reached, non-retriable error, or caller aborted
      throw error
    }
  }

  // Only reachable when max_retries <= 0 (the loop body never ran)
  throw lastError || new Error('Unknown error during download')
}
|
|
|
|
/**
 * Pause for roughly `ms` milliseconds.
 * Used to space out retry attempts; the returned promise resolves with no value.
 */
async function delay(ms: number): Promise<void> {
  await new Promise<void>((wake) => {
    setTimeout(wake, ms)
  })
}
|