use std::path::Path; use std::pin::Pin; use std::future::Future; use std::io::{SeekFrom}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; use reqwest::Client; use rusty_s3::{Bucket, Credentials, S3Action, actions, UrlStyle}; use url::Url; use super::{VfsError, VfsStat, VfsDirEntry, open_flags::OpenFlags}; pub struct AsyncS3Vfs { bucket: Bucket, credentials: Credentials, client: Client, } struct AsyncS3FileState { key: String, mode: FileMode, position: u64, size: u64, data: Vec, write_buffer: Vec, mtime: std::time::SystemTime, } enum FileMode { Read, Write, } pub struct AsyncS3File { inner: Arc>, vfs: AsyncS3Vfs, } impl AsyncS3Vfs { pub fn new( endpoint: &str, region: &str, bucket_name: &str, access_key: &str, secret_key: &str, ) -> Result { let endpoint_url = Url::parse(endpoint.trim_end_matches('/')) .map_err(|e| VfsError::Io(format!("Invalid S3 endpoint URL: {}", e)))?; let bucket = Bucket::new( endpoint_url, UrlStyle::Path, bucket_name.to_string(), region.to_string(), ).map_err(|e| VfsError::Io(format!("Failed to create S3 bucket config: {}", e)))?; let credentials = Credentials::new(access_key, secret_key); let client = Client::new(); Ok(Self { bucket, credentials, client }) } fn path_to_key(path: &Path) -> String { let s = path.to_string_lossy(); s.strip_prefix('/').unwrap_or(&s).to_string() } async fn head_object(&self, key: &str) -> Result<(u64, std::time::SystemTime, String), VfsError> { let action = actions::HeadObject::new(&self.bucket, Some(&self.credentials), key); let url = action.sign(Duration::from_secs(3600)); let resp = self.client .head(url.as_str()) .send() .await .map_err(|e| VfsError::Io(format!("S3 HEAD failed: {}", e)))?; let status = resp.status(); if status == 404 { return Err(VfsError::NotFound(key.to_string())); } if !status.is_success() { return Err(VfsError::Io(format!("HeadObject returned {}", status))); } let content_len: u64 = resp .headers() .get("Content-Length") .and_then(|v| v.to_str().ok()) .and_then(|v| v.parse().ok()) .unwrap_or(0); let last_modified = parse_last_modified( resp.headers() .get("Last-Modified") .and_then(|v| v.to_str().ok()) ); let etag = resp .headers() .get("ETag") .and_then(|v| v.to_str().ok()) .map(|s| s.replace('"', "")) .unwrap_or_default(); Ok((content_len, last_modified, etag)) } async fn get_object(&self, key: &str) -> Result, VfsError> { let action = actions::GetObject::new(&self.bucket, Some(&self.credentials), key); let url = action.sign(Duration::from_secs(3600)); let resp = self.client .get(url.as_str()) .send() .await .map_err(|e| VfsError::Io(format!("S3 GET failed: {}", e)))?; let status = resp.status(); if status == 404 { return Err(VfsError::NotFound(key.to_string())); } if !status.is_success() { return Err(VfsError::Io(format!("GetObject returned {}", status))); } let bytes = resp.bytes().await .map_err(|e| VfsError::Io(format!("Failed to read response body: {}", e)))?; Ok(bytes.to_vec()) } async fn put_object(&self, key: &str, data: &[u8]) -> Result { let action = actions::PutObject::new(&self.bucket, Some(&self.credentials), key); let url = action.sign(Duration::from_secs(3600)); let resp = self.client .put(url.as_str()) .body(data.to_vec()) .send() .await .map_err(|e| VfsError::Io(format!("S3 PUT failed: {}", e)))?; if !resp.status().is_success() { return Err(VfsError::Io(format!("PutObject returned {}", resp.status()))); } let etag = resp .headers() .get("ETag") .and_then(|v| v.to_str().ok()) .map(|s| s.replace('"', "")) .unwrap_or_default(); Ok(etag) } async fn delete_object(&self, key: &str) -> Result<(), VfsError> { let action = actions::DeleteObject::new(&self.bucket, Some(&self.credentials), key); let url = action.sign(Duration::from_secs(3600)); let resp = self.client .delete(url.as_str()) .send() .await .map_err(|e| VfsError::Io(format!("S3 DELETE failed: {}", e)))?; if !resp.status().is_success() { return Err(VfsError::Io(format!("DeleteObject returned {}", resp.status()))); } Ok(()) } async fn list_objects(&self, prefix: &str) -> Result, VfsError> { let mut action = actions::ListObjectsV2::new(&self.bucket, Some(&self.credentials)); if !prefix.is_empty() { action.with_prefix(prefix); } action.with_delimiter("/"); let url = action.sign(Duration::from_secs(3600)); let resp = self.client .get(url.as_str()) .send() .await .map_err(|e| VfsError::Io(format!("S3 LIST failed: {}", e)))?; if !resp.status().is_success() { return Err(VfsError::Io(format!("ListObjectsV2 returned {}", resp.status()))); } let body = resp.text().await .map_err(|e| VfsError::Io(format!("Failed to read LIST response: {}", e)))?; // Use rusty-s3's built-in parser let list_response = actions::ListObjectsV2::parse_response(&body) .map_err(|e| VfsError::Io(format!("Failed to parse LIST response: {}", e)))?; // Convert to VfsDirEntry let mut entries = Vec::new(); for obj in list_response.contents { let name = obj.key.strip_prefix(prefix).unwrap_or(&obj.key).to_string(); entries.push(VfsDirEntry { name, long_name: obj.key.clone(), stat: VfsStat { size: obj.size, mode: 0o644, uid: 0, gid: 0, atime: std::time::SystemTime::UNIX_EPOCH, mtime: std::time::SystemTime::UNIX_EPOCH, is_dir: false, is_symlink: false, }, }); } for prefix_elem in list_response.common_prefixes { let name = prefix_elem.prefix.strip_prefix(prefix).unwrap_or(&prefix_elem.prefix).trim_end_matches('/').to_string(); entries.push(VfsDirEntry { name, long_name: prefix_elem.prefix.clone(), stat: VfsStat { size: 0, mode: 0o755, uid: 0, gid: 0, atime: std::time::SystemTime::UNIX_EPOCH, mtime: std::time::SystemTime::UNIX_EPOCH, is_dir: true, is_symlink: false, }, }); } Ok(entries) } } impl Clone for AsyncS3Vfs { fn clone(&self) -> Self { Self { bucket: self.bucket.clone(), credentials: self.credentials.clone(), client: self.client.clone(), } } } impl AsyncS3File { pub async fn new_read(vfs: AsyncS3Vfs, key: String) -> Result { let (size, mtime, _) = vfs.head_object(&key).await?; Ok(Self { inner: Arc::new(Mutex::new(AsyncS3FileState { key, mode: FileMode::Read, position: 0, size, data: Vec::new(), write_buffer: Vec::new(), mtime, })), vfs, }) } pub fn new_write(vfs: AsyncS3Vfs, key: String) -> Self { Self { inner: Arc::new(Mutex::new(AsyncS3FileState { key, mode: FileMode::Write, position: 0, size: 0, data: Vec::new(), write_buffer: Vec::new(), mtime: std::time::SystemTime::now(), })), vfs, } } } impl super::AsyncVfsFile for AsyncS3File { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Pin> + Send + 'a>> { let inner = self.inner.clone(); let vfs = self.vfs.clone(); Box::pin(async move { let mut state = inner.lock().await; if state.position >= state.size { return Ok(0); } if state.data.is_empty() { let key = state.key.clone(); state.data = vfs.get_object(&key).await?; } let remaining = state.size - state.position; let to_read = buf.len().min(remaining as usize); let start = state.position as usize; let end = start + to_read; buf[..to_read].copy_from_slice(&state.data[start..end]); state.position += to_read as u64; Ok(to_read) }) } fn write<'a>(&'a mut self, buf: &'a [u8]) -> Pin> + Send + 'a>> { let inner = self.inner.clone(); Box::pin(async move { let mut state = inner.lock().await; state.write_buffer.extend_from_slice(buf); Ok(buf.len()) }) } fn seek<'a>(&'a mut self, pos: SeekFrom) -> Pin> + Send + 'a>> { let inner = self.inner.clone(); Box::pin(async move { let mut state = inner.lock().await; let new_pos = match pos { SeekFrom::Start(offset) => offset, SeekFrom::Current(offset) => { ((state.position as i64) + offset).max(0) as u64 } SeekFrom::End(offset) => { ((state.size as i64) + offset).max(0) as u64 } }; state.position = new_pos.min(state.size); Ok(state.position) }) } fn flush<'a>(&'a mut self) -> Pin> + Send + 'a>> { let inner = self.inner.clone(); let vfs = self.vfs.clone(); Box::pin(async move { let mut state = inner.lock().await; if !state.write_buffer.is_empty() { let key = state.key.clone(); let data = state.write_buffer.clone(); vfs.put_object(&key, &data).await?; state.write_buffer.clear(); } Ok(()) }) } } impl super::AsyncVfsBackend for AsyncS3Vfs { fn clone_boxed(&self) -> Box { Box::new(self.clone()) } fn read_dir<'a>(&'a self, path: &'a Path) -> Pin, VfsError>> + Send + 'a>> { let prefix = Self::path_to_key(path); Box::pin(async move { self.list_objects(&prefix).await }) } fn open_file<'a>(&'a self, path: &'a Path, flags: &'a OpenFlags) -> Pin, VfsError>> + Send + 'a>> { let key = Self::path_to_key(path); let vfs = self.clone(); let is_write = flags.write; Box::pin(async move { if is_write { Ok(Box::new(AsyncS3File::new_write(vfs, key)) as Box) } else { let file = AsyncS3File::new_read(vfs, key).await?; Ok(Box::new(file) as Box) } }) } fn stat<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { let key = Self::path_to_key(path); Box::pin(async move { let (size, mtime, _) = self.head_object(&key).await?; Ok(VfsStat { size, mode: 0o644, uid: 0, gid: 0, atime: mtime, mtime, is_dir: false, is_symlink: false, }) }) } fn create_dir<'a>(&'a self, path: &'a Path, _mode: u32) -> Pin> + Send + 'a>> { let key = Self::path_to_key(path); if !key.ends_with('/') { let _key = format!("{}/", key); } Box::pin(async move { self.put_object(&key, &[]).await?; Ok(()) }) } fn remove_dir<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { let key = Self::path_to_key(path); let key = if key.ends_with('/') { key } else { format!("{}/", key) }; Box::pin(async move { self.delete_object(&key).await?; Ok(()) }) } fn remove_file<'a>(&'a self, path: &'a Path) -> Pin> + Send + 'a>> { let key = Self::path_to_key(path); Box::pin(async move { self.delete_object(&key).await?; Ok(()) }) } fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> Pin> + Send + 'a>> { let from_key = Self::path_to_key(from); let to_key = Self::path_to_key(to); Box::pin(async move { let data = self.get_object(&from_key).await?; self.put_object(&to_key, &data).await?; self.delete_object(&from_key).await?; Ok(()) }) } fn exists<'a>(&'a self, path: &'a Path) -> Pin + Send + 'a>> { let key = Self::path_to_key(path); Box::pin(async move { self.head_object(&key).await.is_ok() }) } } fn parse_last_modified(header: Option<&str>) -> std::time::SystemTime { header .and_then(|s| chrono::DateTime::parse_from_rfc2822(s).ok()) .map(|dt| std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(dt.timestamp() as u64)) .unwrap_or(std::time::SystemTime::now()) }