Skip to main content

context_harness/
vector_index.rs

1//! Optional vector-index boundary and production zvec sidecar support.
2//!
3//! SQLite remains the canonical store for documents, chunks, FTS5 rows, and
4//! embedding metadata. A [`VectorIndex`] only retrieves vector candidates for
5//! semantic search; core hybrid scoring still consumes [`ChunkCandidate`]s.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9#[cfg(feature = "zvec-bundled")]
10use std::sync::RwLock;
11
12use anyhow::{anyhow, Context, Result};
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use sqlx::{Row, SqlitePool};
17
18use context_harness_core::embedding::blob_to_vec;
19use context_harness_core::models::{Chunk, Document};
20use context_harness_core::store::{ChunkCandidate, DocumentMetadata, DocumentResponse, Store};
21
22use crate::config::Config;
23use crate::ctx_dirs;
24use crate::sqlite_store::SqliteStore;
25
/// Schema version stamped into every manifest built by `manifest_for_snapshot`.
#[allow(dead_code)]
const MANIFEST_VERSION: u32 = 1;
/// Name of the sub-directory, under the sidecar path, that holds the zvec collection.
#[allow(dead_code)]
const COLLECTION_DIR: &str = "collection";
/// File name of the JSON manifest stored directly under the sidecar path.
#[allow(dead_code)]
const MANIFEST_FILE: &str = "manifest.json";
/// Number of documents handed to zvec per insert call while populating a collection.
#[allow(dead_code)]
const ZVEC_BATCH_SIZE: usize = 512;
34
/// A vector row available to an optional vector index.
///
/// Carries one embedding together with the identifiers and embedding
/// metadata that accompany it in SQLite.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct VectorRecord {
    /// Identifier of the chunk this vector belongs to; also used as the zvec primary key.
    pub chunk_id: String,
    /// Identifier of the document that owns the chunk.
    pub document_id: String,
    /// Embedding values.
    pub vector: Vec<f32>,
    /// Name of the embedding model recorded for this vector.
    pub model: String,
    /// Dimensionality recorded for this vector.
    // NOTE(review): nothing in this file checks `dims == vector.len()` — confirm upstream.
    pub dims: usize,
    /// Hash recorded with the embedding (presumably of the chunk content — confirm).
    pub content_hash: String,
}
46
/// A [`VectorRecord`] plus the display/filter fields the sidecar keeps next to it.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct IndexedVectorRecord {
    /// The vector and its embedding metadata.
    record: VectorRecord,
    /// Leading slice of the chunk text (the SQL in this file takes `substr(text, 1, 240)`).
    snippet: String,
    /// Value of the owning document's `source` column; compared for equality when filtering.
    source: String,
    /// Value of the owning document's `updated_at` column.
    // NOTE(review): compared against a Unix-seconds timestamp in search — confirm the column's unit.
    updated_at: i64,
}
55
/// Search controls passed to vector-index backends.
#[derive(Debug, Clone, Copy, Default)]
pub struct VectorSearchOptions<'a> {
    /// Maximum number of candidates to return.
    pub limit: i64,
    /// When set, restricts results to documents with this `source`.
    pub source: Option<&'a str>,
    /// When set, restricts results to documents updated on or after this date.
    /// The zvec backend parses it as `%Y-%m-%d`.
    pub since: Option<&'a str>,
}
63
/// Health information for the configured vector-index backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexHealth {
    /// Whether a vector backend is configured at all (`false` only for the disabled backend).
    pub enabled: bool,
    /// Whether the reported backend can currently serve searches.
    pub available: bool,
    /// Short backend label, e.g. `"zvec"`, `"sqlite-bruteforce"` or `"disabled"`.
    pub backend: String,
    /// Optional human-readable explanation of the current state.
    pub message: Option<String>,
}
72
/// JSON manifest stored next to the zvec collection describing what it was built from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VectorIndexManifest {
    /// Manifest schema version.
    pub version: u32,
    /// Backend that produced the sidecar.
    pub backend: String,
    /// Number of vectors in the snapshot the sidecar was built from.
    pub vector_count: usize,
    /// Model of the first snapshot record, if the snapshot was non-empty.
    pub model: Option<String>,
    /// Dimensionality of the first snapshot record, if the snapshot was non-empty.
    pub dims: Option<usize>,
    /// Configured distance metric, copied from `config.vector_index.metric`.
    pub metric: String,
    /// Configured index type, copied from `config.vector_index.index`.
    pub index: String,
    /// Digest of the SQLite vector snapshot; prefixed with `stale:` once the
    /// sidecar is known to have diverged from that snapshot.
    pub digest: String,
}
84
/// Point-in-time report comparing the on-disk sidecar with the vectors in SQLite.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexStatus {
    /// Resolved sidecar directory.
    pub path: PathBuf,
    /// Health derived from the configuration, manifest and freshness.
    pub health: VectorIndexHealth,
    /// Manifest read from the sidecar, if one exists.
    pub manifest: Option<VectorIndexManifest>,
    /// Number of vectors currently stored in SQLite.
    pub sqlite_vector_count: usize,
    /// Digest of the current SQLite vector snapshot.
    pub sqlite_digest: String,
    /// `true` when a manifest exists and its digest equals `sqlite_digest`.
    pub fresh: bool,
}
94
/// Optional semantic-search accelerator.
///
/// Implementors must be `Send + Sync` so they can be shared behind an `Arc`
/// across async tasks.
#[async_trait]
pub trait VectorIndex: Send + Sync {
    /// Adds or replaces the index entry for `record`.
    #[allow(dead_code)]
    async fn upsert(&self, record: &VectorRecord) -> Result<()>;
    /// Removes the index entry for a single chunk.
    #[allow(dead_code)]
    async fn delete_chunk(&self, chunk_id: &str) -> Result<()>;
    /// Removes every index entry that belongs to `document_id`.
    #[allow(dead_code)]
    async fn delete_document(&self, document_id: &str) -> Result<()>;
    /// Returns candidate chunks for `query_vec`, honoring `options`.
    async fn search(
        &self,
        query_vec: &[f32],
        options: VectorSearchOptions<'_>,
    ) -> Result<Vec<ChunkCandidate>>;
    /// Reports whether the backend is enabled and currently usable.
    #[allow(dead_code)]
    async fn health(&self) -> Result<VectorIndexHealth>;
}
112
113/// Disabled vector index. Callers should use the configured fallback.
114pub struct DisabledVectorIndex;
115
116#[async_trait]
117impl VectorIndex for DisabledVectorIndex {
118    async fn upsert(&self, _record: &VectorRecord) -> Result<()> {
119        Ok(())
120    }
121
122    async fn delete_chunk(&self, _chunk_id: &str) -> Result<()> {
123        Ok(())
124    }
125
126    async fn delete_document(&self, _document_id: &str) -> Result<()> {
127        Ok(())
128    }
129
130    async fn search(
131        &self,
132        _query_vec: &[f32],
133        _options: VectorSearchOptions<'_>,
134    ) -> Result<Vec<ChunkCandidate>> {
135        Ok(Vec::new())
136    }
137
138    async fn health(&self) -> Result<VectorIndexHealth> {
139        Ok(VectorIndexHealth {
140            enabled: false,
141            available: false,
142            backend: "disabled".to_string(),
143            message: Some("vector index disabled; SQLite fallback remains canonical".to_string()),
144        })
145    }
146}
147
148/// Exact brute-force SQLite vector scan used as the behavioral baseline.
149pub struct BruteForceSqliteVectorIndex {
150    store: SqliteStore,
151}
152
153impl BruteForceSqliteVectorIndex {
154    pub fn new(store: SqliteStore) -> Self {
155        Self { store }
156    }
157}
158
159#[async_trait]
160impl VectorIndex for BruteForceSqliteVectorIndex {
161    async fn upsert(&self, _record: &VectorRecord) -> Result<()> {
162        Ok(())
163    }
164
165    async fn delete_chunk(&self, _chunk_id: &str) -> Result<()> {
166        Ok(())
167    }
168
169    async fn delete_document(&self, _document_id: &str) -> Result<()> {
170        Ok(())
171    }
172
173    async fn search(
174        &self,
175        query_vec: &[f32],
176        options: VectorSearchOptions<'_>,
177    ) -> Result<Vec<ChunkCandidate>> {
178        self.store
179            .vector_search(query_vec, options.limit, options.source, options.since)
180            .await
181    }
182
183    async fn health(&self) -> Result<VectorIndexHealth> {
184        Ok(VectorIndexHealth {
185            enabled: true,
186            available: true,
187            backend: "sqlite-bruteforce".to_string(),
188            message: Some("exact SQLite vector scan".to_string()),
189        })
190    }
191}
192
193pub struct VectorIndexRouter {
194    primary: Option<Arc<dyn VectorIndex>>,
195    fallback: Option<BruteForceSqliteVectorIndex>,
196    backend: String,
197}
198
199impl VectorIndexRouter {
200    async fn search(
201        &self,
202        query_vec: &[f32],
203        options: VectorSearchOptions<'_>,
204    ) -> Result<Vec<ChunkCandidate>> {
205        if let Some(primary) = &self.primary {
206            match primary.search(query_vec, options).await {
207                Ok(candidates) => return Ok(candidates),
208                Err(err) if self.fallback.is_some() => {
209                    eprintln!(
210                        "Warning: vector index '{}' failed; falling back to SQLite: {}",
211                        self.backend, err
212                    );
213                }
214                Err(err) => return Err(err),
215            }
216        }
217
218        if let Some(fallback) = &self.fallback {
219            return fallback.search(query_vec, options).await;
220        }
221
222        Ok(Vec::new())
223    }
224
225    #[allow(dead_code)]
226    pub async fn health(&self) -> Result<VectorIndexHealth> {
227        if let Some(primary) = &self.primary {
228            return primary.health().await;
229        }
230        if let Some(fallback) = &self.fallback {
231            return fallback.health().await;
232        }
233        DisabledVectorIndex.health().await
234    }
235}
236
237/// Store wrapper that preserves all SQLite behavior except vector candidates.
238pub struct VectorIndexedStore {
239    sqlite: SqliteStore,
240    router: VectorIndexRouter,
241}
242
243impl VectorIndexedStore {
244    pub fn new(sqlite: SqliteStore, router: VectorIndexRouter) -> Self {
245        Self { sqlite, router }
246    }
247}
248
249#[async_trait]
250impl Store for VectorIndexedStore {
251    async fn upsert_document(&self, doc: &Document) -> Result<String> {
252        self.sqlite.upsert_document(doc).await
253    }
254
255    async fn replace_chunks(
256        &self,
257        doc_id: &str,
258        chunks: &[Chunk],
259        vectors: Option<&[Vec<f32>]>,
260    ) -> Result<()> {
261        self.sqlite.replace_chunks(doc_id, chunks, vectors).await
262    }
263
264    async fn upsert_embedding(
265        &self,
266        chunk_id: &str,
267        doc_id: &str,
268        vector: &[f32],
269        model: &str,
270        dims: usize,
271        content_hash: &str,
272    ) -> Result<()> {
273        self.sqlite
274            .upsert_embedding(chunk_id, doc_id, vector, model, dims, content_hash)
275            .await
276    }
277
278    async fn get_document(&self, id: &str) -> Result<Option<DocumentResponse>> {
279        self.sqlite.get_document(id).await
280    }
281
282    async fn get_document_metadata(&self, id: &str) -> Result<Option<DocumentMetadata>> {
283        self.sqlite.get_document_metadata(id).await
284    }
285
286    async fn keyword_search(
287        &self,
288        query: &str,
289        limit: i64,
290        source: Option<&str>,
291        since: Option<&str>,
292    ) -> Result<Vec<ChunkCandidate>> {
293        self.sqlite
294            .keyword_search(query, limit, source, since)
295            .await
296    }
297
298    async fn vector_search(
299        &self,
300        query_vec: &[f32],
301        limit: i64,
302        source: Option<&str>,
303        since: Option<&str>,
304    ) -> Result<Vec<ChunkCandidate>> {
305        self.router
306            .search(
307                query_vec,
308                VectorSearchOptions {
309                    limit,
310                    source,
311                    since,
312                },
313            )
314            .await
315    }
316}
317
318pub async fn configured_vector_store(
319    config: &Config,
320    pool: SqlitePool,
321) -> Result<VectorIndexedStore> {
322    let sqlite = SqliteStore::new(pool.clone());
323    let router = configured_vector_index(config, pool).await?;
324    Ok(VectorIndexedStore::new(sqlite, router))
325}
326
327pub async fn configured_vector_index(
328    config: &Config,
329    pool: SqlitePool,
330) -> Result<VectorIndexRouter> {
331    let backend = config.vector_index.backend.as_str();
332    let sqlite_fallback = || BruteForceSqliteVectorIndex::new(SqliteStore::new(pool.clone()));
333    let fallback = match config.vector_index.fallback.as_str() {
334        "sqlite" => Some(sqlite_fallback()),
335        _ => None,
336    };
337
338    match backend {
339        "sqlite" => Ok(VectorIndexRouter {
340            primary: None,
341            fallback: Some(sqlite_fallback()),
342            backend: "sqlite".to_string(),
343        }),
344        "disabled" => Ok(VectorIndexRouter {
345            primary: None,
346            fallback,
347            backend: "disabled".to_string(),
348        }),
349        "auto" => configured_auto_zvec(config, pool.clone(), fallback).await,
350        "zvec" => configured_required_zvec(config, pool.clone(), fallback).await,
351        other => Err(anyhow!("unknown vector_index.backend: {other}")),
352    }
353}
354
/// `"auto"` backend with zvec compiled in: use zvec when it opens, otherwise
/// warn on stderr and run with the fallback only.
///
/// Never fails; a zvec error downgrades the router instead of propagating.
#[cfg(feature = "zvec-bundled")]
async fn configured_auto_zvec(
    config: &Config,
    pool: SqlitePool,
    fallback: Option<BruteForceSqliteVectorIndex>,
) -> Result<VectorIndexRouter> {
    let (primary, backend) = match ZvecVectorIndex::open_or_rebuild(config, pool).await {
        Ok(index) => {
            let shared: Arc<dyn VectorIndex> = Arc::new(index);
            (Some(shared), "zvec")
        }
        Err(err) => {
            eprintln!("Warning: zvec unavailable; using SQLite vector fallback: {err}");
            (None, "auto")
        }
    };

    Ok(VectorIndexRouter {
        primary,
        fallback,
        backend: backend.to_string(),
    })
}
377
378#[cfg(not(feature = "zvec-bundled"))]
379async fn configured_auto_zvec(
380    _config: &Config,
381    _pool: SqlitePool,
382    fallback: Option<BruteForceSqliteVectorIndex>,
383) -> Result<VectorIndexRouter> {
384    Ok(VectorIndexRouter {
385        primary: None,
386        fallback,
387        backend: "auto".to_string(),
388    })
389}
390
/// `"zvec"` backend with zvec compiled in: the index must open or rebuild.
///
/// # Errors
/// Propagates the failure from `ZvecVectorIndex::open_or_rebuild`.
#[cfg(feature = "zvec-bundled")]
async fn configured_required_zvec(
    config: &Config,
    pool: SqlitePool,
    fallback: Option<BruteForceSqliteVectorIndex>,
) -> Result<VectorIndexRouter> {
    let opened = ZvecVectorIndex::open_or_rebuild(config, pool).await?;
    let primary: Arc<dyn VectorIndex> = Arc::new(opened);
    Ok(VectorIndexRouter {
        primary: Some(primary),
        fallback,
        backend: String::from("zvec"),
    })
}
404
405#[cfg(not(feature = "zvec-bundled"))]
406async fn configured_required_zvec(
407    _config: &Config,
408    _pool: SqlitePool,
409    _fallback: Option<BruteForceSqliteVectorIndex>,
410) -> Result<VectorIndexRouter> {
411    Err(anyhow!(
412        "vector_index.backend = 'zvec' requires building with --features zvec-bundled"
413    ))
414}
415
416pub fn resolve_vector_index_path(config: &Config) -> PathBuf {
417    if config.vector_index.path.as_os_str() != "auto" {
418        return config.vector_index.path.clone();
419    }
420
421    if ctx_dirs::is_default_workspace_db_path(&config.db.path) {
422        return ctx_dirs::workspace_vector_index_dir();
423    }
424
425    config
426        .db
427        .path
428        .parent()
429        .unwrap_or_else(|| Path::new("."))
430        .join("vector-index")
431        .join("zvec")
432}
433
434pub async fn vector_index_status(config: &Config) -> Result<VectorIndexStatus> {
435    let pool = crate::db::connect(config).await?;
436    let path = resolve_vector_index_path(config);
437    let manifest = read_manifest(&path)?;
438    let snapshot = sqlite_vector_snapshot(&pool).await?;
439    pool.close().await;
440
441    let fresh = manifest
442        .as_ref()
443        .is_some_and(|manifest| manifest.digest == snapshot.digest);
444    let health = status_health(config, manifest.as_ref(), fresh, snapshot.records.len());
445    Ok(VectorIndexStatus {
446        path,
447        health,
448        manifest,
449        sqlite_vector_count: snapshot.records.len(),
450        sqlite_digest: snapshot.digest,
451        fresh,
452    })
453}
454
455fn status_health(
456    config: &Config,
457    manifest: Option<&VectorIndexManifest>,
458    fresh: bool,
459    sqlite_vector_count: usize,
460) -> VectorIndexHealth {
461    match config.vector_index.backend.as_str() {
462        "sqlite" => VectorIndexHealth {
463            enabled: true,
464            available: true,
465            backend: "sqlite-bruteforce".to_string(),
466            message: Some("exact SQLite vector scan".to_string()),
467        },
468        "disabled" => VectorIndexHealth {
469            enabled: false,
470            available: false,
471            backend: "disabled".to_string(),
472            message: Some("vector index disabled; SQLite fallback remains canonical".to_string()),
473        },
474        "zvec" if cfg!(feature = "zvec-bundled") => VectorIndexHealth {
475            enabled: true,
476            available: fresh,
477            backend: "zvec".to_string(),
478            message: Some(zvec_health_message(manifest, fresh, sqlite_vector_count)),
479        },
480        "zvec" => VectorIndexHealth {
481            enabled: true,
482            available: false,
483            backend: "zvec".to_string(),
484            message: Some("zvec not compiled in; rebuild with --features zvec-bundled".to_string()),
485        },
486        "auto" if cfg!(feature = "zvec-bundled") => VectorIndexHealth {
487            enabled: true,
488            available: fresh,
489            backend: "zvec".to_string(),
490            message: Some(zvec_health_message(manifest, fresh, sqlite_vector_count)),
491        },
492        "auto" => VectorIndexHealth {
493            enabled: true,
494            available: config.vector_index.fallback == "sqlite",
495            backend: "sqlite-bruteforce".to_string(),
496            message: Some("zvec not compiled in; using SQLite fallback".to_string()),
497        },
498        _ => VectorIndexHealth {
499            enabled: true,
500            available: config.vector_index.fallback == "sqlite",
501            backend: "sqlite-bruteforce".to_string(),
502            message: Some("unknown vector-index backend; using configured fallback".to_string()),
503        },
504    }
505}
506
507fn zvec_health_message(
508    manifest: Option<&VectorIndexManifest>,
509    fresh: bool,
510    sqlite_vector_count: usize,
511) -> String {
512    if sqlite_vector_count == 0 {
513        "SQLite has no vectors to index".to_string()
514    } else if manifest.is_none() {
515        "zvec sidecar missing".to_string()
516    } else if fresh {
517        "zvec sidecar fresh".to_string()
518    } else {
519        "zvec sidecar stale".to_string()
520    }
521}
522
523pub async fn rebuild_configured_vector_index(config: &Config) -> Result<VectorIndexStatus> {
524    let pool = crate::db::connect(config).await?;
525    rebuild_zvec_if_available(config, &pool).await?;
526    pool.close().await;
527    vector_index_status(config).await
528}
529
/// With zvec compiled in: rebuilds the sidecar from SQLite and discards the
/// opened index handle.
#[cfg(feature = "zvec-bundled")]
async fn rebuild_zvec_if_available(config: &Config, pool: &SqlitePool) -> Result<()> {
    let _rebuilt = ZvecVectorIndex::rebuild(config, pool.clone()).await?;
    Ok(())
}
536
537#[cfg(not(feature = "zvec-bundled"))]
538async fn rebuild_zvec_if_available(config: &Config, _pool: &SqlitePool) -> Result<()> {
539    if config.vector_index.backend == "zvec" {
540        Err(anyhow!(
541            "vector_index.backend = 'zvec' requires building with --features zvec-bundled"
542        ))
543    } else {
544        Ok(())
545    }
546}
547
/// Mirrors a vector that was just written to SQLite into the sidecar index.
///
/// Thin public entry point; the actual work is done by the feature-gated
/// `sync_vector_record_after_sqlite_impl`, which does nothing in builds
/// without `zvec-bundled`.
pub async fn sync_vector_record_after_sqlite(
    config: &Config,
    pool: &SqlitePool,
    record: &VectorRecord,
) -> Result<()> {
    sync_vector_record_after_sqlite_impl(config, pool, record).await
}
555
/// With zvec compiled in: upserts `record` into an existing sidecar and then
/// marks the sidecar's manifest stale.
///
/// Nothing happens when the backend is `"sqlite"` or `"disabled"`, when no
/// manifest exists yet, or when the manifest's dims differ from the record's.
// NOTE(review): on a dims mismatch SQLite has changed but the manifest is not
// marked stale — confirm that is intended.
#[cfg(feature = "zvec-bundled")]
async fn sync_vector_record_after_sqlite_impl(
    config: &Config,
    pool: &SqlitePool,
    record: &VectorRecord,
) -> Result<()> {
    let backend = config.vector_index.backend.as_str();
    if backend == "sqlite" || backend == "disabled" {
        return Ok(());
    }

    let path = resolve_vector_index_path(config);
    let manifest = match read_manifest(&path)? {
        Some(found) if found.dims == Some(record.dims) => found,
        _ => return Ok(()),
    };

    let index = ZvecVectorIndex::open_existing(config, pool.clone()).await?;
    index.upsert(record).await?;
    mark_sidecar_stale(&path, &manifest)
}
579
/// Without zvec compiled in there is no sidecar to update, so this is a no-op
/// that always succeeds. Note that it does not touch any existing manifest.
#[cfg(not(feature = "zvec-bundled"))]
async fn sync_vector_record_after_sqlite_impl(
    _config: &Config,
    _pool: &SqlitePool,
    _record: &VectorRecord,
) -> Result<()> {
    Ok(())
}
588
589pub fn remove_configured_sidecar(config: &Config) -> Result<()> {
590    if matches!(config.vector_index.backend.as_str(), "sqlite" | "disabled") {
591        return Ok(());
592    }
593
594    let path = resolve_vector_index_path(config);
595    if path.is_dir() {
596        std::fs::remove_dir_all(&path)
597            .with_context(|| format!("failed to remove vector index sidecar {}", path.display()))?;
598    } else if path.exists() {
599        std::fs::remove_file(&path)
600            .with_context(|| format!("failed to remove vector index sidecar {}", path.display()))?;
601    }
602    Ok(())
603}
604
/// All vectors currently stored in SQLite plus a digest identifying that exact set.
struct SqliteVectorSnapshot {
    /// One entry per vector row, ordered by chunk id.
    records: Vec<IndexedVectorRecord>,
    /// Hex-encoded SHA-256 over the rows' ids, model, dims, hash and embedding bytes.
    digest: String,
}
609
610async fn sqlite_vector_snapshot(pool: &SqlitePool) -> Result<SqliteVectorSnapshot> {
611    let rows = sqlx::query(
612        r#"
613        SELECT cv.chunk_id, cv.document_id, cv.embedding,
614               e.model, e.dims, e.hash,
615               COALESCE(substr(c.text, 1, 240), '') AS snippet,
616               d.source, d.updated_at
617        FROM chunk_vectors cv
618        JOIN embeddings e ON e.chunk_id = cv.chunk_id
619        JOIN chunks c ON c.id = cv.chunk_id
620        JOIN documents d ON d.id = cv.document_id
621        ORDER BY cv.chunk_id
622        "#,
623    )
624    .fetch_all(pool)
625    .await?;
626
627    let mut hasher = Sha256::new();
628    let mut records = Vec::with_capacity(rows.len());
629    for row in rows {
630        let chunk_id: String = row.get("chunk_id");
631        let document_id: String = row.get("document_id");
632        let blob: Vec<u8> = row.get("embedding");
633        let model: String = row.get("model");
634        let dims: i64 = row.get("dims");
635        let content_hash: String = row.get("hash");
636        let source: String = row.get("source");
637        let updated_at: i64 = row.get("updated_at");
638
639        hasher.update(chunk_id.as_bytes());
640        hasher.update([0]);
641        hasher.update(document_id.as_bytes());
642        hasher.update([0]);
643        hasher.update(model.as_bytes());
644        hasher.update([0]);
645        hasher.update(dims.to_le_bytes());
646        hasher.update(content_hash.as_bytes());
647        hasher.update([0]);
648        hasher.update(&blob);
649
650        records.push(IndexedVectorRecord {
651            record: VectorRecord {
652                chunk_id,
653                document_id,
654                vector: blob_to_vec(&blob),
655                model,
656                dims: dims as usize,
657                content_hash,
658            },
659            snippet: row.get("snippet"),
660            source,
661            updated_at,
662        });
663    }
664
665    Ok(SqliteVectorSnapshot {
666        records,
667        digest: hex::encode(hasher.finalize()),
668    })
669}
670
671#[allow(dead_code)]
672fn manifest_for_snapshot(snapshot: &SqliteVectorSnapshot, config: &Config) -> VectorIndexManifest {
673    let first = snapshot.records.first();
674    VectorIndexManifest {
675        version: MANIFEST_VERSION,
676        backend: "zvec".to_string(),
677        vector_count: snapshot.records.len(),
678        model: first.map(|r| r.record.model.clone()),
679        dims: first.map(|r| r.record.dims),
680        metric: config.vector_index.metric.clone(),
681        index: config.vector_index.index.clone(),
682        digest: snapshot.digest.clone(),
683    }
684}
685
686fn read_manifest(path: &Path) -> Result<Option<VectorIndexManifest>> {
687    let manifest_path = path.join(MANIFEST_FILE);
688    if !manifest_path.exists() {
689        return Ok(None);
690    }
691    let content = std::fs::read_to_string(&manifest_path)
692        .with_context(|| format!("failed to read {}", manifest_path.display()))?;
693    serde_json::from_str(&content)
694        .with_context(|| format!("failed to parse {}", manifest_path.display()))
695        .map(Some)
696}
697
698#[allow(dead_code)]
699fn write_manifest(path: &Path, manifest: &VectorIndexManifest) -> Result<()> {
700    std::fs::create_dir_all(path)?;
701    let content = serde_json::to_string_pretty(manifest)?;
702    std::fs::write(path.join(MANIFEST_FILE), content)?;
703    Ok(())
704}
705
706#[allow(dead_code)]
707fn mark_sidecar_stale(path: &Path, manifest: &VectorIndexManifest) -> Result<()> {
708    let mut stale = manifest.clone();
709    stale.digest = format!("stale:{}", stale.digest);
710    write_manifest(path, &stale)
711}
712
713#[allow(dead_code)]
714fn parse_since(since: Option<&str>) -> Result<Option<i64>> {
715    let Some(since) = since else {
716        return Ok(None);
717    };
718    let date = chrono::NaiveDate::parse_from_str(since, "%Y-%m-%d")?;
719    Ok(Some(
720        date.and_hms_opt(0, 0, 0)
721            .ok_or_else(|| anyhow!("invalid since date"))?
722            .and_utc()
723            .timestamp(),
724    ))
725}
726
/// zvec-backed [`VectorIndex`] stored as a sidecar directory next to SQLite.
#[cfg(feature = "zvec-bundled")]
pub struct ZvecVectorIndex {
    /// Configuration the index was opened with; reused when rewriting the manifest.
    config: Config,
    /// SQLite pool used to look up chunk metadata and to re-snapshot vectors.
    pool: SqlitePool,
    /// Sidecar directory containing the collection and the manifest.
    path: PathBuf,
    /// Open zvec collection handle.
    collection: zvec::Collection,
    /// Per-vector metadata; search looks entries up by the zvec row's `doc_id`.
    metadata: RwLock<Vec<IndexedVectorRecord>>,
}
735
#[cfg(feature = "zvec-bundled")]
impl ZvecVectorIndex {
    /// Opens the sidecar when it matches SQLite, otherwise rebuilds it.
    ///
    /// The sidecar is reused only when its manifest digest equals the digest
    /// of the current SQLite snapshot and the collection directory exists.
    /// A `stale:`-marked manifest can never match, so it also forces a
    /// rebuild. Comparing digests (rather than only checking for the stale
    /// marker) also catches changes made by a build that does not update
    /// the sidecar.
    ///
    /// # Errors
    /// Fails when SQLite cannot be read, the manifest is unreadable, the
    /// collection cannot be opened or created, or SQLite holds no vectors.
    pub async fn open_or_rebuild(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let snapshot = sqlite_vector_snapshot(&pool).await?;

        let fresh = read_manifest(&path)?
            .is_some_and(|manifest| manifest.digest == snapshot.digest);
        if fresh && path.join(COLLECTION_DIR).exists() {
            let collection = Self::open_collection(&path)?;
            return Ok(Self {
                config: config.clone(),
                pool,
                path,
                collection,
                metadata: RwLock::new(snapshot.records),
            });
        }

        if snapshot.records.is_empty() {
            return Err(anyhow!("SQLite has no vectors to index"));
        }

        Self::rebuild_from_snapshot(config, pool, path, snapshot).await
    }

    /// Opens the existing collection without any freshness check and loads
    /// metadata from the current SQLite snapshot.
    async fn open_existing(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let collection = Self::open_collection(&path)?;
        let snapshot = sqlite_vector_snapshot(&pool).await?;
        Ok(Self {
            config: config.clone(),
            pool,
            path,
            collection,
            metadata: RwLock::new(snapshot.records),
        })
    }

    /// Opens the zvec collection stored under the sidecar directory `path`.
    fn open_collection(path: &Path) -> Result<zvec::Collection> {
        let collection_path = path.join(COLLECTION_DIR);
        zvec::Collection::open(&collection_path).with_context(|| {
            format!(
                "failed to open zvec collection {}",
                collection_path.display()
            )
        })
    }

    /// Discards any existing sidecar and rebuilds it from the current SQLite vectors.
    pub async fn rebuild(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let snapshot = sqlite_vector_snapshot(&pool).await?;
        Self::rebuild_from_snapshot(config, pool, path, snapshot).await
    }

    /// Replaces whatever is at `path` with a collection built from `snapshot`
    /// and writes the matching manifest last.
    ///
    /// # Errors
    /// Fails, before touching the disk, when the snapshot is empty. Also
    /// fails on any filesystem or zvec error.
    async fn rebuild_from_snapshot(
        config: &Config,
        pool: SqlitePool,
        path: PathBuf,
        snapshot: SqliteVectorSnapshot,
    ) -> Result<Self> {
        // Validate first so an empty snapshot does not destroy the old sidecar.
        let dims = snapshot
            .records
            .first()
            .map(|r| r.record.dims)
            .ok_or_else(|| anyhow!("SQLite has no vectors to index"))?;

        if path.is_dir() {
            std::fs::remove_dir_all(&path).with_context(|| {
                format!("failed to remove stale zvec sidecar {}", path.display())
            })?;
        } else if path.exists() {
            std::fs::remove_file(&path).with_context(|| {
                format!(
                    "failed to remove stale zvec sidecar file {}",
                    path.display()
                )
            })?;
        }
        std::fs::create_dir_all(&path)?;

        let schema = zvec_schema(dims)?;
        let collection_path = path.join(COLLECTION_DIR);
        let collection =
            zvec::Collection::create_and_open(&collection_path, schema).with_context(|| {
                format!(
                    "failed to create zvec collection {}",
                    collection_path.display()
                )
            })?;

        // Capture the manifest before the records move into the index, so
        // the vectors never have to be cloned.
        let manifest = manifest_for_snapshot(&snapshot, config);
        let index = Self {
            config: config.clone(),
            pool,
            path: path.clone(),
            collection,
            metadata: RwLock::new(snapshot.records),
        };
        {
            let records = index
                .metadata
                .read()
                .map_err(|_| anyhow!("zvec metadata lock poisoned"))?;
            index.populate(records.as_slice())?;
        }
        index.collection.optimize()?;
        index.collection.flush()?;
        // The manifest is written last: a crash mid-build leaves no manifest
        // and therefore triggers a rebuild on the next open.
        write_manifest(&path, &manifest)?;
        Ok(index)
    }

    /// Inserts `records` in batches of `ZVEC_BATCH_SIZE` and flushes the collection.
    fn populate(&self, records: &[IndexedVectorRecord]) -> Result<()> {
        for batch in records.chunks(ZVEC_BATCH_SIZE) {
            let docs = batch
                .iter()
                .map(doc_from_record)
                .collect::<Result<Vec<_>>>()?;
            self.insert_docs(&docs)?;
        }
        self.collection.flush()?;
        Ok(())
    }

    /// Inserts one batch of documents into the collection.
    fn insert_docs(&self, docs: &[zvec::Doc]) -> Result<()> {
        self.collection.insert(docs)?;
        Ok(())
    }

    /// Re-snapshots SQLite and writes a manifest whose digest matches it.
    async fn write_current_manifest(&self) -> Result<()> {
        let snapshot = sqlite_vector_snapshot(&self.pool).await?;
        write_manifest(&self.path, &manifest_for_snapshot(&snapshot, &self.config))
    }
}
856
#[cfg(feature = "zvec-bundled")]
#[async_trait]
impl VectorIndex for ZvecVectorIndex {
    /// Replaces the collection entry for `record.chunk_id`.
    ///
    /// Snippet, source and `updated_at` are looked up in SQLite; when the
    /// chunk no longer exists there, the call succeeds without doing anything.
    async fn upsert(&self, record: &VectorRecord) -> Result<()> {
        let row = sqlx::query(
            r#"
            SELECT COALESCE(substr(c.text, 1, 240), '') AS snippet,
                   d.source, d.updated_at
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            WHERE c.id = ?
            "#,
        )
        .bind(&record.chunk_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else {
            return Ok(());
        };

        let indexed = IndexedVectorRecord {
            record: record.clone(),
            snippet: row.get("snippet"),
            source: row.get("source"),
            updated_at: row.get("updated_at"),
        };
        let doc = doc_from_record(&indexed)?;
        // Best-effort delete: its result is deliberately ignored (presumably
        // because the chunk may not be in the collection yet — confirm).
        let _ = self.collection.delete(&[record.chunk_id.as_str()]);
        self.collection.insert(&[doc])?;
        // NOTE(review): the new entry is appended even when the chunk already
        // has one, while `search` indexes this Vec by `doc_id` — confirm the
        // two stay aligned after an upsert.
        self.metadata
            .write()
            .map_err(|_| anyhow!("zvec metadata lock poisoned"))?
            .push(indexed);
        self.collection.flush()?;
        Ok(())
    }

    /// Removes one chunk from the collection and refreshes the manifest.
    async fn delete_chunk(&self, chunk_id: &str) -> Result<()> {
        self.collection.delete(&[chunk_id])?;
        self.collection.flush()?;
        self.write_current_manifest().await?;
        Ok(())
    }

    /// Removes every entry of `document_id` from the collection and
    /// refreshes the manifest.
    async fn delete_document(&self, document_id: &str) -> Result<()> {
        // Escape backslashes before quotes; otherwise an id ending in `\`
        // would turn the escaped quote back into a string terminator.
        // NOTE(review): assumes backslash is zvec's escape character in
        // filter strings, as the existing `\'` escaping implies — confirm.
        let escaped = document_id.replace('\\', "\\\\").replace('\'', "\\'");
        self.collection
            .delete_by_filter(&format!("document_id == '{escaped}'"))?;
        self.collection.flush()?;
        self.write_current_manifest().await?;
        Ok(())
    }

    /// Returns up to `options.limit` candidates for `query_vec`.
    ///
    /// zvec is asked for eight times the limit so that the `source` and
    /// `since` filters, which are applied afterwards against the in-memory
    /// metadata, still leave enough results.
    ///
    /// # Errors
    /// Fails when `since` is not a `%Y-%m-%d` date, when the zvec query
    /// fails, or when the metadata lock is poisoned.
    async fn search(
        &self,
        query_vec: &[f32],
        options: VectorSearchOptions<'_>,
    ) -> Result<Vec<ChunkCandidate>> {
        let limit = options.limit.max(0) as usize;
        if limit == 0 {
            return Ok(Vec::new());
        }
        let since_ts = parse_since(options.since)?;
        let topk = limit.saturating_mul(8).max(limit);
        let query = zvec::VectorQuery::new("embedding")
            .topk(topk)
            .include_doc_id(true)
            .output_fields(&["chunk_id", "document_id", "source", "updated_at", "snippet"])
            .vector(query_vec)?;

        let rows = self.collection.query(query)?;
        let metadata = self
            .metadata
            .read()
            .map_err(|_| anyhow!("zvec metadata lock poisoned"))?;
        let mut candidates = Vec::new();
        for row in rows.iter() {
            // NOTE(review): assumes `doc_id` is a position in `metadata` —
            // confirm against zvec's doc-id semantics.
            let Some(record) = metadata.get(row.doc_id() as usize) else {
                continue;
            };
            if options
                .source
                .is_some_and(|expected| record.source != expected)
            {
                continue;
            }
            if since_ts.is_some_and(|since| record.updated_at < since) {
                continue;
            }
            candidates.push(ChunkCandidate {
                chunk_id: record.record.chunk_id.clone(),
                document_id: record.record.document_id.clone(),
                raw_score: row.score() as f64,
                snippet: record.snippet.clone(),
            });
            if candidates.len() == limit {
                break;
            }
        }
        Ok(candidates)
    }

    /// Reports the sidecar as available only when its manifest digest equals
    /// the digest of the current SQLite snapshot.
    ///
    /// The message comes from `zvec_health_message`, so an empty database
    /// and a missing manifest are reported the same way `status_health` does.
    async fn health(&self) -> Result<VectorIndexHealth> {
        let snapshot = sqlite_vector_snapshot(&self.pool).await?;
        let manifest = read_manifest(&self.path)?;
        let fresh = manifest
            .as_ref()
            .is_some_and(|manifest| manifest.digest == snapshot.digest);
        Ok(VectorIndexHealth {
            enabled: true,
            available: fresh,
            backend: "zvec".to_string(),
            message: Some(zvec_health_message(
                manifest.as_ref(),
                fresh,
                snapshot.records.len(),
            )),
        })
    }
}
978
/// Builds the `context_chunks` collection schema for vectors of `dims` dimensions.
///
/// # Errors
/// Fails when `dims` does not fit in `u32` (instead of silently truncating
/// it) or when zvec rejects a field definition.
#[cfg(feature = "zvec-bundled")]
fn zvec_schema(dims: usize) -> Result<zvec::CollectionSchema> {
    let vector_dims = u32::try_from(dims)
        .with_context(|| format!("embedding dims {dims} do not fit in u32"))?;

    let mut schema = zvec::CollectionSchema::new("context_chunks");
    schema.add_field(zvec::FieldSchema::string("chunk_id"))?;
    schema.add_field(zvec::FieldSchema::string("document_id"))?;
    schema.add_field(zvec::FieldSchema::string("source"))?;
    schema.add_field(zvec::FieldSchema::int64("updated_at"))?;
    schema.add_field(zvec::FieldSchema::string("snippet"))?;
    schema.add_field(zvec::VectorSchema::fp32("embedding", vector_dims))?;
    Ok(schema)
}
990
/// Converts an indexed record into a zvec document, using the chunk id as the
/// primary key and filling the fields defined by `zvec_schema`.
#[cfg(feature = "zvec-bundled")]
fn doc_from_record(record: &IndexedVectorRecord) -> Result<zvec::Doc> {
    let IndexedVectorRecord {
        record: vector_row,
        snippet,
        source,
        updated_at,
    } = record;

    let mut doc = zvec::Doc::new();
    doc.set_pk(&vector_row.chunk_id)?;
    doc.set_string("chunk_id", &vector_row.chunk_id)?;
    doc.set_string("document_id", &vector_row.document_id)?;
    doc.set_string("source", source)?;
    doc.set_int64("updated_at", *updated_at)?;
    doc.set_string("snippet", snippet)?;
    doc.set_vector("embedding", &vector_row.vector)?;
    Ok(doc)
}