context_harness/
connector_script.rs

1//! Lua scripted connector runtime.
2//!
3//! Loads `.lua` connector scripts at runtime and executes them in a sandboxed
4//! Lua 5.4 VM. Each script implements `connector.scan(config) → items[]`,
5//! returning documents that flow into the standard ingestion pipeline.
6//!
7//! # Architecture
8//!
9//! The Lua VM runs on a blocking thread via [`tokio::task::spawn_blocking`]
10//! to avoid blocking the async runtime. HTTP calls use `reqwest::blocking`,
11//! and `sleep()` uses `std::thread::sleep`.
12//!
13//! # Host APIs
14//!
15//! Scripts have access to sandboxed host APIs provided by [`crate::lua_runtime`]:
16//!
17//! | Module | Functions |
18//! |--------|-----------|
19//! | `http` | `get`, `post`, `put` |
20//! | `json` | `parse`, `encode` |
21//! | `env` | `get` |
22//! | `log` | `info`, `warn`, `error`, `debug` |
23//! | `fs` | `read`, `list` (sandboxed to script directory) |
24//! | `base64` | `encode`, `decode` |
25//! | `crypto` | `sha256`, `hmac_sha256` |
26//! | `sleep` | `sleep(seconds)` |
27//!
28//! # Configuration
29//!
30//! ```toml
31//! [connectors.script.jira]
32//! path = "connectors/jira.lua"
33//! timeout = 600
34//! url = "https://mycompany.atlassian.net"
35//! api_token = "${JIRA_API_TOKEN}"
36//! ```
37//!
38//! See `docs/LUA_CONNECTORS.md` for the full specification.
39
40use anyhow::{bail, Context, Result};
41use async_trait::async_trait;
42use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
43use mlua::prelude::*;
44use std::path::Path;
45use std::time::{Duration, Instant};
46
47use crate::config::{Config, ScriptConnectorConfig};
48use crate::lua_runtime::{register_all_host_apis, toml_table_to_lua};
49use crate::models::SourceItem;
50use crate::traits::Connector;
51
52// ═══════════════════════════════════════════════════════════════════════
53// Connector trait implementation
54// ═══════════════════════════════════════════════════════════════════════
55
/// A Lua scripted connector instance that implements the [`Connector`] trait.
///
/// Wraps the [`scan_script`] function, allowing Lua connectors to be used
/// through the unified trait-based dispatch alongside built-in connectors.
///
/// Both fields are private; instances are created with
/// [`ScriptConnector::new`].
pub struct ScriptConnector {
    /// Instance name (e.g. `"jira"`). Returned by [`Connector::name`] and
    /// forwarded to [`scan_script`] on every scan.
    name: String,
    /// Configuration for this script connector instance. [`scan_script`]
    /// reads its `path`, `extra` and `timeout` fields.
    config: ScriptConnectorConfig,
}
66
67impl ScriptConnector {
68    /// Create a new script connector instance.
69    pub fn new(name: String, config: ScriptConnectorConfig) -> Self {
70        Self { name, config }
71    }
72}
73
74#[async_trait]
75impl Connector for ScriptConnector {
76    fn name(&self) -> &str {
77        &self.name
78    }
79
80    fn description(&self) -> &str {
81        "Execute Lua scripts to ingest custom data sources"
82    }
83
84    fn connector_type(&self) -> &str {
85        "script"
86    }
87
88    async fn scan(&self) -> Result<Vec<SourceItem>> {
89        scan_script(&self.name, &self.config).await
90    }
91}
92
93// ═══════════════════════════════════════════════════════════════════════
94// Public API
95// ═══════════════════════════════════════════════════════════════════════
96
97/// Scan a Lua script connector and return the ingested items.
98///
99/// Spawns the Lua VM on a blocking thread to avoid blocking the async
100/// runtime. The script's `connector.scan(config)` is called with the
101/// TOML config section (minus `path` and `timeout`) converted to a Lua table.
102pub async fn scan_script(
103    name: &str,
104    script_config: &ScriptConnectorConfig,
105) -> Result<Vec<SourceItem>> {
106    let path = script_config.path.clone();
107    let extra = script_config.extra.clone();
108    let name = name.to_string();
109    let timeout = script_config.timeout;
110
111    tokio::task::spawn_blocking(move || run_lua_scan(&path, &extra, &name, timeout))
112        .await
113        .context("Lua connector task panicked")?
114}
115
116/// Scaffold a new connector script from a template.
117///
118/// Creates `connectors/<name>.lua` with a commented template showing
119/// the connector interface and available host APIs.
120pub fn scaffold_connector(name: &str) -> Result<()> {
121    let dir = Path::new("connectors");
122    std::fs::create_dir_all(dir)?;
123
124    let filename = format!("{}.lua", name);
125    let path = dir.join(&filename);
126
127    if path.exists() {
128        bail!("Connector script already exists: {}", path.display());
129    }
130
131    let template = format!(
132        r#"--[[
133  Context Harness Connector: {name}
134
135  Configuration (add to ctx.toml):
136
137    [connectors.script.{name}]
138    path = "connectors/{name}.lua"
139    # url = "https://api.example.com"
140    # api_token = "${{{name_upper}_API_TOKEN}}"
141
142  Sync:
143    ctx sync script:{name}
144
145  Test:
146    ctx connector test connectors/{name}.lua
147]]
148
149connector = {{
150    name = "{name}",
151    version = "0.1.0",
152    description = "TODO: describe what this connector ingests",
153}}
154
155--- Scan the data source and return a list of items to ingest.
156--- @param config table Configuration from ctx.toml
157--- @return table Array of source item tables
158function connector.scan(config)
159    local items = {{}}
160
161    -- Example: fetch from an API
162    --
163    -- local resp = http.get(config.url .. "/api/items", {{
164    --     headers = {{ ["Authorization"] = "Bearer " .. config.api_token }},
165    -- }})
166    -- if not resp.ok then
167    --     log.error("API error: " .. resp.status)
168    --     return items
169    -- end
170    -- for _, item in ipairs(resp.json) do
171    --     table.insert(items, {{
172    --         source_id = tostring(item.id),
173    --         title = item.title,
174    --         body = item.content or "",
175    --         source_url = config.url .. "/items/" .. item.id,
176    --         updated_at = item.updated_at,
177    --     }})
178    -- end
179
180    log.info("Fetched " .. #items .. " items")
181    return items
182end
183"#,
184        name = name,
185        name_upper = name.to_uppercase().replace('-', "_"),
186    );
187
188    std::fs::write(&path, template)?;
189    println!("Created connector: {}", path.display());
190    println!();
191    println!("Add to your ctx.toml:");
192    println!();
193    println!("  [connectors.script.{}]", name);
194    println!("  path = \"connectors/{}.lua\"", name);
195    println!();
196    println!("Then sync:");
197    println!();
198    println!("  ctx sync script:{}", name);
199
200    Ok(())
201}
202
203/// Test a connector script without writing to the database.
204///
205/// Loads and executes the script, prints the returned items, and reports
206/// any errors. Useful for development and debugging.
207pub async fn test_script(path: &Path, config: &Config, source: Option<&str>) -> Result<()> {
208    let script_path = path.to_path_buf();
209
210    let extra = if let Some(name) = source {
211        config
212            .connectors
213            .script
214            .get(name)
215            .map(|sc| sc.extra.clone())
216            .unwrap_or_default()
217    } else {
218        toml::Table::new()
219    };
220
221    let name = source.unwrap_or("test").to_string();
222
223    println!("Testing connector: {} ({})", name, script_path.display());
224
225    let items = {
226        let p = script_path.clone();
227        let e = extra;
228        let n = name.clone();
229        tokio::task::spawn_blocking(move || run_lua_scan(&p, &e, &n, 300))
230            .await
231            .context("Lua connector task panicked")??
232    };
233
234    println!("  ✓ Script loaded and executed");
235    println!("  ✓ Returned {} items", items.len());
236
237    let valid = items
238        .iter()
239        .filter(|i| !i.body.is_empty() && !i.source_id.is_empty())
240        .count();
241    println!("  ✓ {} valid items", valid);
242
243    if items.is_empty() {
244        println!("  (no items returned)");
245        return Ok(());
246    }
247
248    println!();
249    let show = items.len().min(5);
250    println!("Items (first {}):", show);
251    for (i, item) in items.iter().take(show).enumerate() {
252        println!(
253            "  [{}] {}: {} ({})",
254            i,
255            item.source_id,
256            item.title.as_deref().unwrap_or("untitled"),
257            item.updated_at.format("%Y-%m-%d")
258        );
259    }
260    if items.len() > show {
261        println!("  ... and {} more", items.len() - show);
262    }
263
264    Ok(())
265}
266
267// ═══════════════════════════════════════════════════════════════════════
268// Lua VM Execution (blocking)
269// ═══════════════════════════════════════════════════════════════════════
270
271/// Execute a Lua connector script and return the resulting source items.
272///
273/// This function runs synchronously on a blocking thread. It:
274/// 1. Creates a sandboxed Lua VM via [`crate::lua_runtime`]
275/// 2. Loads and executes the script
276/// 3. Calls `connector.scan(config)`
277/// 4. Converts the returned Lua table to `Vec<SourceItem>`
278fn run_lua_scan(
279    script_path: &Path,
280    extra: &toml::Table,
281    name: &str,
282    timeout_secs: u64,
283) -> Result<Vec<SourceItem>> {
284    let script_src = std::fs::read_to_string(script_path)
285        .with_context(|| format!("Failed to read connector script: {}", script_path.display()))?;
286
287    let script_dir = script_path.parent().unwrap_or(Path::new(".")).to_path_buf();
288
289    let lua = Lua::new();
290
291    // Set up timeout via instruction hook
292    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
293    lua.set_hook(
294        mlua::HookTriggers::new().every_nth_instruction(10_000),
295        move |_lua, _debug| {
296            if Instant::now() > deadline {
297                Err(mlua::Error::RuntimeError(format!(
298                    "script timed out after {} seconds",
299                    timeout_secs
300                )))
301            } else {
302                Ok(mlua::VmState::Continue)
303            }
304        },
305    );
306
307    // Register all shared host APIs
308    let log_name = format!("script:{}", name);
309    register_all_host_apis(&lua, &log_name, &script_dir)?;
310
311    // Load and execute the script
312    lua.load(&script_src)
313        .set_name(script_path.to_string_lossy())
314        .exec()
315        .map_err(|e| {
316            anyhow::anyhow!(
317                "Failed to execute connector script {}: {}",
318                script_path.display(),
319                e
320            )
321        })?;
322
323    // Build the config table (with env var expansion)
324    let config_table = toml_table_to_lua(&lua, extra)?;
325
326    // Call connector.scan(config)
327    let connector: LuaTable = lua
328        .globals()
329        .get::<LuaTable>("connector")
330        .map_err(|e| anyhow::anyhow!("Script must define a global 'connector' table: {}", e))?;
331
332    let scan: LuaFunction = connector
333        .get::<LuaFunction>("scan")
334        .map_err(|e| anyhow::anyhow!("connector.scan function not defined: {}", e))?;
335
336    let result: LuaTable = scan.call::<LuaTable>(config_table).map_err(|e| {
337        anyhow::anyhow!(
338            "connector.scan() failed in '{}': {}",
339            script_path.display(),
340            e
341        )
342    })?;
343
344    // Convert Lua result to Vec<SourceItem>
345    lua_table_to_source_items(result, name)
346}
347
348// ═══════════════════════════════════════════════════════════════════════
349// Value Conversions: Lua → SourceItem
350// ═══════════════════════════════════════════════════════════════════════
351
352/// Convert a Lua array table into a Vec of SourceItems.
353///
354/// Invalid items (missing required fields, empty body) are logged as
355/// warnings and skipped — they do not cause the sync to fail.
356fn lua_table_to_source_items(table: LuaTable, connector_name: &str) -> Result<Vec<SourceItem>> {
357    let mut items = Vec::new();
358    let default_source = format!("script:{}", connector_name);
359
360    for pair in table.pairs::<i64, LuaTable>() {
361        let (idx, item_table) =
362            pair.map_err(|e| anyhow::anyhow!("Invalid item in scan result: {}", e))?;
363
364        // Required: source_id
365        let source_id: String = match item_table.get::<String>("source_id") {
366            Ok(v) => v,
367            Err(_) => {
368                eprintln!(
369                    "[script:{}] WARN: Skipping item at index {}: missing 'source_id'",
370                    connector_name, idx
371                );
372                continue;
373            }
374        };
375
376        // Required: body
377        let body: String = match item_table.get::<String>("body") {
378            Ok(v) => v,
379            Err(_) => {
380                eprintln!(
381                    "[script:{}] WARN: Skipping item '{}': missing 'body'",
382                    connector_name, source_id
383                );
384                continue;
385            }
386        };
387
388        if body.is_empty() {
389            eprintln!(
390                "[script:{}] WARN: Skipping item '{}': empty body",
391                connector_name, source_id
392            );
393            continue;
394        }
395
396        // Optional fields
397        let source: String = item_table
398            .get::<String>("source")
399            .unwrap_or_else(|_| default_source.clone());
400        let title: Option<String> = item_table.get::<String>("title").ok();
401        let author: Option<String> = item_table.get::<String>("author").ok();
402        let source_url: Option<String> = item_table.get::<String>("source_url").ok();
403        let content_type: String = item_table
404            .get::<String>("content_type")
405            .unwrap_or_else(|_| "text/plain".to_string());
406        let metadata_json: String = item_table
407            .get::<String>("metadata_json")
408            .unwrap_or_else(|_| "{}".to_string());
409
410        // Timestamps
411        let now = Utc::now();
412        let updated_at = parse_lua_timestamp(&item_table, "updated_at").unwrap_or(now);
413        let created_at = parse_lua_timestamp(&item_table, "created_at").unwrap_or(updated_at);
414
415        items.push(SourceItem {
416            source,
417            source_id,
418            source_url,
419            title,
420            author,
421            created_at,
422            updated_at,
423            content_type,
424            body,
425            metadata_json,
426            raw_json: None,
427            raw_bytes: None,
428        });
429    }
430
431    Ok(items)
432}
433
434/// Parse a timestamp from a Lua table field.
435///
436/// Supports ISO 8601 strings (with or without timezone) and Unix timestamps
437/// (integer or float). Returns `None` if the field is missing or unparseable.
438fn parse_lua_timestamp(table: &LuaTable, field: &str) -> Option<DateTime<Utc>> {
439    // Try as string (ISO 8601 / RFC 3339)
440    if let Ok(s) = table.get::<String>(field) {
441        if let Ok(dt) = DateTime::parse_from_rfc3339(&s) {
442            return Some(dt.with_timezone(&Utc));
443        }
444        if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S") {
445            return Some(dt.and_utc());
446        }
447        if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") {
448            return Some(dt.and_utc());
449        }
450        if let Ok(dt) = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%SZ") {
451            return Some(dt.and_utc());
452        }
453    }
454
455    // Try as number (Unix timestamp)
456    if let Ok(ts) = table.get::<i64>(field) {
457        return Utc.timestamp_opt(ts, 0).single();
458    }
459    if let Ok(ts) = table.get::<f64>(field) {
460        return Utc.timestamp_opt(ts as i64, 0).single();
461    }
462
463    None
464}