1use anyhow::{bail, Context, Result};
41use async_trait::async_trait;
42use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
43use mlua::prelude::*;
44use std::path::Path;
45use std::time::{Duration, Instant};
46
47use crate::config::{Config, ScriptConnectorConfig};
48use crate::lua_runtime::{register_all_host_apis, toml_table_to_lua};
49use crate::models::SourceItem;
50use crate::traits::Connector;
51
/// A connector whose scan is implemented by a Lua script.
pub struct ScriptConnector {
    /// Instance name; returned by `Connector::name` and forwarded to `scan_script`.
    name: String,
    /// Script settings; `scan_script` reads its `path`, `extra` and `timeout` fields.
    config: ScriptConnectorConfig,
}
66
67impl ScriptConnector {
68 pub fn new(name: String, config: ScriptConnectorConfig) -> Self {
70 Self { name, config }
71 }
72}
73
74#[async_trait]
75impl Connector for ScriptConnector {
76 fn name(&self) -> &str {
77 &self.name
78 }
79
80 fn description(&self) -> &str {
81 "Execute Lua scripts to ingest custom data sources"
82 }
83
84 fn connector_type(&self) -> &str {
85 "script"
86 }
87
88 async fn scan(&self) -> Result<Vec<SourceItem>> {
89 scan_script(&self.name, &self.config).await
90 }
91}
92
93pub async fn scan_script(
103 name: &str,
104 script_config: &ScriptConnectorConfig,
105) -> Result<Vec<SourceItem>> {
106 let path = script_config.path.clone();
107 let extra = script_config.extra.clone();
108 let name = name.to_string();
109 let timeout = script_config.timeout;
110
111 tokio::task::spawn_blocking(move || run_lua_scan(&path, &extra, &name, timeout))
112 .await
113 .context("Lua connector task panicked")?
114}
115
116pub fn scaffold_connector(name: &str) -> Result<()> {
121 let dir = Path::new("connectors");
122 std::fs::create_dir_all(dir)?;
123
124 let filename = format!("{}.lua", name);
125 let path = dir.join(&filename);
126
127 if path.exists() {
128 bail!("Connector script already exists: {}", path.display());
129 }
130
131 let template = format!(
132 r#"--[[
133 Context Harness Connector: {name}
134
135 Configuration (add to ctx.toml):
136
137 [connectors.script.{name}]
138 path = "connectors/{name}.lua"
139 # url = "https://api.example.com"
140 # api_token = "${{{name_upper}_API_TOKEN}}"
141
142 Sync:
143 ctx sync script:{name}
144
145 Test:
146 ctx connector test connectors/{name}.lua
147]]
148
149connector = {{
150 name = "{name}",
151 version = "0.1.0",
152 description = "TODO: describe what this connector ingests",
153}}
154
155--- Scan the data source and return a list of items to ingest.
156--- @param config table Configuration from ctx.toml
157--- @return table Array of source item tables
158function connector.scan(config)
159 local items = {{}}
160
161 -- Example: fetch from an API
162 --
163 -- local resp = http.get(config.url .. "/api/items", {{
164 -- headers = {{ ["Authorization"] = "Bearer " .. config.api_token }},
165 -- }})
166 -- if not resp.ok then
167 -- log.error("API error: " .. resp.status)
168 -- return items
169 -- end
170 -- for _, item in ipairs(resp.json) do
171 -- table.insert(items, {{
172 -- source_id = tostring(item.id),
173 -- title = item.title,
174 -- body = item.content or "",
175 -- source_url = config.url .. "/items/" .. item.id,
176 -- updated_at = item.updated_at,
177 -- }})
178 -- end
179
180 log.info("Fetched " .. #items .. " items")
181 return items
182end
183"#,
184 name = name,
185 name_upper = name.to_uppercase().replace('-', "_"),
186 );
187
188 std::fs::write(&path, template)?;
189 println!("Created connector: {}", path.display());
190 println!();
191 println!("Add to your ctx.toml:");
192 println!();
193 println!(" [connectors.script.{}]", name);
194 println!(" path = \"connectors/{}.lua\"", name);
195 println!();
196 println!("Then sync:");
197 println!();
198 println!(" ctx sync script:{}", name);
199
200 Ok(())
201}
202
203pub async fn test_script(path: &Path, config: &Config, source: Option<&str>) -> Result<()> {
208 let script_path = path.to_path_buf();
209
210 let extra = if let Some(name) = source {
211 config
212 .connectors
213 .script
214 .get(name)
215 .map(|sc| sc.extra.clone())
216 .unwrap_or_default()
217 } else {
218 toml::Table::new()
219 };
220
221 let name = source.unwrap_or("test").to_string();
222
223 println!("Testing connector: {} ({})", name, script_path.display());
224
225 let items = {
226 let p = script_path.clone();
227 let e = extra;
228 let n = name.clone();
229 tokio::task::spawn_blocking(move || run_lua_scan(&p, &e, &n, 300))
230 .await
231 .context("Lua connector task panicked")??
232 };
233
234 println!(" ✓ Script loaded and executed");
235 println!(" ✓ Returned {} items", items.len());
236
237 let valid = items
238 .iter()
239 .filter(|i| !i.body.is_empty() && !i.source_id.is_empty())
240 .count();
241 println!(" ✓ {} valid items", valid);
242
243 if items.is_empty() {
244 println!(" (no items returned)");
245 return Ok(());
246 }
247
248 println!();
249 let show = items.len().min(5);
250 println!("Items (first {}):", show);
251 for (i, item) in items.iter().take(show).enumerate() {
252 println!(
253 " [{}] {}: {} ({})",
254 i,
255 item.source_id,
256 item.title.as_deref().unwrap_or("untitled"),
257 item.updated_at.format("%Y-%m-%d")
258 );
259 }
260 if items.len() > show {
261 println!(" ... and {} more", items.len() - show);
262 }
263
264 Ok(())
265}
266
267fn run_lua_scan(
279 script_path: &Path,
280 extra: &toml::Table,
281 name: &str,
282 timeout_secs: u64,
283) -> Result<Vec<SourceItem>> {
284 let script_src = std::fs::read_to_string(script_path)
285 .with_context(|| format!("Failed to read connector script: {}", script_path.display()))?;
286
287 let script_dir = script_path.parent().unwrap_or(Path::new(".")).to_path_buf();
288
289 let lua = Lua::new();
290
291 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
293 lua.set_hook(
294 mlua::HookTriggers::new().every_nth_instruction(10_000),
295 move |_lua, _debug| {
296 if Instant::now() > deadline {
297 Err(mlua::Error::RuntimeError(format!(
298 "script timed out after {} seconds",
299 timeout_secs
300 )))
301 } else {
302 Ok(mlua::VmState::Continue)
303 }
304 },
305 );
306
307 let log_name = format!("script:{}", name);
309 register_all_host_apis(&lua, &log_name, &script_dir)?;
310
311 lua.load(&script_src)
313 .set_name(script_path.to_string_lossy())
314 .exec()
315 .map_err(|e| {
316 anyhow::anyhow!(
317 "Failed to execute connector script {}: {}",
318 script_path.display(),
319 e
320 )
321 })?;
322
323 let config_table = toml_table_to_lua(&lua, extra)?;
325
326 let connector: LuaTable = lua
328 .globals()
329 .get::<LuaTable>("connector")
330 .map_err(|e| anyhow::anyhow!("Script must define a global 'connector' table: {}", e))?;
331
332 let scan: LuaFunction = connector
333 .get::<LuaFunction>("scan")
334 .map_err(|e| anyhow::anyhow!("connector.scan function not defined: {}", e))?;
335
336 let result: LuaTable = scan.call::<LuaTable>(config_table).map_err(|e| {
337 anyhow::anyhow!(
338 "connector.scan() failed in '{}': {}",
339 script_path.display(),
340 e
341 )
342 })?;
343
344 lua_table_to_source_items(result, name)
346}
347
348fn lua_table_to_source_items(table: LuaTable, connector_name: &str) -> Result<Vec<SourceItem>> {
357 let mut items = Vec::new();
358 let default_source = format!("script:{}", connector_name);
359
360 for pair in table.pairs::<i64, LuaTable>() {
361 let (idx, item_table) =
362 pair.map_err(|e| anyhow::anyhow!("Invalid item in scan result: {}", e))?;
363
364 let source_id: String = match item_table.get::<String>("source_id") {
366 Ok(v) => v,
367 Err(_) => {
368 eprintln!(
369 "[script:{}] WARN: Skipping item at index {}: missing 'source_id'",
370 connector_name, idx
371 );
372 continue;
373 }
374 };
375
376 let body: String = match item_table.get::<String>("body") {
378 Ok(v) => v,
379 Err(_) => {
380 eprintln!(
381 "[script:{}] WARN: Skipping item '{}': missing 'body'",
382 connector_name, source_id
383 );
384 continue;
385 }
386 };
387
388 if body.is_empty() {
389 eprintln!(
390 "[script:{}] WARN: Skipping item '{}': empty body",
391 connector_name, source_id
392 );
393 continue;
394 }
395
396 let source: String = item_table
398 .get::<String>("source")
399 .unwrap_or_else(|_| default_source.clone());
400 let title: Option<String> = item_table.get::<String>("title").ok();
401 let author: Option<String> = item_table.get::<String>("author").ok();
402 let source_url: Option<String> = item_table.get::<String>("source_url").ok();
403 let content_type: String = item_table
404 .get::<String>("content_type")
405 .unwrap_or_else(|_| "text/plain".to_string());
406 let metadata_json: String = item_table
407 .get::<String>("metadata_json")
408 .unwrap_or_else(|_| "{}".to_string());
409
410 let now = Utc::now();
412 let updated_at = parse_lua_timestamp(&item_table, "updated_at").unwrap_or(now);
413 let created_at = parse_lua_timestamp(&item_table, "created_at").unwrap_or(updated_at);
414
415 items.push(SourceItem {
416 source,
417 source_id,
418 source_url,
419 title,
420 author,
421 created_at,
422 updated_at,
423 content_type,
424 body,
425 metadata_json,
426 raw_json: None,
427 raw_bytes: None,
428 });
429 }
430
431 Ok(items)
432}
433
434fn parse_lua_timestamp(table: &LuaTable, field: &str) -> Option<DateTime<Utc>> {
439 if let Ok(s) = table.get::<String>(field) {
441 if let Ok(dt) = DateTime::parse_from_rfc3339(&s) {
442 return Some(dt.with_timezone(&Utc));
443 }
444 if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S") {
445 return Some(dt.and_utc());
446 }
447 if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") {
448 return Some(dt.and_utc());
449 }
450 if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%SZ") {
451 return Some(dt.and_utc());
452 }
453 }
454
455 if let Ok(ts) = table.get::<i64>(field) {
457 return Utc.timestamp_opt(ts, 0).single();
458 }
459 if let Ok(ts) = table.get::<f64>(field) {
460 return Utc.timestamp_opt(ts as i64, 0).single();
461 }
462
463 None
464}