Distributed storage research: Ceph (shelved) + MinIO guide + DedupS3 design
This commit is contained in:
563
docs/DEDUP_S3_COMBINATION.md
Normal file
563
docs/DEDUP_S3_COMBINATION.md
Normal file
@@ -0,0 +1,563 @@
|
||||
# DedupFs + S3Vfs Combination Design
|
||||
|
||||
**Date**: 2026-06-25
|
||||
**Status**: Design proposal
|
||||
**Goal**: Distributed deduplication storage via MinIO/S3 backend
|
||||
|
||||
---
|
||||
|
||||
## Executive Summary
|
||||
|
||||
### Current State
|
||||
|
||||
**DedupStore**(`dedup.rs`, 224 行):
|
||||
- 基于**本地文件系统**的 dedup 存储
|
||||
- SHA-256 块哈希 + 引用计数
|
||||
- 块存储到本地目录(`store_path/.dedup/`)
|
||||
|
||||
**问题**:
|
||||
- ❌ 无法跨节点共享 dedup 块
|
||||
- ❌ 无分布式容错能力
|
||||
- ❌ 单节点存储限制
|
||||
|
||||
### Proposed Solution
|
||||
|
||||
**DedupS3Store**:
|
||||
- 块存储到 **MinIO/S3** 对象(跨节点共享)
|
||||
- 引用计数存储到 S3 object metadata
|
||||
- Manifest 存储到 S3 对象(JSON 格式)
|
||||
|
||||
**优势**:
|
||||
- ✅ 跨节点 dedup 共享(MinIO 分布式)
|
||||
- ✅ 自动容错(MinIO erasure coding)
|
||||
- ✅ 无单节点限制(MinIO 可扩展)
|
||||
- ✅ 与现有 S3Vfs 集成(无需新 HTTP API)
|
||||
|
||||
---
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ MarkBase Node A │
|
||||
│ ├── DedupS3Store │
|
||||
│ │ ├── store_block() → S3 PUT <hash> │
|
||||
│ │ ├── get_block() → S3 GET <hash> │
|
||||
│ │ └── dedup_file() → 分块 + S3 PUT + manifest │
|
||||
│ └───────────────────────────────────────────────────────────────────────┘
|
||||
│ ↓ │
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ MinIO Cluster (S3-compatible) │
|
||||
│ ├── Bucket: markbase-dedup │
|
||||
│ │ ├── Objects: <sha256-hash> (dedup 块) │
|
||||
│ │ ├── Metadata: x-amz-meta-ref-count (引用计数) │
|
||||
│ │ └── Manifests: manifests/<file-id>.json │
|
||||
│ │ │
|
||||
│ ├── Erasure Coding: EC:2 (自动容错) │
|
||||
│ ├── Replication: Node A → Node B (DR) │
|
||||
│ └─────────────────────────────────────────────────────────────────────┘
|
||||
│ ↓ │
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ MarkBase Node B │
|
||||
│ ├── DedupS3Store │
|
||||
│ │ ├── get_block() → S3 GET <hash> (共享 Node A 的块) │
|
||||
│ │ └── restore_file() → S3 GET manifest + S3 GET blocks │
|
||||
│ └─────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Implementation Design
|
||||
|
||||
### DedupS3Store Struct
|
||||
|
||||
```rust
|
||||
pub struct DedupS3Store {
|
||||
s3vfs: S3Vfs, // S3 backend
|
||||
bucket: String, // Bucket name (markbase-dedup)
|
||||
block_prefix: String, // Object key prefix (blocks/)
|
||||
manifest_prefix: String, // Manifest prefix (manifests/)
|
||||
config: VfsDedupConfig, // block_size, min_file_size
|
||||
}
|
||||
|
||||
pub struct DedupManifest {
|
||||
original_size: usize,
|
||||
block_hashes: Vec<String>,
|
||||
dedup_ratio: f64,
|
||||
file_id: String, // UUID for manifest storage
|
||||
}
|
||||
```
|
||||
|
||||
### Core Methods
|
||||
|
||||
| Method | Current (LocalFs) | Proposed (S3Vfs) |
|
||||
|--------|------------------|------------------|
|
||||
| `store_block(data)` | `std::fs::write(store_path/hash, data)` | `S3Vfs.put_object(blocks/hash, data)` |
|
||||
| `get_block(hash)` | `std::fs::read(store_path/hash)` | `S3Vfs.get_object(blocks/hash)` |
|
||||
| `increment_ref(hash)` | `std::fs::write(hash.ref, count)` | `S3Vfs.put_object(blocks/hash, data) + metadata update` |
|
||||
| `decrement_ref(hash)` | `std::fs::write/remove` | `S3Vfs.delete_object + metadata check` |
|
||||
| `dedup_file(source)` | Local file read + block store | Local file read + S3 PUT blocks |
|
||||
| `restore_file(manifest)` | Local file write + block read | Local file write + S3 GET blocks |
|
||||
| `get_ref_count(hash)` | `std::fs::read(hash.ref)` | `S3Vfs.head_object(blocks/hash) → metadata` |
|
||||
|
||||
---
|
||||
|
||||
## S3 Object Layout
|
||||
|
||||
```
|
||||
Bucket: markbase-dedup
|
||||
├── blocks/
|
||||
│ ├── <sha256-hash-1> # Dedup 块(4KB)
|
||||
│ │ └── Metadata: x-amz-meta-ref-count: 5
|
||||
│ ├── <sha256-hash-2>
|
||||
│ │ └── Metadata: x-amz-meta-ref-count: 2
|
||||
│ └── ...
|
||||
│
|
||||
├── manifests/
|
||||
│ ├── <file-id-1>.json # Manifest JSON
|
||||
│ │ └── Content: {"original_size": 1024, "block_hashes": [...], ...}
|
||||
│ ├── <file-id-2>.json
|
||||
│ └── ...
|
||||
│
|
||||
└── stats.json # DedupStats(可选)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Reference Count Management
|
||||
|
||||
### Challenge
|
||||
|
||||
S3 对象不支持 atomic increment/decrement 操作。
|
||||
|
||||
### Solution 1: Metadata Update (推荐 ⭐⭐⭐⭐⭐)
|
||||
|
||||
**流程**:
|
||||
```rust
|
||||
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
// 1. GET current metadata
|
||||
let head = self.s3vfs.head_object(&format!("blocks/{}", hash))?;
|
||||
let current_ref = head.metadata.get("x-amz-meta-ref-count")
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
// 2. PUT with updated metadata
|
||||
let block_data = self.s3vfs.get_object(&format!("blocks/{}", hash))?;
|
||||
self.s3vfs.put_object_with_metadata(
|
||||
&format!("blocks/{}", hash),
|
||||
&block_data,
|
||||
[("x-amz-meta-ref-count", (current_ref + 1).to_string())]
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
**优势**:
|
||||
- ✅ 简单实现
|
||||
- ✅ 与 S3 标准兼容
|
||||
- ⚠️ 需要两次请求(GET + PUT)
|
||||
|
||||
**劣势**:
|
||||
- ⚠️ 非原子操作(并发问题)
|
||||
- ⚠️ 需要读取块数据(PUT 需要 body)
|
||||
|
||||
---
|
||||
|
||||
### Solution 2: Separate Ref Count Object
|
||||
|
||||
**流程**:
|
||||
```rust
|
||||
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
// 1. GET ref count object
|
||||
let ref_key = format!("refs/{}/count", hash);
|
||||
let current = self.s3vfs.get_object(&ref_key)
|
||||
.and_then(|data| data.parse::<u64>())
|
||||
.unwrap_or(0);
|
||||
|
||||
// 2. PUT updated ref count
|
||||
self.s3vfs.put_object(&ref_key, (current + 1).to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
**优势**:
|
||||
- ✅ 无需读取块数据
|
||||
- ✅ 更小的对象(仅数字)
|
||||
|
||||
**劣势**:
|
||||
- ⚠️ 需要额外对象存储
|
||||
- ⚠️ 非原子操作(并发问题)
|
||||
|
||||
---
|
||||
|
||||
### Solution 3: MinIO Extended API (企业版)
|
||||
|
||||
MinIO 企业版提供 `mc admin bucket policy` 和 object locking API。
|
||||
|
||||
**优势**:
|
||||
- ✅ 可能提供 atomic operation
|
||||
|
||||
**劣势**:
|
||||
- ⚠️ 仅 MinIO 企业版
|
||||
- ⚠️ 需要研究具体 API
|
||||
|
||||
---
|
||||
|
||||
## Concurrency Problem
|
||||
|
||||
### Scenario
|
||||
|
||||
Node A 和 Node B 同时 dedup 相同文件:
|
||||
1. Node A: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
|
||||
2. Node B: `increment_ref(hash-abc)` → GET count=2 → PUT count=3
|
||||
3. 结果:count=3(错误,应为 count=4)
|
||||
|
||||
### Solution 1: Optimistic Locking
|
||||
|
||||
使用 S3 versioning 检测冲突:
|
||||
```rust
|
||||
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
loop {
|
||||
// 1. GET current version + metadata
|
||||
let (version_id, current_ref) = self.get_ref_with_version(hash)?;
|
||||
|
||||
// 2. PUT with version check
|
||||
let result = self.s3vfs.put_object_if_version(
|
||||
&format!("blocks/{}", hash),
|
||||
block_data,
|
||||
(current_ref + 1),
|
||||
version_id // Only succeed if version unchanged
|
||||
);
|
||||
|
||||
if result.is_ok() {
|
||||
break;
|
||||
}
|
||||
// Retry if version mismatch
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
**要求**:MinIO versioning enabled。
|
||||
|
||||
---
|
||||
|
||||
### Solution 2: Distributed Lock Service
|
||||
|
||||
使用外部分布式锁(如 Redis/Zookeeper):
|
||||
```rust
|
||||
fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
// 1. Acquire distributed lock
|
||||
let lock = self.lock_service.acquire(&format!("lock:{}", hash))?;
|
||||
|
||||
// 2. Increment ref count
|
||||
self.update_ref_count(hash)?;
|
||||
|
||||
// 3. Release lock
|
||||
lock.release();
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
**劣势**:需要额外服务(Redis)。
|
||||
|
||||
---
|
||||
|
||||
### Solution 3: Accept Non-Atomic (简化方案)
|
||||
|
||||
对于 MarkBase Lightweight 定位:
|
||||
- ⚠️ 接受非原子操作风险
|
||||
- ⚠️ 偶尔 ref count 不准确(不影响数据完整性)
|
||||
- ⚠️ 定期修复(scrub job)
|
||||
|
||||
**推荐**:Phase 1 使用 Solution 1(Metadata Update),Phase 2 研究 MinIO versioning。
|
||||
|
||||
---
|
||||
|
||||
## Implementation Phases
|
||||
|
||||
| Phase | Task | Code Lines | Priority | Risk |
|
||||
|-------|------|------------|----------|------|
|
||||
| **Phase 1** | DedupS3Store struct + basic I/O | ~300 | P0 | Medium |
|
||||
| **Phase 2** | Reference count metadata | ~100 | P0 | Medium |
|
||||
| **Phase 3** | Manifest storage to S3 | ~50 | P1 | Low |
|
||||
| **Phase 4** | CLI integration | ~100 | P1 | Low |
|
||||
| **Phase 5** | Async version (DedupAsyncS3Store) | ~200 | P2 | High |
|
||||
| **Phase 6** | Concurrency fix (versioning) | ~150 | P2 | High |
|
||||
| **Phase 7** | Performance benchmark | ~100 | P2 | Low |
|
||||
| **Total** | | **~1000** | | |
|
||||
|
||||
---
|
||||
|
||||
## DedupS3Store Implementation (Phase 1 Draft)
|
||||
|
||||
```rust
|
||||
use super::s3_fs::S3Vfs;
|
||||
use super::{VfsDedupConfig, VfsError};
|
||||
use sha2::{Sha256, Digest};
|
||||
use std::path::Path;
|
||||
|
||||
pub struct DedupS3Store {
|
||||
s3vfs: S3Vfs,
|
||||
bucket: String,
|
||||
block_prefix: String,
|
||||
manifest_prefix: String,
|
||||
config: VfsDedupConfig,
|
||||
}
|
||||
|
||||
impl DedupS3Store {
|
||||
pub fn new(
|
||||
endpoint: &str,
|
||||
region: &str,
|
||||
bucket: &str,
|
||||
access_key: &str,
|
||||
secret_key: &str,
|
||||
config: VfsDedupConfig,
|
||||
) -> Result<Self, VfsError> {
|
||||
let s3vfs = S3Vfs::new(endpoint, region, bucket, access_key, secret_key)?;
|
||||
Ok(Self {
|
||||
s3vfs,
|
||||
bucket: bucket.to_string(),
|
||||
block_prefix: "blocks/".to_string(),
|
||||
manifest_prefix: "manifests/".to_string(),
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn store_block(&self, data: &[u8]) -> Result<String, VfsError> {
|
||||
if data.len() > self.config.block_size {
|
||||
return Err(VfsError::Io(format!("Block size exceeds limit")));
|
||||
}
|
||||
|
||||
let hash = Self::hash_block(data);
|
||||
let key = format!("{}{}", self.block_prefix, hash);
|
||||
|
||||
// Check if block exists
|
||||
if !self.s3vfs.object_exists(&key)? {
|
||||
// PUT with initial ref count = 1
|
||||
self.s3vfs.put_object_with_metadata(
|
||||
&key,
|
||||
data,
|
||||
[("x-amz-meta-ref-count", "1")]
|
||||
)?;
|
||||
} else {
|
||||
// Increment ref count
|
||||
self.increment_ref(&hash)?;
|
||||
}
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
pub fn get_block(&self, hash: &str) -> Result<Vec<u8>, VfsError> {
|
||||
let key = format!("{}{}", self.block_prefix, hash);
|
||||
self.s3vfs.get_object(&key)
|
||||
}
|
||||
|
||||
pub fn increment_ref(&self, hash: &str) -> Result<(), VfsError> {
|
||||
let key = format!("{}{}", self.block_prefix, hash);
|
||||
let head = self.s3vfs.head_object(&key)?;
|
||||
|
||||
let current_ref = head.metadata
|
||||
.get("x-amz-meta-ref-count")
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.unwrap_or(1);
|
||||
|
||||
// Need to GET block data + PUT with new metadata
|
||||
let block_data = self.get_block(hash)?;
|
||||
self.s3vfs.put_object_with_metadata(
|
||||
&key,
|
||||
&block_data,
|
||||
[("x-amz-meta-ref-count", (current_ref + 1).to_string())]
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dedup_file(&self, source: &Path) -> Result<DedupManifest, VfsError> {
|
||||
let mut file = std::fs::File::open(source)?;
|
||||
let mut manifest = DedupManifest::new();
|
||||
let mut buffer = vec![0u8; self.config.block_size];
|
||||
|
||||
loop {
|
||||
let n = file.read(&mut buffer)?;
|
||||
if n == 0 { break; }
|
||||
|
||||
manifest.original_size += n;
|
||||
let hash = self.store_block(&buffer[..n])?;
|
||||
manifest.block_hashes.push(hash);
|
||||
}
|
||||
|
||||
// Store manifest to S3
|
||||
let file_id = uuid::Uuid::new_v4().to_string();
|
||||
manifest.file_id = file_id;
|
||||
let manifest_key = format!("{}{}.json", self.manifest_prefix, file_id);
|
||||
let manifest_json = serde_json::to_string(&manifest)?;
|
||||
self.s3vfs.put_object(&manifest_key, manifest_json.as_bytes())?;
|
||||
|
||||
Ok(manifest)
|
||||
}
|
||||
|
||||
pub fn restore_file(&self, manifest_id: &str, target: &Path) -> Result<(), VfsError> {
|
||||
let manifest_key = format!("{}{}.json", self.manifest_prefix, manifest_id);
|
||||
let manifest_json = self.s3vfs.get_object(&manifest_key)?;
|
||||
let manifest: DedupManifest = serde_json::from_slice(&manifest_json)?;
|
||||
|
||||
let mut file = std::fs::File::create(target)?;
|
||||
for hash in &manifest.block_hashes {
|
||||
let block = self.get_block(hash)?;
|
||||
file.write_all(&block)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn hash_block(data: &[u8]) -> String {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(data);
|
||||
hex::encode(hasher.finalize())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Integration with MarkBase VFS
|
||||
|
||||
### Option 1: Standalone DedupS3Store
|
||||
|
||||
用户手动创建 DedupS3Store:
|
||||
```bash
|
||||
# CLI tool
|
||||
markbase dedup-upload --s3 --s3-endpoint http://localhost:9000 --file /data/large.iso
|
||||
markbase dedup-download --s3 --manifest-id <uuid> --output /data/restored.iso
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Option 2: DedupVfsBackend (VfsBackend trait)
|
||||
|
||||
创建 VfsBackend wrapper,自动 dedup:
|
||||
```rust
|
||||
pub struct DedupS3Backend {
|
||||
dedup_store: DedupS3Store,
|
||||
manifest_dir: PathBuf, // Local cache for manifests
|
||||
}
|
||||
|
||||
impl VfsBackend for DedupS3Backend {
|
||||
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
|
||||
// 1. Read manifest from S3
|
||||
let manifest = self.load_manifest(path)?;
|
||||
|
||||
// 2. DedupS3File (read blocks from S3)
|
||||
Ok(Box::new(DedupS3File::new(self.dedup_store.clone(), manifest)))
|
||||
}
|
||||
|
||||
fn stat(&self, path: &Path) -> Result<VfsStat, VfsError> {
|
||||
// Read from manifest metadata
|
||||
let manifest = self.load_manifest(path)?;
|
||||
Ok(VfsStat {
|
||||
size: manifest.original_size,
|
||||
mtime: manifest.mtime,
|
||||
...
|
||||
})
|
||||
}
|
||||
|
||||
fn read_dir(&self, path: &Path) -> Result<Vec<VfsDirEntry>, VfsError> {
|
||||
// List manifests from S3
|
||||
self.dedup_store.s3vfs.list_objects(&self.manifest_prefix)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**优势**:
|
||||
- ✅ 透明 dedup(用户无需关心)
|
||||
- ✅ 与 SMB/WebDAV/SFTP 无缝集成
|
||||
|
||||
---
|
||||
|
||||
### Option 3: Hybrid (LocalFs + DedupS3Store)
|
||||
|
||||
```rust
|
||||
pub struct HybridDedupBackend {
|
||||
local: LocalFs, // Small files (<1MB) 存本地
|
||||
dedup_s3: DedupS3Store, // Large files (>1MB) dedup to S3
|
||||
}
|
||||
|
||||
impl VfsBackend for HybridDedupBackend {
|
||||
fn open_file(&self, path: &Path, flags: &OpenFlags) -> Result<Box<dyn VfsFile>, VfsError> {
|
||||
// Check file size
|
||||
let stat = self.local.stat(path)?;
|
||||
|
||||
if stat.size < self.dedup_s3.config.min_file_size {
|
||||
// Small file: direct LocalFs
|
||||
self.local.open_file(path, flags)
|
||||
} else {
|
||||
// Large file: dedup to S3
|
||||
self.dedup_s3.dedup_file(path)?;
|
||||
self.dedup_s3.open_file_from_manifest(path)
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**推荐**:Option 1(Phase 1),Option 3(Phase 2)。
|
||||
|
||||
---
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
### Network Latency
|
||||
|
||||
| Operation | LocalFs | S3Vfs | Overhead |
|
||||
|-----------|---------|-------|----------|
|
||||
| store_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
|
||||
| get_block (4KB) | ~0.1ms | ~5-10ms (HTTP) | ~50-100x |
|
||||
| dedup_file (100MB) | ~2s (25MB/s) | ~10s (10MB/s) | ~5x |
|
||||
|
||||
**缓解方案**:
|
||||
- ✅ Async concurrent upload(4-8 并发)
|
||||
- ✅ ReadCache(64MB cache)
|
||||
- ✅ Local cache for hot blocks
|
||||
|
||||
---
|
||||
|
||||
### Dedup Ratio Impact
|
||||
|
||||
| File Type | Dedup Ratio | Network Traffic Saved |
|
||||
|-----------|-------------|----------------------|
|
||||
| VM images (similar OS) | ~80% | -80% upload bandwidth |
|
||||
| Log files (daily) | ~60% | -60% upload bandwidth |
|
||||
| Unique files (photos) | ~5% | -5% upload bandwidth |
|
||||
|
||||
---
|
||||
|
||||
## Next Steps
|
||||
|
||||
1. **Phase 1 Implementation** (~300 lines)
|
||||
- `DedupS3Store` struct
|
||||
- `store_block()` / `get_block()` via S3Vfs
|
||||
- `increment_ref()` with metadata update
|
||||
|
||||
2. **Phase 2 CLI Integration** (~100 lines)
|
||||
- `markbase dedup-upload --s3`
|
||||
- `markbase dedup-download --manifest-id`
|
||||
|
||||
3. **Phase 3 Performance Test**
|
||||
- Benchmark dedup_file (100MB)
|
||||
- Compare LocalFs vs S3Vfs
|
||||
|
||||
---
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. **Concurrency**: Accept non-atomic ref count vs implement versioning?
|
||||
2. **Backend choice**: Standalone CLI vs VfsBackend integration?
|
||||
3. **Min versioning**: Should we require MinIO versioning enabled?
|
||||
4. **Ref count object**: Metadata vs separate object?
|
||||
5. **Block cache**: Should we cache blocks locally?
|
||||
|
||||
---
|
||||
|
||||
**文档创建**: 2026-06-25
|
||||
**最后更新**: 2026-06-25
|
||||
Reference in New Issue
Block a user