Files
markbase/markbase-core/src/ctdb/tdb.rs
Warren 1418e9958b Apply clippy fixes for code quality
Clippy Fixes Applied:
- Removed unused imports
- Fixed manual implementation of .is_multiple_of()
- Fixed unnecessary_sort_by suggestions
- Added missing Ipv4Addr imports

Files Modified:
- forward_acl.rs: Add Ipv4Addr import
- known_hosts.rs: Add Ipv4Addr import
- Various files: Remove unused imports

Build:  markbase-core
Tests: 495 passed
2026-06-24 11:18:02 +08:00

678 lines
20 KiB
Rust

use std::collections::HashMap;
use std::io::{self, Read, Write, Seek, SeekFrom};
use std::path::Path;
use std::sync::{Mutex, RwLock};
/// Magic number written little-endian as the first four bytes of the header.
/// `TdbHeader::from_bytes` rejects any header whose magic differs.
const TDB_MAGIC: u32 = 0x1BADFACE;
/// Format version stored in the header. It is written and read back, but it is
/// not validated when a header is parsed.
const TDB_VERSION: u32 = 1;
/// Hash size used by `TdbEngine::new`.
const DEFAULT_HASH_SIZE: u32 = 1024;
/// Length of the on-disk header in bytes. The header is zero-padded to this
/// size and the first record starts at this offset.
const TDB_HEADER_SIZE: u64 = 128;
/// State flag stored with every record.
///
/// The flag is persisted as a `u32` code (see `as_u32` / `from_u32`). When a
/// database file is loaded, only records flagged `Active` are kept in memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordFlag {
/// Live record; persisted as `0`.
Active,
/// Persisted as `1`.
Free,
/// Persisted as `2`.
Deleted,
}
impl RecordFlag {
fn as_u32(&self) -> u32 {
match self {
RecordFlag::Active => 0,
RecordFlag::Free => 1,
RecordFlag::Deleted => 2,
}
}
fn from_u32(v: u32) -> Self {
match v {
1 => RecordFlag::Free,
2 => RecordFlag::Deleted,
_ => RecordFlag::Active,
}
}
}
/// A single key/value record as held in memory and written to disk.
#[derive(Debug, Clone)]
pub struct TdbRecord {
/// Raw key bytes.
pub key: Vec<u8>,
/// Raw value bytes.
pub data: Vec<u8>,
/// State flag persisted alongside the record.
pub flag: RecordFlag,
/// 64-bit value persisted with the record; `TdbRecord::new` sets it to 0.
/// NOTE(review): presumably the offset of the next record in a hash chain —
/// nothing in this file follows it, so confirm the intended meaning.
pub hash_next: u64,
}
impl TdbRecord {
pub fn new(key: Vec<u8>, data: Vec<u8>) -> Self {
Self {
key,
data,
flag: RecordFlag::Active,
hash_next: 0,
}
}
pub fn key_str(&self) -> &str {
std::str::from_utf8(&self.key).unwrap_or("")
}
pub fn data_str(&self) -> &str {
std::str::from_utf8(&self.data).unwrap_or("")
}
}
/// Errors reported by the TDB engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TdbError {
/// An underlying `std::io::Error` occurred; the original error detail is not retained.
IoError,
/// The requested key is not present.
NotFound,
/// The database file could not be parsed (for example a bad magic number or a too-short header).
Corrupt,
/// Displayed as "record already exists".
Exists,
/// Displayed as "lock failed".
LockFailed,
}
impl std::fmt::Display for TdbError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TdbError::IoError => write!(f, "I/O error"),
TdbError::NotFound => write!(f, "record not found"),
TdbError::Corrupt => write!(f, "database corrupt"),
TdbError::Exists => write!(f, "record already exists"),
TdbError::LockFailed => write!(f, "lock failed"),
}
}
}
// Marks `TdbError` as a standard error type; no trait methods are overridden,
// so the default implementations apply.
impl std::error::Error for TdbError {}
impl From<io::Error> for TdbError {
fn from(_: io::Error) -> Self {
TdbError::IoError
}
}
/// Result alias whose error type is `TdbError`.
pub type TdbResult<T> = Result<T, TdbError>;
/// Maps `key` to a bucket index in `0..hash_size`.
///
/// The hash is the multiply-by-31 polynomial over the key bytes, computed with
/// wrapping 32-bit arithmetic, reduced modulo `hash_size`.
///
/// A `hash_size` of zero yields bucket 0 instead of panicking on the modulo:
/// the value can come straight from a file header or a caller, so it must not
/// be able to crash the process.
fn hash_key(key: &[u8], hash_size: u32) -> u32 {
    let h = key
        .iter()
        .fold(0u32, |acc, &b| acc.wrapping_mul(31).wrapping_add(u32::from(b)));
    // `checked_rem` returns `None` only for a zero divisor.
    h.checked_rem(hash_size).unwrap_or(0)
}
/// Wrapper around a single `u64` offset.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type — confirm whether it is still needed.
#[derive(Debug, Clone)]
struct HashEntry {
offset: u64,
}
/// In-memory form of the fixed-size database file header.
struct TdbHeader {
/// Magic number; must equal `TDB_MAGIC` for a header to parse.
magic: u32,
/// Format version as stored in the file.
version: u32,
/// Hash size recorded in the header; used as the modulus when hashing keys
/// while a file is loaded.
hash_size: u32,
/// Number of records; the engine keeps this equal to the size of its
/// in-memory record map.
record_count: u64,
}
impl TdbHeader {
fn new(hash_size: u32) -> Self {
Self {
magic: TDB_MAGIC,
version: TDB_VERSION,
hash_size,
record_count: 0,
}
}
fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(TDB_HEADER_SIZE as usize);
buf.extend_from_slice(&self.magic.to_le_bytes());
buf.extend_from_slice(&self.version.to_le_bytes());
buf.extend_from_slice(&self.hash_size.to_le_bytes());
buf.extend_from_slice(&self.record_count.to_le_bytes());
while buf.len() < TDB_HEADER_SIZE as usize {
buf.push(0);
}
buf
}
fn from_bytes(buf: &[u8]) -> TdbResult<Self> {
if buf.len() < 20 {
return Err(TdbError::Corrupt);
}
let magic = u32::from_le_bytes(buf[0..4].try_into().unwrap());
if magic != TDB_MAGIC {
return Err(TdbError::Corrupt);
}
let version = u32::from_le_bytes(buf[4..8].try_into().unwrap());
let hash_size = u32::from_le_bytes(buf[8..12].try_into().unwrap());
let record_count = u64::from_le_bytes(buf[12..20].try_into().unwrap());
Ok(Self {
magic,
version,
hash_size,
record_count,
})
}
}
/// Key/value store that keeps all active records in memory and, when a file
/// path is set, rewrites the whole database file on flush.
pub struct TdbEngine {
/// File header; its `record_count` is updated on every store, delete and rollback.
header: RwLock<TdbHeader>,
/// Active records keyed by their raw key bytes.
records: RwLock<HashMap<Vec<u8>, TdbRecord>>,
/// Per-bucket lists of record file offsets, filled while a file is loaded.
hash_table: RwLock<Vec<Vec<u64>>>,
/// Backing file, or `None` for a purely in-memory engine.
file_path: Option<std::path::PathBuf>,
/// Set when in-memory state changed since the last successful save.
dirty: Mutex<bool>,
}
impl TdbEngine {
pub fn new() -> Self {
Self::with_hash_size(DEFAULT_HASH_SIZE)
}
pub fn with_hash_size(hash_size: u32) -> Self {
Self {
header: RwLock::new(TdbHeader::new(hash_size)),
records: RwLock::new(HashMap::new()),
hash_table: RwLock::new(vec![Vec::new(); 1]),
file_path: None,
dirty: Mutex::new(false),
}
}
pub fn open<P: AsRef<Path>>(path: P) -> TdbResult<Self> {
let path = path.as_ref().to_path_buf();
if path.exists() {
Self::load_from_file(&path)
} else {
let engine = Self::new();
let mut engine = engine;
engine.file_path = Some(path.clone());
engine.save_to_file(&path)?;
Ok(engine)
}
}
pub fn create<P: AsRef<Path>>(path: P, hash_size: u32) -> TdbResult<Self> {
let path = path.as_ref().to_path_buf();
let engine = Self::with_hash_size(hash_size);
let mut engine = engine;
engine.file_path = Some(path.clone());
engine.save_to_file(&path)?;
Ok(engine)
}
fn load_from_file(path: &Path) -> TdbResult<Self> {
let mut file = std::fs::File::open(path)?;
let mut header_buf = vec![0u8; TDB_HEADER_SIZE as usize];
file.read_exact(&mut header_buf)?;
let header = TdbHeader::from_bytes(&header_buf)?;
let hash_size = header.hash_size;
let mut hash_table = vec![Vec::new(); hash_size as usize];
let mut records = HashMap::new();
let mut pos = TDB_HEADER_SIZE;
let file_meta = file.metadata()?;
let file_size = file_meta.len();
while pos < file_size {
file.seek(SeekFrom::Start(pos))?;
let mut flag_buf = [0u8; 4];
let mut key_len_buf = [0u8; 4];
let mut data_len_buf = [0u8; 4];
let mut hash_next_buf = [0u8; 8];
if file.read_exact(&mut flag_buf).is_err() {
break;
}
let _ = file.read_exact(&mut key_len_buf);
let _ = file.read_exact(&mut data_len_buf);
let _ = file.read_exact(&mut hash_next_buf);
let flag = RecordFlag::from_u32(u32::from_le_bytes(flag_buf));
let key_len = u32::from_le_bytes(key_len_buf) as usize;
let data_len = u32::from_le_bytes(data_len_buf) as usize;
let hash_next = u64::from_le_bytes(hash_next_buf);
let mut key = vec![0u8; key_len];
let mut data = vec![0u8; data_len];
let _ = file.read_exact(&mut key);
let _ = file.read_exact(&mut data);
let rec_offset = pos;
let bucket = hash_key(&key, hash_size) as usize;
if bucket < hash_table.len() {
hash_table[bucket].push(rec_offset);
}
if flag == RecordFlag::Active {
let record = TdbRecord {
key: key.clone(),
data,
flag,
hash_next,
};
records.insert(key, record);
}
let rec_size = 4 + 4 + 4 + 8 + key_len as u64 + data_len as u64;
pos += rec_size;
}
let record_count = records.len() as u64;
let mut header = header;
header.record_count = record_count;
Ok(Self {
header: RwLock::new(header),
records: RwLock::new(records),
hash_table: RwLock::new(hash_table),
file_path: Some(path.to_path_buf()),
dirty: Mutex::new(false),
})
}
fn save_to_file(&self, path: &Path) -> TdbResult<()> {
let header = self.header.read().unwrap();
let header_bytes = header.to_bytes();
let mut file = std::fs::File::create(path)?;
file.write_all(&header_bytes)?;
let records = self.records.read().unwrap();
for (_, record) in records.iter() {
let flag = record.flag.as_u32().to_le_bytes();
let key_len = (record.key.len() as u32).to_le_bytes();
let data_len = (record.data.len() as u32).to_le_bytes();
let hash_next = record.hash_next.to_le_bytes();
file.write_all(&flag)?;
file.write_all(&key_len)?;
file.write_all(&data_len)?;
file.write_all(&hash_next)?;
file.write_all(&record.key)?;
file.write_all(&record.data)?;
}
file.flush()?;
Ok(())
}
pub fn store(&self, key: Vec<u8>, data: Vec<u8>) -> TdbResult<bool> {
let mut records = self.records.write().unwrap();
let existed = records.contains_key(&key);
let record = TdbRecord::new(key.clone(), data);
records.insert(key, record);
*self.dirty.lock().unwrap() = true;
let mut header = self.header.write().unwrap();
header.record_count = records.len() as u64;
drop(header);
drop(records);
self.try_flush()?;
Ok(!existed)
}
pub fn fetch(&self, key: &[u8]) -> TdbResult<Vec<u8>> {
let records = self.records.read().unwrap();
match records.get(key) {
Some(record) => Ok(record.data.clone()),
None => Err(TdbError::NotFound),
}
}
pub fn delete(&self, key: &[u8]) -> TdbResult<()> {
let mut records = self.records.write().unwrap();
if records.remove(key).is_some() {
*self.dirty.lock().unwrap() = true;
let mut header = self.header.write().unwrap();
header.record_count = records.len() as u64;
drop(header);
drop(records);
self.try_flush()?;
Ok(())
} else {
Err(TdbError::NotFound)
}
}
pub fn exists(&self, key: &[u8]) -> bool {
self.records.read().unwrap().contains_key(key)
}
pub fn keys(&self) -> Vec<Vec<u8>> {
self.records.read().unwrap().keys().cloned().collect()
}
pub fn record_count(&self) -> u64 {
self.header.read().unwrap().record_count
}
pub fn iter(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.records
.read()
.unwrap()
.iter()
.map(|(k, v)| (k.clone(), v.data.clone()))
.collect()
}
pub fn try_flush(&self) -> TdbResult<()> {
let dirty = *self.dirty.lock().unwrap();
if dirty {
if let Some(ref path) = self.file_path {
self.save_to_file(path)?;
*self.dirty.lock().unwrap() = false;
}
}
Ok(())
}
pub fn flush(&self) -> TdbResult<()> {
if let Some(ref path) = self.file_path {
self.save_to_file(path)?;
*self.dirty.lock().unwrap() = false;
}
Ok(())
}
}
impl Default for TdbEngine {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time summary of an engine, produced by `TdbEngine::stats`.
#[derive(Debug, Clone)]
pub struct TdbStats {
/// Record count as stored in the header.
pub record_count: u64,
/// Hash size as stored in the header.
pub hash_size: u32,
/// Number of records currently held in memory.
pub active_records: u64,
/// Sum of all value lengths, in bytes.
pub total_data_size: u64,
/// Mean key length in bytes; `0.0` when there are no records.
pub avg_key_size: f64,
/// Mean value length in bytes; `0.0` when there are no records.
pub avg_data_size: f64,
}
impl TdbEngine {
    /// Computes size statistics over the records currently held in memory.
    ///
    /// Averages are `0.0` when the engine holds no records.
    ///
    /// # Panics
    ///
    /// Panics if the records or header lock is poisoned.
    pub fn stats(&self) -> TdbStats {
        let records = self.records.read().unwrap();
        let header = self.header.read().unwrap();

        let active = records.len();
        let mut data_bytes: u64 = 0;
        let mut key_bytes: u64 = 0;
        for rec in records.values() {
            data_bytes += rec.data.len() as u64;
            key_bytes += rec.key.len() as u64;
        }

        // Mean of `total` over the active records, guarding the empty case.
        let mean = |total: u64| -> f64 {
            if active > 0 {
                total as f64 / active as f64
            } else {
                0.0
            }
        };

        TdbStats {
            record_count: header.record_count,
            hash_size: header.hash_size,
            active_records: active as u64,
            total_data_size: data_bytes,
            avg_key_size: mean(key_bytes),
            avg_data_size: mean(data_bytes),
        }
    }
}
/// Snapshot-based transaction over a `TdbEngine`.
///
/// Operations are applied directly to the engine; rolling back replaces the
/// engine's record map with the snapshot taken when the transaction began.
pub struct TdbTransaction<'a> {
/// Engine the transaction operates on.
engine: &'a TdbEngine,
/// Copy of the engine's records taken in `begin`.
snapshot: HashMap<Vec<u8>, TdbRecord>,
/// Set by `commit`; when true, `rollback` and drop leave the engine untouched.
committed: bool,
}
impl<'a> TdbTransaction<'a> {
pub fn begin(engine: &'a TdbEngine) -> Self {
let snapshot = engine.records.read().unwrap().clone();
Self {
engine,
snapshot,
committed: false,
}
}
pub fn store(&self, key: Vec<u8>, data: Vec<u8>) -> TdbResult<bool> {
self.engine.store(key, data)
}
pub fn fetch(&self, key: &[u8]) -> TdbResult<Vec<u8>> {
self.engine.fetch(key)
}
pub fn delete(&self, key: &[u8]) -> TdbResult<()> {
self.engine.delete(key)
}
pub fn commit(&mut self) -> TdbResult<()> {
self.committed = true;
self.engine.flush()
}
pub fn rollback(&mut self) -> TdbResult<()> {
if !self.committed {
let mut records = self.engine.records.write().unwrap();
*records = self.snapshot.clone();
let mut header = self.engine.header.write().unwrap();
header.record_count = records.len() as u64;
*self.engine.dirty.lock().unwrap() = true;
self.engine.flush()?;
}
Ok(())
}
}
impl<'a> Drop for TdbTransaction<'a> {
    /// Rolls back automatically when the transaction is dropped without having
    /// been committed.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        // `drop` cannot report failure, so a rollback error is discarded.
        let _ = self.rollback();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A new key reports creation, is readable, and is counted.
    #[test]
    fn test_tdb_create_and_store() {
        let db = TdbEngine::new();
        let is_new = db.store(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        assert!(is_new);
        assert_eq!(db.fetch(b"key1").unwrap(), b"value1");
        assert_eq!(db.record_count(), 1);
    }

    /// Storing an existing key replaces its value without adding a record.
    #[test]
    fn test_tdb_overwrite() {
        let db = TdbEngine::new();
        db.store(b"key1".to_vec(), b"v1".to_vec()).unwrap();
        let is_new = db.store(b"key1".to_vec(), b"v2".to_vec()).unwrap();
        assert!(!is_new);
        assert_eq!(db.fetch(b"key1").unwrap(), b"v2");
        assert_eq!(db.record_count(), 1);
    }

    /// A deleted key is gone from every accessor.
    #[test]
    fn test_tdb_delete() {
        let db = TdbEngine::new();
        db.store(b"key1".to_vec(), b"v1".to_vec()).unwrap();
        db.delete(b"key1").unwrap();
        assert!(!db.exists(b"key1"));
        assert_eq!(db.record_count(), 0);
        assert!(db.fetch(b"key1").is_err());
    }

    /// Fetching or deleting an unknown key fails.
    #[test]
    fn test_tdb_not_found() {
        let db = TdbEngine::new();
        assert!(db.fetch(b"missing").is_err());
        assert!(db.delete(b"missing").is_err());
    }

    /// One hundred distinct keys are all stored and retrievable.
    #[test]
    fn test_tdb_multiple_records() {
        let db = TdbEngine::new();
        (0..100).for_each(|n| {
            let key = format!("key{}", n).into_bytes();
            let value = format!("value{}", n).into_bytes();
            db.store(key, value).unwrap();
        });
        assert_eq!(db.record_count(), 100);
        assert_eq!(db.fetch(b"key50").unwrap(), b"value50");
    }

    /// Records written through one engine are visible after reopening the file.
    #[test]
    fn test_tdb_persist_to_disk() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("test.tdb");
        {
            let writer = TdbEngine::open(&file).unwrap();
            writer.store(b"persistent".to_vec(), b"yes".to_vec()).unwrap();
            writer.store(b"key2".to_vec(), b"val2".to_vec()).unwrap();
            writer.flush().unwrap();
        }
        let reader = TdbEngine::open(&file).unwrap();
        assert_eq!(reader.fetch(b"persistent").unwrap(), b"yes");
        assert_eq!(reader.fetch(b"key2").unwrap(), b"val2");
        assert_eq!(reader.record_count(), 2);
    }

    /// `keys` returns every stored key (order-independent check).
    #[test]
    fn test_tdb_keys() {
        let db = TdbEngine::new();
        db.store(b"a".to_vec(), b"1".to_vec()).unwrap();
        db.store(b"b".to_vec(), b"2".to_vec()).unwrap();
        db.store(b"c".to_vec(), b"3".to_vec()).unwrap();
        let mut found = db.keys();
        found.sort();
        let expected = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
        assert_eq!(found, expected);
    }

    /// `iter` yields one entry per stored record.
    #[test]
    fn test_tdb_iter() {
        let db = TdbEngine::new();
        db.store(b"k1".to_vec(), b"d1".to_vec()).unwrap();
        db.store(b"k2".to_vec(), b"d2".to_vec()).unwrap();
        let pairs: Vec<_> = db.iter();
        assert_eq!(pairs.len(), 2);
    }

    /// Statistics reflect the count and sizes of the stored records.
    #[test]
    fn test_tdb_stats() {
        let db = TdbEngine::new();
        db.store(b"key1".to_vec(), b"12345".to_vec()).unwrap();
        db.store(b"key2".to_vec(), b"67890".to_vec()).unwrap();
        let summary = db.stats();
        assert_eq!(summary.active_records, 2);
        assert_eq!(summary.total_data_size, 10);
        assert_eq!(summary.avg_key_size, 4.0);
        assert_eq!(summary.avg_data_size, 5.0);
    }

    /// The requested hash size is recorded in the header.
    #[test]
    fn test_tdb_create_with_hash_size() {
        let db = TdbEngine::with_hash_size(256);
        let hdr = db.header.read().unwrap();
        assert_eq!(hdr.hash_size, 256);
    }

    /// Changes made in a committed transaction survive its drop.
    #[test]
    fn test_tdb_transaction_commit() {
        let db = TdbEngine::new();
        db.store(b"init".to_vec(), b"data".to_vec()).unwrap();
        {
            let mut txn = TdbTransaction::begin(&db);
            txn.store(b"tx_key".to_vec(), b"tx_data".to_vec()).unwrap();
            txn.commit().unwrap();
        }
        assert!(db.exists(b"tx_key"));
        assert_eq!(db.fetch(b"tx_key").unwrap(), b"tx_data");
    }

    /// An explicit rollback undoes both inserts and deletes.
    #[test]
    fn test_tdb_transaction_rollback() {
        let db = TdbEngine::new();
        db.store(b"keep".to_vec(), b"yes".to_vec()).unwrap();
        {
            let mut txn = TdbTransaction::begin(&db);
            txn.store(b"temp".to_vec(), b"no".to_vec()).unwrap();
            txn.delete(b"keep").unwrap();
            txn.rollback().unwrap();
        }
        assert!(!db.exists(b"temp"));
        assert!(db.exists(b"keep"));
    }

    /// Dropping an uncommitted transaction rolls it back.
    #[test]
    fn test_tdb_transaction_auto_rollback_on_drop() {
        let db = TdbEngine::new();
        db.store(b"orig".to_vec(), b"val".to_vec()).unwrap();
        {
            let txn = TdbTransaction::begin(&db);
            txn.store(b"temp".to_vec(), b"x".to_vec()).unwrap();
        }
        assert!(!db.exists(b"temp"));
        assert!(db.exists(b"orig"));
    }

    /// A thousand similar keys spread over more than 200 of 1024 buckets.
    #[test]
    fn test_hash_key_distribution() {
        let hash_size = 1024u32;
        let used: std::collections::HashSet<u32> = (0..1000)
            .map(|i| hash_key(format!("key_{}", i).as_bytes(), hash_size))
            .collect();
        assert!(used.len() > 200);
    }

    /// Opening a file that is not a TDB database fails.
    #[test]
    fn test_tdb_corrupt_file_detection() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("corrupt.tdb");
        std::fs::write(&file, b"NOT_A_TDB_FILE_GARBAGE_DATA_HERE").unwrap();
        assert!(TdbEngine::open(&file).is_err());
    }

    /// Flag codes round-trip through `as_u32` and `from_u32`.
    #[test]
    fn test_record_flag_conversion() {
        let table = [
            (RecordFlag::Active, 0u32),
            (RecordFlag::Free, 1u32),
            (RecordFlag::Deleted, 2u32),
        ];
        for (flag, _) in table.iter() {
            let expected = table.iter().find(|(f, _)| f == flag).unwrap().1;
            assert_eq!(flag.as_u32(), expected);
        }
        for (flag, code) in table.iter() {
            assert_eq!(RecordFlag::from_u32(*code), *flag);
        }
    }

    /// The empty key and the empty value are both legal.
    #[test]
    fn test_tdb_empty_keys_and_values() {
        let db = TdbEngine::new();
        db.store(vec![], vec![]).unwrap();
        assert!(db.exists(&[]));
        assert_eq!(db.fetch(&[]).unwrap(), b"");
    }

    /// A 100 000-byte value is stored and returned intact.
    #[test]
    fn test_tdb_large_value() {
        let db = TdbEngine::new();
        let payload = vec![0x42u8; 100_000];
        db.store(b"big".to_vec(), payload.clone()).unwrap();
        let round_trip = db.fetch(b"big").unwrap();
        assert_eq!(round_trip.len(), 100_000);
        assert_eq!(round_trip, payload);
    }

    /// Keys need not be valid UTF-8.
    #[test]
    fn test_tdb_binary_keys() {
        let db = TdbEngine::new();
        let raw_key = vec![0u8, 1u8, 255u8, 128u8, 64u8];
        db.store(raw_key.clone(), b"binary_val".to_vec()).unwrap();
        assert_eq!(db.fetch(&raw_key).unwrap(), b"binary_val");
    }
}