context_harness/
connector_s3.rs

1//! Amazon S3 connector.
2//!
3//! Lists and downloads objects from an S3 bucket using the S3 REST API with
4//! AWS Signature V4 authentication. Implements pagination for large buckets,
5//! glob-based filtering on object keys, and supports custom endpoints for
6//! S3-compatible services (MinIO, LocalStack).
7//!
8//! Uses only pure-Rust dependencies (`hmac`, `sha2`) for AWS signing — no
9//! C library dependencies like `aws-lc-sys`, making it compatible with
10//! all build environments including Nix.
11//!
12//! # Configuration
13//!
14//! ```toml
15//! [connectors.s3.runbooks]
16//! bucket = "acme-docs"
17//! prefix = "engineering/runbooks/"
18//! region = "us-east-1"
19//! include_globs = ["**/*.md"]
20//! # endpoint_url = "http://localhost:9000"   # MinIO
21//! ```
22//!
23//! # Environment Variables
24//!
25//! Credentials are read from environment variables:
26//! - `AWS_ACCESS_KEY_ID` — required
27//! - `AWS_SECRET_ACCESS_KEY` — required
28//! - `AWS_SESSION_TOKEN` — optional (for temporary credentials / IAM roles)
29//!
30//! # Authentication
31//!
32//! All S3 requests are signed using
33//! [AWS Signature Version 4](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html).
34//! The signing implementation uses HMAC-SHA256 (`hmac` + `sha2` crates).
35//!
36//! # Pagination
37//!
38//! Large buckets (1000+ objects) are handled automatically via the
39//! `ListObjectsV2` continuation token mechanism.
40//!
41//! # Content Type Detection
42//!
43//! File extensions are mapped to MIME types:
44//!
45//! | Extension | MIME Type |
46//! |-----------|----------|
47//! | `.md` | `text/markdown` |
48//! | `.txt` | `text/plain` |
49//! | `.json` | `application/json` |
50//! | `.yaml`, `.yml` | `text/yaml` |
51//! | `.rst` | `text/x-rst` |
52//! | `.html`, `.htm` | `text/html` |
53//! | Other | `text/plain` |
54
55use anyhow::{bail, Context, Result};
56use async_trait::async_trait;
57use chrono::{TimeZone, Utc};
58use globset::{Glob, GlobSet, GlobSetBuilder};
59use hmac::{Hmac, Mac};
60use sha2::{Digest, Sha256};
61
62use crate::config::S3ConnectorConfig;
63use crate::models::SourceItem;
64use crate::traits::Connector;
65
66// ═══════════════════════════════════════════════════════════════════════
67// Connector trait implementation
68// ═══════════════════════════════════════════════════════════════════════
69
70/// An S3 connector instance that implements the [`Connector`] trait.
71///
72/// Wraps the [`scan_s3`] function, allowing S3 connectors to be used
73/// through the unified trait-based dispatch.
74pub struct S3Connector {
75    /// Instance name (e.g. `"runbooks"`).
76    name: String,
77    /// Configuration for this S3 connector instance.
78    config: S3ConnectorConfig,
79}
80
81impl S3Connector {
82    /// Create a new S3 connector instance.
83    pub fn new(name: String, config: S3ConnectorConfig) -> Self {
84        Self { name, config }
85    }
86}
87
88#[async_trait]
89impl Connector for S3Connector {
90    fn name(&self) -> &str {
91        &self.name
92    }
93
94    fn description(&self) -> &str {
95        "List and download objects from S3 buckets"
96    }
97
98    fn connector_type(&self) -> &str {
99        "s3"
100    }
101
102    async fn scan(&self) -> Result<Vec<SourceItem>> {
103        scan_s3(&self.name, &self.config).await
104    }
105}
106
107type HmacSha256 = Hmac<Sha256>;
108
109/// Scan an S3 bucket and produce [`SourceItem`]s.
110///
111/// Uses the S3 REST API directly with AWS SigV4 signing.
112///
113/// # Workflow
114///
115/// 1. Read AWS credentials from environment variables.
116/// 2. List all objects in the bucket (with pagination).
117/// 3. Apply include/exclude glob filters.
118/// 4. Download each matching object's content.
119/// 5. Return sorted `SourceItem`s with S3 metadata.
120///
121/// # Errors
122///
123/// # Arguments
124///
125/// - `name` — the instance name (e.g. `"runbooks"`). Used as part of the
126///   source identifier: `"s3:<name>"`.
127/// - `s3_config` — the S3 connector configuration for this instance.
128///
129/// # Errors
130///
131/// Returns an error if:
132/// - AWS credentials are not set in environment
133/// - S3 API requests fail (network or auth errors)
134/// - Object listing or download fails
135pub async fn scan_s3(name: &str, s3_config: &S3ConnectorConfig) -> Result<Vec<SourceItem>> {
136    let creds = AwsCredentials::from_env()?;
137
138    // Build glob sets
139    let include_set = build_globset(&s3_config.include_globs)?;
140
141    let mut default_excludes = vec!["**/.git/**".to_string(), "**/node_modules/**".to_string()];
142    default_excludes.extend(s3_config.exclude_globs.clone());
143    let exclude_set = build_globset(&default_excludes)?;
144
145    // List all objects
146    let objects = list_objects(s3_config, &creds).await?;
147
148    let mut items = Vec::new();
149    let client = reqwest::Client::new();
150
151    for obj in &objects {
152        // Compute relative key (strip prefix for glob matching)
153        let rel_key = if s3_config.prefix.is_empty() {
154            obj.key.clone()
155        } else {
156            let prefix = s3_config.prefix.trim_end_matches('/');
157            obj.key
158                .strip_prefix(prefix)
159                .map(|s| s.trim_start_matches('/').to_string())
160                .unwrap_or_else(|| obj.key.clone())
161        };
162
163        // Apply glob filters
164        if exclude_set.is_match(&rel_key) {
165            continue;
166        }
167        if !include_set.is_match(&rel_key) {
168            continue;
169        }
170
171        // Download the object
172        let body = match download_object(s3_config, &creds, &client, &obj.key).await {
173            Ok(b) => b,
174            Err(e) => {
175                eprintln!(
176                    "Warning: failed to download s3://{}/{}: {}",
177                    s3_config.bucket, obj.key, e
178                );
179                continue;
180            }
181        };
182
183        let title = obj.key.rsplit('/').next().unwrap_or(&obj.key).to_string();
184        let source_url = format!("s3://{}/{}", s3_config.bucket, obj.key);
185
186        let metadata = serde_json::json!({
187            "bucket": s3_config.bucket,
188            "etag": obj.etag,
189            "size": obj.size,
190        });
191
192        items.push(SourceItem {
193            source: format!("s3:{}", name),
194            source_id: obj.key.clone(),
195            source_url: Some(source_url),
196            title: Some(title),
197            author: None,
198            created_at: Utc.timestamp_opt(obj.last_modified, 0).unwrap(),
199            updated_at: Utc.timestamp_opt(obj.last_modified, 0).unwrap(),
200            content_type: detect_content_type(&obj.key),
201            body,
202            metadata_json: metadata.to_string(),
203            raw_json: None,
204            raw_bytes: None,
205        });
206    }
207
208    items.sort_by(|a, b| a.source_id.cmp(&b.source_id));
209    Ok(items)
210}
211
212// ============ AWS Credentials ============
213
214/// AWS credentials loaded from environment variables.
215struct AwsCredentials {
216    access_key_id: String,
217    secret_access_key: String,
218    session_token: Option<String>,
219}
220
221impl AwsCredentials {
222    /// Load credentials from `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
223    /// and optionally `AWS_SESSION_TOKEN`.
224    fn from_env() -> Result<Self> {
225        let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
226            .context("AWS_ACCESS_KEY_ID environment variable not set")?;
227        let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
228            .context("AWS_SECRET_ACCESS_KEY environment variable not set")?;
229        let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
230
231        Ok(Self {
232            access_key_id,
233            secret_access_key,
234            session_token,
235        })
236    }
237}
238
239// ============ S3 Object Listing ============
240
/// Metadata for a single S3 object, parsed from a `ListObjectsV2` XML response.
#[derive(Debug, Clone)]
struct S3Object {
    /// Full object key (path within bucket).
    key: String,
    /// Last modification timestamp (Unix epoch seconds); `0` when the listing
    /// entry's `LastModified` is missing or not valid RFC 3339.
    last_modified: i64,
    /// Entity tag (content hash), with surrounding `"` characters stripped.
    etag: String,
    /// Object size in bytes; `0` when the listing entry's `Size` is missing or
    /// not an integer.
    size: i64,
}
252
253/// List all objects in the configured S3 bucket, handling pagination.
254///
255/// Uses `ListObjectsV2` with `max-keys=1000` per page. Automatically
256/// follows `NextContinuationToken` until all objects are retrieved.
257async fn list_objects(
258    s3_config: &S3ConnectorConfig,
259    creds: &AwsCredentials,
260) -> Result<Vec<S3Object>> {
261    let client = reqwest::Client::new();
262    let mut objects = Vec::new();
263    let mut continuation_token: Option<String> = None;
264
265    loop {
266        let mut query_params = vec![
267            ("list-type".to_string(), "2".to_string()),
268            ("max-keys".to_string(), "1000".to_string()),
269        ];
270
271        if !s3_config.prefix.is_empty() {
272            query_params.push(("prefix".to_string(), s3_config.prefix.clone()));
273        }
274
275        if let Some(ref token) = continuation_token {
276            query_params.push(("continuation-token".to_string(), token.clone()));
277        }
278
279        let host = s3_host(s3_config);
280        let url = format!("https://{}/", host);
281
282        let now = Utc::now();
283        let date_stamp = now.format("%Y%m%d").to_string();
284        let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string();
285
286        // Build canonical query string (must be sorted)
287        let mut sorted_params = query_params.clone();
288        sorted_params.sort_by(|a, b| a.0.cmp(&b.0));
289        let canonical_querystring: String = sorted_params
290            .iter()
291            .map(|(k, v)| format!("{}={}", uri_encode(k), uri_encode(v)))
292            .collect::<Vec<_>>()
293            .join("&");
294
295        let payload_hash = hex_sha256(b"");
296
297        let mut headers = vec![
298            ("host".to_string(), host.clone()),
299            ("x-amz-content-sha256".to_string(), payload_hash.clone()),
300            ("x-amz-date".to_string(), amz_date.clone()),
301        ];
302        if let Some(ref token) = creds.session_token {
303            headers.push(("x-amz-security-token".to_string(), token.clone()));
304        }
305        headers.sort_by(|a, b| a.0.cmp(&b.0));
306
307        let signed_headers: String = headers
308            .iter()
309            .map(|(k, _)| k.as_str())
310            .collect::<Vec<_>>()
311            .join(";");
312
313        let canonical_headers: String = headers
314            .iter()
315            .map(|(k, v)| format!("{}:{}\n", k, v))
316            .collect();
317
318        let canonical_request = format!(
319            "GET\n/\n{}\n{}\n{}\n{}",
320            canonical_querystring, canonical_headers, signed_headers, payload_hash
321        );
322
323        let credential_scope = format!("{}/{}/s3/aws4_request", date_stamp, s3_config.region);
324        let string_to_sign = format!(
325            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
326            amz_date,
327            credential_scope,
328            hex_sha256(canonical_request.as_bytes())
329        );
330
331        let signing_key = derive_signing_key(
332            &creds.secret_access_key,
333            &date_stamp,
334            &s3_config.region,
335            "s3",
336        );
337        let signature = hex_hmac_sha256(&signing_key, string_to_sign.as_bytes());
338
339        let authorization = format!(
340            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
341            creds.access_key_id, credential_scope, signed_headers, signature
342        );
343
344        let full_url = format!("{}?{}", url, canonical_querystring);
345
346        let mut req_builder = client
347            .get(&full_url)
348            .header("Authorization", &authorization)
349            .header("x-amz-content-sha256", &payload_hash)
350            .header("x-amz-date", &amz_date);
351
352        if let Some(ref token) = creds.session_token {
353            req_builder = req_builder.header("x-amz-security-token", token);
354        }
355
356        let resp = req_builder.send().await.map_err(|e| {
357            anyhow::anyhow!(
358                "Failed to list S3 objects in s3://{}/{}: {}",
359                s3_config.bucket,
360                s3_config.prefix,
361                e
362            )
363        })?;
364
365        if !resp.status().is_success() {
366            let status = resp.status();
367            let body = resp.text().await.unwrap_or_default();
368            bail!(
369                "S3 ListObjectsV2 failed (HTTP {}): {}",
370                status,
371                body.chars().take(500).collect::<String>()
372            );
373        }
374
375        let xml_body = resp.text().await?;
376        let (batch, is_truncated, next_token) = parse_list_objects_response(&xml_body)?;
377        objects.extend(batch);
378
379        if is_truncated {
380            continuation_token = next_token;
381        } else {
382            break;
383        }
384    }
385
386    Ok(objects)
387}
388
389/// Download a single object's content from S3 using a signed GET request.
390async fn download_object(
391    s3_config: &S3ConnectorConfig,
392    creds: &AwsCredentials,
393    client: &reqwest::Client,
394    key: &str,
395) -> Result<String> {
396    let host = s3_host(s3_config);
397    let encoded_key = key.split('/').map(uri_encode).collect::<Vec<_>>().join("/");
398    let url = format!("https://{}/{}", host, encoded_key);
399
400    let now = Utc::now();
401    let date_stamp = now.format("%Y%m%d").to_string();
402    let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string();
403
404    let payload_hash = hex_sha256(b"");
405
406    let mut headers = vec![
407        ("host".to_string(), host.clone()),
408        ("x-amz-content-sha256".to_string(), payload_hash.clone()),
409        ("x-amz-date".to_string(), amz_date.clone()),
410    ];
411    if let Some(ref token) = creds.session_token {
412        headers.push(("x-amz-security-token".to_string(), token.clone()));
413    }
414    headers.sort_by(|a, b| a.0.cmp(&b.0));
415
416    let signed_headers: String = headers
417        .iter()
418        .map(|(k, _)| k.as_str())
419        .collect::<Vec<_>>()
420        .join(";");
421
422    let canonical_headers: String = headers
423        .iter()
424        .map(|(k, v)| format!("{}:{}\n", k, v))
425        .collect();
426
427    let canonical_uri = format!("/{}", encoded_key);
428    let canonical_request = format!(
429        "GET\n{}\n\n{}\n{}\n{}",
430        canonical_uri, canonical_headers, signed_headers, payload_hash
431    );
432
433    let credential_scope = format!("{}/{}/s3/aws4_request", date_stamp, s3_config.region);
434    let string_to_sign = format!(
435        "AWS4-HMAC-SHA256\n{}\n{}\n{}",
436        amz_date,
437        credential_scope,
438        hex_sha256(canonical_request.as_bytes())
439    );
440
441    let signing_key = derive_signing_key(
442        &creds.secret_access_key,
443        &date_stamp,
444        &s3_config.region,
445        "s3",
446    );
447    let signature = hex_hmac_sha256(&signing_key, string_to_sign.as_bytes());
448
449    let authorization = format!(
450        "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
451        creds.access_key_id, credential_scope, signed_headers, signature
452    );
453
454    let mut req_builder = client
455        .get(&url)
456        .header("Authorization", &authorization)
457        .header("x-amz-content-sha256", &payload_hash)
458        .header("x-amz-date", &amz_date);
459
460    if let Some(ref token) = creds.session_token {
461        req_builder = req_builder.header("x-amz-security-token", token);
462    }
463
464    let resp = req_builder
465        .send()
466        .await
467        .map_err(|e| anyhow::anyhow!("Failed to get s3://{}/{}: {}", s3_config.bucket, key, e))?;
468
469    if !resp.status().is_success() {
470        let status = resp.status();
471        bail!("S3 GetObject failed (HTTP {}) for key '{}'", status, key);
472    }
473
474    let bytes = resp.bytes().await?;
475    Ok(String::from_utf8_lossy(&bytes).to_string())
476}
477
478// ============ AWS SigV4 Helpers ============
479
480/// Compute the S3 hostname for the configured bucket and region.
481///
482/// If a custom `endpoint_url` is set (for MinIO, LocalStack, etc.),
483/// that is used instead of the standard `<bucket>.s3.<region>.amazonaws.com`.
484fn s3_host(s3_config: &S3ConnectorConfig) -> String {
485    if let Some(ref endpoint) = s3_config.endpoint_url {
486        // Custom endpoint (MinIO, LocalStack, etc.)
487        endpoint
488            .trim_start_matches("https://")
489            .trim_start_matches("http://")
490            .trim_end_matches('/')
491            .to_string()
492    } else {
493        format!("{}.s3.{}.amazonaws.com", s3_config.bucket, s3_config.region)
494    }
495}
496
497/// Compute the hex-encoded SHA-256 hash of data.
498fn hex_sha256(data: &[u8]) -> String {
499    let mut hasher = Sha256::new();
500    hasher.update(data);
501    hex::encode(hasher.finalize())
502}
503
504/// Compute HMAC-SHA256 of data with the given key.
505fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
506    let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
507    mac.update(data);
508    mac.finalize().into_bytes().to_vec()
509}
510
511/// Compute hex-encoded HMAC-SHA256.
512fn hex_hmac_sha256(key: &[u8], data: &[u8]) -> String {
513    hex::encode(hmac_sha256(key, data))
514}
515
516/// Derive the AWS SigV4 signing key for a given date, region, and service.
517///
518/// ```text
519/// kDate    = HMAC("AWS4" + secret, dateStamp)
520/// kRegion  = HMAC(kDate, region)
521/// kService = HMAC(kRegion, service)
522/// kSigning = HMAC(kService, "aws4_request")
523/// ```
524fn derive_signing_key(secret_key: &str, date_stamp: &str, region: &str, service: &str) -> Vec<u8> {
525    let k_date = hmac_sha256(
526        format!("AWS4{}", secret_key).as_bytes(),
527        date_stamp.as_bytes(),
528    );
529    let k_region = hmac_sha256(&k_date, region.as_bytes());
530    let k_service = hmac_sha256(&k_region, service.as_bytes());
531    hmac_sha256(&k_service, b"aws4_request")
532}
533
/// Percent-encode a string per RFC 3986, as SigV4 canonical requests require.
///
/// Only the unreserved characters `A-Z a-z 0-9 - _ . ~` pass through. Every
/// other byte of the UTF-8 encoding, including `/`, becomes `%XX` with
/// uppercase hex digits.
fn uri_encode(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for b in s.bytes() {
        let unreserved = b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(b));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(b & 0x0F)]));
        }
    }
    encoded
}
552
553// ============ XML Parsing (minimal, no extra deps) ============
554
555/// Parse a `ListObjectsV2` XML response into a list of [`S3Object`]s.
556///
557/// Also returns whether the listing is truncated and the next continuation
558/// token for pagination.
559fn parse_list_objects_response(xml: &str) -> Result<(Vec<S3Object>, bool, Option<String>)> {
560    let mut objects = Vec::new();
561    let is_truncated = extract_xml_value(xml, "IsTruncated")
562        .map(|v| v == "true")
563        .unwrap_or(false);
564    let next_token = extract_xml_value(xml, "NextContinuationToken");
565
566    // Parse <Contents> blocks
567    let mut remaining = xml;
568    while let Some(start) = remaining.find("<Contents>") {
569        let block_start = start + "<Contents>".len();
570        if let Some(end) = remaining[block_start..].find("</Contents>") {
571            let block = &remaining[block_start..block_start + end];
572
573            let key = extract_xml_value(block, "Key").unwrap_or_default();
574            if key.is_empty() || key.ends_with('/') {
575                remaining = &remaining[block_start + end + "</Contents>".len()..];
576                continue;
577            }
578
579            let last_modified = extract_xml_value(block, "LastModified")
580                .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
581                .map(|dt| dt.timestamp())
582                .unwrap_or(0);
583
584            let etag = extract_xml_value(block, "ETag")
585                .unwrap_or_default()
586                .trim_matches('"')
587                .to_string();
588
589            let size = extract_xml_value(block, "Size")
590                .and_then(|s| s.parse::<i64>().ok())
591                .unwrap_or(0);
592
593            objects.push(S3Object {
594                key,
595                last_modified,
596                etag,
597                size,
598            });
599
600            remaining = &remaining[block_start + end + "</Contents>".len()..];
601        } else {
602            break;
603        }
604    }
605
606    Ok((objects, is_truncated, next_token))
607}
608
/// Extract and XML-unescape the text content of the first `<tag>…</tag>` element.
///
/// This is a deliberately minimal scanner, not an XML parser:
/// - it matches the literal `<tag>` (no attributes, no namespace prefixes);
/// - it takes everything up to the next `</tag>`, so nested elements with the
///   same name are not handled;
/// - it returns `None` if the opening tag or its closing tag is absent.
///
/// S3 escapes reserved characters in element text. For example, ETags are
/// sent as `&quot;…&quot;` and a key containing `&` arrives as `&amp;`, so
/// the raw text is decoded before it is returned.
fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{}>", tag);
    let close = format!("</{}>", tag);
    let value_start = xml.find(&open)? + open.len();
    let value_len = xml[value_start..].find(&close)?;
    Some(xml_unescape(&xml[value_start..value_start + value_len]))
}

/// Decode XML references in element text: `&amp;`, `&lt;`, `&gt;`, `&quot;`,
/// `&apos;`, `&#NNN;` and `&#xHHH;`.
///
/// Anything that is not a recognised, valid reference (including a bare `&`)
/// is copied through unchanged.
fn xml_unescape(raw: &str) -> String {
    // Generous upper bound on the text between '&' and ';' (e.g. "#x10FFFF"),
    // so a stray '&' does not make us scan far ahead for a ';'.
    const MAX_REF_LEN: usize = 10;

    let mut out = String::with_capacity(raw.len());
    let mut rest = raw;
    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        let after_amp = &rest[amp + 1..];
        let decoded = after_amp
            .find(';')
            .filter(|&semi| semi <= MAX_REF_LEN)
            .and_then(|semi| decode_xml_reference(&after_amp[..semi]).map(|ch| (ch, semi)));
        match decoded {
            Some((ch, semi)) => {
                out.push(ch);
                rest = &after_amp[semi + 1..];
            }
            None => {
                out.push('&');
                rest = after_amp;
            }
        }
    }
    out.push_str(rest);
    out
}

/// Map a reference name (the text between `&` and `;`) to its character, or
/// `None` if it is unknown or names an invalid code point.
fn decode_xml_reference(name: &str) -> Option<char> {
    match name {
        "amp" => Some('&'),
        "lt" => Some('<'),
        "gt" => Some('>'),
        "quot" => Some('"'),
        "apos" => Some('\''),
        _ => {
            let digits = name.strip_prefix('#')?;
            let code = match digits.strip_prefix('x') {
                Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                None => digits.parse::<u32>().ok()?,
            };
            char::from_u32(code)
        }
    }
}
621
/// Detect a MIME content type from an object key's file extension.
///
/// Rules:
/// - Only the final path segment is examined, and its extension is the text
///   after the last `.`.
/// - A segment without a `.` has no extension, so a file literally named `md`
///   is not treated as markdown.
/// - Extensions are compared case-insensitively, so `NOTES.MD` is markdown.
/// - Unrecognised or missing extensions map to `text/plain`.
fn detect_content_type(key: &str) -> String {
    let file_name = key.rsplit('/').next().unwrap_or(key);
    let extension = file_name
        .rsplit_once('.')
        .map(|(_, ext)| ext.to_ascii_lowercase());

    let mime = match extension.as_deref() {
        Some("md") => "text/markdown",
        Some("txt") => "text/plain",
        Some("json") => "application/json",
        Some("yaml" | "yml") => "text/yaml",
        Some("rst") => "text/x-rst",
        Some("html" | "htm") => "text/html",
        _ => "text/plain",
    };
    mime.to_string()
}
634
635/// Build a [`GlobSet`] from a list of glob pattern strings.
636fn build_globset(patterns: &[String]) -> Result<GlobSet> {
637    let mut builder = GlobSetBuilder::new();
638    for pattern in patterns {
639        builder.add(Glob::new(pattern)?);
640    }
641    Ok(builder.build()?)
642}