Skip to main content

context_harness/
ingest.rs

1//! Ingestion pipeline orchestration.
2//!
3//! Coordinates the full sync flow: connector → normalization → chunking →
4//! embedding → storage. Supports incremental sync via checkpoints and
5//! inline embedding (non-fatal on failure).
6//!
7//! # Sync Pipeline
8//!
9//! The [`run_sync`] function implements the following pipeline:
10//!
11//! 1. **Resolve targets** — expands the connector argument into one or more
12//!    `(source_label, connector_type, instance_name)` tuples. Supports:
13//!    - `"git:platform"` — a single named instance
14//!    - `"git"` — all instances of a type
15//!    - `"all"` — every configured connector
16//! 2. **Scan** — dispatches to the appropriate connector for each target,
17//!    one at a time; a failed scan is reported as a warning and the rest still run.
18//! 3. **Filter** — applies checkpoint, `--since`, `--until`, and `--limit`
19//!    filters to each connector's items.
20//! 4. **Upsert documents** — inserts or updates each item in the `documents`
21//!    table, computing a SHA-256 deduplication hash.
22//! 5. **Replace chunks** — deletes old chunks (and their embeddings/FTS entries)
23//!    for the document, then inserts fresh chunks.
24//! 6. **Inline embed** — if embeddings are enabled, embeds new chunks
25//!    immediately (non-fatal: failures are logged but do not abort the sync).
26//! 7. **Update checkpoint** — persists the latest `updated_at` timestamp
27//!    so the next incremental sync can skip unchanged items.
28//!
29//! # Deduplication
30//!
31//! Each document is identified by `(source, source_id)`. If a document with
32//! the same composite key already exists, it is updated via an `ON CONFLICT`
33//! upsert. The `dedup_hash` field is a SHA-256 digest of source + source_id +
34//! updated_at + body, enabling downstream consumers to detect changes.
35//!
36//! # Checkpointing
37//!
38//! Checkpoints are stored in the `checkpoints` table as `(source, cursor)`
39//! pairs. The cursor is the maximum `updated_at` timestamp seen during the
40//! sync. On subsequent runs, only items newer than the checkpoint are processed.
41//!
42//! # Multi-Instance Connectors
43//!
44//! All connector types support named instances. Documents are tagged with
45//! `source = "type:name"` (e.g. `"git:platform"`, `"filesystem:docs"`).
46//! When syncing a type (e.g. `ctx sync git`), all instances of that type
47//! are scanned one after another.
48
49use anyhow::{bail, Result};
50use chrono::NaiveDate;
51use context_harness_core::store::Store;
52
53use crate::app_store::{AppStore, SqliteAppStore};
54use crate::chunk::chunk_text;
55use crate::config::Config;
56use crate::embed_cmd;
57use crate::extract;
58use crate::models::SourceItem;
59use crate::progress::{SyncProgressEvent, SyncProgressReporter};
60use crate::traits::{Connector, ConnectorRegistry};
61
62/// Default max extract size when connector is not filesystem or name not found (spec §4.1).
63const DEFAULT_MAX_EXTRACT_BYTES: u64 = 50_000_000;
64
65/// Resolve max_extract_bytes for a source from config. Parses "filesystem:name" and looks up
66/// the connector config; non-filesystem or unknown name uses DEFAULT_MAX_EXTRACT_BYTES.
67fn max_extract_bytes_for_source(config: &Config, source_label: &str) -> u64 {
68    if let Some(name) = source_label.strip_prefix("filesystem:") {
69        config
70            .connectors
71            .filesystem
72            .get(name)
73            .map(|c| c.max_extract_bytes)
74            .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES)
75    } else {
76        DEFAULT_MAX_EXTRACT_BYTES
77    }
78}
79
80/// Resolve a connector argument into a filtered list of connectors to scan.
81///
82/// The `registry` contains all connectors (built-in + custom). This function
83/// filters them based on the user's connector argument.
84///
85/// # Supported Formats
86///
87/// | Input | Meaning |
88/// |-------|---------|
89/// | `"all"` | Every registered connector |
90/// | `"git"` | All connectors of type `"git"` |
91/// | `"filesystem"` | All connectors of type `"filesystem"` |
92/// | `"s3"` | All connectors of type `"s3"` |
93/// | `"script"` | All connectors of type `"script"` |
94/// | `"custom"` | All connectors of type `"custom"` |
95/// | `"git:platform"` | Specific named instance |
96/// | `"custom:myconn"` | Specific named instance |
97fn resolve_connectors<'a>(
98    registry: &'a ConnectorRegistry,
99    connector_arg: &str,
100) -> Result<Vec<&'a dyn Connector>> {
101    match connector_arg {
102        "all" => {
103            let all = registry.connectors();
104            if all.is_empty() {
105                bail!("No connectors configured. Add connector sections to your config.");
106            }
107            Ok(all.iter().map(|c| c.as_ref()).collect())
108        }
109        // Type-level filter: "git", "filesystem", "s3", "script", "custom"
110        conn_type if matches!(conn_type, "filesystem" | "git" | "s3" | "script" | "custom") => {
111            let matched = registry.connectors_by_type(conn_type);
112            if matched.is_empty() {
113                bail!("No {} connectors configured.", conn_type);
114            }
115            Ok(matched)
116        }
117        // Instance-level filter: "git:platform", "custom:mything"
118        other => {
119            if let Some((conn_type, name)) = other.split_once(':') {
120                let conn = registry.find(conn_type, name).ok_or_else(|| {
121                    let available: Vec<String> = registry
122                        .connectors_by_type(conn_type)
123                        .iter()
124                        .map(|c| c.name().to_string())
125                        .collect();
126                    anyhow::anyhow!(
127                        "No {} connector '{}'. Available: {}",
128                        conn_type,
129                        name,
130                        if available.is_empty() {
131                            "(none)".to_string()
132                        } else {
133                            available.join(", ")
134                        }
135                    )
136                })?;
137                Ok(vec![conn])
138            } else {
139                bail!(
140                    "Unknown connector: '{}'. Use: all, filesystem, git, s3, script, custom, or type:name",
141                    other
142                );
143            }
144        }
145    }
146}
147
148/// Runs the full ingestion pipeline for the specified connector(s).
149///
150/// This is the entry point for `ctx sync <connector>`. It builds a
151/// [`ConnectorRegistry`] from the config and runs the pipeline.
152///
153/// # Arguments
154///
155/// - `config` — application configuration.
156/// - `connector` — the connector specifier: `"all"`, a type name (`"git"`),
157///   or a specific instance (`"git:platform"`).
158/// - `full` — if `true`, ignores existing checkpoints and reprocesses all items.
159/// - `dry_run` — if `true`, scans and counts items without writing to the database.
160/// - `since` — optional `YYYY-MM-DD` date; only items updated on or after this date are processed.
161/// - `until` — optional `YYYY-MM-DD` date; only items updated on or before this date are processed.
162/// - `limit` — optional maximum number of items to process (per connector instance).
163/// - `progress` — optional progress reporter; when provided, progress is emitted on stderr.
164///
165/// # Errors
166///
167/// Returns an error if:
168/// - The specified connector is unknown or not configured.
169/// - A connector fails to scan (e.g., network error).
170/// - A database operation fails.
171#[allow(clippy::too_many_arguments)]
172pub async fn run_sync(
173    config: &Config,
174    connector: &str,
175    full: bool,
176    dry_run: bool,
177    since: Option<String>,
178    until: Option<String>,
179    limit: Option<usize>,
180    progress: Option<&dyn SyncProgressReporter>,
181) -> Result<()> {
182    let registry = ConnectorRegistry::from_config(config);
183    run_sync_with_registry(
184        config, connector, full, dry_run, since, until, limit, &registry, progress,
185    )
186    .await
187}
188
189/// Runs the ingestion pipeline with additional custom connectors.
190///
191/// Like [`run_sync`], but accepts extra connectors to merge with
192/// the built-in ones from config. This is the entry point for custom
193/// binaries that register `Connector` trait implementations.
194///
195/// The `extra_connectors` are combined with the built-in connectors
196/// resolved from the config file.
197#[allow(clippy::too_many_arguments, dead_code)]
198pub async fn run_sync_with_extensions(
199    config: &Config,
200    connector: &str,
201    full: bool,
202    dry_run: bool,
203    since: Option<String>,
204    until: Option<String>,
205    limit: Option<usize>,
206    extra_connectors: &ConnectorRegistry,
207) -> Result<()> {
208    // Build combined connector list from config + extras
209    let built_in = ConnectorRegistry::from_config(config);
210
211    // Resolve from built-in registry (ignore error if extras might match)
212    let mut resolved: Vec<&dyn Connector> = match resolve_connectors(&built_in, connector) {
213        Ok(r) => r,
214        Err(_) if !extra_connectors.is_empty() => Vec::new(),
215        Err(e) => return Err(e),
216    };
217
218    // Also resolve from extras
219    if let Ok(extras) = resolve_connectors(extra_connectors, connector) {
220        resolved.extend(extras);
221    }
222
223    if resolved.is_empty() {
224        bail!("No connectors matched '{}'.", connector);
225    }
226
227    run_connectors(config, &resolved, full, dry_run, since, until, limit, None).await
228}
229
230/// Runs the ingestion pipeline with a pre-built [`ConnectorRegistry`].
231///
232/// This is the unified entry point. All connectors in the registry are
233/// trait objects — built-in, Lua, and custom all go through the same path.
234#[allow(clippy::too_many_arguments)]
235pub async fn run_sync_with_registry(
236    config: &Config,
237    connector: &str,
238    full: bool,
239    dry_run: bool,
240    since: Option<String>,
241    until: Option<String>,
242    limit: Option<usize>,
243    registry: &ConnectorRegistry,
244    progress: Option<&dyn SyncProgressReporter>,
245) -> Result<()> {
246    let connectors = resolve_connectors(registry, connector)?;
247    run_connectors(
248        config,
249        &connectors,
250        full,
251        dry_run,
252        since,
253        until,
254        limit,
255        progress,
256    )
257    .await
258}
259
260/// Report progress every N items during ingest (avoids flooding stderr).
261const INGEST_PROGRESS_INTERVAL: u64 = 10;
262
263/// Core sync engine: scans connectors sequentially, ingests items.
264///
265/// Scans are run sequentially since connectors hold references. Each
266/// connector's items flow through the standard checkpoint → filter → upsert →
267/// chunk → embed pipeline.
268#[allow(clippy::too_many_arguments)]
269async fn run_connectors(
270    config: &Config,
271    connectors: &[&dyn Connector],
272    full: bool,
273    dry_run: bool,
274    since: Option<String>,
275    until: Option<String>,
276    limit: Option<usize>,
277    progress: Option<&dyn SyncProgressReporter>,
278) -> Result<()> {
279    if connectors.len() > 1 {
280        println!("Syncing {} connector instances...", connectors.len());
281    }
282
283    // Scan all connectors and collect results
284    let mut scan_results: Vec<(String, Vec<SourceItem>)> = Vec::new();
285    let mut scan_errors: Vec<String> = Vec::new();
286
287    for conn in connectors {
288        let label = conn.source_label();
289        if let Some(p) = progress {
290            p.report(SyncProgressEvent::Discovering {
291                connector: label.to_string(),
292            });
293        }
294        match conn.scan().await {
295            Ok(items) => {
296                scan_results.push((label, items));
297            }
298            Err(e) => {
299                scan_errors.push(format!("{}: {:#}", label, e));
300            }
301        }
302    }
303
304    // Report scan errors but continue with successful scans
305    for err in &scan_errors {
306        eprintln!("Warning: scan failed: {}", err);
307    }
308
309    if scan_results.is_empty() && !scan_errors.is_empty() {
310        bail!("All connector scans failed:\n{}", scan_errors.join("\n"));
311    }
312
313    // Sort for deterministic output ordering
314    scan_results.sort_by(|a, b| a.0.cmp(&b.0));
315
316    // Ingest each target's items (sequential — SQLite writes are serialized)
317    let store = SqliteAppStore::connect(config).await?;
318
319    for (source_label, mut items) in scan_results {
320        // Load checkpoint
321        let checkpoint: Option<i64> = if full {
322            None
323        } else {
324            store.get_checkpoint(&source_label).await?
325        };
326
327        // Filter by checkpoint (skip files not modified since checkpoint)
328        if let Some(cp) = checkpoint {
329            items.retain(|item| item.updated_at.timestamp() > cp);
330        }
331
332        // Apply --since filter
333        if let Some(ref since_str) = since {
334            let since_date = NaiveDate::parse_from_str(since_str, "%Y-%m-%d")?;
335            let since_ts = since_date
336                .and_hms_opt(0, 0, 0)
337                .unwrap()
338                .and_utc()
339                .timestamp();
340            items.retain(|item| item.updated_at.timestamp() >= since_ts);
341        }
342
343        // Apply --until filter
344        if let Some(ref until_str) = until {
345            let until_date = NaiveDate::parse_from_str(until_str, "%Y-%m-%d")?;
346            let until_ts = until_date
347                .and_hms_opt(23, 59, 59)
348                .unwrap()
349                .and_utc()
350                .timestamp();
351            items.retain(|item| item.updated_at.timestamp() <= until_ts);
352        }
353
354        // Apply --limit (per connector instance)
355        if let Some(lim) = limit {
356            items.truncate(lim);
357        }
358
359        if dry_run {
360            println!("sync {} (dry-run)", source_label);
361            println!("  items found: {}", items.len());
362            let total_chunks: usize = items
363                .iter()
364                .map(|item| chunk_text("tmp", &item.body, config.chunking.max_tokens).len())
365                .sum();
366            println!("  estimated chunks: {}", total_chunks);
367            continue;
368        }
369
370        let mut docs_upserted = 0u64;
371        let mut chunks_written = 0u64;
372        let mut embeddings_written = 0u64;
373        let mut embeddings_pending = 0u64;
374        let mut extraction_skipped = 0u64;
375        let mut max_updated: i64 = checkpoint.unwrap_or(0);
376        let max_extract_bytes = max_extract_bytes_for_source(config, &source_label);
377        let total_items = items.len() as u64;
378
379        if let Some(p) = progress {
380            if total_items > 0 {
381                p.report(SyncProgressEvent::Ingesting {
382                    connector: source_label.clone(),
383                    n: 0,
384                    total: total_items,
385                });
386            }
387        }
388
389        for item in items.iter_mut() {
390            if let Some(ref bytes) = item.raw_bytes {
391                if bytes.len() as u64 > max_extract_bytes {
392                    extraction_skipped += 1;
393                    eprintln!(
394                        "Warning: skipping {} (size {} > max_extract_bytes {})",
395                        item.source_id,
396                        bytes.len(),
397                        max_extract_bytes
398                    );
399                    continue;
400                }
401                match extract::extract_text(bytes, &item.content_type) {
402                    Ok(text) => {
403                        item.body = text;
404                        item.raw_bytes = None;
405                    }
406                    Err(e) => {
407                        extraction_skipped += 1;
408                        eprintln!("Warning: extraction failed for {}: {}", item.source_id, e);
409                        continue;
410                    }
411                }
412            }
413
414            let doc_id = store.upsert_source_item(item).await?;
415            let chunks = chunk_text(&doc_id, &item.body, config.chunking.max_tokens);
416            let chunk_count = chunks.len() as u64;
417            store.replace_chunks(&doc_id, &chunks, None).await?;
418
419            // Inline embedding (non-fatal)
420            let (emb_ok, emb_pending) =
421                embed_cmd::embed_chunks_inline(config, &store, &chunks).await;
422            embeddings_written += emb_ok;
423            embeddings_pending += emb_pending;
424
425            docs_upserted += 1;
426            chunks_written += chunk_count;
427
428            if let Some(p) = progress {
429                let n = docs_upserted;
430                if n.is_multiple_of(INGEST_PROGRESS_INTERVAL) || n == total_items {
431                    p.report(SyncProgressEvent::Ingesting {
432                        connector: source_label.clone(),
433                        n,
434                        total: total_items,
435                    });
436                }
437            }
438
439            let ts = item.updated_at.timestamp();
440            if ts > max_updated {
441                max_updated = ts;
442            }
443        }
444
445        // Update checkpoint
446        store.set_checkpoint(&source_label, max_updated).await?;
447
448        println!("sync {}", source_label);
449        println!("  fetched: {} items", items.len());
450        println!("  upserted documents: {}", docs_upserted);
451        println!("  chunks written: {}", chunks_written);
452        println!("  extraction skipped: {}", extraction_skipped);
453        if config.embedding.is_enabled() {
454            println!("  embeddings written: {}", embeddings_written);
455            println!("  embeddings pending: {}", embeddings_pending);
456        }
457        println!("  checkpoint: {}", max_updated);
458        println!("ok");
459    }
460
461    store.close().await;
462    Ok(())
463}