context_harness/
progress.rs

1//! Sync and embed progress reporting.
2//!
3//! Reports observable progress during `ctx sync` (and optionally `ctx embed pending`)
4//! so users see what is being scanned, how much is left, and when search is up to date.
5//! Progress is emitted on **stderr** so stdout remains parseable for scripts.
6//!
7//! See [SYNC_PROGRESS.md](../docs/SYNC_PROGRESS.md) for the design.
8
9use std::io::Write;
10
/// Phase of the sync pipeline.
///
/// The variants mirror the `"phase"` values written by the JSON reporter
/// (`"discovering"`, `"ingesting"`); the enum is kept for future extensions.
// Nothing in this module constructs `SyncPhase` yet (the JSON reporter spells the phase
// names as string literals), so the dead-code lint is silenced until callers adopt it.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SyncPhase {
    /// Connector is scanning (e.g. walking filesystem, listing Git). Total unknown.
    Discovering,
    /// Items are being upserted, chunked, and optionally embedded. Counts known.
    Ingesting,
}
20
/// A single progress event emitted during sync.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SyncProgressEvent {
    /// The named connector is still in the discovery phase; no total is known yet.
    Discovering { connector: String },
    /// Ingest phase for the named connector: `n` items processed out of `total`.
    Ingesting {
        connector: String,
        n: u64,
        total: u64,
    },
}
33
34/// Reports sync progress. Implementations write to stderr (human or JSON).
35pub trait SyncProgressReporter: Send + Sync {
36    /// Emit a progress event. Called from the ingest pipeline.
37    fn report(&self, event: SyncProgressEvent);
38}
39
40/// Human-friendly progress on stderr: "sync filesystem:docs  ingesting  1,234 / 5,000 items".
41pub struct StderrProgress;
42
43impl SyncProgressReporter for StderrProgress {
44    fn report(&self, event: SyncProgressEvent) {
45        let line = match &event {
46            SyncProgressEvent::Discovering { connector } => {
47                format!("sync {}  discovering...\n", connector)
48            }
49            SyncProgressEvent::Ingesting {
50                connector,
51                n,
52                total,
53            } => {
54                let n_fmt = format_number(*n);
55                let total_fmt = format_number(*total);
56                format!(
57                    "sync {}  ingesting  {} / {} items\n",
58                    connector, n_fmt, total_fmt
59                )
60            }
61        };
62        let _ = std::io::stderr().lock().write_all(line.as_bytes());
63        let _ = std::io::stderr().lock().flush();
64    }
65}
66
67/// Machine-readable progress: one JSON object per line on stderr.
68pub struct JsonProgress;
69
70impl SyncProgressReporter for JsonProgress {
71    fn report(&self, event: SyncProgressEvent) {
72        let obj = match &event {
73            SyncProgressEvent::Discovering { connector } => serde_json::json!({
74                "event": "progress",
75                "connector": connector,
76                "phase": "discovering"
77            }),
78            SyncProgressEvent::Ingesting {
79                connector,
80                n,
81                total,
82            } => serde_json::json!({
83                "event": "progress",
84                "connector": connector,
85                "phase": "ingesting",
86                "n": n,
87                "total": total
88            }),
89        };
90        if let Ok(line) = serde_json::to_string(&obj) {
91            let _ = writeln!(std::io::stderr().lock(), "{}", line);
92            let _ = std::io::stderr().lock().flush();
93        }
94    }
95}
96
97/// No-op reporter when progress is disabled.
98pub struct NoProgress;
99
100impl SyncProgressReporter for NoProgress {
101    fn report(&self, _event: SyncProgressEvent) {}
102}
103
/// Renders `n` in decimal with a comma between every group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_number(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    // `to_string` always yields at least one digit, so `len - 1` cannot underflow.
    let mut grouped = String::with_capacity(len + (len - 1) / 3);
    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before any digit (other than the first) that starts a
        // run whose remaining length is a multiple of three.
        let remaining = len - idx;
        if idx != 0 && remaining % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
116
/// How the CLI surfaces sync progress.
///
/// Both active modes write to stderr, never stdout.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProgressMode {
    /// No progress output at all.
    Off,
    /// Human-readable lines on stderr.
    Human,
    /// One JSON object per line on stderr.
    Json,
}
124
125impl ProgressMode {
126    /// Default: human progress when stderr is a TTY, otherwise off.
127    pub fn default_for_tty() -> Self {
128        if atty::is(atty::Stream::Stderr) {
129            ProgressMode::Human
130        } else {
131            ProgressMode::Off
132        }
133    }
134
135    /// Build a reporter for this mode. Caller can pass it to ingest.
136    pub fn reporter(&self) -> Box<dyn SyncProgressReporter> {
137        match self {
138            ProgressMode::Off => Box::new(NoProgress),
139            ProgressMode::Human => Box::new(StderrProgress),
140            ProgressMode::Json => Box::new(JsonProgress),
141        }
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn format_number_comma() {
        assert_eq!(format_number(0), "0");
        assert_eq!(format_number(1), "1");
        assert_eq!(format_number(999), "999");
        assert_eq!(format_number(1000), "1,000");
        assert_eq!(format_number(1234), "1,234");
        assert_eq!(format_number(1_234_567), "1,234,567");
    }

    /// Exact powers of ten (where a new digit group starts) and the largest `u64`.
    #[test]
    fn format_number_group_boundaries() {
        assert_eq!(format_number(100), "100");
        assert_eq!(format_number(10_000), "10,000");
        assert_eq!(format_number(100_000), "100,000");
        assert_eq!(format_number(1_000_000), "1,000,000");
        assert_eq!(format_number(u64::MAX), "18,446,744,073,709,551,615");
    }
}