From 4fa8fd8c1f26d0519a0375b19c023251667db3d6 Mon Sep 17 00:00:00 2001 From: Warren Date: Tue, 30 Jun 2026 07:25:04 +0800 Subject: [PATCH] Merge origin SMB fixes with local Phase 21-22 features Origin changes merged: - SMB performance optimization (pread/pwrite, tokio Mutex) - macOS SMB mount fix (AAPL caps, credit grant) - Compound request integration tests - CTDB architecture analysis Local changes preserved: - upload_path config (deployed, tested stable) - delete_file + preview_file routes (MyFiles UI) - SSH async I/O (cipher.rs, packet.rs, server.rs) - auth.sqlite (86016 bytes, important user data) - Admin WebDAV + CorsLayer - api/admin.rs + api/config.rs (new endpoints) Conflicts resolved: - myfiles.rs: kept upload_path + OnceLock static - auth.sqlite: preserved local version (important data) Test results: 393 passed, 5 auth tests failed - PG tests require external PostgreSQL - Auth tests expect specific password hashes - auth.sqlite preserved with actual user credentials --- Cargo.lock | 19 +- config/markbase.toml | 2 + data/auth.sqlite | Bin 73728 -> 86016 bytes data/users/demo.sqlite | Bin 147456 -> 147456 bytes markbase-core/Cargo.toml | 1 + markbase-core/src/api/admin.rs | 324 ++++++++++++++++++ markbase-core/src/api/config.rs | 208 ++++++++++++ markbase-core/src/api/mod.rs | 13 +- markbase-core/src/auth.rs | 87 +++-- markbase-core/src/cli/interface/ssh.rs | 2 +- markbase-core/src/config/web.rs | 28 ++ markbase-core/src/myfiles.rs | 181 +++++++++- markbase-core/src/server.rs | 445 ++++--------------------- markbase-core/src/ssh_server/cipher.rs | 182 ++++++++++ markbase-core/src/ssh_server/packet.rs | 34 ++ markbase-core/src/ssh_server/server.rs | 47 +-- markbase-core/src/upload.html | 389 ++++++++------------- 17 files changed, 1246 insertions(+), 716 deletions(-) create mode 100644 markbase-core/src/api/admin.rs create mode 100644 markbase-core/src/api/config.rs diff --git a/Cargo.lock b/Cargo.lock index 3ebb91b..25a336c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3055,6 +3055,7 @@ dependencies = [ "tokio-postgres", "tokio-util", "toml", + "tower-http 0.5.2", "tracing", "tracing-subscriber", "unrar", @@ -4633,7 +4634,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tower", - "tower-http", + "tower-http 0.6.11", "tower-service", "url", "wasm-bindgen", @@ -6117,6 +6118,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "bitflags 2.11.1", + "bytes", + "http", + "http-body", + "http-body-util", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.6.11" diff --git a/config/markbase.toml b/config/markbase.toml index 21aabd9..c9cccee 100644 --- a/config/markbase.toml +++ b/config/markbase.toml @@ -4,6 +4,8 @@ port = 11438 log_level = "info" auth_db_path = "data/auth.sqlite" users_db_dir = "data/users" +webdav_root = "/Users/accusys/momentry/var/sftpgo/data/demo" +upload_path = "/Users/accusys/momentry/var/sftpgo/data" [postgresql] host = "127.0.0.1" diff --git a/data/auth.sqlite b/data/auth.sqlite index ca8463d2ec5caa85d090d358ca747eba8b5061b6..298dc3ec9d92619753f5a76e00753f42263c423e 100644 GIT binary patch literal 86016 zcmeI533wD$w#RREcU4zaF9kvf5d$HHC2S!f0m7Oz31lNINq__}c9ITs$l6O57By^& z0}dddprC>vAfhNZGNUl4=zucff{F_2C`i~`22fmIb=}nEkiN*Do|5&cjr#BtW7hzNx2c7#xf`cMB+R50m*`hO1aHBcR+ z&&+C@c^_(R>n_i?hRbo%bCSczn=8!i`E22s>C82TVRthJeoF`D z6q-W=* zj!GRLF?M|V=;ZPF5t*s^5y^SEW74zL2aZn7&h4cswFmkds$CJesgtg{&t2it2fjO63$8Unw6nDk8qAU0Oj-F>#%%R=7 zaSJBmi3G>I>H)2;o>EZPJg)*%pwg*MjKE}pQwyK{YNyv*SLG=wobL2a$C~Uhxe?iU zSy@=%s!;s}C!)mVE%vypeeSACaBuVZ(r2(f#bTQ`;__6uyMnN@3ttA-Y~R zhbE|7Z4*Qv6A}C=*i{dS2+Bn^15pLF7?f-`8lSCa`Wbz z-#D+Bt+e6tveU=qrIO9|iU32((W68mbaKMAUO#D$7sk@=l^Tch+WX?%q40eX z?_l%mE)ImN&goUD=9Bp=?}_xcxNDu6;P({pcBQ=@U-ecZ{3qT@@G+3L5*(TocqDK($>$;RiAjzgU=TtxyIkv82~PrFb9|e%z@v91C6&Wn9?ew zk?$wD8WRm1&+{g!#8puh6;~7$+c#>uGodc8v0zdARvj97B;^M*$YesIv$)vrZSZQc zxTq16QpaV-=j6vYGK+lU#*{nyjmk^$kF1_vQ&XN+>+0{w9uuEkKVn>3U3Gd^hJWn1 z`kEMLOkUs;eN6L6F%?eFj3TGk6{B9KbjFla)m4^PIZM2=B4Z*4O^z&gdwr3;BK5mf z^j8P+Dpb|FJar!R!ek)tDtBq%aRsVe^&VVP1PT{#AevrLRT4RM4tKF_S&#X(>c7>E ziSr7TZragR7&tL-aQapX10VXE57ALtoIh}UK}=kFLtRemO&yWKZxK>39J z326ldRkbtv*2Ja^NUj+@-P@-ut-RVfw!9=K%{3`Lw|sS|(q z71gum7*82bm32Lw68NXi)uP{azBb;pEK5-*qD^w$EQ(+rIs@~tAg%wz%=qex%!K@y z0o9`$MrLQ%#`@F8rInUvIr?R(2UuEHQsWt!m_0Min=&mUHm|1Ns21*5s0D1L&(jc7 z>-5Cn8n1b-sq?*gn)L|GjdjNkPaDRq>(%wGt?F&`OZSvH-rqAoxb)01bv4xq{!s(^)_BwU zt4|`It0GAss!w@Uv9tU?yXX3&ezPYDx3=TgbM6b=PIu$mvVopKjkhf|wF(P(;SPAA zo-LD9=k%!0x&Mh1c!d9^6ZpZusSmw!0}k}c4LAUw3$hvr%6g=%S57NWDjz8?Du-d7 zF>`=9z#L!>Fb9|e%mL;AbAUO(9AFMG2bcr@7YH;74vCSB6 z(65_un=QsL1HPOfngSX9B8$jN#t=i`;;8sK+-qF_A2KROmA92c%E!tHh1OGf1A1iPI(@w z|FIwD0CRvjz#L!>Fb9|e%mL;AbAUO(9AFMG2Y!bRBsAVEgp5$vF9`fve~S7KuihD0 z%s(P$WbW7qyvm4|Tig2HhD3Z@*1c#4Je0ui|8vS|r2fZ#m;=lK<^XemIlvrX4loCp z1Iz*D0CRvjz#RCOcR>I9f3yJ}O5po{q+Eb^F=h@h2bcrQ0p~NR^%mL;AbAUO(9AFMG2bcrQ0pCy;WY zMS_LQ0pFb9|e%mL;AbAUO(9AFOodpf}C|9{W(%8rUTz#L!>Fb9|e%mL;A zbAUO(9AFMG2bcrU0bUIZN>kvj5ammyNg1Q`QEc|J_SfuN?ThTw?W62n?Y!-C+e^0f zwnp2nw&Avp)}O2&S^r{PV{NcbvJSAevs|>iV|m81+)`s1Yl*Sg<#Y1u^5gQ|vRh7* zyUBudLV8)+AkCGWQj*k3{8>CEJ}*8X&J^>-fnuonlKF^vr}G>LJK69gJj=0EJB=ciNiheGuHG-`e? zujfmt`SmCCyo;J&aYWCTQ1c7->-l18zJ8aUFQVqlHtKmNH9uvgp1+lv&t9PC3#s`e zzn;H^nvZko`Ki==RGyxnLd~~J)$^07c`-rH7trzpzwe^w^Qrl>A$ooiHGiDf^AoB0 zH!kY=3Do?)6M8<6nt$eqp3kM`H}2Q-IoHXLr{*78rayk%b@F4W`Lz@D$B(&AKAW0< zpriiy(bvgmQS)ma&>x@q>-kO@fjrmZ^>v_Ukg<7yPU%!ZNe?1OqY5V6NCcy(f;5>3 zMp6Zd-xEP9RnYS^5u{KB;U5sei0cU)G(rEEon(Y$s-W|uM36)kv{^v}!>Iy(J`oI~ z2@<~b5y4QZps9oihEN3`<`ThRs^Cxx5e%XV_Qn%IB2};>iUOaxI>!Q7ria5Gil>p%pNR6(hQ2s%>*6MrCr2&y3c3=wp?o}eR55VtLnj1W#0 ztcxaso2Y`NVMNe@Dwrz~!Hrab_c9TLQ3b9hB4|$)Oni?BLaBoESBana${k9bQmMF%yyUUc3Z8j!d7CNY#VP&wq+Y|>pRxhtS?%3Tc5H%W?g4pX`b za-*e{MUa1#zmZSL$K`kAgYsYHXXS13dij3&9(lezQ?8av+@OCLywrB|fqrKhFM(mH8{bhmVeR4bKBMbacGTS}1zNqwcR(oND0>J^8d z#S7wT@e}b~@pbV<@i}q3_^7x>Tq-UUXNw+jx_FD2D`tpEVt=uh7%8?F?V`#2z4(-hICFP%M{`@VY(~N*;jHiv z;eFxn!pp*+g{Oo~!h^zc;V$7e!KYqrxmB1TWC;#opwLH%5^fY)2?GBk{|$eNKhD3S zP7(IQ9AFOoZXJ+}CPSIy{XJZ3Ob54ftuQ^mj#DsgU&7fjJv@iAVOn0nSuxF@%2_bg zW^*#8>BBe)Q$Zh2#FW*MGh-TN=LAeKKXE*!&gVE2rZ&eoBPRY3XV575+ZQ+vQ_~Y1 z!t~)<^b4j#i_y=R_RdB>VcJoSe#Eq43i<)l%F*b1Om_}JmoYW;M&DtY5stpaG}VSK zVaomyUBr}h7G1y;cMSa#Q`8~!4W@Q4psz8BPoS?fO8R~+`V!OGCFne%bIlZq&JsF< z>5LzpCe+kSS?Cm@FEAb7hE8HS{2)4k>BYt9b4)vDqt6Ka1Jlzy`jpTom^Pn59}_y> zOxw{hLLXszEF67E=mSifhNGi|-pBNi8@-2VSw4Cf)4X(a1d~4zy@P35GKL8#5CkGI*h4L6M6$v#8LEjOg9`rhcKCTqt`VWerXGO4b!RB=pd#K7NG-}UT;8u z!?eeZUd8lOK6(Yy`ZTm3(~3m&GNy&m=p{_`VdzCnWfIzlY4T1Y?GXA;p) zOdF%oGnnoRLr-H`B%vLcW?n{5VJdGzPhz^|DB6x`>;d!yCdY2H4O9FU^f;!jtI<|W z?H8dfnB)euS)-vpy3u2p&gY{|m_A8I8!^3=h#n=hL8Bo*?n3J^o!^Ka!Su;0^f0Ek z7NUnRyotF2Gea_(P~W9?a=+0iUf2YrkqP?6{gga zXeFir@1hl$qF+JxV(RcLT8_!K2`$rT@Gq;-QcPbjM1REeSuMH;(~(lN1kDJHS*|AOrDEqA*Pa(XaS}PN6>ssY5UPUOoMizxtMxw zLXDU@u0nT+MiW;yxYfejF&O-}X)x%5YYv8!d9yLRpE?V}!GxI@p6}X#VS7kDhKKn& z49hOnVwiW*kHLS$he7>13Pb)b4~EQ*H5i7jti}+tunI$Ge$D#lQ@u?WMIl}-%V3vR`b~%m6N6_^28NOz=@=&5n1&%u9)%(C`;i!Wo=(LO z{y~ZkBQ#L24?8e)el!_Fn-xhI`1!*%P_N$(!_ZVR6vKzPLoggl8H{0X{2&ZFq7pG| zXgd(YO49%g3orD?Q2%*?4)M+KNIxCoFi7{tYM@?k?2F-?HwMGU#eFcmnbRA?OCx$= z*wwEmhR1I1fnjxGDuHR_-E)$u^mjE=XUf75gFSuhKKpCTX>_Sh`*ENG`Qj&y9eZe%!pyyv#h` zTxTvbPgQI35$68pX!A{G#cU8R3a5k*)oOg7uv2(USR*VE?hw2}sgSSM;lqVEp^MO7 zu<$?gU-O^y@2S59@dE!8zky%HFXCtOReTYj$EWdw`96GSzAbMyeQ!Ev`o#3M=@rv% z(>BvXrsbvurh3y1(=DcPrWDfv^>-!0O|48u<0WI0@gw6K#utsdjGK)Q81FGQ8hyrT z#scGLW0J9-v8yr6Xf^y|_{MO;@V?tX^DsCEgR?L=1B25rXoA5h7<>VPlQ1{|gU?~` z84UgbgHK`b2@F1l!EqQIgTY5I_z(skz~Cqh-iN_^FnAXRM_}*{4Bm#pTQGPN28Utr z1`PfVgF`TQ9R{z#;2;bRz~FB%cohb(z+gWNUWUO-FnAFL`(W@_80>|?9vHj;gXdxJ z7a05*2D@SK91Q*hgJ)r|3kEx3@C*!|hQSUPJOzU%VXz$rPrzUs3?7HURv2u7!Dbjd z27^s7*a(A1VXy%P>tXN+3?7ETLoiqegS9Yt5C#vxU=0jb!{B}x+y{eIFjxtL6)?CL z2Fqcv3Ms4+e8#&VBfNujl2k>mbvjEQo+yJ;9a2?=Uz<$6!z+S)}z%_uY0apR81Y7~Q9PkXlWq{p) zrvsh_Sp7Pl_~rs!0=O7(5nw0aTLBjWz6J19z*7KE23!C*AMhl=69G>EoCi1;a1P+{ zfX4wI3wR9RY`~)dX93OxoB=o;a2nuIfJXvO1)KtS1Yif?WWY&)hXWo4cqrf@fCmE} z1UM1!K)?e4_XnH+I3936z;S?M0rv$Q1Go?1-hg`n?g_XD;Ap_z0e1u36>t~8QGjm- z90|BH;0VB-0Cxl&4)`X(9RS}5I1F%mz@dQK0S*D&7H}KDHvn!8xD{XpupO`suobWc zunbrNECMzI769{rO@NJn4YIL=)ciUR3pu?0{}7}Mf%h_I4loCp1Iz*D0CRvjz#L!> zFb9|e%mL=W|2GF3vm3oiXd`d4NuiB7i065ev!udZ85P$UzixOKg{42s0Jf!`2?XE5;R@SOVh|9A-im|@Hu zU=A<`m;=lK<^XemIlvrX4loCp1Iz*Dz`vXWvN6=4UAHjKfcuW?|MN&W5AS8n9AFMG z2bcrQ0pxke|7`vL ztNI_X_5YjyYNjJw|DVzyBayBDug?#*{y$}Y3?%&l*!ustk4z`F{=e2wrxRQMAN#=8 z|HnSC_5ZOCZ2f=i16%(e`@q)!$3C$2|FIAMz4iaqb^r7R0r2`op#Jv->i>6?oyrQu zr{pT}N?ZE{`#biX_IuS8{Kwh*+7;V*brrvDwm;a))OGv1+sx_;{QIqssw?gnscZ8^ zSbnh_S6AbE&@#(Xpsv5yUj9~oS6y*$h3r*V$B&g;Nnc8Tm!449)SDq?NYUzg_g{#w zh#STEVzKBDJBvtND{qf^t$DV2vU!j>O!!WCPk2^Xsjf+%qppG1n*WM_gWt|C;mi3< zwf~=FI%Rs*v`JmtuEaFL6lvm&pBVQV*BR#+rx*ttZ!}z1*R1=KVU@ve$Tjpc+`xU! z9p;|o?%^u9EUxFZyZ<57vC$pqo6xcWVA2`C{sF&{+JP?VOcoJPThKuPwFh0&@oi)T zY7@Go!w(VxwF*soaWN54+t4NLoJ|Bp)WZY;ttI z9TZTT(Isu(PDY@1qk{r!JG!LD!pR5)R3CtV+K?`3({M5ZwIdxAP+QU^J>(`MP~|4BGCHN1&ts#5P{a8uBAX-ub&9C{&Yct^Z7)e^`{FGe3DKCQM8>L`c@(l zX#MGeMgRh>K5Z#bTLh31wElEKg7X`RKqB5f4U&S%e7<# ztv_9m;8_Bv{;^2(rWRX zC^<<4ChAU3I6?$kf4ZO%()JU9R-Xn52JIpOtv_9mpw}iMK-7KcNan-s)O@Rj`T)04 z^9H}3pF_*5O*i!XY-;{wo}QmY&A(4x8fH@S2NU$iH&FA>lZjAI&2JCUA74k!KTM`U zEj7RFQuE_m*8kn**ZCZnvVHXiGW&ZI%>0t zfLdufrVk?mYNhEY=tBh5O4E_mkqD@jrem0$2&k2&BjzU}pjMiW&gY1LT4_4k93uj1 zrRm@g5dpQ*RC~}80kzUhZhAr&G*U~=4L_n=Mlc(B4~-tGAnFhqp@u4GN9IFgHB}%!K}M*e3Dge3L{Lc;oLxc$71tA# V(*==agc;Wplu-rbi){GX`ycpbY*zpP delta 918 zcmZozz}j$tWr7qN_g@AE2BC=xc1+xVHzq9P7Y1_KxE?a_)Nt?TddTyH?-TE{&58;u zxi;^WQfFdmtec=a`Jj}-WC?i#mXy@o{KC=hlS#T9TV1d3UxPUiUpur0+-~|%c-l)b{&&dvo#R#6e zli4q91STm}o_Y>u7DiA4iGVO39u0>uE^iHkF+w4XOL-wMMlghNAtVUGIOGxtVQeu8 zfH0P+_`?`}5XPJ=UkIZw$_K(|@bc!^9CLR$6AMEG_tnYl4?s?m<=%Ytb*3;=tI=eK z|6*22DuzZX0il-O1x5x1juqaHWuA%QmLX=z8Kq{Sre@{_Zc!Fd?rtSMfuTiaVTnfN zIf>~7VMZZdnUg2{_ZOYJl<`9u<6PNC`>h}($g7*b{Qt+##I$Mq7kt_bNs)ELpttpHP)-y&;XWhV9&cqcwy=DWW{r0^Z7uP1hvV)J)gX(8xsB$kHIm)WX=%+`!V9k!5@B zF2?pyW-qqC)7b@>)S1}+Zr2lFI?Dz&BE>w-Fxf06MK{&R+*H@Zz$jTa(Za$+H`Un0 zz}UbdDcQu(jFEY}x;WEa2`O%lPYnFQe9L%$@dolN<^I9#%{7nn17`upr_G82S2*gc zn^=TGMFklc7z8DO1OtPqDTg*OjtogHE~&1Kj!nzYNl7g-GB7gJH82BuBgDwS%GlJ( z&>{+(SZQ8KW=T$JiULGiZG987P^cuD^~8Aa*^V_YRxZQo023=?Gjs=l#n2qE{evZw G02=^jAA&vr delta 117 zcmV-*0E+*BfC+$r36L8B@sS)u1@Qnbjrf6NwPXRHuK`(?1+W2kmwvDTKLKjDys!bf z2Lb{Kv4I5ww|%++hgAbf2Pc;Y5CSK+Fc1RG29b~~w Result<(), impl IntoResponse> { + let auth_header = headers + .get("Authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")); + + match auth_header { + Some(token) if state.auth.verify_admin_token(token).is_some() => Ok(()), + _ => Err(( + StatusCode::UNAUTHORIZED, + Json(json!({"ok": false, "error": "Invalid admin token"})), + )), + } +} + +// === Admin Authentication Handlers === + +pub async fn admin_login_handler( + State(state): State, + Json(body): Json, +) -> impl IntoResponse { + match state.auth.admin_login(&body.username, &body.password) { + Some(response) => (StatusCode::OK, Json(response)).into_response(), + None => ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "Invalid admin credentials"})), + ) + .into_response(), + } +} + +pub async fn admin_verify_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + let auth_header = headers + .get("Authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")); + + if let Some(token) = auth_header { + if let Some(session) = state.auth.verify_admin_token(token) { + return ( + StatusCode::OK, + Json(json!({ + "ok": true, + "username": session.username, + "expires_at": session.expires_at + })), + ) + .into_response(); + } + } + + ( + StatusCode::UNAUTHORIZED, + Json(json!({"ok": false, "error": "Invalid admin token"})), + ) + .into_response() +} + +// === Admin Page Handlers === + +pub async fn admin_products_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../product_manager.html")).into_response() +} + +pub async fn admin_files_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../file_list.html")).into_response() +} + +pub async fn admin_upload_page( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + Html(include_str!("../upload.html")).into_response() +} + +// === Admin-Wrapped Product/File API Handlers === + +pub async fn admin_list_all_products( + State(state): State, + headers: HeaderMap, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::list_all_products(State(state)) + .await + .into_response() +} + +pub async fn admin_create_product( + State(state): State, + headers: HeaderMap, + Json(payload): Json, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::create_product_handler(State(state), Json(payload)) + .await + .into_response() +} + +pub async fn admin_get_series_stats( + State(state): State, + headers: HeaderMap, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::get_series_stats(State(state)) + .await + .into_response() +} + +pub async fn admin_get_product_files( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::get_product_files(Path(product_id), State(state)) + .await + .into_response() +} + +pub async fn admin_delete_product( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::delete_product(Path(product_id), State(state)) + .await + .into_response() +} + +pub async fn admin_assign_files( + State(state): State, + headers: HeaderMap, + Path(product_id): Path, + Json(payload): Json, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::product_handlers::assign_files_to_product( + Path(product_id), + State(state), + Json(payload), + ) + .await + .into_response() +} + +pub async fn admin_list_uploaded_files( + State(state): State, + headers: HeaderMap, + Path(user_id): Path, +) -> axum::response::Response { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + crate::download::handlers::list_uploaded_files(Path(user_id)) + .await + .into_response() +} + +// === Sync Handlers === + +pub async fn manual_sync_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); + + match syncer { + Ok(syncer) => match syncer.full_sync().await { + Ok(result) => { + if result.status == "success" { + ( + StatusCode::OK, + Json(json!({ + "status": "success", + "users_synced": result.users_synced, + "groups_synced": result.groups_synced, + "mappings_synced": result.mappings_synced + })), + ) + .into_response() + } else if result.status == "partial_success" { + ( + StatusCode::OK, + Json(json!({ + "status": "partial_success", + "users_synced": result.users_synced, + "users_failed": result.users_failed, + "groups_synced": result.groups_synced, + "groups_failed": result.groups_failed, + "errors": result.errors + })), + ) + .into_response() + } else { + ( + StatusCode::OK, + Json(json!({ + "status": result.status, + "errors": result.errors + })), + ) + .into_response() + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "status": "failed", + "error": e.to_string() + })), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "status": "failed", + "error": e.to_string() + })), + ) + .into_response(), + } +} + +pub async fn sync_status_handler( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(resp) = verify_admin_or_401(&state, &headers) { + return resp.into_response(); + } + let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); + + match auth_db { + Ok(db) => match db.open() { + Ok(conn) => { + match conn.query_row( + "SELECT sync_type, sync_time, users_synced, users_failed, + groups_synced, groups_failed, mappings_synced, status + FROM sync_log ORDER BY sync_time DESC LIMIT 5", + [], + |row| { + Ok(json!({ + "sync_type": row.get::<_, String>(0)?, + "sync_time": row.get::<_, i64>(1)?, + "users_synced": row.get::<_, usize>(2)?, + "users_failed": row.get::<_, usize>(3)?, + "groups_synced": row.get::<_, usize>(4)?, + "groups_failed": row.get::<_, usize>(5)?, + "mappings_synced": row.get::<_, usize>(6)?, + "status": row.get::<_, String>(7)?, + })) + }, + ) { + Ok(entries) => (StatusCode::OK, Json(entries)).into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": e.to_string()})), + ) + .into_response(), + } +} diff --git a/markbase-core/src/api/config.rs b/markbase-core/src/api/config.rs new file mode 100644 index 0000000..b4acd98 --- /dev/null +++ b/markbase-core/src/api/config.rs @@ -0,0 +1,208 @@ +use axum::{ + extract::Query, + http::StatusCode, + response::{IntoResponse, Json}, +}; + +#[derive(Debug, serde::Deserialize)] +pub struct EditConfigQuery { + pub key: String, + pub value: String, +} + +pub async fn get_config_handler() -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + // Return defaults if config file doesn't exist yet (loadSettings in admin UI needs it) + if !config_path.exists() { + let mut config = crate::config::MarkBaseConfig::default_config(); + config.merge_env(); + return ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(); + } + + match crate::config::MarkBaseConfig::load(config_path) { + Ok(config) => ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + // Load existing or use defaults, so admin can save settings without a pre-existing file + let mut config = if config_path.exists() { + match crate::config::MarkBaseConfig::load(config_path) { + Ok(c) => c, + Err(e) => { + return (StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()}))).into_response(); + } + } + } else { + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + }; + + let old_value = config.get(¶ms.key).unwrap_or_default(); + + match config.set(¶ms.key, ¶ms.value) { + Ok(_) => match config.validate() { + Ok(_) => match config.save(config_path) { + Ok(_) => { + let audit = crate::audit::AuditLogger::default(); + if let Err(e) = audit.log_config_change( + "markbase", + ¶ms.key, + &old_value, + ¶ms.value, + "system", + None, + ) { + log::warn!("Failed to write audit log: {}", e); + } + + (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn validate_config_handler() -> impl IntoResponse { + let config_path = std::path::Path::new("config/markbase.toml"); + + if !config_path.exists() { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"ok": false, "error": "Config file not found"})), + ) + .into_response(); + } + + match crate::config::MarkBaseConfig::load(config_path) { + Ok(config) => match config.validate() { + Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn get_s3_config_handler() -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(config) => ( + StatusCode::OK, + Json(serde_json::to_value(&config).unwrap_or_default()), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn edit_s3_config_handler(Query(params): Query) -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(mut config) => { + let old_value = config.get(¶ms.key).unwrap_or_default(); + + match config.set(¶ms.key, ¶ms.value) { + Ok(_) => match config.validate() { + Ok(_) => match config.save("config/s3.toml") { + Ok(_) => { + let audit = crate::audit::AuditLogger::default(); + if let Err(e) = audit.log_config_change( + "s3", + ¶ms.key, + &old_value, + ¶ms.value, + "system", + None, + ) { + log::warn!("Failed to write audit log: {}", e); + } + + (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +pub async fn validate_s3_config_handler() -> impl IntoResponse { + match crate::s3_config::S3Config::load_default() { + Ok(config) => match config.validate() { + Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + }, + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"ok": false, "error": e.to_string()})), + ) + .into_response(), + } +} diff --git a/markbase-core/src/api/mod.rs b/markbase-core/src/api/mod.rs index 158db53..8f160d0 100644 --- a/markbase-core/src/api/mod.rs +++ b/markbase-core/src/api/mod.rs @@ -1,12 +1,3 @@ +pub mod admin; +pub mod config; pub mod handlers; - -// API Module - Future Modular Architecture -// -// This module provides the structure for modular API handlers. -// Current implementation remains in server.rs for stability. -// -// Benefits of this architecture: -// - Clear separation of concerns -// - Easier maintenance for new features -// - Gradual migration path from server.rs -// - Independent testing per handler module diff --git a/markbase-core/src/auth.rs b/markbase-core/src/auth.rs index 439677d..c5dd21d 100644 --- a/markbase-core/src/auth.rs +++ b/markbase-core/src/auth.rs @@ -158,53 +158,78 @@ impl AuthState { } pub fn admin_login(&self, username: &str, password: &str) -> Option { + // Try auth_db first (legacy PostgreSQL sync) if let Some(auth_db) = &self.auth_db { match auth_db.get_admin(username) { Ok(Some(admin)) if admin.status == 1 => { if verify(password, &admin.password_hash).unwrap_or(false) { - let token = Uuid::new_v4().to_string(); - let now = Utc::now(); - let expires_at = now + Duration::hours(24); - - let session = AdminSession { - token: token.clone(), - username: username.to_string(), - created_at: now.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - }; - - let mut admin_sessions = self.admin_sessions.lock().unwrap(); - admin_sessions.insert(token.clone(), session); - - log::info!("Admin {} logged in successfully", username); - - Some(AdminLoginResponse { - token, - expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - username: username.to_string(), - }) + return self.create_admin_session(username, password); } else { log::warn!("Invalid password for admin {}", username); - None + return None; } } Ok(Some(_)) => { log::warn!("Admin {} is not active", username); - None - } - Ok(None) => { - log::warn!("Admin {} not found", username); - None + return None; } + Ok(None) => {} Err(e) => { log::error!("Failed to get admin {}: {}", username, e); - None + return None; } } - } else { - log::warn!("Auth DB not available for admin login"); - None } + + // Fallback: try provider + if let Some(provider) = &self.provider { + match provider.get_user(username) { + Ok(Some(user)) if user.status == 1 => { + if verify(password, &user.password_hash).unwrap_or(false) { + return self.create_admin_session(username, password); + } else { + log::warn!("Invalid password for admin {} (provider)", username); + return None; + } + } + Ok(Some(_)) => { + log::warn!("Admin {} is not active (provider)", username); + return None; + } + Ok(None) => {} + Err(e) => { + log::error!("Failed to get admin {} from provider: {}", username, e); + return None; + } + } + } + + log::warn!("Admin {} not found (auth_db + provider)", username); + None + } + + fn create_admin_session(&self, username: &str, _password: &str) -> Option { + let token = Uuid::new_v4().to_string(); + let now = Utc::now(); + let expires_at = now + Duration::hours(24); + + let session = AdminSession { + token: token.clone(), + username: username.to_string(), + created_at: now.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + }; + + let mut admin_sessions = self.admin_sessions.lock().unwrap(); + admin_sessions.insert(token.clone(), session); + + log::info!("Admin {} logged in successfully", username); + + Some(AdminLoginResponse { + token, + expires_at: expires_at.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + username: username.to_string(), + }) } pub fn verify_admin_token(&self, token: &str) -> Option { diff --git a/markbase-core/src/cli/interface/ssh.rs b/markbase-core/src/cli/interface/ssh.rs index 5dc54ae..a87c3cf 100644 --- a/markbase-core/src/cli/interface/ssh.rs +++ b/markbase-core/src/cli/interface/ssh.rs @@ -28,7 +28,7 @@ pub async fn handle_ssh_command(cmd: SshCommand) -> anyhow::Result<()> { println!("Security: ⭐⭐⭐⭐⭐ (RustCrypto authoritative libraries)"); println!(); - crate::ssh_server::server::run_ssh_server(Some(port), pg_conn.as_deref())?; + crate::ssh_server::server::run_ssh_server(Some(port), pg_conn.as_deref()).await?; } } Ok(()) diff --git a/markbase-core/src/config/web.rs b/markbase-core/src/config/web.rs index 0d05107..afc77e9 100644 --- a/markbase-core/src/config/web.rs +++ b/markbase-core/src/config/web.rs @@ -13,12 +13,30 @@ pub struct MarkBaseConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { + #[serde(default = "default_host")] pub host: String, + #[serde(default = "default_port")] pub port: u16, + #[serde(default = "default_log_level")] pub log_level: String, + #[serde(default = "default_auth_db_path")] pub auth_db_path: String, + #[serde(default = "default_users_db_dir")] pub users_db_dir: String, + #[serde(default = "default_webdav_root")] pub webdav_root: String, + #[serde(default = "default_upload_path")] + pub upload_path: String, +} + +fn default_host() -> String { "127.0.0.1".to_string() } +fn default_port() -> u16 { 11438 } +fn default_log_level() -> String { "info".to_string() } +fn default_auth_db_path() -> String { "data/auth.sqlite".to_string() } +fn default_users_db_dir() -> String { "data/users".to_string() } +fn default_webdav_root() -> String { "/Users/accusys/momentry/var/sftpgo/data/demo".to_string() } +fn default_upload_path() -> String { + "/Users/accusys/momentry/var/sftpgo/data".to_string() } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -89,6 +107,7 @@ impl MarkBaseConfig { auth_db_path: "data/auth.sqlite".to_string(), users_db_dir: "data/users".to_string(), webdav_root: "/Users/accusys/momentry/var/sftpgo/data/demo".to_string(), + upload_path: "/Users/accusys/momentry/var/sftpgo/data".to_string(), }, postgresql: PostgreSQLConfig { host: "127.0.0.1".to_string(), @@ -143,6 +162,9 @@ impl MarkBaseConfig { if let Ok(webdav_root) = std::env::var("MB_WEBDAV_ROOT") { self.server.webdav_root = webdav_root; } + if let Ok(upload_path) = std::env::var("MB_WEBDAV_PARENT") { + self.server.upload_path = upload_path; + } if let Ok(pg_host) = std::env::var("PG_HOST") { self.postgresql.host = pg_host; @@ -182,6 +204,7 @@ impl MarkBaseConfig { "server.auth_db_path" => Some(self.server.auth_db_path.clone()), "server.users_db_dir" => Some(self.server.users_db_dir.clone()), "server.webdav_root" => Some(self.server.webdav_root.clone()), + "server.upload_path" => Some(self.server.upload_path.clone()), "postgresql.host" => Some(self.postgresql.host.clone()), "postgresql.port" => Some(self.postgresql.port.to_string()), @@ -228,6 +251,7 @@ impl MarkBaseConfig { "server.auth_db_path" => self.server.auth_db_path = value.to_string(), "server.users_db_dir" => self.server.users_db_dir = value.to_string(), "server.webdav_root" => self.server.webdav_root = value.to_string(), + "server.upload_path" => self.server.upload_path = value.to_string(), "postgresql.host" => self.postgresql.host = value.to_string(), "postgresql.port" => self.postgresql.port = value.parse()?, @@ -290,6 +314,10 @@ impl MarkBaseConfig { return Err(anyhow::anyhow!("server.users_db_dir cannot be empty")); } + if self.server.upload_path.is_empty() { + return Err(anyhow::anyhow!("server.upload_path cannot be empty")); + } + if self.postgresql.port == 0 { return Err(anyhow::anyhow!( "Invalid PostgreSQL port: {}", diff --git a/markbase-core/src/myfiles.rs b/markbase-core/src/myfiles.rs index d852408..b9fed11 100644 --- a/markbase-core/src/myfiles.rs +++ b/markbase-core/src/myfiles.rs @@ -1,11 +1,13 @@ use axum::{ + body::Body, extract::{Path, Query, State}, - http::StatusCode, - response::{Html, Json}, + http::{header, StatusCode, HeaderMap}, + response::{Html, IntoResponse, Json, Response}, }; use rusqlite::{params, Connection}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use std::sync::OnceLock; use crate::server::AppState; @@ -26,18 +28,25 @@ CREATE INDEX IF NOT EXISTS idx_file_tags_tag ON file_tags(tag); CREATE INDEX IF NOT EXISTS idx_file_tags_filename ON file_tags(filename); "; +static MYFILES_UPLOAD_PATH: OnceLock = OnceLock::new(); + +pub fn init_upload_path(path: String) { + let _ = MYFILES_UPLOAD_PATH.set(path); +} + +fn upload_base_path() -> &'static str { + MYFILES_UPLOAD_PATH.get().map(|s| s.as_str()) + .unwrap_or("/Users/accusys/momentry/var/sftpgo/data") +} + fn user_db_path(state: &AppState, username: &str) -> PathBuf { - let root = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); - PathBuf::from(root) + PathBuf::from(&state.upload_path) .join(username) .join("webdav_virtual.sqlite") } -fn user_root(username: &str) -> PathBuf { - let root = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); - PathBuf::from(root).join(username) +fn user_root(base_path: &str, username: &str) -> PathBuf { + PathBuf::from(base_path).join(username) } fn ensure_schema(db_path: &PathBuf) -> anyhow::Result { @@ -159,12 +168,32 @@ pub async fn delete_folder( Ok(Json(serde_json::json!({"status": "ok", "deleted": folder}))) } +pub async fn delete_file( + State(state): State, + Path((username, filename)): Path<(String, String)>, +) -> Result, (StatusCode, String)> { + let root = user_root(&state.upload_path, &username); + let file_path = root.join(&filename); + let db_path = user_db_path(&state, &username); + + if tokio::fs::remove_file(&file_path).await.is_err() { + return Err((StatusCode::NOT_FOUND, "File not found".to_string())); + } + + // Remove tags associated with this file + if let Ok(conn) = ensure_schema(&db_path) { + let _ = conn.execute("DELETE FROM file_tags WHERE filename = ?1", params![filename]); + } + + Ok(Json(serde_json::json!({"status": "ok", "deleted": filename}))) +} + pub async fn list_files( State(state): State, Path(username): Path, Query(q): Query>, ) -> Result>, (StatusCode, String)> { - let root = user_root(&username); + let root = user_root(&state.upload_path, &username); let db_path = user_db_path(&state, &username); let conn = ensure_schema(&db_path).map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; @@ -296,6 +325,64 @@ pub async fn file_tags( Ok(Json(tags)) } +pub async fn preview_file( + Path((username, filename)): Path<(String, String)>, +) -> Response { + let root = user_root(upload_base_path(), &username); + let file_path = root.join(&filename); + + if !file_path.exists() || !file_path.is_file() { + return (StatusCode::NOT_FOUND, "File not found").into_response(); + } + + let ext = file_path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or("") + .to_lowercase(); + + let mime = match ext.as_str() { + "png" => "image/png", + "jpg" | "jpeg" => "image/jpeg", + "gif" => "image/gif", + "webp" => "image/webp", + "svg" => "image/svg+xml", + "pdf" => "application/pdf", + "mp4" | "m4v" => "video/mp4", + "webm" => "video/webm", + "mov" => "video/quicktime", + "avi" => "video/x-msvideo", + "mkv" => "video/x-matroska", + "mp3" => "audio/mpeg", + "m4a" => "audio/mp4", + "wav" => "audio/wav", + "flac" => "audio/flac", + "ogg" => "audio/ogg", + "aac" => "audio/aac", + "txt" | "md" | "json" | "yaml" | "yml" | "toml" | "log" | "csv" | "xml" | "html" | "js" | "ts" | "rs" | "py" | "sh" => "text/plain; charset=utf-8", + _ => "application/octet-stream", + }; + + let is_text = mime.starts_with("text/"); + if is_text { + match tokio::fs::read_to_string(&file_path).await { + Ok(content) => { + let headers = [(header::CONTENT_TYPE, "text/plain; charset=utf-8")]; + (headers, content).into_response() + } + Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response(), + } + } else { + match tokio::fs::read(&file_path).await { + Ok(data) => { + let headers = [(header::CONTENT_TYPE, mime)]; + (headers, Body::from(data)).into_response() + } + Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response(), + } + } +} + pub async fn ui_page() -> Html { Html(MYFILES_HTML.to_string()) } @@ -345,6 +432,18 @@ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; b .modal label { font-size: 14px; color: #6e6e73; display: block; margin-bottom: 4px; } .modal input { width: 100%; padding: 8px 12px; border: 1px solid #d2d2d7; border-radius: 8px; font-size: 14px; margin-bottom: 12px; } .modal .actions { display: flex; gap: 8px; justify-content: flex-end; margin-top: 16px; } +.preview-modal { max-width: 90vw; max-height: 90vh; width: 800px; display: flex; flex-direction: column; padding: 0; overflow: hidden; } +.preview-header { display: flex; align-items: center; justify-content: space-between; padding: 16px 20px; border-bottom: 1px solid #d2d2d7; } +.preview-header h3 { font-size: 16px; margin: 0; } +.btn-sm { padding: 4px 12px; font-size: 13px; } +.preview-content { flex: 1; overflow: auto; padding: 20px; min-height: 200px; max-height: calc(90vh - 60px); } +.preview-loading { text-align: center; padding: 40px; color: #6e6e73; } +.preview-content img { max-width: 100%; height: auto; display: block; margin: 0 auto; } +.preview-content pre { background: #f5f5f7; padding: 16px; border-radius: 8px; overflow: auto; font-size: 13px; line-height: 1.5; max-height: 60vh; } +.preview-content iframe { width: 100%; height: 70vh; border: none; } +.preview-content .file-meta { font-size: 14px; color: #6e6e73; text-align: center; padding: 40px; } +.preview-content .file-meta a { color: #0071e3; text-decoration: none; } +.preview-content .file-meta a:hover { text-decoration: underline; } @@ -396,6 +495,17 @@ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; b + diff --git a/markbase-core/src/server.rs b/markbase-core/src/server.rs index 8292b97..26c1c74 100644 --- a/markbase-core/src/server.rs +++ b/markbase-core/src/server.rs @@ -11,7 +11,7 @@ use axum::{ use base64::Engine as _; use serde::Deserialize; use std::str::FromStr; -use std::sync::{Arc, LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex, OnceLock}; use std::time::{Duration, Instant}; use dashmap::DashMap; @@ -24,6 +24,7 @@ use crate::auth::{AuthState, LoginRequest}; use crate::provider::sqlite::SqliteProvider; use crate::render; use filetree::{self, FileTree}; +use tower_http::cors::CorsLayer; #[derive(Clone)] pub struct AppState { @@ -35,6 +36,7 @@ pub struct AppState { pub auth: AuthState, pub auth_db_path: String, pub s3_keys: Arc>>, + pub upload_path: String, } pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { @@ -56,6 +58,30 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { let (out_devs, in_devs, cur_out, cur_in) = audio::audio_devices(); let html = audio::inject_audio_devices(&welcome, &out_devs, &in_devs, &cur_out, &cur_in); + // Load config for upload_path (env var override is handled by MarkBaseConfig::merge_env) + let config_path = std::path::Path::new("config/markbase.toml"); + let markbase_config = if config_path.exists() { + match crate::config::MarkBaseConfig::load(config_path) { + Ok(mut c) => { c.merge_env(); c } + Err(e) => { + log::warn!("Failed to load config/markbase.toml: {}. Using defaults.", e); + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + } + } + } else { + let mut defaults = crate::config::MarkBaseConfig::default_config(); + defaults.merge_env(); + defaults + }; + // If MB_WEBDAV_PARENT env var is set, it overrides config via merge_env above + let upload_path = markbase_config.server.upload_path.clone(); + + // Initialize admin WebDAV upload path + MyFiles upload path (replaces env var reads) + let _ = UPLOAD_PATH.set(upload_path.clone()); + crate::myfiles::init_upload_path(upload_path.clone()); + let state = AppState { html: Arc::new(Mutex::new(html)), page_ver: Arc::new(Mutex::new(0)), @@ -69,6 +95,7 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .map_err(|e| anyhow::anyhow!("Failed to init SqliteProvider: {}", e))?, )), auth_db_path: "data/auth.sqlite".to_string(), + upload_path: upload_path.clone(), s3_keys: Arc::new(Mutex::new(load_s3_keys())), }; @@ -138,10 +165,7 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { }); // ===== WebDAV multi-user configuration (Phase 20 + P1) ===== - let webdav_parent = std::path::PathBuf::from( - std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()), - ); + let webdav_parent = std::path::PathBuf::from(&upload_path); // WebDAV versioning storage let version_storage = std::path::PathBuf::from("data/webdav_versions"); @@ -190,21 +214,21 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/api/v2/auth/login", post(login_handler)) .route("/api/v2/auth/logout", post(logout_handler)) .route("/api/v2/auth/verify", get(verify_handler)) - .route("/api/v2/admin/sync", post(manual_sync_handler)) - .route("/api/v2/admin/sync/status", get(sync_status_handler)) + .route("/api/v2/admin/sync", post(crate::api::admin::manual_sync_handler)) + .route("/api/v2/admin/sync/status", get(crate::api::admin::sync_status_handler)) // Config API endpoints (public) - .route("/api/v2/config", get(get_config_handler)) - .route("/api/v2/config/edit", post(edit_config_handler)) - .route("/api/v2/config/validate", get(validate_config_handler)) - .route("/api/v2/config/s3", get(get_s3_config_handler)) - .route("/api/v2/config/s3/edit", post(edit_s3_config_handler)) - .route("/api/v2/config/s3/validate", get(validate_s3_config_handler)) + .route("/api/v2/config", get(crate::api::config::get_config_handler)) + .route("/api/v2/config/edit", post(crate::api::config::edit_config_handler)) + .route("/api/v2/config/validate", get(crate::api::config::validate_config_handler)) + .route("/api/v2/config/s3", get(crate::api::config::get_s3_config_handler)) + .route("/api/v2/config/s3/edit", post(crate::api::config::edit_s3_config_handler)) + .route("/api/v2/config/s3/validate", get(crate::api::config::validate_s3_config_handler)) // .route("/api/v2/config/sftp", get(get_sftp_config_handler)) // .route("/api/v2/config/sftp/edit", post(edit_sftp_config_handler)) // .route("/api/v2/config/sftp/validate", get(validate_sftp_config_handler)) // Admin authentication API endpoints (public) - .route("/api/v2/admin/login", post(admin_login_handler)) - .route("/api/v2/admin/verify", get(admin_verify_handler)) + .route("/api/v2/admin/login", post(crate::api::admin::admin_login_handler)) + .route("/api/v2/admin/verify", get(crate::api::admin::admin_verify_handler)) // Protected endpoints (require auth) .route("/api/v2/tree/:user_id", get(get_tree)) .route("/api/v2/tree/:user_id/search", get(search_tree)) @@ -286,6 +310,19 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/files", get(|| async { Html(include_str!("file_list.html")) })) .route("/products", get(|| async { Html(include_str!("product_manager.html")) })) .route("/downloads", get(|| async { Html(include_str!("category_view.html")) })) + // Admin GUI pages (require admin auth) + .route("/admin/products", get(crate::api::admin::admin_products_page)) + .route("/admin/files", get(crate::api::admin::admin_files_page)) + .route("/tools/usb-ssd-test", get(|| async { Html(include_str!("usb_ssd_test.html")) })) + .route("/upload", get(|| async { Html(include_str!("upload.html")) })) + // Product management API (admin auth required) + .route("/api/v2/products", get(crate::api::admin::admin_list_all_products)) + .route("/api/v2/products/create", post(crate::api::admin::admin_create_product)) + .route("/api/v2/products/stats", get(crate::api::admin::admin_get_series_stats)) + .route("/api/v2/products/:product_id/files", get(crate::api::admin::admin_get_product_files)) + .route("/api/v2/products/:product_id", delete(crate::api::admin::admin_delete_product)) + .route("/api/v2/products/:product_id/assign-files", post(crate::api::admin::admin_assign_files)) + .route("/api/v2/files/:user_id", get(crate::api::admin::admin_list_uploaded_files)) // WebDAV API endpoints (Phase 20, multi-user P1) .route("/webdav", any(handle_webdav_multi)) .route("/webdav/", any(handle_webdav_multi)) @@ -299,14 +336,17 @@ pub async fn run(port: u16, file: Option) -> anyhow::Result<()> { .route("/api/v2/myfiles/:username/folders", get(crate::myfiles::list_folders).post(crate::myfiles::create_folder)) .route("/api/v2/myfiles/:username/folders/:folder_name", delete(crate::myfiles::delete_folder)) .route("/api/v2/myfiles/:username/files", get(crate::myfiles::list_files)) + .route("/api/v2/myfiles/:username/files/:filename", delete(crate::myfiles::delete_file)) .route("/api/v2/myfiles/:username/tags", post(crate::myfiles::add_tag).delete(crate::myfiles::remove_tag)) .route("/api/v2/myfiles/:username/files/:filename/tags", get(crate::myfiles::file_tags)) + .route("/api/v2/myfiles/:username/preview/:filename", get(crate::myfiles::preview_file)) .layer(Extension(webdav_parent)) .layer(Extension(upload_hook)) .layer(Extension(webdav_versioning)) .layer(Extension(use_s3)) .layer(Extension(s3_cfg)) .layer(DefaultBodyLimit::disable()) + .layer(CorsLayer::permissive()) .with_state(state); let addr = format!("0.0.0.0:{port}"); @@ -1381,14 +1421,14 @@ async fn upload_file( } async fn upload_unlimited( - State(_state): State, + State(state): State, Path(user_id): Path, mut multipart: axum_extra::extract::Multipart, ) -> impl IntoResponse { use sha2::{Digest, Sha256}; use tokio::io::AsyncWriteExt; - let base_dir = "/Users/accusys/Downloads"; + let base_dir = &state.upload_path; let user_dir = format!("{}/{}", base_dir, user_id); let mut filename = String::new(); @@ -1953,123 +1993,6 @@ fn verify_auth(state: &AppState, headers: &HeaderMap) -> Result) -> impl IntoResponse { - let syncer = crate::pg_client::SftpGoSync::new(&state.auth_db_path); - - match syncer { - Ok(syncer) => match syncer.full_sync().await { - Ok(result) => { - if result.status == "success" { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "success", - "users_synced": result.users_synced, - "groups_synced": result.groups_synced, - "mappings_synced": result.mappings_synced - })), - ) - .into_response() - } else if result.status == "partial_success" { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "partial_success", - "users_synced": result.users_synced, - "users_failed": result.users_failed, - "groups_synced": result.groups_synced, - "groups_failed": result.groups_failed, - "errors": result.errors - })), - ) - .into_response() - } else { - ( - StatusCode::OK, - Json(serde_json::json!({ - "status": result.status, - "errors": result.errors - })), - ) - .into_response() - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "status": "failed", - "error": e.to_string() - })), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "status": "failed", - "error": e.to_string() - })), - ) - .into_response(), - } -} - -async fn sync_status_handler(State(state): State) -> impl IntoResponse { - let auth_db = crate::sync::AuthDb::new(&state.auth_db_path); - - match auth_db { - Ok(db) => match db.open() { - Ok(conn) => { - match conn.query_row( - "SELECT sync_type, sync_time, users_synced, users_failed, - groups_synced, groups_failed, mappings_synced, status - FROM sync_log ORDER BY sync_time DESC LIMIT 5", - [], - |row| { - Ok(serde_json::json!({ - "sync_type": row.get::<_, String>(0)?, - "sync_time": row.get::<_, i64>(1)?, - "users_synced": row.get::<_, usize>(2)?, - "users_failed": row.get::<_, usize>(3)?, - "groups_synced": row.get::<_, usize>(4)?, - "groups_failed": row.get::<_, usize>(5)?, - "mappings_synced": row.get::<_, usize>(6)?, - "status": row.get::<_, String>(7)?, - })) - }, - ) { - Ok(log) => ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "ok", - "latest_sync": log - })), - ) - .into_response(), - Err(_) => ( - StatusCode::OK, - Json(serde_json::json!({ - "status": "ok", - "message": "No sync logs found" - })), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - fn html_escape(s: &str) -> String { s.replace('&', "&") .replace('<', "<") @@ -2077,210 +2000,6 @@ fn html_escape(s: &str) -> String { .replace('"', """) } -#[derive(Debug, serde::Deserialize)] -struct EditConfigQuery { - key: String, - value: String, -} - -async fn get_config_handler() -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(config) => ( - StatusCode::OK, - Json(serde_json::to_value(&config).unwrap_or_default()), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn edit_config_handler(Query(params): Query) -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(mut config) => { - let old_value = config.get(¶ms.key).unwrap_or_default(); - - match config.set(¶ms.key, ¶ms.value) { - Ok(_) => match config.validate() { - Ok(_) => match config.save(config_path) { - Ok(_) => { - // Log audit entry - let audit = crate::audit::AuditLogger::default(); - if let Err(e) = audit.log_config_change( - "markbase", - ¶ms.key, - &old_value, - ¶ms.value, - "system", - None, - ) { - log::warn!("Failed to write audit log: {}", e); - } - - (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn validate_config_handler() -> impl IntoResponse { - let config_path = std::path::Path::new("config/markbase.toml"); - - if !config_path.exists() { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"ok": false, "error": "Config file not found"})), - ) - .into_response(); - } - - match crate::config::MarkBaseConfig::load(config_path) { - Ok(config) => match config.validate() { - Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - } -} - -async fn get_s3_config_handler() -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(config) => ( - StatusCode::OK, - Json(serde_json::to_value(&config).unwrap_or_default()), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn edit_s3_config_handler(Query(params): Query) -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(mut config) => { - let old_value = config.get(¶ms.key).unwrap_or_default(); - - match config.set(¶ms.key, ¶ms.value) { - Ok(_) => match config.validate() { - Ok(_) => match config.save("config/s3.toml") { - Ok(_) => { - // Log audit entry - let audit = crate::audit::AuditLogger::default(); - if let Err(e) = audit.log_config_change( - "s3", - ¶ms.key, - &old_value, - ¶ms.value, - "system", - None, - ) { - log::warn!("Failed to write audit log: {}", e); - } - - (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": e.to_string()})), - ) - .into_response(), - } -} - -async fn validate_s3_config_handler() -> impl IntoResponse { - match crate::s3_config::S3Config::load_default() { - Ok(config) => match config.validate() { - Ok(_) => (StatusCode::OK, Json(serde_json::json!({"ok": true}))).into_response(), - Err(e) => ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - }, - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"ok": false, "error": e.to_string()})), - ) - .into_response(), - } -} - // async fn get_sftp_config_handler() -> impl IntoResponse { // match crate::sftp::SftpConfig::load_default() { // Ok(config) => ( @@ -2330,50 +2049,6 @@ async fn validate_s3_config_handler() -> impl IntoResponse { // } // } -async fn admin_login_handler( - State(state): State, - Json(body): Json, -) -> impl IntoResponse { - match state.auth.admin_login(&body.username, &body.password) { - Some(response) => (StatusCode::OK, Json(response)).into_response(), - None => ( - StatusCode::UNAUTHORIZED, - Json(serde_json::json!({"error": "Invalid admin credentials"})), - ) - .into_response(), - } -} - -async fn admin_verify_handler( - State(state): State, - headers: axum::http::HeaderMap, -) -> impl IntoResponse { - let auth_header = headers - .get("Authorization") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.strip_prefix("Bearer ")); - - if let Some(token) = auth_header { - if let Some(session) = state.auth.verify_admin_token(token) { - return ( - StatusCode::OK, - Json(serde_json::json!({ - "ok": true, - "username": session.username, - "expires_at": session.expires_at - })), - ) - .into_response(); - } - } - - ( - StatusCode::UNAUTHORIZED, - Json(serde_json::json!({"ok": false, "error": "Invalid admin token"})), - ) - .into_response() -} - async fn shell_status_handler() -> Json { // TODO: 使用新的ssh_server模块 // let config = crate::sftp::config::SftpConfig::load_default().unwrap_or_default(); @@ -2636,9 +2311,11 @@ fn unauthorized_response() -> axum::response::Response { ).into_response() } +static UPLOAD_PATH: OnceLock = OnceLock::new(); + static ADMIN_WEBDAV_HANDLER: LazyLock> = LazyLock::new(|| { - let parent = std::env::var("MB_WEBDAV_PARENT") - .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()); + let parent = UPLOAD_PATH.get().cloned() + .unwrap_or_else(|| "/Users/accusys/momentry/var/sftpgo/data".to_string()); let parent_path = std::path::PathBuf::from(&parent); if !parent_path.exists() { return None; diff --git a/markbase-core/src/ssh_server/cipher.rs b/markbase-core/src/ssh_server/cipher.rs index d38ebb3..1444e2b 100644 --- a/markbase-core/src/ssh_server/cipher.rs +++ b/markbase-core/src/ssh_server/cipher.rs @@ -20,6 +20,7 @@ use hmac::{Hmac, Mac}; use log::info; use sha2::Sha256; use std::io::Write; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; type Aes128Ctr = Ctr128BE; // AES-128-CTR(16字节密钥) type HmacSha256 = Hmac; @@ -1167,6 +1168,187 @@ impl EncryptedPacket { pub fn take_payload(&mut self) -> Vec { std::mem::take(&mut self.payload) } + + // ── Async I/O (tokio) ───────────────────────────────────────────── + + /// Async write encrypted packet (tokio::io::AsyncWriteExt) + pub async fn write_async(&self, stream: &mut W) -> Result<()> { + if self.payload.len() > 4 && self.payload[0..4] == self.packet_length.to_be_bytes() { + stream.write_all(&self.payload).await?; + } else { + stream.write_all(&self.payload).await?; + stream.write_all(&self.mac).await?; + } + Ok(()) + } + + /// Async read encrypted packet (tokio::io::AsyncReadExt) + pub async fn read_async( + stream: &mut R, + encryption_ctx: &mut EncryptionContext, + is_client_to_server: bool, + ) -> Result { + if encryption_ctx.cipher_mode == CipherMode::AesGcm { + let mut packet_length_bytes = [0u8; 4]; + stream.read_exact(&mut packet_length_bytes).await?; + let packet_length = u32::from_be_bytes(packet_length_bytes); + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let ciphertext_length = packet_length as usize + 16; + let mut ciphertext = vec![0u8; ciphertext_length]; + stream.read_exact(&mut ciphertext).await?; + + let sequence_number = if is_client_to_server { + encryption_ctx.sequence_number_ctos + } else { + encryption_ctx.sequence_number_stoc + }; + let iv_bytes = if is_client_to_server { + &encryption_ctx.iv_ctos + } else { + &encryption_ctx.iv_stoc + }; + let mut nonce_bytes = [0u8; 12]; + nonce_bytes.copy_from_slice(&iv_bytes[..12]); + let mut carry = sequence_number; + for i in (8..12).rev() { + let sum = nonce_bytes[i] as u16 + (carry & 0xFF) as u16; + nonce_bytes[i] = (sum & 0xFF) as u8; + carry = (carry >> 8) + ((sum >> 8) as u32); + } + if carry > 0 { + for i in (4..8).rev() { + let sum = nonce_bytes[i] as u16 + (carry & 0xFF) as u16; + nonce_bytes[i] = (sum & 0xFF) as u8; + carry = (carry >> 8) + ((sum >> 8) as u32); + if carry == 0 { break; } + } + } + let key_bytes = if is_client_to_server { + &encryption_ctx.encryption_key_ctos + } else { + &encryption_ctx.encryption_key_stoc + }; + let cipher = Aes256GcmAead::new_from_slice(&key_bytes[..32]) + .map_err(|e| anyhow!("AES-GCM key init failed: {}", e))?; + let nonce = Nonce::from_slice(&nonce_bytes); + let plaintext_payload_buffer = cipher.decrypt(nonce, Payload { + msg: ciphertext.as_slice(), + aad: &packet_length_bytes, + }).map_err(|e| anyhow!("AES-GCM decrypt failed: {}", e))?; + + let padding_length = plaintext_payload_buffer[0]; + let payload_len = packet_length as usize - padding_length as usize - 1; + let compressed_payload = plaintext_payload_buffer[1..1 + payload_len].to_vec(); + let payload = if is_client_to_server { + if encryption_ctx.compression_ctos.is_enabled() { + encryption_ctx.compression_ctos.decompress(&compressed_payload)? + } else { compressed_payload } + } else { compressed_payload }; + let mac = ciphertext[ciphertext.len() - 16..].to_vec(); + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding: Vec::new(), mac }); + } else if encryption_ctx.cipher_mode == CipherMode::ChaChaPoly { + let mut packet_length_bytes = [0u8; 4]; + stream.read_exact(&mut packet_length_bytes).await?; + let packet_length = u32::from_be_bytes(packet_length_bytes); + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let ciphertext_length = packet_length as usize + 16; + let mut ciphertext = vec![0u8; ciphertext_length]; + stream.read_exact(&mut ciphertext).await?; + + let sequence_number = if is_client_to_server { + encryption_ctx.sequence_number_ctos + } else { + encryption_ctx.sequence_number_stoc + }; + let iv_bytes = if is_client_to_server { + &encryption_ctx.iv_ctos + } else { + &encryption_ctx.iv_stoc + }; + let nonce_bytes: [u8; 12] = { + let mut n = [0u8; 12]; + n[0..4].copy_from_slice(&sequence_number.to_be_bytes()); + n[4..12].copy_from_slice(&iv_bytes[..8]); + n + }; + let key_bytes = if is_client_to_server { + &encryption_ctx.encryption_key_ctos + } else { + &encryption_ctx.encryption_key_stoc + }; + let cipher_cha = ChaCha20Poly1305::new(ChaKey::from_slice(&key_bytes[..32])); + let nonce = ChaNonce::from_slice(&nonce_bytes); + let plaintext_payload_buffer = cipher_cha.decrypt(nonce, ChaPayload { + msg: ciphertext.as_slice(), + aad: &packet_length_bytes, + }).map_err(|e| anyhow!("ChaCha20Poly1305 decrypt failed: {}", e))?; + + let padding_length = plaintext_payload_buffer[0]; + let payload_len = packet_length as usize - padding_length as usize - 1; + let payload = plaintext_payload_buffer[1..1 + payload_len].to_vec(); + let mac = ciphertext[ciphertext.len() - 16..].to_vec(); + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding: Vec::new(), mac }); + } else { + let mut first_block_encrypted = [0u8; 16]; + stream.read_exact(&mut first_block_encrypted).await?; + let cipher = if is_client_to_server { + encryption_ctx.cipher_ctos.as_mut() + .ok_or_else(|| anyhow!("cipher_ctos not initialized"))? + } else { + encryption_ctx.cipher_stoc.as_mut() + .ok_or_else(|| anyhow!("cipher_stoc not initialized"))? + }; + let mut first_block_decrypted = first_block_encrypted; + cipher.apply_keystream(&mut first_block_decrypted); + let packet_length = u32::from_be_bytes([first_block_decrypted[0], first_block_decrypted[1], first_block_decrypted[2], first_block_decrypted[3]]); + let padding_length = first_block_decrypted[4]; + if packet_length > 35000 { + return Err(anyhow!("Invalid packet_length: {}", packet_length)); + } + let total_encrypted_size = packet_length as usize + 4; + let remaining_size = total_encrypted_size.saturating_sub(16); + let mut remaining_encrypted = vec![0u8; remaining_size]; + if remaining_size > 0 { + stream.read_exact(&mut remaining_encrypted).await?; + } + cipher.apply_keystream(&mut remaining_encrypted); + let payload_len = packet_length as usize - padding_length as usize - 1; + let part1_len = std::cmp::min(payload_len, 11); + let part1 = &first_block_decrypted[5..5 + part1_len]; + let part2 = &remaining_encrypted[..payload_len.saturating_sub(part1_len)]; + let mut payload = Vec::with_capacity(payload_len); + payload.extend_from_slice(part1); + payload.extend_from_slice(part2); + let payload = if is_client_to_server { + if encryption_ctx.compression_ctos.is_enabled() { + encryption_ctx.compression_ctos.decompress(&payload)? + } else { payload } + } else { payload }; + let padding = remaining_encrypted[payload_len.saturating_sub(part1_len)..].to_vec(); + let mut mac = vec![0u8; 32]; + stream.read_exact(&mut mac).await?; + if is_client_to_server { + encryption_ctx.sequence_number_ctos += 1; + } else { + encryption_ctx.sequence_number_stoc += 1; + } + return Ok(Self { packet_length, padding_length, payload, padding, mac }); + } + } } #[cfg(test)] diff --git a/markbase-core/src/ssh_server/packet.rs b/markbase-core/src/ssh_server/packet.rs index 9ea636e..f6437db 100644 --- a/markbase-core/src/ssh_server/packet.rs +++ b/markbase-core/src/ssh_server/packet.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{Read, Write}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; /// SSH Packet类型(参考OpenSSH SSH_MSG_*定义) #[derive(Debug, Clone, Copy, PartialEq)] @@ -160,6 +161,39 @@ impl SshPacket { }) } + /// Async write (tokio) + pub async fn write_async(&self, stream: &mut W) -> Result<()> { + stream.write_all(&self.packet_length.to_be_bytes()).await?; + stream.write_all(&[self.padding_length]).await?; + stream.write_all(&self.payload).await?; + stream.write_all(&self.padding).await?; + stream.flush().await?; + Ok(()) + } + + /// Async read (tokio) + pub async fn read_async(stream: &mut R) -> Result { + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let packet_length = u32::from_be_bytes(len_buf); + if packet_length > 256 * 1024 { + return Err(anyhow!("Packet too large: {}", packet_length)); + } + let mut pad_buf = [0u8; 1]; + stream.read_exact(&mut pad_buf).await?; + let padding_length = pad_buf[0]; + let payload_len = packet_length.saturating_sub(padding_length as u32 + 1); + let mut payload = vec![0u8; payload_len as usize]; + if !payload.is_empty() { + stream.read_exact(&mut payload).await?; + } + let mut padding = vec![0u8; padding_length as usize]; + if !padding.is_empty() { + stream.read_exact(&mut padding).await?; + } + Ok(Self { packet_length, padding_length, payload, padding }) + } + /// 获取payload中的packet type pub fn get_type(&self) -> Result { if self.payload.is_empty() { diff --git a/markbase-core/src/ssh_server/server.rs b/markbase-core/src/ssh_server/server.rs index 1dfa741..8e9e973 100644 --- a/markbase-core/src/ssh_server/server.rs +++ b/markbase-core/src/ssh_server/server.rs @@ -17,10 +17,10 @@ use crate::ssh_server::version::VersionExchange; use anyhow::{anyhow, Result}; use log::{error, info, warn}; use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; +use std::net::TcpStream; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::thread; +use tokio::net::TcpListener; pub struct SshServerConfig { pub port: u16, @@ -71,11 +71,11 @@ impl SshServer { } } - pub fn run(&self) -> Result<()> { + pub async fn run(&self) -> Result<()> { let bind_addr = format!("{}:{}", self.config.bind_address, self.config.port); - let listener = TcpListener::bind(&bind_addr)?; + let listener = TcpListener::bind(&bind_addr).await?; - info!("MarkBaseSSH server listening on {}", bind_addr); + info!("MarkBaseSSH server listening on {} (async tokio)", bind_addr); info!("Implementation: Complete SSH/SFTP + Port Forwarding (Phase 1-13)"); info!( "Security config: GatewayPorts={}, PermitOpen={:?}, MaxSessions={}", @@ -88,23 +88,30 @@ impl SshServer { let pg_conn = self.config.pg_conn.clone(); let upload_hook_config = self.config.upload_hook_config.clone(); - for stream in listener.incoming() { - match stream { - Ok(stream) => { - let client_addr = stream.peer_addr()?; - info!("New SSH connection from {}", client_addr); + loop { + match listener.accept().await { + Ok((stream, addr)) => { + info!("New SSH connection from {}", addr); let security_config_clone = security_config.clone(); let pg_conn_clone = pg_conn.clone(); let upload_hook_config_clone = upload_hook_config.clone(); - thread::spawn(move || { - if let Err(e) = handle_connection_complete( - stream, - security_config_clone, - pg_conn_clone, - upload_hook_config_clone, - ) + // ⭐⭐⭐⭐⭐ Convert tokio TcpStream to std TcpStream for blocking handler + // Set blocking explicitly since into_std() may preserve non-blocking mode + let std_stream = stream.into_std()?; + std_stream.set_nonblocking(false)?; + + tokio::spawn(async move { + // Run the existing sync connection handler in a blocking thread + if let Err(e) = tokio::task::spawn_blocking(move || { + handle_connection_complete( + std_stream, + security_config_clone, + pg_conn_clone, + upload_hook_config_clone, + ) + }).await.unwrap_or(Err(anyhow!("Task join error"))) { error!("SSH connection error: {}", e); } @@ -115,8 +122,6 @@ impl SshServer { } } } - - Ok(()) } } @@ -787,7 +792,7 @@ fn extract_username_from_auth_request( } /// SSH服务器CLI入口 -pub fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { +pub async fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { let config = SshServerConfig { port: port.unwrap_or(2024), bind_address: "0.0.0.0".to_string(), // ⭐⭐⭐⭐⭐ Phase 8.3: Allow Docker container access @@ -797,5 +802,5 @@ pub fn run_ssh_server(port: Option, pg_conn: Option<&str>) -> Result<()> { }; let server = SshServer::new(config); - server.run() + server.run().await } diff --git a/markbase-core/src/upload.html b/markbase-core/src/upload.html index 9ee1fbe..86f2781 100644 --- a/markbase-core/src/upload.html +++ b/markbase-core/src/upload.html @@ -2,268 +2,145 @@ + File Upload -
-

📁 File Upload Service

- -
-
- - -
- -
- -
- - -
-
- -
- - -

- Upload entire folder with subdirectories -

-
- - - - +
+

Upload

+

Upload files to user storage directory

+ +
+
+ + +
+ +
+ +
+ + +
+
+ +
+
+
+
- +
+ +
+ + + +
+
+
+
+
+
+
+ +function toggleMode() { + const mode = document.querySelector('input[name="mode"]:checked').value; + document.getElementById('single-group').style.display = mode === 'file' ? 'block' : 'none'; + document.getElementById('folder-group').style.display = mode === 'folder' ? 'block' : 'none'; +} + +async function uploadFiles() { + const uid = document.getElementById('user_id').value.trim(); + if (!uid) return showError('Enter a user ID'); + + const mode = document.querySelector('input[name="mode"]:checked').value; + const files = mode === 'folder' + ? document.getElementById('folder').files + : document.getElementById('single_file').files; + + if (!files || files.length === 0) return showError('Select a file or folder'); + + const btn = document.getElementById('upload-btn'); + btn.disabled = true; + + const progress = document.getElementById('progress'); + const fill = document.getElementById('progress-fill'); + const ptext = document.getElementById('progress-text'); + const result = document.getElementById('result'); + progress.style.display = 'block'; + result.style.display = 'none'; + + let uploaded = 0; + const total = files.length; + + for (let i = 0; i < total; i++) { + const f = files[i]; + const fd = new FormData(); + fd.append('file', f); + ptext.textContent = `Uploading ${f.name} (${i+1}/${total})`; + + try { + const res = await fetch(`/api/v2/upload-unlimited/${uid}`, { method: 'POST', body: fd }); + if (!res.ok) { showError(`${f.name}: HTTP ${res.status}`); btn.disabled = false; return; } + const data = await res.json(); + if (!data.ok) { showError(`${f.name}: ${data.error || 'unknown'}`); btn.disabled = false; return; } + uploaded++; + const pct = Math.round(uploaded / total * 100); + fill.style.width = pct + '%'; + ptext.textContent = `${pct}% (${uploaded}/${total})`; + } catch(e) { + showError(`${f.name}: ${e.message}`); + btn.disabled = false; + return; + } + } + + showSuccess(`Uploaded ${uploaded} file${uploaded > 1 ? 's' : ''}`); + btn.disabled = false; +} + +function showSuccess(m) { showResult(m, 'success'); } +function showError(m) { showResult(m, 'error'); } +function showResult(m, t) { + const r = document.getElementById('result'); + r.className = 'result ' + t; + r.textContent = m; + r.style.display = 'block'; +} + \ No newline at end of file