Skip to main content

context_harness/
app_store.rs

1//! Application-level storage boundary for native Context Harness operations.
2//!
3//! [`context_harness_core::store::Store`] covers reusable search and retrieval
4//! behavior. [`AppStore`] adds CLI/native responsibilities that are still
5//! canonical SQLite operations today: migrations, sync checkpoints, source item
6//! writes, embedding maintenance, stats, and export views.
7
8use anyhow::Result;
9use async_trait::async_trait;
10use serde::Serialize;
11use sha2::{Digest, Sha256};
12use sqlx::{Row, SqlitePool};
13use uuid::Uuid;
14
15use context_harness_core::models::{Chunk, Document};
16use context_harness_core::store::{ChunkCandidate, DocumentMetadata, DocumentResponse, Store};
17
18use crate::config::Config;
19use crate::db;
20use crate::migrate;
21use crate::models::SourceItem;
22use crate::sqlite_store::SqliteStore;
23use crate::vector_index::{self, VectorRecord};
24
/// A chunk that needs embedding because its embedding is missing or stale.
///
/// Rows of this shape are produced by `find_pending_chunks`, which selects
/// chunks that have no `embeddings` row for the requested model or whose
/// `embeddings.hash` differs from `chunks.hash`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingChunk {
    /// Value of `chunks.id` for the chunk awaiting an embedding.
    pub chunk_id: String,
    /// Value of `chunks.document_id`, i.e. the owning document.
    pub document_id: String,
    /// The chunk text (`chunks.text`) to be embedded.
    pub text: String,
    /// Value of `chunks.hash`; the hash an up-to-date embedding row must carry.
    pub text_hash: String,
}
33
/// Per-source database statistics.
///
/// One value is produced per distinct `documents.source` by
/// `SqliteAppStore::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceStats {
    /// The `documents.source` value this entry aggregates.
    pub source: String,
    /// Number of distinct documents belonging to the source.
    pub doc_count: i64,
    /// Number of distinct chunks belonging to those documents.
    pub chunk_count: i64,
    /// Number of distinct chunks of the source that have a `chunk_vectors` row.
    pub embedded_count: i64,
    /// `checkpoints.updated_at` for the source, or `None` when the source has
    /// no checkpoint row. `set_checkpoint` writes this as a Unix timestamp in
    /// seconds (UTC).
    pub last_sync_ts: Option<i64>,
}
43
/// Database statistics used by `ctx stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoreStats {
    /// Total row count of the `documents` table.
    pub total_docs: i64,
    /// Total row count of the `chunks` table.
    pub total_chunks: i64,
    /// Total row count of the `chunk_vectors` table.
    pub total_embedded: i64,
    /// Size in bytes of the file at the configured database path; `0` when the
    /// file metadata cannot be read.
    pub db_size_bytes: u64,
    /// Per-source breakdown, ordered by document count (largest first).
    pub sources: Vec<SourceStats>,
}
53
/// Export payload used by `ctx export`.
///
/// Serializable via `serde`; field order here is the order fields are emitted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExportData {
    /// Every row of the `documents` table, ordered by `source_id`.
    pub documents: Vec<ExportDocument>,
    /// Every row of the `chunks` table, ordered by `document_id` then
    /// `chunk_index`.
    pub chunks: Vec<ExportChunk>,
}
60
/// Exported document row.
///
/// Mirrors the `documents` columns selected by `export_index`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExportDocument {
    /// `documents.id`.
    pub id: String,
    /// `documents.source`: the connector/source name the document came from.
    pub source: String,
    /// `documents.source_id`: the document's identifier within its source.
    pub source_id: String,
    /// `documents.source_url`, when one is recorded.
    pub source_url: Option<String>,
    /// `documents.title`, when one is recorded.
    pub title: Option<String>,
    /// `documents.updated_at`. Documents written through `upsert_source_item`
    /// store a Unix timestamp in seconds here.
    pub updated_at: i64,
    /// `documents.body`: the full document text.
    pub body: String,
}
72
/// Exported chunk row.
///
/// Mirrors the `chunks` columns selected by `export_index`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExportChunk {
    /// `chunks.id`.
    pub id: String,
    /// `chunks.document_id`: the document this chunk belongs to.
    pub document_id: String,
    /// `chunks.chunk_index`: position of the chunk within its document.
    pub chunk_index: i64,
    /// `chunks.text`: the chunk text.
    pub text: String,
}
81
/// App-level storage operations layered on top of core search storage.
///
/// Extends [`Store`] with the CLI/native responsibilities listed in the module
/// docs: migrations, sync checkpoints, source item writes, embedding
/// maintenance, stats, and export views.
#[async_trait]
pub trait AppStore: Store {
    /// Prepares the backing store for use (the SQLite implementation runs the
    /// schema migrations).
    // NOTE(review): `dead_code` is allowed because nothing in this file calls
    // the trait method; presumably callers go through
    // `SqliteAppStore::initialize_config` instead — confirm before removing.
    #[allow(dead_code)]
    async fn initialize(&self) -> Result<()>;

    /// Returns the sync cursor recorded for `source`, or `None` when no usable
    /// cursor is stored.
    async fn get_checkpoint(&self, source: &str) -> Result<Option<i64>>;

    /// Records `cursor` as the sync checkpoint for `source`, replacing any
    /// previous value.
    async fn set_checkpoint(&self, source: &str, cursor: i64) -> Result<()>;

    /// Stores a connector-produced item as a document and returns the value
    /// yielded by [`Store::upsert_document`] (presumably the document id).
    async fn upsert_source_item(&self, item: &SourceItem) -> Result<String>;

    /// Lists chunks whose embedding for `model` is missing or stale.
    ///
    /// `limit` caps the number of returned chunks; `None` asks for all of them.
    async fn find_pending_chunks(
        &self,
        model: &str,
        limit: Option<usize>,
    ) -> Result<Vec<PendingChunk>>;

    /// Returns the hash stored alongside the embedding of `chunk_id` for
    /// `model`, or `None` when no such embedding exists.
    async fn get_embedding_hash(&self, chunk_id: &str, model: &str) -> Result<Option<String>>;

    /// Removes every stored embedding.
    async fn clear_embeddings(&self) -> Result<()>;

    /// Collects the totals and per-source figures shown by `ctx stats`.
    async fn stats(&self) -> Result<StoreStats>;

    /// Builds the payload written by `ctx export`.
    async fn export_index(&self) -> Result<ExportData>;
}
100
/// SQLite-backed [`AppStore`] implementation.
///
/// Core [`Store`] operations are delegated to a [`SqliteStore`] built over the
/// same connection pool.
pub struct SqliteAppStore {
    /// Copy of the configuration taken at connect time; used for migrations,
    /// the database file path, and the vector-index sidecar helpers.
    config: Config,
    /// Connection pool shared by every query issued through this store.
    pool: SqlitePool,
}
106
107impl SqliteAppStore {
108    pub async fn connect(config: &Config) -> Result<Self> {
109        let pool = db::connect(config).await?;
110        Ok(Self {
111            config: config.clone(),
112            pool,
113        })
114    }
115
116    pub async fn initialize_config(config: &Config) -> Result<()> {
117        migrate::run_migrations(config).await
118    }
119
120    #[allow(dead_code)]
121    pub fn pool(&self) -> &SqlitePool {
122        &self.pool
123    }
124
125    pub async fn close(&self) {
126        self.pool.close().await;
127    }
128
129    fn core_store(&self) -> SqliteStore {
130        SqliteStore::new(self.pool.clone())
131    }
132}
133
134#[async_trait]
135impl Store for SqliteAppStore {
136    async fn upsert_document(&self, doc: &Document) -> Result<String> {
137        self.core_store().upsert_document(doc).await
138    }
139
140    async fn replace_chunks(
141        &self,
142        doc_id: &str,
143        chunks: &[Chunk],
144        vectors: Option<&[Vec<f32>]>,
145    ) -> Result<()> {
146        self.core_store()
147            .replace_chunks(doc_id, chunks, vectors)
148            .await?;
149        vector_index::remove_configured_sidecar(&self.config)?;
150        Ok(())
151    }
152
153    async fn upsert_embedding(
154        &self,
155        chunk_id: &str,
156        doc_id: &str,
157        vector: &[f32],
158        model: &str,
159        dims: usize,
160        content_hash: &str,
161    ) -> Result<()> {
162        self.core_store()
163            .upsert_embedding(chunk_id, doc_id, vector, model, dims, content_hash)
164            .await?;
165        let record = VectorRecord {
166            chunk_id: chunk_id.to_string(),
167            document_id: doc_id.to_string(),
168            vector: vector.to_vec(),
169            model: model.to_string(),
170            dims,
171            content_hash: content_hash.to_string(),
172        };
173        vector_index::sync_vector_record_after_sqlite(&self.config, &self.pool, &record).await?;
174        Ok(())
175    }
176
177    async fn get_document(&self, id: &str) -> Result<Option<DocumentResponse>> {
178        self.core_store().get_document(id).await
179    }
180
181    async fn get_document_metadata(&self, id: &str) -> Result<Option<DocumentMetadata>> {
182        self.core_store().get_document_metadata(id).await
183    }
184
185    async fn keyword_search(
186        &self,
187        query: &str,
188        limit: i64,
189        source: Option<&str>,
190        since: Option<&str>,
191    ) -> Result<Vec<ChunkCandidate>> {
192        self.core_store()
193            .keyword_search(query, limit, source, since)
194            .await
195    }
196
197    async fn vector_search(
198        &self,
199        query_vec: &[f32],
200        limit: i64,
201        source: Option<&str>,
202        since: Option<&str>,
203    ) -> Result<Vec<ChunkCandidate>> {
204        self.core_store()
205            .vector_search(query_vec, limit, source, since)
206            .await
207    }
208}
209
210#[async_trait]
211impl AppStore for SqliteAppStore {
212    async fn initialize(&self) -> Result<()> {
213        migrate::run_migrations(&self.config).await
214    }
215
216    async fn get_checkpoint(&self, source: &str) -> Result<Option<i64>> {
217        let result: Option<String> =
218            sqlx::query_scalar("SELECT cursor FROM checkpoints WHERE source = ?")
219                .bind(source)
220                .fetch_optional(&self.pool)
221                .await?;
222
223        Ok(result.and_then(|s| s.parse::<i64>().ok()))
224    }
225
226    async fn set_checkpoint(&self, source: &str, cursor: i64) -> Result<()> {
227        let now = chrono::Utc::now().timestamp();
228        sqlx::query(
229            r#"
230            INSERT INTO checkpoints (source, cursor, updated_at) VALUES (?, ?, ?)
231            ON CONFLICT(source) DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at
232            "#,
233        )
234        .bind(source)
235        .bind(cursor.to_string())
236        .bind(now)
237        .execute(&self.pool)
238        .await?;
239
240        Ok(())
241    }
242
243    async fn upsert_source_item(&self, item: &SourceItem) -> Result<String> {
244        let doc = source_item_to_document(&self.pool, item).await?;
245        self.upsert_document(&doc).await
246    }
247
248    async fn find_pending_chunks(
249        &self,
250        model: &str,
251        limit: Option<usize>,
252    ) -> Result<Vec<PendingChunk>> {
253        let limit_val = limit.unwrap_or(usize::MAX) as i64;
254
255        let rows = sqlx::query(
256            r#"
257            SELECT c.id AS chunk_id, c.document_id, c.text, c.hash AS chunk_hash
258            FROM chunks c
259            LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
260            WHERE e.chunk_id IS NULL OR e.hash != c.hash
261            ORDER BY c.document_id, c.chunk_index
262            LIMIT ?
263            "#,
264        )
265        .bind(model)
266        .bind(limit_val)
267        .fetch_all(&self.pool)
268        .await?;
269
270        Ok(rows
271            .iter()
272            .map(|row| {
273                let text: String = row.get("text");
274                PendingChunk {
275                    chunk_id: row.get("chunk_id"),
276                    document_id: row.get("document_id"),
277                    text_hash: row.get("chunk_hash"),
278                    text,
279                }
280            })
281            .collect())
282    }
283
284    async fn get_embedding_hash(&self, chunk_id: &str, model: &str) -> Result<Option<String>> {
285        let hash =
286            sqlx::query_scalar("SELECT hash FROM embeddings WHERE chunk_id = ? AND model = ?")
287                .bind(chunk_id)
288                .bind(model)
289                .fetch_optional(&self.pool)
290                .await?;
291        Ok(hash)
292    }
293
294    async fn clear_embeddings(&self) -> Result<()> {
295        sqlx::query("DELETE FROM chunk_vectors")
296            .execute(&self.pool)
297            .await?;
298        sqlx::query("DELETE FROM embeddings")
299            .execute(&self.pool)
300            .await?;
301        vector_index::remove_configured_sidecar(&self.config)?;
302        Ok(())
303    }
304
305    async fn stats(&self) -> Result<StoreStats> {
306        let total_docs: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
307            .fetch_one(&self.pool)
308            .await?;
309        let total_chunks: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunks")
310            .fetch_one(&self.pool)
311            .await?;
312        let total_embedded: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunk_vectors")
313            .fetch_one(&self.pool)
314            .await?;
315        let db_size_bytes = std::fs::metadata(&self.config.db.path)
316            .map(|m| m.len())
317            .unwrap_or(0);
318
319        let source_rows = sqlx::query(
320            r#"
321            SELECT
322                d.source,
323                COUNT(DISTINCT d.id) AS doc_count,
324                COUNT(DISTINCT c.id) AS chunk_count,
325                COUNT(DISTINCT cv.chunk_id) AS embedded_count
326            FROM documents d
327            LEFT JOIN chunks c ON c.document_id = d.id
328            LEFT JOIN chunk_vectors cv ON cv.chunk_id = c.id
329            GROUP BY d.source
330            ORDER BY doc_count DESC
331            "#,
332        )
333        .fetch_all(&self.pool)
334        .await?;
335
336        let checkpoint_rows = sqlx::query("SELECT source, updated_at FROM checkpoints")
337            .fetch_all(&self.pool)
338            .await?;
339
340        let mut sources = Vec::new();
341        for row in &source_rows {
342            let source: String = row.get("source");
343            let last_sync_ts = checkpoint_rows
344                .iter()
345                .find(|cp| {
346                    let cp_source: String = cp.get("source");
347                    cp_source == source
348                })
349                .map(|cp| cp.get::<i64, _>("updated_at"));
350
351            sources.push(SourceStats {
352                source,
353                doc_count: row.get("doc_count"),
354                chunk_count: row.get("chunk_count"),
355                embedded_count: row.get("embedded_count"),
356                last_sync_ts,
357            });
358        }
359
360        Ok(StoreStats {
361            total_docs,
362            total_chunks,
363            total_embedded,
364            db_size_bytes,
365            sources,
366        })
367    }
368
369    async fn export_index(&self) -> Result<ExportData> {
370        let doc_rows = sqlx::query(
371            "SELECT id, source, source_id, source_url, title, updated_at, body \
372             FROM documents ORDER BY source_id",
373        )
374        .fetch_all(&self.pool)
375        .await?;
376
377        let chunk_rows = sqlx::query(
378            "SELECT id, document_id, chunk_index, text \
379             FROM chunks ORDER BY document_id, chunk_index",
380        )
381        .fetch_all(&self.pool)
382        .await?;
383
384        let documents = doc_rows
385            .iter()
386            .map(|row| ExportDocument {
387                id: row.get("id"),
388                source: row.get("source"),
389                source_id: row.get("source_id"),
390                source_url: row.get("source_url"),
391                title: row.get("title"),
392                updated_at: row.get("updated_at"),
393                body: row.get("body"),
394            })
395            .collect();
396
397        let chunks = chunk_rows
398            .iter()
399            .map(|row| ExportChunk {
400                id: row.get("id"),
401                document_id: row.get("document_id"),
402                chunk_index: row.get("chunk_index"),
403                text: row.get("text"),
404            })
405            .collect();
406
407        Ok(ExportData { documents, chunks })
408    }
409}
410
411async fn source_item_to_document(pool: &SqlitePool, item: &SourceItem) -> Result<Document> {
412    let dedup_hash = dedup_hash(item);
413    let existing_id: Option<String> =
414        sqlx::query_scalar("SELECT id FROM documents WHERE source = ? AND source_id = ?")
415            .bind(&item.source)
416            .bind(&item.source_id)
417            .fetch_optional(pool)
418            .await?;
419
420    Ok(Document {
421        id: existing_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
422        source: item.source.clone(),
423        source_id: item.source_id.clone(),
424        source_url: item.source_url.clone(),
425        title: item.title.clone(),
426        author: item.author.clone(),
427        created_at: item.created_at.timestamp(),
428        updated_at: item.updated_at.timestamp(),
429        content_type: item.content_type.clone(),
430        body: item.body.clone(),
431        metadata_json: item.metadata_json.clone(),
432        raw_json: item.raw_json.clone(),
433        dedup_hash,
434    })
435}
436
437fn dedup_hash(item: &SourceItem) -> String {
438    let mut hasher = Sha256::new();
439    hasher.update(item.source.as_bytes());
440    hasher.update(item.source_id.as_bytes());
441    hasher.update(item.updated_at.timestamp().to_le_bytes());
442    hasher.update(item.body.as_bytes());
443    format!("{:x}", hasher.finalize())
444}
445
446pub(crate) fn hash_text(text: &str) -> String {
447    let mut hasher = Sha256::new();
448    hasher.update(text.as_bytes());
449    format!("{:x}", hasher.finalize())
450}