Files
markbase/markbase-core/src/ssh_server/server.rs
Warren 93e33b04a7 Implement SSH Compression Phase 2: Integration
- Add compression_ctos/compression_stoc to EncryptionContext
- Default impl: CompressionContext::new(6)
- from_session_keys(): initialize compression fields
- enable_compression() method (based on KEX negotiation)
- server.rs: enable compression after NEWKEYS (if negotiated)

All 179 tests pass.
2026-06-21 01:51:39 +08:00

802 lines
34 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Complete SSH server implementation (Phase 1-7 integrated version + Phase 13 port forwarding).
// Reference: OpenSSH sshd.c — complete SSH/SFTP flow + port forwarding.
use crate::provider::pg::PgProvider;
use crate::provider::sqlite::SqliteProvider;
use crate::provider::DataProvider;
use crate::ssh_server::auth::{AuthHandler, AuthResult};
use crate::ssh_server::channel::ChannelManager;
use crate::ssh_server::cipher::{EncryptedPacket, EncryptionContext};
use crate::ssh_server::kex::{KexProposal, KexResult};
use crate::ssh_server::kex_complete::KexState;
use crate::ssh_server::packet::{PacketType, SshPacket};
use crate::ssh_server::port_forward::PortForwardManager;
use crate::ssh_server::ssh_security_config::SshSecurityConfig;
use crate::ssh_server::upload_hook::UploadHook;
use crate::ssh_server::version::VersionExchange;
use anyhow::{anyhow, Result};
use log::{error, info, warn};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
/// Startup configuration for the SSH server.
pub struct SshServerConfig {
/// TCP port the listener binds to.
pub port: u16,
/// Address the listener binds to; combined with `port` as `"{bind_address}:{port}"`.
pub bind_address: String,
/// Security settings; cloned into the shared, mutex-protected copy held by `SshServer`.
pub security_config: SshSecurityConfig,
/// Optional PostgreSQL connection string. When `Some`, authentication uses the
/// PostgreSQL provider; when `None`, the SQLite provider is used instead.
pub pg_conn: Option<String>,
/// Upload-hook settings used to build the per-connection `UploadHook` when enabled.
pub upload_hook_config: crate::config::UploadHookSection,
}
impl Default for SshServerConfig {
    /// Builds the stock configuration: port 2024 on every interface, the
    /// enterprise security defaults, no PostgreSQL connection (SQLite auth) and
    /// default upload-hook settings.
    fn default() -> Self {
        // Phase 8.3: bind to all interfaces so the server is reachable from
        // Docker containers.
        let bind_address = String::from("0.0.0.0");
        let security_config = SshSecurityConfig::enterprise_default();
        let upload_hook_config = crate::config::UploadHookSection::default();

        Self {
            port: 2024,
            bind_address,
            security_config,
            pg_conn: None,
            upload_hook_config,
        }
    }
}
impl SshServerConfig {
    /// Loads the security settings from the file at `path` (Phase 13.1) and
    /// wraps them in a server configuration.
    ///
    /// Only the security section comes from the file; port, bind address,
    /// PostgreSQL connection and upload-hook settings use the same values as
    /// the stock configuration.
    ///
    /// # Errors
    /// Propagates any error reported by `SshSecurityConfig::load_from_file`.
    pub fn load_from_file(path: &str) -> Result<Self> {
        let security_config = SshSecurityConfig::load_from_file(path)?;
        // Phase 8.3: bind to all interfaces so the server is reachable from
        // Docker containers.
        let bind_address = String::from("0.0.0.0");

        Ok(Self {
            port: 2024,
            bind_address,
            security_config,
            pg_conn: None,
            upload_hook_config: crate::config::UploadHookSection::default(),
        })
    }
}
/// Main SSH server structure (Phase 1-13 complete version).
///
/// Holds the startup configuration plus a shared, mutex-protected copy of the
/// security configuration that is handed to every connection thread.
pub struct SshServer {
config: SshServerConfig,
security_config: Arc<Mutex<SshSecurityConfig>>, // Phase 13.1: shared security config
}
impl SshServer {
    /// Creates a server from `config`.
    ///
    /// The security section is cloned into an `Arc<Mutex<_>>` (Phase 13.1) so
    /// that all connection threads share one copy.
    pub fn new(config: SshServerConfig) -> Self {
        let security_config = Arc::new(Mutex::new(config.security_config.clone()));
        Self {
            config,
            security_config,
        }
    }

    /// Binds the listener and serves connections, one thread per client.
    ///
    /// Failures that concern a single connection (a failed `accept`, or a peer
    /// that disappeared before its address could be read) are logged and
    /// skipped; they never stop the accept loop.
    ///
    /// # Errors
    /// Returns an error only if binding the listening socket fails.
    pub fn run(&self) -> Result<()> {
        let bind_addr = format!("{}:{}", self.config.bind_address, self.config.port);
        let listener = TcpListener::bind(&bind_addr)?;
        info!("MarkBaseSSH server listening on {}", bind_addr);
        info!("Implementation: Complete SSH/SFTP + Port Forwarding (Phase 1-13)");
        info!(
            "Security config: GatewayPorts={}, PermitOpen={:?}, MaxSessions={}",
            self.config.security_config.gateway_ports,
            self.config.security_config.permit_open,
            self.config.security_config.max_sessions
        );

        for stream in listener.incoming() {
            let stream = match stream {
                Ok(stream) => stream,
                Err(e) => {
                    warn!("Failed to accept connection: {}", e);
                    continue;
                }
            };

            // A peer that reset the connection right after connecting makes
            // `peer_addr` fail. That must only cost this one connection, not
            // the whole server, so the error is handled here instead of being
            // propagated out of `run`.
            let client_addr = match stream.peer_addr() {
                Ok(addr) => addr,
                Err(e) => {
                    warn!("Failed to read peer address, dropping connection: {}", e);
                    continue;
                }
            };
            info!("New SSH connection from {}", client_addr);

            // Each connection thread owns its own handles / copies.
            let security_config = Arc::clone(&self.security_config);
            let pg_conn = self.config.pg_conn.clone();
            let upload_hook_config = self.config.upload_hook_config.clone();
            thread::spawn(move || {
                if let Err(e) =
                    handle_connection_complete(stream, security_config, pg_conn, upload_hook_config)
                {
                    error!("SSH connection error: {}", e);
                }
            });
        }
        Ok(())
    }
}
/// Handles one complete SSH connection (Phase 1-13 flow): version exchange,
/// algorithm negotiation, key exchange, user authentication, and finally the
/// channel / port-forwarding service loop.
///
/// The active-session counter in `security_config` is incremented on entry and
/// released on every exit path, including early error returns, so failed
/// handshakes or failed logins cannot permanently consume session slots.
///
/// # Errors
/// Returns an error if the session counter refuses a new session, if any
/// protocol phase fails, or if the auth provider cannot be initialised.
fn handle_connection_complete(
    stream: TcpStream,
    security_config: Arc<Mutex<SshSecurityConfig>>,
    pg_conn: Option<String>,
    upload_hook_config: crate::config::UploadHookSection,
) -> Result<()> {
    /// Gives the session slot back when dropped (Phase 13.1).
    struct SessionSlot(Arc<Mutex<SshSecurityConfig>>);

    impl Drop for SessionSlot {
        fn drop(&mut self) {
            // Never panic inside drop: recover the guard from a poisoned lock
            // so the counter is still released.
            let mut security = self
                .0
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            security.decrement_sessions();
        }
    }

    info!("Handling client connection (Phase 1-13 complete flow with port forwarding)");

    // Phase 13.1: claim a session slot. The guard is created only after the
    // increment succeeded, so a rejected session is never decremented.
    {
        let mut security = security_config.lock().unwrap();
        security.increment_sessions()?;
    }
    let _session_slot = SessionSlot(Arc::clone(&security_config));

    let mut stream = stream;

    // Phase 1: version exchange
    let client_version = VersionExchange::exchange(&mut stream)?;
    info!(
        "Version exchange: client={}, server=SSH-2.0-MarkBaseSSH_1.0",
        client_version
    );

    // Phase 2: algorithm negotiation
    let (kex_result, server_kexinit, client_kexinit) =
        perform_kex_negotiation_complete(&mut stream)?;
    info!(
        "KEX negotiation: KEX={}, Cipher={}",
        kex_result.kex_algorithm, kex_result.encryption_ctos
    );

    // Phase 3: full key exchange
    let mut encryption_ctx = perform_complete_kex_exchange(
        &mut stream,
        client_version,
        kex_result,
        server_kexinit,
        client_kexinit,
    )?;
    info!("Key exchange completed, encryption channel ready");

    // Phase 5: SSH authentication (SFTPGo compatible) — PostgreSQL or SQLite
    let provider: Box<dyn DataProvider> = if let Some(ref conn_str) = pg_conn {
        // The connection string is deliberately not logged: it usually embeds
        // the database credentials.
        info!("Using PostgreSQL auth provider (SFTPGo-compatible)");
        Box::new(
            PgProvider::new(conn_str).map_err(|e| anyhow!("Failed to init PgProvider: {}", e))?,
        )
    } else {
        info!("Using SQLite auth provider");
        Box::new(
            SqliteProvider::new("data/auth.sqlite")
                .map_err(|e| anyhow!("Failed to init SqliteProvider: {}", e))?,
        )
    };
    let mut auth_handler = AuthHandler::new(provider);
    let auth_user = perform_ssh_auth(
        &mut stream,
        &mut auth_handler,
        &mut encryption_ctx,
        security_config.clone(),
    )?;
    info!("SSH authentication succeeded: user={}", auth_user.username);

    let upload_hook = if upload_hook_config.enabled {
        Some(Arc::new(UploadHook::new(
            upload_hook_config.enabled,
            PathBuf::from(&upload_hook_config.video_probe_path),
            PathBuf::from(&upload_hook_config.video_register_cli),
            PathBuf::from(&upload_hook_config.video_register_dir),
            upload_hook_config.video_extensions.clone(),
        )))
    } else {
        None
    };
    let mut channel_manager = ChannelManager::new(
        auth_user.home_dir.clone(),
        upload_hook,
        auth_user.username.clone(),
    );

    // Phase 13: port-forward manager
    let mut port_forward_manager = PortForwardManager::new();

    // Phase 6-13: service loop (channel requests + port forwarding)
    handle_ssh_service_loop(
        &mut stream,
        &mut channel_manager,
        &mut encryption_ctx,
        &mut port_forward_manager,
        security_config.clone(),
    )?;
    info!("SSH session completed successfully");

    // Phase 13.1: `_session_slot` is dropped here and releases the session.
    Ok(())
}
/// Performs the full algorithm negotiation and hands back both KEXINIT packets.
///
/// The server's default proposal is written to `stream` first, then the
/// client's KEXINIT is read and parsed, and finally the two proposals are
/// matched.
///
/// Returns `(negotiated algorithms, server KEXINIT, client KEXINIT)`.
///
/// # Errors
/// Fails if building/sending the server packet, reading/parsing the client
/// packet, or choosing the algorithms fails.
fn perform_kex_negotiation_complete(
    stream: &mut TcpStream,
) -> Result<(KexResult, SshPacket, SshPacket)> {
    info!("Starting complete KEX negotiation");

    // Step 1: send our KEXINIT.
    let ours = KexProposal::server_default();
    let our_kexinit = ours.to_kexinit_packet()?;
    our_kexinit.write(stream)?;
    let sent_len = our_kexinit.payload.len();
    info!("Sent server KEXINIT (payload size: {} bytes)", sent_len);

    // Step 2: receive the peer's KEXINIT.
    let their_kexinit = SshPacket::read(stream)?;
    let theirs = KexProposal::from_kexinit_packet(&their_kexinit)?;
    let received_len = their_kexinit.payload.len();
    info!(
        "Received client KEXINIT (payload size: {} bytes)",
        received_len
    );

    // Step 3: match the two proposals.
    let negotiated = KexResult::choose_algorithms(&ours, &theirs)?;
    Ok((negotiated, our_kexinit, their_kexinit))
}
/// 完整密钥交换流程Phase 3核心
fn perform_complete_kex_exchange(
stream: &mut TcpStream,
client_version: String,
kex_result: KexResult,
server_kexinit: SshPacket,
client_kexinit: SshPacket,
) -> Result<EncryptionContext> {
info!("Starting complete key exchange flow");
let mut kex_state = KexState::new(
client_version,
"SSH-2.0-MarkBaseSSH_1.0".to_string(),
kex_result.clone(), // Phase 1: clone kex_result for cipher mode setting
)?;
kex_state.save_kexinit_payloads(&client_kexinit, &server_kexinit);
let kexdh_init = SshPacket::read(stream)?;
info!("Received SSH_MSG_KEX_ECDH_INIT");
let kexdh_reply = kex_state.exchange_handler.handle_kexdh_init(
&kexdh_init,
&kex_state.client_version,
&kex_state.server_version,
&kex_state.client_kexinit_payload,
&kex_state.server_kexinit_payload,
)?;
kexdh_reply.write(stream)?;
info!("Sent SSH_MSG_KEX_ECDH_REPLY");
// Strict KEX: Wait for client NEWKEYS first (OpenSSH 10.2 requirement)
let client_newkeys = SshPacket::read(stream)?;
kex_state.handle_newkeys(&client_newkeys)?;
info!("Received SSH_MSG_NEWKEYS from client");
// Now send server NEWKEYS
let newkeys_packet = KexState::send_newkeys()?;
newkeys_packet.write(stream)?;
kex_state.newkeys_sent = true;
info!("Sent SSH_MSG_NEWKEYS from server");
if kex_state.is_encryption_ready() {
info!("Encryption channel established successfully");
} else {
return Err(anyhow::anyhow!("Encryption channel not ready"));
}
let session_keys = kex_state.exchange_handler.compute_session_keys()?;
let mut encryption_ctx = EncryptionContext::from_session_keys(&session_keys);
// Phase 1+5: 根据 KEX 协商结果设置加密模式ChaCha20-Poly1305 / AES-GCM / AES-CTR
let encryption_algorithm = &kex_result.encryption_stoc;
info!("KEX negotiated encryption algorithm: {}", encryption_algorithm);
use crate::ssh_server::cipher::CipherMode;
if encryption_algorithm.contains("chacha20") {
info!("Setting cipher mode to ChaCha20-Poly1305 (AEAD)");
encryption_ctx.set_cipher_mode(CipherMode::ChaChaPoly)?;
} else if encryption_algorithm.contains("gcm") {
info!("Setting cipher mode to AES-GCM (AEAD)");
encryption_ctx.set_cipher_mode(CipherMode::AesGcm)?;
} else {
info!("Setting cipher mode to AES-CTR (MtE)");
encryption_ctx.set_cipher_mode(CipherMode::AesCtr)?;
}
// Phase 2: 根据 KEX 协商结果启用压缩compression_ctos / compression_stoc
let compression_ctos = &kex_result.compression_ctos;
let compression_stoc = &kex_result.compression_stoc;
info!("KEX negotiated compression algorithms: ctos={}, stoc={}", compression_ctos, compression_stoc);
if compression_ctos != "none" || compression_stoc != "none" {
info!("Enabling SSH compression");
encryption_ctx.enable_compression(compression_ctos, compression_stoc);
}
Ok(encryption_ctx)
}
/// Identity of a successfully authenticated user, as returned by the SSH
/// authentication phase (Phase 5).
pub struct AuthUser {
/// User name extracted from the accepted SSH_MSG_USERAUTH_REQUEST.
pub username: String,
/// Home directory used as the root for the user's channel manager.
pub home_dir: PathBuf,
}
fn perform_ssh_auth(
stream: &mut TcpStream,
auth_handler: &mut AuthHandler,
encryption_ctx: &mut EncryptionContext,
security_config: Arc<Mutex<SshSecurityConfig>>,
) -> Result<AuthUser> {
info!("Starting SSH authentication");
info!(
"Encryption context: key_ctos_len={}, key_stoc_len={}, iv_ctos_len={}, iv_stoc_len={}",
encryption_ctx.encryption_key_ctos.len(),
encryption_ctx.encryption_key_stoc.len(),
encryption_ctx.iv_ctos.len(),
encryption_ctx.iv_stoc.len()
);
// OpenSSH strict KEX: SSH_MSG_EXT_INFO may be sent before SSH_MSG_SERVICE_REQUEST
let mut encrypted_request = EncryptedPacket::read(stream, encryption_ctx, true)?;
let payload = encrypted_request.payload();
if payload[0] == PacketType::SSH_MSG_EXT_INFO as u8 {
info!("Received SSH_MSG_EXT_INFO, reading next packet");
encrypted_request = EncryptedPacket::read(stream, encryption_ctx, true)?;
}
let payload = encrypted_request.payload();
info!("Received packet type: {}", payload[0]);
if payload[0] != PacketType::SSH_MSG_SERVICE_REQUEST as u8 {
return Err(anyhow!(
"Expected SSH_MSG_SERVICE_REQUEST, got type {}",
payload[0]
));
}
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
let mut cursor = std::io::Cursor::new(&payload[1..]);
let service_name_len = cursor.read_u32::<BigEndian>()?;
let mut service_name = vec![0u8; service_name_len as usize];
cursor.read_exact(&mut service_name)?;
let service_name_str = String::from_utf8_lossy(&service_name);
if service_name_str != "ssh-userauth" {
return Err(anyhow!("Unsupported service: {}", service_name_str));
}
let mut service_accept_payload = Vec::new();
service_accept_payload.write_u8(PacketType::SSH_MSG_SERVICE_ACCEPT as u8)?;
service_accept_payload.write_u32::<BigEndian>(12)?; // "ssh-userauth" length is 12, not 14!
service_accept_payload.write_all("ssh-userauth".as_bytes())?;
let encrypted_accept = EncryptedPacket::new(&service_accept_payload, encryption_ctx, true)?;
encrypted_accept.write(stream)?;
info!("Sent encrypted SSH_MSG_SERVICE_ACCEPT");
let session_id = encryption_ctx.session_id.clone();
loop {
let auth_packet = EncryptedPacket::read(stream, encryption_ctx, true)?; // Reading from client, use cipher_ctos
let auth_payload = auth_packet.payload();
info!("Received encrypted SSH_MSG_USERAUTH_REQUEST");
let auth_request = SshPacket::new(auth_payload.to_vec());
match auth_handler.handle_userauth_request(&auth_request, &session_id)? {
AuthResult::Success => {
// Send banner if configured (SSH_MSG_USERAUTH_BANNER)
let security = security_config.lock().unwrap();
let banner = if let Some(file) = &security.banner_file {
std::fs::read_to_string(file).ok()
} else {
security.banner.clone()
};
drop(security);
if let Some(banner_text) = banner {
let mut banner_payload = Vec::new();
banner_payload.write_u8(PacketType::SSH_MSG_USERAUTH_BANNER as u8)?;
banner_payload.write_u32::<BigEndian>(banner_text.len() as u32)?;
banner_payload.write_all(banner_text.as_bytes())?;
// Language tag (empty SSH string)
banner_payload.write_u32::<BigEndian>(0)?;
let encrypted_banner =
EncryptedPacket::new(&banner_payload, encryption_ctx, true)?;
encrypted_banner.write(stream)?;
info!("Sent SSH_MSG_USERAUTH_BANNER");
}
let success_payload = vec![PacketType::SSH_MSG_USERAUTH_SUCCESS as u8];
let encrypted_success =
EncryptedPacket::new(&success_payload, encryption_ctx, true)?;
encrypted_success.write(stream)?;
info!("Sent encrypted SSH_MSG_USERAUTH_SUCCESS");
// Extract username from auth request
let user = extract_username_from_auth_request(&auth_request)
.unwrap_or_else(|_| "unknown".to_string());
let home_dir = auth_handler
.get_home_dir(&user)
.ok()
.flatten()
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/Users/accusys/markbase"));
info!("Auth success: user={}, home_dir={:?}", user, home_dir);
return Ok(AuthUser {
username: user,
home_dir,
});
}
AuthResult::Failure(message) => {
// message包含可用的认证方法列表如"password,publickey"
let mut failure_payload = Vec::new();
failure_payload.write_u8(PacketType::SSH_MSG_USERAUTH_FAILURE as u8)?;
failure_payload.write_u32::<BigEndian>(message.len() as u32)?;
failure_payload.write_all(message.as_bytes())?;
failure_payload.write_u8(0)?; // partial_success = false
let encrypted_failure =
EncryptedPacket::new(&failure_payload, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
warn!("Sent encrypted SSH_MSG_USERAUTH_FAILURE: {}", message);
}
AuthResult::PartialSuccess => {
warn!("Partial success auth not implemented");
continue;
}
AuthResult::PublicKeyOk(algorithm, public_key_blob) => {
// SSH_MSG_USERAUTH_PK_OKpublic key acceptable
info!("Public key acceptable, sending USERAUTH_PK_OK");
let mut pk_ok_payload = Vec::new();
pk_ok_payload.write_u8(PacketType::SSH_MSG_USERAUTH_PK_OK as u8)?;
// algorithm (SSH string)
pk_ok_payload.write_u32::<BigEndian>(algorithm.len() as u32)?;
pk_ok_payload.write_all(algorithm.as_bytes())?;
// public key blob (SSH string)
pk_ok_payload.write_u32::<BigEndian>(public_key_blob.len() as u32)?;
pk_ok_payload.write_all(&public_key_blob)?;
let encrypted_pk_ok = EncryptedPacket::new(&pk_ok_payload, encryption_ctx, true)?;
encrypted_pk_ok.write(stream)?;
info!("Sent SSH_MSG_USERAUTH_PK_OK");
continue; // Wait for signed request
}
}
}
}
/// SSH service loop (Phase 14.2: OpenSSH-style unified poll + child status detection).
/// Key idea: one poll call per iteration reports exec stdout/stderr output, whether the
/// client has data pending, and whether the child process has exited.
/// Reference: OpenSSH session.c: do_exec_no_pty() + channel.c: channel_handle_fd()
///
/// Each iteration: (1) polls, (2) sends a keepalive if the connection has been idle,
/// (3) forwards pending stdout/stderr packets, (4) sends the exit packets if the child
/// exited, and (5) if the client has data, reads one packet and dispatches on its
/// message type.
///
/// Returns `Ok(())` once the loop ends on SSH_MSG_CHANNEL_CLOSE or SSH_MSG_DISCONNECT.
///
/// # Errors
/// Returns an error when the keepalive failure limit is reached ("Connection timed out")
/// or when any poll, encryption, I/O or channel-manager call fails.
fn handle_ssh_service_loop(
stream: &mut TcpStream,
channel_manager: &mut ChannelManager,
encryption_ctx: &mut EncryptionContext,
port_forward_manager: &mut PortForwardManager, // Phase 13
security_config: Arc<Mutex<SshSecurityConfig>>, // Phase 13.1
) -> Result<()> {
info!("Starting SSH service loop (Phase 14.2: unified poll + child status)");
// Keep-alive tracking. Both settings are read once, before the loop starts.
// NOTE(review): a max count of 0 would trip the timeout check on the first
// iteration — confirm the configuration never allows that.
let keep_alive_interval = security_config.lock().unwrap().keep_alive_interval;
let keep_alive_max_count = security_config.lock().unwrap().keep_alive_max_count;
let mut last_activity = std::time::Instant::now();
let mut keep_alive_failures = 0;
loop {
// Phase 14.2: unified poll + child status detection
let (stdout_packets, client_has_data, child_exited) =
channel_manager.poll_exec_stdout_and_client(stream)?;
// Update activity timestamp on any data transfer
if stdout_packets.is_some() || client_has_data {
last_activity = std::time::Instant::now();
keep_alive_failures = 0;
}
// Keep-alive check: send if idle for too long
let idle_duration = last_activity.elapsed().as_secs();
if idle_duration >= keep_alive_interval && keep_alive_failures < keep_alive_max_count {
info!("Sending keepalive (idle {}s)", idle_duration);
// A keepalive is only sent when a session channel exists.
if let Some(channel_id) = channel_manager.get_first_session_channel() {
let keepalive_packet = channel_manager.build_keepalive_request(channel_id)?;
let encrypted_keepalive = EncryptedPacket::new(&keepalive_packet.payload, encryption_ctx, true)?;
encrypted_keepalive.write(stream)?;
// Every keepalive sent counts as a failure until traffic resets the counter above.
keep_alive_failures += 1;
last_activity = std::time::Instant::now();
}
}
// Disconnect if too many keepalive failures
if keep_alive_failures >= keep_alive_max_count {
warn!("Connection timed out (keepalive failures: {})", keep_alive_failures);
return Err(anyhow!("Connection timed out"));
}
// 1. Send stdout/stderr data, if any
if let Some(packets) = stdout_packets {
// Phase 4: batch-encrypt all packets, then write them in order
let payloads: Vec<&[u8]> = packets.iter().map(|p| p.payload.as_slice()).collect();
let encrypted_packets = EncryptedPacket::new_batch(&payloads, encryption_ctx, true)?;
for encrypted_packet in &encrypted_packets {
encrypted_packet.write(stream)?;
}
info!("Sent stdout/stderr data ({} packets)", packets.len());
}
// 2. Child exited: send EOF + CLOSE
if child_exited {
info!("Child process exited, sending SSH_MSG_CHANNEL_EOF + CLOSE");
// Phase 14.2: the packets to send come from ChannelManager.handle_child_exited()
let exit_packets = channel_manager.handle_child_exited()?;
// Phase 4: batch-encrypt the exit packets
let exit_payloads: Vec<&[u8]> = exit_packets.iter().map(|p| p.payload.as_slice()).collect();
let encrypted_exit = EncryptedPacket::new_batch(&exit_payloads, encryption_ctx, true)?;
for packet in encrypted_exit {
packet.write(stream)?;
}
// Keep going: the client may still have further requests pending
}
// 3. Handle client data, if any
if !client_has_data {
// Nothing from the client; go to the next loop iteration
continue;
}
// The client has data: read one packet and dispatch on its first byte
let mut encrypted_packet = EncryptedPacket::read(stream, encryption_ctx, true)?;
let packet = SshPacket::new(encrypted_packet.take_payload());
match packet.payload.first() {
// Phase 13: SSH_MSG_GLOBAL_REQUEST handling (port forwarding)
Some(&pt) if pt == PacketType::SSH_MSG_GLOBAL_REQUEST as u8 => {
info!("Received SSH_MSG_GLOBAL_REQUEST (port forwarding)");
// Phase 13.1: validate against the security configuration
let security = security_config.lock().unwrap();
if !security.allow_tcp_forwarding {
warn!("TCP forwarding disabled by security config");
let failure_packet = vec![PacketType::SSH_MSG_REQUEST_FAILURE as u8];
let encrypted_failure =
EncryptedPacket::new(&failure_packet, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
info!("Sent SSH_MSG_REQUEST_FAILURE (TCP forwarding disabled)");
continue;
}
// Phase 13.2: delegate to PortForwardManager, passing the security config
let (success, response) =
port_forward_manager.handle_global_request(&packet.payload, &security)?;
drop(security); // release the lock
if success {
if let Some(response_data) = response {
let encrypted_response =
EncryptedPacket::new(&response_data, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_REQUEST_SUCCESS (tcpip-forward accepted)");
} else {
// No response data: send a bare SUCCESS
let success_packet = vec![PacketType::SSH_MSG_REQUEST_SUCCESS as u8];
let encrypted_success =
EncryptedPacket::new(&success_packet, encryption_ctx, true)?;
encrypted_success.write(stream)?;
info!("Sent SSH_MSG_REQUEST_SUCCESS");
}
} else {
let failure_packet = vec![PacketType::SSH_MSG_REQUEST_FAILURE as u8];
let encrypted_failure =
EncryptedPacket::new(&failure_packet, encryption_ctx, true)?;
encrypted_failure.write(stream)?;
info!("Sent SSH_MSG_REQUEST_FAILURE (tcpip-forward rejected)");
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_OPEN as u8 => {
info!("Received SSH_MSG_CHANNEL_OPEN");
// Phase 13.3: fetch security_config and pass it to handle_channel_open
let security = security_config.lock().unwrap();
let response = channel_manager.handle_channel_open(&packet, Some(&security))?;
drop(security); // release the lock
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_OPEN_CONFIRMATION");
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_REQUEST as u8 => {
info!("Received SSH_MSG_CHANNEL_REQUEST");
if let Some(response) = channel_manager.handle_channel_request(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
// Phase 14.5 fix: distinguish plain commands from interactive processes.
// Check whether an exec_process exists (interactive process such as rsync).
let has_exec_process = channel_manager.has_exec_process();
if has_exec_process {
info!("⭐⭐⭐⭐⭐ [INTERACTIVE_PROCESS] Detected exec_process (rsync/SCP), skipping immediate EOF");
// Interactive process: only the response above is sent; the poll loop
// handles the data stream. EOF + CLOSE are not sent here.
} else {
// Phase 6: plain command execution; check for buffered command output to send
if let Some(channel_id) = channel_manager.get_channel_with_output() {
if let Some(output) = channel_manager.get_channel_output(channel_id) {
// Send the command output (SSH_MSG_CHANNEL_DATA)
let data_packet =
channel_manager.build_channel_data(channel_id, &output)?;
let encrypted_data = EncryptedPacket::new(
&data_packet.payload,
encryption_ctx,
true,
)?;
encrypted_data.write(stream)?;
info!("Sent command output ({} bytes)", output.len());
// Send SSH_MSG_CHANNEL_EOF
let eof_packet = channel_manager.build_channel_eof(channel_id)?;
let encrypted_eof = EncryptedPacket::new(
&eof_packet.payload,
encryption_ctx,
true,
)?;
encrypted_eof.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_EOF");
// Send SSH_MSG_CHANNEL_CLOSE
let close_packet =
channel_manager.build_channel_close(channel_id)?;
let encrypted_close = EncryptedPacket::new(
&close_packet.payload,
encryption_ctx,
true,
)?;
encrypted_close.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_CLOSE");
// Remove the channel
channel_manager.remove_channel(channel_id);
}
}
}
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_DATA as u8 => {
if let Some(response) = channel_manager.handle_channel_data(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
info!("Sent SSH_MSG_CHANNEL_DATA (SFTP response)");
}
// Flush every packet the channel manager queued, in FIFO order
while let Some(pending) = channel_manager.pending_packets.pop_front() {
let encrypted_pending =
EncryptedPacket::new(&pending.payload, encryption_ctx, true)?;
encrypted_pending.write(stream)?;
}
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_CLOSE as u8 => {
if let Some(response) = channel_manager.handle_channel_close(&packet)? {
let encrypted_response =
EncryptedPacket::new(&response.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
}
// A channel close ends the whole service loop, whichever channel it names
break;
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_EOF as u8 => {
info!("Received SSH_MSG_CHANNEL_EOF");
// Phase 17: EOF means client won't send more data → close child stdin
// (Essential for SCP upload where scp -t waits for EOF on stdin)
channel_manager.close_child_stdin();
// Phase 17: Send SSH_MSG_CHANNEL_CLOSE in response to EOF
// ONLY for subsystem channels (no exec_process) — RFC 4254 §5.3
// For exec channels, wait for child exit → exit-status + EOF + CLOSE
// Payload layout used here: byte 0 = message type, bytes 1..5 = channel id (big-endian)
let has_exec = packet.payload.len() >= 5 && {
let channel_id =
u32::from_be_bytes([packet.payload[1], packet.payload[2], packet.payload[3], packet.payload[4]]);
channel_manager.channel_has_exec_process(channel_id)
};
if !has_exec && packet.payload.len() >= 5 {
let channel_id =
u32::from_be_bytes([packet.payload[1], packet.payload[2], packet.payload[3], packet.payload[4]]);
// P0: Send exit-status 0 for subsystem channels
let exit_status_packet = channel_manager.build_channel_exit_status(channel_id, 0)?;
let encrypted_exit = EncryptedPacket::new(&exit_status_packet.payload, encryption_ctx, true)?;
encrypted_exit.write(stream)?;
let close_packet = channel_manager.build_channel_close(channel_id)?;
let encrypted_response =
EncryptedPacket::new(&close_packet.payload, encryption_ctx, true)?;
encrypted_response.write(stream)?;
}
}
Some(&pt) if pt == PacketType::SSH_MSG_DISCONNECT as u8 => {
// The reason code is the big-endian u32 after the type byte; 0 is logged if it is missing
let reason_code = if packet.payload.len() >= 5 {
u32::from_be_bytes([packet.payload[1], packet.payload[2], packet.payload[3], packet.payload[4]])
} else { 0 };
info!("Received SSH_MSG_DISCONNECT (reason={})", reason_code);
break;
}
Some(&pt) if pt == PacketType::SSH_MSG_CHANNEL_WINDOW_ADJUST as u8 => {
let payload = &packet.payload;
// Packets shorter than 9 bytes are silently ignored
if payload.len() >= 9 {
// Format: uint32 recipient_channel || uint32 bytes_to_add
let recipient_channel =
u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
let bytes_to_add =
u32::from_be_bytes([payload[5], payload[6], payload[7], payload[8]]);
channel_manager.adjust_remote_window(recipient_channel, bytes_to_add);
}
}
// Any other message type (or an empty payload, logged as type 0) is only logged
_ => {
let pt = packet.payload.first().copied().unwrap_or(0);
warn!("Unknown/unhandled packet type: {} (0x{:02x}), payload_len={}", pt, pt, packet.payload.len());
}
}
}
Ok(())
}
/// Extracts the user name from an SSH_MSG_USERAUTH_REQUEST payload.
///
/// Layout read here: byte 0 is the message type, bytes 1..5 hold the
/// big-endian length of the user name, and the name bytes follow. Invalid
/// UTF-8 in the name is replaced lossily.
///
/// # Errors
/// Returns "Auth request too short" when the length field is missing and
/// "Auth request truncated" when fewer name bytes are present than announced.
fn extract_username_from_auth_request(
    packet: &crate::ssh_server::packet::SshPacket,
) -> Result<String> {
    let payload = &packet.payload;
    if payload.len() < 5 {
        return Err(anyhow!("Auth request too short"));
    }
    let name_len = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]) as usize;
    // Checked slicing instead of `5 + name_len`: the length comes from the
    // peer, and the addition could overflow `usize` on 32-bit targets.
    let name_bytes = payload[5..]
        .get(..name_len)
        .ok_or_else(|| anyhow!("Auth request truncated"))?;
    Ok(String::from_utf8_lossy(name_bytes).into_owned())
}
/// SSH服务器CLI入口
pub fn run_ssh_server(port: Option<u16>, pg_conn: Option<&str>) -> Result<()> {
let config = SshServerConfig {
port: port.unwrap_or(2024),
bind_address: "0.0.0.0".to_string(), // ⭐⭐⭐⭐⭐ Phase 8.3: Allow Docker container access
security_config: SshSecurityConfig::enterprise_default(),
pg_conn: pg_conn.map(|s| s.to_string()),
upload_hook_config: crate::config::UploadHookSection::default(),
};
let server = SshServer::new(config);
server.run()
}