diff --git a/admin/app/jobs/embed_file_job.ts b/admin/app/jobs/embed_file_job.ts index 844af7d..90ce677 100644 --- a/admin/app/jobs/embed_file_job.ts +++ b/admin/app/jobs/embed_file_job.ts @@ -262,13 +262,24 @@ export class EmbedFileJob { const queue = queueService.getQueue(this.queue) const jobs = await queue.getJobs(['waiting', 'active', 'delayed']) - return jobs.map((job) => ({ - jobId: job.id!.toString(), - fileName: (job.data as EmbedFileJobParams).fileName, - filePath: (job.data as EmbedFileJobParams).filePath, - progress: typeof job.progress === 'number' ? job.progress : 0, - status: ((job.data as any).status as string) ?? 'waiting', - })) + return jobs.map((job) => { + const data = job.data as EmbedFileJobParams & { + status?: string + lastBatchAt?: number + startedAt?: number + chunks?: number + } + return { + jobId: job.id!.toString(), + fileName: data.fileName, + filePath: data.filePath, + progress: typeof job.progress === 'number' ? job.progress : 0, + status: data.status ?? 'waiting', + lastBatchAt: data.lastBatchAt, + startedAt: data.startedAt, + chunks: data.chunks, + } + }) } static async getByFilePath(filePath: string): Promise { diff --git a/admin/app/utils/kb_job_health.ts b/admin/app/utils/kb_job_health.ts new file mode 100644 index 0000000..d5e154f --- /dev/null +++ b/admin/app/utils/kb_job_health.ts @@ -0,0 +1,50 @@ +/** + * Visual status assigned to an in-flight (or stuck) embedding job, used to + * pick the colored status pill in the KB Processing Queue. See RFC #883 §5. 
/**
 * Visual status assigned to an in-flight (or stuck) embedding job, used to
 * pick the colored status pill in the KB Processing Queue. See RFC #883 §5.
 *
 * - `waiting` — queued, no batch has started yet
 * - `healthy` — last batch < 2 minutes ago
 * - `slow`    — last batch 2-5 minutes ago (CPU-paced multi-batch ingestion
 *               falls into this band; not necessarily a problem)
 * - `stalled` — last batch > 5 minutes ago (likely a real problem)
 * - `failed`  — job recorded a failed status
 */
export type JobHealthStatus = 'waiting' | 'healthy' | 'slow' | 'stalled' | 'failed'

export interface JobHealthInput {
  /** BullMQ job.data.status — set by EmbedFileJob.handle on transitions. */
  status: string
  /** 0-100. 0 means no work observed yet on this job-row. */
  progress: number
  /** ms epoch of the last completed batch. Multi-batch ZIMs update this on
   *  every continuation; single-batch jobs leave it unset until completion. */
  lastBatchAt?: number
  /** ms epoch of the first batch start. Used as a fallback "last activity"
   *  signal for jobs that haven't yet completed their first batch. */
  startedAt?: number
  /** Current ms epoch. Injected for testability. */
  now: number
}

/** Staleness strictly above this many ms is reported as `slow`. */
const SLOW_THRESHOLD_MS = 2 * 60 * 1000
/** Staleness strictly above this many ms is reported as `stalled`. */
const STALLED_THRESHOLD_MS = 5 * 60 * 1000

/**
 * Normalises a timestamp that came from untyped job data: anything that is
 * not a finite number (`undefined`, `null`, `NaN`, `±Infinity`) is treated as
 * "not recorded" so it can neither defeat the `waiting` check nor poison the
 * staleness arithmetic with `NaN`.
 */
function usableTimestamp(value: number | null | undefined): number | undefined {
  return typeof value === 'number' && Number.isFinite(value) ? value : undefined
}

/**
 * Classifies an embedding job for display.
 *
 * Precedence: a `failed` status always wins; a job with zero progress and no
 * usable activity timestamp is `waiting`; otherwise the time since the last
 * activity (`lastBatchAt`, falling back to `startedAt`, falling back to `now`)
 * is compared against the slow / stalled thresholds. Both thresholds are
 * exclusive: exactly 2 minutes is still `healthy`, exactly 5 is still `slow`.
 *
 * @param input - Job snapshot plus the caller-supplied current time.
 * @returns The health bucket used to pick the status pill.
 */
export function computeJobHealth(input: JobHealthInput): JobHealthStatus {
  if (input.status === 'failed') return 'failed'

  // The timestamps originate from job data that is only cast, never validated,
  // so sanitise them once and use the sanitised values everywhere below.
  const lastBatchAt = usableTimestamp(input.lastBatchAt)
  const startedAt = usableTimestamp(input.startedAt)

  // No progress recorded and no activity timestamps — job is still queued.
  if (input.progress === 0 && lastBatchAt === undefined && startedAt === undefined) {
    return 'waiting'
  }

  // With progress but no timestamps there is nothing to measure against, so
  // the job is given the benefit of the doubt (staleness of zero).
  const lastActivity = lastBatchAt ?? startedAt ?? input.now
  const stalenessMs = input.now - lastActivity

  if (stalenessMs > STALLED_THRESHOLD_MS) return 'stalled'
  if (stalenessMs > SLOW_THRESHOLD_MS) return 'slow'
  return 'healthy'
}
+ + {dataUpdatedAt > 0 + ? `Last updated ${formatTimeAgo(dataUpdatedAt, tick)}` + : 'Loading…'} + + + Refresh + +
+ )} +
{jobs && jobs.length > 0 ? ( - jobs.map((job) => ( -
- -
- )) + jobs.map((job) => { + const health = computeJobHealth({ + status: job.status, + progress: job.progress, + lastBatchAt: job.lastBatchAt, + startedAt: job.startedAt, + now: tick, + }) + const display = JOB_HEALTH_DISPLAY[health] + const lastActivityMs = job.lastBatchAt ?? job.startedAt + return ( +
+
+ + + {display.label} + + {lastActivityMs !== undefined && ( + + · last activity {formatTimeAgo(lastActivityMs, tick)} + + )} + {typeof job.chunks === 'number' && job.chunks > 0 && ( + + · {job.chunks.toLocaleString()} chunks + + )} +
+ +
+ ) + }) ) : (

No files are currently being processed

)} diff --git a/admin/inertia/lib/kb_job_health_display.ts b/admin/inertia/lib/kb_job_health_display.ts new file mode 100644 index 0000000..4860a23 --- /dev/null +++ b/admin/inertia/lib/kb_job_health_display.ts @@ -0,0 +1,63 @@ +import { computeJobHealth, type JobHealthStatus } from '../../app/utils/kb_job_health.js' + +export { computeJobHealth, type JobHealthStatus } from '../../app/utils/kb_job_health.js' + +/** + * Visual presentation for each health status — pill color, dot color, and the + * short label rendered alongside the dot. Kept in one place so backend health + * decisions (`computeJobHealth`) and frontend rendering stay in sync. + */ +export const JOB_HEALTH_DISPLAY: Record< + JobHealthStatus, + { dot: string; label: string; ariaLabel: string } +> = { + waiting: { + dot: 'bg-gray-400 dark:bg-gray-500', + label: 'Waiting', + ariaLabel: 'Job is queued and waiting to start', + }, + healthy: { + dot: 'bg-green-500', + label: 'Active', + ariaLabel: 'Job is embedding at a normal rate', + }, + slow: { + dot: 'bg-yellow-500', + label: 'Slow', + ariaLabel: 'Job has not made progress for at least 2 minutes', + }, + stalled: { + dot: 'bg-red-500', + label: 'Stalled', + ariaLabel: 'Job has not made progress for at least 5 minutes', + }, + failed: { + dot: 'bg-red-700', + label: 'Failed', + ariaLabel: 'Job failed', + }, +} + +/** + * Format a relative timestamp as "Xs ago", "Xm ago", "Xh ago" with sensible + * thresholds for the KB Processing Queue's "Last activity" line. 
/**
 * Renders the distance between `timestampMs` and `now` as a short relative
 * label for the KB Processing Queue: "just now" below five seconds, then
 * whole seconds, whole minutes, and finally whole hours (hours are not rolled
 * up into days). Timestamps in the future are clamped to "just now".
 *
 * @param timestampMs - ms epoch of the event being described.
 * @param now - ms epoch to measure against.
 * @returns A label such as "just now", "42s ago", "7m ago" or "3h ago".
 */
export function formatTimeAgo(timestampMs: number, now: number): string {
  // Clamp at zero so clock skew between server and browser never yields a
  // negative age.
  const elapsedSec = Math.max(0, Math.floor((now - timestampMs) / 1000))

  if (elapsedSec < 60) {
    return elapsedSec < 5 ? 'just now' : `${elapsedSec}s ago`
  }

  const elapsedMin = Math.floor(elapsedSec / 60)
  return elapsedMin < 60 ? `${elapsedMin}m ago` : `${Math.floor(elapsedMin / 60)}h ago`
}
threshold)', () => { + assert.equal( + computeJobHealth({ + status: 'batch_completed', + progress: 50, + lastBatchAt: NOW - 90 * 1000, + now: NOW, + }), + 'healthy' + ) +}) + +test('lastBatchAt 3 min ago → slow (CPU-paced ingestion lives here)', () => { + assert.equal( + computeJobHealth({ + status: 'batch_completed', + progress: 50, + lastBatchAt: NOW - 3 * MIN, + now: NOW, + }), + 'slow' + ) +}) + +test('lastBatchAt 4:30 ago → still slow (under 5 min stalled threshold)', () => { + assert.equal( + computeJobHealth({ + status: 'batch_completed', + progress: 50, + lastBatchAt: NOW - 4.5 * MIN, + now: NOW, + }), + 'slow' + ) +}) + +test('lastBatchAt 5:01 ago → stalled', () => { + assert.equal( + computeJobHealth({ + status: 'batch_completed', + progress: 50, + lastBatchAt: NOW - (5 * MIN + 1000), + now: NOW, + }), + 'stalled' + ) +}) + +test('lastBatchAt missing but startedAt 10 min ago → stalled (first-batch-never-finished case)', () => { + assert.equal( + computeJobHealth({ + status: 'processing', + progress: 5, + startedAt: NOW - 10 * MIN, + now: NOW, + }), + 'stalled' + ) +}) diff --git a/admin/types/rag.ts b/admin/types/rag.ts index e84f349..a50cb4c 100644 --- a/admin/types/rag.ts +++ b/admin/types/rag.ts @@ -5,6 +5,12 @@ export type EmbedJobWithProgress = { progress: number status: string error?: string + /** ms epoch of last completed batch; multi-batch ZIMs update this each batch. */ + lastBatchAt?: number + /** ms epoch of first batch start; used as a fallback when lastBatchAt unset. */ + startedAt?: number + /** Total chunks embedded across this job's batches so far. */ + chunks?: number } export type ProcessAndEmbedFileResponse = {