1use anyhow::{bail, Result};
50use chrono::NaiveDate;
51use sha2::{Digest, Sha256};
52use sqlx::SqlitePool;
53use uuid::Uuid;
54
55use crate::chunk::chunk_text;
56use crate::config::Config;
57use crate::db;
58use crate::embed_cmd;
59use crate::extract;
60use crate::models::SourceItem;
61use crate::progress::{SyncProgressEvent, SyncProgressReporter};
62use crate::traits::{Connector, ConnectorRegistry};
63
/// Fallback cap (50 MB) on raw payload size eligible for text extraction,
/// used when a source has no per-connector `max_extract_bytes` configured.
const DEFAULT_MAX_EXTRACT_BYTES: u64 = 50_000_000;
66
67fn max_extract_bytes_for_source(config: &Config, source_label: &str) -> u64 {
70 if let Some(name) = source_label.strip_prefix("filesystem:") {
71 config
72 .connectors
73 .filesystem
74 .get(name)
75 .map(|c| c.max_extract_bytes)
76 .unwrap_or(DEFAULT_MAX_EXTRACT_BYTES)
77 } else {
78 DEFAULT_MAX_EXTRACT_BYTES
79 }
80}
81
82fn resolve_connectors<'a>(
100 registry: &'a ConnectorRegistry,
101 connector_arg: &str,
102) -> Result<Vec<&'a dyn Connector>> {
103 match connector_arg {
104 "all" => {
105 let all = registry.connectors();
106 if all.is_empty() {
107 bail!("No connectors configured. Add connector sections to your config.");
108 }
109 Ok(all.iter().map(|c| c.as_ref()).collect())
110 }
111 conn_type if matches!(conn_type, "filesystem" | "git" | "s3" | "script" | "custom") => {
113 let matched = registry.connectors_by_type(conn_type);
114 if matched.is_empty() {
115 bail!("No {} connectors configured.", conn_type);
116 }
117 Ok(matched)
118 }
119 other => {
121 if let Some((conn_type, name)) = other.split_once(':') {
122 let conn = registry.find(conn_type, name).ok_or_else(|| {
123 let available: Vec<String> = registry
124 .connectors_by_type(conn_type)
125 .iter()
126 .map(|c| c.name().to_string())
127 .collect();
128 anyhow::anyhow!(
129 "No {} connector '{}'. Available: {}",
130 conn_type,
131 name,
132 if available.is_empty() {
133 "(none)".to_string()
134 } else {
135 available.join(", ")
136 }
137 )
138 })?;
139 Ok(vec![conn])
140 } else {
141 bail!(
142 "Unknown connector: '{}'. Use: all, filesystem, git, s3, script, custom, or type:name",
143 other
144 );
145 }
146 }
147 }
148}
149
150#[allow(clippy::too_many_arguments)]
174pub async fn run_sync(
175 config: &Config,
176 connector: &str,
177 full: bool,
178 dry_run: bool,
179 since: Option<String>,
180 until: Option<String>,
181 limit: Option<usize>,
182 progress: Option<&dyn SyncProgressReporter>,
183) -> Result<()> {
184 let registry = ConnectorRegistry::from_config(config);
185 run_sync_with_registry(
186 config, connector, full, dry_run, since, until, limit, ®istry, progress,
187 )
188 .await
189}
190
191#[allow(clippy::too_many_arguments, dead_code)]
200pub async fn run_sync_with_extensions(
201 config: &Config,
202 connector: &str,
203 full: bool,
204 dry_run: bool,
205 since: Option<String>,
206 until: Option<String>,
207 limit: Option<usize>,
208 extra_connectors: &ConnectorRegistry,
209) -> Result<()> {
210 let built_in = ConnectorRegistry::from_config(config);
212
213 let mut resolved: Vec<&dyn Connector> = match resolve_connectors(&built_in, connector) {
215 Ok(r) => r,
216 Err(_) if !extra_connectors.is_empty() => Vec::new(),
217 Err(e) => return Err(e),
218 };
219
220 if let Ok(extras) = resolve_connectors(extra_connectors, connector) {
222 resolved.extend(extras);
223 }
224
225 if resolved.is_empty() {
226 bail!("No connectors matched '{}'.", connector);
227 }
228
229 run_connectors(config, &resolved, full, dry_run, since, until, limit, None).await
230}
231
232#[allow(clippy::too_many_arguments)]
237pub async fn run_sync_with_registry(
238 config: &Config,
239 connector: &str,
240 full: bool,
241 dry_run: bool,
242 since: Option<String>,
243 until: Option<String>,
244 limit: Option<usize>,
245 registry: &ConnectorRegistry,
246 progress: Option<&dyn SyncProgressReporter>,
247) -> Result<()> {
248 let connectors = resolve_connectors(registry, connector)?;
249 run_connectors(
250 config,
251 &connectors,
252 full,
253 dry_run,
254 since,
255 until,
256 limit,
257 progress,
258 )
259 .await
260}
261
/// Emit an `Ingesting` progress event every N processed items (plus the
/// final one) to avoid flooding the reporter.
const INGEST_PROGRESS_INTERVAL: u64 = 10;
264
265#[allow(clippy::too_many_arguments)]
271async fn run_connectors(
272 config: &Config,
273 connectors: &[&dyn Connector],
274 full: bool,
275 dry_run: bool,
276 since: Option<String>,
277 until: Option<String>,
278 limit: Option<usize>,
279 progress: Option<&dyn SyncProgressReporter>,
280) -> Result<()> {
281 if connectors.len() > 1 {
282 println!("Syncing {} connector instances...", connectors.len());
283 }
284
285 let mut scan_results: Vec<(String, Vec<SourceItem>)> = Vec::new();
287 let mut scan_errors: Vec<String> = Vec::new();
288
289 for conn in connectors {
290 let label = conn.source_label();
291 if let Some(p) = progress {
292 p.report(SyncProgressEvent::Discovering {
293 connector: label.to_string(),
294 });
295 }
296 match conn.scan().await {
297 Ok(items) => {
298 scan_results.push((label, items));
299 }
300 Err(e) => {
301 scan_errors.push(format!("{}: {:#}", label, e));
302 }
303 }
304 }
305
306 for err in &scan_errors {
308 eprintln!("Warning: scan failed: {}", err);
309 }
310
311 if scan_results.is_empty() && !scan_errors.is_empty() {
312 bail!("All connector scans failed:\n{}", scan_errors.join("\n"));
313 }
314
315 scan_results.sort_by(|a, b| a.0.cmp(&b.0));
317
318 let pool = db::connect(config).await?;
320
321 for (source_label, mut items) in scan_results {
322 let checkpoint: Option<i64> = if full {
324 None
325 } else {
326 get_checkpoint(&pool, &source_label).await?
327 };
328
329 if let Some(cp) = checkpoint {
331 items.retain(|item| item.updated_at.timestamp() > cp);
332 }
333
334 if let Some(ref since_str) = since {
336 let since_date = NaiveDate::parse_from_str(since_str, "%Y-%m-%d")?;
337 let since_ts = since_date
338 .and_hms_opt(0, 0, 0)
339 .unwrap()
340 .and_utc()
341 .timestamp();
342 items.retain(|item| item.updated_at.timestamp() >= since_ts);
343 }
344
345 if let Some(ref until_str) = until {
347 let until_date = NaiveDate::parse_from_str(until_str, "%Y-%m-%d")?;
348 let until_ts = until_date
349 .and_hms_opt(23, 59, 59)
350 .unwrap()
351 .and_utc()
352 .timestamp();
353 items.retain(|item| item.updated_at.timestamp() <= until_ts);
354 }
355
356 if let Some(lim) = limit {
358 items.truncate(lim);
359 }
360
361 if dry_run {
362 println!("sync {} (dry-run)", source_label);
363 println!(" items found: {}", items.len());
364 let total_chunks: usize = items
365 .iter()
366 .map(|item| chunk_text("tmp", &item.body, config.chunking.max_tokens).len())
367 .sum();
368 println!(" estimated chunks: {}", total_chunks);
369 continue;
370 }
371
372 let mut docs_upserted = 0u64;
373 let mut chunks_written = 0u64;
374 let mut embeddings_written = 0u64;
375 let mut embeddings_pending = 0u64;
376 let mut extraction_skipped = 0u64;
377 let mut max_updated: i64 = checkpoint.unwrap_or(0);
378 let max_extract_bytes = max_extract_bytes_for_source(config, &source_label);
379 let total_items = items.len() as u64;
380
381 if let Some(p) = progress {
382 if total_items > 0 {
383 p.report(SyncProgressEvent::Ingesting {
384 connector: source_label.clone(),
385 n: 0,
386 total: total_items,
387 });
388 }
389 }
390
391 for item in items.iter_mut() {
392 if let Some(ref bytes) = item.raw_bytes {
393 if bytes.len() as u64 > max_extract_bytes {
394 extraction_skipped += 1;
395 eprintln!(
396 "Warning: skipping {} (size {} > max_extract_bytes {})",
397 item.source_id,
398 bytes.len(),
399 max_extract_bytes
400 );
401 continue;
402 }
403 match extract::extract_text(bytes, &item.content_type) {
404 Ok(text) => {
405 item.body = text;
406 item.raw_bytes = None;
407 }
408 Err(e) => {
409 extraction_skipped += 1;
410 eprintln!("Warning: extraction failed for {}: {}", item.source_id, e);
411 continue;
412 }
413 }
414 }
415
416 let doc_id = upsert_document(&pool, item).await?;
417 let chunks = chunk_text(&doc_id, &item.body, config.chunking.max_tokens);
418 let chunk_count = chunks.len() as u64;
419 replace_chunks(&pool, &doc_id, &chunks).await?;
420
421 let (emb_ok, emb_pending) =
423 embed_cmd::embed_chunks_inline(config, &pool, &chunks).await;
424 embeddings_written += emb_ok;
425 embeddings_pending += emb_pending;
426
427 docs_upserted += 1;
428 chunks_written += chunk_count;
429
430 if let Some(p) = progress {
431 let n = docs_upserted;
432 if n.is_multiple_of(INGEST_PROGRESS_INTERVAL) || n == total_items {
433 p.report(SyncProgressEvent::Ingesting {
434 connector: source_label.clone(),
435 n,
436 total: total_items,
437 });
438 }
439 }
440
441 let ts = item.updated_at.timestamp();
442 if ts > max_updated {
443 max_updated = ts;
444 }
445 }
446
447 set_checkpoint(&pool, &source_label, max_updated).await?;
449
450 println!("sync {}", source_label);
451 println!(" fetched: {} items", items.len());
452 println!(" upserted documents: {}", docs_upserted);
453 println!(" chunks written: {}", chunks_written);
454 println!(" extraction skipped: {}", extraction_skipped);
455 if config.embedding.is_enabled() {
456 println!(" embeddings written: {}", embeddings_written);
457 println!(" embeddings pending: {}", embeddings_pending);
458 }
459 println!(" checkpoint: {}", max_updated);
460 println!("ok");
461 }
462
463 pool.close().await;
464 Ok(())
465}
466
/// Insert a document row for `item`, or update the existing row with the
/// same (source, source_id) pair. Returns the document id: the existing
/// row's id when one is found, otherwise a freshly generated UUIDv4, so
/// chunks keyed on the id stay attached across re-syncs.
async fn upsert_document(pool: &SqlitePool, item: &SourceItem) -> Result<String> {
    // Content fingerprint over identity, timestamp, and body; stored on
    // the row as dedup_hash (written here, not consulted by this function).
    let mut hasher = Sha256::new();
    hasher.update(item.source.as_bytes());
    hasher.update(item.source_id.as_bytes());
    hasher.update(item.updated_at.timestamp().to_le_bytes());
    hasher.update(item.body.as_bytes());
    let dedup_hash = format!("{:x}", hasher.finalize());

    // Look up a previously-ingested document so its stable id is reused.
    let existing_id: Option<String> =
        sqlx::query_scalar("SELECT id FROM documents WHERE source = ? AND source_id = ?")
            .bind(&item.source)
            .bind(&item.source_id)
            .fetch_optional(pool)
            .await?;

    let doc_id = existing_id.unwrap_or_else(|| Uuid::new_v4().to_string());

    // Upsert keyed on (source, source_id). Note: created_at is absent from
    // the UPDATE list, so the original ingestion time survives re-syncs.
    sqlx::query(
        r#"
        INSERT INTO documents (id, source, source_id, source_url, title, author, created_at, updated_at, content_type, body, metadata_json, raw_json, dedup_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source, source_id) DO UPDATE SET
            source_url = excluded.source_url,
            title = excluded.title,
            author = excluded.author,
            updated_at = excluded.updated_at,
            content_type = excluded.content_type,
            body = excluded.body,
            metadata_json = excluded.metadata_json,
            raw_json = excluded.raw_json,
            dedup_hash = excluded.dedup_hash
        "#,
    )
    .bind(&doc_id)
    .bind(&item.source)
    .bind(&item.source_id)
    .bind(&item.source_url)
    .bind(&item.title)
    .bind(&item.author)
    .bind(item.created_at.timestamp())
    .bind(item.updated_at.timestamp())
    .bind(&item.content_type)
    .bind(&item.body)
    .bind(&item.metadata_json)
    .bind(&item.raw_json)
    .bind(&dedup_hash)
    .execute(pool)
    .await?;

    Ok(doc_id)
}
529
/// Atomically replace all chunks for one document: derived rows
/// (chunk_vectors, embeddings, FTS entries) and the chunks themselves are
/// deleted, then the new chunks plus their FTS rows are inserted — all
/// inside a single transaction so readers never see a half-replaced set.
async fn replace_chunks(
    pool: &SqlitePool,
    document_id: &str,
    chunks: &[crate::models::Chunk],
) -> Result<()> {
    let mut tx = pool.begin().await?;

    // Delete child rows that reference chunks by chunk_id first, while the
    // chunk rows still exist for the subqueries to find.
    sqlx::query(
        "DELETE FROM chunk_vectors WHERE chunk_id IN (SELECT id FROM chunks WHERE document_id = ?)",
    )
    .bind(document_id)
    .execute(&mut *tx)
    .await?;
    sqlx::query(
        "DELETE FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE document_id = ?)",
    )
    .bind(document_id)
    .execute(&mut *tx)
    .await?;

    // FTS rows carry document_id directly, so no subquery is needed.
    sqlx::query("DELETE FROM chunks_fts WHERE document_id = ?")
        .bind(document_id)
        .execute(&mut *tx)
        .await?;

    // Finally remove the chunk rows themselves.
    sqlx::query("DELETE FROM chunks WHERE document_id = ?")
        .bind(document_id)
        .execute(&mut *tx)
        .await?;

    // Insert each new chunk along with its full-text-search shadow row.
    for chunk in chunks {
        sqlx::query(
            "INSERT INTO chunks (id, document_id, chunk_index, text, hash) VALUES (?, ?, ?, ?, ?)",
        )
        .bind(&chunk.id)
        .bind(&chunk.document_id)
        .bind(chunk.chunk_index)
        .bind(&chunk.text)
        .bind(&chunk.hash)
        .execute(&mut *tx)
        .await?;

        sqlx::query("INSERT INTO chunks_fts (chunk_id, document_id, text) VALUES (?, ?, ?)")
            .bind(&chunk.id)
            .bind(&chunk.document_id)
            .bind(&chunk.text)
            .execute(&mut *tx)
            .await?;
    }

    tx.commit().await?;
    Ok(())
}
601
602async fn get_checkpoint(pool: &SqlitePool, source: &str) -> Result<Option<i64>> {
607 let result: Option<String> =
608 sqlx::query_scalar("SELECT cursor FROM checkpoints WHERE source = ?")
609 .bind(source)
610 .fetch_optional(pool)
611 .await?;
612
613 Ok(result.and_then(|s| s.parse::<i64>().ok()))
614}
615
616async fn set_checkpoint(pool: &SqlitePool, source: &str, cursor_val: i64) -> Result<()> {
621 let now = chrono::Utc::now().timestamp();
622 sqlx::query(
623 r#"
624 INSERT INTO checkpoints (source, cursor, updated_at) VALUES (?, ?, ?)
625 ON CONFLICT(source) DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at
626 "#,
627 )
628 .bind(source)
629 .bind(cursor_val.to_string())
630 .bind(now)
631 .execute(pool)
632 .await?;
633
634 Ok(())
635}