# DedupFs + S3Vfs Combination Design

**Date**: 2026-06-25
**Status**: Design proposal
**Goal**: Distributed deduplication storage via MinIO/S3 backend

---

## Executive Summary

### Current State

**DedupStore** (`dedup.rs`, 224 lines):
- Dedup storage backed by the **local file system**
- SHA-256 block hashing + reference counting
- Blocks stored in a local directory (`store_path/.dedup/`)

**Problems**:
- ❌ Dedup blocks cannot be shared across nodes
- ❌ No distributed fault tolerance
- ❌ Limited to single-node storage capacity

### Proposed Solution

**DedupS3Store**:
- Blocks stored as **MinIO/S3** objects (shared across nodes)
- Reference counts stored in S3 object metadata
- Manifests stored as S3 objects (JSON format)

**Advantages**:
- ✅ Cross-node dedup sharing (distributed MinIO)
- ✅ Automatic fault tolerance (MinIO erasure coding)
- ✅ No single-node limit (MinIO scales out)
- ✅ Integrates with the existing S3Vfs (no new HTTP API required)

---

## Architecture

```
┌────────────────────────────────────────────────────────────┐
│ MarkBase Node A                                            │
│ ├── DedupS3Store                                           │
│ │   ├── store_block() → S3 PUT                             │
│ │   ├── get_block()   → S3 GET                             │
│ │   └── dedup_file()  → chunking + S3 PUT + manifest       │
└────────────────────────────────────────────────────────────┘
                              │
                              ↓
┌────────────────────────────────────────────────────────────┐
│ MinIO Cluster (S3-compatible)                              │
│ ├── Bucket: markbase-dedup                                 │
│ │   ├── Objects: <hash> (dedup blocks)                     │
│ │   ├── Metadata: x-amz-meta-ref-count (reference count)   │
│ │   └── Manifests: manifests/<file_id>.json                │
│ │                                                          │
│ ├── Erasure Coding: EC:2 (automatic fault tolerance)       │
│ ├── Replication: Node A → Node B (DR)                      │
└────────────────────────────────────────────────────────────┘
                              │
                              ↓
┌────────────────────────────────────────────────────────────┐
│ MarkBase Node B                                            │
│ ├── DedupS3Store                                           │
│ │   ├── get_block()    → S3 GET (shares Node A's blocks)   │
│ │   └── restore_file() → S3 GET manifest + S3 GET blocks   │
└────────────────────────────────────────────────────────────┘
```

---

## Implementation Design

### DedupS3Store Struct

```rust
pub struct DedupS3Store {
    s3vfs: S3Vfs,            // S3 backend
    bucket: String,          // Bucket name (markbase-dedup)
    block_prefix: String,    // Object key prefix (blocks/)
    manifest_prefix: String, // Manifest prefix (manifests/)
    config: VfsDedupConfig,  // block_size, min_file_size
}

pub struct DedupManifest {
    original_size: usize,
    block_hashes: Vec<String>,
    dedup_ratio: f64,
    file_id: String, // UUID for manifest storage
}
```

### Core Methods

| Method | Current (LocalFs) | Proposed (S3Vfs) |
|--------|------------------|------------------|
| `store_block(data)` | `std::fs::write(store_path/hash, data)` | `S3Vfs.put_object(blocks/hash, data)` |
| `get_block(hash)` | `std::fs::read(store_path/hash)` | `S3Vfs.get_object(blocks/hash)` |
| `increment_ref(hash)` | `std::fs::write(hash.ref, count)` | `S3Vfs.put_object(blocks/hash, data) + metadata update` |
| `decrement_ref(hash)` | `std::fs::write/remove` | `S3Vfs.delete_object + metadata check` |
| `dedup_file(source)` | Local file read + block store | Local file read + S3 PUT blocks |
| `restore_file(manifest)` | Local file write + block read | Local file write + S3 GET blocks |
| `get_ref_count(hash)` | `std::fs::read(hash.ref)` | `S3Vfs.head_object(blocks/hash) → metadata` |

---

## S3 Object Layout

```
Bucket: markbase-dedup
├── blocks/
│   ├── <hash>              # Dedup block (4KB)
│   │   └── Metadata: x-amz-meta-ref-count: 5
│   ├── <hash>
│   │   └── Metadata: x-amz-meta-ref-count: 2
│   └── ...
│
├── manifests/
│   ├── <file_id>.json      # Manifest JSON
│   │   └── Content: {"original_size": 1024, "block_hashes": [...], ...}
│   ├── <file_id>.json
│   └── ...
│
└── stats.json              # DedupStats (optional)
```

---

## Reference Count Management

### Challenge

S3 objects do not support atomic increment/decrement operations.

### Solution 1: Metadata Update (Recommended ⭐⭐⭐⭐⭐)

**Flow**:

```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    let key = format!("blocks/{}", hash);

    // 1. HEAD the object to read the current ref count from its metadata
    let head = self.s3vfs.head_object(&key)?;
    let current_ref = head.metadata.get("x-amz-meta-ref-count")
        .and_then(|v| v.parse::<u32>().ok())
        .unwrap_or(0);

    // 2. GET the body, then PUT it back with updated metadata
    let block_data = self.s3vfs.get_object(&key)?;
    self.s3vfs.put_object_with_metadata(
        &key,
        &block_data,
        [("x-amz-meta-ref-count", (current_ref + 1).to_string())],
    )?;

    Ok(())
}
```

**Advantages**:
- ✅ Simple to implement
- ✅ Compatible with the S3 standard

**Drawbacks**:
- ⚠️ Requires multiple requests per update (HEAD + GET + PUT)
- ⚠️ Not atomic (concurrency problem)
- ⚠️ Must read the block data (S3 metadata cannot be edited in place, so the PUT needs a body)

---

### Solution 2: Separate Ref Count Object

**Flow**:

```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. GET ref count object (a missing or unparsable object counts as 0)
    let ref_key = format!("refs/{}/count", hash);
    let current = self.s3vfs.get_object(&ref_key)
        .ok()
        .and_then(|data| String::from_utf8(data).ok())
        .and_then(|s| s.trim().parse::<u32>().ok())
        .unwrap_or(0);

    // 2. PUT updated ref count
    self.s3vfs.put_object(&ref_key, (current + 1).to_string().as_bytes())?;

    Ok(())
}
```

**Advantages**:
- ✅ No need to read the block data
- ✅ Smaller object (just a number)

**Drawbacks**:
- ⚠️ Requires an extra object per block
- ⚠️ Not atomic (concurrency problem)

---

### Solution 3: MinIO Extended API (Enterprise Edition)

The MinIO enterprise edition provides `mc admin bucket policy` and an object locking API.

**Advantages**:
- ✅ May provide an atomic operation

**Drawbacks**:
- ⚠️ MinIO enterprise edition only
- ⚠️ The specific API still needs to be researched

---

## Concurrency Problem

### Scenario

Node A and Node B dedup the same file at the same time:

1. Node A: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
2. Node B: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
3. Result: count=3 (wrong, it should be count=4)

### Solution 1: Optimistic Locking

Use S3 versioning to detect conflicts:

```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    let key = format!("blocks/{}", hash);
    loop {
        // 1. GET current version + metadata, plus the body needed for the PUT
        let (version_id, current_ref) = self.get_ref_with_version(hash)?;
        let block_data = self.s3vfs.get_object(&key)?;

        // 2. PUT with version check
        let result = self.s3vfs.put_object_if_version(
            &key,
            &block_data,
            current_ref + 1,
            version_id, // Only succeed if version unchanged
        );

        if result.is_ok() {
            break;
        }
        // Retry if version mismatch.
        // A real implementation should retry only on conflict errors
        // and cap the number of attempts.
    }
    Ok(())
}
```

**Requirement**: MinIO versioning enabled.

---

### Solution 2: Distributed Lock Service

Use an external distributed lock (e.g. Redis/Zookeeper):

```rust
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. Acquire distributed lock
    let lock = self.lock_service.acquire(&format!("lock:{}", hash))?;

    // 2. Increment ref count
    self.update_ref_count(hash)?;

    // 3. Release lock
    lock.release();

    Ok(())
}
```

**Drawback**: requires an additional service (Redis).

---

### Solution 3: Accept Non-Atomic (Simplified Approach)

Given MarkBase's lightweight positioning:
- ⚠️ Accept the risk of non-atomic operations
- ⚠️ Ref counts are occasionally inaccurate. This does not affect data integrity as long as blocks are deleted only after a scrub confirms they are unreferenced; deleting on an undercounted ref would remove a block that is still in use
- ⚠️ Periodic repair (scrub job)

**Recommendation**: Phase 1 uses Solution 1 (Metadata Update); Phase 2 investigates MinIO versioning.

---

## Implementation Phases

| Phase | Task | Code Lines | Priority | Risk |
|-------|------|------------|----------|------|
| **Phase 1** | DedupS3Store struct + basic I/O | ~300 | P0 | Medium |
| **Phase 2** | Reference count metadata | ~100 | P0 | Medium |
| **Phase 3** | Manifest storage to S3 | ~50 | P1 | Low |
| **Phase 4** | CLI integration | ~100 | P1 | Low |
| **Phase 5** | Async version (DedupAsyncS3Store) | ~200 | P2 | High |
| **Phase 6** | Concurrency fix (versioning) | ~150 | P2 | High |
| **Phase 7** | Performance benchmark | ~100 | P2 | Low |
| **Total** | | **~1000** | | |

---

## DedupS3Store Implementation (Phase 1 Draft)

```rust
use super::s3_fs::S3Vfs;
use super::{VfsDedupConfig, VfsError};
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
use std::path::Path;

// Draft: assumes VfsError implements From<std::io::Error> and
// From<serde_json::Error>, and that DedupManifest derives
// Serialize/Deserialize and provides new().

pub struct DedupS3Store {
    s3vfs: S3Vfs,
    bucket: String,
    block_prefix: String,
    manifest_prefix: String,
    config: VfsDedupConfig,
}

impl DedupS3Store {
    pub fn new(
        endpoint: &str,
        region: &str,
        bucket: &str,
        access_key: &str,
        secret_key: &str,
        config: VfsDedupConfig,
    ) -> Result<Self, VfsError> {
        let s3vfs = S3Vfs::new(endpoint, region, bucket, access_key, secret_key)?;

        Ok(Self {
            s3vfs,
            bucket: bucket.to_string(),
            block_prefix: "blocks/".to_string(),
            manifest_prefix: "manifests/".to_string(),
            config,
        })
    }

    pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
        if data.len() > self.config.block_size {
            return Err(VfsError::Io("Block size exceeds limit".to_string()));
        }

        let hash = Self::hash_block(data);
        let key = format!("{}{}", self.block_prefix, hash);

        // Check if block exists
        if !self.s3vfs.object_exists(&key)? {
            // PUT with initial ref count = 1
            self.s3vfs.put_object_with_metadata(
                &key,
                data,
                [("x-amz-meta-ref-count", "1".to_string())],
            )?;
        } else {
            // Increment ref count
            self.increment_ref(&hash)?;
        }

        Ok(hash)
    }

    pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        self.s3vfs.get_object(&key)
    }

    pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        let head = self.s3vfs.head_object(&key)?;

        let current_ref = head.metadata
            .get("x-amz-meta-ref-count")
            .and_then(|v| v.parse::<u32>().ok())
            .unwrap_or(1);

        // Need to GET block data + PUT with new metadata
        let block_data = self.get_block(hash)?;
        self.s3vfs.put_object_with_metadata(
            &key,
            &block_data,
            [("x-amz-meta-ref-count", (current_ref + 1).to_string())],
        )?;

        Ok(())
    }

    pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
        let mut file = std::fs::File::open(source)?;
        let mut manifest = DedupManifest::new();
        let mut buffer = vec![0u8; self.config.block_size];

        // Note: read() may return short reads, so blocks are not guaranteed
        // to be exactly block_size long except at end of file.
        loop {
            let n = file.read(&mut buffer)?;
            if n == 0 {
                break;
            }

            manifest.original_size += n;
            let hash = self.store_block(&buffer[..n])?;
            manifest.block_hashes.push(hash);
        }

        // Store manifest to S3
        let file_id = uuid::Uuid::new_v4().to_string();
        let manifest_key = format!("{}{}.json", self.manifest_prefix, file_id);
        manifest.file_id = file_id;
        let manifest_json = serde_json::to_string(&manifest)?;
        self.s3vfs.put_object(&manifest_key, manifest_json.as_bytes())?;

        Ok(manifest)
    }

    pub fn restore_file(&self, manifest_id: &str, target: &Path) -> Result<(), VfsError> {
        let manifest_key = format!("{}{}.json", self.manifest_prefix, manifest_id);
        let manifest_json = self.s3vfs.get_object(&manifest_key)?;
        let manifest: DedupManifest = serde_json::from_slice(&manifest_json)?;

        let mut file = std::fs::File::create(target)?;
        for hash in &manifest.block_hashes {
            let block = self.get_block(hash)?;
            file.write_all(&block)?;
        }

        Ok(())
    }

    fn hash_block(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        hex::encode(hasher.finalize())
    }
}
```

---

## Integration with MarkBase VFS

### Option 1: Standalone DedupS3Store

The user creates a DedupS3Store manually:

```bash
# CLI tool
markbase dedup-upload --s3 --s3-endpoint http://localhost:9000 --file /data/large.iso
markbase dedup-download --s3 --manifest-id <manifest-id> --output /data/restored.iso
```

---

### Option 2: DedupVfsBackend (VfsBackend trait)

Create a VfsBackend wrapper that dedups automatically:

```rust
// Sketch only. `VfsFile` and `VfsDirEntry` are placeholder names: the
// original type parameters were lost from this draft. It also assumes
// DedupS3Store is Clone and that the manifest carries an mtime.
pub struct DedupS3Backend {
    dedup_store: DedupS3Store,
    manifest_dir: PathBuf, // Local cache for manifests
}

impl VfsBackend for DedupS3Backend {
    fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // 1. Read manifest from S3
        let manifest = self.load_manifest(path)?;

        // 2. DedupS3File (read blocks from S3)
        Ok(Box::new(DedupS3File::new(self.dedup_store.clone(), manifest)))
    }

    fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
        // Read from manifest metadata
        let manifest = self.load_manifest(path)?;
        Ok(VfsStat {
            size: manifest.original_size,
            mtime: manifest.mtime,
            ...
        })
    }

    fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
        // List manifests from S3
        self.dedup_store.s3vfs.list_objects(&self.dedup_store.manifest_prefix)
    }
}
```

**Advantages**:
- ✅ Transparent dedup (users do not need to care)
- ✅ Seamless integration with SMB/WebDAV/SFTP

---

### Option 3: Hybrid (LocalFs + DedupS3Store)

```rust
// Sketch only. `VfsFile` is a placeholder name for the lost return type.
pub struct HybridDedupBackend {
    local: LocalFs,          // Small files (<1MB) stay local
    dedup_s3: DedupS3Store,  // Large files (>1MB) dedup to S3
}

impl VfsBackend for HybridDedupBackend {
    fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // Check file size
        let stat = self.local.stat(path)?;

        if stat.size < self.dedup_s3.config.min_file_size {
            // Small file: direct LocalFs
            self.local.open_file(path, flags)
        } else {
            // Large file: dedup to S3
            self.dedup_s3.dedup_file(path)?;
            self.dedup_s3.open_file_from_manifest(path)
        }
    }
}
```

**Recommendation**: Option 1 (Phase 1), Option 3 (Phase 2).

---

## Performance Considerations

### Network Latency

| Operation | LocalFs | S3Vfs | Overhead |
|-----------|---------|-------|----------|
| store_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| get_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| dedup_file (100MB) | ~2s (50MB/s) | ~10s (10MB/s) | ~5x |

Note that 100MB at 4KB per block is 25,600 objects. At ~5-10ms per sequential PUT that alone is roughly 128-256s, so the ~10s estimate for `dedup_file` presumes concurrent uploads or a larger block size.

**Mitigations**:
- ✅ Async concurrent upload (4-8 concurrent requests)
- ✅ ReadCache (64MB cache)
- ✅ Local cache for hot blocks

---

### Dedup Ratio Impact

| File Type | Dedup Ratio | Network Traffic Saved |
|-----------|-------------|----------------------|
| VM images (similar OS) | ~80% | -80% upload bandwidth |
| Log files (daily) | ~60% | -60% upload bandwidth |
| Unique files (photos) | ~5% | -5% upload bandwidth |

---

## Next Steps

1. **Phase 1 Implementation** (~300 lines)
   - `DedupS3Store` struct
   - `store_block()` / `get_block()` via S3Vfs
   - `increment_ref()` with metadata update

2. **Phase 2 CLI Integration** (~100 lines)
   - `markbase dedup-upload --s3`
   - `markbase dedup-download --manifest-id`

3. **Phase 3 Performance Test**
   - Benchmark dedup_file (100MB)
   - Compare LocalFs vs S3Vfs

---

## Open Questions

1. **Concurrency**: Accept non-atomic ref count vs implement versioning?
2. **Backend choice**: Standalone CLI vs VfsBackend integration?
3. **MinIO versioning**: Should we require MinIO versioning enabled?
4. **Ref count object**: Metadata vs separate object?
5. **Block cache**: Should we cache blocks locally?

---

**Document created**: 2026-06-25
**Last updated**: 2026-06-25
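As a companion to the first open question (non-atomic ref count vs versioning), the optimistic retry loop from the Concurrency Problem section can be modelled without a real bucket. The sketch below is only an illustration under stated assumptions: `MockRefStore`, `write_if_version`, and `run_concurrent` are hypothetical names, the store is an in-memory map rather than S3Vfs, and the integer version tag merely stands in for whatever conflict token (version ID or ETag) the backend turns out to offer. Whether S3Vfs or MinIO can provide such a conditional write is not established here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical in-memory stand-in for a bucket of ref-count objects.
/// Each key maps to (version, ref_count); the version changes on every
/// successful write, loosely analogous to an object version ID.
#[derive(Default)]
pub struct MockRefStore {
    inner: Mutex<HashMap<String, (u64, u32)>>,
}

impl MockRefStore {
    /// Read the current (version, ref_count); a missing key reads as (0, 0).
    pub fn read(&self, key: &str) -> (u64, u32) {
        self.inner.lock().unwrap().get(key).copied().unwrap_or((0, 0))
    }

    /// Conditional write: succeeds only if the stored version still
    /// equals `expected`, then bumps the version by one.
    pub fn write_if_version(&self, key: &str, expected: u64, count: u32) -> bool {
        let mut map = self.inner.lock().unwrap();
        let entry = map.entry(key.to_string()).or_insert((0, 0));
        if entry.0 != expected {
            return false; // another writer got there first
        }
        *entry = (expected + 1, count);
        true
    }
}

/// Optimistic increment: read, attempt a conditional write, retry on conflict.
pub fn increment_ref(store: &MockRefStore, key: &str) -> u32 {
    loop {
        let (version, count) = store.read(key);
        if store.write_if_version(key, version, count + 1) {
            return count + 1;
        }
    }
}

/// Spawn `writers` threads that each increment `key` `per_writer` times,
/// then return the final ref count.
pub fn run_concurrent(writers: usize, per_writer: usize, key: &str) -> u32 {
    let store = Arc::new(MockRefStore::default());
    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let store = Arc::clone(&store);
            let key = key.to_string();
            thread::spawn(move || {
                for _ in 0..per_writer {
                    increment_ref(&store, &key);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    store.read(key).1
}
```

In this model a stale write is rejected instead of silently overwriting a newer count, which is the property the Node A / Node B scenario lacks. A production loop would additionally cap the number of retries and tell conflict errors apart from other failures.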