context_harness/
ingest.rs

1//! Ingestion pipeline orchestration.
2//!
3//! Coordinates the full sync flow: connector → normalization → chunking →
4//! embedding → storage. Supports incremental sync via checkpoints and
5//! inline embedding (non-fatal on failure).
6//!
7//! # Sync Pipeline
8//!
9//! The [`run_sync`] function implements the following pipeline:
10//!
11//! 1. **Resolve targets** — expands the connector argument into one or more
12//!    `(source_label, connector_type, instance_name)` tuples. Supports:
13//!    - `"git:platform"` — a single named instance
14//!    - `"git"` — all instances of a type
15//!    - `"all"` — every configured connector
16//! 2. **Scan (sequential)** — dispatches to the appropriate connector for each
17//!    target, running scans one after another and collecting per-connector errors.
18//! 3. **Filter** — applies checkpoint, `--since`, `--until`, and `--limit`
19//!    filters to each connector's items.
20//! 4. **Upsert documents** — inserts or updates each item in the `documents`
21//!    table, computing a SHA-256 deduplication hash.
22//! 5. **Replace chunks** — deletes old chunks (and their embeddings/FTS entries)
23//!    for the document, then inserts fresh chunks.
24//! 6. **Inline embed** — if embeddings are enabled, embeds new chunks
25//!    immediately (non-fatal: failures are logged but do not abort the sync).
26//! 7. **Update checkpoint** — persists the latest `updated_at` timestamp
27//!    so the next incremental sync can skip unchanged items.
28//!
29//! # Deduplication
30//!
31//! Each document is identified by `(source, source_id)`. If a document with
32//! the same composite key already exists, it is updated via an `ON CONFLICT`
33//! upsert. The `dedup_hash` field is a SHA-256 digest of source + source_id +
34//! updated_at + body, enabling downstream consumers to detect changes.
35//!
36//! # Checkpointing
37//!
38//! Checkpoints are stored in the `checkpoints` table as `(source, cursor)`
39//! pairs. The cursor is the maximum `updated_at` timestamp seen during the
40//! sync. On subsequent runs, only items newer than the checkpoint are processed.
41//!
42//! # Multi-Instance Connectors
43//!
44//! All connector types support named instances. Documents are tagged with
45//! `source = "type:name"` (e.g. `"git:platform"`, `"filesystem:docs"`).
46//! When syncing a type (e.g. `ctx sync git`), all instances of that type
47//! are scanned one after another.
48
49use anyhow::{bail, Result};
50use chrono::NaiveDate;
51use sha2::{Digest, Sha256};
52use sqlx::SqlitePool;
53use uuid::Uuid;
54
55use crate::chunk::chunk_text;
56use crate::config::Config;
57use crate::db;
58use crate::embed_cmd;
59use crate::extract;
60use crate::models::SourceItem;
61use crate::progress::{SyncProgressEvent, SyncProgressReporter};
62use crate::traits::{Connector, ConnectorRegistry};
63
/// Default extraction size cap in bytes (50 MB), used when the source is not a
/// filesystem connector or its instance name is not found in config (spec §4.1).
const DEFAULT_MAX_EXTRACT_BYTES: u64 = 50_000_000;
66
67/// Resolve max_extract_bytes for a source from config. Parses "filesystem:name" and looks up
68/// the connector config; non-filesystem or unknown name uses DEFAULT_MAX_EXTRACT_BYTES.
69fn max_extract_bytes_for_source(config: &Config, source_label: &str) -> u64 {
70    if let Some(name) = source_label.strip_prefix("filesystem:") {
71        config
72            .connectors
73            .filesystem
74            .get(name)
75            .map(|c| c.max_extract_bytes)
76            .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES)
77    } else {
78        DEFAULT_MAX_EXTRACT_BYTES
79    }
80}
81
82/// Resolve a connector argument into a filtered list of connectors to scan.
83///
84/// The `registry` contains all connectors (built-in + custom). This function
85/// filters them based on the user's connector argument.
86///
87/// # Supported Formats
88///
89/// | Input | Meaning |
90/// |-------|---------|
91/// | `"all"` | Every registered connector |
92/// | `"git"` | All connectors of type `"git"` |
93/// | `"filesystem"` | All connectors of type `"filesystem"` |
94/// | `"s3"` | All connectors of type `"s3"` |
95/// | `"script"` | All connectors of type `"script"` |
96/// | `"custom"` | All connectors of type `"custom"` |
97/// | `"git:platform"` | Specific named instance |
98/// | `"custom:myconn"` | Specific named instance |
99fn resolve_connectors<'a>(
100    registry: &'a ConnectorRegistry,
101    connector_arg: &str,
102) -> Result<Vec<&'a dyn Connector>> {
103    match connector_arg {
104        "all" => {
105            let all = registry.connectors();
106            if all.is_empty() {
107                bail!("No connectors configured. Add connector sections to your config.");
108            }
109            Ok(all.iter().map(|c| c.as_ref()).collect())
110        }
111        // Type-level filter: "git", "filesystem", "s3", "script", "custom"
112        conn_type if matches!(conn_type, "filesystem" | "git" | "s3" | "script" | "custom") => {
113            let matched = registry.connectors_by_type(conn_type);
114            if matched.is_empty() {
115                bail!("No {} connectors configured.", conn_type);
116            }
117            Ok(matched)
118        }
119        // Instance-level filter: "git:platform", "custom:mything"
120        other => {
121            if let Some((conn_type, name)) = other.split_once(':') {
122                let conn = registry.find(conn_type, name).ok_or_else(|| {
123                    let available: Vec<String> = registry
124                        .connectors_by_type(conn_type)
125                        .iter()
126                        .map(|c| c.name().to_string())
127                        .collect();
128                    anyhow::anyhow!(
129                        "No {} connector '{}'. Available: {}",
130                        conn_type,
131                        name,
132                        if available.is_empty() {
133                            "(none)".to_string()
134                        } else {
135                            available.join(", ")
136                        }
137                    )
138                })?;
139                Ok(vec![conn])
140            } else {
141                bail!(
142                    "Unknown connector: '{}'. Use: all, filesystem, git, s3, script, custom, or type:name",
143                    other
144                );
145            }
146        }
147    }
148}
149
150/// Runs the full ingestion pipeline for the specified connector(s).
151///
152/// This is the entry point for `ctx sync <connector>`. It builds a
153/// [`ConnectorRegistry`] from the config and runs the pipeline.
154///
155/// # Arguments
156///
157/// - `config` — application configuration.
158/// - `connector` — the connector specifier: `"all"`, a type name (`"git"`),
159///   or a specific instance (`"git:platform"`).
160/// - `full` — if `true`, ignores existing checkpoints and reprocesses all items.
161/// - `dry_run` — if `true`, scans and counts items without writing to the database.
162/// - `since` — optional `YYYY-MM-DD` date; only items updated on or after this date are processed.
163/// - `until` — optional `YYYY-MM-DD` date; only items updated on or before this date are processed.
164/// - `limit` — optional maximum number of items to process (per connector instance).
165/// - `progress` — optional progress reporter; when provided, progress is emitted on stderr.
166///
167/// # Errors
168///
169/// Returns an error if:
170/// - The specified connector is unknown or not configured.
171/// - A connector fails to scan (e.g., network error).
172/// - A database operation fails.
173#[allow(clippy::too_many_arguments)]
174pub async fn run_sync(
175    config: &Config,
176    connector: &str,
177    full: bool,
178    dry_run: bool,
179    since: Option<String>,
180    until: Option<String>,
181    limit: Option<usize>,
182    progress: Option<&dyn SyncProgressReporter>,
183) -> Result<()> {
184    let registry = ConnectorRegistry::from_config(config);
185    run_sync_with_registry(
186        config, connector, full, dry_run, since, until, limit, &registry, progress,
187    )
188    .await
189}
190
191/// Runs the ingestion pipeline with additional custom connectors.
192///
193/// Like [`run_sync`], but accepts extra connectors to merge with
194/// the built-in ones from config. This is the entry point for custom
195/// binaries that register `Connector` trait implementations.
196///
197/// The `extra_connectors` are combined with the built-in connectors
198/// resolved from the config file.
199#[allow(clippy::too_many_arguments, dead_code)]
200pub async fn run_sync_with_extensions(
201    config: &Config,
202    connector: &str,
203    full: bool,
204    dry_run: bool,
205    since: Option<String>,
206    until: Option<String>,
207    limit: Option<usize>,
208    extra_connectors: &ConnectorRegistry,
209) -> Result<()> {
210    // Build combined connector list from config + extras
211    let built_in = ConnectorRegistry::from_config(config);
212
213    // Resolve from built-in registry (ignore error if extras might match)
214    let mut resolved: Vec<&dyn Connector> = match resolve_connectors(&built_in, connector) {
215        Ok(r) => r,
216        Err(_) if !extra_connectors.is_empty() => Vec::new(),
217        Err(e) => return Err(e),
218    };
219
220    // Also resolve from extras
221    if let Ok(extras) = resolve_connectors(extra_connectors, connector) {
222        resolved.extend(extras);
223    }
224
225    if resolved.is_empty() {
226        bail!("No connectors matched '{}'.", connector);
227    }
228
229    run_connectors(config, &resolved, full, dry_run, since, until, limit, None).await
230}
231
232/// Runs the ingestion pipeline with a pre-built [`ConnectorRegistry`].
233///
234/// This is the unified entry point. All connectors in the registry are
235/// trait objects — built-in, Lua, and custom all go through the same path.
236#[allow(clippy::too_many_arguments)]
237pub async fn run_sync_with_registry(
238    config: &Config,
239    connector: &str,
240    full: bool,
241    dry_run: bool,
242    since: Option<String>,
243    until: Option<String>,
244    limit: Option<usize>,
245    registry: &ConnectorRegistry,
246    progress: Option<&dyn SyncProgressReporter>,
247) -> Result<()> {
248    let connectors = resolve_connectors(registry, connector)?;
249    run_connectors(
250        config,
251        &connectors,
252        full,
253        dry_run,
254        since,
255        until,
256        limit,
257        progress,
258    )
259    .await
260}
261
/// Emit an `Ingesting` progress event only every N processed items (plus the
/// final item), to avoid flooding stderr during large syncs.
const INGEST_PROGRESS_INTERVAL: u64 = 10;
264
265/// Core sync engine: scans connectors sequentially, ingests items.
266///
267/// Scans are run sequentially since connectors hold references. Each
268/// connector's items flow through the standard checkpoint → filter → upsert →
269/// chunk → embed pipeline.
270#[allow(clippy::too_many_arguments)]
271async fn run_connectors(
272    config: &Config,
273    connectors: &[&dyn Connector],
274    full: bool,
275    dry_run: bool,
276    since: Option<String>,
277    until: Option<String>,
278    limit: Option<usize>,
279    progress: Option<&dyn SyncProgressReporter>,
280) -> Result<()> {
281    if connectors.len() > 1 {
282        println!("Syncing {} connector instances...", connectors.len());
283    }
284
285    // Scan all connectors and collect results
286    let mut scan_results: Vec<(String, Vec<SourceItem>)> = Vec::new();
287    let mut scan_errors: Vec<String> = Vec::new();
288
289    for conn in connectors {
290        let label = conn.source_label();
291        if let Some(p) = progress {
292            p.report(SyncProgressEvent::Discovering {
293                connector: label.to_string(),
294            });
295        }
296        match conn.scan().await {
297            Ok(items) => {
298                scan_results.push((label, items));
299            }
300            Err(e) => {
301                scan_errors.push(format!("{}: {:#}", label, e));
302            }
303        }
304    }
305
306    // Report scan errors but continue with successful scans
307    for err in &scan_errors {
308        eprintln!("Warning: scan failed: {}", err);
309    }
310
311    if scan_results.is_empty() && !scan_errors.is_empty() {
312        bail!("All connector scans failed:\n{}", scan_errors.join("\n"));
313    }
314
315    // Sort for deterministic output ordering
316    scan_results.sort_by(|a, b| a.0.cmp(&b.0));
317
318    // Ingest each target's items (sequential — SQLite writes are serialized)
319    let pool = db::connect(config).await?;
320
321    for (source_label, mut items) in scan_results {
322        // Load checkpoint
323        let checkpoint: Option<i64> = if full {
324            None
325        } else {
326            get_checkpoint(&pool, &source_label).await?
327        };
328
329        // Filter by checkpoint (skip files not modified since checkpoint)
330        if let Some(cp) = checkpoint {
331            items.retain(|item| item.updated_at.timestamp() > cp);
332        }
333
334        // Apply --since filter
335        if let Some(ref since_str) = since {
336            let since_date = NaiveDate::parse_from_str(since_str, "%Y-%m-%d")?;
337            let since_ts = since_date
338                .and_hms_opt(0, 0, 0)
339                .unwrap()
340                .and_utc()
341                .timestamp();
342            items.retain(|item| item.updated_at.timestamp() >= since_ts);
343        }
344
345        // Apply --until filter
346        if let Some(ref until_str) = until {
347            let until_date = NaiveDate::parse_from_str(until_str, "%Y-%m-%d")?;
348            let until_ts = until_date
349                .and_hms_opt(23, 59, 59)
350                .unwrap()
351                .and_utc()
352                .timestamp();
353            items.retain(|item| item.updated_at.timestamp() <= until_ts);
354        }
355
356        // Apply --limit (per connector instance)
357        if let Some(lim) = limit {
358            items.truncate(lim);
359        }
360
361        if dry_run {
362            println!("sync {} (dry-run)", source_label);
363            println!("  items found: {}", items.len());
364            let total_chunks: usize = items
365                .iter()
366                .map(|item| chunk_text("tmp", &item.body, config.chunking.max_tokens).len())
367                .sum();
368            println!("  estimated chunks: {}", total_chunks);
369            continue;
370        }
371
372        let mut docs_upserted = 0u64;
373        let mut chunks_written = 0u64;
374        let mut embeddings_written = 0u64;
375        let mut embeddings_pending = 0u64;
376        let mut extraction_skipped = 0u64;
377        let mut max_updated: i64 = checkpoint.unwrap_or(0);
378        let max_extract_bytes = max_extract_bytes_for_source(config, &source_label);
379        let total_items = items.len() as u64;
380
381        if let Some(p) = progress {
382            if total_items > 0 {
383                p.report(SyncProgressEvent::Ingesting {
384                    connector: source_label.clone(),
385                    n: 0,
386                    total: total_items,
387                });
388            }
389        }
390
391        for item in items.iter_mut() {
392            if let Some(ref bytes) = item.raw_bytes {
393                if bytes.len() as u64 > max_extract_bytes {
394                    extraction_skipped += 1;
395                    eprintln!(
396                        "Warning: skipping {} (size {} > max_extract_bytes {})",
397                        item.source_id,
398                        bytes.len(),
399                        max_extract_bytes
400                    );
401                    continue;
402                }
403                match extract::extract_text(bytes, &item.content_type) {
404                    Ok(text) => {
405                        item.body = text;
406                        item.raw_bytes = None;
407                    }
408                    Err(e) => {
409                        extraction_skipped += 1;
410                        eprintln!("Warning: extraction failed for {}: {}", item.source_id, e);
411                        continue;
412                    }
413                }
414            }
415
416            let doc_id = upsert_document(&pool, item).await?;
417            let chunks = chunk_text(&doc_id, &item.body, config.chunking.max_tokens);
418            let chunk_count = chunks.len() as u64;
419            replace_chunks(&pool, &doc_id, &chunks).await?;
420
421            // Inline embedding (non-fatal)
422            let (emb_ok, emb_pending) =
423                embed_cmd::embed_chunks_inline(config, &pool, &chunks).await;
424            embeddings_written += emb_ok;
425            embeddings_pending += emb_pending;
426
427            docs_upserted += 1;
428            chunks_written += chunk_count;
429
430            if let Some(p) = progress {
431                let n = docs_upserted;
432                if n.is_multiple_of(INGEST_PROGRESS_INTERVAL) || n == total_items {
433                    p.report(SyncProgressEvent::Ingesting {
434                        connector: source_label.clone(),
435                        n,
436                        total: total_items,
437                    });
438                }
439            }
440
441            let ts = item.updated_at.timestamp();
442            if ts > max_updated {
443                max_updated = ts;
444            }
445        }
446
447        // Update checkpoint
448        set_checkpoint(&pool, &source_label, max_updated).await?;
449
450        println!("sync {}", source_label);
451        println!("  fetched: {} items", items.len());
452        println!("  upserted documents: {}", docs_upserted);
453        println!("  chunks written: {}", chunks_written);
454        println!("  extraction skipped: {}", extraction_skipped);
455        if config.embedding.is_enabled() {
456            println!("  embeddings written: {}", embeddings_written);
457            println!("  embeddings pending: {}", embeddings_pending);
458        }
459        println!("  checkpoint: {}", max_updated);
460        println!("ok");
461    }
462
463    pool.close().await;
464    Ok(())
465}
466
467/// Inserts or updates a document in the `documents` table.
468///
469/// Computes a SHA-256 deduplication hash from the item's source, source_id,
470/// updated_at timestamp, and body. If a document with the same `(source, source_id)`
471/// already exists, it is updated with the new data; otherwise a new UUID is assigned.
472///
473/// # Returns
474///
475/// The document's UUID (existing or newly generated).
476async fn upsert_document(pool: &SqlitePool, item: &SourceItem) -> Result<String> {
477    // Compute dedup hash
478    let mut hasher = Sha256::new();
479    hasher.update(item.source.as_bytes());
480    hasher.update(item.source_id.as_bytes());
481    hasher.update(item.updated_at.timestamp().to_le_bytes());
482    hasher.update(item.body.as_bytes());
483    let dedup_hash = format!("{:x}", hasher.finalize());
484
485    // Check if document exists
486    let existing_id: Option<String> =
487        sqlx::query_scalar("SELECT id FROM documents WHERE source = ? AND source_id = ?")
488            .bind(&item.source)
489            .bind(&item.source_id)
490            .fetch_optional(pool)
491            .await?;
492
493    let doc_id = existing_id.unwrap_or_else(|| Uuid::new_v4().to_string());
494
495    sqlx::query(
496        r#"
497        INSERT INTO documents (id, source, source_id, source_url, title, author, created_at, updated_at, content_type, body, metadata_json, raw_json, dedup_hash)
498        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
499        ON CONFLICT(source, source_id) DO UPDATE SET
500            source_url = excluded.source_url,
501            title = excluded.title,
502            author = excluded.author,
503            updated_at = excluded.updated_at,
504            content_type = excluded.content_type,
505            body = excluded.body,
506            metadata_json = excluded.metadata_json,
507            raw_json = excluded.raw_json,
508            dedup_hash = excluded.dedup_hash
509        "#,
510    )
511    .bind(&doc_id)
512    .bind(&item.source)
513    .bind(&item.source_id)
514    .bind(&item.source_url)
515    .bind(&item.title)
516    .bind(&item.author)
517    .bind(item.created_at.timestamp())
518    .bind(item.updated_at.timestamp())
519    .bind(&item.content_type)
520    .bind(&item.body)
521    .bind(&item.metadata_json)
522    .bind(&item.raw_json)
523    .bind(&dedup_hash)
524    .execute(pool)
525    .await?;
526
527    Ok(doc_id)
528}
529
530/// Atomically replaces all chunks (and their embeddings/FTS entries) for a document.
531///
532/// This function runs inside a single SQLite transaction to ensure consistency:
533/// 1. Deletes old `chunk_vectors` rows for the document's chunks.
534/// 2. Deletes old `embeddings` metadata rows for the document's chunks.
535/// 3. Deletes old `chunks_fts` (FTS5) entries for the document.
536/// 4. Deletes old `chunks` rows for the document.
537/// 5. Inserts new chunks and their corresponding FTS entries.
538///
539/// # Arguments
540///
541/// - `pool` — the database connection pool.
542/// - `document_id` — the UUID of the parent document.
543/// - `chunks` — the new set of chunks to insert.
544async fn replace_chunks(
545    pool: &SqlitePool,
546    document_id: &str,
547    chunks: &[crate::models::Chunk],
548) -> Result<()> {
549    let mut tx = pool.begin().await?;
550
551    // Delete old embeddings for this document's chunks
552    sqlx::query(
553        "DELETE FROM chunk_vectors WHERE chunk_id IN (SELECT id FROM chunks WHERE document_id = ?)",
554    )
555    .bind(document_id)
556    .execute(&mut *tx)
557    .await?;
558    sqlx::query(
559        "DELETE FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE document_id = ?)",
560    )
561    .bind(document_id)
562    .execute(&mut *tx)
563    .await?;
564
565    // Delete old FTS entries for this document's chunks
566    sqlx::query("DELETE FROM chunks_fts WHERE document_id = ?")
567        .bind(document_id)
568        .execute(&mut *tx)
569        .await?;
570
571    // Delete old chunks
572    sqlx::query("DELETE FROM chunks WHERE document_id = ?")
573        .bind(document_id)
574        .execute(&mut *tx)
575        .await?;
576
577    // Insert new chunks + FTS entries
578    for chunk in chunks {
579        sqlx::query(
580            "INSERT INTO chunks (id, document_id, chunk_index, text, hash) VALUES (?, ?, ?, ?, ?)",
581        )
582        .bind(&chunk.id)
583        .bind(&chunk.document_id)
584        .bind(chunk.chunk_index)
585        .bind(&chunk.text)
586        .bind(&chunk.hash)
587        .execute(&mut *tx)
588        .await?;
589
590        sqlx::query("INSERT INTO chunks_fts (chunk_id, document_id, text) VALUES (?, ?, ?)")
591            .bind(&chunk.id)
592            .bind(&chunk.document_id)
593            .bind(&chunk.text)
594            .execute(&mut *tx)
595            .await?;
596    }
597
598    tx.commit().await?;
599    Ok(())
600}
601
602/// Retrieves the last sync checkpoint for a given connector.
603///
604/// Returns `Some(timestamp)` if a checkpoint exists, or `None` if this is
605/// the first sync for the connector.
606async fn get_checkpoint(pool: &SqlitePool, source: &str) -> Result<Option<i64>> {
607    let result: Option<String> =
608        sqlx::query_scalar("SELECT cursor FROM checkpoints WHERE source = ?")
609            .bind(source)
610            .fetch_optional(pool)
611            .await?;
612
613    Ok(result.and_then(|s| s.parse::<i64>().ok()))
614}
615
616/// Persists the sync checkpoint for a connector.
617///
618/// Uses an upsert to create or update the checkpoint row. The `cursor`
619/// value is typically the maximum `updated_at` timestamp seen during the sync.
620async fn set_checkpoint(pool: &SqlitePool, source: &str, cursor_val: i64) -> Result<()> {
621    let now = chrono::Utc::now().timestamp();
622    sqlx::query(
623        r#"
624        INSERT INTO checkpoints (source, cursor, updated_at) VALUES (?, ?, ?)
625        ON CONFLICT(source) DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at
626        "#,
627    )
628    .bind(source)
629    .bind(cursor_val.to_string())
630    .bind(now)
631    .execute(pool)
632    .await?;
633
634    Ok(())
635}