1use anyhow::Result;
9use async_trait::async_trait;
10use serde::Serialize;
11use sha2::{Digest, Sha256};
12use sqlx::{Row, SqlitePool};
13use uuid::Uuid;
14
15use context_harness_core::models::{Chunk, Document};
16use context_harness_core::store::{ChunkCandidate, DocumentMetadata, DocumentResponse, Store};
17
18use crate::config::Config;
19use crate::db;
20use crate::migrate;
21use crate::models::SourceItem;
22use crate::sqlite_store::SqliteStore;
23use crate::vector_index::{self, VectorRecord};
24
/// A chunk that still needs an embedding for some model.
///
/// Returned by [`AppStore::find_pending_chunks`]: either no embedding row
/// exists for the chunk/model pair, or the embedding's stored hash differs
/// from the chunk's current hash.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PendingChunk {
    /// Id of the chunk row (`chunks.id`).
    pub chunk_id: String,
    /// Id of the owning document (`chunks.document_id`).
    pub document_id: String,
    /// Chunk text to be embedded.
    pub text: String,
    /// Hash stored on the chunk row (`chunks.hash`). An embedding counts as
    /// current when its stored hash equals this value.
    pub text_hash: String,
}
33
/// Per-source counters reported by [`AppStore::stats`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SourceStats {
    /// Value of `documents.source` this row aggregates.
    pub source: String,
    /// Number of distinct documents for the source.
    pub doc_count: i64,
    /// Number of distinct chunks belonging to those documents.
    pub chunk_count: i64,
    /// Number of those chunks that have a row in `chunk_vectors`.
    pub embedded_count: i64,
    /// `updated_at` of the source's checkpoint, if one has been recorded.
    pub last_sync_ts: Option<i64>,
}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct StoreStats {
47 pub total_docs: i64,
48 pub total_chunks: i64,
49 pub total_embedded: i64,
50 pub db_size_bytes: u64,
51 pub sources: Vec<SourceStats>,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
56pub struct ExportData {
57 pub documents: Vec<ExportDocument>,
58 pub chunks: Vec<ExportChunk>,
59}
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
63pub struct ExportDocument {
64 pub id: String,
65 pub source: String,
66 pub source_id: String,
67 pub source_url: Option<String>,
68 pub title: Option<String>,
69 pub updated_at: i64,
70 pub body: String,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
75pub struct ExportChunk {
76 pub id: String,
77 pub document_id: String,
78 pub chunk_index: i64,
79 pub text: String,
80}
81
82#[async_trait]
84pub trait AppStore: Store {
85 #[allow(dead_code)]
86 async fn initialize(&self) -> Result<()>;
87 async fn get_checkpoint(&self, source: &str) -> Result<Option<i64>>;
88 async fn set_checkpoint(&self, source: &str, cursor: i64) -> Result<()>;
89 async fn upsert_source_item(&self, item: &SourceItem) -> Result<String>;
90 async fn find_pending_chunks(
91 &self,
92 model: &str,
93 limit: Option<usize>,
94 ) -> Result<Vec<PendingChunk>>;
95 async fn get_embedding_hash(&self, chunk_id: &str, model: &str) -> Result<Option<String>>;
96 async fn clear_embeddings(&self) -> Result<()>;
97 async fn stats(&self) -> Result<StoreStats>;
98 async fn export_index(&self) -> Result<ExportData>;
99}
100
101pub struct SqliteAppStore {
103 config: Config,
104 pool: SqlitePool,
105}
106
107impl SqliteAppStore {
108 pub async fn connect(config: &Config) -> Result<Self> {
109 let pool = db::connect(config).await?;
110 Ok(Self {
111 config: config.clone(),
112 pool,
113 })
114 }
115
116 pub async fn initialize_config(config: &Config) -> Result<()> {
117 migrate::run_migrations(config).await
118 }
119
120 #[allow(dead_code)]
121 pub fn pool(&self) -> &SqlitePool {
122 &self.pool
123 }
124
125 pub async fn close(&self) {
126 self.pool.close().await;
127 }
128
129 fn core_store(&self) -> SqliteStore {
130 SqliteStore::new(self.pool.clone())
131 }
132}
133
134#[async_trait]
135impl Store for SqliteAppStore {
136 async fn upsert_document(&self, doc: &Document) -> Result<String> {
137 self.core_store().upsert_document(doc).await
138 }
139
140 async fn replace_chunks(
141 &self,
142 doc_id: &str,
143 chunks: &[Chunk],
144 vectors: Option<&[Vec<f32>]>,
145 ) -> Result<()> {
146 self.core_store()
147 .replace_chunks(doc_id, chunks, vectors)
148 .await?;
149 vector_index::remove_configured_sidecar(&self.config)?;
150 Ok(())
151 }
152
153 async fn upsert_embedding(
154 &self,
155 chunk_id: &str,
156 doc_id: &str,
157 vector: &[f32],
158 model: &str,
159 dims: usize,
160 content_hash: &str,
161 ) -> Result<()> {
162 self.core_store()
163 .upsert_embedding(chunk_id, doc_id, vector, model, dims, content_hash)
164 .await?;
165 let record = VectorRecord {
166 chunk_id: chunk_id.to_string(),
167 document_id: doc_id.to_string(),
168 vector: vector.to_vec(),
169 model: model.to_string(),
170 dims,
171 content_hash: content_hash.to_string(),
172 };
173 vector_index::sync_vector_record_after_sqlite(&self.config, &self.pool, &record).await?;
174 Ok(())
175 }
176
177 async fn get_document(&self, id: &str) -> Result<Option<DocumentResponse>> {
178 self.core_store().get_document(id).await
179 }
180
181 async fn get_document_metadata(&self, id: &str) -> Result<Option<DocumentMetadata>> {
182 self.core_store().get_document_metadata(id).await
183 }
184
185 async fn keyword_search(
186 &self,
187 query: &str,
188 limit: i64,
189 source: Option<&str>,
190 since: Option<&str>,
191 ) -> Result<Vec<ChunkCandidate>> {
192 self.core_store()
193 .keyword_search(query, limit, source, since)
194 .await
195 }
196
197 async fn vector_search(
198 &self,
199 query_vec: &[f32],
200 limit: i64,
201 source: Option<&str>,
202 since: Option<&str>,
203 ) -> Result<Vec<ChunkCandidate>> {
204 self.core_store()
205 .vector_search(query_vec, limit, source, since)
206 .await
207 }
208}
209
210#[async_trait]
211impl AppStore for SqliteAppStore {
212 async fn initialize(&self) -> Result<()> {
213 migrate::run_migrations(&self.config).await
214 }
215
216 async fn get_checkpoint(&self, source: &str) -> Result<Option<i64>> {
217 let result: Option<String> =
218 sqlx::query_scalar("SELECT cursor FROM checkpoints WHERE source = ?")
219 .bind(source)
220 .fetch_optional(&self.pool)
221 .await?;
222
223 Ok(result.and_then(|s| s.parse::<i64>().ok()))
224 }
225
226 async fn set_checkpoint(&self, source: &str, cursor: i64) -> Result<()> {
227 let now = chrono::Utc::now().timestamp();
228 sqlx::query(
229 r#"
230 INSERT INTO checkpoints (source, cursor, updated_at) VALUES (?, ?, ?)
231 ON CONFLICT(source) DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at
232 "#,
233 )
234 .bind(source)
235 .bind(cursor.to_string())
236 .bind(now)
237 .execute(&self.pool)
238 .await?;
239
240 Ok(())
241 }
242
243 async fn upsert_source_item(&self, item: &SourceItem) -> Result<String> {
244 let doc = source_item_to_document(&self.pool, item).await?;
245 self.upsert_document(&doc).await
246 }
247
248 async fn find_pending_chunks(
249 &self,
250 model: &str,
251 limit: Option<usize>,
252 ) -> Result<Vec<PendingChunk>> {
253 let limit_val = limit.unwrap_or(usize::MAX) as i64;
254
255 let rows = sqlx::query(
256 r#"
257 SELECT c.id AS chunk_id, c.document_id, c.text, c.hash AS chunk_hash
258 FROM chunks c
259 LEFT JOIN embeddings e ON e.chunk_id = c.id AND e.model = ?
260 WHERE e.chunk_id IS NULL OR e.hash != c.hash
261 ORDER BY c.document_id, c.chunk_index
262 LIMIT ?
263 "#,
264 )
265 .bind(model)
266 .bind(limit_val)
267 .fetch_all(&self.pool)
268 .await?;
269
270 Ok(rows
271 .iter()
272 .map(|row| {
273 let text: String = row.get("text");
274 PendingChunk {
275 chunk_id: row.get("chunk_id"),
276 document_id: row.get("document_id"),
277 text_hash: row.get("chunk_hash"),
278 text,
279 }
280 })
281 .collect())
282 }
283
284 async fn get_embedding_hash(&self, chunk_id: &str, model: &str) -> Result<Option<String>> {
285 let hash =
286 sqlx::query_scalar("SELECT hash FROM embeddings WHERE chunk_id = ? AND model = ?")
287 .bind(chunk_id)
288 .bind(model)
289 .fetch_optional(&self.pool)
290 .await?;
291 Ok(hash)
292 }
293
294 async fn clear_embeddings(&self) -> Result<()> {
295 sqlx::query("DELETE FROM chunk_vectors")
296 .execute(&self.pool)
297 .await?;
298 sqlx::query("DELETE FROM embeddings")
299 .execute(&self.pool)
300 .await?;
301 vector_index::remove_configured_sidecar(&self.config)?;
302 Ok(())
303 }
304
305 async fn stats(&self) -> Result<StoreStats> {
306 let total_docs: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM documents")
307 .fetch_one(&self.pool)
308 .await?;
309 let total_chunks: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunks")
310 .fetch_one(&self.pool)
311 .await?;
312 let total_embedded: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chunk_vectors")
313 .fetch_one(&self.pool)
314 .await?;
315 let db_size_bytes = std::fs::metadata(&self.config.db.path)
316 .map(|m| m.len())
317 .unwrap_or(0);
318
319 let source_rows = sqlx::query(
320 r#"
321 SELECT
322 d.source,
323 COUNT(DISTINCT d.id) AS doc_count,
324 COUNT(DISTINCT c.id) AS chunk_count,
325 COUNT(DISTINCT cv.chunk_id) AS embedded_count
326 FROM documents d
327 LEFT JOIN chunks c ON c.document_id = d.id
328 LEFT JOIN chunk_vectors cv ON cv.chunk_id = c.id
329 GROUP BY d.source
330 ORDER BY doc_count DESC
331 "#,
332 )
333 .fetch_all(&self.pool)
334 .await?;
335
336 let checkpoint_rows = sqlx::query("SELECT source, updated_at FROM checkpoints")
337 .fetch_all(&self.pool)
338 .await?;
339
340 let mut sources = Vec::new();
341 for row in &source_rows {
342 let source: String = row.get("source");
343 let last_sync_ts = checkpoint_rows
344 .iter()
345 .find(|cp| {
346 let cp_source: String = cp.get("source");
347 cp_source == source
348 })
349 .map(|cp| cp.get::<i64, _>("updated_at"));
350
351 sources.push(SourceStats {
352 source,
353 doc_count: row.get("doc_count"),
354 chunk_count: row.get("chunk_count"),
355 embedded_count: row.get("embedded_count"),
356 last_sync_ts,
357 });
358 }
359
360 Ok(StoreStats {
361 total_docs,
362 total_chunks,
363 total_embedded,
364 db_size_bytes,
365 sources,
366 })
367 }
368
369 async fn export_index(&self) -> Result<ExportData> {
370 let doc_rows = sqlx::query(
371 "SELECT id, source, source_id, source_url, title, updated_at, body \
372 FROM documents ORDER BY source_id",
373 )
374 .fetch_all(&self.pool)
375 .await?;
376
377 let chunk_rows = sqlx::query(
378 "SELECT id, document_id, chunk_index, text \
379 FROM chunks ORDER BY document_id, chunk_index",
380 )
381 .fetch_all(&self.pool)
382 .await?;
383
384 let documents = doc_rows
385 .iter()
386 .map(|row| ExportDocument {
387 id: row.get("id"),
388 source: row.get("source"),
389 source_id: row.get("source_id"),
390 source_url: row.get("source_url"),
391 title: row.get("title"),
392 updated_at: row.get("updated_at"),
393 body: row.get("body"),
394 })
395 .collect();
396
397 let chunks = chunk_rows
398 .iter()
399 .map(|row| ExportChunk {
400 id: row.get("id"),
401 document_id: row.get("document_id"),
402 chunk_index: row.get("chunk_index"),
403 text: row.get("text"),
404 })
405 .collect();
406
407 Ok(ExportData { documents, chunks })
408 }
409}
410
411async fn source_item_to_document(pool: &SqlitePool, item: &SourceItem) -> Result<Document> {
412 let dedup_hash = dedup_hash(item);
413 let existing_id: Option<String> =
414 sqlx::query_scalar("SELECT id FROM documents WHERE source = ? AND source_id = ?")
415 .bind(&item.source)
416 .bind(&item.source_id)
417 .fetch_optional(pool)
418 .await?;
419
420 Ok(Document {
421 id: existing_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
422 source: item.source.clone(),
423 source_id: item.source_id.clone(),
424 source_url: item.source_url.clone(),
425 title: item.title.clone(),
426 author: item.author.clone(),
427 created_at: item.created_at.timestamp(),
428 updated_at: item.updated_at.timestamp(),
429 content_type: item.content_type.clone(),
430 body: item.body.clone(),
431 metadata_json: item.metadata_json.clone(),
432 raw_json: item.raw_json.clone(),
433 dedup_hash,
434 })
435}
436
437fn dedup_hash(item: &SourceItem) -> String {
438 let mut hasher = Sha256::new();
439 hasher.update(item.source.as_bytes());
440 hasher.update(item.source_id.as_bytes());
441 hasher.update(item.updated_at.timestamp().to_le_bytes());
442 hasher.update(item.body.as_bytes());
443 format!("{:x}", hasher.finalize())
444}
445
446pub(crate) fn hash_text(text: &str) -> String {
447 let mut hasher = Sha256::new();
448 hasher.update(text.as_bytes());
449 format!("{:x}", hasher.finalize())
450}