Module ingest

Ingestion pipeline orchestration.

Coordinates the full sync flow: connector → normalization → chunking → embedding → storage. Supports incremental sync via checkpoints, and embeds new chunks inline during the sync; embedding failures are logged but do not abort it.

Sync Pipeline

The run_sync function implements the following pipeline:

  1. Resolve targets — expands the connector argument into one or more (source_label, connector_type, instance_name) tuples. Supports:
    • "git:platform" — a single named instance
    • "git" — all instances of a type
    • "all" — every configured connector
  2. Scan (parallel) — dispatches to the appropriate connector for each target, running scans concurrently via tokio::task::JoinSet.
  3. Filter — applies checkpoint, --since, --until, and --limit filters to each connector’s items.
  4. Upsert documents — inserts or updates each item in the documents table, computing a SHA-256 deduplication hash.
  5. Replace chunks — deletes old chunks (and their embeddings/FTS entries) for the document, then inserts fresh chunks.
  6. Inline embed — if embeddings are enabled, embeds new chunks immediately (non-fatal: failures are logged but do not abort the sync).
  7. Update checkpoint — persists the latest updated_at timestamp so the next incremental sync can skip unchanged items.
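
Step 1 (target resolution) can be sketched as follows. This is a minimal illustration, not the module's resolve_connectors: the Target struct, the resolve_targets name, the slice of (type, name) pairs standing in for configuration, and returning an error when nothing matches are all assumptions.

```rust
/// One resolved scan target. Field names mirror the tuple described above.
#[derive(Debug, Clone, PartialEq)]
struct Target {
    source_label: String, // "type:name", e.g. "git:platform"
    connector_type: String,
    instance_name: String,
}

/// Hypothetical sketch: expands "all", "<type>" or "<type>:<name>" against the
/// configured instances, given here as (connector_type, instance_name) pairs.
fn resolve_targets(arg: &str, configured: &[(&str, &str)]) -> Result<Vec<Target>, String> {
    let wanted = |ty: &str, name: &str| match arg.split_once(':') {
        Some((t, n)) => ty == t && name == n, // "git:platform": one named instance
        None if arg == "all" => true,         // "all": every configured connector
        None => ty == arg,                    // "git": all instances of a type
    };
    let targets: Vec<Target> = configured
        .iter()
        .filter(|&&(ty, name)| wanted(ty, name))
        .map(|&(ty, name)| Target {
            source_label: format!("{ty}:{name}"),
            connector_type: ty.to_string(),
            instance_name: name.to_string(),
        })
        .collect();
    if targets.is_empty() {
        // Assumption: an argument that matches nothing is reported as an error.
        Err(format!("no configured connector matches {arg:?}"))
    } else {
        Ok(targets)
    }
}
```

For example, with git:platform, git:infra and filesystem:docs configured, "git" yields two targets, "all" yields three, and "git:infra" yields exactly one.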

Deduplication

Each document is identified by (source, source_id). If a document with the same composite key already exists, it is updated via an ON CONFLICT upsert. The dedup_hash field is a SHA-256 digest of source + source_id + updated_at + body, enabling downstream consumers to detect changes.
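
The composition of dedup_hash can be sketched in Rust. Two assumptions: the four fields are concatenated with no separator (the text above only says source + source_id + updated_at + body), and updated_at is already a string. SHA-256 is hand-rolled here per FIPS 180-4 only to keep the sketch dependency-free; a vetted crate such as sha2 is the usual choice in real code.

```rust
/// SHA-256 round constants (FIPS 180-4, section 4.2.2).
const K: [u32; 64] = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
];

/// SHA-256 initial hash value (FIPS 180-4, section 5.3.3).
const H0: [u32; 8] = [
    0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
];

/// Minimal SHA-256 returning a lowercase hex digest.
fn sha256_hex(data: &[u8]) -> String {
    // Padding: 0x80, zeros up to 56 mod 64, then the message length in bits (big-endian u64).
    let mut msg = data.to_vec();
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&((data.len() as u64) * 8).to_be_bytes());

    let mut h = H0;
    for block in msg.chunks_exact(64) {
        // Message schedule: 16 words from the block, 48 derived words.
        let mut w = [0u32; 64];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([block[4 * i], block[4 * i + 1], block[4 * i + 2], block[4 * i + 3]]);
        }
        for i in 16..64 {
            let s0 = w[i - 15].rotate_right(7) ^ w[i - 15].rotate_right(18) ^ (w[i - 15] >> 3);
            let s1 = w[i - 2].rotate_right(17) ^ w[i - 2].rotate_right(19) ^ (w[i - 2] >> 10);
            w[i] = w[i - 16].wrapping_add(s0).wrapping_add(w[i - 7]).wrapping_add(s1);
        }
        // Compression: 64 rounds over the working variables.
        let [mut a, mut b, mut c, mut d, mut e, mut f, mut g, mut hh] = h;
        for i in 0..64 {
            let s1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let ch = (e & f) ^ (!e & g);
            let t1 = hh.wrapping_add(s1).wrapping_add(ch).wrapping_add(K[i]).wrapping_add(w[i]);
            let s0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let maj = (a & b) ^ (a & c) ^ (b & c);
            let t2 = s0.wrapping_add(maj);
            hh = g;
            g = f;
            f = e;
            e = d.wrapping_add(t1);
            d = c;
            c = b;
            b = a;
            a = t1.wrapping_add(t2);
        }
        for (slot, v) in h.iter_mut().zip([a, b, c, d, e, f, g, hh]) {
            *slot = slot.wrapping_add(v);
        }
    }
    h.iter().map(|x| format!("{x:08x}")).collect()
}

/// Hypothetical dedup_hash: assumes the four fields are concatenated with no separator.
fn dedup_hash(source: &str, source_id: &str, updated_at: &str, body: &str) -> String {
    let input = [source, source_id, updated_at, body].concat();
    sha256_hex(input.as_bytes())
}
```

Any change to updated_at or body yields a different digest, which is what lets downstream consumers detect changes. Plain concatenation is ambiguous at field boundaries ("ab" + "c" and "a" + "bc" hash the same), so a separator or length prefix between fields would remove that ambiguity; the text above does not say whether the real implementation uses one.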

Checkpointing

Checkpoints are stored in the checkpoints table as (source, cursor) pairs. The cursor is the maximum updated_at timestamp seen during the sync. On subsequent runs, only items newer than the checkpoint are processed.
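
A sketch of the filter step (step 3) and of how the next cursor could be derived from the items that survive it. Timestamps are modeled as Unix seconds in an i64, --since and --until are treated as inclusive bounds, and items are sorted oldest-first before --limit is applied; all of these are assumptions rather than documented behavior.

```rust
/// Hypothetical item shape: only the fields the filter needs.
#[derive(Debug, Clone)]
struct Item {
    source_id: String,
    updated_at: i64, // assumption: Unix seconds
}

/// Applies checkpoint, --since, --until and --limit, then returns the kept items
/// plus the cursor to persist (max updated_at kept, else the previous checkpoint).
fn filter_items(
    mut items: Vec<Item>,
    checkpoint: Option<i64>,
    since: Option<i64>,
    until: Option<i64>,
    limit: Option<usize>,
) -> (Vec<Item>, Option<i64>) {
    // Oldest first, so a --limit cut never leaves older items behind the cursor.
    items.sort_by_key(|it| it.updated_at);
    let kept: Vec<Item> = items
        .into_iter()
        .filter(|it| checkpoint.map_or(true, |c| it.updated_at > c)) // strictly newer
        .filter(|it| since.map_or(true, |s| it.updated_at >= s))
        .filter(|it| until.map_or(true, |u| it.updated_at <= u))
        .take(limit.unwrap_or(usize::MAX))
        .collect();
    let cursor = kept.iter().map(|it| it.updated_at).max().or(checkpoint);
    (kept, cursor)
}
```

Sorting before applying the limit matters for checkpointing: if the newest items were kept first, persisting their maximum updated_at would permanently skip the older items that the limit cut off.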

Multi-Instance Connectors

All connector types support named instances. Documents are tagged with source = "type:name" (e.g. "git:platform", "filesystem:docs"). When syncing a type (e.g. ctx sync git), all instances of that type are scanned in parallel.
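
The tagging and per-type fan-out can be sketched as below. The module uses tokio::task::JoinSet; this sketch swaps in std::thread::scope so it runs without an async runtime, and scan_instance is a hypothetical stand-in that returns two placeholder items per instance.

```rust
use std::thread;

/// Hypothetical connector scan: returns (source, source_id) pairs for one
/// named instance, tagged with source = "type:name".
fn scan_instance(connector_type: &str, instance: &str) -> Vec<(String, String)> {
    let source = format!("{connector_type}:{instance}");
    (0..2).map(|i| (source.clone(), format!("{instance}-{i}"))).collect()
}

/// Scans every instance of one connector type concurrently, one thread per
/// instance, and gathers results in spawn order.
fn scan_type(connector_type: &str, instances: &[&str]) -> Vec<(String, String)> {
    thread::scope(|s| {
        let handles: Vec<_> = instances
            .iter()
            .map(|name| s.spawn(move || scan_instance(connector_type, name)))
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("scan thread panicked"))
            .collect()
    })
}
```

Joining the handles in spawn order keeps the combined output deterministic even though the scans themselves overlap in time.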

Constants

DEFAULT_MAX_EXTRACT_BYTES 🔒
Default maximum extract size in bytes, used when the source is not a filesystem connector or its instance name is not found in config (spec §4.1).
INGEST_PROGRESS_INTERVAL 🔒
Report progress every N items during ingest (avoids flooding stderr).
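
A sketch of how INGEST_PROGRESS_INTERVAL might gate progress output. The interval value of 100, the message format, and the extra report on the final item are assumptions.

```rust
/// Hypothetical value for illustration only; the real interval is not stated here.
const INGEST_PROGRESS_INTERVAL: usize = 100;

/// Returns the progress line to emit after `done` of `total` items, if one is due.
/// Assumption: the final item always reports, so the last count is never lost.
fn progress_line(done: usize, total: usize) -> Option<String> {
    (done % INGEST_PROGRESS_INTERVAL == 0 || done == total)
        .then(|| format!("ingested {done}/{total}"))
}

/// Sketch of an ingest loop that writes progress to stderr without flooding it.
fn ingest(items: &[&str]) {
    for (i, _item) in items.iter().enumerate() {
        // ... upsert the document, replace its chunks, embed inline ...
        if let Some(line) = progress_line(i + 1, items.len()) {
            eprintln!("{line}");
        }
    }
}
```

With 250 items this prints three lines (after items 100, 200 and 250) instead of 250.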

Functions

get_checkpoint 🔒
Retrieves the last sync checkpoint for a given connector.
max_extract_bytes_for_source 🔒
Resolve max_extract_bytes for a source from config. Parses “filesystem:name” and looks up that connector's config; non-filesystem sources and unknown instance names fall back to DEFAULT_MAX_EXTRACT_BYTES.
replace_chunks 🔒
Atomically replaces all chunks (and their embeddings/FTS entries) for a document.
resolve_connectors 🔒
Resolve a connector argument into a filtered list of connectors to scan.
run_connectors 🔒
Core sync engine: scans the resolved connectors and ingests their items.
run_sync
Runs the full ingestion pipeline for the specified connector(s).
run_sync_with_extensions
Runs the ingestion pipeline with additional custom connectors.
run_sync_with_registry
Runs the ingestion pipeline with a pre-built ConnectorRegistry.
set_checkpoint 🔒
Persists the sync checkpoint for a connector.
upsert_document 🔒
Inserts or updates a document in the documents table.
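
The lookup described for max_extract_bytes_for_source can be sketched like this. The Config shape (filesystem instance name mapped to a byte limit), the function signature, and the 1 MiB default are hypothetical; the real default is defined by spec §4.1 and is not stated here.

```rust
use std::collections::HashMap;

/// Hypothetical placeholder value; the real default comes from spec §4.1.
const DEFAULT_MAX_EXTRACT_BYTES: u64 = 1024 * 1024;

/// Hypothetical config: filesystem instance name -> max_extract_bytes.
struct Config {
    filesystem: HashMap<String, u64>,
}

/// Parses "filesystem:name" and looks up that instance; any other source type,
/// or a filesystem name missing from config, falls back to the default.
fn max_extract_bytes_for_source(config: &Config, source: &str) -> u64 {
    match source.split_once(':') {
        Some(("filesystem", name)) => config
            .filesystem
            .get(name)
            .copied()
            .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES),
        _ => DEFAULT_MAX_EXTRACT_BYTES,
    }
}
```

Matching on the parsed (type, name) pair keeps the two fallback cases (non-filesystem source, unknown instance) explicit in a single expression.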