mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-05-26 22:35:05 +02:00
feat(KB): status pill + last-activity timestamp on Processing Queue (RFC #883 §5/§10)
Each in-flight (or stuck) embedding job gets a colored health pill, relative-activity timestamp, and chunk counter so users can tell at a glance whether ingestion is making progress. ## Health states - **🟢 Active** — last batch < 2 min ago - **🟡 Slow** — last batch 2-5 min ago (CPU-paced multi-batch ingestion lives here naturally; not always a problem) - **🔴 Stalled** — last batch > 5 min ago (likely real problem) - **⚪ Waiting** — queued, no batch started yet - **🔴 Failed** — job recorded failed status ## What lands - New backend util `kb_job_health.ts` with pure `computeJobHealth(input)` decision function. Time-based thresholds (2 min / 5 min) inlined as constants. 9 unit tests pin the boundaries. - `EmbedJobWithProgress` gains `lastBatchAt`, `startedAt`, `chunks` — already set by `EmbedFileJob.handle` on every batch transition, just not previously surfaced through `listActiveJobs`. - Frontend `kb_job_health_display.ts` maps each status to a Tailwind dot color, label, and aria-label so backend and UI stay in sync. - `ActiveEmbedJobs.tsx` renders the pill, "last activity Xs ago", and chunk counter above each progress bar. Adds a manual Refresh button and "Last updated Xs ago" line — the existing 2s/30s auto-poll cadence in `useEmbedJobs` is left intact. - Live tick at 5s keeps the relative timestamps current without re-fetching from the API. ## Not in scope - Per-card Cancel / Retry / Un-index — separate Phase 2 PR - Conditional warnings A/B/C — separate Phase 2 PR - Computing throughput rate (chunks/min) — needs ratio registry consumer (Phase 2 follow-up); for now the pill answers the "is it stuck?" question directly without a rate estimate.
This commit is contained in:
parent
7d7459bc14
commit
ca5569c8ea
|
|
@ -262,13 +262,24 @@ export class EmbedFileJob {
|
|||
const queue = queueService.getQueue(this.queue)
|
||||
const jobs = await queue.getJobs(['waiting', 'active', 'delayed'])
|
||||
|
||||
return jobs.map((job) => ({
|
||||
jobId: job.id!.toString(),
|
||||
fileName: (job.data as EmbedFileJobParams).fileName,
|
||||
filePath: (job.data as EmbedFileJobParams).filePath,
|
||||
progress: typeof job.progress === 'number' ? job.progress : 0,
|
||||
status: ((job.data as any).status as string) ?? 'waiting',
|
||||
}))
|
||||
return jobs.map((job) => {
|
||||
const data = job.data as EmbedFileJobParams & {
|
||||
status?: string
|
||||
lastBatchAt?: number
|
||||
startedAt?: number
|
||||
chunks?: number
|
||||
}
|
||||
return {
|
||||
jobId: job.id!.toString(),
|
||||
fileName: data.fileName,
|
||||
filePath: data.filePath,
|
||||
progress: typeof job.progress === 'number' ? job.progress : 0,
|
||||
status: data.status ?? 'waiting',
|
||||
lastBatchAt: data.lastBatchAt,
|
||||
startedAt: data.startedAt,
|
||||
chunks: data.chunks,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
static async getByFilePath(filePath: string): Promise<Job | undefined> {
|
||||
|
|
|
|||
50
admin/app/utils/kb_job_health.ts
Normal file
50
admin/app/utils/kb_job_health.ts
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Visual status assigned to an in-flight (or stuck) embedding job, used to
|
||||
* pick the colored status pill in the KB Processing Queue. See RFC #883 §5.
|
||||
*
|
||||
* - `waiting` — queued, no batch has started yet
|
||||
* - `healthy` — last batch < 2 minutes ago
|
||||
* - `slow` — last batch 2-5 minutes ago (CPU-paced multi-batch ingestion
|
||||
* falls into this band; not necessarily a problem)
|
||||
* - `stalled` — last batch > 5 minutes ago (likely a real problem)
|
||||
* - `failed` — job recorded a failed status
|
||||
*/
|
||||
export type JobHealthStatus = 'waiting' | 'healthy' | 'slow' | 'stalled' | 'failed'
|
||||
|
||||
export interface JobHealthInput {
|
||||
/** BullMQ job.data.status — set by EmbedFileJob.handle on transitions. */
|
||||
status: string
|
||||
/** 0-100. 0 means no work observed yet on this job-row. */
|
||||
progress: number
|
||||
/** ms epoch of the last completed batch. Multi-batch ZIMs update this on
|
||||
* every continuation; single-batch jobs leave it unset until completion. */
|
||||
lastBatchAt?: number
|
||||
/** ms epoch of the first batch start. Used as a fallback "last activity"
|
||||
* signal for jobs that haven't yet completed their first batch. */
|
||||
startedAt?: number
|
||||
/** Current ms epoch. Injected for testability. */
|
||||
now: number
|
||||
}
|
||||
|
||||
const SLOW_THRESHOLD_MS = 2 * 60 * 1000
|
||||
const STALLED_THRESHOLD_MS = 5 * 60 * 1000
|
||||
|
||||
export function computeJobHealth(input: JobHealthInput): JobHealthStatus {
|
||||
if (input.status === 'failed') return 'failed'
|
||||
|
||||
// No progress recorded and no activity timestamps — job is still queued.
|
||||
if (
|
||||
input.progress === 0 &&
|
||||
input.lastBatchAt === undefined &&
|
||||
input.startedAt === undefined
|
||||
) {
|
||||
return 'waiting'
|
||||
}
|
||||
|
||||
const lastActivity = input.lastBatchAt ?? input.startedAt ?? input.now
|
||||
const stalenessMs = input.now - lastActivity
|
||||
|
||||
if (stalenessMs > STALLED_THRESHOLD_MS) return 'stalled'
|
||||
if (stalenessMs > SLOW_THRESHOLD_MS) return 'slow'
|
||||
return 'healthy'
|
||||
}
|
||||
|
|
@ -1,39 +1,101 @@
|
|||
import { useEffect, useState } from 'react'
|
||||
import useEmbedJobs from '~/hooks/useEmbedJobs'
|
||||
import HorizontalBarChart from './HorizontalBarChart'
|
||||
import StyledButton from './StyledButton'
|
||||
import StyledSectionHeader from './StyledSectionHeader'
|
||||
import {
|
||||
JOB_HEALTH_DISPLAY,
|
||||
computeJobHealth,
|
||||
formatTimeAgo,
|
||||
} from '~/lib/kb_job_health_display'
|
||||
|
||||
interface ActiveEmbedJobsProps {
|
||||
withHeader?: boolean
|
||||
}
|
||||
|
||||
const ActiveEmbedJobs = ({ withHeader = false }: ActiveEmbedJobsProps) => {
|
||||
const { data: jobs } = useEmbedJobs()
|
||||
const { data: jobs, invalidate, dataUpdatedAt } = useEmbedJobs()
|
||||
|
||||
// Live "last refreshed Xs ago" tick. We re-render every 5s purely to keep
|
||||
// the relative timestamp current, without touching React Query state.
|
||||
const [tick, setTick] = useState(() => Date.now())
|
||||
useEffect(() => {
|
||||
const id = setInterval(() => setTick(Date.now()), 5000)
|
||||
return () => clearInterval(id)
|
||||
}, [])
|
||||
|
||||
return (
|
||||
<>
|
||||
{withHeader && (
|
||||
<StyledSectionHeader title="Processing Queue" className="mt-12 mb-4" />
|
||||
)}
|
||||
|
||||
{/* Refresh row — only shown when at least one job exists so the empty
|
||||
state stays clean. */}
|
||||
{jobs && jobs.length > 0 && (
|
||||
<div className="flex items-center justify-between mb-3 text-sm">
|
||||
<span className="text-text-muted">
|
||||
{dataUpdatedAt > 0
|
||||
? `Last updated ${formatTimeAgo(dataUpdatedAt, tick)}`
|
||||
: 'Loading…'}
|
||||
</span>
|
||||
<StyledButton variant="ghost" size="sm" icon="IconRefresh" onClick={invalidate}>
|
||||
Refresh
|
||||
</StyledButton>
|
||||
</div>
|
||||
)}
|
||||
|
||||
<div className="space-y-4">
|
||||
{jobs && jobs.length > 0 ? (
|
||||
jobs.map((job) => (
|
||||
<div
|
||||
key={job.jobId}
|
||||
className="bg-desert-white rounded-lg p-4 border border-desert-stone-light shadow-sm hover:shadow-lg transition-shadow"
|
||||
>
|
||||
<HorizontalBarChart
|
||||
items={[
|
||||
{
|
||||
label: job.fileName,
|
||||
value: job.progress,
|
||||
total: '100%',
|
||||
used: `${job.progress}%`,
|
||||
type: job.status,
|
||||
},
|
||||
]}
|
||||
/>
|
||||
</div>
|
||||
))
|
||||
jobs.map((job) => {
|
||||
const health = computeJobHealth({
|
||||
status: job.status,
|
||||
progress: job.progress,
|
||||
lastBatchAt: job.lastBatchAt,
|
||||
startedAt: job.startedAt,
|
||||
now: tick,
|
||||
})
|
||||
const display = JOB_HEALTH_DISPLAY[health]
|
||||
const lastActivityMs = job.lastBatchAt ?? job.startedAt
|
||||
return (
|
||||
<div
|
||||
key={job.jobId}
|
||||
className="bg-desert-white rounded-lg p-4 border border-desert-stone-light shadow-sm hover:shadow-lg transition-shadow"
|
||||
>
|
||||
<div className="flex items-center gap-3 mb-2">
|
||||
<span
|
||||
className={`inline-block w-2.5 h-2.5 rounded-full ${display.dot}`}
|
||||
aria-label={display.ariaLabel}
|
||||
title={display.ariaLabel}
|
||||
/>
|
||||
<span className="text-sm font-medium text-text-primary">
|
||||
{display.label}
|
||||
</span>
|
||||
{lastActivityMs !== undefined && (
|
||||
<span className="text-xs text-text-muted">
|
||||
· last activity {formatTimeAgo(lastActivityMs, tick)}
|
||||
</span>
|
||||
)}
|
||||
{typeof job.chunks === 'number' && job.chunks > 0 && (
|
||||
<span className="text-xs text-text-muted">
|
||||
· {job.chunks.toLocaleString()} chunks
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
<HorizontalBarChart
|
||||
items={[
|
||||
{
|
||||
label: job.fileName,
|
||||
value: job.progress,
|
||||
total: '100%',
|
||||
used: `${job.progress}%`,
|
||||
type: job.status,
|
||||
},
|
||||
]}
|
||||
/>
|
||||
</div>
|
||||
)
|
||||
})
|
||||
) : (
|
||||
<p className="text-text-muted">No files are currently being processed</p>
|
||||
)}
|
||||
|
|
|
|||
63
admin/inertia/lib/kb_job_health_display.ts
Normal file
63
admin/inertia/lib/kb_job_health_display.ts
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
import { computeJobHealth, type JobHealthStatus } from '../../app/utils/kb_job_health.js'
|
||||
|
||||
export { computeJobHealth, type JobHealthStatus } from '../../app/utils/kb_job_health.js'
|
||||
|
||||
/**
|
||||
* Visual presentation for each health status — pill color, dot color, and the
|
||||
* short label rendered alongside the dot. Kept in one place so backend health
|
||||
* decisions (`computeJobHealth`) and frontend rendering stay in sync.
|
||||
*/
|
||||
export const JOB_HEALTH_DISPLAY: Record<
|
||||
JobHealthStatus,
|
||||
{ dot: string; label: string; ariaLabel: string }
|
||||
> = {
|
||||
waiting: {
|
||||
dot: 'bg-gray-400 dark:bg-gray-500',
|
||||
label: 'Waiting',
|
||||
ariaLabel: 'Job is queued and waiting to start',
|
||||
},
|
||||
healthy: {
|
||||
dot: 'bg-green-500',
|
||||
label: 'Active',
|
||||
ariaLabel: 'Job is embedding at a normal rate',
|
||||
},
|
||||
slow: {
|
||||
dot: 'bg-yellow-500',
|
||||
label: 'Slow',
|
||||
ariaLabel: 'Job has not made progress for at least 2 minutes',
|
||||
},
|
||||
stalled: {
|
||||
dot: 'bg-red-500',
|
||||
label: 'Stalled',
|
||||
ariaLabel: 'Job has not made progress for at least 5 minutes',
|
||||
},
|
||||
failed: {
|
||||
dot: 'bg-red-700',
|
||||
label: 'Failed',
|
||||
ariaLabel: 'Job failed',
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a relative timestamp as "Xs ago", "Xm ago", "Xh ago" with sensible
|
||||
* thresholds for the KB Processing Queue's "Last activity" line.
|
||||
*/
|
||||
export function formatTimeAgo(timestampMs: number, now: number): string {
|
||||
const seconds = Math.max(0, Math.floor((now - timestampMs) / 1000))
|
||||
if (seconds < 5) return 'just now'
|
||||
if (seconds < 60) return `${seconds}s ago`
|
||||
const minutes = Math.floor(seconds / 60)
|
||||
if (minutes < 60) return `${minutes}m ago`
|
||||
const hours = Math.floor(minutes / 60)
|
||||
return `${hours}h ago`
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience wrapper that resolves a job's health status without the caller
|
||||
* having to remember to pass `now`. Mostly for ergonomic frontend use.
|
||||
*/
|
||||
export function computeJobHealthNow(
|
||||
input: Omit<Parameters<typeof computeJobHealth>[0], 'now'>
|
||||
): JobHealthStatus {
|
||||
return computeJobHealth({ ...input, now: Date.now() })
|
||||
}
|
||||
100
admin/tests/unit/kb_job_health.spec.ts
Normal file
100
admin/tests/unit/kb_job_health.spec.ts
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
import * as assert from 'node:assert/strict'
|
||||
import { test } from 'node:test'
|
||||
|
||||
import { computeJobHealth } from '../../app/utils/kb_job_health.js'
|
||||
|
||||
const MIN = 60 * 1000
|
||||
const NOW = 1_700_000_000_000 // arbitrary fixed epoch for deterministic tests
|
||||
|
||||
test('failed status takes precedence over any timing', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({ status: 'failed', progress: 42, lastBatchAt: NOW, now: NOW }),
|
||||
'failed'
|
||||
)
|
||||
})
|
||||
|
||||
test('no progress + no activity timestamps → waiting', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({ status: 'waiting', progress: 0, now: NOW }),
|
||||
'waiting'
|
||||
)
|
||||
})
|
||||
|
||||
test('progress > 0 but no lastBatchAt yet → healthy (first batch just started)', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({ status: 'processing', progress: 5, startedAt: NOW, now: NOW }),
|
||||
'healthy'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt 30s ago → healthy', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'batch_completed',
|
||||
progress: 50,
|
||||
lastBatchAt: NOW - 30 * 1000,
|
||||
now: NOW,
|
||||
}),
|
||||
'healthy'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt 90s ago → still healthy (under 2 min threshold)', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'batch_completed',
|
||||
progress: 50,
|
||||
lastBatchAt: NOW - 90 * 1000,
|
||||
now: NOW,
|
||||
}),
|
||||
'healthy'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt 3 min ago → slow (CPU-paced ingestion lives here)', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'batch_completed',
|
||||
progress: 50,
|
||||
lastBatchAt: NOW - 3 * MIN,
|
||||
now: NOW,
|
||||
}),
|
||||
'slow'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt 4:30 ago → still slow (under 5 min stalled threshold)', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'batch_completed',
|
||||
progress: 50,
|
||||
lastBatchAt: NOW - 4.5 * MIN,
|
||||
now: NOW,
|
||||
}),
|
||||
'slow'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt 5:01 ago → stalled', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'batch_completed',
|
||||
progress: 50,
|
||||
lastBatchAt: NOW - (5 * MIN + 1000),
|
||||
now: NOW,
|
||||
}),
|
||||
'stalled'
|
||||
)
|
||||
})
|
||||
|
||||
test('lastBatchAt missing but startedAt 10 min ago → stalled (first-batch-never-finished case)', () => {
|
||||
assert.equal(
|
||||
computeJobHealth({
|
||||
status: 'processing',
|
||||
progress: 5,
|
||||
startedAt: NOW - 10 * MIN,
|
||||
now: NOW,
|
||||
}),
|
||||
'stalled'
|
||||
)
|
||||
})
|
||||
|
|
@ -5,6 +5,12 @@ export type EmbedJobWithProgress = {
|
|||
progress: number
|
||||
status: string
|
||||
error?: string
|
||||
/** ms epoch of last completed batch; multi-batch ZIMs update this each batch. */
|
||||
lastBatchAt?: number
|
||||
/** ms epoch of first batch start; used as a fallback when lastBatchAt unset. */
|
||||
startedAt?: number
|
||||
/** Total chunks embedded across this job's batches so far. */
|
||||
chunks?: number
|
||||
}
|
||||
|
||||
export type ProcessAndEmbedFileResponse = {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user