Skip to main content

context_harness/
embed_cmd.rs

1//! Embedding CLI commands: `ctx embed pending` and `ctx embed rebuild`.
2//!
3//! Manages the embedding lifecycle:
4//!
5//! - **[`run_embed_pending`]** — backfill missing or stale embeddings
6//! - **[`run_embed_rebuild`]** — delete and regenerate all embeddings
7//! - **[`embed_chunks_inline`]** — embed chunks during sync (non-fatal)
8//!
9//! # Staleness Detection
10//!
11//! Each chunk's text is hashed (SHA-256). When the hash in the `embeddings`
12//! table differs from the chunk's current hash, the embedding is considered
13//! stale and will be re-generated by `embed pending`.
14//!
15//! # Batching
16//!
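//! Embeddings are generated in batches (configurable via `embedding.batch_size`
//! or the `--batch-size` flag). Each batch is a single API call to the embedding
//! provider. A batch whose embedding call fails is logged and counted as failed
//! without aborting the remaining batches. In the CLI commands, an error while
//! writing an embedding to the store is still returned to the caller.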
20
21use anyhow::{bail, Result};
22use context_harness_core::store::Store;
23
24use crate::app_store::{hash_text, AppStore, SqliteAppStore};
25use crate::config::Config;
26use crate::embedding;
27
28/// Backfill embeddings for chunks that are missing or have stale hashes.
29///
30/// Finds all chunks where either:
31/// 1. No embedding exists for the current model, or
32/// 2. The embedding's stored hash doesn't match the chunk's current text hash.
33///
34/// # Arguments
35///
36/// * `config` — Application configuration.
37/// * `limit` — Optional cap on the number of chunks to process.
38/// * `batch_size_override` — Override the config's `embedding.batch_size`.
39/// * `dry_run` — If `true`, report counts without writing anything.
40///
41/// # Errors
42///
43/// Returns an error if the embedding provider is disabled.
44pub async fn run_embed_pending(
45    config: &Config,
46    limit: Option<usize>,
47    batch_size_override: Option<usize>,
48    dry_run: bool,
49) -> Result<()> {
50    if !config.embedding.is_enabled() {
51        bail!("Embedding provider is disabled. Set [embedding] provider in config.");
52    }
53
54    let provider = embedding::create_provider(&config.embedding)?;
55    let model_name = provider.model_name().to_string();
56    let store = SqliteAppStore::connect(config).await?;
57    let batch_size = batch_size_override.unwrap_or(config.embedding.batch_size);
58
59    // Find chunks missing embeddings or with stale hashes
60    let pending = store.find_pending_chunks(&model_name, limit).await?;
61
62    if dry_run {
63        println!("embed pending (dry-run)");
64        println!("  chunks needing embeddings: {}", pending.len());
65        return Ok(());
66    }
67
68    if pending.is_empty() {
69        println!("embed pending");
70        println!("  all chunks up to date");
71        return Ok(());
72    }
73
74    let total = pending.len();
75    let mut embedded = 0u64;
76    let mut failed = 0u64;
77
78    for batch in pending.chunks(batch_size) {
79        let texts: Vec<String> = batch.iter().map(|p| p.text.clone()).collect();
80
81        match embedding::embed_texts(provider.as_ref(), &config.embedding, &texts).await {
82            Ok(vectors) => {
83                for (item, vec) in batch.iter().zip(vectors.iter()) {
84                    store
85                        .upsert_embedding(
86                            &item.chunk_id,
87                            &item.document_id,
88                            vec,
89                            &model_name,
90                            provider.dims(),
91                            &item.text_hash,
92                        )
93                        .await?;
94                    embedded += 1;
95                }
96            }
97            Err(e) => {
98                eprintln!("Warning: embedding batch failed: {}", e);
99                failed += batch.len() as u64;
100            }
101        }
102    }
103
104    println!("embed pending");
105    println!("  total pending: {}", total);
106    println!("  embedded: {}", embedded);
107    println!("  failed: {}", failed);
108
109    store.close().await;
110    Ok(())
111}
112
113/// Delete all embeddings and regenerate for all chunks.
114///
115/// Clears both the `embeddings` metadata table and the `chunk_vectors`
116/// blob table, then re-embeds every chunk in the database.
117///
118/// # Arguments
119///
120/// * `config` — Application configuration.
121/// * `batch_size_override` — Override the config's `embedding.batch_size`.
122///
123/// # Errors
124///
125/// Returns an error if the embedding provider is disabled.
126pub async fn run_embed_rebuild(config: &Config, batch_size_override: Option<usize>) -> Result<()> {
127    if !config.embedding.is_enabled() {
128        bail!("Embedding provider is disabled. Set [embedding] provider in config.");
129    }
130
131    let provider = embedding::create_provider(&config.embedding)?;
132    let model_name = provider.model_name().to_string();
133    let store = SqliteAppStore::connect(config).await?;
134    let batch_size = batch_size_override.unwrap_or(config.embedding.batch_size);
135
136    // Delete all existing embeddings
137    store.clear_embeddings().await?;
138
139    println!("embed rebuild — cleared existing embeddings");
140
141    // Get all chunks
142    let all_chunks = store.find_pending_chunks(&model_name, None).await?;
143
144    if all_chunks.is_empty() {
145        println!("  no chunks to embed");
146        store.close().await;
147        return Ok(());
148    }
149
150    let total = all_chunks.len();
151    let mut embedded = 0u64;
152    let mut failed = 0u64;
153
154    for batch in all_chunks.chunks(batch_size) {
155        let texts: Vec<String> = batch.iter().map(|p| p.text.clone()).collect();
156
157        match embedding::embed_texts(provider.as_ref(), &config.embedding, &texts).await {
158            Ok(vectors) => {
159                for (item, vec) in batch.iter().zip(vectors.iter()) {
160                    store
161                        .upsert_embedding(
162                            &item.chunk_id,
163                            &item.document_id,
164                            vec,
165                            &model_name,
166                            provider.dims(),
167                            &item.text_hash,
168                        )
169                        .await?;
170                    embedded += 1;
171                }
172            }
173            Err(e) => {
174                eprintln!("Warning: embedding batch failed: {}", e);
175                failed += batch.len() as u64;
176            }
177        }
178    }
179
180    println!("embed rebuild");
181    println!("  total chunks: {}", total);
182    println!("  embedded: {}", embedded);
183    println!("  failed: {}", failed);
184
185    store.close().await;
186    Ok(())
187}
188
189/// Embed chunks during sync (inline). Non-fatal on failure.
190///
191/// Called by [`crate::ingest::run_sync`] after chunking each document.
192/// Checks each chunk for existing, up-to-date embeddings before
193/// calling the provider, avoiding redundant API calls.
194///
195/// # Returns
196///
197/// A tuple `(embedded, pending)`:
198/// - `embedded` — number of chunks successfully embedded (or already up-to-date)
199/// - `pending` — number of chunks that failed to embed
200pub async fn embed_chunks_inline(
201    config: &Config,
202    store: &impl AppStore,
203    chunks: &[crate::models::Chunk],
204) -> (u64, u64) {
205    if !config.embedding.is_enabled() {
206        return (0, 0);
207    }
208
209    let provider = match embedding::create_provider(&config.embedding) {
210        Ok(p) => p,
211        Err(e) => {
212            eprintln!("Warning: could not create embedding provider: {}", e);
213            return (0, chunks.len() as u64);
214        }
215    };
216
217    let model_name = provider.model_name().to_string();
218    let mut embedded = 0u64;
219    let mut pending = 0u64;
220
221    for batch in chunks.chunks(config.embedding.batch_size) {
222        // Check which chunks need embedding
223        let mut need_embedding = Vec::new();
224        for chunk in batch {
225            let text_hash = hash_text(&chunk.text);
226            let existing = store
227                .get_embedding_hash(&chunk.id, &model_name)
228                .await
229                .unwrap_or(None);
230
231            if existing.as_deref() == Some(&text_hash) {
232                // Already up to date
233                embedded += 1;
234                continue;
235            }
236
237            need_embedding.push((chunk, text_hash));
238        }
239
240        if need_embedding.is_empty() {
241            continue;
242        }
243
244        let texts: Vec<String> = need_embedding.iter().map(|(c, _)| c.text.clone()).collect();
245
246        match embedding::embed_texts(provider.as_ref(), &config.embedding, &texts).await {
247            Ok(vectors) => {
248                for ((chunk, text_hash), vec) in need_embedding.iter().zip(vectors.iter()) {
249                    if let Err(e) = store
250                        .upsert_embedding(
251                            &chunk.id,
252                            &chunk.document_id,
253                            vec,
254                            &model_name,
255                            provider.dims(),
256                            text_hash,
257                        )
258                        .await
259                    {
260                        eprintln!("Warning: failed to store embedding for {}: {}", chunk.id, e);
261                        pending += 1;
262                    } else {
263                        embedded += 1;
264                    }
265                }
266            }
267            Err(e) => {
268                eprintln!("Warning: embedding batch failed: {}", e);
269                pending += need_embedding.len() as u64;
270            }
271        }
272    }
273
274    (embedded, pending)
275}