use crate::vfs::open_flags::OpenFlags; use crate::vfs::{VfsBackend, VfsFile}; use anyhow::{anyhow, Result}; use log::{debug, info, warn}; use std::path::PathBuf; /// MPLEX_BASE from rsync io.h const MPLEX_BASE: u32 = 7; /// Rsync multiplex message codes (from rsync io.h) const MSG_DATA: u8 = 0; const MSG_DONE: u8 = 1; const MSG_REDO: u8 = 9; #[derive(Debug, Clone, PartialEq)] pub(crate) enum RsyncState { SendVersion, WaitVersion, ReadFileList, /// Sum head (4 × write_int = 16 bytes) + checksum seed (4 bytes) = 20 bytes ReadSumHead { need: usize, }, SendSumCount, /// Raw file data from MSG_DATA packets ReadFileData, Done, } pub struct RsyncHandler { state: RsyncState, raw_input: Vec, rsync_input: Vec, output_raw: Vec, dest_path: PathBuf, output_file: Option>, total_written: u64, file_entries: Vec, current_file: usize, protocol_version: u32, multiplex: bool, vfs: Box, } impl RsyncHandler { pub fn parse_rsync_command(command: &str, vfs: Box) -> Result { let parts: Vec<&str> = command.split_whitespace().collect(); if parts.len() < 3 || parts[0] != "rsync" { return Err(anyhow!("Invalid rsync command: {}", command)); } let mut is_server = false; let mut dest = String::new(); for p in &parts[1..] { if *p == "--server" { is_server = true; continue; } if *p == "--sender" || p.starts_with('-') { continue; } if *p == "." { continue; } dest = p.to_string(); } if !is_server { return Err(anyhow!("Not a rsync --server command")); } let dest_path = if dest.is_empty() { PathBuf::from("/tmp/received_file") } else { PathBuf::from(&dest) }; info!("RsyncHandler: dest_path={}", dest_path.display()); let mut handler = Self { state: RsyncState::SendVersion, raw_input: Vec::new(), rsync_input: Vec::new(), output_raw: Vec::new(), dest_path, output_file: None, total_written: 0, file_entries: Vec::new(), current_file: 0, protocol_version: 30, multiplex: false, vfs, }; handler.output_raw.extend_from_slice(&30u32.to_le_bytes()); handler.state = RsyncState::WaitVersion; Ok(handler) } pub fn feed(&mut self, data: &[u8]) -> Result<()> { if self.multiplex { self.raw_input.extend_from_slice(data); self.decode_multiplex(); } else { self.rsync_input.extend_from_slice(data); } self.process() } /// Strip multiplex headers from raw_input → rsync_input fn decode_multiplex(&mut self) { loop { if self.raw_input.len() < 4 { break; } let header = u32::from_le_bytes([ self.raw_input[0], self.raw_input[1], self.raw_input[2], self.raw_input[3], ]); let raw_tag = ((header >> 24) & 0xFF) as u8; let tag = raw_tag.wrapping_sub(MPLEX_BASE as u8); let len = (header & 0x00FF_FFFF) as usize; let total = 4 + len; if self.raw_input.len() < total { break; } let payload = self.raw_input[4..total].to_vec(); self.raw_input.drain(..total); match tag { MSG_DATA => { self.rsync_input.extend_from_slice(&payload); } MSG_DONE => { info!("rsync: MSG_DONE received (file complete)"); self.rsync_input.extend_from_slice(b"RSYNCDONE"); } 9 => { warn!("rsync: MSG_REDO not handled"); } _ => { debug!("rsync: unknown multiplex tag {} len={}", tag, len); } } } } pub fn drain_output(&mut self) -> Vec { let data = std::mem::take(&mut self.output_raw); if data.is_empty() || !self.multiplex { return data; } let header = (MPLEX_BASE << 24) | (data.len() as u32); let mut wrapped = Vec::with_capacity(4 + data.len()); wrapped.extend_from_slice(&header.to_le_bytes()); wrapped.extend_from_slice(&data); wrapped } pub fn pending_output_len(&self) -> usize { self.output_raw.len() } pub fn has_pending_output(&self) -> bool { !self.output_raw.is_empty() } pub fn is_done(&self) -> bool { self.state == RsyncState::Done } pub fn total_received(&self) -> u64 { self.total_written } fn transition(&mut self, new_state: RsyncState) { let old = std::mem::replace(&mut self.state, new_state.clone()); debug!("RsyncHandler: {:?} -> {:?}", old, new_state); } fn process(&mut self) -> Result<()> { loop { match self.state.clone() { RsyncState::SendVersion => { self.transition(RsyncState::WaitVersion); } RsyncState::WaitVersion => { if self.rsync_input.len() >= 4 { let version = u32::from_le_bytes([ self.rsync_input[0], self.rsync_input[1], self.rsync_input[2], self.rsync_input[3], ]); self.rsync_input.drain(..4); self.protocol_version = std::cmp::min(self.protocol_version, version); info!( "rsync: negotiated protocol version {}", self.protocol_version ); self.multiplex = self.protocol_version >= 30; self.transition(RsyncState::ReadFileList); } else { break; } } RsyncState::ReadFileList => { loop { if self.rsync_input.is_empty() { break; } let flags = self.rsync_input[0]; if flags == 0 { self.rsync_input.drain(..1); info!("rsync: file list end ({} entries)", self.file_entries.len()); if self.file_entries.is_empty() { self.file_entries.push("file".to_string()); } self.current_file = 0; self.transition(RsyncState::ReadSumHead { need: 20 }); break; } let mut pos = 1; let _more_flags = if flags & 0x80 != 0 { if self.rsync_input.len() <= pos { break; } let ef = self.rsync_input[pos]; pos += 1; ef } else { 0 }; let has_name = !(flags & 0x02 != 0 && self.current_file > 0); if has_name { if let Some(nul_pos) = self.rsync_input[pos..].iter().position(|&b| b == 0) { let name = String::from_utf8_lossy(&self.rsync_input[pos..pos + nul_pos]) .to_string(); pos += nul_pos + 1; self.file_entries.push(name.clone()); debug!("rsync: file entry: {}", name); } else { break; } } else { let name = if !self.file_entries.is_empty() { self.file_entries[self.current_file].clone() } else { "file".to_string() }; self.file_entries.push(name); } let skip_count = if flags & 0x10 == 0 { 1 } else { 0 } + if flags & 0x20 == 0 { 1 } else { 0 } + if flags & 0x40 == 0 { 1 } else { 0 } + if flags & 0x08 == 0 { 1 } else { 0 } + 1 + if self.protocol_version >= 30 { 1 } else { 0 }; for _ in 0..skip_count { match read_varint(&self.rsync_input[pos..]) { Some((_, consumed)) => pos += consumed, None => break, } } if pos > self.rsync_input.len() { break; } self.current_file += 1; self.rsync_input.drain(..pos); } if self.state == RsyncState::ReadFileList { break; } } RsyncState::ReadSumHead { need } => { if self.rsync_input.len() >= need { let sum_count = i32::from_le_bytes([ self.rsync_input[0], self.rsync_input[1], self.rsync_input[2], self.rsync_input[3], ]); let _sum_blength = i32::from_le_bytes([ self.rsync_input[4], self.rsync_input[5], self.rsync_input[6], self.rsync_input[7], ]); let _sum_s2length = i32::from_le_bytes([ self.rsync_input[8], self.rsync_input[9], self.rsync_input[10], self.rsync_input[11], ]); let _sum_remainder = i32::from_le_bytes([ self.rsync_input[12], self.rsync_input[13], self.rsync_input[14], self.rsync_input[15], ]); let checksum_seed = i32::from_le_bytes([ self.rsync_input[16], self.rsync_input[17], self.rsync_input[18], self.rsync_input[19], ]); self.rsync_input.drain(..20); info!("rsync: sum_head count={} seed={}", sum_count, checksum_seed); self.transition(RsyncState::SendSumCount); } else { break; } } RsyncState::SendSumCount => { self.open_current_file()?; self.output_raw.extend_from_slice(&0u32.to_le_bytes()); info!("rsync: sent sum_count=0, ready to receive file data"); self.transition(RsyncState::ReadFileData); } RsyncState::ReadFileData => { let done_marker = b"RSYNCDONE"; if let Some(pos) = self .rsync_input .windows(done_marker.len()) .position(|w| w == done_marker) { if pos > 0 { let data = self.rsync_input[..pos].to_vec(); self.rsync_input.drain(..pos); self.write_to_file(&data)?; } self.rsync_input.drain(..done_marker.len()); if let Some(mut file) = self.output_file.take() { if let Err(e) = file.flush() { warn!("rsync flush error: {}", e); } } info!( "rsync: file {} complete ({} bytes written to {})", self.file_entries .get(self.current_file) .unwrap_or(&"?".to_string()), self.total_written, self.dest_path.display(), ); self.current_file += 1; if self.current_file >= self.file_entries.len() { self.transition(RsyncState::Done); info!( "rsync ALL DONE: {} bytes written to {}", self.total_written, self.dest_path.display() ); } else { self.transition(RsyncState::ReadSumHead { need: 20 }); } } else if !self.rsync_input.is_empty() { let data = self.rsync_input.clone(); self.rsync_input.clear(); self.write_to_file(&data)?; break; } else { break; } } RsyncState::Done => { break; } } } Ok(()) } fn open_current_file(&mut self) -> Result<()> { if let Some(parent) = self.dest_path.parent() { self.vfs.create_dir_all(parent, 0o755).ok(); } let flags = OpenFlags::new().write().create().truncate(); let file = self .vfs .open_file(&self.dest_path, &flags) .map_err(|e| anyhow!("open error: {}", e))?; self.output_file = Some(file); info!("rsync: opened {} for writing", self.dest_path.display()); Ok(()) } fn write_to_file(&mut self, data: &[u8]) -> Result<()> { if let Some(file) = &mut self.output_file { file.write_all(data) .map_err(|e| anyhow!("write error: {}", e))?; self.total_written += data.len() as u64; } Ok(()) } } /// Read rsync varint (LSB-first 7-bit groups, 0xFF prefix for negative) fn read_varint(buf: &[u8]) -> Option<(i32, usize)> { if buf.is_empty() { return None; } let mut pos = 0; let mut b = buf[pos]; pos += 1; let neg = if b == 0xFF { if pos >= buf.len() { return None; } b = buf[pos]; pos += 1; true } else { false }; let mut x = (b & 0x7F) as i32; let mut shift = 7; while b & 0x80 != 0 { if pos >= buf.len() { return None; } b = buf[pos]; pos += 1; x |= ((b & 0x7F) as i32) << shift; shift += 7; } if neg { Some((-x, pos)) } else { Some((x, pos)) } } #[cfg(test)] mod tests { use super::*; use crate::vfs::local_fs::LocalFs; fn make_vfs() -> Box { Box::new(LocalFs::new()) } #[test] fn test_parse_command() { let h = RsyncHandler::parse_rsync_command( "rsync --server -g -l -o -p -D -r -t -v --dirs . /tmp/upload.bin", make_vfs(), ) .unwrap(); assert_eq!(h.dest_path, PathBuf::from("/tmp/upload.bin")); } #[test] fn test_parse_command_sender() { let h = RsyncHandler::parse_rsync_command( "rsync --server --sender -vlogDtprz . /home/user/file.txt", make_vfs(), ) .unwrap(); assert_eq!(h.dest_path, PathBuf::from("/home/user/file.txt")); } #[test] fn test_version_exchange() { let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin", make_vfs()) .unwrap(); let output = h.drain_output(); assert_eq!(output, b"\x1e\x00\x00\x00"); assert_eq!(h.state, RsyncState::WaitVersion); h.feed(b"\x1e\x00\x00\x00").unwrap(); assert_eq!(h.state, RsyncState::ReadFileList); assert!(h.multiplex); } #[test] fn test_version_negotiate_down() { let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin", make_vfs()) .unwrap(); let _ = h.drain_output(); h.feed(b"\x1d\x00\x00\x00").unwrap(); assert_eq!(h.protocol_version, 29); assert_eq!(h.state, RsyncState::ReadFileList); } fn build_multiplex(data: &[u8]) -> Vec { let header = (MPLEX_BASE << 24) | (data.len() as u32); let mut buf = Vec::with_capacity(4 + data.len()); buf.extend_from_slice(&header.to_le_bytes()); buf.extend_from_slice(data); buf } #[test] fn test_file_list_multiplex() { let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin", make_vfs()) .unwrap(); let _ = h.drain_output(); h.feed(b"\x1e\x00\x00\x00").unwrap(); assert!(h.multiplex); let mut flist = Vec::new(); // File list: flags=1 (has name), then name with NUL terminator flist.push(1); // flags: has name flist.extend_from_slice(b"test.txt"); flist.push(0); // name terminator fn write_varint(buf: &mut Vec, val: i32) { if val == 0 { buf.push(0); return; } if val < 0 { buf.push(0xFF); let mut v = (-val) as u32; while v > 0 { let mut byte = (v & 0x7F) as u8; v >>= 7; if v > 0 { byte |= 0x80; } buf.push(byte); } } else { let mut v = val as u32; while v > 0 { let mut byte = (v & 0x7F) as u8; v >>= 7; if v > 0 { byte |= 0x80; } buf.push(byte); } } } write_varint(&mut flist, 33188); write_varint(&mut flist, 501); write_varint(&mut flist, 20); write_varint(&mut flist, 1700000000); write_varint(&mut flist, 100); write_varint(&mut flist, 0); flist.push(0); // file list end marker let mut sum_head = Vec::new(); sum_head.extend_from_slice(&0i32.to_le_bytes()); sum_head.extend_from_slice(&7000i32.to_le_bytes()); sum_head.extend_from_slice(&2i32.to_le_bytes()); sum_head.extend_from_slice(&100i32.to_le_bytes()); sum_head.extend_from_slice(&42i32.to_le_bytes()); h.feed(&build_multiplex(&flist)).unwrap(); // After file list with end marker, state should be ReadSumHead (or ReadFileData after sum_head processing) // The handler processes the file list end and transitions assert_eq!(h.file_entries.len(), 1); h.feed(&build_multiplex(&sum_head)).unwrap(); // After sum_head, transitions through SendSumCount to ReadFileData assert_eq!(h.state, RsyncState::ReadFileData); let sum_resp = h.drain_output(); assert_eq!(sum_resp.len(), 8); assert_eq!(&sum_resp[4..8], &0u32.to_le_bytes()); } #[test] fn test_file_data_multiplex() { let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin", make_vfs()) .unwrap(); let _ = h.drain_output(); h.feed(b"\x1e\x00\x00\x00").unwrap(); let mut flist = Vec::new(); flist.push(1); // flags: has name flist.extend_from_slice(b"test.bin"); flist.push(0); fn wv(buf: &mut Vec, val: i32) { if val == 0 { buf.push(0); return; } if val < 0 { buf.push(0xFF); let mut v = (-val) as u32; while v > 0 { let mut byte = (v & 0x7F) as u8; v >>= 7; if v > 0 { byte |= 0x80; } buf.push(byte); } } else { let mut v = val as u32; while v > 0 { let mut byte = (v & 0x7F) as u8; v >>= 7; if v > 0 { byte |= 0x80; } buf.push(byte); } } } wv(&mut flist, 33188); wv(&mut flist, 501); wv(&mut flist, 20); wv(&mut flist, 1700000000); wv(&mut flist, 100); wv(&mut flist, 0); flist.push(0); // file list end h.feed(&build_multiplex(&flist)).unwrap(); let mut sh = Vec::new(); sh.extend_from_slice(&0i32.to_le_bytes()); sh.extend_from_slice(&7000i32.to_le_bytes()); sh.extend_from_slice(&2i32.to_le_bytes()); sh.extend_from_slice(&100i32.to_le_bytes()); sh.extend_from_slice(&42i32.to_le_bytes()); h.feed(&build_multiplex(&sh)).unwrap(); let _ = h.drain_output(); let file_data = b"Hello, rsync protocol!"; h.feed(&build_multiplex(file_data)).unwrap(); assert_eq!(h.state, RsyncState::ReadFileData); let done_header = ((MPLEX_BASE + 1) << 24) as u32; let done_bytes = done_header.to_le_bytes(); h.feed(&done_bytes).unwrap(); assert_eq!(h.state, RsyncState::Done); assert_eq!(h.total_written, file_data.len() as u64); } }