use axum::{extract::State, http::StatusCode, response::Json, routing::get, Router}; use once_cell::sync::OnceCell; use serde::Serialize; use std::time::Instant; use super::types::AppState; use crate::core::cache::MongoCache; use crate::core::config; use crate::core::db::{Database, PostgresDb, RedisClient}; use crate::worker::resources::SystemResources; // Global State static SERVER_START: OnceCell = OnceCell::new(); static SERVER_HOST: OnceCell = OnceCell::new(); static SERVER_PORT: OnceCell = OnceCell::new(); pub fn init_server_state(host: &str, port: u16) { let _ = SERVER_START.set(Instant::now()); let resolved_ip = if host == "0.0.0.0" { if let Ok(addrs) = std::net::ToSocketAddrs::to_socket_addrs(&"localhost:0") { addrs .filter_map(|a| if a.is_ipv4() { Some(a.ip()) } else { None }) .next() .map(|ip| ip.to_string()) .unwrap_or_else(|| "127.0.0.1".to_string()) } else { "127.0.0.1".to_string() } } else { host.to_string() }; let _ = SERVER_HOST.set(resolved_ip); let _ = SERVER_PORT.set(port); } pub fn get_host() -> String { SERVER_HOST .get() .cloned() .unwrap_or_else(|| "0.0.0.0".to_string()) } pub fn get_port() -> u16 { SERVER_PORT.get().copied().unwrap_or(0) } pub fn get_uptime_ms() -> u64 { SERVER_START .get() .map(|i| i.elapsed().as_millis() as u64) .unwrap_or(0) } #[derive(Debug, Serialize)] struct HealthResponse { ip: String, port: u16, status: String, version: String, build_git_hash: String, build_timestamp: String, uptime_ms: u64, watcher_running: bool, worker_running: bool, auto_pipeline_enabled: bool, watcher_auto_register_enabled: bool, system_timezone: String, } #[derive(Debug, Serialize)] struct DetailedHealthResponse { ip: String, port: u16, status: String, version: String, build_git_hash: String, build_timestamp: String, uptime_ms: u64, services: ServiceHealth, resources: ResourceStatus, pipeline: PipelineStatus, schema: SchemaHealth, identities: IdentityHealth, integrations: IntegrationHealth, config: ConfigHealth, } #[derive(Debug, Serialize)] struct IntegrationHealth { tmdb: crate::core::tmdb::status::TmdbResourceStatus, } #[derive(Debug, Serialize)] struct IdentityHealth { directory_exists: bool, files_count: usize, index_ok: bool, db_count: i64, synced: bool, } #[derive(Debug, Serialize)] struct ConfigHealth { cache_enabled: bool, auto_pipeline_enabled: bool, watcher_auto_register_enabled: bool, system_timezone: String, } #[derive(Debug, Serialize)] pub struct SchemaHealth { pub table_exists: bool, pub applied: Vec, pub required: Vec, pub ok: bool, } #[derive(Debug, Serialize)] pub struct MigrationInfo { pub filename: String, pub checksum: String, } #[derive(Debug, Serialize)] struct PipelineStatus { scripts_ready: bool, scripts_count: usize, processors: ProcessorInventory, models_ready: bool, models_count: usize, scripts_integrity: ScriptIntegrity, ffmpeg: bool, embedding_server: ServiceStatus, gdino_api: ServiceStatus, llm: ServiceStatus, rsync: ServiceStatus, watcher_running: bool, worker_running: bool, } #[derive(Debug, Serialize)] struct ScriptIntegrity { matched: usize, total: usize, ok: bool, } #[derive(Debug, Serialize)] struct ProcessorInventory { asr: bool, yolo: bool, face: bool, pose: bool, ocr: bool, cut: bool, caption: bool, scene: bool, story: bool, asrx: bool, probe: bool, visual_chunk: bool, total_py_files: usize, } #[derive(Debug, Serialize)] struct ResourceStatus { cpu_used_percent: f64, cpu_idle_percent: f64, memory_available_mb: u64, memory_total_mb: u64, memory_used_percent: f64, gpu_available: bool, gpu_utilization: Option, gpu_memory_used_pct: Option, } #[derive(Debug, Serialize)] struct ServiceHealth { postgres: ServiceStatus, redis: ServiceStatus, qdrant: ServiceStatus, mongodb: ServiceStatus, } #[derive(Debug, Serialize)] struct ServiceStatus { status: String, latency_ms: Option, error: Option, } async fn health(State(state): State) -> Json { let postgres = check_postgres().await; let redis = check_redis().await; let qdrant = check_qdrant().await; let mongodb = check_mongodb(&state.mongo_cache).await; let all_ok = postgres.status == "ok" && redis.status == "ok" && qdrant.status == "ok" && mongodb.status == "ok"; let status = if all_ok { "ok" } else { "degraded" }; if all_ok { let _ = state.redis_cache.set_health(status).await; } Json(HealthResponse { ip: get_host(), port: get_port(), status: status.to_string(), version: env!("BUILD_VERSION").to_string(), build_git_hash: env!("BUILD_GIT_HASH").to_string(), build_timestamp: env!("BUILD_TIMESTAMP").to_string(), uptime_ms: get_uptime_ms(), watcher_running: check_process_running("watcher"), worker_running: check_process_running("worker"), auto_pipeline_enabled: config::get_auto_pipeline_enabled(), watcher_auto_register_enabled: config::get_watcher_auto_register(), system_timezone: config::SYSTEM_TIMEZONE.clone(), }) } async fn health_detailed(State(state): State) -> Json { let postgres = check_postgres().await; let redis = check_redis().await; let qdrant = check_qdrant().await; let mongodb = check_mongodb(&state.mongo_cache).await; let overall_status = if postgres.status == "ok" && redis.status == "ok" && qdrant.status == "ok" && mongodb.status == "ok" { "ok" } else { "degraded" }; let sys = SystemResources::check(); let scripts_base = config::SCRIPTS_DIR.clone(); let scripts_dir = std::path::Path::new(&scripts_base); let scripts_path = scripts_dir.to_path_buf(); let models_path = std::path::PathBuf::from("/Users/accusys/momentry_core_0.1/models"); let py_files = std::fs::read_dir(&scripts_path) .map(|d| { d.filter_map(|e| e.ok()) .filter(|e| e.path().extension().map(|x| x == "py").unwrap_or(false)) .count() }) .unwrap_or(0); let total_model_files = std::fs::read_dir(&models_path) .map(|d| { d.filter_map(|e| e.ok()) .filter(|e| { let p = e.path(); let ext = p.extension().and_then(|x| x.to_str()).unwrap_or(""); matches!(ext, "pt" | "mlpackage" | "gguf" | "bin" | "onnx") }) .count() }) .unwrap_or(0); let check_script = |name: &str| -> bool { let candidate = scripts_path.join(name); candidate.exists() }; let check_python_module = |module: &str| -> bool { std::process::Command::new(&*config::PYTHON_PATH) .arg("-c") .arg(format!("import {}", module)) .output() .map(|o| o.status.success()) .unwrap_or(false) }; let checksums_path = scripts_path.join("checksums.sha256"); let scripts_integrity = match std::fs::read_to_string(&checksums_path) { Ok(content) => { let mut matched = 0usize; let mut total = 0usize; for line in content.lines() { let line = line.trim(); if line.is_empty() { continue; } let parts: Vec<&str> = line.splitn(2, ' ').collect(); if parts.len() < 2 { continue; } let expected_hash = parts[0]; let file_path = parts[1].trim_start(); total += 1; let full_path = scripts_path.join(file_path); if full_path.exists() { if let Ok(actual) = std::process::Command::new("shasum") .arg("-a") .arg("256") .arg(&full_path) .output() { let out = String::from_utf8_lossy(&actual.stdout); let actual_hash = out.split(' ').next().unwrap_or("").to_string(); if actual_hash == expected_hash { matched += 1; } } } } ScriptIntegrity { matched, total, ok: matched == total, } } Err(_) => ScriptIntegrity { matched: 0, total: 0, ok: false, }, }; Json(DetailedHealthResponse { ip: get_host(), port: get_port(), status: overall_status.to_string(), version: env!("BUILD_VERSION").to_string(), build_git_hash: env!("BUILD_GIT_HASH").to_string(), build_timestamp: env!("BUILD_TIMESTAMP").to_string(), uptime_ms: get_uptime_ms(), services: ServiceHealth { postgres, redis, qdrant, mongodb, }, resources: ResourceStatus { cpu_used_percent: sys.cpu_used_percent, cpu_idle_percent: sys.cpu_idle_percent, memory_available_mb: sys.memory_available_mb, memory_total_mb: sys.memory_total_mb, memory_used_percent: sys.memory_used_percent, gpu_available: sys.gpu_available, gpu_utilization: sys.gpu_utilization, gpu_memory_used_pct: sys.gpu_memory_used_pct, }, pipeline: PipelineStatus { scripts_ready: scripts_path.is_dir(), scripts_count: py_files, scripts_integrity, processors: ProcessorInventory { asr: check_script("asr_processor.py"), yolo: check_script("yolo_processor.py"), face: check_script("face_processor.py"), pose: check_script("pose_processor.py"), ocr: check_script("ocr_processor.py"), cut: check_script("cut_processor.py"), caption: check_script("caption_processor.py"), scene: check_script("scene_classifier.py"), story: check_script("story_processor.py"), asrx: check_script("asrx_processor.py"), probe: check_script("probe_file.py"), visual_chunk: check_script("visual_chunk_processor.py"), total_py_files: py_files, }, models_ready: models_path.is_dir(), models_count: total_model_files, ffmpeg: std::process::Command::new("which") .arg("ffmpeg") .output() .map(|o| o.status.success()) .unwrap_or(false), embedding_server: check_http(&format!("{}/health", config::EMBED_URL.as_str())).await, gdino_api: check_http("http://127.0.0.1:8080/health").await, llm: check_http(config::LLM_HEALTH_URL.as_str()).await, rsync: check_rsync().await, watcher_running: check_process_running("watcher"), worker_running: check_process_running("worker"), }, schema: check_schema_migrations(state.db.pool()).await, identities: { let identities_root = std::path::Path::new(&*config::OUTPUT_DIR).join("identities"); let directory_exists = identities_root.is_dir(); let files_count = crate::core::identity::storage::count_identity_files(); let index_ok = crate::core::identity::storage::read_index().is_ok(); let db_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM identities") .fetch_one(state.db.pool()) .await .unwrap_or(0); IdentityHealth { directory_exists, files_count, index_ok, db_count, synced: directory_exists && files_count as i64 == db_count, } }, integrations: IntegrationHealth { tmdb: crate::core::tmdb::status::quick_status(), }, config: ConfigHealth { cache_enabled: config::get_cache_enabled(), auto_pipeline_enabled: config::get_auto_pipeline_enabled(), watcher_auto_register_enabled: config::get_watcher_auto_register(), system_timezone: config::SYSTEM_TIMEZONE.clone(), }, }) } async fn health_consistency( State(state): State, ) -> Result, (StatusCode, String)> { let report = crate::core::health_agent::run_consistency_checks(&state.db).await; if report.checks.iter().any(|c| c.count > 0) { tracing::warn!( "[HEALTH] Consistency issues found: {}", report .checks .iter() .filter(|c| c.count > 0) .map(|c| format!("{}={}", c.check, c.count)) .collect::>() .join(", ") ); } Ok(Json(report)) } async fn check_postgres() -> ServiceStatus { let start = Instant::now(); match PostgresDb::init().await { Ok(db) => match db.list_videos(1, 0).await { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_redis() -> ServiceStatus { let start = Instant::now(); match RedisClient::new() { Ok(redis) => match redis.get_conn().await { Ok(mut conn) => { let result: Result = redis::cmd("PING").query_async(&mut conn).await; match result { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, } } Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_qdrant() -> ServiceStatus { let start = Instant::now(); let base_url = std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string()); let api_key = std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string()); let url = format!("{}/collections", base_url); let client = reqwest::Client::new(); match client .get(&url) .header("api-key", api_key) .timeout(std::time::Duration::from_secs(5)) .send() .await { Ok(resp) if resp.status().is_success() => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Ok(resp) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(format!("HTTP {}", resp.status())), }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: None, error: Some(e.to_string()), }, } } async fn check_mongodb(cache: &MongoCache) -> ServiceStatus { let start = Instant::now(); match cache.health_check().await { Ok(_) => ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }, Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, } } fn parse_required_migrations() -> Vec { let raw = env!("REQUIRED_MIGRATIONS"); if raw.is_empty() { return vec![]; } raw.split(',') .filter_map(|entry| { let mut parts = entry.splitn(2, ':'); let filename = parts.next()?.trim().to_string(); let checksum = parts.next()?.trim().to_string(); if filename.is_empty() || checksum.is_empty() { return None; } Some(MigrationInfo { filename, checksum }) }) .collect() } pub async fn check_schema_migrations(pool: &sqlx::PgPool) -> SchemaHealth { let required = parse_required_migrations(); let table_exists: bool = sqlx::query_scalar( "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schema_migrations')", ) .fetch_one(pool) .await .unwrap_or(false); if !table_exists { return SchemaHealth { table_exists: false, applied: vec![], required, ok: false, }; } let applied: Vec = sqlx::query_as::<_, (String, String)>( "SELECT filename, checksum FROM schema_migrations ORDER BY id", ) .fetch_all(pool) .await .unwrap_or_default() .into_iter() .map(|(filename, checksum)| MigrationInfo { filename, checksum }) .collect(); let ok = required.iter().all(|req| { applied .iter() .any(|app| app.filename == req.filename && app.checksum == req.checksum) }); SchemaHealth { table_exists: true, applied, required, ok, } } async fn check_rsync() -> ServiceStatus { let start = Instant::now(); let paths = [ std::path::Path::new("/Users/accusys/bin/rsync"), std::path::Path::new("/opt/homebrew/bin/rsync"), ]; for p in &paths { if p.exists() { return ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, }; } } ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some("rsync not found (built from source expected at ~/bin/rsync)".to_string()), } } fn check_process_running(name: &str) -> bool { let patterns: &[&str] = match name { "watcher" => &[ "target/release/momentry watcher", "target/debug/momentry_playground watcher", ], "worker" => &[ "target/release/momentry worker", "target/debug/momentry_playground worker", ], _ => return false, }; for pattern in patterns { if let Ok(o) = std::process::Command::new("pgrep") .arg("-f") .arg(pattern) .output() { if o.status.success() { return true; } } } false } async fn check_http(url: &str) -> ServiceStatus { let start = Instant::now(); match reqwest::get(url).await { Ok(resp) => { if resp.status().is_success() { ServiceStatus { status: "ok".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: None, } } else { ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(format!("HTTP {}", resp.status())), } } } Err(e) => ServiceStatus { status: "error".to_string(), latency_ms: Some(start.elapsed().as_millis() as u64), error: Some(e.to_string()), }, } } pub fn health_routes() -> Router { Router::new() .route("/health", get(health)) .route("/health/detailed", get(health_detailed)) .route("/health/consistency", get(health_consistency)) }