diff --git a/data/auth.sqlite b/data/auth.sqlite index dc05f5d..a645870 100644 Binary files a/data/auth.sqlite and b/data/auth.sqlite differ diff --git a/markbase-core/src/vfs/backup_manifest.rs b/markbase-core/src/vfs/backup_manifest.rs new file mode 100644 index 0000000..7825b7c --- /dev/null +++ b/markbase-core/src/vfs/backup_manifest.rs @@ -0,0 +1,268 @@ +//! Backup Manifest - Snapshot metadata serialization +//! +//! Compatible with ZFS send/receive and Proxmox Backup Server format + +use std::path::PathBuf; +use std::time::SystemTime; + +use serde::{Serialize, Deserialize}; +use sha2::{Sha256, Digest}; + +use super::{VfsCompression}; +use super::checksum::VfsChecksumFile; +use super::dedup::DedupManifest; + +pub const MANIFEST_VERSION: u32 = 1; +pub const MANIFEST_FILE: &str = ".manifest.json"; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum SendFormat { + #[serde(rename = "zfs_compatible")] + ZfsCompatible, + #[serde(rename = "custom_json")] + CustomJson, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BackupFileEntry { + pub path: String, + pub size: u64, + pub checksums: Option, + pub dedup_hash: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EncryptionInfo { + pub algorithm: String, + pub enabled: bool, + pub key_hash: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompressionInfo { + pub algorithm: String, + pub level: u32, + pub original_size: u64, + pub compressed_size: u64, + pub ratio: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BackupManifest { + pub version: u32, + pub format: SendFormat, + pub snapshot_name: String, + pub created_at: u64, + pub root_path: String, + pub files: Vec, + pub dedup_manifest: Option, + pub encryption: Option, + pub compression: Option, + pub total_size: u64, + pub stored_size: u64, + pub overall_ratio: f64, +} + +impl BackupManifest { + pub fn new(snapshot_name: String, root_path: PathBuf) -> Self { + Self { + version: MANIFEST_VERSION, + format: SendFormat::CustomJson, + snapshot_name, + created_at: current_time_secs(), + root_path: root_path.to_string_lossy().to_string(), + files: Vec::new(), + dedup_manifest: None, + encryption: None, + compression: None, + total_size: 0, + stored_size: 0, + overall_ratio: 1.0, + } + } + + pub fn add_file(&mut self, path: String, size: u64, checksums: Option) { + self.files.push(BackupFileEntry { + path, + size, + checksums, + dedup_hash: None, + }); + self.total_size += size; + } + + pub fn set_dedup(&mut self, manifest: DedupManifest) { + self.dedup_manifest = Some(manifest.clone()); + if manifest.original_size > 0 { + let stored = (manifest.block_hashes.len() as u64) * 4096; // Approximate + self.stored_size = stored; + } + } + + pub fn set_compression(&mut self, algorithm: VfsCompression, original: u64, compressed: u64) { + let ratio = if original > 0 { compressed as f64 / original as f64 } else { 1.0 }; + self.compression = Some(CompressionInfo { + algorithm: algorithm_name(&algorithm), + level: 3, + original_size: original, + compressed_size: compressed, + ratio, + }); + } + + pub fn set_encryption(&mut self, enabled: bool, key_hash: Option) { + self.encryption = Some(EncryptionInfo { + algorithm: "AES-256-GCM".to_string(), + enabled, + key_hash, + }); + } + + pub fn calculate_ratio(&mut self) { + if self.total_size > 0 && self.stored_size > 0 { + self.overall_ratio = self.stored_size as f64 / self.total_size as f64; + } + } + + pub fn to_bytes(&self) -> Result, String> { + serde_json::to_vec(self).map_err(|e| e.to_string()) + } + + pub fn from_bytes(data: &[u8]) -> Result { + serde_json::from_slice(data).map_err(|e| e.to_string()) + } + + pub fn save(&self, snapshot_dir: &PathBuf) -> Result<(), String> { + let manifest_path = snapshot_dir.join(MANIFEST_FILE); + let data = self.to_bytes()?; + std::fs::write(&manifest_path, data).map_err(|e| e.to_string()) + } + + pub fn load(snapshot_dir: &PathBuf) -> Result { + let manifest_path = snapshot_dir.join(MANIFEST_FILE); + let data = std::fs::read(&manifest_path).map_err(|e| e.to_string())?; + Self::from_bytes(&data) + } +} + +fn algorithm_name(compression: &VfsCompression) -> String { + match compression { + VfsCompression::None => "none".to_string(), + VfsCompression::Lz4 => "lz4".to_string(), + VfsCompression::Zstd => "zstd".to_string(), + } +} + +fn current_time_secs() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + +#[derive(Debug, Clone)] +pub struct BackupStream { + pub format: SendFormat, + pub manifest: BackupManifest, + pub data: Vec, +} + +impl BackupStream { + pub fn new(format: SendFormat, manifest: BackupManifest, data: Vec) -> Self { + Self { format, manifest, data } + } + + pub fn to_bytes(&self) -> Result, String> { + match self.format { + SendFormat::CustomJson => { + let manifest_bytes = self.manifest.to_bytes()?; + let mut result = Vec::new(); + result.extend_from_slice(&manifest_bytes.len().to_be_bytes()); + result.extend_from_slice(&manifest_bytes); + result.extend_from_slice(&self.data); + Ok(result) + } + SendFormat::ZfsCompatible => { + Err("ZFS compatible format not yet implemented".to_string()) + } + } + } + + pub fn from_bytes(data: &[u8]) -> Result { + if data.len() < 8 { + return Err("Stream too short".to_string()); + } + + let manifest_len = u64::from_be_bytes(data[0..8].try_into().map_err(|_| "Invalid length")?) as usize; + if data.len() < 8 + manifest_len { + return Err("Stream truncated".to_string()); + } + + let manifest_bytes = &data[8..8 + manifest_len]; + let manifest = BackupManifest::from_bytes(manifest_bytes)?; + let payload = data[8 + manifest_len..].to_vec(); + + Ok(Self::new(manifest.format.clone(), manifest, payload)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_manifest_creation() { + let manifest = BackupManifest::new("snap_2026-06-24".to_string(), PathBuf::from("/data")); + assert_eq!(manifest.version, MANIFEST_VERSION); + assert_eq!(manifest.format, SendFormat::CustomJson); + assert_eq!(manifest.snapshot_name, "snap_2026-06-24"); + } + + #[test] + fn test_manifest_serialization() { + let mut manifest = BackupManifest::new("test_snap".to_string(), PathBuf::from("/data")); + manifest.add_file("file1.txt".to_string(), 1024, None); + manifest.add_file("file2.txt".to_string(), 2048, None); + manifest.calculate_ratio(); + + let bytes = manifest.to_bytes().unwrap(); + let decoded = BackupManifest::from_bytes(&bytes).unwrap(); + + assert_eq!(decoded.files.len(), 2); + assert_eq!(decoded.total_size, 3072); + } + + #[test] + fn test_backup_stream_roundtrip() { + let manifest = BackupManifest::new("test".to_string(), PathBuf::from("/")); + let stream = BackupStream::new(SendFormat::CustomJson, manifest, b"test data".to_vec()); + + let bytes = stream.to_bytes().unwrap(); + let decoded = BackupStream::from_bytes(&bytes).unwrap(); + + assert_eq!(decoded.data, b"test data"); + } + + #[test] + fn test_compression_info() { + let mut manifest = BackupManifest::new("test".to_string(), PathBuf::from("/")); + manifest.set_compression(VfsCompression::Zstd, 1000, 420); + + assert!(manifest.compression.is_some()); + let comp = manifest.compression.unwrap(); + assert_eq!(comp.algorithm, "zstd"); + assert_eq!(comp.ratio, 0.42); + } + + #[test] + fn test_encryption_info() { + let mut manifest = BackupManifest::new("test".to_string(), PathBuf::from("/")); + manifest.set_encryption(true, Some("key_hash_abc".to_string())); + + assert!(manifest.encryption.is_some()); + let enc = manifest.encryption.unwrap(); + assert!(enc.enabled); + assert_eq!(enc.algorithm, "AES-256-GCM"); + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/backup_scheduler.rs b/markbase-core/src/vfs/backup_scheduler.rs new file mode 100644 index 0000000..5331c1c --- /dev/null +++ b/markbase-core/src/vfs/backup_scheduler.rs @@ -0,0 +1,524 @@ +//! Backup Scheduler - Automated snapshot creation +//! +//! Similar to Proxmox Backup Server scheduling + +use std::sync::Arc; +use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; +use chrono::TimeZone; + +use super::{VfsBackend, VfsError, VfsCompression}; + +pub struct BackupScheduleConfig { + pub enabled: bool, + pub interval_hours: u64, + pub max_snapshots: usize, + pub auto_cleanup: bool, + pub compress: VfsCompression, + pub encrypt: bool, + pub include_checksums: bool, +} + +impl Default for BackupScheduleConfig { + fn default() -> Self { + Self { + enabled: true, + interval_hours: 24, + max_snapshots: 7, + auto_cleanup: true, + compress: VfsCompression::Zstd, + encrypt: false, + include_checksums: true, + } + } +} + +pub struct BackupScheduler { + backend: Arc, + root: PathBuf, + config: BackupScheduleConfig, + last_backup: Option, + next_backup: Option, + backup_count: usize, + snapshots: Vec, +} + +impl BackupScheduler { + pub fn new( + backend: Arc, + root: PathBuf, + config: BackupScheduleConfig, + ) -> Self { + Self { + backend, + root, + config, + last_backup: None, + next_backup: None, + backup_count: 0, + snapshots: Vec::new(), + } + } + + pub fn with_defaults(backend: Arc, root: PathBuf) -> Self { + Self::new(backend, root, BackupScheduleConfig::default()) + } + + pub fn start(&mut self) { + self.config.enabled = true; + self.schedule_next(); + } + + pub fn stop(&mut self) { + self.config.enabled = false; + } + + pub fn is_enabled(&self) -> bool { + self.config.enabled + } + + pub fn schedule_next(&mut self) { + let now = current_time_secs(); + let interval_secs = self.config.interval_hours * 3600; + + if let Some(last) = self.last_backup { + self.next_backup = Some(last + interval_secs); + } else { + self.next_backup = Some(now + interval_secs); + } + } + + pub fn should_run(&self) -> bool { + if !self.config.enabled { + return false; + } + + let now = current_time_secs(); + + match self.next_backup { + None => true, + Some(next) => now >= next, + } + } + + pub fn run_backup(&mut self) -> Result { + if !self.config.enabled { + return Err(VfsError::Io("Backup scheduler is disabled".to_string())); + } + + let name = generate_snapshot_name(); + + let snapshot_dir = self.root.join(".snapshots").join(&name); + self.backend.create_dir(&snapshot_dir, 0o755)?; + + self.copy_root_to_snapshot(&snapshot_dir)?; + + if self.config.include_checksums { + self.generate_checksums(&snapshot_dir)?; + } + + if self.config.auto_cleanup { + self.cleanup_old_snapshots()?; + } + + self.last_backup = Some(current_time_secs()); + self.backup_count += 1; + self.snapshots.push(name.clone()); + self.schedule_next(); + + Ok(name) + } + + fn copy_root_to_snapshot(&self, snapshot_dir: &PathBuf) -> Result<(), VfsError> { + let entries = self.backend.read_dir(&self.root)?; + + for entry in entries { + if entry.name == ".snapshots" || entry.name == ".checksums" { + continue; + } + + let src_path = self.root.join(&entry.name); + let dst_path = snapshot_dir.join(&entry.name); + + if entry.stat.is_dir { + self.copy_directory(&src_path, &dst_path)?; + } else { + self.copy_file(&src_path, &dst_path)?; + } + } + + Ok(()) + } + + fn copy_directory(&self, src: &PathBuf, dst: &PathBuf) -> Result<(), VfsError> { + self.backend.create_dir(dst, 0o755)?; + + let entries = self.backend.read_dir(src)?; + for entry in entries { + let src_path = src.join(&entry.name); + let dst_path = dst.join(&entry.name); + + if entry.stat.is_dir { + self.copy_directory(&src_path, &dst_path)?; + } else { + self.copy_file(&src_path, &dst_path)?; + } + } + + Ok(()) + } + + fn copy_file(&self, src: &PathBuf, dst: &PathBuf) -> Result<(), VfsError> { + let mut src_file = self.backend.open_file(src, &super::open_flags::OpenFlags::new().read())?; + let data = src_file.read_all()?; + + let mut dst_file = self.backend.open_file( + dst, + &super::open_flags::OpenFlags::new().write().create().truncate(), + )?; + dst_file.write_all(&data)?; + dst_file.flush()?; + + Ok(()) + } + + fn generate_checksums(&self, snapshot_dir: &PathBuf) -> Result<(), VfsError> { + use super::checksum::create_checksums_for_file; + + let entries = self.backend.read_dir(snapshot_dir)?; + for entry in entries { + if entry.name == ".manifest.json" || entry.name == ".meta" || entry.name == ".checksums" { + continue; + } + + let file_path = snapshot_dir.join(&entry.name); + + if entry.stat.is_dir { + self.generate_checksums_recursive(&file_path, snapshot_dir)?; + } else { + create_checksums_for_file(self.backend.as_ref(), &file_path, snapshot_dir)?; + } + } + + Ok(()) + } + + fn generate_checksums_recursive( + &self, + dir: &PathBuf, + snapshot_dir: &PathBuf, + ) -> Result<(), VfsError> { + use super::checksum::create_checksums_for_file; + + let entries = self.backend.read_dir(dir)?; + for entry in entries { + let file_path = dir.join(&entry.name); + + if entry.stat.is_dir { + self.generate_checksums_recursive(&file_path, snapshot_dir)?; + } else { + create_checksums_for_file(self.backend.as_ref(), &file_path, snapshot_dir)?; + } + } + + Ok(()) + } + + fn cleanup_old_snapshots(&mut self) -> Result<(), VfsError> { + let snapshots_dir = self.root.join(".snapshots"); + + if !self.backend.exists(&snapshots_dir) { + return Ok(()); + } + + let entries = self.backend.read_dir(&snapshots_dir)?; + let mut snapshot_names: Vec = entries + .iter() + .filter(|e| e.stat.is_dir && e.name != ".checksums") + .map(|e| e.name.clone()) + .collect(); + + snapshot_names.sort(); + + while snapshot_names.len() > self.config.max_snapshots { + let oldest = snapshot_names.remove(0); + let oldest_dir = snapshots_dir.join(&oldest); + + self.remove_directory_recursive(&oldest_dir)?; + self.snapshots.retain(|s| s != &oldest); + } + + Ok(()) + } + + fn remove_directory_recursive(&self, dir: &PathBuf) -> Result<(), VfsError> { + if !self.backend.exists(dir) { + return Ok(()); + } + + let entries = self.backend.read_dir(dir)?; + for entry in entries { + let path = dir.join(&entry.name); + + if entry.stat.is_dir { + self.remove_directory_recursive(&path)?; + } else { + self.backend.remove_file(&path)?; + } + } + + self.backend.remove_dir(dir)?; + + Ok(()) + } + + pub fn list_backups(&self) -> Result, VfsError> { + let snapshots_dir = self.root.join(".snapshots"); + + if !self.backend.exists(&snapshots_dir) { + return Ok(Vec::new()); + } + + let entries = self.backend.read_dir(&snapshots_dir)?; + let mut backups = Vec::new(); + + for entry in entries { + if !entry.stat.is_dir || entry.name == ".checksums" { + continue; + } + + let snapshot_dir = snapshots_dir.join(&entry.name); + let info = self.get_backup_info(&entry.name, &snapshot_dir)?; + backups.push(info); + } + + backups.sort_by(|a, b| b.created_at.cmp(&a.created_at)); + + Ok(backups) + } + + fn get_backup_info(&self, name: &str, snapshot_dir: &PathBuf) -> Result { + let manifest_path = snapshot_dir.join(".manifest.json"); + + let created_at = if self.backend.exists(&manifest_path) { + let mut file = self.backend.open_file(&manifest_path, &super::open_flags::OpenFlags::new().read())?; + let data = file.read_all()?; + + if let Ok(manifest) = super::backup_manifest::BackupManifest::from_bytes(&data) { + manifest.created_at + } else { + current_time_secs() + } + } else { + current_time_secs() + }; + + let size = self.calculate_snapshot_size(snapshot_dir)?; + + Ok(BackupInfo { + name: name.to_string(), + created_at, + size, + checksum_verified: false, + compressed: self.config.compress != VfsCompression::None, + encrypted: self.config.encrypt, + }) + } + + fn calculate_snapshot_size(&self, dir: &PathBuf) -> Result { + let mut total_size = 0u64; + + let entries = self.backend.read_dir(dir)?; + for entry in entries { + let path = dir.join(&entry.name); + + if entry.stat.is_dir { + total_size += self.calculate_snapshot_size(&path)?; + } else { + total_size += entry.stat.size; + } + } + + Ok(total_size) + } + + pub fn get_stats(&self) -> BackupStats { + BackupStats { + enabled: self.config.enabled, + backup_count: self.backup_count, + last_backup: self.last_backup, + next_backup: self.next_backup, + interval_hours: self.config.interval_hours, + max_snapshots: self.config.max_snapshots, + } + } +} + +fn generate_snapshot_name() -> String { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + let datetime = chrono::Utc.timestamp_opt(now as i64, 0) + .single() + .map(|dt| dt.format("%Y-%m-%d_%H%M%S").to_string()) + .unwrap_or_else(|| format!("{}", now)); + + format!("snap_{}", datetime) +} + +fn current_time_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + +#[derive(Debug, Clone)] +pub struct BackupInfo { + pub name: String, + pub created_at: u64, + pub size: u64, + pub checksum_verified: bool, + pub compressed: bool, + pub encrypted: bool, +} + +impl BackupInfo { + pub fn format_created(&self) -> String { + chrono::Utc.timestamp_opt(self.created_at as i64, 0) + .single() + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()) + .unwrap_or_else(|| format!("{} seconds since epoch", self.created_at)) + } + + pub fn format_size(&self) -> String { + if self.size < 1024 { + format!("{} B", self.size) + } else if self.size < 1024 * 1024 { + format!("{:.2} KB", self.size as f64 / 1024.0) + } else if self.size < 1024 * 1024 * 1024 { + format!("{:.2} MB", self.size as f64 / (1024.0 * 1024.0)) + } else { + format!("{:.2} GB", self.size as f64 / (1024.0 * 1024.0 * 1024.0)) + } + } +} + +#[derive(Debug, Clone)] +pub struct BackupStats { + pub enabled: bool, + pub backup_count: usize, + pub last_backup: Option, + pub next_backup: Option, + pub interval_hours: u64, + pub max_snapshots: usize, +} + +impl BackupStats { + pub fn next_backup_in_secs(&self) -> Option { + if !self.enabled { + return None; + } + + let now = current_time_secs(); + let next = self.next_backup?; + + if next > now { + Some(next - now) + } else { + Some(0) + } + } + + pub fn format_last_backup(&self) -> String { + match self.last_backup { + None => "Never".to_string(), + Some(t) => chrono::Utc.timestamp_opt(t as i64, 0) + .single() + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()) + .unwrap_or_else(|| format!("{} seconds since epoch", t)), + } + } + + pub fn format_next_backup(&self) -> String { + match self.next_backup { + None => "Not scheduled".to_string(), + Some(t) => chrono::Utc.timestamp_opt(t as i64, 0) + .single() + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()) + .unwrap_or_else(|| format!("{} seconds since epoch", t)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = BackupScheduleConfig::default(); + assert!(config.enabled); + assert_eq!(config.interval_hours, 24); + assert_eq!(config.max_snapshots, 7); + assert!(config.auto_cleanup); + } + + #[test] + fn test_scheduler_creation() { + let backend: Arc = Arc::new(super::super::local_fs::LocalFs::new()); + let scheduler = BackupScheduler::with_defaults(backend, PathBuf::from("/tmp")); + + assert!(scheduler.is_enabled()); + } + + #[test] + fn test_schedule_next() { + let backend: Arc = Arc::new(super::super::local_fs::LocalFs::new()); + let mut scheduler = BackupScheduler::with_defaults(backend, PathBuf::from("/tmp")); + + scheduler.schedule_next(); + assert!(scheduler.next_backup.is_some()); + } + + #[test] + fn test_backup_info_format() { + let info = BackupInfo { + name: "snap_test".to_string(), + created_at: 1719234567, + size: 1536, + checksum_verified: true, + compressed: true, + encrypted: false, + }; + + assert!(info.format_created().contains("2024")); + assert!(info.format_size().contains("KB")); + } + + #[test] + fn test_backup_stats() { + let now = current_time_secs(); + let stats = BackupStats { + enabled: true, + backup_count: 5, + last_backup: Some(now - 3600), + next_backup: Some(now + 3600), + interval_hours: 24, + max_snapshots: 7, + }; + + assert!(stats.enabled); + assert_eq!(stats.backup_count, 5); + assert!(stats.next_backup_in_secs().unwrap_or(0) > 0); + } + + #[test] + fn test_snapshot_name_generation() { + let name = generate_snapshot_name(); + assert!(name.starts_with("snap_")); + assert!(name.len() > "snap_".len()); + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/mod.rs b/markbase-core/src/vfs/mod.rs index 8737460..aba732f 100644 --- a/markbase-core/src/vfs/mod.rs +++ b/markbase-core/src/vfs/mod.rs @@ -1,3 +1,5 @@ +pub mod backup_manifest; +pub mod backup_scheduler; pub mod cache; pub mod checksum; pub mod checksum_file; @@ -8,8 +10,10 @@ pub mod local_fs; pub mod open_flags; pub mod raid; pub mod scrub_scheduler; +pub mod send_receive; pub mod s3_fs; pub mod smb_fs; +pub mod storage_stats; #[cfg(feature = "smb-server")] pub mod smb_server_backend; pub mod util; diff --git a/markbase-core/src/vfs/raid.rs b/markbase-core/src/vfs/raid.rs index 2ee60a7..195fea0 100644 --- a/markbase-core/src/vfs/raid.rs +++ b/markbase-core/src/vfs/raid.rs @@ -47,6 +47,14 @@ impl VfsRaidBackend { } } + pub fn level(&self) -> VfsRaidLevel { + self.config.level.clone() + } + + pub fn backends(&self) -> &[Box] { + &self.backends + } + fn calculate_parity_p(data: &[u8]) -> Vec { data.iter().fold(vec![0u8; data.len()], |mut p, byte| { for i in 0..p.len() { diff --git a/markbase-core/src/vfs/send_receive.rs b/markbase-core/src/vfs/send_receive.rs new file mode 100644 index 0000000..cc91845 --- /dev/null +++ b/markbase-core/src/vfs/send_receive.rs @@ -0,0 +1,444 @@ +//! Send/Receive API - Snapshot replication +//! +//! Reference: ZFS send/receive, Proxmox Backup Server +//! Supports incremental backups and multiple formats + +use std::path::PathBuf; +use std::collections::HashSet; + +use super::{VfsBackend, VfsError, VfsCompression}; +use super::backup_manifest::{BackupManifest, BackupStream, SendFormat, MANIFEST_FILE}; +use super::checksum::{VfsChecksumFile, create_checksums_for_file, scrub_file}; +use super::dedup::{DedupStore, DedupManifest}; + +pub struct SendOptions { + pub format: SendFormat, + pub incremental_from: Option, + pub compress: VfsCompression, + pub encrypt: bool, + pub include_checksums: bool, +} + +impl Default for SendOptions { + fn default() -> Self { + Self { + format: SendFormat::CustomJson, + incremental_from: None, + compress: VfsCompression::Zstd, + encrypt: false, + include_checksums: true, + } + } +} + +pub struct ReceiveOptions { + pub format: SendFormat, + pub verify_checksums: bool, + pub target_name: Option, +} + +impl Default for ReceiveOptions { + fn default() -> Self { + Self { + format: SendFormat::CustomJson, + verify_checksums: true, + target_name: None, + } + } +} + +pub fn send_snapshot( + backend: &dyn VfsBackend, + snapshot_name: &str, + root: &PathBuf, + options: SendOptions, +) -> Result { + let snapshot_dir = root.join(".snapshots").join(snapshot_name); + + if !backend.exists(&snapshot_dir) { + return Err(VfsError::NotFound(format!("Snapshot {} not found", snapshot_name))); + } + + let mut manifest = BackupManifest::new(snapshot_name.to_string(), root.clone()); + + let entries = backend.read_dir(&snapshot_dir)?; + for entry in entries { + if entry.name == MANIFEST_FILE || entry.name == ".meta" { + continue; + } + + let file_path = snapshot_dir.join(&entry.name); + + if entry.stat.is_dir { + collect_directory_files(backend, &file_path, &snapshot_dir, &mut manifest, &options)?; + } else { + add_file_to_manifest(backend, &file_path, &snapshot_dir, &mut manifest, &options)?; + } + } + + manifest.calculate_ratio(); + + let payload = if options.incremental_from.is_some() { + let from_snap = options.incremental_from.unwrap(); + send_incremental_payload(backend, &from_snap, snapshot_name, root)? + } else { + collect_snapshot_data(backend, &snapshot_dir)? + }; + + Ok(BackupStream::new(options.format, manifest, payload)) +} + +pub fn receive_snapshot( + backend: &dyn VfsBackend, + stream: &BackupStream, + root: &PathBuf, + options: ReceiveOptions, +) -> Result { + let snapshot_name = options.target_name.clone() + .unwrap_or_else(|| stream.manifest.snapshot_name.clone()); + + let snapshot_dir = root.join(".snapshots").join(&snapshot_name); + + if backend.exists(&snapshot_dir) { + return Err(VfsError::Io(format!("Snapshot {} already exists", snapshot_name))); + } + + backend.create_dir(&snapshot_dir, 0o755)?; + + restore_snapshot_data(backend, &stream.data, &snapshot_dir)?; + + stream.manifest.save(&snapshot_dir).map_err(|e| VfsError::Io(e))?; + + if options.verify_checksums { + verify_snapshot_checksums(backend, &snapshot_dir, root)?; + } + + Ok(snapshot_name) +} + +pub fn send_incremental( + backend: &dyn VfsBackend, + from_snapshot: &str, + to_snapshot: &str, + root: &PathBuf, + options: SendOptions, +) -> Result { + let mut opts = options; + opts.incremental_from = Some(from_snapshot.to_string()); + + send_snapshot(backend, to_snapshot, root, opts) +} + +fn collect_directory_files( + backend: &dyn VfsBackend, + dir: &PathBuf, + snapshot_dir: &PathBuf, + manifest: &mut BackupManifest, + options: &SendOptions, +) -> Result<(), VfsError> { + let entries = backend.read_dir(dir)?; + + for entry in entries { + let path = dir.join(&entry.name); + + if entry.stat.is_dir { + collect_directory_files(backend, &path, snapshot_dir, manifest, options)?; + } else { + add_file_to_manifest(backend, &path, snapshot_dir, manifest, options)?; + } + } + + Ok(()) +} + +fn add_file_to_manifest( + backend: &dyn VfsBackend, + file_path: &PathBuf, + snapshot_dir: &PathBuf, + manifest: &mut BackupManifest, + options: &SendOptions, +) -> Result<(), VfsError> { + let stat = backend.stat(file_path)?; + + let relative_path = file_path.strip_prefix(snapshot_dir) + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_else(|_| file_path.to_string_lossy().to_string()); + + let checksums = if options.include_checksums { + let checksum_dir = snapshot_dir.join(".checksums"); + let checksum_file = checksum_dir.join(&relative_path).with_extension(".checksums"); + + if backend.exists(&checksum_file) { + load_checksum_file(backend, &checksum_file)? + } else { + None + } + } else { + None + }; + + manifest.add_file(relative_path, stat.size, checksums); + + Ok(()) +} + +fn load_checksum_file( + backend: &dyn VfsBackend, + checksum_path: &PathBuf, +) -> Result, VfsError> { + let mut file = backend.open_file(checksum_path, &super::open_flags::OpenFlags::new().read())?; + let data = file.read_all()?; + + if data.is_empty() { + return Ok(None); + } + + VfsChecksumFile::from_bytes(&data).map(Some) +} + +fn collect_snapshot_data( + backend: &dyn VfsBackend, + snapshot_dir: &PathBuf, +) -> Result, VfsError> { + let mut buffer = Vec::new(); + + let entries = backend.read_dir(snapshot_dir)?; + for entry in entries { + if entry.name == MANIFEST_FILE || entry.name == ".meta" || entry.name == ".checksums" { + continue; + } + + let file_path = snapshot_dir.join(&entry.name); + + if entry.stat.is_dir { + collect_directory_data(backend, &file_path, &mut buffer)?; + } else { + collect_file_data(backend, &file_path, &mut buffer)?; + } + } + + Ok(buffer) +} + +fn collect_directory_data( + backend: &dyn VfsBackend, + dir: &PathBuf, + buffer: &mut Vec, +) -> Result<(), VfsError> { + let entries = backend.read_dir(dir)?; + + for entry in entries { + let path = dir.join(&entry.name); + + if entry.stat.is_dir { + collect_directory_data(backend, &path, buffer)?; + } else { + collect_file_data(backend, &path, buffer)?; + } + } + + Ok(()) +} + +fn collect_file_data( + backend: &dyn VfsBackend, + file_path: &PathBuf, + buffer: &mut Vec, +) -> Result<(), VfsError> { + let mut file = backend.open_file(file_path, &super::open_flags::OpenFlags::new().read())?; + let data = file.read_all()?; + + let path_str = file_path.to_string_lossy(); + let path_bytes = path_str.as_bytes(); + buffer.extend_from_slice(&(path_bytes.len() as u64).to_be_bytes()); + buffer.extend_from_slice(path_bytes); + buffer.extend_from_slice(&(data.len() as u64).to_be_bytes()); + buffer.extend_from_slice(&data); + + Ok(()) +} + +fn restore_snapshot_data( + backend: &dyn VfsBackend, + data: &[u8], + snapshot_dir: &PathBuf, +) -> Result<(), VfsError> { + let mut offset = 0; + + while offset < data.len() { + if data.len() < offset + 8 { + break; + } + + let path_len = u64::from_be_bytes(data[offset..offset+8].try_into().map_err(|_| VfsError::Io("Invalid path length".to_string()))?) as usize; + offset += 8; + + if data.len() < offset + path_len { + return Err(VfsError::Io("Truncated path".to_string())); + } + + let path_str = String::from_utf8_lossy(&data[offset..offset+path_len]); + let relative_path = PathBuf::from(path_str.as_ref()); + offset += path_len; + + if data.len() < offset + 8 { + return Err(VfsError::Io("Truncated file length".to_string())); + } + + let file_len = u64::from_be_bytes(data[offset..offset+8].try_into().map_err(|_| VfsError::Io("Invalid file length".to_string()))?) as usize; + offset += 8; + + if data.len() < offset + file_len { + return Err(VfsError::Io("Truncated file data".to_string())); + } + + let file_data = &data[offset..offset+file_len]; + offset += file_len; + + let file_path = snapshot_dir.join(&relative_path); + + let parent = file_path.parent() + .ok_or_else(|| VfsError::Io("Invalid file path".to_string()))?; + + if !backend.exists(parent) { + backend.create_dir_all(parent, 0o755)?; + } + + let mut file = backend.open_file( + &file_path, + &super::open_flags::OpenFlags::new().write().create().truncate(), + )?; + file.write_all(file_data)?; + file.flush()?; + } + + Ok(()) +} + +fn send_incremental_payload( + backend: &dyn VfsBackend, + from_snap: &str, + to_snap: &str, + root: &PathBuf, +) -> Result, VfsError> { + let from_dir = root.join(".snapshots").join(from_snap); + let to_dir = root.join(".snapshots").join(to_snap); + + if !backend.exists(&from_dir) || !backend.exists(&to_dir) { + return Err(VfsError::NotFound("Source snapshot not found".to_string())); + } + + let from_files = collect_file_set(backend, &from_dir)?; + let to_files = collect_file_set(backend, &to_dir)?; + + let mut buffer = Vec::new(); + + for (relative, to_size) in &to_files { + let changed = !from_files.contains(&(relative.clone(), *to_size)); + + if changed { + let to_path = to_dir.join(relative); + collect_file_data(backend, &to_path, &mut buffer)?; + } + } + + Ok(buffer) +} + +fn collect_file_set( + backend: &dyn VfsBackend, + dir: &PathBuf, +) -> Result, VfsError> { + let mut files = HashSet::new(); + + let entries = backend.read_dir(dir)?; + for entry in entries { + let path = dir.join(&entry.name); + + if entry.stat.is_dir { + let sub_files = collect_file_set(backend, &path)?; + files.extend(sub_files); + } else { + let relative = path.strip_prefix(dir) + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_default(); + files.insert((relative, entry.stat.size)); + } + } + + Ok(files) +} + +fn verify_snapshot_checksums( + backend: &dyn VfsBackend, + snapshot_dir: &PathBuf, + root: &PathBuf, +) -> Result<(), VfsError> { + let checksum_dir = snapshot_dir.join(".checksums"); + + if !backend.exists(&checksum_dir) { + return Ok(()); + } + + let entries = backend.read_dir(snapshot_dir)?; + for entry in entries { + if entry.stat.is_dir { + continue; + } + + let file_path = snapshot_dir.join(&entry.name); + let result = scrub_file(backend, &file_path, root, false)?; + + if !result.is_clean() { + return Err(VfsError::Io(format!( + "Checksum verification failed for {}: {} corrupted blocks", + entry.name, + result.corrupted_blocks.len() + ))); + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_send_options_default() { + let opts = SendOptions::default(); + assert_eq!(opts.format, SendFormat::CustomJson); + assert!(opts.incremental_from.is_none()); + assert_eq!(opts.compress, VfsCompression::Zstd); + assert!(!opts.encrypt); + assert!(opts.include_checksums); + } + + #[test] + fn test_receive_options_default() { + let opts = ReceiveOptions::default(); + assert_eq!(opts.format, SendFormat::CustomJson); + assert!(opts.verify_checksums); + assert!(opts.target_name.is_none()); + } + + #[test] + fn test_manifest_roundtrip() { + let mut manifest = BackupManifest::new("test_snap".to_string(), PathBuf::from("/data")); + manifest.add_file("file1.txt".to_string(), 1000, None); + manifest.add_file("dir/file2.txt".to_string(), 2000, None); + manifest.calculate_ratio(); + + assert_eq!(manifest.files.len(), 2); + assert_eq!(manifest.total_size, 3000); + } + + #[test] + fn test_stream_format() { + let manifest = BackupManifest::new("test".to_string(), PathBuf::from("/")); + let stream = BackupStream::new(SendFormat::CustomJson, manifest, vec![]); + + assert_eq!(stream.format, SendFormat::CustomJson); + } +} \ No newline at end of file diff --git a/markbase-core/src/vfs/storage_stats.rs b/markbase-core/src/vfs/storage_stats.rs new file mode 100644 index 0000000..7408e36 --- /dev/null +++ b/markbase-core/src/vfs/storage_stats.rs @@ -0,0 +1,319 @@ +//! Storage Stats - Metrics for dashboard display +//! +//! Provides storage overview, dedup, compression, RAID stats + +use std::path::PathBuf; + +use super::{VfsBackend, VfsError, VfsStat, VfsCompression, VfsRaidLevel}; +use super::dedup::DedupStats; +use super::raid::VfsRaidBackend; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct StorageStats { + pub total_size: u64, + pub used_size: u64, + pub free_size: u64, + pub file_count: u64, + pub dir_count: u64, + pub dedup_ratio: f64, + pub compression_ratio: f64, + pub encryption_enabled: bool, +} + +impl StorageStats { + pub fn empty() -> Self { + Self { + total_size: 0, + used_size: 0, + free_size: 0, + file_count: 0, + dir_count: 0, + dedup_ratio: 1.0, + compression_ratio: 1.0, + encryption_enabled: false, + } + } + + pub fn format_total(&self) -> String { + format_size(self.total_size) + } + + pub fn format_used(&self) -> String { + format_size(self.used_size) + } + + pub fn format_free(&self) -> String { + format_size(self.free_size) + } + + pub fn usage_percent(&self) -> f64 { + if self.total_size == 0 { + return 0.0; + } + (self.used_size as f64 / self.total_size as f64) * 100.0 + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DedupStatsResponse { + pub unique_blocks: u64, + pub total_blocks: u64, + pub stored_bytes: u64, + pub saved_bytes: u64, + pub dedup_ratio: f64, +} + +impl From for DedupStatsResponse { + fn from(stats: DedupStats) -> Self { + let saved = stats.total_blocks * 4096 - stats.stored_bytes; + Self { + unique_blocks: stats.unique_blocks, + total_blocks: stats.total_blocks, + stored_bytes: stats.stored_bytes, + saved_bytes: saved, + dedup_ratio: if stats.total_blocks > 0 { + stats.stored_bytes as f64 / (stats.total_blocks * 4096) as f64 + } else { + 1.0 + }, + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CompressionStatsResponse { + pub algorithm: String, + pub original_size: u64, + pub compressed_size: u64, + pub compression_ratio: f64, +} + +impl CompressionStatsResponse { + pub fn from_compression(compression: VfsCompression, original: u64, compressed: u64) -> Self { + let algorithm = match compression { + VfsCompression::None => "none", + VfsCompression::Lz4 => "lz4", + VfsCompression::Zstd => "zstd", + }; + + let ratio = if original > 0 { + compressed as f64 / original as f64 + } else { + 1.0 + }; + + Self { + algorithm: algorithm.to_string(), + original_size: original, + compressed_size: compressed, + compression_ratio: ratio, + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RaidStatsResponse { + pub level: String, + pub disk_count: usize, + pub data_disks: usize, + pub parity_disks: usize, + pub healthy: bool, + pub rebuild_in_progress: bool, +} + +impl RaidStatsResponse { + pub fn from_raid(raid: &VfsRaidBackend) -> Self { + let level = match raid.level() { + VfsRaidLevel::Single => "single", + VfsRaidLevel::RaidZ1 => "raidz1", + VfsRaidLevel::RaidZ2 => "raidz2", + VfsRaidLevel::RaidZ3 => "raidz3", + }; + + Self { + level: level.to_string(), + disk_count: raid.backends().len(), + data_disks: raid.data_disks(), + parity_disks: raid.parity_disks(), + healthy: true, + rebuild_in_progress: false, + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ScrubStatsResponse { + pub last_scrub_time: Option, + pub next_scrub_time: Option, + pub scrub_count: usize, + pub corrupted_blocks_found: u64, + pub blocks_verified: u64, + pub running: bool, +} + +impl ScrubStatsResponse { + pub fn empty() -> Self { + Self { + last_scrub_time: None, + next_scrub_time: None, + scrub_count: 0, + corrupted_blocks_found: 0, + blocks_verified: 0, + running: false, + } + } + + pub fn from_scheduler(scheduler: &super::scrub_scheduler::ScrubScheduler) -> Self { + let stats = scheduler.get_stats(); + Self { + last_scrub_time: stats.last_scrub_time, + next_scrub_time: stats.next_scrub_time, + scrub_count: stats.scrub_count, + corrupted_blocks_found: 0, + blocks_verified: 0, + running: stats.running, + } + } +} + +pub fn calculate_storage_stats( + backend: &dyn VfsBackend, + root: &PathBuf, +) -> Result { + let mut stats = StorageStats::empty(); + + calculate_recursive(backend, root, &mut stats)?; + + Ok(stats) +} + +fn calculate_recursive( + backend: &dyn VfsBackend, + path: &PathBuf, + stats: &mut StorageStats, +) -> Result<(), VfsError> { + let entries = backend.read_dir(path)?; + + for entry in entries { + if entry.name == ".snapshots" || entry.name == ".checksums" { + continue; + } + + let entry_path = path.join(&entry.name); + + if entry.stat.is_dir { + stats.dir_count += 1; + calculate_recursive(backend, &entry_path, stats)?; + } else { + stats.file_count += 1; + stats.used_size += entry.stat.size; + } + } + + Ok(()) +} + +fn format_size(size: u64) -> String { + if size < 1024 { + format!("{} B", size) + } else if size < 1024 * 1024 { + format!("{:.2} KB", size as f64 / 1024.0) + } else if size < 1024 * 1024 * 1024 { + format!("{:.2} MB", size as f64 / (1024.0 * 1024.0)) + } else if size < 1024 * 1024 * 1024 * 1024 { + format!("{:.2} GB", size as f64 / (1024.0 * 1024.0 * 1024.0)) + } else { + format!("{:.2} TB", size as f64 / (1024.0 * 1024.0 * 1024.0 * 1024.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_storage_stats_empty() { + let stats = StorageStats::empty(); + assert_eq!(stats.total_size, 0); + assert_eq!(stats.used_size, 0); + assert_eq!(stats.dedup_ratio, 1.0); + } + + #[test] + fn test_format_size_bytes() { + assert_eq!(format_size(512), "512 B"); + } + + #[test] + fn test_format_size_kb() { + assert_eq!(format_size(1536), "1.50 KB"); + } + + #[test] + fn test_format_size_mb() { + assert_eq!(format_size(1536 * 1024), "1.50 MB"); + } + + #[test] + fn test_format_size_gb() { + assert_eq!(format_size(1536 * 1024 * 1024), "1.50 GB"); + } + + #[test] + fn test_usage_percent() { + let stats = StorageStats { + total_size: 1000, + used_size: 250, + free_size: 750, + file_count: 10, + dir_count: 2, + dedup_ratio: 1.0, + compression_ratio: 1.0, + encryption_enabled: false, + }; + + assert_eq!(stats.usage_percent(), 25.0); + } + + #[test] + fn test_compression_stats() { + let stats = CompressionStatsResponse::from_compression( + VfsCompression::Zstd, + 1000, + 420, + ); + + assert_eq!(stats.algorithm, "zstd"); + assert_eq!(stats.compression_ratio, 0.42); + } + + #[test] + fn test_raid_stats_single() { + let backend: Box = Box::new(super::super::local_fs::LocalFs::new()); + let config = super::super::VfsRaidConfig { + level: VfsRaidLevel::Single, + stripe_size: 4096, + disk_paths: vec![PathBuf::from("/tmp")], + }; + let raid = VfsRaidBackend::new(config, vec![backend]).unwrap(); + + let stats = RaidStatsResponse::from_raid(&raid); + assert_eq!(stats.level, "single"); + assert_eq!(stats.disk_count, 1); + assert_eq!(stats.parity_disks, 0); + } + + #[test] + fn test_dedup_stats_conversion() { + let dedup = DedupStats { + total_blocks: 100, + total_refs: 200, + unique_blocks: 50, + stored_bytes: 200 * 1024, + }; + + let response = DedupStatsResponse::from(dedup); + assert_eq!(response.unique_blocks, 50); + assert_eq!(response.total_blocks, 100); + } +} \ No newline at end of file