DedupFs + S3Vfs Combination Design

Date: 2026-06-25
Status: Design proposal
Goal: Distributed deduplication storage via MinIO/S3 backend


Executive Summary

Current State

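DedupStore (dedup.rs, 224 lines):

  • Dedup storage backed by the local file system
  • SHA-256 block hashes + reference counting
  • Blocks stored in a local directory (store_path/.dedup/)

Problems

  • Dedup blocks cannot be shared across nodes
  • No distributed fault tolerance
  • Limited to the storage capacity of a single node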

Proposed Solution

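DedupS3Store:

  • Blocks stored as MinIO/S3 objects (shared across nodes)
  • Reference counts stored in S3 object metadata
  • Manifests stored as S3 objects (JSON format)

Advantages

  • Cross-node dedup sharing (MinIO distributed mode)
  • Automatic fault tolerance (MinIO erasure coding)
  • No single-node limit (MinIO scales out)
  • Integrates with the existing S3Vfs (no new HTTP API needed)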
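
The content-addressing idea behind the proposal can be sketched with an in-memory stand-in for the bucket. Everything here is hypothetical: `MemBucket` is not part of MarkBase, and `DefaultHasher` is used only so the sketch needs no external crates (it is not collision-resistant; the design itself calls for SHA-256).

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in for SHA-256 so the sketch has no dependencies.
/// Real code would hash with sha2::Sha256 as the design specifies.
fn block_key(data: &[u8]) -> String {
    let mut hasher = DefaultHasher::new();
    data.hash(&mut hasher);
    format!("blocks/{:016x}", hasher.finish())
}

/// Hypothetical in-memory model of the bucket: key -> (bytes, ref count).
#[derive(Default)]
struct MemBucket {
    objects: HashMap<String, (Vec<u8>, u64)>,
}

impl MemBucket {
    /// Identical content maps to the same key, so a repeated block
    /// only bumps the reference count instead of storing new bytes.
    fn store_block(&mut self, data: &[u8]) -> String {
        let key = block_key(data);
        self.objects
            .entry(key.clone())
            .and_modify(|(_, refs)| *refs += 1)
            .or_insert_with(|| (data.to_vec(), 1));
        key
    }

    fn ref_count(&self, key: &str) -> u64 {
        self.objects.get(key).map(|(_, refs)| *refs).unwrap_or(0)
    }
}
```

Because every node derives the key from the block contents alone, two nodes writing the same data converge on the same object, which is what makes the blocks shareable.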

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node A                                                         │
│ └── DedupS3Store                                                        │
│     ├── store_block() → S3 PUT <hash>                                   │
│     ├── get_block()   → S3 GET <hash>                                   │
│     └── dedup_file()  → chunk + S3 PUT + manifest                       │
└─────────────────────────────────────────────────────────────────────────┘
                                     ↓
┌─────────────────────────────────────────────────────────────────────────┐
│ MinIO Cluster (S3-compatible)                                           │
│ ├── Bucket: markbase-dedup                                              │
│ │   ├── Objects:   <sha256-hash> (dedup blocks)                         │
│ │   ├── Metadata:  x-amz-meta-ref-count (reference count)               │
│ │   └── Manifests: manifests/<file-id>.json                             │
│ ├── Erasure Coding: EC:2 (automatic fault tolerance)                    │
│ └── Replication: Node A → Node B (DR)                                   │
└─────────────────────────────────────────────────────────────────────────┘
                                     ↓
┌─────────────────────────────────────────────────────────────────────────┐
│ MarkBase Node B                                                         │
│ └── DedupS3Store                                                        │
│     ├── get_block()    → S3 GET <hash> (blocks shared with Node A)      │
│     └── restore_file() → S3 GET manifest + S3 GET blocks                │
└─────────────────────────────────────────────────────────────────────────┘

Implementation Design

DedupS3Store Struct

pub struct DedupS3Store {
    s3vfs: S3Vfs,                  // S3 backend
    bucket: String,                // Bucket name (markbase-dedup)
    block_prefix: String,          // Object key prefix (blocks/)
    manifest_prefix: String,       // Manifest prefix (manifests/)
    config: VfsDedupConfig,        // block_size, min_file_size
}

pub struct DedupManifest {
    original_size: usize,
    block_hashes: Vec<String>,
    dedup_ratio: f64,
    file_id: String,               // UUID for manifest storage
}

Core Methods

| Method | Current (LocalFs) | Proposed (S3Vfs) |
|---|---|---|
| store_block(data) | std::fs::write(store_path/hash, data) | S3Vfs.put_object(blocks/hash, data) |
| get_block(hash) | std::fs::read(store_path/hash) | S3Vfs.get_object(blocks/hash) |
| increment_ref(hash) | std::fs::write(hash.ref, count) | S3Vfs.put_object(blocks/hash, data) + metadata update |
| decrement_ref(hash) | std::fs::write/remove | S3Vfs.delete_object + metadata check |
| dedup_file(source) | Local file read + block store | Local file read + S3 PUT blocks |
| restore_file(manifest) | Local file write + block read | Local file write + S3 GET blocks |
| get_ref_count(hash) | std::fs::read(hash.ref) | S3Vfs.head_object(blocks/hash) → metadata |
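
The decrement_ref row pairs a count update with a possible delete. A minimal sketch of that rule, assuming an in-memory map in place of S3 (the `Blocks` alias and the delete-at-zero policy are illustrative assumptions, not confirmed MarkBase behavior):

```rust
use std::collections::HashMap;

/// Hypothetical in-memory model: hash -> (block bytes, ref count).
type Blocks = HashMap<String, (Vec<u8>, u64)>;

/// Drop one reference and delete the block once nothing refers to it.
/// Returns the remaining count, or None if the block was unknown.
fn decrement_ref(blocks: &mut Blocks, hash: &str) -> Option<u64> {
    let remaining = {
        let (_, refs) = blocks.get_mut(hash)?;
        *refs = refs.saturating_sub(1);
        *refs
    };
    if remaining == 0 {
        // Corresponds to S3Vfs.delete_object in the table above.
        blocks.remove(hash);
    }
    Some(remaining)
}
```

Against a real bucket the read, the update and the delete are separate requests, so this rule inherits the concurrency caveats discussed under Reference Count Management.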

S3 Object Layout

Bucket: markbase-dedup
├── blocks/
│   ├── <sha256-hash-1>            # Dedup block (4 KB)
│   │   └── Metadata: x-amz-meta-ref-count: 5
│   ├── <sha256-hash-2>
│   │   └── Metadata: x-amz-meta-ref-count: 2
│   └── ...
│
├── manifests/
│   ├── <file-id-1>.json           # Manifest JSON
│   │   └── Content: {"original_size": 1024, "block_hashes": [...], ...}
│   ├── <file-id-2>.json
│   └── ...
│
└── stats.json                     # DedupStats (optional)
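
The layout above implies a small set of key-building rules. The helpers below are a hypothetical sketch of them; the function names are illustrative, and the only assumptions taken from the design are the two prefixes and the use of hex-encoded SHA-256 digests.

```rust
/// Prefixes taken from the layout above.
const BLOCK_PREFIX: &str = "blocks/";
const MANIFEST_PREFIX: &str = "manifests/";

/// A hex-encoded SHA-256 digest is exactly 64 lowercase hex characters.
fn is_valid_block_hash(hash: &str) -> bool {
    hash.len() == 64 && hash.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Object key for a block, or None if the hash is malformed.
fn block_key(hash: &str) -> Option<String> {
    is_valid_block_hash(hash).then(|| format!("{}{}", BLOCK_PREFIX, hash))
}

/// Object key for a manifest.
fn manifest_key(file_id: &str) -> String {
    format!("{}{}.json", MANIFEST_PREFIX, file_id)
}

/// Recover the hash from a listed object key, e.g. while scanning the bucket.
fn hash_from_block_key(key: &str) -> Option<&str> {
    key.strip_prefix(BLOCK_PREFIX).filter(|h| is_valid_block_hash(h))
}
```

Validating the hash before building a key keeps stray input such as "../manifests/x" from ever being turned into an object path.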

Reference Count Management

Challenge

S3 objects do not support atomic increment/decrement operations.

Solution 1: Metadata Update (recommended)

Flow

fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. GET current metadata
    let head = self.s3vfs.head_object(&format!("blocks/{}", hash))?;
    let current_ref = head.metadata.get("x-amz-meta-ref-count")
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(0);
    
    // 2. PUT with updated metadata
    let block_data = self.s3vfs.get_object(&format!("blocks/{}", hash))?;
    self.s3vfs.put_object_with_metadata(
        &format!("blocks/{}", hash),
        &block_data,
        [("x-amz-meta-ref-count", (current_ref + 1).to_string())]
    )?;
    
    Ok(())
}

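Advantages

  • Simple to implement
  • Compatible with standard S3

Disadvantages

  • ⚠️ Needs multiple requests per update (HEAD + GET + PUT in the flow above)
  • ⚠️ Non-atomic (concurrency problem)
  • ⚠️ Must read the block data (the PUT needs a body)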

Solution 2: Separate Ref Count Object

Flow

fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. GET ref count object (a missing or unparsable object counts as 0)
    let ref_key = format!("refs/{}/count", hash);
    let current = self.s3vfs.get_object(&ref_key)
        .ok()
        .and_then(|data| String::from_utf8(data).ok())
        .and_then(|text| text.trim().parse::<u64>().ok())
        .unwrap_or(0);
    
    // 2. PUT updated ref count
    self.s3vfs.put_object(&ref_key, (current + 1).to_string().as_bytes())?;
    Ok(())
}

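Advantages

  • No need to read the block data
  • Much smaller object (just a number)

Disadvantages

  • ⚠️ Requires an additional object per block
  • ⚠️ Non-atomic (concurrency problem)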

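Solution 3: MinIO Extended API (Enterprise Edition)

MinIO Enterprise Edition reportedly provides mc admin bucket policy and an object locking API. Note that S3-style object locking is a retention (WORM) feature rather than a mutex, so whether it helps with reference counts is unverified.

Advantages

  • May provide atomic operations

Disadvantages

  • ⚠️ MinIO Enterprise Edition only
  • ⚠️ The specific API still needs to be researched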

Concurrency Problem

Scenario

Node A and Node B dedup the same file at the same time:

  1. Node A: increment_ref(hash-abc) → GET count=2 → PUT count=3
  2. Node B: increment_ref(hash-abc) → GET count=2 → PUT count=3
  3. Result: count=3 (wrong; it should be count=4)
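
The interleaving can be replayed deterministically against an in-memory stand-in for the ref-count metadata. `RefStore` and `lost_update` are hypothetical names used only for this illustration.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the ref-count metadata of each block.
struct RefStore {
    counts: HashMap<String, u64>,
}

impl RefStore {
    fn get(&self, hash: &str) -> u64 {
        *self.counts.get(hash).unwrap_or(&0)
    }

    fn put(&mut self, hash: &str, count: u64) {
        self.counts.insert(hash.to_string(), count);
    }
}

/// Replays the scenario: both nodes read before either one writes,
/// so the second write overwrites the first instead of adding to it.
fn lost_update(store: &mut RefStore, hash: &str) -> u64 {
    let seen_by_a = store.get(hash);
    let seen_by_b = store.get(hash);
    store.put(hash, seen_by_a + 1);
    store.put(hash, seen_by_b + 1);
    store.get(hash)
}
```

Starting from a count of 2, the function returns 3 rather than 4: one of the two increments is lost.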

Solution 1: Optimistic Locking

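Use S3 versioning to detect conflicts:

fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // The PUT rewrites the whole object, so the block body is needed too
    let block_data = self.get_block(hash)?;
    loop {
        // 1. GET current version + metadata
        let (version_id, current_ref) = self.get_ref_with_version(hash)?;
        
        // 2. PUT with version check
        // (put_object_if_version is an assumed helper, not a confirmed S3Vfs method)
        let result = self.s3vfs.put_object_if_version(
            &format!("blocks/{}", hash),
            &block_data,
            current_ref + 1,
            version_id  // Only succeed if version unchanged
        );
        
        if result.is_ok() {
            break;
        }
        // Retry if version mismatch (a real implementation should cap the
        // retries and propagate errors other than a version conflict)
    }
    Ok(())
}

Requirement: MinIO versioning enabled.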
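
The retry loop is a compare-and-swap pattern. A minimal sketch against an in-memory versioned store is shown below; `VersionedRefs` and `write_if_version` are hypothetical and say nothing about which conditional-write primitive MinIO or S3Vfs actually exposes.

```rust
use std::collections::HashMap;

/// Hypothetical versioned store: each key holds (version, ref count).
#[derive(Default)]
struct VersionedRefs {
    entries: HashMap<String, (u64, u64)>,
}

impl VersionedRefs {
    fn read(&self, hash: &str) -> (u64, u64) {
        self.entries.get(hash).copied().unwrap_or((0, 0))
    }

    /// Conditional write: succeeds only if the stored version still equals `expected`.
    fn write_if_version(&mut self, hash: &str, expected: u64, count: u64) -> bool {
        let current = self.read(hash).0;
        if current != expected {
            return false;
        }
        self.entries.insert(hash.to_string(), (current + 1, count));
        true
    }
}

/// Bounded retry loop: returns the new count, or None after `max_attempts` conflicts.
fn increment_ref(store: &mut VersionedRefs, hash: &str, max_attempts: u32) -> Option<u64> {
    for _ in 0..max_attempts {
        let (version, count) = store.read(hash);
        if store.write_if_version(hash, version, count + 1) {
            return Some(count + 1);
        }
    }
    None
}
```

A writer holding a stale version is rejected and must re-read, so no increment is silently overwritten. Bounding the attempts avoids spinning forever under heavy contention.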


Solution 2: Distributed Lock Service

Use an external distributed lock (e.g. Redis/ZooKeeper):

fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
    // 1. Acquire distributed lock
    let lock = self.lock_service.acquire(&format!("lock:{}", hash))?;
    
    // 2. Increment ref count (keep the result so the lock is released even on error)
    let result = self.update_ref_count(hash);
    
    // 3. Release lock
    lock.release();
    result
}

Disadvantage: requires an additional service (Redis).
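
One way to make sure a lock is always released is a guard that unlocks when it goes out of scope. The sketch below uses an in-process set as a stand-in for the external service; `LockService`, `LockGuard` and `update_with_lock` are hypothetical, and a real distributed lock would additionally need expiry so that a crashed holder cannot block everyone else.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical in-process stand-in for a lock service such as Redis or ZooKeeper.
#[derive(Clone, Default)]
struct LockService {
    held: Arc<Mutex<HashSet<String>>>,
}

/// Guard that releases its key when dropped, including on early return via `?`.
struct LockGuard {
    service: LockService,
    key: String,
}

impl LockService {
    /// Returns a guard if the key was free, or None if someone else holds it.
    fn try_acquire(&self, key: &str) -> Option<LockGuard> {
        let mut held = self.held.lock().unwrap();
        if held.insert(key.to_string()) {
            Some(LockGuard { service: self.clone(), key: key.to_string() })
        } else {
            None
        }
    }

    fn is_held(&self, key: &str) -> bool {
        self.held.lock().unwrap().contains(key)
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        self.service.held.lock().unwrap().remove(&self.key);
    }
}

/// `fail` simulates an error in the ref-count update itself.
fn update_with_lock(locks: &LockService, hash: &str, fail: bool) -> Result<(), String> {
    let _guard = locks.try_acquire(&format!("lock:{}", hash)).ok_or("lock busy")?;
    if fail {
        return Err("update failed".to_string());
    }
    Ok(())
}
```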


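Solution 3: Accept Non-Atomic (simplified approach)

Given MarkBase's lightweight positioning:

  • ⚠️ Accept the risk of non-atomic updates
  • ⚠️ Ref counts are occasionally inaccurate (block contents are unaffected, but an undercount could let decrement_ref delete a block that is still referenced)
  • ⚠️ Repair periodically (scrub job)

Recommendation: Phase 1 uses Solution 1 (Metadata Update); Phase 2 investigates MinIO versioning.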
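
A scrub job can treat the manifests as the source of truth and recompute every count from them. The sketch below assumes the counting rule used by store_block in this design, where each occurrence of a block in a manifest adds one reference; the function names and the in-memory inputs are hypothetical.

```rust
use std::collections::HashMap;

/// Expected count of a block = number of times it appears across all manifests.
fn expected_ref_counts(manifests: &[Vec<String>]) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for block_hashes in manifests {
        for hash in block_hashes {
            *counts.entry(hash.clone()).or_insert(0) += 1;
        }
    }
    counts
}

/// Returns (hash, stored, expected) for every stored block whose count drifted.
/// Blocks referenced by no manifest are reported with expected = 0.
fn scrub(stored: &HashMap<String, u64>, manifests: &[Vec<String>]) -> Vec<(String, u64, u64)> {
    let expected = expected_ref_counts(manifests);
    let mut fixes: Vec<(String, u64, u64)> = Vec::new();
    for (hash, &have) in stored {
        let want = expected.get(hash).copied().unwrap_or(0);
        if have != want {
            fixes.push((hash.clone(), have, want));
        }
    }
    fixes.sort(); // deterministic order for reporting
    fixes
}
```

The scrub only reports drift for blocks that exist in the bucket; a hash that appears in a manifest but has no stored block would need a separate check.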


Implementation Phases

| Phase | Task | Code Lines | Priority | Risk |
|---|---|---|---|---|
| Phase 1 | DedupS3Store struct + basic I/O | ~300 | P0 | Medium |
| Phase 2 | Reference count metadata | ~100 | P0 | Medium |
| Phase 3 | Manifest storage to S3 | ~50 | P1 | Low |
| Phase 4 | CLI integration | ~100 | P1 | Low |
| Phase 5 | Async version (DedupAsyncS3Store) | ~200 | P2 | High |
| Phase 6 | Concurrency fix (versioning) | ~150 | P2 | High |
| Phase 7 | Performance benchmark | ~100 | P2 | Low |
| Total | | ~1000 | | |

DedupS3Store Implementation (Phase 1 Draft)

use super::s3_fs::S3Vfs;
use super::{VfsDedupConfig, VfsError};
use sha2::{Sha256, Digest};
use std::io::{Read, Write};  // needed for file.read() and file.write_all()
use std::path::Path;

pub struct DedupS3Store {
    s3vfs: S3Vfs,
    bucket: String,
    block_prefix: String,
    manifest_prefix: String,
    config: VfsDedupConfig,
}

impl DedupS3Store {
    pub fn new(
        endpoint: &str,
        region: &str,
        bucket: &str,
        access_key: &str,
        secret_key: &str,
        config: VfsDedupConfig,
    ) -> Result<Self, VfsError> {
        let s3vfs = S3Vfs::new(endpoint, region, bucket, access_key, secret_key)?;
        Ok(Self {
            s3vfs,
            bucket: bucket.to_string(),
            block_prefix: "blocks/".to_string(),
            manifest_prefix: "manifests/".to_string(),
            config,
        })
    }
    
    pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
        if data.len() > self.config.block_size {
            return Err(VfsError::Io("Block size exceeds limit".to_string()));
        }
        
        let hash = Self::hash_block(data);
        let key = format!("{}{}", self.block_prefix, hash);
        
        // Check if block exists
        if !self.s3vfs.object_exists(&key)? {
            // PUT with initial ref count = 1
            self.s3vfs.put_object_with_metadata(
                &key,
                data,
                [("x-amz-meta-ref-count", "1")]
            )?;
        } else {
            // Increment ref count
            self.increment_ref(&hash)?;
        }
        
        Ok(hash)
    }
    
    pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        self.s3vfs.get_object(&key)
    }
    
    pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
        let key = format!("{}{}", self.block_prefix, hash);
        let head = self.s3vfs.head_object(&key)?;
        
        let current_ref = head.metadata
            .get("x-amz-meta-ref-count")
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(1);
        
        // Need to GET block data + PUT with new metadata
        let block_data = self.get_block(hash)?;
        self.s3vfs.put_object_with_metadata(
            &key,
            &block_data,
            [("x-amz-meta-ref-count", (current_ref + 1).to_string())]
        )?;
        
        Ok(())
    }
    
    pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
        let mut file = std::fs::File::open(source)?;
        let mut manifest = DedupManifest::new();
        let mut buffer = vec![0u8; self.config.block_size];
        
        loop {
            let n = file.read(&mut buffer)?;
            if n == 0 { break; }
            
            manifest.original_size += n;
            let hash = self.store_block(&buffer[..n])?;
            manifest.block_hashes.push(hash);
        }
        
        // Store manifest to S3
        let file_id = uuid::Uuid::new_v4().to_string();
        // Build the key before file_id is moved into the manifest
        let manifest_key = format!("{}{}.json", self.manifest_prefix, file_id);
        manifest.file_id = file_id;
        let manifest_json = serde_json::to_string(&manifest)?;
        self.s3vfs.put_object(&manifest_key, manifest_json.as_bytes())?;
        
        Ok(manifest)
    }
    
    pub fn restore_file(&self, manifest_id: &str, target: &Path) -> Result<(), VfsError> {
        let manifest_key = format!("{}{}.json", self.manifest_prefix, manifest_id);
        let manifest_json = self.s3vfs.get_object(&manifest_key)?;
        let manifest: DedupManifest = serde_json::from_slice(&manifest_json)?;
        
        let mut file = std::fs::File::create(target)?;
        for hash in &manifest.block_hashes {
            let block = self.get_block(hash)?;
            file.write_all(&block)?;
        }
        
        Ok(())
    }
    
    fn hash_block(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        hex::encode(hasher.finalize())
    }
}
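
One detail worth noting in dedup_file: `Read::read` may return fewer bytes than the buffer holds even before the end of the input, which would shift block boundaries and change the hashes. A minimal sketch of a chunker that tolerates short reads follows; `read_block` and `split_blocks` are hypothetical helpers, not part of the draft.

```rust
use std::io::{self, Read};

/// Fill `buf` as far as possible, looping over short reads.
/// Returns the number of bytes read; less than buf.len() only at end of input.
fn read_block<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => break, // end of input
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(filled)
}

/// Split the input into fixed-size blocks; only the last block may be shorter.
fn split_blocks<R: Read>(mut reader: R, block_size: usize) -> io::Result<Vec<Vec<u8>>> {
    let mut blocks = Vec::new();
    let mut buf = vec![0u8; block_size];
    loop {
        let n = read_block(&mut reader, &mut buf)?;
        if n == 0 {
            break;
        }
        blocks.push(buf[..n].to_vec());
    }
    Ok(blocks)
}
```

With fixed boundaries, the same file always yields the same sequence of hashes regardless of how the underlying reader happens to deliver its bytes.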

Integration with MarkBase VFS

Option 1: Standalone DedupS3Store

The user creates a DedupS3Store manually:

# CLI tool
markbase dedup-upload --s3 --s3-endpoint http://localhost:9000 --file /data/large.iso
markbase dedup-download --s3 --manifest-id <uuid> --output /data/restored.iso

Option 2: DedupVfsBackend (VfsBackend trait)

Create a VfsBackend wrapper (automatic dedup):

pub struct DedupS3Backend {
    dedup_store: DedupS3Store,
    manifest_dir: PathBuf,  // Local cache for manifests
}

impl VfsBackend for DedupS3Backend {
    fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // 1. Read manifest from S3
        let manifest = self.load_manifest(path)?;
        
        // 2. DedupS3File (read blocks from S3)
        Ok(Box::new(DedupS3File::new(self.dedup_store.clone(), manifest)))
    }
    
    fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
        // Read from manifest metadata
        let manifest = self.load_manifest(path)?;
        Ok(VfsStat {
            size: manifest.original_size,
            mtime: manifest.mtime,
            ...
        })
    }
    
    fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
        // List manifests from S3
        // manifest_prefix lives on DedupS3Store, not on this wrapper
        self.dedup_store.s3vfs.list_objects(&self.dedup_store.manifest_prefix)
    }
}

Advantages

  • Transparent dedup (users do not need to be aware of it)
  • Seamless integration with SMB/WebDAV/SFTP

Option 3: Hybrid (LocalFs + DedupS3Store)

pub struct HybridDedupBackend {
    local: LocalFs,               // Small files (<1MB) stored locally
    dedup_s3: DedupS3Store,       // Large files (>1MB) dedup to S3
}

impl VfsBackend for HybridDedupBackend {
    fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
        // Check file size
        let stat = self.local.stat(path)?;
        
        if stat.size < self.dedup_s3.config.min_file_size {
            // Small file: direct LocalFs
            self.local.open_file(path, flags)
        } else {
            // Large file: dedup to S3
            self.dedup_s3.dedup_file(path)?;
            self.dedup_s3.open_file_from_manifest(path)
        }
    }
}

Recommendation: Option 1 (Phase 1), Option 3 (Phase 2).


Performance Considerations

Network Latency

| Operation | LocalFs | S3Vfs | Overhead |
|---|---|---|---|
| store_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| get_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
| dedup_file (100MB) | ~2s (25MB/s) | ~10s (10MB/s) | ~5x |
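
A back-of-the-envelope model helps relate the per-block latency to the whole-file figure. The function below is a rough sketch under stated assumptions: one request per block, a fixed latency per request, and perfect overlap of concurrent requests. It ignores bandwidth limits and dedup hits.

```rust
/// Rough upload-time estimate in seconds.
/// Assumes one request per block and `concurrency` requests overlapping perfectly.
fn estimate_upload_secs(
    file_bytes: u64,
    block_bytes: u64,
    latency_ms: f64,
    concurrency: u64,
) -> f64 {
    let blocks = (file_bytes + block_bytes - 1) / block_bytes; // ceiling division
    let rounds = (blocks + concurrency - 1) / concurrency;
    rounds as f64 * latency_ms / 1000.0
}
```

For a 100 MiB file in 4 KiB blocks (25,600 blocks) at 5 ms per request, this model gives 128 s when requests are issued one at a time and 16 s with 8 in flight. Under these assumptions, a whole-file time near 10 s is only reachable with concurrent uploads, larger blocks, or a high dedup hit rate.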

Mitigations

  • Async concurrent upload (4-8 concurrent requests)
  • ReadCache (64MB cache)
  • Local cache for hot blocks
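
A local block cache can be as simple as a byte-bounded map. The sketch below uses FIFO eviction as a simple stand-in for an LRU policy; `BlockCache` is a hypothetical type and is not the existing ReadCache.

```rust
use std::collections::{HashMap, VecDeque};

/// Byte-bounded block cache with FIFO eviction.
struct BlockCache {
    capacity_bytes: usize,
    used_bytes: usize,
    blocks: HashMap<String, Vec<u8>>,
    order: VecDeque<String>, // insertion order, oldest first
}

impl BlockCache {
    fn new(capacity_bytes: usize) -> Self {
        Self {
            capacity_bytes,
            used_bytes: 0,
            blocks: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    fn get(&self, hash: &str) -> Option<&[u8]> {
        self.blocks.get(hash).map(|b| b.as_slice())
    }

    fn insert(&mut self, hash: &str, data: Vec<u8>) {
        // Skip blocks that can never fit, and blocks already cached
        // (content-addressed, so the cached bytes are identical).
        if data.len() > self.capacity_bytes || self.blocks.contains_key(hash) {
            return;
        }
        // Evict the oldest entries until the new block fits.
        while self.used_bytes + data.len() > self.capacity_bytes {
            match self.order.pop_front() {
                Some(oldest) => {
                    if let Some(evicted) = self.blocks.remove(&oldest) {
                        self.used_bytes -= evicted.len();
                    }
                }
                None => break,
            }
        }
        self.used_bytes += data.len();
        self.order.push_back(hash.to_string());
        self.blocks.insert(hash.to_string(), data);
    }
}
```

Because blocks are immutable and keyed by their hash, cached entries never go stale, so no invalidation logic is needed.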

Dedup Ratio Impact

| File Type | Dedup Ratio | Network Traffic Saved |
|---|---|---|
| VM images (similar OS) | ~80% | -80% upload bandwidth |
| Log files (daily) | ~60% | -60% upload bandwidth |
| Unique files (photos) | ~5% | -5% upload bandwidth |
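
The document does not define how the dedup ratio is computed. One plausible definition, assumed here purely for illustration, is the fraction of blocks that duplicate an already-seen block: 1 - unique_blocks / total_blocks.

```rust
use std::collections::HashSet;

/// Assumed definition: share of blocks that did not need a new upload
/// because an identical block had already been seen.
fn dedup_ratio(block_hashes: &[&str]) -> f64 {
    if block_hashes.is_empty() {
        return 0.0;
    }
    let unique: HashSet<&str> = block_hashes.iter().copied().collect();
    1.0 - unique.len() as f64 / block_hashes.len() as f64
}
```

Under this definition a file made of four identical blocks has a ratio of 0.75, and a file with no repeated blocks has a ratio of 0. Savings across files would additionally count blocks that already exist in the bucket.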

Next Steps

  1. Phase 1 Implementation (~300 lines)

    • DedupS3Store struct
    • store_block() / get_block() via S3Vfs
    • increment_ref() with metadata update
  2. Phase 2 CLI Integration (~100 lines)

    • markbase dedup-upload --s3
    • markbase dedup-download --manifest-id
  3. Phase 3 Performance Test

    • Benchmark dedup_file (100MB)
    • Compare LocalFs vs S3Vfs

Open Questions

  1. Concurrency: Accept non-atomic ref count vs implement versioning?
  2. Backend choice: Standalone CLI vs VfsBackend integration?
  3. MinIO versioning: Should we require MinIO versioning enabled?
  4. Ref count object: Metadata vs separate object?
  5. Block cache: Should we cache blocks locally?

Document created: 2026-06-25 Last updated: 2026-06-25