Ingestion pipeline orchestration.
Coordinates the full sync flow: connector → normalization → chunking → embedding → storage. Supports incremental sync via checkpoints, and embeds new chunks inline; embedding failures are logged and do not abort the sync.
§Sync Pipeline
The run_sync function implements the following pipeline:
- Resolve targets: expands the connector argument into one or more (source_label, connector_type, instance_name) tuples. Supports:
  - "git:platform": a single named instance
  - "git": all instances of a type
  - "all": every configured connector
- Scan (parallel): dispatches to the appropriate connector for each target, running scans concurrently via tokio::task::JoinSet.
- Filter: applies the checkpoint, --since, --until, and --limit filters to each connector's items.
- Upsert documents: inserts or updates each item in the documents table, computing a SHA-256 deduplication hash.
- Replace chunks: deletes the old chunks (and their embeddings/FTS entries) for the document, then inserts fresh chunks.
- Inline embed: if embeddings are enabled, embeds new chunks immediately (non-fatal: failures are logged but do not abort the sync).
- Update checkpoint: persists the latest updated_at timestamp so the next incremental sync can skip unchanged items.
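A minimal sketch of the target-resolution step, assuming each configured connector is identified by a connector type and an instance name. ConnectorConfig and resolve_targets are hypothetical names, not this module's API, and returning an empty list for an unknown "type:name" is an assumption (the real resolve_connectors may report an error instead).

```rust
/// Hypothetical shape of one configured connector instance.
#[derive(Debug, Clone, PartialEq)]
pub struct ConnectorConfig {
    pub connector_type: String,
    pub instance_name: String,
}

/// Expands a connector argument into (source_label, connector_type, instance_name) tuples.
/// Accepts "all", a bare type such as "git", or a "type:name" label such as "git:platform".
pub fn resolve_targets(arg: &str, configured: &[ConnectorConfig]) -> Vec<(String, String, String)> {
    configured
        .iter()
        .filter(|c| match arg.split_once(':') {
            // "type:name": exactly one named instance.
            Some((ty, name)) => c.connector_type == ty && c.instance_name == name,
            // "all": every configured connector.
            None if arg == "all" => true,
            // Bare type: every instance of that type.
            None => c.connector_type == arg,
        })
        .map(|c| {
            // Source labels follow the "type:name" convention.
            let label = format!("{}:{}", c.connector_type, c.instance_name);
            (label, c.connector_type.clone(), c.instance_name.clone())
        })
        .collect()
}
```

With instances git:platform, git:infra, and filesystem:docs configured, "git" resolves to the two git tuples and "all" to all three.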
§Deduplication
Each document is identified by (source, source_id). If a document with
the same composite key already exists, it is updated via an ON CONFLICT
upsert. The dedup_hash field is a SHA-256 digest of source + source_id +
updated_at + body, enabling downstream consumers to detect changes.
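The dedup hash can be sketched in dependency-free Rust as follows. The SHA-256 routine is a compact FIPS 180-4 implementation included only so the example compiles with the standard library alone; a real crate would more likely use an audited implementation such as the sha2 crate. dedup_hash is a hypothetical helper, and concatenating the four fields with no separator is an assumption based on the description above.

```rust
/// SHA-256 round constants (FIPS 180-4, section 4.2.2).
const K: [u32; 64] = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
];

/// Compact SHA-256 over a byte slice, returned as 64 lowercase hex characters.
pub fn sha256_hex(data: &[u8]) -> String {
    // Initial hash value (FIPS 180-4, section 5.3.3).
    let mut h: [u32; 8] = [
        0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a,
        0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
    ];
    // Padding: 0x80, zeros until length % 64 == 56, then the bit length as a big-endian u64.
    let mut msg = data.to_vec();
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&((data.len() as u64) * 8).to_be_bytes());

    for block in msg.chunks_exact(64) {
        // Message schedule: 16 words from the block, 48 derived words.
        let mut w = [0u32; 64];
        for (i, word) in block.chunks_exact(4).enumerate() {
            w[i] = u32::from_be_bytes([word[0], word[1], word[2], word[3]]);
        }
        for i in 16..64 {
            let s0 = w[i - 15].rotate_right(7) ^ w[i - 15].rotate_right(18) ^ (w[i - 15] >> 3);
            let s1 = w[i - 2].rotate_right(17) ^ w[i - 2].rotate_right(19) ^ (w[i - 2] >> 10);
            w[i] = w[i - 16].wrapping_add(s0).wrapping_add(w[i - 7]).wrapping_add(s1);
        }
        // 64 compression rounds over the working variables.
        let [mut a, mut b, mut c, mut d, mut e, mut f, mut g, mut hh] = h;
        for i in 0..64 {
            let s1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let ch = (e & f) ^ (!e & g);
            let t1 = hh.wrapping_add(s1).wrapping_add(ch).wrapping_add(K[i]).wrapping_add(w[i]);
            let s0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let maj = (a & b) ^ (a & c) ^ (b & c);
            let t2 = s0.wrapping_add(maj);
            hh = g;
            g = f;
            f = e;
            e = d.wrapping_add(t1);
            d = c;
            c = b;
            b = a;
            a = t1.wrapping_add(t2);
        }
        // Fold this block's result into the running hash state.
        for (x, y) in h.iter_mut().zip([a, b, c, d, e, f, g, hh]) {
            *x = x.wrapping_add(y);
        }
    }
    h.iter().map(|x| format!("{x:08x}")).collect()
}

/// Hypothetical helper: SHA-256 of source + source_id + updated_at + body,
/// concatenated with no separator (an assumption, not confirmed by the source).
pub fn dedup_hash(source: &str, source_id: &str, updated_at: &str, body: &str) -> String {
    sha256_hex([source, source_id, updated_at, body].concat().as_bytes())
}
```

Because the fields are joined without a delimiter, ("ab", "c") and ("a", "bc") produce the same input bytes; whether the actual implementation inserts separators is not stated here.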
§Checkpointing
Checkpoints are stored in the checkpoints table as (source, cursor)
pairs. The cursor is the maximum updated_at timestamp seen during the
sync. On subsequent runs, only items newer than the checkpoint are processed.
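The checkpoint rule can be illustrated with a small in-memory sketch. Item, filter_since_checkpoint, and the use of Unix-second i64 timestamps are assumptions for illustration; the real pipeline reads and persists the cursor through get_checkpoint and set_checkpoint, and also applies the --since, --until, and --limit filters.

```rust
/// Hypothetical minimal scanned item; updated_at is Unix seconds in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
    pub source_id: String,
    pub updated_at: i64,
}

/// Keeps only items strictly newer than the checkpoint cursor (all items if there is
/// no checkpoint yet) and returns them with the next cursor: the maximum updated_at
/// among the kept items, or the previous cursor when nothing new was seen.
pub fn filter_since_checkpoint(items: Vec<Item>, checkpoint: Option<i64>) -> (Vec<Item>, Option<i64>) {
    let fresh: Vec<Item> = items
        .into_iter()
        .filter(|it| checkpoint.map_or(true, |cp| it.updated_at > cp))
        .collect();
    let next_cursor = fresh.iter().map(|it| it.updated_at).max().or(checkpoint);
    (fresh, next_cursor)
}
```

Falling back to the previous cursor when no item is new keeps the checkpoint from moving backwards or being cleared by an empty run.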
§Multi-Instance Connectors
All connector types support named instances. Documents are tagged with
source = "type:name" (e.g. "git:platform", "filesystem:docs").
When syncing a type (e.g. ctx sync git), all instances of that type
are scanned in parallel.
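A sketch of scanning every instance of one type in parallel and tagging the results with "type:name" labels. The module itself uses tokio::task::JoinSet; this example swaps in std::thread::scope (Rust 1.63+) so it runs without an async runtime. scan_instance and scan_all_instances are hypothetical stand-ins, not functions from this module.

```rust
use std::thread;

/// Hypothetical stand-in for one connector instance's scan: returns the ids it found.
fn scan_instance(connector_type: &str, instance_name: &str) -> Vec<String> {
    vec![format!("{connector_type}:{instance_name}/item-1")]
}

/// Scans every instance of `connector_type` concurrently on scoped threads and
/// returns (source_label, scanned_ids) pairs in the order the instances were given.
pub fn scan_all_instances(connector_type: &str, instances: &[&str]) -> Vec<(String, Vec<String>)> {
    thread::scope(|s| {
        // Spawn one scoped thread per instance.
        let handles: Vec<_> = instances
            .iter()
            .map(|name| {
                s.spawn(move || {
                    let source = format!("{connector_type}:{name}");
                    let ids = scan_instance(connector_type, name);
                    (source, ids)
                })
            })
            .collect();
        // Join in spawn order so the output order is deterministic.
        handles
            .into_iter()
            .map(|h| h.join().expect("scan thread panicked"))
            .collect()
    })
}
```

Joining handles in spawn order keeps results deterministic; a JoinSet instead yields tasks as they complete, so any ordering there has to come from the source labels.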
Constants§
- DEFAULT_MAX_EXTRACT_BYTES 🔒 - Default max extract size when the connector is not filesystem or its name is not found (spec §4.1).
- INGEST_PROGRESS_INTERVAL 🔒 - Report progress every N items during ingest (avoids flooding stderr).
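A sketch of throttled progress reporting. The interval value of 100 and the ingest_all helper are hypothetical; only the idea of reporting every N items to stderr comes from the constant's description.

```rust
/// Hypothetical interval; the crate's actual INGEST_PROGRESS_INTERVAL value is not shown here.
const INGEST_PROGRESS_INTERVAL: usize = 100;

/// Ingests items one by one and writes a progress line to stderr only every
/// INGEST_PROGRESS_INTERVAL items. Returns the counts at which progress was reported.
pub fn ingest_all<T>(items: &[T], mut ingest: impl FnMut(&T)) -> Vec<usize> {
    let mut reported = Vec::new();
    for (i, item) in items.iter().enumerate() {
        ingest(item);
        let done = i + 1;
        if done % INGEST_PROGRESS_INTERVAL == 0 {
            eprintln!("ingested {done}/{} items", items.len());
            reported.push(done);
        }
    }
    reported
}
```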
Functions§
- get_checkpoint 🔒 - Retrieves the last sync checkpoint for a given connector.
- max_extract_bytes_for_source 🔒 - Resolves max_extract_bytes for a source from config. Parses "filesystem:name" and looks up the connector config; a non-filesystem source or an unknown name uses DEFAULT_MAX_EXTRACT_BYTES.
- replace_chunks 🔒 - Atomically replaces all chunks (and their embeddings/FTS entries) for a document.
- resolve_connectors 🔒 - Resolves a connector argument into a filtered list of connectors to scan.
- run_connectors 🔒 - Core sync engine: scans connectors sequentially and ingests their items.
- run_sync - Runs the full ingestion pipeline for the specified connector(s).
- run_sync_with_extensions - Runs the ingestion pipeline with additional custom connectors.
- run_sync_with_registry - Runs the ingestion pipeline with a pre-built ConnectorRegistry.
- set_checkpoint 🔒 - Persists the sync checkpoint for a connector.
- upsert_document 🔒 - Inserts or updates a document in the documents table.
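A sketch of the lookup described for max_extract_bytes_for_source, assuming filesystem instances are configured as a map from instance name to byte limit. The map shape and the 10 MiB default are placeholders, not the crate's actual config type or the spec §4.1 value.

```rust
use std::collections::HashMap;

/// Placeholder default; the real DEFAULT_MAX_EXTRACT_BYTES value (spec §4.1) is not shown here.
const DEFAULT_MAX_EXTRACT_BYTES: u64 = 10 * 1024 * 1024;

/// Resolves the extract limit for a source label such as "filesystem:docs".
/// `fs_limits` is a hypothetical map of filesystem instance name -> configured byte limit.
pub fn max_extract_bytes_for_source(source: &str, fs_limits: &HashMap<String, u64>) -> u64 {
    match source.split_once(':') {
        // Filesystem source: use the instance's limit, or the default if the name is unknown.
        Some(("filesystem", name)) => fs_limits
            .get(name)
            .copied()
            .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES),
        // Non-filesystem connector or a label without ':': fall back to the default.
        _ => DEFAULT_MAX_EXTRACT_BYTES,
    }
}
```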