1use anyhow::{bail, Context, Result};
56use async_trait::async_trait;
57use chrono::{TimeZone, Utc};
58use globset::{Glob, GlobSet, GlobSetBuilder};
59use hmac::{Hmac, Mac};
60use sha2::{Digest, Sha256};
61
62use crate::config::S3ConnectorConfig;
63use crate::models::SourceItem;
64use crate::traits::Connector;
65
66pub struct S3Connector {
75 name: String,
77 config: S3ConnectorConfig,
79}
80
81impl S3Connector {
82 pub fn new(name: String, config: S3ConnectorConfig) -> Self {
84 Self { name, config }
85 }
86}
87
88#[async_trait]
89impl Connector for S3Connector {
90 fn name(&self) -> &str {
91 &self.name
92 }
93
94 fn description(&self) -> &str {
95 "List and download objects from S3 buckets"
96 }
97
98 fn connector_type(&self) -> &str {
99 "s3"
100 }
101
102 async fn scan(&self) -> Result<Vec<SourceItem>> {
103 scan_s3(&self.name, &self.config).await
104 }
105}
106
/// HMAC keyed with SHA-256. It is used by `hmac_sha256` for each step of the SigV4
/// signing-key derivation and for the final request signature.
type HmacSha256 = Hmac<Sha256>;
108
109pub async fn scan_s3(name: &str, s3_config: &S3ConnectorConfig) -> Result<Vec<SourceItem>> {
136 let creds = AwsCredentials::from_env()?;
137
138 let include_set = build_globset(&s3_config.include_globs)?;
140
141 let mut default_excludes = vec!["**/.git/**".to_string(), "**/node_modules/**".to_string()];
142 default_excludes.extend(s3_config.exclude_globs.clone());
143 let exclude_set = build_globset(&default_excludes)?;
144
145 let objects = list_objects(s3_config, &creds).await?;
147
148 let mut items = Vec::new();
149 let client = reqwest::Client::new();
150
151 for obj in &objects {
152 let rel_key = if s3_config.prefix.is_empty() {
154 obj.key.clone()
155 } else {
156 let prefix = s3_config.prefix.trim_end_matches('/');
157 obj.key
158 .strip_prefix(prefix)
159 .map(|s| s.trim_start_matches('/').to_string())
160 .unwrap_or_else(|| obj.key.clone())
161 };
162
163 if exclude_set.is_match(&rel_key) {
165 continue;
166 }
167 if !include_set.is_match(&rel_key) {
168 continue;
169 }
170
171 let body = match download_object(s3_config, &creds, &client, &obj.key).await {
173 Ok(b) => b,
174 Err(e) => {
175 eprintln!(
176 "Warning: failed to download s3://{}/{}: {}",
177 s3_config.bucket, obj.key, e
178 );
179 continue;
180 }
181 };
182
183 let title = obj.key.rsplit('/').next().unwrap_or(&obj.key).to_string();
184 let source_url = format!("s3://{}/{}", s3_config.bucket, obj.key);
185
186 let metadata = serde_json::json!({
187 "bucket": s3_config.bucket,
188 "etag": obj.etag,
189 "size": obj.size,
190 });
191
192 items.push(SourceItem {
193 source: format!("s3:{}", name),
194 source_id: obj.key.clone(),
195 source_url: Some(source_url),
196 title: Some(title),
197 author: None,
198 created_at: Utc.timestamp_opt(obj.last_modified, 0).unwrap(),
199 updated_at: Utc.timestamp_opt(obj.last_modified, 0).unwrap(),
200 content_type: detect_content_type(&obj.key),
201 body,
202 metadata_json: metadata.to_string(),
203 raw_json: None,
204 raw_bytes: None,
205 });
206 }
207
208 items.sort_by(|a, b| a.source_id.cmp(&b.source_id));
209 Ok(items)
210}
211
212struct AwsCredentials {
216 access_key_id: String,
217 secret_access_key: String,
218 session_token: Option<String>,
219}
220
221impl AwsCredentials {
222 fn from_env() -> Result<Self> {
225 let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
226 .context("AWS_ACCESS_KEY_ID environment variable not set")?;
227 let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
228 .context("AWS_SECRET_ACCESS_KEY environment variable not set")?;
229 let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
230
231 Ok(Self {
232 access_key_id,
233 secret_access_key,
234 session_token,
235 })
236 }
237}
238
/// One object entry parsed from a `ListObjectsV2` `<Contents>` element.
#[derive(Debug)]
struct S3Object {
    /// Full object key (not relative to the configured prefix).
    key: String,
    /// `LastModified` as Unix seconds; 0 when missing or unparseable.
    last_modified: i64,
    /// `ETag` with surrounding double quotes trimmed.
    etag: String,
    /// `Size` in bytes; 0 when missing or unparseable.
    size: i64,
}
252
253async fn list_objects(
258 s3_config: &S3ConnectorConfig,
259 creds: &AwsCredentials,
260) -> Result<Vec<S3Object>> {
261 let client = reqwest::Client::new();
262 let mut objects = Vec::new();
263 let mut continuation_token: Option<String> = None;
264
265 loop {
266 let mut query_params = vec![
267 ("list-type".to_string(), "2".to_string()),
268 ("max-keys".to_string(), "1000".to_string()),
269 ];
270
271 if !s3_config.prefix.is_empty() {
272 query_params.push(("prefix".to_string(), s3_config.prefix.clone()));
273 }
274
275 if let Some(ref token) = continuation_token {
276 query_params.push(("continuation-token".to_string(), token.clone()));
277 }
278
279 let host = s3_host(s3_config);
280 let url = format!("https://{}/", host);
281
282 let now = Utc::now();
283 let date_stamp = now.format("%Y%m%d").to_string();
284 let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string();
285
286 let mut sorted_params = query_params.clone();
288 sorted_params.sort_by(|a, b| a.0.cmp(&b.0));
289 let canonical_querystring: String = sorted_params
290 .iter()
291 .map(|(k, v)| format!("{}={}", uri_encode(k), uri_encode(v)))
292 .collect::<Vec<_>>()
293 .join("&");
294
295 let payload_hash = hex_sha256(b"");
296
297 let mut headers = vec![
298 ("host".to_string(), host.clone()),
299 ("x-amz-content-sha256".to_string(), payload_hash.clone()),
300 ("x-amz-date".to_string(), amz_date.clone()),
301 ];
302 if let Some(ref token) = creds.session_token {
303 headers.push(("x-amz-security-token".to_string(), token.clone()));
304 }
305 headers.sort_by(|a, b| a.0.cmp(&b.0));
306
307 let signed_headers: String = headers
308 .iter()
309 .map(|(k, _)| k.as_str())
310 .collect::<Vec<_>>()
311 .join(";");
312
313 let canonical_headers: String = headers
314 .iter()
315 .map(|(k, v)| format!("{}:{}\n", k, v))
316 .collect();
317
318 let canonical_request = format!(
319 "GET\n/\n{}\n{}\n{}\n{}",
320 canonical_querystring, canonical_headers, signed_headers, payload_hash
321 );
322
323 let credential_scope = format!("{}/{}/s3/aws4_request", date_stamp, s3_config.region);
324 let string_to_sign = format!(
325 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
326 amz_date,
327 credential_scope,
328 hex_sha256(canonical_request.as_bytes())
329 );
330
331 let signing_key = derive_signing_key(
332 &creds.secret_access_key,
333 &date_stamp,
334 &s3_config.region,
335 "s3",
336 );
337 let signature = hex_hmac_sha256(&signing_key, string_to_sign.as_bytes());
338
339 let authorization = format!(
340 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
341 creds.access_key_id, credential_scope, signed_headers, signature
342 );
343
344 let full_url = format!("{}?{}", url, canonical_querystring);
345
346 let mut req_builder = client
347 .get(&full_url)
348 .header("Authorization", &authorization)
349 .header("x-amz-content-sha256", &payload_hash)
350 .header("x-amz-date", &amz_date);
351
352 if let Some(ref token) = creds.session_token {
353 req_builder = req_builder.header("x-amz-security-token", token);
354 }
355
356 let resp = req_builder.send().await.map_err(|e| {
357 anyhow::anyhow!(
358 "Failed to list S3 objects in s3://{}/{}: {}",
359 s3_config.bucket,
360 s3_config.prefix,
361 e
362 )
363 })?;
364
365 if !resp.status().is_success() {
366 let status = resp.status();
367 let body = resp.text().await.unwrap_or_default();
368 bail!(
369 "S3 ListObjectsV2 failed (HTTP {}): {}",
370 status,
371 body.chars().take(500).collect::<String>()
372 );
373 }
374
375 let xml_body = resp.text().await?;
376 let (batch, is_truncated, next_token) = parse_list_objects_response(&xml_body)?;
377 objects.extend(batch);
378
379 if is_truncated {
380 continuation_token = next_token;
381 } else {
382 break;
383 }
384 }
385
386 Ok(objects)
387}
388
389async fn download_object(
391 s3_config: &S3ConnectorConfig,
392 creds: &AwsCredentials,
393 client: &reqwest::Client,
394 key: &str,
395) -> Result<String> {
396 let host = s3_host(s3_config);
397 let encoded_key = key.split('/').map(uri_encode).collect::<Vec<_>>().join("/");
398 let url = format!("https://{}/{}", host, encoded_key);
399
400 let now = Utc::now();
401 let date_stamp = now.format("%Y%m%d").to_string();
402 let amz_date = now.format("%Y%m%dT%H%M%SZ").to_string();
403
404 let payload_hash = hex_sha256(b"");
405
406 let mut headers = vec![
407 ("host".to_string(), host.clone()),
408 ("x-amz-content-sha256".to_string(), payload_hash.clone()),
409 ("x-amz-date".to_string(), amz_date.clone()),
410 ];
411 if let Some(ref token) = creds.session_token {
412 headers.push(("x-amz-security-token".to_string(), token.clone()));
413 }
414 headers.sort_by(|a, b| a.0.cmp(&b.0));
415
416 let signed_headers: String = headers
417 .iter()
418 .map(|(k, _)| k.as_str())
419 .collect::<Vec<_>>()
420 .join(";");
421
422 let canonical_headers: String = headers
423 .iter()
424 .map(|(k, v)| format!("{}:{}\n", k, v))
425 .collect();
426
427 let canonical_uri = format!("/{}", encoded_key);
428 let canonical_request = format!(
429 "GET\n{}\n\n{}\n{}\n{}",
430 canonical_uri, canonical_headers, signed_headers, payload_hash
431 );
432
433 let credential_scope = format!("{}/{}/s3/aws4_request", date_stamp, s3_config.region);
434 let string_to_sign = format!(
435 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
436 amz_date,
437 credential_scope,
438 hex_sha256(canonical_request.as_bytes())
439 );
440
441 let signing_key = derive_signing_key(
442 &creds.secret_access_key,
443 &date_stamp,
444 &s3_config.region,
445 "s3",
446 );
447 let signature = hex_hmac_sha256(&signing_key, string_to_sign.as_bytes());
448
449 let authorization = format!(
450 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
451 creds.access_key_id, credential_scope, signed_headers, signature
452 );
453
454 let mut req_builder = client
455 .get(&url)
456 .header("Authorization", &authorization)
457 .header("x-amz-content-sha256", &payload_hash)
458 .header("x-amz-date", &amz_date);
459
460 if let Some(ref token) = creds.session_token {
461 req_builder = req_builder.header("x-amz-security-token", token);
462 }
463
464 let resp = req_builder
465 .send()
466 .await
467 .map_err(|e| anyhow::anyhow!("Failed to get s3://{}/{}: {}", s3_config.bucket, key, e))?;
468
469 if !resp.status().is_success() {
470 let status = resp.status();
471 bail!("S3 GetObject failed (HTTP {}) for key '{}'", status, key);
472 }
473
474 let bytes = resp.bytes().await?;
475 Ok(String::from_utf8_lossy(&bytes).to_string())
476}
477
478fn s3_host(s3_config: &S3ConnectorConfig) -> String {
485 if let Some(ref endpoint) = s3_config.endpoint_url {
486 endpoint
488 .trim_start_matches("https://")
489 .trim_start_matches("http://")
490 .trim_end_matches('/')
491 .to_string()
492 } else {
493 format!("{}.s3.{}.amazonaws.com", s3_config.bucket, s3_config.region)
494 }
495}
496
497fn hex_sha256(data: &[u8]) -> String {
499 let mut hasher = Sha256::new();
500 hasher.update(data);
501 hex::encode(hasher.finalize())
502}
503
504fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
506 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC can take key of any size");
507 mac.update(data);
508 mac.finalize().into_bytes().to_vec()
509}
510
511fn hex_hmac_sha256(key: &[u8], data: &[u8]) -> String {
513 hex::encode(hmac_sha256(key, data))
514}
515
516fn derive_signing_key(secret_key: &str, date_stamp: &str, region: &str, service: &str) -> Vec<u8> {
525 let k_date = hmac_sha256(
526 format!("AWS4{}", secret_key).as_bytes(),
527 date_stamp.as_bytes(),
528 );
529 let k_region = hmac_sha256(&k_date, region.as_bytes());
530 let k_service = hmac_sha256(&k_region, service.as_bytes());
531 hmac_sha256(&k_service, b"aws4_request")
532}
533
/// Percent-encodes `s` byte by byte, SigV4-style.
///
/// Only the unreserved characters `A-Z`, `a-z`, `0-9`, `-`, `_`, `.` and `~` are kept as-is.
/// Every other byte (including `/` and each byte of multi-byte UTF-8 characters) becomes
/// `%XX` with uppercase hex digits.
fn uri_encode(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for b in s.bytes() {
        let unreserved = b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(b));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(b & 0x0F)]));
        }
    }
    encoded
}
552
553fn parse_list_objects_response(xml: &str) -> Result<(Vec<S3Object>, bool, Option<String>)> {
560 let mut objects = Vec::new();
561 let is_truncated = extract_xml_value(xml, "IsTruncated")
562 .map(|v| v == "true")
563 .unwrap_or(false);
564 let next_token = extract_xml_value(xml, "NextContinuationToken");
565
566 let mut remaining = xml;
568 while let Some(start) = remaining.find("<Contents>") {
569 let block_start = start + "<Contents>".len();
570 if let Some(end) = remaining[block_start..].find("</Contents>") {
571 let block = &remaining[block_start..block_start + end];
572
573 let key = extract_xml_value(block, "Key").unwrap_or_default();
574 if key.is_empty() || key.ends_with('/') {
575 remaining = &remaining[block_start + end + "</Contents>".len()..];
576 continue;
577 }
578
579 let last_modified = extract_xml_value(block, "LastModified")
580 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
581 .map(|dt| dt.timestamp())
582 .unwrap_or(0);
583
584 let etag = extract_xml_value(block, "ETag")
585 .unwrap_or_default()
586 .trim_matches('"')
587 .to_string();
588
589 let size = extract_xml_value(block, "Size")
590 .and_then(|s| s.parse::<i64>().ok())
591 .unwrap_or(0);
592
593 objects.push(S3Object {
594 key,
595 last_modified,
596 etag,
597 size,
598 });
599
600 remaining = &remaining[block_start + end + "</Contents>".len()..];
601 } else {
602 break;
603 }
604 }
605
606 Ok((objects, is_truncated, next_token))
607}
608
/// Returns the text between the first `<tag>` and the following `</tag>` in `xml`, with XML
/// entities decoded.
///
/// Returns `None` if the opening tag is absent or has no matching closing tag. Only the plain
/// `<tag>` form is recognised (no attributes, no self-closing tags).
///
/// Decoding matters for S3 listings: keys containing `&`, `<` or `>` arrive escaped (e.g.
/// `&amp;`). ETags are typically sent as `&quot;…&quot;`. Without decoding, callers would
/// download the wrong key and keep the entity text in the ETag.
fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{}>", tag);
    let close = format!("</{}>", tag);
    let value_start = xml.find(&open)? + open.len();
    let value_len = xml[value_start..].find(&close)?;
    Some(decode_xml_entities(&xml[value_start..value_start + value_len]))
}

/// Decodes the five predefined XML entities (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`) and
/// numeric character references (`&#NN;`, `&#xHH;`).
///
/// An `&` that does not start a recognised, `;`-terminated reference is kept literally.
fn decode_xml_entities(text: &str) -> String {
    if !text.contains('&') {
        return text.to_string();
    }

    let mut out = String::with_capacity(text.len());
    let mut rest = text;
    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        let tail = &rest[amp..];
        // `tail` starts with '&', so any ';' is at index >= 1 and `tail[1..semi]` is valid.
        let decoded = tail.find(';').and_then(|semi| {
            let entity = &tail[1..semi];
            let ch = match entity {
                "amp" => Some('&'),
                "lt" => Some('<'),
                "gt" => Some('>'),
                "quot" => Some('"'),
                "apos" => Some('\''),
                _ => entity.strip_prefix('#').and_then(|num| {
                    let code = match num.strip_prefix('x') {
                        Some(hex) => u32::from_str_radix(hex, 16).ok(),
                        None => num.parse::<u32>().ok(),
                    };
                    code.and_then(char::from_u32)
                }),
            };
            ch.map(|c| (c, semi + 1))
        });
        match decoded {
            Some((c, consumed)) => {
                out.push(c);
                rest = &tail[consumed..];
            }
            None => {
                out.push('&');
                rest = &tail[1..];
            }
        }
    }
    out.push_str(rest);
    out
}
621
/// Guesses a MIME type from the extension of the key's final path segment.
///
/// The extension is the text after the last `.` in the segment after the last `/`, compared
/// case-insensitively (`README.MD` is Markdown). Keys whose file name has no `.` get no
/// extension: a key named just `md` is not Markdown. Unknown or missing extensions fall back to
/// `text/plain`.
fn detect_content_type(key: &str) -> String {
    let file_name = key.rsplit('/').next().unwrap_or(key);
    let extension = file_name
        .rsplit_once('.')
        .map(|(_, ext)| ext.to_ascii_lowercase());

    let content_type = match extension.as_deref() {
        Some("md") => "text/markdown",
        Some("txt") => "text/plain",
        Some("json") => "application/json",
        Some("yaml" | "yml") => "text/yaml",
        Some("rst") => "text/x-rst",
        Some("html" | "htm") => "text/html",
        _ => "text/plain",
    };
    content_type.to_string()
}
634
635fn build_globset(patterns: &[String]) -> Result<GlobSet> {
637 let mut builder = GlobSetBuilder::new();
638 for pattern in patterns {
639 builder.add(Glob::new(pattern)?);
640 }
641 Ok(builder.build()?)
642}