1use std::path::{Path, PathBuf};
8use std::sync::Arc;
9#[cfg(feature = "zvec-bundled")]
10use std::sync::RwLock;
11
12use anyhow::{anyhow, Context, Result};
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use sqlx::{Row, SqlitePool};
17
18use context_harness_core::embedding::blob_to_vec;
19use context_harness_core::models::{Chunk, Document};
20use context_harness_core::store::{ChunkCandidate, DocumentMetadata, DocumentResponse, Store};
21
22use crate::config::Config;
23use crate::ctx_dirs;
24use crate::sqlite_store::SqliteStore;
25
// `allow(dead_code)`: in builds without `zvec-bundled`, some of these constants are only
// referenced from feature-gated or `allow(dead_code)` code and may otherwise look unused.

/// Format version written into manifests built by `manifest_for_snapshot`.
#[allow(dead_code)]
const MANIFEST_VERSION: u32 = 1;
/// Subdirectory of the sidecar directory that holds the zvec collection.
#[allow(dead_code)]
const COLLECTION_DIR: &str = "collection";
/// File name of the sidecar manifest, relative to the sidecar directory.
#[allow(dead_code)]
const MANIFEST_FILE: &str = "manifest.json";
/// Number of documents handed to zvec per `insert` call while populating a collection.
#[allow(dead_code)]
const ZVEC_BATCH_SIZE: usize = 512;
34
/// One chunk embedding, as read from SQLite's `chunk_vectors`/`embeddings` tables and
/// mirrored into a vector index.
#[derive(Debug, Clone)]
// `allow(dead_code)`: several fields are only read by feature-gated zvec code.
#[allow(dead_code)]
pub struct VectorRecord {
    /// Id of the embedded chunk (also used as the zvec primary key).
    pub chunk_id: String,
    /// Id of the document that owns the chunk.
    pub document_id: String,
    /// Embedding values.
    pub vector: Vec<f32>,
    /// Name of the embedding model recorded with the vector.
    pub model: String,
    /// Declared dimensionality of `vector`.
    pub dims: usize,
    /// Hash stored alongside the embedding (SQLite `embeddings.hash`).
    pub content_hash: String,
}
46
/// A [`VectorRecord`] plus the per-chunk metadata the zvec backend stores and filters on.
#[derive(Debug, Clone)]
// `allow(dead_code)`: the extra fields are only read by feature-gated zvec code.
#[allow(dead_code)]
struct IndexedVectorRecord {
    record: VectorRecord,
    /// Leading text of the chunk (SQLite `substr(c.text, 1, 240)`), returned with hits.
    snippet: String,
    /// Owning document's `source`, used for source filtering in zvec search.
    source: String,
    /// Owning document's `updated_at`, compared against the parsed `since` timestamp.
    updated_at: i64,
}
55
/// Per-query options passed to every [`VectorIndex::search`] implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct VectorSearchOptions<'a> {
    /// Maximum number of candidates to return (the zvec backend returns none for `<= 0`).
    pub limit: i64,
    /// Optional filter on the owning document's `source`.
    pub source: Option<&'a str>,
    /// Optional lower bound on document `updated_at`; the zvec backend parses it as
    /// `YYYY-MM-DD` at UTC midnight.
    pub since: Option<&'a str>,
}
63
/// Health report for a vector index backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexHealth {
    /// Whether the configured backend is switched on (`false` for `disabled`).
    pub enabled: bool,
    /// Whether the backend can currently serve queries.
    pub available: bool,
    /// Backend name, e.g. `zvec`, `sqlite-bruteforce` or `disabled`.
    pub backend: String,
    /// Optional human-readable explanation of the state.
    pub message: Option<String>,
}
72
/// Contents of the sidecar's `manifest.json`, describing how the zvec collection was built.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VectorIndexManifest {
    /// Manifest format version (`MANIFEST_VERSION` when built by this module).
    pub version: u32,
    /// Backend that produced the sidecar (`"zvec"` when built by this module).
    pub backend: String,
    /// Number of vectors in the SQLite snapshot the sidecar was built from.
    pub vector_count: usize,
    /// Embedding model of the first snapshot record, if any.
    pub model: Option<String>,
    /// Dimensionality of the first snapshot record, if any.
    pub dims: Option<usize>,
    /// Distance metric copied from `config.vector_index.metric`.
    pub metric: String,
    /// Index type copied from `config.vector_index.index`.
    pub index: String,
    /// Digest of the SQLite snapshot; a `stale:` prefix marks the sidecar for rebuild.
    pub digest: String,
}
84
/// Snapshot of the configured vector index compared against the canonical SQLite vectors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorIndexStatus {
    /// Resolved sidecar directory.
    pub path: PathBuf,
    /// Health derived from the configured backend and sidecar freshness.
    pub health: VectorIndexHealth,
    /// Parsed sidecar manifest, if one exists.
    pub manifest: Option<VectorIndexManifest>,
    /// Number of vectors currently stored in SQLite.
    pub sqlite_vector_count: usize,
    /// Digest of the current SQLite vectors.
    pub sqlite_digest: String,
    /// `true` when the manifest digest equals `sqlite_digest`.
    pub fresh: bool,
}
94
/// A vector search backend used alongside the canonical SQLite store.
#[async_trait]
pub trait VectorIndex: Send + Sync {
    /// Insert or replace the vector for `record.chunk_id`.
    // `allow(dead_code)`: not every build configuration calls this method.
    #[allow(dead_code)]
    async fn upsert(&self, record: &VectorRecord) -> Result<()>;
    /// Remove a single chunk's vector.
    #[allow(dead_code)]
    async fn delete_chunk(&self, chunk_id: &str) -> Result<()>;
    /// Remove every vector belonging to `document_id`.
    #[allow(dead_code)]
    async fn delete_document(&self, document_id: &str) -> Result<()>;
    /// Return up to `options.limit` nearest candidates for `query_vec`.
    async fn search(
        &self,
        query_vec: &[f32],
        options: VectorSearchOptions<'_>,
    ) -> Result<Vec<ChunkCandidate>>;
    /// Report whether the backend is enabled and able to serve queries.
    #[allow(dead_code)]
    async fn health(&self) -> Result<VectorIndexHealth>;
}
112
113pub struct DisabledVectorIndex;
115
116#[async_trait]
117impl VectorIndex for DisabledVectorIndex {
118 async fn upsert(&self, _record: &VectorRecord) -> Result<()> {
119 Ok(())
120 }
121
122 async fn delete_chunk(&self, _chunk_id: &str) -> Result<()> {
123 Ok(())
124 }
125
126 async fn delete_document(&self, _document_id: &str) -> Result<()> {
127 Ok(())
128 }
129
130 async fn search(
131 &self,
132 _query_vec: &[f32],
133 _options: VectorSearchOptions<'_>,
134 ) -> Result<Vec<ChunkCandidate>> {
135 Ok(Vec::new())
136 }
137
138 async fn health(&self) -> Result<VectorIndexHealth> {
139 Ok(VectorIndexHealth {
140 enabled: false,
141 available: false,
142 backend: "disabled".to_string(),
143 message: Some("vector index disabled; SQLite fallback remains canonical".to_string()),
144 })
145 }
146}
147
148pub struct BruteForceSqliteVectorIndex {
150 store: SqliteStore,
151}
152
153impl BruteForceSqliteVectorIndex {
154 pub fn new(store: SqliteStore) -> Self {
155 Self { store }
156 }
157}
158
159#[async_trait]
160impl VectorIndex for BruteForceSqliteVectorIndex {
161 async fn upsert(&self, _record: &VectorRecord) -> Result<()> {
162 Ok(())
163 }
164
165 async fn delete_chunk(&self, _chunk_id: &str) -> Result<()> {
166 Ok(())
167 }
168
169 async fn delete_document(&self, _document_id: &str) -> Result<()> {
170 Ok(())
171 }
172
173 async fn search(
174 &self,
175 query_vec: &[f32],
176 options: VectorSearchOptions<'_>,
177 ) -> Result<Vec<ChunkCandidate>> {
178 self.store
179 .vector_search(query_vec, options.limit, options.source, options.since)
180 .await
181 }
182
183 async fn health(&self) -> Result<VectorIndexHealth> {
184 Ok(VectorIndexHealth {
185 enabled: true,
186 available: true,
187 backend: "sqlite-bruteforce".to_string(),
188 message: Some("exact SQLite vector scan".to_string()),
189 })
190 }
191}
192
193pub struct VectorIndexRouter {
194 primary: Option<Arc<dyn VectorIndex>>,
195 fallback: Option<BruteForceSqliteVectorIndex>,
196 backend: String,
197}
198
199impl VectorIndexRouter {
200 async fn search(
201 &self,
202 query_vec: &[f32],
203 options: VectorSearchOptions<'_>,
204 ) -> Result<Vec<ChunkCandidate>> {
205 if let Some(primary) = &self.primary {
206 match primary.search(query_vec, options).await {
207 Ok(candidates) => return Ok(candidates),
208 Err(err) if self.fallback.is_some() => {
209 eprintln!(
210 "Warning: vector index '{}' failed; falling back to SQLite: {}",
211 self.backend, err
212 );
213 }
214 Err(err) => return Err(err),
215 }
216 }
217
218 if let Some(fallback) = &self.fallback {
219 return fallback.search(query_vec, options).await;
220 }
221
222 Ok(Vec::new())
223 }
224
225 #[allow(dead_code)]
226 pub async fn health(&self) -> Result<VectorIndexHealth> {
227 if let Some(primary) = &self.primary {
228 return primary.health().await;
229 }
230 if let Some(fallback) = &self.fallback {
231 return fallback.health().await;
232 }
233 DisabledVectorIndex.health().await
234 }
235}
236
237pub struct VectorIndexedStore {
239 sqlite: SqliteStore,
240 router: VectorIndexRouter,
241}
242
243impl VectorIndexedStore {
244 pub fn new(sqlite: SqliteStore, router: VectorIndexRouter) -> Self {
245 Self { sqlite, router }
246 }
247}
248
249#[async_trait]
250impl Store for VectorIndexedStore {
251 async fn upsert_document(&self, doc: &Document) -> Result<String> {
252 self.sqlite.upsert_document(doc).await
253 }
254
255 async fn replace_chunks(
256 &self,
257 doc_id: &str,
258 chunks: &[Chunk],
259 vectors: Option<&[Vec<f32>]>,
260 ) -> Result<()> {
261 self.sqlite.replace_chunks(doc_id, chunks, vectors).await
262 }
263
264 async fn upsert_embedding(
265 &self,
266 chunk_id: &str,
267 doc_id: &str,
268 vector: &[f32],
269 model: &str,
270 dims: usize,
271 content_hash: &str,
272 ) -> Result<()> {
273 self.sqlite
274 .upsert_embedding(chunk_id, doc_id, vector, model, dims, content_hash)
275 .await
276 }
277
278 async fn get_document(&self, id: &str) -> Result<Option<DocumentResponse>> {
279 self.sqlite.get_document(id).await
280 }
281
282 async fn get_document_metadata(&self, id: &str) -> Result<Option<DocumentMetadata>> {
283 self.sqlite.get_document_metadata(id).await
284 }
285
286 async fn keyword_search(
287 &self,
288 query: &str,
289 limit: i64,
290 source: Option<&str>,
291 since: Option<&str>,
292 ) -> Result<Vec<ChunkCandidate>> {
293 self.sqlite
294 .keyword_search(query, limit, source, since)
295 .await
296 }
297
298 async fn vector_search(
299 &self,
300 query_vec: &[f32],
301 limit: i64,
302 source: Option<&str>,
303 since: Option<&str>,
304 ) -> Result<Vec<ChunkCandidate>> {
305 self.router
306 .search(
307 query_vec,
308 VectorSearchOptions {
309 limit,
310 source,
311 since,
312 },
313 )
314 .await
315 }
316}
317
318pub async fn configured_vector_store(
319 config: &Config,
320 pool: SqlitePool,
321) -> Result<VectorIndexedStore> {
322 let sqlite = SqliteStore::new(pool.clone());
323 let router = configured_vector_index(config, pool).await?;
324 Ok(VectorIndexedStore::new(sqlite, router))
325}
326
327pub async fn configured_vector_index(
328 config: &Config,
329 pool: SqlitePool,
330) -> Result<VectorIndexRouter> {
331 let backend = config.vector_index.backend.as_str();
332 let sqlite_fallback = || BruteForceSqliteVectorIndex::new(SqliteStore::new(pool.clone()));
333 let fallback = match config.vector_index.fallback.as_str() {
334 "sqlite" => Some(sqlite_fallback()),
335 _ => None,
336 };
337
338 match backend {
339 "sqlite" => Ok(VectorIndexRouter {
340 primary: None,
341 fallback: Some(sqlite_fallback()),
342 backend: "sqlite".to_string(),
343 }),
344 "disabled" => Ok(VectorIndexRouter {
345 primary: None,
346 fallback,
347 backend: "disabled".to_string(),
348 }),
349 "auto" => configured_auto_zvec(config, pool.clone(), fallback).await,
350 "zvec" => configured_required_zvec(config, pool.clone(), fallback).await,
351 other => Err(anyhow!("unknown vector_index.backend: {other}")),
352 }
353}
354
/// `auto` backend with zvec compiled in: use zvec when it opens, otherwise warn and run on
/// the fallback alone.
#[cfg(feature = "zvec-bundled")]
async fn configured_auto_zvec(
    config: &Config,
    pool: SqlitePool,
    fallback: Option<BruteForceSqliteVectorIndex>,
) -> Result<VectorIndexRouter> {
    let primary: Option<Arc<dyn VectorIndex>> =
        match ZvecVectorIndex::open_or_rebuild(config, pool).await {
            Ok(index) => Some(Arc::new(index)),
            Err(err) => {
                eprintln!("Warning: zvec unavailable; using SQLite vector fallback: {err}");
                None
            }
        };
    let label = if primary.is_some() { "zvec" } else { "auto" };
    Ok(VectorIndexRouter {
        primary,
        fallback,
        backend: label.to_string(),
    })
}
377
378#[cfg(not(feature = "zvec-bundled"))]
379async fn configured_auto_zvec(
380 _config: &Config,
381 _pool: SqlitePool,
382 fallback: Option<BruteForceSqliteVectorIndex>,
383) -> Result<VectorIndexRouter> {
384 Ok(VectorIndexRouter {
385 primary: None,
386 fallback,
387 backend: "auto".to_string(),
388 })
389}
390
/// `zvec` backend with zvec compiled in: failing to open or rebuild the index is an error.
#[cfg(feature = "zvec-bundled")]
async fn configured_required_zvec(
    config: &Config,
    pool: SqlitePool,
    fallback: Option<BruteForceSqliteVectorIndex>,
) -> Result<VectorIndexRouter> {
    let primary: Arc<dyn VectorIndex> =
        Arc::new(ZvecVectorIndex::open_or_rebuild(config, pool).await?);
    Ok(VectorIndexRouter {
        backend: "zvec".to_string(),
        primary: Some(primary),
        fallback,
    })
}
404
405#[cfg(not(feature = "zvec-bundled"))]
406async fn configured_required_zvec(
407 _config: &Config,
408 _pool: SqlitePool,
409 _fallback: Option<BruteForceSqliteVectorIndex>,
410) -> Result<VectorIndexRouter> {
411 Err(anyhow!(
412 "vector_index.backend = 'zvec' requires building with --features zvec-bundled"
413 ))
414}
415
416pub fn resolve_vector_index_path(config: &Config) -> PathBuf {
417 if config.vector_index.path.as_os_str() != "auto" {
418 return config.vector_index.path.clone();
419 }
420
421 if ctx_dirs::is_default_workspace_db_path(&config.db.path) {
422 return ctx_dirs::workspace_vector_index_dir();
423 }
424
425 config
426 .db
427 .path
428 .parent()
429 .unwrap_or_else(|| Path::new("."))
430 .join("vector-index")
431 .join("zvec")
432}
433
434pub async fn vector_index_status(config: &Config) -> Result<VectorIndexStatus> {
435 let pool = crate::db::connect(config).await?;
436 let path = resolve_vector_index_path(config);
437 let manifest = read_manifest(&path)?;
438 let snapshot = sqlite_vector_snapshot(&pool).await?;
439 pool.close().await;
440
441 let fresh = manifest
442 .as_ref()
443 .is_some_and(|manifest| manifest.digest == snapshot.digest);
444 let health = status_health(config, manifest.as_ref(), fresh, snapshot.records.len());
445 Ok(VectorIndexStatus {
446 path,
447 health,
448 manifest,
449 sqlite_vector_count: snapshot.records.len(),
450 sqlite_digest: snapshot.digest,
451 fresh,
452 })
453}
454
455fn status_health(
456 config: &Config,
457 manifest: Option<&VectorIndexManifest>,
458 fresh: bool,
459 sqlite_vector_count: usize,
460) -> VectorIndexHealth {
461 match config.vector_index.backend.as_str() {
462 "sqlite" => VectorIndexHealth {
463 enabled: true,
464 available: true,
465 backend: "sqlite-bruteforce".to_string(),
466 message: Some("exact SQLite vector scan".to_string()),
467 },
468 "disabled" => VectorIndexHealth {
469 enabled: false,
470 available: false,
471 backend: "disabled".to_string(),
472 message: Some("vector index disabled; SQLite fallback remains canonical".to_string()),
473 },
474 "zvec" if cfg!(feature = "zvec-bundled") => VectorIndexHealth {
475 enabled: true,
476 available: fresh,
477 backend: "zvec".to_string(),
478 message: Some(zvec_health_message(manifest, fresh, sqlite_vector_count)),
479 },
480 "zvec" => VectorIndexHealth {
481 enabled: true,
482 available: false,
483 backend: "zvec".to_string(),
484 message: Some("zvec not compiled in; rebuild with --features zvec-bundled".to_string()),
485 },
486 "auto" if cfg!(feature = "zvec-bundled") => VectorIndexHealth {
487 enabled: true,
488 available: fresh,
489 backend: "zvec".to_string(),
490 message: Some(zvec_health_message(manifest, fresh, sqlite_vector_count)),
491 },
492 "auto" => VectorIndexHealth {
493 enabled: true,
494 available: config.vector_index.fallback == "sqlite",
495 backend: "sqlite-bruteforce".to_string(),
496 message: Some("zvec not compiled in; using SQLite fallback".to_string()),
497 },
498 _ => VectorIndexHealth {
499 enabled: true,
500 available: config.vector_index.fallback == "sqlite",
501 backend: "sqlite-bruteforce".to_string(),
502 message: Some("unknown vector-index backend; using configured fallback".to_string()),
503 },
504 }
505}
506
507fn zvec_health_message(
508 manifest: Option<&VectorIndexManifest>,
509 fresh: bool,
510 sqlite_vector_count: usize,
511) -> String {
512 if sqlite_vector_count == 0 {
513 "SQLite has no vectors to index".to_string()
514 } else if manifest.is_none() {
515 "zvec sidecar missing".to_string()
516 } else if fresh {
517 "zvec sidecar fresh".to_string()
518 } else {
519 "zvec sidecar stale".to_string()
520 }
521}
522
523pub async fn rebuild_configured_vector_index(config: &Config) -> Result<VectorIndexStatus> {
524 let pool = crate::db::connect(config).await?;
525 rebuild_zvec_if_available(config, &pool).await?;
526 pool.close().await;
527 vector_index_status(config).await
528}
529
/// Rebuild the zvec sidecar from SQLite when the configured backend uses one.
///
/// The `sqlite` and `disabled` backends never read a zvec sidecar, so nothing is built for
/// them. This matches `remove_configured_sidecar` and the non-bundled variant of this function.
#[cfg(feature = "zvec-bundled")]
async fn rebuild_zvec_if_available(config: &Config, pool: &SqlitePool) -> Result<()> {
    if matches!(config.vector_index.backend.as_str(), "sqlite" | "disabled") {
        return Ok(());
    }
    ZvecVectorIndex::rebuild(config, pool.clone()).await?;
    Ok(())
}
536
537#[cfg(not(feature = "zvec-bundled"))]
538async fn rebuild_zvec_if_available(config: &Config, _pool: &SqlitePool) -> Result<()> {
539 if config.vector_index.backend == "zvec" {
540 Err(anyhow!(
541 "vector_index.backend = 'zvec' requires building with --features zvec-bundled"
542 ))
543 } else {
544 Ok(())
545 }
546}
547
/// Propagate a vector that has already been written to SQLite into the vector index sidecar.
///
/// Delegates to a feature-specific implementation; without `zvec-bundled` it does nothing.
pub async fn sync_vector_record_after_sqlite(
    config: &Config,
    pool: &SqlitePool,
    record: &VectorRecord,
) -> Result<()> {
    sync_vector_record_after_sqlite_impl(config, pool, record).await
}
555
/// Push `record` into an existing zvec sidecar and mark the sidecar stale afterwards.
///
/// The function does nothing for the `sqlite`/`disabled` backends or when no manifest
/// exists yet; in the latter case the next open rebuilds the sidecar anyway.
///
/// When the vector's dimensionality differs from the one the collection was built with, the
/// vector cannot be stored. The sidecar is then only marked stale, so the next
/// `open_or_rebuild` rebuilds it instead of reopening an index that no longer reflects SQLite.
#[cfg(feature = "zvec-bundled")]
async fn sync_vector_record_after_sqlite_impl(
    config: &Config,
    pool: &SqlitePool,
    record: &VectorRecord,
) -> Result<()> {
    if matches!(config.vector_index.backend.as_str(), "sqlite" | "disabled") {
        return Ok(());
    }

    let path = resolve_vector_index_path(config);
    let Some(manifest) = read_manifest(&path)? else {
        return Ok(());
    };
    if manifest.dims != Some(record.dims) {
        // Previously returned silently, leaving a "reusable" sidecar that misses this vector.
        return mark_sidecar_stale(&path, &manifest);
    }

    let index = ZvecVectorIndex::open_existing(config, pool.clone()).await?;
    index.upsert(record).await?;
    mark_sidecar_stale(&path, &manifest)?;
    Ok(())
}
579
/// No-op: without the `zvec-bundled` feature there is no sidecar to keep in sync.
#[cfg(not(feature = "zvec-bundled"))]
async fn sync_vector_record_after_sqlite_impl(
    _config: &Config,
    _pool: &SqlitePool,
    _record: &VectorRecord,
) -> Result<()> {
    Ok(())
}
588
589pub fn remove_configured_sidecar(config: &Config) -> Result<()> {
590 if matches!(config.vector_index.backend.as_str(), "sqlite" | "disabled") {
591 return Ok(());
592 }
593
594 let path = resolve_vector_index_path(config);
595 if path.is_dir() {
596 std::fs::remove_dir_all(&path)
597 .with_context(|| format!("failed to remove vector index sidecar {}", path.display()))?;
598 } else if path.exists() {
599 std::fs::remove_file(&path)
600 .with_context(|| format!("failed to remove vector index sidecar {}", path.display()))?;
601 }
602 Ok(())
603}
604
/// Every vector currently in SQLite (ordered by `chunk_id`) plus a digest over them.
///
/// It is used to populate the zvec sidecar and to judge manifest freshness.
struct SqliteVectorSnapshot {
    records: Vec<IndexedVectorRecord>,
    /// Hex-encoded SHA-256 over each row's ids, model, dims, hash and raw embedding bytes.
    digest: String,
}
609
610async fn sqlite_vector_snapshot(pool: &SqlitePool) -> Result<SqliteVectorSnapshot> {
611 let rows = sqlx::query(
612 r#"
613 SELECT cv.chunk_id, cv.document_id, cv.embedding,
614 e.model, e.dims, e.hash,
615 COALESCE(substr(c.text, 1, 240), '') AS snippet,
616 d.source, d.updated_at
617 FROM chunk_vectors cv
618 JOIN embeddings e ON e.chunk_id = cv.chunk_id
619 JOIN chunks c ON c.id = cv.chunk_id
620 JOIN documents d ON d.id = cv.document_id
621 ORDER BY cv.chunk_id
622 "#,
623 )
624 .fetch_all(pool)
625 .await?;
626
627 let mut hasher = Sha256::new();
628 let mut records = Vec::with_capacity(rows.len());
629 for row in rows {
630 let chunk_id: String = row.get("chunk_id");
631 let document_id: String = row.get("document_id");
632 let blob: Vec<u8> = row.get("embedding");
633 let model: String = row.get("model");
634 let dims: i64 = row.get("dims");
635 let content_hash: String = row.get("hash");
636 let source: String = row.get("source");
637 let updated_at: i64 = row.get("updated_at");
638
639 hasher.update(chunk_id.as_bytes());
640 hasher.update([0]);
641 hasher.update(document_id.as_bytes());
642 hasher.update([0]);
643 hasher.update(model.as_bytes());
644 hasher.update([0]);
645 hasher.update(dims.to_le_bytes());
646 hasher.update(content_hash.as_bytes());
647 hasher.update([0]);
648 hasher.update(&blob);
649
650 records.push(IndexedVectorRecord {
651 record: VectorRecord {
652 chunk_id,
653 document_id,
654 vector: blob_to_vec(&blob),
655 model,
656 dims: dims as usize,
657 content_hash,
658 },
659 snippet: row.get("snippet"),
660 source,
661 updated_at,
662 });
663 }
664
665 Ok(SqliteVectorSnapshot {
666 records,
667 digest: hex::encode(hasher.finalize()),
668 })
669}
670
671#[allow(dead_code)]
672fn manifest_for_snapshot(snapshot: &SqliteVectorSnapshot, config: &Config) -> VectorIndexManifest {
673 let first = snapshot.records.first();
674 VectorIndexManifest {
675 version: MANIFEST_VERSION,
676 backend: "zvec".to_string(),
677 vector_count: snapshot.records.len(),
678 model: first.map(|r| r.record.model.clone()),
679 dims: first.map(|r| r.record.dims),
680 metric: config.vector_index.metric.clone(),
681 index: config.vector_index.index.clone(),
682 digest: snapshot.digest.clone(),
683 }
684}
685
686fn read_manifest(path: &Path) -> Result<Option<VectorIndexManifest>> {
687 let manifest_path = path.join(MANIFEST_FILE);
688 if !manifest_path.exists() {
689 return Ok(None);
690 }
691 let content = std::fs::read_to_string(&manifest_path)
692 .with_context(|| format!("failed to read {}", manifest_path.display()))?;
693 serde_json::from_str(&content)
694 .with_context(|| format!("failed to parse {}", manifest_path.display()))
695 .map(Some)
696}
697
698#[allow(dead_code)]
699fn write_manifest(path: &Path, manifest: &VectorIndexManifest) -> Result<()> {
700 std::fs::create_dir_all(path)?;
701 let content = serde_json::to_string_pretty(manifest)?;
702 std::fs::write(path.join(MANIFEST_FILE), content)?;
703 Ok(())
704}
705
706#[allow(dead_code)]
707fn mark_sidecar_stale(path: &Path, manifest: &VectorIndexManifest) -> Result<()> {
708 let mut stale = manifest.clone();
709 stale.digest = format!("stale:{}", stale.digest);
710 write_manifest(path, &stale)
711}
712
713#[allow(dead_code)]
714fn parse_since(since: Option<&str>) -> Result<Option<i64>> {
715 let Some(since) = since else {
716 return Ok(None);
717 };
718 let date = chrono::NaiveDate::parse_from_str(since, "%Y-%m-%d")?;
719 Ok(Some(
720 date.and_hms_opt(0, 0, 0)
721 .ok_or_else(|| anyhow!("invalid since date"))?
722 .and_utc()
723 .timestamp(),
724 ))
725}
726
/// zvec-backed [`VectorIndex`] stored as a sidecar directory next to the SQLite database.
#[cfg(feature = "zvec-bundled")]
pub struct ZvecVectorIndex {
    /// Configuration the index was opened with (used when rewriting the manifest).
    config: Config,
    /// SQLite pool used for metadata lookups and digest computation.
    pool: SqlitePool,
    /// Sidecar directory holding the collection and `manifest.json`.
    path: PathBuf,
    /// The opened zvec collection.
    collection: zvec::Collection,
    /// Per-chunk metadata that `search` indexes by zvec doc id (position in this vector).
    metadata: RwLock<Vec<IndexedVectorRecord>>,
}
735
#[cfg(feature = "zvec-bundled")]
impl ZvecVectorIndex {
    /// Reuse the on-disk zvec sidecar when it looks usable; otherwise rebuild it from SQLite.
    ///
    /// The sidecar is reused only when all of the following hold:
    /// - its manifest is present and readable;
    /// - the manifest is not flagged with the `stale:` digest prefix;
    /// - the collection directory exists.
    ///
    /// An unreadable or unparsable manifest is treated like a stale one. The sidecar is
    /// derived data, so it is rebuilt (with a warning) instead of failing the caller.
    ///
    /// # Errors
    /// SQLite has no vectors to index, or opening or rebuilding the collection fails.
    pub async fn open_or_rebuild(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let collection_path = path.join(COLLECTION_DIR);

        let manifest_reusable = match read_manifest(&path) {
            Ok(manifest) => {
                manifest.is_some_and(|manifest| !manifest.digest.starts_with("stale:"))
            }
            Err(err) => {
                eprintln!("Warning: unreadable zvec manifest; rebuilding sidecar: {err:#}");
                false
            }
        };
        if manifest_reusable && collection_path.exists() {
            return Self::open_existing(config, pool).await;
        }

        let snapshot = sqlite_vector_snapshot(&pool).await?;
        if snapshot.records.is_empty() {
            return Err(anyhow!("SQLite has no vectors to index"));
        }

        Self::rebuild_from_snapshot(config, pool, path, snapshot).await
    }

    /// Open the existing collection and load per-chunk metadata from the current SQLite
    /// snapshot.
    ///
    /// NOTE(review): `search` maps hits back to metadata by position (`metadata[doc_id]`),
    /// while this reloads metadata from SQLite ordered by `chunk_id`. The two only line up if
    /// the collection's doc ids still follow that ordering (e.g. no upserts or deletes since
    /// the last rebuild). Confirm against zvec's id assignment.
    async fn open_existing(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let collection_path = path.join(COLLECTION_DIR);
        let collection = zvec::Collection::open(&collection_path).with_context(|| {
            format!(
                "failed to open zvec collection {}",
                collection_path.display()
            )
        })?;
        let snapshot = sqlite_vector_snapshot(&pool).await?;
        Ok(Self {
            config: config.clone(),
            pool,
            path,
            collection,
            metadata: RwLock::new(snapshot.records),
        })
    }

    /// Rebuild the sidecar from scratch from the current SQLite vectors.
    pub async fn rebuild(config: &Config, pool: SqlitePool) -> Result<Self> {
        let path = resolve_vector_index_path(config);
        let snapshot = sqlite_vector_snapshot(&pool).await?;
        Self::rebuild_from_snapshot(config, pool, path, snapshot).await
    }

    /// Replace whatever is at `path` with a fresh collection holding `snapshot`'s records.
    ///
    /// A manifest matching `snapshot` is written afterwards.
    ///
    /// # Errors
    /// The snapshot is empty (checked after the old sidecar has already been removed), or
    /// any filesystem or zvec operation fails.
    async fn rebuild_from_snapshot(
        config: &Config,
        pool: SqlitePool,
        path: PathBuf,
        snapshot: SqliteVectorSnapshot,
    ) -> Result<Self> {
        if path.is_dir() {
            std::fs::remove_dir_all(&path).with_context(|| {
                format!("failed to remove stale zvec sidecar {}", path.display())
            })?;
        } else if path.exists() {
            std::fs::remove_file(&path).with_context(|| {
                format!(
                    "failed to remove stale zvec sidecar file {}",
                    path.display()
                )
            })?;
        }
        std::fs::create_dir_all(&path)
            .with_context(|| format!("failed to create zvec sidecar {}", path.display()))?;

        // The collection schema takes its vector width from the first record.
        let dims = snapshot
            .records
            .first()
            .map(|r| r.record.dims)
            .ok_or_else(|| anyhow!("SQLite has no vectors to index"))?;
        let schema = zvec_schema(dims)?;
        let collection_path = path.join(COLLECTION_DIR);
        let collection =
            zvec::Collection::create_and_open(&collection_path, schema).with_context(|| {
                format!(
                    "failed to create zvec collection {}",
                    collection_path.display()
                )
            })?;

        let index = Self {
            config: config.clone(),
            pool,
            path: path.clone(),
            collection,
            metadata: RwLock::new(snapshot.records.clone()),
        };
        index.populate(&snapshot.records)?;
        index.collection.optimize()?;
        index.collection.flush()?;
        // Written last, so an interrupted rebuild leaves no manifest and is redone next open.
        write_manifest(&path, &manifest_for_snapshot(&snapshot, config))?;
        Ok(index)
    }

    /// Insert `records` in batches of `ZVEC_BATCH_SIZE`, then flush the collection.
    fn populate(&self, records: &[IndexedVectorRecord]) -> Result<()> {
        let mut batch = Vec::with_capacity(ZVEC_BATCH_SIZE);
        for record in records {
            batch.push(doc_from_record(record)?);
            if batch.len() == ZVEC_BATCH_SIZE {
                self.insert_docs(&batch)?;
                batch.clear();
            }
        }
        if !batch.is_empty() {
            self.insert_docs(&batch)?;
        }
        self.collection.flush()?;
        Ok(())
    }

    /// Insert one batch of documents into the collection.
    fn insert_docs(&self, docs: &[zvec::Doc]) -> Result<()> {
        self.collection.insert(docs)?;
        Ok(())
    }

    /// Rewrite the manifest so its digest matches the current SQLite snapshot.
    async fn write_current_manifest(&self) -> Result<()> {
        let snapshot = sqlite_vector_snapshot(&self.pool).await?;
        write_manifest(&self.path, &manifest_for_snapshot(&snapshot, &self.config))
    }
}
856
#[cfg(feature = "zvec-bundled")]
#[async_trait]
impl VectorIndex for ZvecVectorIndex {
    /// Insert or replace `record` in the collection, then flush.
    ///
    /// Snippet, source and `updated_at` are looked up from SQLite. If the chunk no longer
    /// exists there, this is a no-op. The manifest is not touched here;
    /// `sync_vector_record_after_sqlite_impl` marks the sidecar stale afterwards.
    async fn upsert(&self, record: &VectorRecord) -> Result<()> {
        let row = sqlx::query(
            r#"
            SELECT COALESCE(substr(c.text, 1, 240), '') AS snippet,
                   d.source, d.updated_at
            FROM chunks c
            JOIN documents d ON d.id = c.document_id
            WHERE c.id = ?
            "#,
        )
        .bind(&record.chunk_id)
        .fetch_optional(&self.pool)
        .await?;

        let Some(row) = row else {
            return Ok(());
        };

        let indexed = IndexedVectorRecord {
            record: record.clone(),
            snippet: row.get("snippet"),
            source: row.get("source"),
            updated_at: row.get("updated_at"),
        };
        let doc = doc_from_record(&indexed)?;
        // Best-effort removal of a previous version; a never-indexed chunk has nothing to
        // delete, so the result is intentionally ignored.
        let _ = self.collection.delete(&[record.chunk_id.as_str()]);
        self.collection.insert(&[doc])?;
        // NOTE(review): `search` resolves hits via `metadata[doc_id]`; appending assumes zvec
        // gives the re-inserted doc the next sequential id. Confirm against zvec.
        self.metadata
            .write()
            .map_err(|_| anyhow!("zvec metadata lock poisoned"))?
            .push(indexed);
        self.collection.flush()?;
        Ok(())
    }

    /// Remove one chunk, flush, and record the current SQLite digest in the manifest.
    async fn delete_chunk(&self, chunk_id: &str) -> Result<()> {
        self.collection.delete(&[chunk_id])?;
        self.collection.flush()?;
        self.write_current_manifest().await?;
        Ok(())
    }

    /// Remove every chunk of `document_id` via a filter expression.
    ///
    /// After the delete, the collection is flushed and the manifest is rewritten.
    async fn delete_document(&self, document_id: &str) -> Result<()> {
        // Escaping only `'` let an id ending in `\` swallow the closing quote.
        let filter = format!("document_id == {}", quote_zvec_string(document_id));
        self.collection.delete_by_filter(&filter)?;
        self.collection.flush()?;
        self.write_current_manifest().await?;
        Ok(())
    }

    /// Approximate nearest-neighbour search with post-filtering on source and `since`.
    ///
    /// The query over-fetches 8x `limit` so that post-filtering still has a chance to fill
    /// `limit` results. When the filters reject many hits, fewer candidates can come back.
    async fn search(
        &self,
        query_vec: &[f32],
        options: VectorSearchOptions<'_>,
    ) -> Result<Vec<ChunkCandidate>> {
        let limit = options.limit.max(0) as usize;
        if limit == 0 {
            return Ok(Vec::new());
        }
        let since_ts = parse_since(options.since)?;
        // `saturating_mul` can only grow a positive limit, so no lower clamp is needed.
        let topk = limit.saturating_mul(8);
        let query = zvec::VectorQuery::new("embedding")
            .topk(topk)
            .include_doc_id(true)
            .output_fields(&["chunk_id", "document_id", "source", "updated_at", "snippet"])
            .vector(query_vec)?;

        let rows = self.collection.query(query)?;
        let metadata = self
            .metadata
            .read()
            .map_err(|_| anyhow!("zvec metadata lock poisoned"))?;
        let mut candidates = Vec::new();
        for row in rows.iter() {
            // NOTE(review): positional lookup by zvec doc id; see `open_existing`.
            let Some(record) = metadata.get(row.doc_id() as usize) else {
                continue;
            };
            if options
                .source
                .is_some_and(|expected| record.source != expected)
            {
                continue;
            }
            if since_ts.is_some_and(|since| record.updated_at < since) {
                continue;
            }
            candidates.push(ChunkCandidate {
                chunk_id: record.record.chunk_id.clone(),
                document_id: record.record.document_id.clone(),
                raw_score: row.score() as f64,
                snippet: record.snippet.clone(),
            });
            if candidates.len() == limit {
                break;
            }
        }
        Ok(candidates)
    }

    /// Available only while the manifest digest matches the current SQLite snapshot.
    ///
    /// The message uses the same wording as `vector_index_status`.
    async fn health(&self) -> Result<VectorIndexHealth> {
        let snapshot = sqlite_vector_snapshot(&self.pool).await?;
        let manifest = read_manifest(&self.path)?;
        let fresh = manifest
            .as_ref()
            .is_some_and(|manifest| manifest.digest == snapshot.digest);
        Ok(VectorIndexHealth {
            enabled: true,
            available: fresh,
            backend: "zvec".to_string(),
            message: Some(zvec_health_message(
                manifest.as_ref(),
                fresh,
                snapshot.records.len(),
            )),
        })
    }
}

/// Render `value` as a single-quoted zvec filter string literal.
///
/// Backslashes and single quotes are backslash-escaped; backslashes are escaped so that a
/// trailing `\` cannot escape the closing quote.
// Only the feature-gated zvec backend calls this.
#[cfg_attr(not(feature = "zvec-bundled"), allow(dead_code))]
fn quote_zvec_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if matches!(ch, '\\' | '\'') {
            quoted.push('\\');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
978
/// Collection schema for chunk vectors: string/int metadata fields plus an fp32 `embedding`
/// of width `dims`.
#[cfg(feature = "zvec-bundled")]
fn zvec_schema(dims: usize) -> Result<zvec::CollectionSchema> {
    let mut schema = zvec::CollectionSchema::new("context_chunks");
    // Field order is kept exactly as before: three strings, int64, snippet, then the vector.
    for name in ["chunk_id", "document_id", "source"] {
        schema.add_field(zvec::FieldSchema::string(name))?;
    }
    schema.add_field(zvec::FieldSchema::int64("updated_at"))?;
    schema.add_field(zvec::FieldSchema::string("snippet"))?;
    schema.add_field(zvec::VectorSchema::fp32("embedding", dims as u32))?;
    Ok(schema)
}
990
/// Convert an indexed record into a zvec document keyed by its chunk id.
#[cfg(feature = "zvec-bundled")]
fn doc_from_record(record: &IndexedVectorRecord) -> Result<zvec::Doc> {
    let base = &record.record;
    let chunk_id = &base.chunk_id;
    let mut doc = zvec::Doc::new();
    doc.set_pk(chunk_id)?;
    doc.set_string("chunk_id", chunk_id)?;
    doc.set_string("document_id", &base.document_id)?;
    doc.set_string("source", &record.source)?;
    doc.set_int64("updated_at", record.updated_at)?;
    doc.set_string("snippet", &record.snippet)?;
    doc.set_vector("embedding", &base.vector)?;
    Ok(doc)
}