context_harness/
progress.rs1use std::io::Write;
10
/// Payload-free label for the stage a sync run is in.
///
/// The variants carry the same names as those of [`SyncProgressEvent`], minus
/// the per-event data. The type is a fieldless enum, so it is `Copy` and
/// `Hash` and can be passed by value or used as a map/set key.
// NOTE(review): nothing in the visible part of this file constructs
// `SyncPhase`; the `allow` presumably silences the resulting warning until a
// consumer exists — confirm, and drop the attribute once the type is used.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SyncPhase {
    /// Counterpart of [`SyncProgressEvent::Discovering`].
    Discovering,
    /// Counterpart of [`SyncProgressEvent::Ingesting`].
    Ingesting,
}
20
/// A single progress notification handed to a [`SyncProgressReporter`].
///
/// `PartialEq`/`Eq` are derived so that events can be compared by value,
/// e.g. when a test captures what a sync run reported.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SyncProgressEvent {
    /// The named connector is in its discovery phase.
    Discovering {
        /// Connector name as it appears in the rendered output.
        connector: String,
    },
    /// The named connector is ingesting items.
    Ingesting {
        /// Connector name as it appears in the rendered output.
        connector: String,
        /// Left-hand side of the `n / total items` counter rendered by
        /// [`StderrProgress`].
        n: u64,
        /// Right-hand side of the `n / total items` counter rendered by
        /// [`StderrProgress`].
        total: u64,
    },
}
33
/// Sink for [`SyncProgressEvent`]s.
///
/// The `Send + Sync` supertraits mean every implementor can be shared
/// between threads.
pub trait SyncProgressReporter: Send + Sync {
    /// Consumes one progress event.
    ///
    /// The method returns nothing, so an implementation has no way to signal
    /// failure to the caller through this call.
    fn report(&self, event: SyncProgressEvent);
}
39
/// Reporter that writes one plain-text line per event to stderr.
#[derive(Clone, Copy, Debug, Default)]
pub struct StderrProgress;
42
43impl SyncProgressReporter for StderrProgress {
44 fn report(&self, event: SyncProgressEvent) {
45 let line = match &event {
46 SyncProgressEvent::Discovering { connector } => {
47 format!("sync {} discovering...\n", connector)
48 }
49 SyncProgressEvent::Ingesting {
50 connector,
51 n,
52 total,
53 } => {
54 let n_fmt = format_number(*n);
55 let total_fmt = format_number(*total);
56 format!(
57 "sync {} ingesting {} / {} items\n",
58 connector, n_fmt, total_fmt
59 )
60 }
61 };
62 let _ = std::io::stderr().lock().write_all(line.as_bytes());
63 let _ = std::io::stderr().lock().flush();
64 }
65}
66
/// Reporter that writes one JSON object per event, one per line, to stderr.
#[derive(Clone, Copy, Debug, Default)]
pub struct JsonProgress;
69
70impl SyncProgressReporter for JsonProgress {
71 fn report(&self, event: SyncProgressEvent) {
72 let obj = match &event {
73 SyncProgressEvent::Discovering { connector } => serde_json::json!({
74 "event": "progress",
75 "connector": connector,
76 "phase": "discovering"
77 }),
78 SyncProgressEvent::Ingesting {
79 connector,
80 n,
81 total,
82 } => serde_json::json!({
83 "event": "progress",
84 "connector": connector,
85 "phase": "ingesting",
86 "n": n,
87 "total": total
88 }),
89 };
90 if let Ok(line) = serde_json::to_string(&obj) {
91 let _ = writeln!(std::io::stderr().lock(), "{}", line);
92 let _ = std::io::stderr().lock().flush();
93 }
94 }
95}
96
/// Reporter that discards every event.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoProgress;
99
100impl SyncProgressReporter for NoProgress {
101 fn report(&self, _event: SyncProgressEvent) {}
102}
103
/// Formats `n` in decimal with a `,` between each group of three digits,
/// counted from the right (`1234567` becomes `"1,234,567"`).
fn format_number(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    // `len` is at least 1 (zero renders as "0"), so `len - 1` cannot underflow.
    let mut grouped = String::with_capacity(len + (len - 1) / 3);
    for (idx, digit) in digits.chars().enumerate() {
        // A separator precedes every digit that has a multiple of three
        // digits remaining from it to the end, except the leading digit.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
116
/// Selects which [`SyncProgressReporter`] implementation
/// [`ProgressMode::reporter`] builds.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProgressMode {
    /// Maps to [`NoProgress`]: events are dropped.
    Off,
    /// Maps to [`StderrProgress`]: plain-text lines on stderr.
    Human,
    /// Maps to [`JsonProgress`]: one JSON object per line on stderr.
    Json,
}
124
125impl ProgressMode {
126 pub fn default_for_tty() -> Self {
128 if atty::is(atty::Stream::Stderr) {
129 ProgressMode::Human
130 } else {
131 ProgressMode::Off
132 }
133 }
134
135 pub fn reporter(&self) -> Box<dyn SyncProgressReporter> {
137 match self {
138 ProgressMode::Off => Box::new(NoProgress),
139 ProgressMode::Human => Box::new(StderrProgress),
140 ProgressMode::Json => Box::new(JsonProgress),
141 }
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline cases: values below and just past the first separator.
    #[test]
    fn format_number_comma() {
        assert_eq!(format_number(0), "0");
        assert_eq!(format_number(1), "1");
        assert_eq!(format_number(999), "999");
        assert_eq!(format_number(1000), "1,000");
        assert_eq!(format_number(1234), "1,234");
        assert_eq!(format_number(1_234_567), "1,234,567");
    }

    /// Boundaries where the number of digit groups changes, plus the largest
    /// input the function accepts.
    #[test]
    fn format_number_group_boundaries() {
        assert_eq!(format_number(10), "10");
        assert_eq!(format_number(100), "100");
        assert_eq!(format_number(999_999), "999,999");
        assert_eq!(format_number(1_000_000), "1,000,000");
        assert_eq!(format_number(u64::MAX), "18,446,744,073,709,551,615");
    }
}