context_harness/ingest.rs
1//! Ingestion pipeline orchestration.
2//!
3//! Coordinates the full sync flow: connector → normalization → chunking →
4//! embedding → storage. Supports incremental sync via checkpoints and
5//! inline embedding (non-fatal on failure).
6//!
7//! # Sync Pipeline
8//!
9//! The [`run_sync`] function implements the following pipeline:
10//!
11//! 1. **Resolve targets** — expands the connector argument into one or more
12//! `(source_label, connector_type, instance_name)` tuples. Supports:
13//! - `"git:platform"` — a single named instance
14//! - `"git"` — all instances of a type
15//! - `"all"` — every configured connector
//! 2. **Scan** — dispatches to the appropriate connector for each
//!    target, scanning the resolved connectors one after another.
18//! 3. **Filter** — applies checkpoint, `--since`, `--until`, and `--limit`
19//! filters to each connector's items.
20//! 4. **Upsert documents** — inserts or updates each item in the `documents`
21//! table, computing a SHA-256 deduplication hash.
22//! 5. **Replace chunks** — deletes old chunks (and their embeddings/FTS entries)
23//! for the document, then inserts fresh chunks.
24//! 6. **Inline embed** — if embeddings are enabled, embeds new chunks
25//! immediately (non-fatal: failures are logged but do not abort the sync).
26//! 7. **Update checkpoint** — persists the latest `updated_at` timestamp
27//! so the next incremental sync can skip unchanged items.
28//!
29//! # Deduplication
30//!
31//! Each document is identified by `(source, source_id)`. If a document with
32//! the same composite key already exists, it is updated via an `ON CONFLICT`
33//! upsert. The `dedup_hash` field is a SHA-256 digest of source + source_id +
34//! updated_at + body, enabling downstream consumers to detect changes.
35//!
36//! # Checkpointing
37//!
38//! Checkpoints are stored in the `checkpoints` table as `(source, cursor)`
39//! pairs. The cursor is the maximum `updated_at` timestamp seen during the
40//! sync. On subsequent runs, only items newer than the checkpoint are processed.
41//!
42//! # Multi-Instance Connectors
43//!
44//! All connector types support named instances. Documents are tagged with
45//! `source = "type:name"` (e.g. `"git:platform"`, `"filesystem:docs"`).
//! When syncing a type (e.g. `ctx sync git`), all instances of that type
//! are scanned, one after another.
48
49use anyhow::{bail, Result};
50use chrono::NaiveDate;
51use context_harness_core::store::Store;
52
53use crate::app_store::{AppStore, SqliteAppStore};
54use crate::chunk::chunk_text;
55use crate::config::Config;
56use crate::embed_cmd;
57use crate::extract;
58use crate::models::SourceItem;
59use crate::progress::{SyncProgressEvent, SyncProgressReporter};
60use crate::traits::{Connector, ConnectorRegistry};
61
62/// Default max extract size when connector is not filesystem or name not found (spec §4.1).
63const DEFAULT_MAX_EXTRACT_BYTES: u64 = 50_000_000;
64
65/// Resolve max_extract_bytes for a source from config. Parses "filesystem:name" and looks up
66/// the connector config; non-filesystem or unknown name uses DEFAULT_MAX_EXTRACT_BYTES.
67fn max_extract_bytes_for_source(config: &Config, source_label: &str) -> u64 {
68 if let Some(name) = source_label.strip_prefix("filesystem:") {
69 config
70 .connectors
71 .filesystem
72 .get(name)
73 .map(|c| c.max_extract_bytes)
74 .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES)
75 } else {
76 DEFAULT_MAX_EXTRACT_BYTES
77 }
78}
79
80/// Resolve a connector argument into a filtered list of connectors to scan.
81///
82/// The `registry` contains all connectors (built-in + custom). This function
83/// filters them based on the user's connector argument.
84///
85/// # Supported Formats
86///
87/// | Input | Meaning |
88/// |-------|---------|
89/// | `"all"` | Every registered connector |
90/// | `"git"` | All connectors of type `"git"` |
91/// | `"filesystem"` | All connectors of type `"filesystem"` |
92/// | `"s3"` | All connectors of type `"s3"` |
93/// | `"script"` | All connectors of type `"script"` |
94/// | `"custom"` | All connectors of type `"custom"` |
95/// | `"git:platform"` | Specific named instance |
96/// | `"custom:myconn"` | Specific named instance |
97fn resolve_connectors<'a>(
98 registry: &'a ConnectorRegistry,
99 connector_arg: &str,
100) -> Result<Vec<&'a dyn Connector>> {
101 match connector_arg {
102 "all" => {
103 let all = registry.connectors();
104 if all.is_empty() {
105 bail!("No connectors configured. Add connector sections to your config.");
106 }
107 Ok(all.iter().map(|c| c.as_ref()).collect())
108 }
109 // Type-level filter: "git", "filesystem", "s3", "script", "custom"
110 conn_type if matches!(conn_type, "filesystem" | "git" | "s3" | "script" | "custom") => {
111 let matched = registry.connectors_by_type(conn_type);
112 if matched.is_empty() {
113 bail!("No {} connectors configured.", conn_type);
114 }
115 Ok(matched)
116 }
117 // Instance-level filter: "git:platform", "custom:mything"
118 other => {
119 if let Some((conn_type, name)) = other.split_once(':') {
120 let conn = registry.find(conn_type, name).ok_or_else(|| {
121 let available: Vec<String> = registry
122 .connectors_by_type(conn_type)
123 .iter()
124 .map(|c| c.name().to_string())
125 .collect();
126 anyhow::anyhow!(
127 "No {} connector '{}'. Available: {}",
128 conn_type,
129 name,
130 if available.is_empty() {
131 "(none)".to_string()
132 } else {
133 available.join(", ")
134 }
135 )
136 })?;
137 Ok(vec![conn])
138 } else {
139 bail!(
140 "Unknown connector: '{}'. Use: all, filesystem, git, s3, script, custom, or type:name",
141 other
142 );
143 }
144 }
145 }
146}
147
148/// Runs the full ingestion pipeline for the specified connector(s).
149///
150/// This is the entry point for `ctx sync <connector>`. It builds a
151/// [`ConnectorRegistry`] from the config and runs the pipeline.
152///
153/// # Arguments
154///
155/// - `config` — application configuration.
156/// - `connector` — the connector specifier: `"all"`, a type name (`"git"`),
157/// or a specific instance (`"git:platform"`).
158/// - `full` — if `true`, ignores existing checkpoints and reprocesses all items.
159/// - `dry_run` — if `true`, scans and counts items without writing to the database.
160/// - `since` — optional `YYYY-MM-DD` date; only items updated on or after this date are processed.
161/// - `until` — optional `YYYY-MM-DD` date; only items updated on or before this date are processed.
162/// - `limit` — optional maximum number of items to process (per connector instance).
163/// - `progress` — optional progress reporter; when provided, progress is emitted on stderr.
164///
165/// # Errors
166///
167/// Returns an error if:
168/// - The specified connector is unknown or not configured.
169/// - A connector fails to scan (e.g., network error).
170/// - A database operation fails.
171#[allow(clippy::too_many_arguments)]
172pub async fn run_sync(
173 config: &Config,
174 connector: &str,
175 full: bool,
176 dry_run: bool,
177 since: Option<String>,
178 until: Option<String>,
179 limit: Option<usize>,
180 progress: Option<&dyn SyncProgressReporter>,
181) -> Result<()> {
182 let registry = ConnectorRegistry::from_config(config);
183 run_sync_with_registry(
184 config, connector, full, dry_run, since, until, limit, ®istry, progress,
185 )
186 .await
187}
188
189/// Runs the ingestion pipeline with additional custom connectors.
190///
191/// Like [`run_sync`], but accepts extra connectors to merge with
192/// the built-in ones from config. This is the entry point for custom
193/// binaries that register `Connector` trait implementations.
194///
195/// The `extra_connectors` are combined with the built-in connectors
196/// resolved from the config file.
197#[allow(clippy::too_many_arguments, dead_code)]
198pub async fn run_sync_with_extensions(
199 config: &Config,
200 connector: &str,
201 full: bool,
202 dry_run: bool,
203 since: Option<String>,
204 until: Option<String>,
205 limit: Option<usize>,
206 extra_connectors: &ConnectorRegistry,
207) -> Result<()> {
208 // Build combined connector list from config + extras
209 let built_in = ConnectorRegistry::from_config(config);
210
211 // Resolve from built-in registry (ignore error if extras might match)
212 let mut resolved: Vec<&dyn Connector> = match resolve_connectors(&built_in, connector) {
213 Ok(r) => r,
214 Err(_) if !extra_connectors.is_empty() => Vec::new(),
215 Err(e) => return Err(e),
216 };
217
218 // Also resolve from extras
219 if let Ok(extras) = resolve_connectors(extra_connectors, connector) {
220 resolved.extend(extras);
221 }
222
223 if resolved.is_empty() {
224 bail!("No connectors matched '{}'.", connector);
225 }
226
227 run_connectors(config, &resolved, full, dry_run, since, until, limit, None).await
228}
229
230/// Runs the ingestion pipeline with a pre-built [`ConnectorRegistry`].
231///
232/// This is the unified entry point. All connectors in the registry are
233/// trait objects — built-in, Lua, and custom all go through the same path.
234#[allow(clippy::too_many_arguments)]
235pub async fn run_sync_with_registry(
236 config: &Config,
237 connector: &str,
238 full: bool,
239 dry_run: bool,
240 since: Option<String>,
241 until: Option<String>,
242 limit: Option<usize>,
243 registry: &ConnectorRegistry,
244 progress: Option<&dyn SyncProgressReporter>,
245) -> Result<()> {
246 let connectors = resolve_connectors(registry, connector)?;
247 run_connectors(
248 config,
249 &connectors,
250 full,
251 dry_run,
252 since,
253 until,
254 limit,
255 progress,
256 )
257 .await
258}
259
260/// Report progress every N items during ingest (avoids flooding stderr).
261const INGEST_PROGRESS_INTERVAL: u64 = 10;
262
263/// Core sync engine: scans connectors sequentially, ingests items.
264///
265/// Scans are run sequentially since connectors hold references. Each
266/// connector's items flow through the standard checkpoint → filter → upsert →
267/// chunk → embed pipeline.
268#[allow(clippy::too_many_arguments)]
269async fn run_connectors(
270 config: &Config,
271 connectors: &[&dyn Connector],
272 full: bool,
273 dry_run: bool,
274 since: Option<String>,
275 until: Option<String>,
276 limit: Option<usize>,
277 progress: Option<&dyn SyncProgressReporter>,
278) -> Result<()> {
279 if connectors.len() > 1 {
280 println!("Syncing {} connector instances...", connectors.len());
281 }
282
283 // Scan all connectors and collect results
284 let mut scan_results: Vec<(String, Vec<SourceItem>)> = Vec::new();
285 let mut scan_errors: Vec<String> = Vec::new();
286
287 for conn in connectors {
288 let label = conn.source_label();
289 if let Some(p) = progress {
290 p.report(SyncProgressEvent::Discovering {
291 connector: label.to_string(),
292 });
293 }
294 match conn.scan().await {
295 Ok(items) => {
296 scan_results.push((label, items));
297 }
298 Err(e) => {
299 scan_errors.push(format!("{}: {:#}", label, e));
300 }
301 }
302 }
303
304 // Report scan errors but continue with successful scans
305 for err in &scan_errors {
306 eprintln!("Warning: scan failed: {}", err);
307 }
308
309 if scan_results.is_empty() && !scan_errors.is_empty() {
310 bail!("All connector scans failed:\n{}", scan_errors.join("\n"));
311 }
312
313 // Sort for deterministic output ordering
314 scan_results.sort_by(|a, b| a.0.cmp(&b.0));
315
316 // Ingest each target's items (sequential — SQLite writes are serialized)
317 let store = SqliteAppStore::connect(config).await?;
318
319 for (source_label, mut items) in scan_results {
320 // Load checkpoint
321 let checkpoint: Option<i64> = if full {
322 None
323 } else {
324 store.get_checkpoint(&source_label).await?
325 };
326
327 // Filter by checkpoint (skip files not modified since checkpoint)
328 if let Some(cp) = checkpoint {
329 items.retain(|item| item.updated_at.timestamp() > cp);
330 }
331
332 // Apply --since filter
333 if let Some(ref since_str) = since {
334 let since_date = NaiveDate::parse_from_str(since_str, "%Y-%m-%d")?;
335 let since_ts = since_date
336 .and_hms_opt(0, 0, 0)
337 .unwrap()
338 .and_utc()
339 .timestamp();
340 items.retain(|item| item.updated_at.timestamp() >= since_ts);
341 }
342
343 // Apply --until filter
344 if let Some(ref until_str) = until {
345 let until_date = NaiveDate::parse_from_str(until_str, "%Y-%m-%d")?;
346 let until_ts = until_date
347 .and_hms_opt(23, 59, 59)
348 .unwrap()
349 .and_utc()
350 .timestamp();
351 items.retain(|item| item.updated_at.timestamp() <= until_ts);
352 }
353
354 // Apply --limit (per connector instance)
355 if let Some(lim) = limit {
356 items.truncate(lim);
357 }
358
359 if dry_run {
360 println!("sync {} (dry-run)", source_label);
361 println!(" items found: {}", items.len());
362 let total_chunks: usize = items
363 .iter()
364 .map(|item| chunk_text("tmp", &item.body, config.chunking.max_tokens).len())
365 .sum();
366 println!(" estimated chunks: {}", total_chunks);
367 continue;
368 }
369
370 let mut docs_upserted = 0u64;
371 let mut chunks_written = 0u64;
372 let mut embeddings_written = 0u64;
373 let mut embeddings_pending = 0u64;
374 let mut extraction_skipped = 0u64;
375 let mut max_updated: i64 = checkpoint.unwrap_or(0);
376 let max_extract_bytes = max_extract_bytes_for_source(config, &source_label);
377 let total_items = items.len() as u64;
378
379 if let Some(p) = progress {
380 if total_items > 0 {
381 p.report(SyncProgressEvent::Ingesting {
382 connector: source_label.clone(),
383 n: 0,
384 total: total_items,
385 });
386 }
387 }
388
389 for item in items.iter_mut() {
390 if let Some(ref bytes) = item.raw_bytes {
391 if bytes.len() as u64 > max_extract_bytes {
392 extraction_skipped += 1;
393 eprintln!(
394 "Warning: skipping {} (size {} > max_extract_bytes {})",
395 item.source_id,
396 bytes.len(),
397 max_extract_bytes
398 );
399 continue;
400 }
401 match extract::extract_text(bytes, &item.content_type) {
402 Ok(text) => {
403 item.body = text;
404 item.raw_bytes = None;
405 }
406 Err(e) => {
407 extraction_skipped += 1;
408 eprintln!("Warning: extraction failed for {}: {}", item.source_id, e);
409 continue;
410 }
411 }
412 }
413
414 let doc_id = store.upsert_source_item(item).await?;
415 let chunks = chunk_text(&doc_id, &item.body, config.chunking.max_tokens);
416 let chunk_count = chunks.len() as u64;
417 store.replace_chunks(&doc_id, &chunks, None).await?;
418
419 // Inline embedding (non-fatal)
420 let (emb_ok, emb_pending) =
421 embed_cmd::embed_chunks_inline(config, &store, &chunks).await;
422 embeddings_written += emb_ok;
423 embeddings_pending += emb_pending;
424
425 docs_upserted += 1;
426 chunks_written += chunk_count;
427
428 if let Some(p) = progress {
429 let n = docs_upserted;
430 if n.is_multiple_of(INGEST_PROGRESS_INTERVAL) || n == total_items {
431 p.report(SyncProgressEvent::Ingesting {
432 connector: source_label.clone(),
433 n,
434 total: total_items,
435 });
436 }
437 }
438
439 let ts = item.updated_at.timestamp();
440 if ts > max_updated {
441 max_updated = ts;
442 }
443 }
444
445 // Update checkpoint
446 store.set_checkpoint(&source_label, max_updated).await?;
447
448 println!("sync {}", source_label);
449 println!(" fetched: {} items", items.len());
450 println!(" upserted documents: {}", docs_upserted);
451 println!(" chunks written: {}", chunks_written);
452 println!(" extraction skipped: {}", extraction_skipped);
453 if config.embedding.is_enabled() {
454 println!(" embeddings written: {}", embeddings_written);
455 println!(" embeddings pending: {}", embeddings_pending);
456 }
457 println!(" checkpoint: {}", max_updated);
458 println!("ok");
459 }
460
461 store.close().await;
462 Ok(())
463}