mirror of
https://github.com/Crosstalk-Solutions/project-nomad.git
synced 2026-06-03 10:06:48 +02:00
* feat(KB): per-file ingest state machine (Phase 1 of RFC #883) Adds a persistent state machine for AI knowledge-base ingestion so the scanner can distinguish "fully indexed", "user opted out", "failed", and "stalled" from each other — none of which were derivable from the prior binary "any chunks in Qdrant ⇒ embedded" check. ## What lands - New table `kb_ingest_state` keyed by `file_path` with enum state column (`pending_decision | indexed | browse_only | failed | stalled`). Independent of `installed_resources` so it covers both curated downloads and manually-uploaded KB files. - New KV key `rag.defaultIngestPolicy` (string: `Always | Manual`). Registered now but not consumed yet — JIT prompt + wizard step land in Phase 3 of the RFC. - `EmbedFileJob.handle` writes state on terminal outcomes: - Success (final batch) → `indexed` + chunks count - `UnrecoverableError` → `failed` + error message - Retryable errors are left to BullMQ's existing retry path - `scanAndSyncStorage` swaps the binary qdrant check for a state-aware decision tree (see `decideScanAction`). Existing installs auto-backfill on first scan: files with chunks in Qdrant but no state row become `indexed`; new files start as `pending_decision`. - `deleteFileBySource` drops the state row last, so removed files disappear entirely instead of leaving an orphan that the next scan would re-dispatch into nothing. ## What does NOT land here - Ratio registry (separate PR) — needed for partial-stall detection and cost estimates, but a separable concern. - #880 follow-up initial-progress anchor (separate tiny PR). - Phase 2 UI (status pill, per-card actions, conditional warnings). - Phase 3 policy surfaces (wizard step, JIT prompt, guardrail modal). - PR #886's bulk-action hookup — `_deletePointsBySource` / Re-embed All / Reset & Rebuild would also want to set state, but #886 isn't merged yet; that wiring goes in a follow-up once #886 lands. ## Target This is forward work for v1.40.0 (RFC #883). Branching off `rc` because that's the current latest base and post-GA Jake will sync rc→dev; a retarget at PR-open time is a fast-forward if requested. ## Tests - 9 new unit tests for `decideScanAction` covering all five states plus the no-row / chunks-present / chunks-missing combinations - Type-check clean - Smoke-tested end-to-end on NOMAD3 via hot-patch: - Backfill: 5 ZIMs + 2 KB uploads with existing chunks in Qdrant all came back `indexed` on first scan - Pending dispatch: a video-only ZIM with no chunks (`lrnselfreliance`) came back `pending_decision` and was correctly re-dispatched (Bull deduped to its historical `:completed` jobId — bgauger's #886 fix drains that) - Delete hook: deleting a KB upload via `DELETE /api/rag/files` removed both the disk file and the state row * feat(KB): Always/Manual ingest policy toggle (RFC #883 §1/§4) Activates the `rag.defaultIngestPolicy` KV registered in Phase 1 (#888) so users on a fresh install (or anyone who picks Manual mode) no longer get every new ZIM auto-dispatched to the embed pipeline. ## Stacks on #888 This PR's base is `feat/kb-ingest-state-machine` (#888). The state machine has to be in place for the decision function to be policy-aware; GitHub will fast-forward the base to `rc` once #888 merges. ## Backend changes - `decideScanAction` now takes a `policy: 'Always' | 'Manual'` argument (defaults to `Always` for backward compatibility). - New `ScanAction` kind: `create_pending`. Manual mode records that the scanner has seen a new file (so the UI can surface a per-card Index affordance later) without dispatching an EmbedFileJob. - `scanAndSyncStorage` reads the KV and passes it through. The scan-result log line now includes the active policy and a `waiting on user` count for Manual-mode hits. - `rag.defaultIngestPolicy` added to `SETTINGS_KEYS` so it's reachable through the existing `GET/PATCH /api/system/settings` surface — no new endpoint. ## Frontend changes - New section in the KB panel between "Why upload" and "Processing Queue": "Auto-index new content for AI? [Always | Manual]" — segmented radio with copy explaining the 5-10× disk multiplier. Default Always. - `useQuery('ingestPolicy')` reads the current value; clicking the inactive option mutates and shows a notification confirming the new behavior. ## Tests - 14 unit tests on `decideScanAction` (was 9) — split into Always-mode cases (preserves Phase 1's contract) and Manual-mode cases (`create_pending`, `pending_decision → skip`, etc.). - Type-check clean. - Hot-patch + browser verification deferred until #888 lands; the state machine smoke-tested cleanly on NOMAD3 in #888's PR, and this PR's decision-tree changes are exhaustively unit-tested. ## RFC open question §3 — policy-change re-trigger Switching Manual → Always doesn't auto-dispatch existing `pending_decision` rows immediately. The next scan re-evaluates and dispatches them under the new policy. This matches the RFC's "treat the switch as I've- thought-about-it" instinct for the guardrail; full guardrail implementation lands in Phase 3 task 14. --------- Co-authored-by: Jake Turner <52841588+jakeaturner@users.noreply.github.com>
This commit is contained in:
parent
ca5569c8ea
commit
603a7070e8
|
|
@ -17,7 +17,7 @@ import { randomUUID } from 'node:crypto'
|
|||
import { join, resolve, sep } from 'node:path'
|
||||
import KVStore from '#models/kv_store'
|
||||
import KbIngestState from '#models/kb_ingest_state'
|
||||
import { decideScanAction } from '../utils/kb_ingest_decision.js'
|
||||
import { decideScanAction, type IngestPolicy } from '../utils/kb_ingest_decision.js'
|
||||
import { ZIMExtractionService } from './zim_extraction_service.js'
|
||||
import { ZIM_BATCH_SIZE } from '../../constants/zim_extraction.js'
|
||||
import { ProcessAndEmbedFileResponse, ProcessZIMFileResponse, RAGResult, RerankedRAGResult } from '../../types/rag.js'
|
||||
|
|
@ -1353,14 +1353,21 @@ export class RagService {
|
|||
(filePath) => determineFileType(filePath) !== 'unknown'
|
||||
)
|
||||
|
||||
// Read the global ingest policy. Unset is treated as 'Always' so legacy
|
||||
// installs keep their current behavior until the user explicitly opts
|
||||
// into Manual mode from the KB panel.
|
||||
const policyRaw = await KVStore.getValue('rag.defaultIngestPolicy')
|
||||
const policy: IngestPolicy = policyRaw === 'Manual' ? 'Manual' : 'Always'
|
||||
|
||||
const filesToEmbed: string[] = []
|
||||
let backfilled = 0
|
||||
let createdRows = 0
|
||||
let createdPending = 0
|
||||
let skipped = 0
|
||||
|
||||
for (const filePath of embeddableFiles) {
|
||||
const stateRow = stateByPath.get(filePath) ?? null
|
||||
const action = decideScanAction(stateRow, sourcesInQdrant.has(filePath))
|
||||
const action = decideScanAction(stateRow, sourcesInQdrant.has(filePath), policy)
|
||||
|
||||
switch (action.kind) {
|
||||
case 'skip':
|
||||
|
|
@ -1378,6 +1385,16 @@ export class RagService {
|
|||
})
|
||||
backfilled++
|
||||
break
|
||||
case 'create_pending':
|
||||
// Manual mode: record that we've seen the file but don't dispatch.
|
||||
// The KB panel surfaces a per-card "Index" affordance for these.
|
||||
await KbIngestState.create({
|
||||
file_path: filePath,
|
||||
state: 'pending_decision',
|
||||
chunks_embedded: 0,
|
||||
})
|
||||
createdPending++
|
||||
break
|
||||
case 'dispatch':
|
||||
if (action.createStateRow) {
|
||||
await KbIngestState.create({
|
||||
|
|
@ -1393,7 +1410,7 @@ export class RagService {
|
|||
}
|
||||
|
||||
logger.info(
|
||||
`[RAG] Scan results: ${filesToEmbed.length} to embed, ${backfilled} backfilled, ${createdRows} new pending, ${skipped} skipped`
|
||||
`[RAG] Scan results (policy=${policy}): ${filesToEmbed.length} to embed, ${backfilled} backfilled, ${createdRows} new pending, ${createdPending} waiting on user, ${skipped} skipped`
|
||||
)
|
||||
|
||||
if (filesToEmbed.length === 0) {
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ import type { KbIngestStateValue } from '../../types/kb_ingest_state.js'
|
|||
|
||||
/**
|
||||
* Decision returned by `decideScanAction` describing what scanAndSyncStorage
|
||||
* should do for one file given its current state row (if any) and whether
|
||||
* Qdrant already has chunks for it.
|
||||
* should do for one file given its current state row (if any), whether Qdrant
|
||||
* already has chunks for it, and the global ingest policy.
|
||||
*
|
||||
* - `skip` — file is in a settled state (already indexed, deliberately not
|
||||
* indexed, or in a manual-recovery state); no auto-dispatch.
|
||||
|
|
@ -13,16 +13,26 @@ import type { KbIngestStateValue } from '../../types/kb_ingest_state.js'
|
|||
* - `backfill_indexed` — Qdrant has chunks but no state row exists yet
|
||||
* (pre-RFC install, or new admin instance pointed at an existing Qdrant
|
||||
* volume). Create a row in `indexed` state without re-embedding.
|
||||
* - `create_pending` — Manual mode: record that we've seen the file but
|
||||
* don't dispatch. Frontend surfaces a per-card "Index" affordance.
|
||||
*/
|
||||
export type ScanAction =
|
||||
| { kind: 'skip' }
|
||||
| { kind: 'dispatch'; createStateRow: boolean }
|
||||
| { kind: 'backfill_indexed' }
|
||||
| { kind: 'create_pending' }
|
||||
|
||||
export interface KbIngestStateRow {
|
||||
state: KbIngestStateValue
|
||||
}
|
||||
|
||||
/**
|
||||
* Global auto-index policy stored at KV `rag.defaultIngestPolicy`. Unset is
|
||||
* treated as `Always` so existing installs keep their current behavior until
|
||||
* the user opts into Manual mode through the KB panel.
|
||||
*/
|
||||
export type IngestPolicy = 'Always' | 'Manual'
|
||||
|
||||
/**
|
||||
* Decide what scanAndSyncStorage should do for a single embeddable file.
|
||||
*
|
||||
|
|
@ -33,18 +43,25 @@ export interface KbIngestStateRow {
|
|||
*/
|
||||
export function decideScanAction(
|
||||
stateRow: KbIngestStateRow | null,
|
||||
hasChunksInQdrant: boolean
|
||||
hasChunksInQdrant: boolean,
|
||||
policy: IngestPolicy = 'Always'
|
||||
): ScanAction {
|
||||
if (!stateRow) {
|
||||
if (hasChunksInQdrant) return { kind: 'backfill_indexed' }
|
||||
return { kind: 'dispatch', createStateRow: true }
|
||||
return policy === 'Always'
|
||||
? { kind: 'dispatch', createStateRow: true }
|
||||
: { kind: 'create_pending' }
|
||||
}
|
||||
|
||||
switch (stateRow.state) {
|
||||
case 'indexed':
|
||||
return hasChunksInQdrant ? { kind: 'skip' } : { kind: 'dispatch', createStateRow: false }
|
||||
case 'pending_decision':
|
||||
return { kind: 'dispatch', createStateRow: false }
|
||||
// Manual mode: file is waiting for the user to opt in via per-card Index.
|
||||
// Always mode: treat as "user-equivalent of auto-index" and dispatch.
|
||||
return policy === 'Always'
|
||||
? { kind: 'dispatch', createStateRow: false }
|
||||
: { kind: 'skip' }
|
||||
case 'browse_only':
|
||||
case 'failed':
|
||||
case 'stalled':
|
||||
|
|
|
|||
|
|
@ -1,3 +1,3 @@
|
|||
import { KVStoreKey } from "../types/kv_store.js";
|
||||
|
||||
export const SETTINGS_KEYS: KVStoreKey[] = ['chat.suggestionsEnabled', 'chat.lastModel', 'ui.hasVisitedEasySetup', 'ui.theme', 'system.earlyAccess', 'ai.assistantCustomName', 'ai.remoteOllamaUrl', 'ai.ollamaFlashAttention'];
|
||||
export const SETTINGS_KEYS: KVStoreKey[] = ['chat.suggestionsEnabled', 'chat.lastModel', 'ui.hasVisitedEasySetup', 'ui.theme', 'system.earlyAccess', 'ai.assistantCustomName', 'ai.remoteOllamaUrl', 'ai.ollamaFlashAttention', 'rag.defaultIngestPolicy'];
|
||||
|
|
@ -51,6 +51,37 @@ export default function KnowledgeBaseModal({ aiAssistantName = "AI Assistant", o
|
|||
select: (data) => data || [],
|
||||
})
|
||||
|
||||
// Global auto-index policy. KVStore returns `null` for an unset key, which
|
||||
// we treat as 'Always' for backward compatibility with installs that predate
|
||||
// this UI. The user can opt into Manual mode from the toggle below.
|
||||
const { data: ingestPolicySetting } = useQuery({
|
||||
queryKey: ['ingestPolicy'],
|
||||
queryFn: () => api.getSetting('rag.defaultIngestPolicy'),
|
||||
})
|
||||
const ingestPolicy: 'Always' | 'Manual' =
|
||||
ingestPolicySetting?.value === 'Manual' ? 'Manual' : 'Always'
|
||||
|
||||
const updateIngestPolicyMutation = useMutation({
|
||||
mutationFn: (policy: 'Always' | 'Manual') =>
|
||||
api.updateSetting('rag.defaultIngestPolicy', policy),
|
||||
onSuccess: (_data, policy) => {
|
||||
queryClient.invalidateQueries({ queryKey: ['ingestPolicy'] })
|
||||
addNotification({
|
||||
type: 'success',
|
||||
message:
|
||||
policy === 'Always'
|
||||
? 'New content will be auto-indexed for AI.'
|
||||
: 'New content will wait for you to opt in.',
|
||||
})
|
||||
},
|
||||
onError: (error: any) => {
|
||||
addNotification({
|
||||
type: 'error',
|
||||
message: error?.message || 'Failed to update indexing policy.',
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
const uploadMutation = useMutation({
|
||||
mutationFn: (file: File) => api.uploadDocument(file),
|
||||
})
|
||||
|
|
@ -307,6 +338,48 @@ export default function KnowledgeBaseModal({ aiAssistantName = "AI Assistant", o
|
|||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="my-8 p-4 rounded-lg border border-border-subtle bg-surface-secondary">
|
||||
<div className="flex flex-wrap items-center justify-between gap-3">
|
||||
<div className="flex-1 min-w-[14rem]">
|
||||
<p className="text-sm font-medium text-text-primary">
|
||||
Auto-index new content for AI?
|
||||
</p>
|
||||
<p className="text-xs text-text-muted mt-1">
|
||||
Indexed content typically uses 5–10× the original file size on disk.
|
||||
Changes apply to new content added after this setting changes.
|
||||
</p>
|
||||
</div>
|
||||
<div
|
||||
role="radiogroup"
|
||||
aria-label="Ingest policy"
|
||||
className="inline-flex rounded-md overflow-hidden border border-border-subtle"
|
||||
>
|
||||
{(['Always', 'Manual'] as const).map((option) => {
|
||||
const isActive = ingestPolicy === option
|
||||
return (
|
||||
<button
|
||||
key={option}
|
||||
type="button"
|
||||
role="radio"
|
||||
aria-checked={isActive}
|
||||
onClick={() =>
|
||||
!isActive && updateIngestPolicyMutation.mutate(option)
|
||||
}
|
||||
disabled={updateIngestPolicyMutation.isPending}
|
||||
className={`px-4 py-2 text-sm font-medium transition-colors ${
|
||||
isActive
|
||||
? 'bg-desert-green text-white'
|
||||
: 'bg-surface-primary text-text-secondary hover:bg-surface-tertiary'
|
||||
} ${updateIngestPolicyMutation.isPending ? 'opacity-50 cursor-not-allowed' : ''}`}
|
||||
>
|
||||
{option}
|
||||
</button>
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="my-8">
|
||||
<div className="flex items-center justify-between mb-4">
|
||||
<StyledSectionHeader title="Processing Queue" className="!mb-0" />
|
||||
|
|
|
|||
|
|
@ -3,44 +3,80 @@ import { test } from 'node:test'
|
|||
|
||||
import { decideScanAction } from '../../app/utils/kb_ingest_decision.js'
|
||||
|
||||
test('no state row, no chunks → dispatch and create row (new file)', () => {
|
||||
// ---------- Always-policy cases (default behavior; preserves pre-policy install) ----------
|
||||
|
||||
test('Always: no state row, no chunks → dispatch and create row (new file)', () => {
|
||||
assert.deepEqual(decideScanAction(null, false, 'Always'), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: true,
|
||||
})
|
||||
})
|
||||
|
||||
test('Always: no state row, chunks present → backfill_indexed (pre-RFC install, existing Qdrant volume)', () => {
|
||||
assert.deepEqual(decideScanAction(null, true, 'Always'), { kind: 'backfill_indexed' })
|
||||
})
|
||||
|
||||
test('Always: indexed + chunks present → skip', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'indexed' }, true, 'Always'), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('Always: indexed + chunks missing → re-dispatch (Qdrant collection rebuilt or chunks deleted)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'indexed' }, false, 'Always'), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: false,
|
||||
})
|
||||
})
|
||||
|
||||
test('Always: pending_decision → dispatch', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'pending_decision' }, false, 'Always'), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: false,
|
||||
})
|
||||
})
|
||||
|
||||
test('Always: browse_only → skip (user opted out of indexing)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'browse_only' }, false, 'Always'), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('Always: failed → skip (manual retry needed, do not auto-redispatch)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'failed' }, false, 'Always'), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('Always: stalled → skip (manual retry needed)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'stalled' }, false, 'Always'), { kind: 'skip' })
|
||||
})
|
||||
|
||||
// ---------- Manual-policy cases ----------
|
||||
|
||||
test('Manual: no state row, no chunks → create_pending (do not auto-dispatch new content)', () => {
|
||||
assert.deepEqual(decideScanAction(null, false, 'Manual'), { kind: 'create_pending' })
|
||||
})
|
||||
|
||||
test('Manual: no state row, chunks present → backfill_indexed (same as Always — Qdrant is authoritative)', () => {
|
||||
assert.deepEqual(decideScanAction(null, true, 'Manual'), { kind: 'backfill_indexed' })
|
||||
})
|
||||
|
||||
test('Manual: pending_decision → skip (waiting for user to opt in via Index button)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'pending_decision' }, false, 'Manual'), {
|
||||
kind: 'skip',
|
||||
})
|
||||
})
|
||||
|
||||
test('Manual: indexed + chunks missing → re-dispatch (user has already opted in for this file)', () => {
|
||||
// Policy switch from Always→Manual must not break in-flight or partially-deleted indexes
|
||||
// for files the user previously chose to index.
|
||||
assert.deepEqual(decideScanAction({ state: 'indexed' }, false, 'Manual'), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: false,
|
||||
})
|
||||
})
|
||||
|
||||
test('Manual: browse_only → skip (same as Always)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'browse_only' }, false, 'Manual'), { kind: 'skip' })
|
||||
})
|
||||
|
||||
// ---------- Policy default ----------
|
||||
|
||||
test('omitted policy defaults to Always (unset KV preserves legacy behavior)', () => {
|
||||
assert.deepEqual(decideScanAction(null, false), { kind: 'dispatch', createStateRow: true })
|
||||
})
|
||||
|
||||
test('no state row, chunks present → backfill_indexed (pre-RFC install, existing Qdrant volume)', () => {
|
||||
assert.deepEqual(decideScanAction(null, true), { kind: 'backfill_indexed' })
|
||||
})
|
||||
|
||||
test('indexed + chunks present → skip', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'indexed' }, true), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('indexed + chunks missing → re-dispatch (state stale, Qdrant collection rebuilt or chunks deleted)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'indexed' }, false), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: false,
|
||||
})
|
||||
})
|
||||
|
||||
test('pending_decision → dispatch (preserves current Always behavior until policy is consumed)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'pending_decision' }, false), {
|
||||
kind: 'dispatch',
|
||||
createStateRow: false,
|
||||
})
|
||||
})
|
||||
|
||||
test('browse_only → skip (user opted out of indexing)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'browse_only' }, false), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('browse_only + chunks present → skip (do not silently re-index after un-index)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'browse_only' }, true), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('failed → skip (manual retry needed, do not auto-redispatch)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'failed' }, false), { kind: 'skip' })
|
||||
})
|
||||
|
||||
test('stalled → skip (manual retry needed)', () => {
|
||||
assert.deepEqual(decideScanAction({ state: 'stalled' }, false), { kind: 'skip' })
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user