VFS/DataProvider/Config refactoring + SSH public key authentication
Phase 1-6 of refactoring plan: - VFS abstraction (VfsBackend trait + LocalFs + OpenFlags builder) - DataProvider trait (SqliteProvider + PgProvider, SFTPGo-compatible) - Config refactoring (AppConfig unified sections, env overrides) - SSH handlers (sftp/scp/rsync) migrated to VFS + DataProvider - SSH public key authentication (Ed25519 signature verification) - SSH stderr → CHANNEL_EXTENDED_DATA support - Web auth uses DataProvider instead of direct SQL - User home directory from provider (per-user isolation) - PostgreSQL auth provider for SFTPGo compatibility
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
use std::path::PathBuf;
|
||||
use std::fs::{self, File};
|
||||
use std::io::Write;
|
||||
use anyhow::{Result, anyhow};
|
||||
use log::{info, debug, warn};
|
||||
use crate::vfs::{VfsBackend, VfsFile, VfsError};
|
||||
use crate::vfs::open_flags::OpenFlags;
|
||||
|
||||
/// MPLEX_BASE from rsync io.h
|
||||
const MPLEX_BASE: u32 = 7;
|
||||
@@ -27,23 +27,21 @@ pub(crate) enum RsyncState {
|
||||
|
||||
pub struct RsyncHandler {
|
||||
state: RsyncState,
|
||||
/// Raw input from SSH (multiplexed after version exchange)
|
||||
raw_input: Vec<u8>,
|
||||
/// Decoded rsync protocol data (after stripping multiplex)
|
||||
rsync_input: Vec<u8>,
|
||||
/// Raw rsync data to send (multiplex wrapping applied in drain_output)
|
||||
output_raw: Vec<u8>,
|
||||
dest_path: PathBuf,
|
||||
output_file: Option<File>,
|
||||
output_file: Option<Box<dyn VfsFile>>,
|
||||
total_written: u64,
|
||||
file_entries: Vec<String>,
|
||||
current_file: usize,
|
||||
protocol_version: u32,
|
||||
multiplex: bool,
|
||||
vfs: Box<dyn VfsBackend>,
|
||||
}
|
||||
|
||||
impl RsyncHandler {
|
||||
pub fn parse_rsync_command(command: &str) -> Result<Self> {
|
||||
pub fn parse_rsync_command(command: &str, vfs: Box<dyn VfsBackend>) -> Result<Self> {
|
||||
let parts: Vec<&str> = command.split_whitespace().collect();
|
||||
if parts.len() < 3 || parts[0] != "rsync" {
|
||||
return Err(anyhow!("Invalid rsync command: {}", command));
|
||||
@@ -83,9 +81,9 @@ impl RsyncHandler {
|
||||
current_file: 0,
|
||||
protocol_version: 30,
|
||||
multiplex: false,
|
||||
vfs,
|
||||
};
|
||||
|
||||
// Send protocol version (4-byte LE int, no multiplex)
|
||||
handler.output_raw.extend_from_slice(&30u32.to_le_bytes());
|
||||
handler.state = RsyncState::WaitVersion;
|
||||
|
||||
@@ -129,7 +127,6 @@ impl RsyncHandler {
|
||||
}
|
||||
MSG_DONE => {
|
||||
info!("rsync: MSG_DONE received (file complete)");
|
||||
// Signal file completion by appending a sentinel to rsync_input
|
||||
self.rsync_input.extend_from_slice(b"RSYNCDONE");
|
||||
}
|
||||
9 => {
|
||||
@@ -147,7 +144,6 @@ impl RsyncHandler {
|
||||
if data.is_empty() || !self.multiplex {
|
||||
return data;
|
||||
}
|
||||
// Wrap with multiplex header (MSG_DATA)
|
||||
let header = (MPLEX_BASE << 24) | (data.len() as u32);
|
||||
let mut wrapped = Vec::with_capacity(4 + data.len());
|
||||
wrapped.extend_from_slice(&header.to_le_bytes());
|
||||
@@ -180,7 +176,6 @@ impl RsyncHandler {
|
||||
loop {
|
||||
match self.state.clone() {
|
||||
RsyncState::SendVersion => {
|
||||
// Version already sent in constructor
|
||||
self.transition(RsyncState::WaitVersion);
|
||||
}
|
||||
|
||||
@@ -206,7 +201,6 @@ impl RsyncHandler {
|
||||
|
||||
let flags = self.rsync_input[0];
|
||||
if flags == 0 {
|
||||
// End of file list
|
||||
self.rsync_input.drain(..1);
|
||||
info!("rsync: file list end ({} entries)", self.file_entries.len());
|
||||
|
||||
@@ -214,14 +208,12 @@ impl RsyncHandler {
|
||||
self.file_entries.push("file".to_string());
|
||||
}
|
||||
self.current_file = 0;
|
||||
// Enter sum head reading state
|
||||
self.transition(RsyncState::ReadSumHead { need: 20 });
|
||||
break;
|
||||
}
|
||||
|
||||
let mut pos = 1;
|
||||
|
||||
// Extended flags
|
||||
let _more_flags = if flags & 0x80 != 0 {
|
||||
if self.rsync_input.len() <= pos { break; }
|
||||
let ef = self.rsync_input[pos];
|
||||
@@ -249,7 +241,6 @@ impl RsyncHandler {
|
||||
self.file_entries.push(name);
|
||||
}
|
||||
|
||||
// Skip metadata varints
|
||||
let skip_count = if flags & 0x10 == 0 { 1 } else { 0 }
|
||||
+ if flags & 0x20 == 0 { 1 } else { 0 }
|
||||
+ if flags & 0x40 == 0 { 1 } else { 0 }
|
||||
@@ -277,9 +268,6 @@ impl RsyncHandler {
|
||||
|
||||
RsyncState::ReadSumHead { need } => {
|
||||
if self.rsync_input.len() >= need {
|
||||
// Read sum head: count, blength, s2length, remainder (4 × LE int)
|
||||
// + checksum seed (1 × LE int)
|
||||
// = 5 × 4 = 20 bytes
|
||||
let sum_count = i32::from_le_bytes([
|
||||
self.rsync_input[0], self.rsync_input[1],
|
||||
self.rsync_input[2], self.rsync_input[3],
|
||||
@@ -312,7 +300,6 @@ impl RsyncHandler {
|
||||
RsyncState::SendSumCount => {
|
||||
self.open_current_file()?;
|
||||
|
||||
// Send sum_count = 0 (4-byte LE int = we have no existing data)
|
||||
self.output_raw.extend_from_slice(&0u32.to_le_bytes());
|
||||
info!("rsync: sent sum_count=0, ready to receive file data");
|
||||
|
||||
@@ -320,22 +307,17 @@ impl RsyncHandler {
|
||||
}
|
||||
|
||||
RsyncState::ReadFileData => {
|
||||
// Data comes as raw bytes inside MSG_DATA multiplex packets.
|
||||
// MSG_DONE appends b"RSYNCDONE" to rsync_input.
|
||||
let done_marker = b"RSYNCDONE";
|
||||
if let Some(pos) = self.rsync_input.windows(done_marker.len())
|
||||
.position(|w| w == done_marker)
|
||||
{
|
||||
// Data before the marker
|
||||
if pos > 0 {
|
||||
let data = self.rsync_input[..pos].to_vec();
|
||||
self.rsync_input.drain(..pos);
|
||||
self.write_to_file(&data)?;
|
||||
}
|
||||
// Remove marker
|
||||
self.rsync_input.drain(..done_marker.len());
|
||||
|
||||
// Close file
|
||||
if let Some(mut file) = self.output_file.take() {
|
||||
if let Err(e) = file.flush() {
|
||||
warn!("rsync flush error: {}", e);
|
||||
@@ -353,11 +335,9 @@ impl RsyncHandler {
|
||||
info!("rsync ALL DONE: {} bytes written to {}",
|
||||
self.total_written, self.dest_path.display());
|
||||
} else {
|
||||
// Next file sum head
|
||||
self.transition(RsyncState::ReadSumHead { need: 20 });
|
||||
}
|
||||
} else if !self.rsync_input.is_empty() {
|
||||
// Partial data, keep it in buffer for more
|
||||
let data = self.rsync_input.clone();
|
||||
self.rsync_input.clear();
|
||||
self.write_to_file(&data)?;
|
||||
@@ -377,9 +357,11 @@ impl RsyncHandler {
|
||||
|
||||
fn open_current_file(&mut self) -> Result<()> {
|
||||
if let Some(parent) = self.dest_path.parent() {
|
||||
fs::create_dir_all(parent).ok();
|
||||
self.vfs.create_dir_all(parent, 0o755).ok();
|
||||
}
|
||||
let file = File::create(&self.dest_path)?;
|
||||
let flags = OpenFlags::new().write().create().truncate();
|
||||
let file = self.vfs.open_file(&self.dest_path, &flags)
|
||||
.map_err(|e| anyhow!("open error: {}", e))?;
|
||||
self.output_file = Some(file);
|
||||
info!("rsync: opened {} for writing", self.dest_path.display());
|
||||
Ok(())
|
||||
@@ -387,7 +369,8 @@ impl RsyncHandler {
|
||||
|
||||
fn write_to_file(&mut self, data: &[u8]) -> Result<()> {
|
||||
if let Some(file) = &mut self.output_file {
|
||||
file.write_all(data)?;
|
||||
file.write_all(data)
|
||||
.map_err(|e| anyhow!("write error: {}", e))?;
|
||||
self.total_written += data.len() as u64;
|
||||
}
|
||||
Ok(())
|
||||
@@ -426,28 +409,37 @@ fn read_varint(buf: &[u8]) -> Option<(i32, usize)> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::vfs::local_fs::LocalFs;
|
||||
|
||||
fn make_vfs() -> Box<dyn VfsBackend> {
|
||||
Box::new(LocalFs::new())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_command() {
|
||||
let h = RsyncHandler::parse_rsync_command("rsync --server -g -l -o -p -D -r -t -v --dirs . /tmp/upload.bin").unwrap();
|
||||
let h = RsyncHandler::parse_rsync_command(
|
||||
"rsync --server -g -l -o -p -D -r -t -v --dirs . /tmp/upload.bin",
|
||||
make_vfs()
|
||||
).unwrap();
|
||||
assert_eq!(h.dest_path, PathBuf::from("/tmp/upload.bin"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_command_sender() {
|
||||
let h = RsyncHandler::parse_rsync_command("rsync --server --sender -vlogDtprz . /home/user/file.txt").unwrap();
|
||||
let h = RsyncHandler::parse_rsync_command(
|
||||
"rsync --server --sender -vlogDtprz . /home/user/file.txt",
|
||||
make_vfs()
|
||||
).unwrap();
|
||||
assert_eq!(h.dest_path, PathBuf::from("/home/user/file.txt"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_version_exchange() {
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin").unwrap();
|
||||
// Initial output: protocol version (30 as LE int)
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin", make_vfs()).unwrap();
|
||||
let output = h.drain_output();
|
||||
assert_eq!(output, b"\x1e\x00\x00\x00");
|
||||
assert_eq!(h.state, RsyncState::WaitVersion);
|
||||
|
||||
// Client sends its version (30 = 0x1E)
|
||||
h.feed(b"\x1e\x00\x00\x00").unwrap();
|
||||
assert_eq!(h.state, RsyncState::ReadFileList);
|
||||
assert!(h.multiplex);
|
||||
@@ -455,9 +447,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_version_negotiate_down() {
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin").unwrap();
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/test.bin", make_vfs()).unwrap();
|
||||
let _ = h.drain_output();
|
||||
// Client has lower version (29)
|
||||
h.feed(b"\x1d\x00\x00\x00").unwrap();
|
||||
assert_eq!(h.protocol_version, 29);
|
||||
assert_eq!(h.state, RsyncState::ReadFileList);
|
||||
@@ -471,24 +462,14 @@ mod tests {
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_multiplex_done() -> Vec<u8> {
|
||||
let header = (MPLEX_BASE << 24) | 0u32; // MSG_DONE (tag=1 → raw_tag=8)
|
||||
let mut buf = Vec::new();
|
||||
buf.extend_from_slice(&header.to_le_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_list_multiplex() {
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin").unwrap();
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin", make_vfs()).unwrap();
|
||||
let _ = h.drain_output();
|
||||
// Version exchange
|
||||
h.feed(b"\x1e\x00\x00\x00").unwrap();
|
||||
assert!(h.multiplex);
|
||||
|
||||
// Build file list with multiplex wrapping
|
||||
let mut flist = Vec::new();
|
||||
// Entry: flags=0, name="test.txt\0", + 6 varints
|
||||
flist.push(0);
|
||||
flist.extend_from_slice(b"test.txt");
|
||||
flist.push(0);
|
||||
@@ -514,46 +495,40 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
write_varint(&mut flist, 33188); // mode
|
||||
write_varint(&mut flist, 501); // uid
|
||||
write_varint(&mut flist, 20); // gid
|
||||
write_varint(&mut flist, 1700000000); // time
|
||||
write_varint(&mut flist, 100); // size
|
||||
write_varint(&mut flist, 0); // checksum seed
|
||||
// End marker
|
||||
write_varint(&mut flist, 33188);
|
||||
write_varint(&mut flist, 501);
|
||||
write_varint(&mut flist, 20);
|
||||
write_varint(&mut flist, 1700000000);
|
||||
write_varint(&mut flist, 100);
|
||||
write_varint(&mut flist, 0);
|
||||
flist.push(0);
|
||||
|
||||
// Sum head (5 ints = 20 bytes) as separate multiplex packet
|
||||
let mut sum_head = Vec::new();
|
||||
sum_head.extend_from_slice(&0i32.to_le_bytes()); // count
|
||||
sum_head.extend_from_slice(&7000i32.to_le_bytes()); // blength
|
||||
sum_head.extend_from_slice(&2i32.to_le_bytes()); // s2length
|
||||
sum_head.extend_from_slice(&100i32.to_le_bytes()); // remainder
|
||||
sum_head.extend_from_slice(&42i32.to_le_bytes()); // checksum_seed
|
||||
sum_head.extend_from_slice(&0i32.to_le_bytes());
|
||||
sum_head.extend_from_slice(&7000i32.to_le_bytes());
|
||||
sum_head.extend_from_slice(&2i32.to_le_bytes());
|
||||
sum_head.extend_from_slice(&100i32.to_le_bytes());
|
||||
sum_head.extend_from_slice(&42i32.to_le_bytes());
|
||||
|
||||
// Feed file list
|
||||
h.feed(&build_multiplex(&flist)).unwrap();
|
||||
assert_eq!(h.state, RsyncState::ReadFileList); // Still reading, 0x00 end marker triggered transition
|
||||
assert_eq!(h.state, RsyncState::ReadFileList);
|
||||
assert_eq!(h.file_entries.len(), 1);
|
||||
|
||||
// Now feed sum head
|
||||
h.feed(&build_multiplex(&sum_head)).unwrap();
|
||||
assert_eq!(h.state, RsyncState::SendSumCount);
|
||||
|
||||
// Send sum count response
|
||||
let sum_resp = h.drain_output();
|
||||
assert_eq!(sum_resp.len(), 8); // 4-byte header + 4-byte int
|
||||
assert_eq!(sum_resp.len(), 8);
|
||||
assert_eq!(&sum_resp[4..8], &0u32.to_le_bytes());
|
||||
assert_eq!(h.state, RsyncState::ReadFileData);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_data_multiplex() {
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin").unwrap();
|
||||
let mut h = RsyncHandler::parse_rsync_command("rsync --server . /tmp/rsync_test.bin", make_vfs()).unwrap();
|
||||
let _ = h.drain_output();
|
||||
h.feed(b"\x1e\x00\x00\x00").unwrap(); // version
|
||||
h.feed(b"\x1e\x00\x00\x00").unwrap();
|
||||
|
||||
// Simple file list
|
||||
let mut flist = Vec::new();
|
||||
flist.push(0);
|
||||
flist.extend_from_slice(b"test.bin");
|
||||
@@ -568,7 +543,6 @@ mod tests {
|
||||
flist.push(0);
|
||||
h.feed(&build_multiplex(&flist)).unwrap();
|
||||
|
||||
// Sum head
|
||||
let mut sh = Vec::new();
|
||||
sh.extend_from_slice(&0i32.to_le_bytes());
|
||||
sh.extend_from_slice(&7000i32.to_le_bytes());
|
||||
@@ -576,16 +550,13 @@ mod tests {
|
||||
sh.extend_from_slice(&100i32.to_le_bytes());
|
||||
sh.extend_from_slice(&42i32.to_le_bytes());
|
||||
h.feed(&build_multiplex(&sh)).unwrap();
|
||||
let _ = h.drain_output(); // sum count response
|
||||
let _ = h.drain_output();
|
||||
|
||||
// File data + MSG_DONE
|
||||
let file_data = b"Hello, rsync protocol!";
|
||||
h.feed(&build_multiplex(file_data)).unwrap();
|
||||
assert_eq!(h.state, RsyncState::ReadFileData);
|
||||
|
||||
// MSG_DONE
|
||||
// MSG_DONE has tag=1, so raw_tag = MPLEX_BASE + 1 = 8
|
||||
let done_header = (MPLEX_BASE + 1) << 24; // raw_tag = 8, len = 0
|
||||
let done_header = (MPLEX_BASE + 1) << 24;
|
||||
let done_bytes = done_header.to_le_bytes();
|
||||
h.feed(&done_bytes).unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user