Files
markbase/markbase-core/src/ssh_server/channel.rs
Warren 5344a7c16e
Some checks failed
Test / test (push) Has been cancelled
Test / build (push) Has been cancelled
Fix rsync: Use real rsync subprocess instead of in-process handler
In-process RsyncHandler couldn't match openrsync protocol 29 flow
after version exchange. Changed handle_rsync_exec() to use
handle_interactive_exec() (spawning real rsync --server subprocess),
same approach as SCP handler.

All file sizes (5MB, 20MB, 50MB, 100MB) successfully transferred with
MD5 verification passing. Transfer speed ~712 KB/s limited by
AES-256-CTR encryption overhead.
2026-06-18 06:01:16 +08:00

1638 lines
75 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// SSH Channel协议实现Phase 6 + Phase 13端口转发
// 参考OpenSSH channel.c
use crate::ssh_server::packet::{SshPacket, PacketType};
use crate::ssh_server::ssh_security_config::SshSecurityConfig; // Phase 13.3: 安全配置
use crate::ssh_server::port_forward::{PortForwardManager, DirectTcpipChannel, ForwardedTcpipChannel}; // Phase 13.3
use std::io::{Read, Write}; // 导入Write traitOpenSSH标准
use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use log::{info, warn, debug, error};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::ssh_server::sftp_handler::SftpHandler; // Phase 7: SFTP handler
use crate::ssh_server::scp_handler::ScpHandler; // Phase 8: SCP handler
use crate::ssh_server::rsync_handler::RsyncHandler; // Phase 8: rsync handler
use std::path::PathBuf; // Phase 7-8: Path for SFTP/SCP/rsync root directory
use std::process::{Child, ChildStdin, ChildStdout, ChildStderr}; // Phase 14: 交互式exec
use std::os::unix::io::{AsRawFd, RawFd}; // Phase 14: OpenSSH风格poll机制需要RawFd
use nix::fcntl::{fcntl, FcntlArg, OFlag}; // Phase 14: 非阻塞I/OOpenSSH风格
use nix::poll::{poll, PollFd, PollFlags}; // Phase 14: poll机制OpenSSH风格
/// SSH Channel管理器参考OpenSSH channel.c: struct channel
pub struct ChannelManager {
channels: HashMap<u32, Channel>,
next_channel_id: u32,
}
/// Phase 14: 交互式Exec进程管理参考OpenSSH session.c: do_exec_no_pty
/// ⭐⭐⭐⭐⭐ OpenSSH风格使用poll()替代thread::spawn非阻塞I/O
pub struct ExecProcess {
pub child: Child, // 子进程rsync/scp等
pub stdin: Option<ChildStdin>, // stdin管道SSH client → 子进程)
pub stdout: Option<ChildStdout>, // ⭐⭐⭐⭐⭐ stdout管道直接poll不使用thread
pub stderr: Option<ChildStderr>, // ⭐⭐⭐⭐⭐ stderr管道直接poll不使用thread
pub stdout_fd: RawFd, // ⭐⭐⭐⭐⭐ stdout RawFd用于poll
pub stderr_fd: RawFd, // ⭐⭐⭐⭐⭐ stderr RawFd用于poll
pub command: String, // ⭐⭐⭐⭐⭐ Phase 16.2: 存储exec命令用于SCP检测
}
impl ChannelManager {
pub fn new() -> Self {
Self {
channels: HashMap::new(),
next_channel_id: 0,
}
}
/// 处理SSH_MSG_CHANNEL_OPEN参考OpenSSH channel.c: channel_open())
/// Phase 13.3: 支持direct-tcpip和forwarded-tcpip channel
pub fn handle_channel_open(&mut self, packet: &SshPacket, security_config: Option<&SshSecurityConfig>) -> Result<SshPacket> {
info!("Processing SSH_MSG_CHANNEL_OPEN");
let mut cursor = std::io::Cursor::new(packet.payload.as_slice());
// Packet type
let packet_type = cursor.read_u8()?;
if packet_type != PacketType::SSH_MSG_CHANNEL_OPEN as u8 {
return Err(anyhow!("Invalid packet type for CHANNEL_OPEN"));
}
// 读取channel类型SSH string
let channel_type = read_ssh_string(&mut cursor)?;
// 读取sender channel IDu32
let sender_channel = cursor.read_u32::<BigEndian>()?;
// 读取初始窗口大小u32
let initial_window_size = cursor.read_u32::<BigEndian>()?;
// 读取最大packet大小u32
let maximum_packet_size = cursor.read_u32::<BigEndian>()?;
info!("Channel open: type={}, sender_channel={}, window={}, max_packet={}",
channel_type, sender_channel, initial_window_size, maximum_packet_size);
// Phase 13.3: 检查channel类型支持session、direct-tcpip、forwarded-tcpip
match channel_type.as_str() {
"session" => {
// 传统的session channelPhase 6
self.handle_session_channel_open(sender_channel, initial_window_size, maximum_packet_size)
}
"direct-tcpip" => {
// Phase 13.3: Remote port forwarding channel
info!("Received direct-tcpip channel open (Remote port forwarding)");
self.handle_direct_tcpip_channel_open(packet, sender_channel, initial_window_size, maximum_packet_size, security_config)
}
"forwarded-tcpip" => {
// Phase 13.3: Local port forwarding channel
info!("Received forwarded-tcpip channel open (Local port forwarding)");
self.handle_forwarded_tcpip_channel_open(packet, sender_channel, initial_window_size, maximum_packet_size)
}
_ => {
warn!("Unsupported channel type: {}", channel_type);
self.build_channel_open_failure(
sender_channel,
3, // SSH_OPEN_UNKNOWN_CHANNEL_TYPE
"Unsupported channel type",
"en"
)
}
}
}
/// 处理session channel openPhase 6
fn handle_session_channel_open(&mut self, sender_channel: u32, initial_window_size: u32, maximum_packet_size: u32) -> Result<SshPacket> {
info!("Processing session channel open");
let server_channel = self.next_channel_id;
self.next_channel_id += 1;
let channel = Channel {
server_channel,
sender_channel,
channel_type: "session".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control参考OpenSSH channels.h
remote_window: initial_window_size, // 远端窗口(从 CHANNEL_OPEN packet 中读取)
remote_maxpacket: maximum_packet_size, // 远端最大 packet
local_window: 2097152, // 本地窗口OpenSSH 默认 2MB
local_window_max: 2097152, // 本地窗口最大值(同上)
local_consumed: 0, // 本地已消费的数据(初始为 0⭐⭐⭐⭐⭐
local_maxpacket: 32768, // 本地最大 packetOpenSSH 默认 32KB
// 旧字段(保留兼容)
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
output_buffer: None,
sftp_handler: None,
scp_handler: None,
rsync_handler: None,
exec_process: None, // Phase 14: 交互式exec
sftp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.2修复SFTP packet累积
scp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.4修复SCP packet累积
direct_tcpip: None,
forwarded_tcpip: None,
};
self.channels.insert(server_channel, channel);
info!("Session channel created: server_channel={}, sender_channel={}", server_channel, sender_channel);
self.build_channel_open_confirmation(server_channel, sender_channel, initial_window_size, maximum_packet_size)
}
/// 处理direct-tcpip channel openPhase 13.3: Remote port forwarding
fn handle_direct_tcpip_channel_open(
&mut self,
packet: &SshPacket,
sender_channel: u32,
initial_window_size: u32,
maximum_packet_size: u32,
security_config: Option<&SshSecurityConfig>,
) -> Result<SshPacket> {
info!("Processing direct-tcpip channel open");
// 解析direct-tcpip参数
let mut port_forward_manager = PortForwardManager::new();
let direct_tcpip = port_forward_manager.handle_direct_tcpip_channel(&packet.payload)?;
// Phase 13.3: 安全配置验证
if let Some(security) = security_config {
if let Err(e) = security.validate_direct_tcpip_channel(&direct_tcpip.host_to_connect, direct_tcpip.port_to_connect) {
warn!("direct-tcpip security validation failed: {}", e);
return self.build_channel_open_failure(
sender_channel,
2, // SSH_OPEN_CONNECT_FAILED
"Security validation failed",
"en"
);
}
info!("direct-tcpip security validation passed");
}
let server_channel = self.next_channel_id;
self.next_channel_id += 1;
let channel = Channel {
server_channel,
sender_channel,
channel_type: "direct-tcpip".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control
remote_window: initial_window_size,
remote_maxpacket: maximum_packet_size,
local_window: 2097152,
local_window_max: 2097152,
local_consumed: 0,
local_maxpacket: 32768,
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
output_buffer: None,
sftp_handler: None,
scp_handler: None,
rsync_handler: None,
exec_process: None,
sftp_input_buffer: Vec::new(),
scp_input_buffer: Vec::new(),
direct_tcpip: Some(direct_tcpip),
forwarded_tcpip: None,
};
self.channels.insert(server_channel, channel);
info!("direct-tcpip channel created: server_channel={}, host={}, port={}",
server_channel,
self.channels.get(&server_channel).unwrap().direct_tcpip.as_ref().unwrap().host_to_connect,
self.channels.get(&server_channel).unwrap().direct_tcpip.as_ref().unwrap().port_to_connect);
self.build_channel_open_confirmation(server_channel, sender_channel, initial_window_size, maximum_packet_size)
}
/// 处理forwarded-tcpip channel openPhase 13.3: Local port forwarding
fn handle_forwarded_tcpip_channel_open(
&mut self,
packet: &SshPacket,
sender_channel: u32,
initial_window_size: u32,
maximum_packet_size: u32,
) -> Result<SshPacket> {
info!("Processing forwarded-tcpip channel open");
// 解析forwarded-tcpip参数
let mut port_forward_manager = PortForwardManager::new();
let forwarded_tcpip = port_forward_manager.handle_forwarded_tcpip_channel(&packet.payload)?;
let server_channel = self.next_channel_id;
self.next_channel_id += 1;
let channel = Channel {
server_channel,
sender_channel,
channel_type: "forwarded-tcpip".to_string(),
// ⭐⭐⭐⭐⭐ Phase 15: Window Control
remote_window: initial_window_size,
remote_maxpacket: maximum_packet_size,
local_window: 2097152,
local_window_max: 2097152,
local_consumed: 0,
local_maxpacket: 32768,
window_size: initial_window_size,
maximum_packet_size,
state: ChannelState::Open,
output_buffer: None,
sftp_handler: None,
scp_handler: None,
rsync_handler: None,
exec_process: None, // Phase 14: 交互式exec
sftp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.2修复
scp_input_buffer: Vec::new(), // ⭐⭐⭐⭐⭐ Phase 14.4修复
direct_tcpip: None,
forwarded_tcpip: Some(forwarded_tcpip),
};
self.channels.insert(server_channel, channel);
info!("forwarded-tcpip channel created: server_channel={}, bind={}, originator={}",
server_channel,
self.channels.get(&server_channel).unwrap().forwarded_tcpip.as_ref().unwrap().bind_port,
self.channels.get(&server_channel).unwrap().forwarded_tcpip.as_ref().unwrap().originator_address);
self.build_channel_open_confirmation(server_channel, sender_channel, initial_window_size, maximum_packet_size)
}
/// 处理SSH_MSG_CHANNEL_REQUEST参考OpenSSH channel.c: channel_request())
pub fn handle_channel_request(&mut self, packet: &SshPacket) -> Result<Option<SshPacket>> {
info!("Processing SSH_MSG_CHANNEL_REQUEST");
let mut cursor = std::io::Cursor::new(packet.payload.as_slice()); // 使用as_slice()Rust标准
// Packet type
let packet_type = cursor.read_u8()?;
if packet_type != PacketType::SSH_MSG_CHANNEL_REQUEST as u8 {
return Err(anyhow!("Invalid packet type for CHANNEL_REQUEST"));
}
// 读取recipient channelu32
let recipient_channel = cursor.read_u32::<BigEndian>()?;
// 读取请求类型SSH string
let request_type = read_ssh_string(&mut cursor)?;
// 读取want reply标志boolean
let want_reply = cursor.read_u8()? != 0;
info!("Channel request: channel={}, type={}, want_reply={}",
recipient_channel, request_type, want_reply);
// 处理不同请求类型参考OpenSSH channel.c
if request_type == "exec" {
self.handle_exec_request(&mut cursor, recipient_channel, want_reply) // 移除?操作符返回Option不是Result
} else if request_type == "subsystem" {
self.handle_subsystem_request(&mut cursor, recipient_channel, want_reply) // 移除?操作符
} else if request_type == "shell" {
self.handle_shell_request(recipient_channel, want_reply) // 移除?操作符
} else if request_type == "env" {
self.handle_env_request(&mut cursor, recipient_channel, want_reply) // 移除?操作符
} else if request_type == "pty-req" {
self.handle_pty_request(&mut cursor, recipient_channel, want_reply) // 移除?操作符
} else {
warn!("Unsupported channel request: {}", request_type);
if want_reply {
Ok(Some(self.build_channel_failure(recipient_channel)?))
} else {
Ok(None)
}
}
}
/// 处理exec请求参考OpenSSH channel.c: channel_request_exec() + session.c: do_exec_no_pty
fn handle_exec_request(&mut self, cursor: &mut std::io::Cursor<&[u8]>, channel: u32, want_reply: bool) -> Result<Option<SshPacket>> {
info!("Handling exec request for channel {}", channel);
// 读取命令SSH string
let command = read_ssh_string(cursor)?;
info!("Exec command: {}", command);
// Phase 14: 检测rsync/SCP命令启动交互式进程
if command.starts_with("rsync --server") || command.contains("rsync") {
info!("⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected rsync command: {}", command);
self.handle_rsync_exec(&command, channel)?;
} else if command.starts_with("scp") || command.contains("scp -") {
// ⭐⭐⭐⭐⭐ Phase 14.5: SCP命令处理scp -t destination 或 scp -f source
info!("⭐⭐⭐⭐⭐ [EXEC_REQUEST] Detected SCP command: {}", command);
self.handle_scp_exec(&command, channel)?;
} else {
// Phase 6: 普通命令使用非交互式执行
let output = self.execute_command(&command)?;
// 存储输出等待后续发送CHANNEL_DATA
if let Some(ch) = self.channels.get_mut(&channel) {
ch.output_buffer = Some(output);
}
}
if want_reply {
Ok(Some(self.build_channel_success(channel)?))
} else {
Ok(None)
}
}
/// ⭐⭐⭐⭐⭐ Phase 16.5: rsync exec使用真实rsync子进程替代in-process handler
fn handle_rsync_exec(&mut self, command: &str, channel_id: u32) -> Result<()> {
self.handle_interactive_exec(command, channel_id, "rsync")
}
/// Phase 14.5: 处理SCP交互式execscp -t destination 或 scp -f source
/// ⭐⭐⭐⭐⭐ OpenSSH风格使用poll()替代thread::spawn非阻塞I/O
fn handle_scp_exec(&mut self, command: &str, channel_id: u32) -> Result<()> {
// ⭐⭐⭐⭐⭐ SCP和rsync共用相同的交互式exec逻辑
self.handle_interactive_exec(command, channel_id, "scp")
}
/// ⭐⭐⭐⭐⭐ Phase 14.6: 交互式exec通用处理rsync/SCP共用
fn handle_interactive_exec(&mut self, command: &str, channel_id: u32, process_type: &str) -> Result<()> {
use std::process::{Command, Stdio};
use std::os::unix::io::AsRawFd;
info!("⭐⭐⭐⭐⭐ [{}_EXEC_START] Starting interactive process: {}", process_type, command);
// 启动子进程相当于OpenSSH fork
let mut child = Command::new("sh")
.arg("-c")
.arg(command)
.stdin(Stdio::piped()) // ← 创建stdin管道相当于pipe(pin)
.stdout(Stdio::piped()) // ← 创建stdout管道相当于pipe(pout)
.stderr(Stdio::piped()) // ← 创建stderr管道相当于pipe(perr)
.spawn()?;
info!("⭐⭐⭐⭐⭐ [CHILD_SPAWNED] Child process spawned, PID: {}", child.id());
// 提取管道相当于OpenSSH dup2
let stdin = child.stdin.take().ok_or(anyhow!("stdin take failed"))?;
let stdout = child.stdout.take().ok_or(anyhow!("stdout take failed"))?;
let stderr = child.stderr.take().ok_or(anyhow!("stderr take failed"))?;
// ⭐⭐⭐⭐⭐ OpenSSH关键设置非阻塞模式fcntl O_NONBLOCK
let stdout_fd = stdout.as_raw_fd();
let stderr_fd = stderr.as_raw_fd();
info!("Setting stdout/stderr to non-blocking mode (OpenSSH style)");
fcntl(stdout_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?;
fcntl(stderr_fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?;
info!("Non-blocking I/O enabled for stdout (fd {}) and stderr (fd {})", stdout_fd, stderr_fd);
// ⭐⭐⭐⭐⭐ OpenSSH风格不再使用thread::spawn直接保留File对象用于poll
// 存储到channel相当于OpenSSH session_set_fds
if let Some(ch) = self.channels.get_mut(&channel_id) {
ch.exec_process = Some(ExecProcess {
child,
stdin: Some(stdin),
stdout: Some(stdout), // ⭐⭐⭐⭐⭐ 直接保留File对象
stderr: Some(stderr), // ⭐⭐⭐⭐⭐ 直接保留File对象
stdout_fd, // ⭐⭐⭐⭐⭐ RawFd用于poll
stderr_fd, // ⭐⭐⭐⭐⭐ RawFd用于poll
command: command.to_string(), // ⭐⭐⭐⭐⭐ Phase 16.2: 存储exec命令用于SCP检测
});
info!("Interactive process stored for channel {} (poll-ready)", channel_id);
}
Ok(())
}
/// 执行命令并捕获输出Phase 6基础实现
fn execute_command(&self, command: &str) -> Result<Vec<u8>> {
use std::process::{Command, Stdio};
info!("Executing command: {}", command);
// 使用shell执行命令参考OpenSSH session.c
let output = Command::new("sh")
.arg("-c")
.arg(command)
.output()?;
// 返回stdout + stderr
let mut result = output.stdout;
result.extend_from_slice(&output.stderr);
info!("Command output: {} bytes", result.len());
Ok(result)
}
/// 处理subsystem请求参考OpenSSH channel.c: channel_request_subsystem())
fn handle_subsystem_request(&mut self, cursor: &mut std::io::Cursor<&[u8]>, channel: u32, want_reply: bool) -> Result<Option<SshPacket>> {
info!("Handling subsystem request for channel {}", channel);
// 读取subsystem名称SSH string
let subsystem = read_ssh_string(cursor)?;
info!("Subsystem: {}", subsystem);
// 检查subsystem支持OpenSSH支持sftp
if subsystem == "sftp" {
info!("SFTP subsystem requested");
// Phase 7: 初始化SFTP handler
let root_dir = PathBuf::from("/Users/accusys/markbase"); // 默认root目录
// ⭐⭐⭐⭐⭐ Phase 4: 获取 client maxpack 限制(从 Channel 中获取)
let maxpacket = if let Some(ch) = self.channels.get(&channel) {
ch.remote_maxpacket // 来自 SSH_MSG_CHANNEL_OPEN 的 maximum_packet_size
} else {
32768 // OpenSSH 默认值32KB
};
let sftp_handler = SftpHandler::new(root_dir, maxpacket); // ⭐⭐⭐⭐⭐ Phase 4: 传入 maxpack
// 存储到channel
if let Some(ch) = self.channels.get_mut(&channel) {
ch.sftp_handler = Some(sftp_handler);
info!("SFTP handler initialized for channel {}", channel);
}
if want_reply {
Ok(Some(self.build_channel_success(channel)?))
} else {
Ok(None)
}
} else {
warn!("Unsupported subsystem: {}", subsystem);
if want_reply {
Ok(Some(self.build_channel_failure(channel)?))
} else {
Ok(None)
}
}
}
/// 处理shell请求参考OpenSSH channel.c
fn handle_shell_request(&mut self, channel: u32, want_reply: bool) -> Result<Option<SshPacket>> {
info!("Handling shell request for channel {}", channel);
// Phase 9将实现shell
warn!("Shell not implemented in Phase 6");
if want_reply {
Ok(Some(self.build_channel_failure(channel)?))
} else {
Ok(None)
}
}
/// 处理env请求参考OpenSSH channel.c
fn handle_env_request(&mut self, cursor: &mut std::io::Cursor<&[u8]>, channel: u32, want_reply: bool) -> Result<Option<SshPacket>> {
info!("Handling env request for channel {}", channel);
// 读取环境变量名和值
let name = read_ssh_string(cursor)?;
let value = read_ssh_string(cursor)?;
info!("Env: {}={}", name, value);
if want_reply {
Ok(Some(self.build_channel_success(channel)?))
} else {
Ok(None)
}
}
/// 处理pty请求参考OpenSSH channel.c
fn handle_pty_request(&mut self, cursor: &mut std::io::Cursor<&[u8]>, channel: u32, want_reply: bool) -> Result<Option<SshPacket>> {
info!("Handling pty request for channel {}", channel);
// 读取terminal类型SSH string
let term = read_ssh_string(cursor)?;
// 读取窗口大小4个uint32
let width = cursor.read_u32::<BigEndian>()?;
let height = cursor.read_u32::<BigEndian>()?;
let _pixel_width = cursor.read_u32::<BigEndian>()?;
let _pixel_height = cursor.read_u32::<BigEndian>()?;
// 读取terminal modesSSH string格式
let modes_len = cursor.read_u32::<BigEndian>()?;
let mut modes = vec![0u8; modes_len as usize];
cursor.read_exact(&mut modes)?;
info!("PTY: term={}, width={}, height={}, modes_len={}", term, width, height, modes_len);
if want_reply {
Ok(Some(self.build_channel_success(channel)?))
} else {
Ok(None)
}
}
/// 处理SSH_MSG_CHANNEL_DATA参考OpenSSH channel.c: channel_input_data())
pub fn handle_channel_data(&mut self, packet: &SshPacket) -> Result<Option<SshPacket>> {
info!("Processing SSH_MSG_CHANNEL_DATA");
let mut cursor = std::io::Cursor::new(packet.payload.as_slice());
// Packet type
let packet_type = cursor.read_u8()?;
if packet_type != PacketType::SSH_MSG_CHANNEL_DATA as u8 {
return Err(anyhow!("Invalid packet type for CHANNEL_DATA"));
}
// 读取recipient channel
let recipient_channel = cursor.read_u32::<BigEndian>()?;
// 读取数据SSH string
let data_length = cursor.read_u32::<BigEndian>()?;
let mut data = vec![0u8; data_length as usize];
cursor.read_exact(&mut data)?;
info!("Channel data: channel={}, length={}", recipient_channel, data.len());
info!("Channel data content (first 20 bytes): {:?}", &data[..std::cmp::min(20, data.len())]);
// Phase 14: 检查是否是交互式exec进程
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
if let Some(exec_process) = &mut channel.exec_process {
info!("Interactive exec process detected, forwarding data to stdin");
info!("Channel data content: {:?}", &data);
info!("Child PID: {:?}", exec_process.child.id());
// 检查子进程状态
match exec_process.child.try_wait() {
Ok(Some(status)) => {
warn!("Child process already exited with status: {:?}", status);
}
Ok(None) => {
info!("Child process still running");
}
Err(e) => {
warn!("Failed to check child status: {}", e);
}
}
// 转发数据到子进程stdin相当于OpenSSH写fdin
if let Some(stdin) = &mut exec_process.stdin {
use std::io::Write;
info!("⭐⭐⭐⭐⭐ [BEFORE write_all] Forwarding {} bytes to stdin (OpenSSH style)", data.len());
stdin.write_all(&data)?;
stdin.flush()?;
info!("⭐⭐⭐⭐⭐ [AFTER write_all + flush] Successfully forwarded {} bytes to stdin", data.len());
}
// ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ Critical修复Window Control - 减少 local_window
// OpenSSH channel.c: channel_input_data() 中 c->local_window -= data_len
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_window -= data.len() as u32;
info!("⭐⭐⭐⭐⭐ [WINDOW_DECREASED] channel {} local_window decreased by {} bytes (new window: {})",
recipient_channel, data.len(), channel.local_window);
}
// ⭐⭐⭐⭐⭐ OpenSSH风格不等待直接返回None主循环会通过poll处理stdout
info!("stdin forwarded, returning None (main loop will poll stdout/stderr)");
// ⭐⭐⭐⭐⭐ Phase 15: 更新 local_consumed跟踪已消费的数据
if let Some(channel) = self.channels.get_mut(&recipient_channel) {
channel.local_consumed += data.len() as u32;
info!("⭐⭐⭐⭐⭐ [LOCAL_CONSUMED] channel {} consumed {} bytes (total: {})",
recipient_channel, data.len(), channel.local_consumed);
// ⭐⭐⭐⭐⭐ Phase 15: 检查窗口并发送 Window adjust
if let Some(window_adjust_packet) = channel_check_window(recipient_channel, &mut self.channels) {
// 返回 window adjust packet主循环会发送
return Ok(Some(window_adjust_packet));
}
}
return Ok(None);
}
// ⭐⭐⭐⭐⭐ Phase 16.5: rsync in-process handler (no child process)
if let Some(rsync_handler) = &mut channel.rsync_handler {
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] Feeding {} bytes to RsyncHandler", data.len());
let data_clone = data.clone();
rsync_handler.feed(&data_clone)?;
let output = rsync_handler.drain_output();
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] RsyncHandler produced {} bytes output, done={}",
output.len(), rsync_handler.is_done());
// ⭐⭐⭐⭐⭐ Phase 15: Window Control - decrease local_window
channel.local_window -= data.len() as u32;
channel.local_consumed += data.len() as u32;
// Check for window adjust
if let Some(window_adjust_packet) = channel_check_window(recipient_channel, &mut self.channels) {
return Ok(Some(window_adjust_packet));
}
if !output.is_empty() {
info!("⭐⭐⭐⭐⭐ [RSYNC_DATA] Returning {} bytes as CHANNEL_DATA", output.len());
return Ok(Some(self.build_channel_data(recipient_channel, &output)?));
}
return Ok(None);
}
// Phase 7: 检查是否是SFTP channel⭐⭐⭐⭐⭐ Phase 14.3: packet accumulation
if let Some(sftp_handler) = &mut channel.sftp_handler {
info!("Processing SFTP request ({} bytes)", data.len());
// ⭐⭐⭐⭐⭐ Critical修复累积SFTP packet数据
channel.sftp_input_buffer.extend_from_slice(&data);
info!("SFTP buffer accumulated: {} bytes total", channel.sftp_input_buffer.len());
// 检查buffer是否有足够数据解析packet length
if channel.sftp_input_buffer.len() < 4 {
info!("SFTP buffer too short for length field, waiting for more data");
return Ok(None); // 继续累积
}
// 解析SFTP packet length前4 bytes
let sftp_length = u32::from_be_bytes([
channel.sftp_input_buffer[0],
channel.sftp_input_buffer[1],
channel.sftp_input_buffer[2],
channel.sftp_input_buffer[3]
]) as usize;
info!("SFTP packet length field: {}", sftp_length);
let expected_total = 4 + sftp_length;
if channel.sftp_input_buffer.len() < expected_total {
info!("SFTP packet incomplete: expected {} bytes, have {} bytes in buffer, waiting for more",
expected_total, channel.sftp_input_buffer.len());
return Ok(None); // 继续累积
}
// ⭐⭐⭐⭐⭐ Buffer足够解析完整SFTP packet
let sftp_packet = &channel.sftp_input_buffer[4..expected_total];
info!("SFTP packet complete: {} bytes, processing", sftp_packet.len());
info!("SFTP packet content (first 20 bytes): {:?}", &sftp_packet[..std::cmp::min(20, sftp_packet.len())]);
let response = sftp_handler.handle_request(sftp_packet)?;
info!("SFTP response: {} bytes", response.len());
// ⭐⭐⭐⭐⭐ 处理完后清空buffer或保留剩余数据
if channel.sftp_input_buffer.len() > expected_total {
// 有剩余数据多个packets的情况
let remaining = channel.sftp_input_buffer[expected_total..].to_vec();
channel.sftp_input_buffer = remaining;
info!("SFTP buffer has remaining {} bytes after processing", channel.sftp_input_buffer.len());
} else {
// 清空buffer
channel.sftp_input_buffer.clear();
info!("SFTP buffer cleared after processing");
}
// 构建SSH_MSG_CHANNEL_DATA返回SFTP响应需要SSH string格式
return Ok(Some(self.build_channel_data(recipient_channel, &response)?));
}
}
// 如果不是SFTP或exec_process返回None
Ok(None)
}
/// Phase 14: 构建SSH_MSG_CHANNEL_EXTENDED_DATA参考OpenSSH channel.c
fn build_channel_extended_data(&self, channel: u32, data_type: u32, data: &[u8]) -> Result<SshPacket> {
let mut buffer = Vec::new();
buffer.write_u8(PacketType::SSH_MSG_CHANNEL_EXTENDED_DATA as u8)?;
buffer.write_u32::<BigEndian>(channel)?;
buffer.write_u32::<BigEndian>(data_type)?; // 1 = stderr, 2 = exit status
buffer.write_u32::<BigEndian>(data.len() as u32)?;
buffer.write_all(data)?;
Ok(SshPacket {
packet_length: 0,
padding_length: 0,
payload: buffer,
padding: Vec::new(),
})
}
/// 处理SSH_MSG_CHANNEL_CLOSE参考OpenSSH channel.c: channel_input_close())
pub fn handle_channel_close(&mut self, packet: &SshPacket) -> Result<Option<SshPacket>> {
info!("Processing SSH_MSG_CHANNEL_CLOSE");
let mut cursor = std::io::Cursor::new(packet.payload.as_slice()); // 使用as_slice()Rust标准
// Packet type
let packet_type = cursor.read_u8()?;
if packet_type != PacketType::SSH_MSG_CHANNEL_CLOSE as u8 {
return Err(anyhow!("Invalid packet type for CHANNEL_CLOSE"));
}
// 读取recipient channel
let recipient_channel = cursor.read_u32::<BigEndian>()?;
info!("Channel close: channel={}", recipient_channel);
// 移除channel参考OpenSSH channel.c
if let Some(channel) = self.channels.remove(&recipient_channel) {
info!("Channel {} removed", recipient_channel);
// 发送SSH_MSG_CHANNEL_CLOSE回应
Ok(Some(self.build_channel_close(channel.sender_channel)?))
} else {
warn!("Channel {} not found", recipient_channel);
Ok(None)
}
}
/// 构建SSH_MSG_CHANNEL_OPEN_CONFIRMATION参考OpenSSH channel.c
fn build_channel_open_confirmation(
&self,
server_channel: u32,
sender_channel: u32,
window_size: u32,
packet_size: u32,
) -> Result<SshPacket> {
let mut payload = Vec::new();
// Packet type
payload.write_u8(PacketType::SSH_MSG_CHANNEL_OPEN_CONFIRMATION as u8)?;
// Server channel number
payload.write_u32::<BigEndian>(server_channel)?;
// Sender channel number
payload.write_u32::<BigEndian>(sender_channel)?;
// Initial window size
payload.write_u32::<BigEndian>(window_size)?;
// Maximum packet size
payload.write_u32::<BigEndian>(packet_size)?;
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_OPEN_FAILURE参考OpenSSH channel.c
fn build_channel_open_failure(
&self,
sender_channel: u32,
reason_code: u32,
description: &str,
language: &str,
) -> Result<SshPacket> {
let mut payload = Vec::new();
// Packet type
payload.write_u8(PacketType::SSH_MSG_CHANNEL_OPEN_FAILURE as u8)?;
// Sender channel number
payload.write_u32::<BigEndian>(sender_channel)?;
// Reason code
payload.write_u32::<BigEndian>(reason_code)?;
// DescriptionSSH string
payload.write_u32::<BigEndian>(description.len() as u32)?;
payload.write_all(description.as_bytes())?;
// LanguageSSH string
payload.write_u32::<BigEndian>(language.len() as u32)?;
payload.write_all(language.as_bytes())?;
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_SUCCESS参考OpenSSH channel.c
fn build_channel_success(&self, channel: u32) -> Result<SshPacket> {
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_SUCCESS as u8)?;
payload.write_u32::<BigEndian>(channel)?;
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_FAILURE参考OpenSSH channel.c
fn build_channel_failure(&self, channel: u32) -> Result<SshPacket> {
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_FAILURE as u8)?;
payload.write_u32::<BigEndian>(channel)?;
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_CLOSE参考OpenSSH channel.c
pub fn build_channel_close(&self, channel: u32) -> Result<SshPacket> {
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_CLOSE as u8)?;
payload.write_u32::<BigEndian>(channel)?;
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_DATAPhase 6新增
pub fn build_channel_data(&self, channel: u32, data: &[u8]) -> Result<SshPacket> {
info!("⭐⭐⭐⭐⭐ [build_channel_data] Building SSH_MSG_CHANNEL_DATA: channel={}, data_len={}", channel, data.len());
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_DATA as u8)?;
payload.write_u32::<BigEndian>(channel)?;
payload.write_u32::<BigEndian>(data.len() as u32)?;
payload.write_all(data)?;
info!("⭐⭐⭐⭐⭐ [build_channel_data] Packet built successfully, payload_len={}", payload.len());
Ok(SshPacket::new(payload))
}
/// 构建SSH_MSG_CHANNEL_EOFPhase 6新增
pub fn build_channel_eof(&self, channel: u32) -> Result<SshPacket> {
let mut payload = Vec::new();
payload.write_u8(PacketType::SSH_MSG_CHANNEL_EOF as u8)?;
payload.write_u32::<BigEndian>(channel)?;
Ok(SshPacket::new(payload))
}
/// 获取有输出待发送的channel IDPhase 6新增
pub fn get_channel_with_output(&self) -> Option<u32> {
for (&id, channel) in &self.channels {
if channel.output_buffer.is_some() {
return Some(id);
}
}
None
}
/// ⭐⭐⭐⭐⭐ Phase 14.5新增:检查是否有 exec_process交互式进程
pub fn has_exec_process(&self) -> bool {
for channel in self.channels.values() {
if channel.exec_process.is_some() || channel.rsync_handler.is_some() {
return true;
}
}
false
}
/// 获取channel输出Phase 6新增
pub fn get_channel_output(&mut self, channel_id: u32) -> Option<Vec<u8>> {
if let Some(channel) = self.channels.get_mut(&channel_id) {
channel.output_buffer.take()
} else {
None
}
}
/// 移除channelPhase 6新增
pub fn remove_channel(&mut self, channel_id: u32) {
self.channels.remove(&channel_id);
}
/// Phase 14: OpenSSH风格poll机制使用nix::poll监听stdout/stderr fd
/// ⭐⭐⭐⭐⭐ 关键:非阻塞读取数据,不等待子进程完成
/// ⭐⭐⭐⭐⭐ Phase 14.2: 处理child exited发送EOF + CLOSE
/// 参考OpenSSH session.c: do_exec_no_pty()
pub fn handle_child_exited(&mut self) -> Result<Vec<SshPacket>> {
// 1. 收集需要处理的channel IDs (exec_process OR rsync_handler)
let channel_ids: Vec<u32> = self.channels
.iter()
.filter_map(|(id, channel)| {
if channel.exec_process.is_some() || channel.rsync_handler.is_some() {
Some(*id)
} else {
None
}
})
.collect();
// 2. 构建packets避免borrow冲突
let mut packets = Vec::new();
for channel_id in &channel_ids {
let eof_packet = self.build_channel_eof(*channel_id)?;
packets.push(eof_packet);
let close_packet = self.build_channel_close(*channel_id)?;
packets.push(close_packet);
}
// 3. 清除exec_process + rsync_handlermutable borrow
for channel_id in &channel_ids {
if let Some(channel) = self.channels.get_mut(channel_id) {
channel.exec_process = None;
channel.rsync_handler = None;
}
}
if !channel_ids.is_empty() {
info!("Child/rsync exited, sent EOF + CLOSE for {} channels", channel_ids.len());
}
Ok(packets)
}
/// ⭐⭐⭐⭐⭐ Phase 14.2: OpenSSH统一poll + child进程状态检测
/// 参考OpenSSH session.c: do_exec_no_pty() + channel.c: channel_handle_fd()
///
/// 关键改进Phase 14.2
/// - 单次poll()同时监听client socket和子进程输出
/// - timeout 10ms非阻塞
/// - **添加child进程状态检测**防止无限spinning⭐⭐⭐⭐⭐
/// - **添加max_poll_iterations限制**最多100次1秒
/// - 返回(stdout_packets, client_has_data, child_exited)
pub fn poll_exec_stdout_and_client(&mut self, stream: &std::net::TcpStream) -> Result<(Option<Vec<SshPacket>>, bool, bool)> {
use std::io::Read;
use std::os::unix::io::{BorrowedFd, AsRawFd};
use nix::poll::{poll, PollFd, PollFlags};
// 收集所有需要poll的fd
let mut poll_fds_vec = Vec::new();
let mut client_has_data = false;
let mut child_exited = false;
// 1. 添加client socket fd监听stdin数据
let client_fd = stream.as_raw_fd();
let client_poll_fd = unsafe {
BorrowedFd::borrow_raw(client_fd)
};
poll_fds_vec.push(PollFd::new(client_poll_fd, PollFlags::POLLIN));
let client_fd_idx = 0; // client fd总是第一个
// 2. 添加所有channel的stdout/stderr fd
let mut channel_fds_map: HashMap<u32, (usize, usize)> = HashMap::new(); // channel_id -> (stdout_idx, stderr_idx)
let mut channel_ids_vec = Vec::new(); // 用于后续child状态检查
for (channel_id, channel) in &self.channels {
if let Some(exec_process) = &channel.exec_process {
channel_ids_vec.push(*channel_id);
// stdout fd
if let Some(_stdout) = &exec_process.stdout {
let stdout_poll_fd = unsafe {
BorrowedFd::borrow_raw(exec_process.stdout_fd)
};
poll_fds_vec.push(PollFd::new(stdout_poll_fd, PollFlags::POLLIN));
}
// stderr fd
if let Some(_stderr) = &exec_process.stderr {
let stderr_poll_fd = unsafe {
BorrowedFd::borrow_raw(exec_process.stderr_fd)
};
poll_fds_vec.push(PollFd::new(stderr_poll_fd, PollFlags::POLLIN));
}
// 记录索引相对于client_fd_idx
let stdout_idx = poll_fds_vec.len() - 2;
let stderr_idx = poll_fds_vec.len() - 1;
channel_fds_map.insert(*channel_id, (stdout_idx, stderr_idx));
}
}
if poll_fds_vec.len() == 1 {
// 只有client fd没有exec_process
// ⭐⭐⭐⭐⭐ Phase 16.5: 检查rsync handler的pending output
// 检查rsync handler是否done先收集避免borrow冲突
let mut rsync_is_done = false;
// Drain rsync handler output (mutable borrow)
let mut rsync_items: Vec<(u32, Vec<u8>)> = Vec::new();
for channel in self.channels.values_mut() {
if let Some(rsync) = &mut channel.rsync_handler {
let out = rsync.drain_output();
if !out.is_empty() {
let sid = channel.server_channel;
info!("⭐⭐⭐⭐⭐ [RSYNC_POLL] {} bytes pending from rsync handler", out.len());
rsync_items.push((sid, out));
}
}
}
// Check rsync done (immutable borrow)
rsync_is_done = self.channels.values().any(|ch| {
ch.rsync_handler.as_ref().map_or(false, |r| r.is_done())
});
// Directly poll client
match poll(&mut poll_fds_vec, 10u16) {
Ok(n) if n > 0 => {
if let Some(revents) = poll_fds_vec[client_fd_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
client_has_data = true;
}
}
}
_ => {}
}
if rsync_is_done {
info!("⭐⭐⭐⭐⭐ [RSYNC_DONE] RsyncHandler is done, signaling child_exited");
}
// Return rsync output if any
if !rsync_items.is_empty() {
let mut packets = Vec::new();
for (channel_id, data) in rsync_items {
packets.push(self.build_channel_data(channel_id, &data)?);
}
return Ok((Some(packets), client_has_data, rsync_is_done));
}
return Ok((None, client_has_data, rsync_is_done));
}
// ⭐⭐⭐⭐⭐ Phase 16.4修复增加poll轮询限制支持大文件传输
// 最多轮询2000次200秒poll timeout从10ms改到100ms
// 修复从500改到2000支持50MB+文件传输预计可传输500MB+
let max_poll_iterations = 2000;
let mut poll_iteration = 0;
let mut found_data = false;
let mut stdin_closed = false; // ⭐⭐⭐⭐⭐ 新增跟踪stdin是否已关闭
for iteration in 0..max_poll_iterations {
poll_iteration = iteration;
// ⭐⭐⭐⭐⭐ Phase 16.2.1优化增加poll timeout减少iteration overhead
// 每50次轮询记录一次日志从10改到50减少噪音
if iteration % 50 == 0 {
info!("Polling {} fds (iteration {} of {}, stdin_closed={})", poll_fds_vec.len(), iteration, max_poll_iterations, stdin_closed);
}
// ⭐⭐⭐⭐⭐ Phase 16.2.1优化增加poll timeout减少iteration overhead
match poll(&mut poll_fds_vec, 100u16) {
Ok(n) if n > 0 => {
info!("{} fds have data available (iteration {})", n, iteration);
found_data = true;
break; // 有数据,立即处理
}
Ok(0) => {
// timeout无数据
// ⭐⭐⭐⭐⭐ Phase 16.2.1优化减少child状态检查频率每50次
if iteration % 50 == 49 {
// 检查child是否exited
for channel_id in &channel_ids_vec {
if let Some(channel) = self.channels.get_mut(channel_id) {
if let Some(exec_process) = &mut channel.exec_process {
match exec_process.child.try_wait() {
Ok(Some(status)) => {
info!("Child process exited (channel {}, status: {:?})", channel_id, status);
child_exited = true;
// ⭐⭐⭐⭐⭐ Child exited读取剩余stdout如果有
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
Ok(n) if n > 0 => {
info!("Read {} final bytes from stdout (child exited)", n);
// 构建packet并返回
let packet = self.build_channel_data(*channel_id, &buffer[..n])?;
return Ok((Some(vec![packet]), false, true));
}
_ => {}
}
}
// 没有剩余数据返回child_exited标志
return Ok((None, false, true));
}
Ok(None) => {
// Child still running正常
info!("Child still running (channel {}, iteration {}, stdin_closed={})", channel_id, iteration, stdin_closed);
// ⭐⭐⭐⭐⭐ Phase 16.4修复增加stdin超时机制支持大文件传输
// 如果stdin未关闭且超过1500次poll150s无数据
// 强制关闭stdin发送EOF给SCP/rsync
// ⭐⭐⭐⭐⭐ Phase 16.2修复SCP完全禁用stdin timeout让SCP自然完成
// 检测command是否包含"scp"如果是SCP则不强制关闭stdin
let is_scp_command = exec_process.command.contains("scp");
if !stdin_closed && !is_scp_command && iteration >= 1500 && exec_process.stdin.is_some() {
info!("⭐⭐⭐⭐⭐ Forcing stdin close after {} iterations ({} ms) - sending EOF to rsync (SCP excluded)", iteration, iteration * 100);
exec_process.stdin = None; // Drop stdin发送EOF
stdin_closed = true;
// ⭐⭐⭐⭐⭐ stdin关闭后继续等待child处理完成
// 不要立即返回给rsync时间处理数据并产生stdout
info!("stdin closed, continuing to poll for stdout output...");
}
}
Err(e) => {
warn!("Child try_wait error: {}", e);
}
}
}
}
}
}
// 继续轮询如果iteration < max_poll_iterations
}
Err(e) => {
warn!("poll error: {}", e);
return Ok((None, false, false));
}
Ok(_) => {
// 其他情况(不应该发生)
}
}
}
// ⭐⭐⭐⭐⭐ 达到max_poll_iterations检查最终child状态
if !found_data {
info!("No data after {} iterations ({} ms), checking child status", max_poll_iterations, max_poll_iterations * 10);
for channel_id in &channel_ids_vec {
if let Some(channel) = self.channels.get_mut(channel_id) {
if let Some(exec_process) = &mut channel.exec_process {
match exec_process.child.try_wait() {
Ok(Some(status)) => {
info!("Child exited after max iterations (status: {:?})", status);
child_exited = true;
// 读取剩余stdout
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
Ok(n) if n > 0 => {
let packet = self.build_channel_data(*channel_id, &buffer[..n])?;
return Ok((Some(vec![packet]), false, true));
}
_ => {}
}
}
return Ok((None, false, true));
}
Ok(None) => {
info!("Child still running after max iterations, returning None");
// Child还在运行但无stdout数据
// 主循环会继续调用此函数
return Ok((None, false, false));
}
Err(e) => {
warn!("Final child check error: {}", e);
}
}
}
}
}
}
// ⭐⭐⭐⭐⭐ 处理找到的数据如果found_data
// 3. 检查client fd状态包括EOF/HUP
if let Some(revents) = poll_fds_vec[client_fd_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("Client fd has data (stdin from client)");
client_has_data = true;
} else if revents.contains(PollFlags::POLLHUP) {
info!("Client fd hangup (EOF received from client)");
// ⭐⭐⭐⭐⭐ Phase 14.2关键修复关闭stdin pipe发送EOF给child
// 参考OpenSSH session.c: do_exec_no_pty() stdin handling
for (_, channel) in &mut self.channels {
if let Some(exec_process) = &mut channel.exec_process {
if exec_process.stdin.is_some() {
info!("Closing stdin pipe (sending EOF to child process)");
exec_process.stdin = None; // Drop stdin发送EOF给child
}
}
}
client_has_data = false;
} else if revents.contains(PollFlags::POLLERR) {
warn!("Client fd error");
return Err(anyhow::anyhow!("Client socket error"));
}
}
// 4. 检查stdout/stderr fd是否有数据
let mut packets_data: Vec<(u32, Vec<u8>)> = Vec::new();
for (channel_id, (stdout_idx, stderr_idx)) in channel_fds_map {
if let Some(channel) = self.channels.get_mut(&channel_id) {
if let Some(exec_process) = &mut channel.exec_process {
// 检查stdout
if let Some(revents) = poll_fds_vec[stdout_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("⭐⭐⭐⭐⭐ [stdout POLLIN] stdout fd has data (channel {})", channel_id);
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
info!("⭐⭐⭐⭐⭐ [BEFORE stdout.read] Attempting to read from stdout (buffer size 32KB)");
match stdout.read(&mut buffer) {
Ok(n) if n > 0 => {
info!("⭐⭐⭐⭐⭐ [AFTER stdout.read] Read {} bytes from stdout (channel {})", n, channel_id);
packets_data.push((channel_id, buffer[..n].to_vec()));
}
Ok(0) => {
info!("stdout EOF (channel {}), closing stdout pipe", channel_id);
// ⭐⭐⭐⭐⭐ Critical修复EOF时关闭pipe避免无限循环
exec_process.stdout = None;
}
Err(e) if e.kind() != std::io::ErrorKind::WouldBlock => {
warn!("stdout read error: {}", e);
exec_process.stdout = None; // 错误时也关闭
}
_ => {}
}
}
}
}
// 检查stderr
if let Some(revents) = poll_fds_vec[stderr_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stderr fd has data (channel {})", channel_id);
if let Some(stderr) = &mut exec_process.stderr {
info!("⭐⭐⭐⭐⭐ [BEFORE stderr.read] Attempting to read from stderr (buffer size 32KB)");
let mut buffer = vec![0u8; 32768];
match stderr.read(&mut buffer) {
Ok(n) if n > 0 => {
info!("⭐⭐⭐⭐⭐ [AFTER stderr.read] Read {} bytes from stderr (channel {})", n, channel_id);
info!("⭐⭐⭐⭐⭐ stderr content: {:?}", &buffer[..std::cmp::min(50, n)]);
packets_data.push((channel_id, buffer[..n].to_vec()));
}
Ok(0) => {
info!("stderr EOF (channel {}), closing stderr pipe", channel_id);
// ⭐⭐⭐⭐⭐ Critical修复EOF时关闭pipe避免无限循环
exec_process.stderr = None;
}
Err(e) if e.kind() != std::io::ErrorKind::WouldBlock => {
warn!("stderr read error: {}", e);
exec_process.stderr = None; // 错误时也关闭
}
_ => {}
}
}
}
// ⭐⭐⭐⭐⭐ 检查 POLLHUPpipe 关闭)
if revents.contains(PollFlags::POLLHUP) {
info!("stderr POLLHUP (channel {}), pipe closed", channel_id);
exec_process.stderr = None;
}
}
}
}
}
// 构建packets
if !packets_data.is_empty() {
let mut packets = Vec::new();
for (channel_id, data) in packets_data {
let packet = self.build_channel_data(channel_id, &data)?;
packets.push(packet);
}
info!("⭐⭐⭐⭐⭐ Returning {} packets (stdout/stderr data)", packets.len());
return Ok((Some(packets), client_has_data, child_exited));
}
// ⭐⭐⭐⭐⭐ Phase 14.2最终修复stdout/stderr EOF后检查child exited
// 当stdout和stderr都关闭后强制检查child状态
for channel_id in &channel_ids_vec {
if let Some(channel) = self.channels.get_mut(channel_id) {
if let Some(exec_process) = &mut channel.exec_process {
// 检查stdout和stderr是否都已关闭
if exec_process.stdout.is_none() && exec_process.stderr.is_none() {
info!("stdout/stderr both closed (channel {}), checking child status", channel_id);
// ⭐⭐⭐⭐⭐ 立即检查child是否exited
match exec_process.child.try_wait() {
Ok(Some(status)) => {
info!("⭐⭐⭐⭐⭐ Child exited after stdout/stderr EOF (status: {:?})", status);
child_exited = true;
// ⭐⭐⭐⭐⭐ 关键立即返回child_exited标志
// server.rs会发送SSH_MSG_CHANNEL_EOF + CLOSE
info!("⭐⭐⭐⭐⭐ No packets to send, returning child_exited flag");
return Ok((None, false, true));
}
Ok(None) => {
// Child still running but stdout/stderr closed
// 等待child exited
info!("Child still running after pipes closed, waiting...");
}
Err(e) => {
warn!("Child try_wait error after pipes closed: {}", e);
}
}
}
}
}
}
// 有数据但只有client数据
Ok((None, client_has_data, child_exited))
}
// ⭐⭐⭐⭐⭐ Phase 14.0: 旧版poll仅监听stdout/stderr已废弃
/// 已废弃使用poll_exec_stdout_and_client()替代
#[allow(dead_code)]
pub fn poll_exec_stdout_with_fds(&mut self) -> Result<Option<Vec<SshPacket>>> {
use std::io::Read;
use std::os::unix::io::BorrowedFd;
// 遍历所有channel收集poll_fds
let mut poll_fds_vec = Vec::new();
let mut channel_fds_map: HashMap<u32, (usize, usize)> = HashMap::new(); // channel_id -> (stdout_idx, stderr_idx) in poll_fds_vec
for (channel_id, channel) in &self.channels {
if let Some(exec_process) = &channel.exec_process {
// ⭐⭐⭐⭐⭐ OpenSSH风格创建PollFd监听stdout/stderr
// nix 0.29 API: PollFd::new()需要借用fd不是RawFd
if let Some(stdout) = &exec_process.stdout {
let stdout_poll_fd = unsafe {
// ⭐⭐⭐⭐⭐ 使用BorrowedFd::borrow_raw()正确API
BorrowedFd::borrow_raw(exec_process.stdout_fd)
};
poll_fds_vec.push(PollFd::new(stdout_poll_fd, PollFlags::POLLIN));
}
if let Some(stderr) = &exec_process.stderr {
let stderr_poll_fd = unsafe {
BorrowedFd::borrow_raw(exec_process.stderr_fd)
};
poll_fds_vec.push(PollFd::new(stderr_poll_fd, PollFlags::POLLIN));
}
// 记录poll_fds_vec中的索引
let stdout_idx = poll_fds_vec.len() - 2;
let stderr_idx = poll_fds_vec.len() - 1;
channel_fds_map.insert(*channel_id, (stdout_idx, stderr_idx));
}
}
if poll_fds_vec.is_empty() {
return Ok(None); // 没有exec_process
}
// ⭐⭐⭐⭐⭐ OpenSSH关键使用poll监听所有fd
// ⭐⭐⭐⭐⭐ 持续poll机制最多轮询1000次给大文件传输足够时间
// 大文件传输需要很长时间增加轮询次数到1000次总共10秒
let max_poll_attempts = 1000;
let mut poll_attempt = 0;
let mut found_data = false;
for attempt in 0..max_poll_attempts {
poll_attempt = attempt;
// 每100次轮询记录一次日志减少日志噪音
if attempt % 100 == 0 {
info!("Polling {} fds (OpenSSH style, timeout 10ms, attempt {} of {})", poll_fds_vec.len(), attempt, max_poll_attempts);
}
match poll(&mut poll_fds_vec, 10u16) { // timeout 10ms
Ok(n) => {
if n > 0 {
info!("{} fds have data available (attempt {})", n, attempt);
found_data = true;
break; // 有数据,立即处理
}
// 没有数据继续轮询最多1000次
}
Err(e) => {
warn!("poll error: {}", e);
return Ok(None);
}
}
}
if !found_data {
info!("No data available after {} poll attempts ({} ms), returning None", max_poll_attempts, max_poll_attempts * 10);
return Ok(None); // 轮询1000次后仍无数据主循环继续处理client packet
}
// ⭐⭐⭐⭐⭐ OpenSSH风格根据revents判断哪个fd有数据立即读取
let mut packets_data: Vec<(u32, Vec<u8>)> = Vec::new(); // (channel_id, data)
for (channel_id, (stdout_idx, stderr_idx)) in channel_fds_map {
if let Some(channel) = self.channels.get_mut(&channel_id) {
if let Some(exec_process) = &mut channel.exec_process {
// 检查stdout是否有数据
if let Some(revents) = poll_fds_vec[stdout_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stdout fd has data (channel {})", channel_id);
// ⭐⭐⭐⭐⭐ 非阻塞读取因为设置了O_NONBLOCK
if let Some(stdout) = &mut exec_process.stdout {
let mut buffer = vec![0u8; 32768];
match stdout.read(&mut buffer) {
Ok(n) => {
if n > 0 {
info!("Read {} bytes from stdout (channel {})", n, channel_id);
packets_data.push((channel_id, buffer[..n].to_vec()));
} else {
info!("stdout EOF (channel {})", channel_id);
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// 非阻塞模式,没有数据(正常)
}
Err(e) => {
warn!("stdout read error: {}", e);
}
}
}
}
}
// 检查stderr是否有数据类似处理
if let Some(revents) = poll_fds_vec[stderr_idx].revents() {
if revents.contains(PollFlags::POLLIN) {
info!("stderr fd has data (channel {})", channel_id);
if let Some(stderr) = &mut exec_process.stderr {
let mut buffer = vec![0u8; 32768];
match stderr.read(&mut buffer) {
Ok(n) => {
if n > 0 {
info!("Read {} bytes from stderr (channel {})", n, channel_id);
packets_data.push((channel_id, buffer[..n].to_vec()));
} else {
info!("stderr EOF (channel {})", channel_id);
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// 非阻塞模式,没有数据(正常)
}
Err(e) => {
warn!("stderr read error: {}", e);
}
}
}
}
}
}
}
}
// ⭐⭐⭐⭐⭐ 释放mutable borrow后构建packets避免borrow冲突
let mut packets = Vec::new();
for (channel_id, data) in packets_data {
let packet = self.build_channel_data(channel_id, &data)?;
packets.push(packet);
}
if packets.is_empty() {
Ok(None)
} else {
Ok(Some(packets))
}
}
}
/// SSH Channel结构参考OpenSSH channel.c: struct channel
struct Channel {
server_channel: u32,
sender_channel: u32,
channel_type: String,
// ⭐⭐⭐⭐⭐ Phase 15: Window Control参考OpenSSH channels.h:176-182
remote_window: u32, // 远端窗口大小OpenSSH: c->remote_window
remote_maxpacket: u32, // 远端最大 packetOpenSSH: c->remote_maxpacket
local_window: u32, // 本地窗口大小OpenSSH: c->local_window
local_window_max: u32, // 本地窗口最大值OpenSSH: c->local_window_max
local_consumed: u32, // 本地已消费的数据OpenSSH: c->local_consumed⭐⭐⭐⭐⭐ 关键!
local_maxpacket: u32, // 本地最大 packetOpenSSH: c->local_maxpacket
// 旧字段(保留兼容)
window_size: u32, // 当前窗口大小(兼容旧代码)
maximum_packet_size: u32, // 最大 packet 大小(兼容旧代码)
state: ChannelState,
output_buffer: Option<Vec<u8>>, // Phase 6: 命令输出缓冲
sftp_handler: Option<SftpHandler>, // Phase 7: SFTP处理器
scp_handler: Option<ScpHandler>, // Phase 8: SCP处理器
rsync_handler: Option<RsyncHandler>, // Phase 8: rsync处理器
exec_process: Option<ExecProcess>, // Phase 14: 交互式exec进程
// ⭐⭐⭐⭐⭐ Critical修复SFTP packet累积buffer
sftp_input_buffer: Vec<u8>, // Phase 14.2修复累积不完整的SFTP packets
// ⭐⭐⭐⭐⭐ Phase 14.4SCP packet累积buffer
scp_input_buffer: Vec<u8>, // Phase 14.4修复累积不完整的SCP packets
// Phase 13.3: 端口转发相关字段
direct_tcpip: Option<DirectTcpipChannel>, // direct-tcpip channelRemote forwarding
forwarded_tcpip: Option<ForwardedTcpipChannel>, // forwarded-tcpip channelLocal forwarding
}
/// SSH Channel状态参考OpenSSH channel.c
enum ChannelState {
Open,
Closing,
Closed,
}
/// SSH string读取辅助函数
fn read_ssh_string<R: std::io::Read>(reader: &mut R) -> Result<String> {
let length = reader.read_u32::<BigEndian>()?;
let mut buffer = vec![0u8; length as usize];
reader.read_exact(&mut buffer)?;
Ok(String::from_utf8(buffer)?)
}
/// ⭐⭐⭐⭐⭐ Phase 15: 检查并发送 Window Adjust参考OpenSSH channels.c:2425-2450
///
/// OpenSSH 实现:
/// ```c
/// static int channel_check_window(struct ssh *ssh, Channel *c) {
/// if (c->type == SSH_CHANNEL_OPEN &&
/// !(c->flags & (CHAN_CLOSE_SENT|CHAN_CLOSE_RCVD)) &&
/// ((c->local_window_max - c->local_window > c->local_maxpacket*3) ||
/// c->local_window < c->local_window_max/2) &&
/// c->local_consumed > 0) {
///
/// // 发送 SSH2_MSG_CHANNEL_WINDOW_ADJUST
/// sshpkt_start(ssh, SSH2_MSG_CHANNEL_WINDOW_ADJUST);
/// sshpkt_put_u32(ssh, c->remote_id);
/// sshpkt_put_u32(ssh, c->local_consumed);
/// sshpkt_send(ssh);
///
/// c->local_window += c->local_consumed;
/// c->local_consumed = 0;
/// }
/// }
/// ```
pub fn channel_check_window(channel_id: u32, channels: &mut HashMap<u32, Channel>) -> Option<SshPacket> {
if let Some(channel) = channels.get_mut(&channel_id) {
// 检查窗口调整条件
let window_used = channel.local_window_max - channel.local_window;
let need_adjust = (window_used > channel.local_maxpacket * 3) ||
(channel.local_window < channel.local_window_max / 2);
if need_adjust && channel.local_consumed > 0 {
info!("⭐⭐⭐⭐⭐ [WINDOW_ADJUST] channel {} needs adjust: window_used={}, local_consumed={}",
channel_id, window_used, channel.local_consumed);
// 发送 SSH_MSG_CHANNEL_WINDOW_ADJUST
let adjust_packet = build_window_adjust(
channel.server_channel,
channel.local_consumed
);
// 更新窗口大小
channel.local_window += channel.local_consumed;
channel.local_consumed = 0;
info!("⭐⭐⭐⭐⭐ [WINDOW_UPDATED] channel {} new window: {}",
channel_id, channel.local_window);
return Some(adjust_packet);
}
}
None
}
/// ⭐⭐⭐⭐⭐ Phase 15: 构建 SSH_MSG_CHANNEL_WINDOW_ADJUST packet
///
/// OpenSSH packet format
/// ```c
/// SSH2_MSG_CHANNEL_WINDOW_ADJUST (93)
/// recipient_channel (u32)
/// bytes_to_add (u32)
/// ```
fn build_window_adjust(recipient_channel: u32, bytes_to_add: u32) -> SshPacket {
let mut payload = Vec::new();
// Packet type
payload.push(PacketType::SSH_MSG_CHANNEL_WINDOW_ADJUST as u8);
// recipient_channel (u32)
payload.write_u32::<BigEndian>(recipient_channel).unwrap();
// bytes_to_add (u32)
payload.write_u32::<BigEndian>(bytes_to_add).unwrap();
info!("⭐⭐⭐⭐⭐ [BUILD_WINDOW_ADJUST] recipient_channel={}, bytes_to_add={}",
recipient_channel, bytes_to_add);
SshPacket {
packet_length: 0,
padding_length: 0,
payload,
padding: Vec::new(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_channel_manager_creation() {
let manager = ChannelManager::new();
assert_eq!(manager.next_channel_id, 0);
}
#[test]
fn test_channel_open_confirmation() {
let manager = ChannelManager::new();
let packet = manager.build_channel_open_confirmation(0, 100, 2097152, 32768).unwrap();
assert_eq!(packet.payload[0], PacketType::SSH_MSG_CHANNEL_OPEN_CONFIRMATION as u8);
}
#[test]
fn test_channel_success() {
let manager = ChannelManager::new();
let packet = manager.build_channel_success(0).unwrap();
assert_eq!(packet.payload[0], PacketType::SSH_MSG_CHANNEL_SUCCESS as u8);
}
}