From 12a7b5923294a118290d1c54ce29135c12ab8ee9 Mon Sep 17 00:00:00 2001 From: accusys Date: Wed, 25 Mar 2026 02:50:31 +0800 Subject: [PATCH] feat: add job worker and duplicate registration check Job Worker System: - Add polling-based job worker (max 2 concurrent processors) - Create monitor_jobs records when videos are registered - Link videos.job_id to monitor_jobs - Fix type mismatches (i32 vs i64) for database IDs Duplicate Registration: - Check if video already exists before registering - Return existing video info with already_exists: true - Use canonical path for UUID computation USER_DATA_ROOT Configuration: - Add MOMENTRY_USER_DATA_ROOT environment variable - UUID computed from relative path (username/filename) - Ensures consistent UUIDs when data root changes --- src/api/server.rs | 990 ++++++++++++++-- src/core/config.rs | 8 + src/core/db/postgres_db.rs | 2214 +++++++++++++++++++++++++++++++++-- src/core/db/redis_client.rs | 8 +- src/core/storage/uuid.rs | 57 + src/worker/config.rs | 77 ++ src/worker/job_worker.rs | 184 +++ src/worker/mod.rs | 6 + src/worker/processor.rs | 354 ++++++ 9 files changed, 3669 insertions(+), 229 deletions(-) create mode 100644 src/worker/config.rs create mode 100644 src/worker/job_worker.rs create mode 100644 src/worker/mod.rs create mode 100644 src/worker/processor.rs diff --git a/src/api/server.rs b/src/api/server.rs index e62d138..49526bb 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -6,18 +6,60 @@ use axum::{ Router, }; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; -use std::path::Path; -use std::sync::Arc; +use sha2::{Digest, Sha256}; +use std::time::Instant; -use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord}; +use crate::core::cache::{keys, MongoCache, RedisCache}; +use crate::core::config::USER_DATA_ROOT; +use crate::core::db::{Database, PostgresDb, QdrantDb, RedisClient, VideoRecord, VideoStatus}; use crate::{Embedder, FileManager}; +#[derive(Debug, Serialize)] +struct HealthResponse { + status: String, + version: String, + uptime_ms: u64, +} + +#[derive(Debug, Serialize)] +struct DetailedHealthResponse { + status: String, + version: String, + uptime_ms: u64, + services: ServiceHealth, +} + +#[derive(Debug, Serialize)] +struct ServiceHealth { + postgres: ServiceStatus, + redis: ServiceStatus, + qdrant: ServiceStatus, + mongodb: ServiceStatus, +} + +#[derive(Debug, Serialize)] +struct ServiceStatus { + status: String, + latency_ms: Option, + error: Option, +} + +static SERVER_START: std::sync::OnceLock = std::sync::OnceLock::new(); + +fn get_uptime_ms() -> u64 { + SERVER_START + .get() + .map(|t| t.elapsed().as_millis() as u64) + .unwrap_or(0) +} + #[derive(Clone)] struct AppState { - embedder: Arc, + embedder: std::sync::Arc, #[allow(dead_code)] embedder_model: String, + mongo_cache: MongoCache, + redis_cache: RedisCache, } #[derive(Debug, Deserialize)] @@ -29,10 +71,53 @@ struct RegisterRequest { struct RegisterResponse { uuid: String, video_id: i64, + job_id: i32, file_name: String, duration: f64, width: u32, height: u32, + already_exists: bool, +} + +#[derive(Debug, Serialize)] +struct JobListResponse { + jobs: Vec, +} + +#[derive(Debug, Serialize)] +struct JobInfoResponse { + id: i32, + uuid: String, + status: String, + current_processor: Option, + progress_current: i32, + progress_total: i32, + created_at: String, + started_at: Option, +} + +#[derive(Debug, Serialize)] +struct JobDetailResponse { + id: i32, + uuid: String, + status: String, + current_processor: Option, + progress_current: i32, + progress_total: i32, + processors: Vec, + created_at: String, + started_at: Option, + updated_at: Option, +} + +#[derive(Debug, Serialize)] +struct ProcessorInfoResponse { + processor_type: String, + status: String, + started_at: Option, + completed_at: Option, + duration_secs: Option, + error_message: Option, } #[derive(Debug, Deserialize)] @@ -42,7 +127,7 @@ struct SearchRequest { uuid: Option, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] struct SearchResult { uuid: String, chunk_id: String, @@ -53,12 +138,60 @@ struct SearchResult { score: f32, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] struct SearchResponse { results: Vec, query: String, } +#[derive(Debug, Serialize, Deserialize)] +struct N8nSearchHit { + id: String, + vid: String, + start: f64, + end: f64, + title: String, + text: String, + score: f32, + #[serde(skip_serializing_if = "Option::is_none")] + media_url: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct N8nSearchResponse { + query: String, + count: usize, + hits: Vec, +} + +#[derive(Debug, Deserialize)] +struct HybridSearchRequest { + query: String, + limit: Option, + uuid: Option, + vector_weight: Option, + bm25_weight: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct HybridSearchResult { + uuid: String, + chunk_id: String, + chunk_type: String, + start_time: f64, + end_time: f64, + text: String, + vector_score: f64, + bm25_score: f64, + combined_score: f64, +} + +#[derive(Debug, Serialize, Deserialize)] +struct HybridSearchResponse { + results: Vec, + query: String, +} + #[derive(Debug, Deserialize)] struct LookupQuery { path: Option, @@ -73,7 +206,7 @@ struct LookupResponse { duration: Option, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] struct VideoInfoResponse { uuid: String, file_path: String, @@ -83,21 +216,258 @@ struct VideoInfoResponse { height: u32, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] struct VideosResponse { videos: Vec, } +#[derive(Debug, Deserialize)] +struct VideosQuery { + page: Option, + limit: Option, +} + +async fn health(State(state): State) -> Json { + if let Ok(Some(status)) = state.redis_cache.get_health().await { + return Json(HealthResponse { + status, + version: env!("CARGO_PKG_VERSION").to_string(), + uptime_ms: get_uptime_ms(), + }); + } + + let status = "ok".to_string(); + let _ = state.redis_cache.set_health(&status).await; + + Json(HealthResponse { + status, + version: env!("CARGO_PKG_VERSION").to_string(), + uptime_ms: get_uptime_ms(), + }) +} + +async fn health_detailed(State(state): State) -> Json { + let postgres = check_postgres().await; + let redis = check_redis().await; + let qdrant = check_qdrant().await; + let mongodb = check_mongodb(&state.mongo_cache).await; + + let overall_status = if postgres.status == "ok" && redis.status == "ok" && qdrant.status == "ok" + { + "ok" + } else { + "degraded" + }; + + Json(DetailedHealthResponse { + status: overall_status.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + uptime_ms: get_uptime_ms(), + services: ServiceHealth { + postgres, + redis, + qdrant, + mongodb, + }, + }) +} + +async fn check_postgres() -> ServiceStatus { + let start = Instant::now(); + match PostgresDb::init().await { + Ok(db) => match db.list_videos().await { + Ok(_) => ServiceStatus { + status: "ok".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: None, + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: Some(e.to_string()), + }, + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: None, + error: Some(e.to_string()), + }, + } +} + +async fn check_redis() -> ServiceStatus { + let start = Instant::now(); + match RedisClient::new() { + Ok(redis) => match redis.get_conn().await { + Ok(mut conn) => { + let result: Result = redis::cmd("PING").query_async(&mut conn).await; + match result { + Ok(_) => ServiceStatus { + status: "ok".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: None, + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: Some(e.to_string()), + }, + } + } + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: None, + error: Some(e.to_string()), + }, + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: None, + error: Some(e.to_string()), + }, + } +} + +async fn check_qdrant() -> ServiceStatus { + let start = Instant::now(); + let base_url = + std::env::var("QDRANT_URL").unwrap_or_else(|_| "http://localhost:6333".to_string()); + let api_key = + std::env::var("QDRANT_API_KEY").unwrap_or_else(|_| "Test3200Test3200Test3200".to_string()); + let url = format!("{}/collections", base_url); + + let client = reqwest::Client::new(); + match client + .get(&url) + .header("api-key", api_key) + .timeout(std::time::Duration::from_secs(5)) + .send() + .await + { + Ok(resp) if resp.status().is_success() => ServiceStatus { + status: "ok".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: None, + }, + Ok(resp) => ServiceStatus { + status: "error".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: Some(format!("HTTP {}", resp.status())), + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: None, + error: Some(e.to_string()), + }, + } +} + +async fn check_mongodb(cache: &MongoCache) -> ServiceStatus { + let start = Instant::now(); + match cache.health_check().await { + Ok(_) => ServiceStatus { + status: "ok".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: None, + }, + Err(e) => ServiceStatus { + status: "error".to_string(), + latency_ms: Some(start.elapsed().as_millis() as u64), + error: Some(e.to_string()), + }, + } +} + +fn generate_query_hash(query: &str, uuid: Option<&str>, limit: usize) -> String { + let data = serde_json::json!({ + "query": query, + "uuid": uuid, + "limit": limit, + }); + let mut hasher = Sha256::new(); + hasher.update(data.to_string().as_bytes()); + format!("{:x}", hasher.finalize())[..16].to_string() +} + async fn register( - State(_state): State, + State(state): State, Json(req): Json, ) -> Result, StatusCode> { let path = req.path; - let uuid = crate::uuid::compute_uuid_from_path(&path); + // Canonicalize path first to ensure consistent UUID computation + let canonical_path = std::path::Path::new(&path) + .canonicalize() + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_else(|_| path.clone()); - let probe_result = - crate::core::probe::probe_video(&path).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + // Compute UUID using USER_DATA_ROOT to extract relative path + // This ensures consistent UUIDs even when data root changes + // Relative path format: username/video.mp4 (e.g., demo/video.mp4) + let user_data_root = USER_DATA_ROOT.as_str(); + let uuid = crate::core::storage::uuid::compute_uuid_from_path_with_root( + &canonical_path, + user_data_root, + ); + + // Extract relative path for display/logging (username/filename) + let (user_dir, filename) = + crate::core::storage::uuid::extract_relative_path(&canonical_path, user_data_root); + tracing::info!( + "Registering video: uuid={}, user={}, file={}, full_path={}", + uuid, + user_dir, + filename, + canonical_path + ); + + let db = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + // Check if video already exists + if let Some(existing_video) = db + .get_video_by_uuid(&uuid) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + { + tracing::info!( + "Video already registered: uuid={}, video_id={}", + uuid, + existing_video.id + ); + + // Get the job_id if exists (i64 -> i32) + let job_id: i32 = existing_video.job_id.map(|id| id as i32).unwrap_or(0); + + return Ok(Json(RegisterResponse { + uuid: existing_video.uuid, + video_id: existing_video.id, + job_id, + file_name: existing_video.file_name, + duration: existing_video.duration, + width: existing_video.width, + height: existing_video.height, + already_exists: true, + })); + } + + let probe_result = { + let probe_path = format!( + "{}/{}.probe.json", + crate::core::config::OUTPUT_DIR.as_str(), + uuid + ); + + if let Ok(content) = std::fs::read_to_string(&probe_path) { + tracing::info!("Using existing probe.json: {}", probe_path); + serde_json::from_str(&content).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + } else { + tracing::info!("Running ffprobe for: {}", canonical_path); + crate::core::probe::probe_video(&canonical_path) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + } + }; let duration = probe_result .format @@ -123,16 +493,7 @@ async fn register( .save_json(&uuid, "probe", &json_str) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let db = PostgresDb::init() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let file_path = Path::new(&path) - .canonicalize() - .map(|p| p.to_string_lossy().to_string()) - .unwrap_or_else(|_| path.clone()); - - let file_name = Path::new(&path) + let file_name = std::path::Path::new(&canonical_path) .file_name() .map(|n| n.to_string_lossy().to_string()) .unwrap_or_default(); @@ -140,7 +501,7 @@ async fn register( let record = VideoRecord { id: 0, uuid: uuid.clone(), - file_path, + file_path: canonical_path.clone(), file_name: file_name.clone(), duration, width, @@ -148,6 +509,9 @@ async fn register( fps: 0.0, probe_json: Some(json_str), storage: Default::default(), + status: VideoStatus::Pending, + user_id: None, + job_id: None, created_at: String::new(), }; @@ -156,13 +520,36 @@ async fn register( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let job = db + .create_monitor_job(&uuid, Some(&canonical_path)) + .await + .map_err(|e| { + tracing::error!("Failed to create monitor job for {}: {}", uuid, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + db.update_video_job_id(video_id, job.id) + .await + .map_err(|e| { + tracing::error!( + "Failed to update video job_id for video {}: {}", + video_id, + e + ); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let _ = state.mongo_cache.invalidate_videos_list().await; + Ok(Json(RegisterResponse { uuid, video_id, + job_id: job.id, file_name, duration, width, height, + already_exists: false, })) } @@ -171,67 +558,215 @@ async fn search( Json(req): Json, ) -> Result, StatusCode> { let limit = req.limit.unwrap_or(10); + let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); + let cache_key = keys::search(&query_hash); + let ttl = state.mongo_cache.ttl_search(); - let query_vector = state.embedder.embed_query(&req.query).await.map_err(|e| { - tracing::error!("Failed to embed query: {}", e); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let response = state + .mongo_cache + .get_or_fetch(&cache_key, ttl, keys::CATEGORY_SEARCH, || async { + let query_vector = state + .embedder + .embed_query(&req.query) + .await + .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; - let qdrant: QdrantDb = QdrantDb::init() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let pg: PostgresDb = PostgresDb::init() + let qdrant = QdrantDb::new(); + let pg = PostgresDb::init() + .await + .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; + + let search_results = if let Some(ref uuid) = req.uuid { + let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); + qdrant.search_in_uuid(&query_f64, uuid, limit).await? + } else { + qdrant.search(&query_vector, limit).await? + }; + + let mut results = Vec::new(); + for r in search_results { + if let Some(chunk) = pg.get_chunk_by_chunk_id(&r.chunk_id).await.ok().flatten() { + let text = chunk + .content + .get("text") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + results.push(SearchResult { + uuid: chunk.uuid.clone(), + chunk_id: chunk.chunk_id.clone(), + chunk_type: chunk.chunk_type.as_str().to_string(), + start_time: chunk.start_time, + end_time: chunk.end_time, + text, + score: r.score, + }); + } + } + + Ok::(SearchResponse { + results, + query: req.query.clone(), + }) + }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let search_results = if let Some(uuid) = &req.uuid { - let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); - qdrant - .search_in_uuid(&query_f64, uuid, limit) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - } else { - qdrant - .search(&query_vector, limit) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - }; - - let mut results = Vec::new(); - for r in search_results { - let chunks = pg.get_chunks_by_uuid(&r.chunk_id).await.unwrap_or_default(); - - for chunk in chunks { - let text = chunk - .content - .get("text") - .and_then(|v: &serde_json::Value| v.as_str()) - .unwrap_or("") - .to_string(); - - results.push(SearchResult { - uuid: chunk.uuid.clone(), - chunk_id: chunk.chunk_id.clone(), - chunk_type: chunk.chunk_type.as_str().to_string(), - start_time: chunk.start_time, - end_time: chunk.end_time, - text, - score: r.score, - }); - } - } - - Ok(Json(SearchResponse { - results, - query: req.query, - })) + Ok(Json(response)) } -async fn lookup(Query(query): Query) -> Result, StatusCode> { - let db: PostgresDb = PostgresDb::init() +async fn n8n_search( + State(state): State, + Json(req): Json, +) -> Result, StatusCode> { + let limit = req.limit.unwrap_or(10); + let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); + let cache_key = keys::n8n_search(&query_hash); + let ttl = state.mongo_cache.ttl_search(); + + let response = state + .mongo_cache + .get_or_fetch(&cache_key, ttl, keys::CATEGORY_N8N_SEARCH, || async { + let query_vector = state + .embedder + .embed_query(&req.query) + .await + .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; + + let qdrant = QdrantDb::new(); + let pg = PostgresDb::init() + .await + .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; + + let search_results = if let Some(ref uuid) = req.uuid { + let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); + qdrant.search_in_uuid(&query_f64, uuid, limit).await? + } else { + qdrant.search(&query_vector, limit).await? + }; + + let media_base = crate::core::config::MEDIA_BASE_URL.as_str(); + let mut hits = Vec::new(); + + for r in search_results { + if let Some(chunk) = pg.get_chunk_by_chunk_id(&r.chunk_id).await.ok().flatten() { + let text = chunk + .content + .get("text") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let title = chunk + .content + .get("title") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let media_url = if chunk.uuid.is_empty() { + None + } else { + let video = pg.get_video_by_uuid(&chunk.uuid).await.ok().flatten(); + video.map(|v| format!("{}/{}", media_base, v.file_name)) + }; + + hits.push(N8nSearchHit { + id: chunk.chunk_id.clone(), + vid: chunk.uuid.clone(), + start: chunk.start_time, + end: chunk.end_time, + title: if title.is_empty() { + format!("Chunk {}", chunk.chunk_id) + } else { + title + }, + text, + score: r.score, + media_url, + }); + } + } + + Ok::(N8nSearchResponse { + query: req.query.clone(), + count: hits.len(), + hits, + }) + }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(response)) +} + +async fn hybrid_search( + State(state): State, + Json(req): Json, +) -> Result, StatusCode> { + let limit = req.limit.unwrap_or(10); + let vector_weight = req.vector_weight.unwrap_or(0.7); + let bm25_weight = req.bm25_weight.unwrap_or(0.3); + + let query_hash = generate_query_hash(&req.query, req.uuid.as_deref(), limit); + let cache_key = keys::hybrid_search(&query_hash); + let ttl = state.mongo_cache.ttl_hybrid_search(); + + let response = state + .mongo_cache + .get_or_fetch(&cache_key, ttl, keys::CATEGORY_HYBRID_SEARCH, || async { + let query_vector = state + .embedder + .embed_query(&req.query) + .await + .map_err(|e| anyhow::anyhow!("Embedding failed: {}", e))?; + + let pg = PostgresDb::init() + .await + .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; + + let results = pg + .hybrid_search( + &req.query, + &query_vector, + req.uuid.as_deref(), + limit, + vector_weight, + bm25_weight, + ) + .await?; + + let search_results: Vec = results + .into_iter() + .map(|r| HybridSearchResult { + uuid: r.uuid, + chunk_id: r.chunk_id, + chunk_type: r.chunk_type, + start_time: r.start_time, + end_time: r.end_time, + text: r.text, + vector_score: r.vector_score, + bm25_score: r.bm25_score, + combined_score: r.combined_score, + }) + .collect(); + + Ok::(HybridSearchResponse { + results: search_results, + query: req.query.clone(), + }) + }) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(Json(response)) +} + +async fn lookup( + State(state): State, + Query(query): Query, +) -> Result, StatusCode> { if let Some(path) = query.path { let uuid = crate::uuid::compute_uuid_from_path(&path); return Ok(Json(LookupResponse { @@ -243,8 +778,19 @@ async fn lookup(Query(query): Query) -> Result } if let Some(uuid) = query.uuid { - let video = db - .get_video_by_uuid(&uuid) + let cache_key = keys::video_meta(&uuid); + let ttl = state.mongo_cache.ttl_video_meta(); + + let video = state + .mongo_cache + .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEO_META, || async { + let db = PostgresDb::init() + .await + .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; + db.get_video_by_uuid(&uuid) + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -261,61 +807,58 @@ async fn lookup(Query(query): Query) -> Result Err(StatusCode::NOT_FOUND) } -pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { - let embedder = Arc::new(Embedder::new("nomic-embed-text:v1.5".to_string())); +async fn list_videos( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let page = params.page.unwrap_or(1); + let limit = params.limit.unwrap_or(20); + let cache_key = keys::videos_list(page, limit); + let ttl = state.mongo_cache.ttl_videos(); - let state = AppState { - embedder, - embedder_model: "nomic-embed-text:v1.5".to_string(), - }; + let video_infos = state + .mongo_cache + .get_or_fetch(&cache_key, ttl, keys::CATEGORY_VIDEOS, || async { + let db = PostgresDb::init() + .await + .map_err(|e| anyhow::anyhow!("PG init failed: {}", e))?; - let app = Router::new() - .route("/api/v1/register", post(register)) - .route("/api/v1/search", post(search)) - .route("/api/v1/lookup", get(lookup)) - .route("/api/v1/videos", get(list_videos)) - .route("/api/v1/progress/:uuid", get(get_progress)) - .with_state(state); + let videos = db.list_videos().await?; - let addr = SocketAddr::new(host.parse().unwrap(), port); - tracing::info!("Starting API server at http://{}", addr); + let video_infos: Vec = videos + .into_iter() + .map(|v| VideoInfoResponse { + uuid: v.uuid, + file_path: v.file_path, + file_name: v.file_name, + duration: v.duration, + width: v.width, + height: v.height, + }) + .collect(); - let listener = tokio::net::TcpListener::bind(addr).await?; - axum::serve(listener, app).await?; - - Ok(()) -} - -async fn list_videos() -> Result, StatusCode> { - let db: PostgresDb = PostgresDb::init() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let videos = db - .list_videos() - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - let video_infos: Vec = videos - .into_iter() - .map(|v| VideoInfoResponse { - uuid: v.uuid, - file_path: v.file_path, - file_name: v.file_name, - duration: v.duration, - width: v.width, - height: v.height, + Ok::(VideosResponse { + videos: video_infos, + }) }) - .collect(); + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - Ok(Json(VideosResponse { - videos: video_infos, - })) + Ok(Json(video_infos)) } #[derive(Debug, Serialize)] struct ProgressResponse { uuid: String, + user: Option, + group: Option, + file_name: Option, + duration: Option, + overall_progress: u32, + cpu_percent: Option, + gpu_percent: Option, + memory_percent: Option, + memory_mb: Option, processors: Vec, } @@ -325,13 +868,53 @@ struct ProcessorProgressInfo { status: String, current: u32, total: u32, + progress: u32, message: String, } +fn get_system_stats() -> (Option, Option, Option, Option) { + use std::process::Command; + + let pid = std::process::id().to_string(); + + let cpu = Command::new("ps") + .args(["-p", &pid, "-o", "%cpu="]) + .output() + .ok() + .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok()); + + let (mem_percent, mem_rss) = Command::new("ps") + .args(["-p", &pid, "-o", "%mem=,rss="]) + .output() + .ok() + .map(|o| { + let output = String::from_utf8_lossy(&o.stdout); + let parts: Vec<&str> = output.split_whitespace().collect(); + let percent = parts.first().and_then(|s| s.parse().ok()); + let rss = parts.get(1).and_then(|s| s.parse().ok()); + (percent, rss) + }) + .unwrap_or((None, None)); + + let gpu = Command::new("nvidia-smi") + .args([ + "--query-gpu=utilization.gpu", + "--format=csv,noheader,nounits", + ]) + .output() + .ok() + .and_then(|o| String::from_utf8_lossy(&o.stdout).trim().parse().ok()); + + (cpu, gpu, mem_percent, mem_rss) +} + async fn get_progress( axum::extract::Path(uuid): axum::extract::Path, ) -> Result, StatusCode> { let redis = RedisClient::new().map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let mut conn = redis .get_conn() @@ -340,6 +923,7 @@ async fn get_progress( let processor_names = ["asr", "cut", "asrx", "yolo", "ocr", "face", "pose"]; let mut processors = Vec::new(); + let mut completed_count = 0u32; for name in processor_names { let key = format!("momentry:job:{}:processor:{}", uuid, name); @@ -376,14 +960,176 @@ async fn get_progress( .await .unwrap_or_else(|_| "".to_string()); + let progress = if total > 0 { + ((current as f64 / total as f64) * 100.0) as u32 + } else if status == "complete" { + 100 + } else { + 0 + }; + + if status == "complete" { + completed_count += 1; + } + processors.push(ProcessorProgressInfo { name: name.to_string(), status, current, total, + progress, message, }); } - Ok(Json(ProgressResponse { uuid, processors })) + let overall_progress = (completed_count as f64 / processor_names.len() as f64 * 100.0) as u32; + + let job_key = format!("momentry:job:{}", uuid); + let user: Option = redis::cmd("HGET") + .arg(&job_key) + .arg("user") + .query_async(&mut conn) + .await + .ok() + .filter(|s: &String| !s.is_empty()); + + let group: Option = redis::cmd("HGET") + .arg(&job_key) + .arg("group") + .query_async(&mut conn) + .await + .ok() + .filter(|s: &String| !s.is_empty()); + + let (file_name, duration) = pg + .get_video_by_uuid(&uuid) + .await + .ok() + .flatten() + .map(|v| (Some(v.file_name), Some(v.duration))) + .unwrap_or((None, None)); + + let (cpu_percent, gpu_percent, memory_percent, memory_mb) = get_system_stats(); + + Ok(Json(ProgressResponse { + uuid, + user, + group, + file_name, + duration, + overall_progress, + cpu_percent, + gpu_percent, + memory_percent, + memory_mb, + processors, + })) +} + +async fn list_jobs() -> Result, StatusCode> { + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let jobs = pg + .get_pending_jobs(100) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let job_infos: Vec = jobs + .into_iter() + .map(|j| JobInfoResponse { + id: j.id, + uuid: j.uuid, + status: j.status.as_str().to_string(), + current_processor: j.current_processor, + progress_current: j.progress_current, + progress_total: j.progress_total, + created_at: j.created_at.to_string(), + started_at: j.started_at.map(|t| t.to_string()), + }) + .collect(); + + Ok(Json(JobListResponse { jobs: job_infos })) +} + +async fn get_job( + axum::extract::Path(uuid): axum::extract::Path, +) -> Result, StatusCode> { + let pg = PostgresDb::init() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let job = pg + .get_monitor_job_by_uuid(&uuid) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + + let results = pg + .get_processor_results_by_job(job.id) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let processors: Vec = results + .into_iter() + .map(|r| ProcessorInfoResponse { + processor_type: r.processor_type.as_str().to_string(), + status: r.status.as_str().to_string(), + started_at: r.started_at, + completed_at: r.completed_at, + duration_secs: r.duration_secs, + error_message: r.error_message, + }) + .collect(); + + Ok(Json(JobDetailResponse { + id: job.id, + uuid: job.uuid, + status: job.status.as_str().to_string(), + current_processor: job.current_processor, + progress_current: job.progress_current, + progress_total: job.progress_total, + processors, + created_at: job.created_at.to_string(), + started_at: job.started_at.map(|t| t.to_string()), + updated_at: job.updated_at.map(|t| t.to_string()), + })) +} + +pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> { + let _ = SERVER_START.set(Instant::now()); + + let embedder = std::sync::Arc::new(Embedder::new("nomic-embed-text:v1.5".to_string())); + let mongo_cache = MongoCache::init().await?; + let redis_cache = RedisCache::new()?; + + let state = AppState { + embedder, + embedder_model: "nomic-embed-text:v1.5".to_string(), + mongo_cache, + redis_cache, + }; + + let app = Router::new() + .route("/health", get(health)) + .route("/health/detailed", get(health_detailed)) + .route("/api/v1/register", post(register)) + .route("/api/v1/search", post(search)) + .route("/api/v1/n8n/search", post(n8n_search)) + .route("/api/v1/search/hybrid", post(hybrid_search)) + .route("/api/v1/lookup", get(lookup)) + .route("/api/v1/videos", get(list_videos)) + .route("/api/v1/progress/:uuid", get(get_progress)) + .route("/api/v1/jobs", get(list_jobs)) + .route("/api/v1/jobs/:uuid", get(get_job)) + .with_state(state); + + let addr: std::net::SocketAddr = format!("{}:{}", host, port).parse().unwrap(); + tracing::info!("Starting API server at http://{}", addr); + + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app).await?; + + Ok(()) } diff --git a/src/core/config.rs b/src/core/config.rs index 2933c76..edf06a0 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -60,6 +60,14 @@ pub static SERVER_PORT: Lazy = Lazy::new(|| { pub static REDIS_KEY_PREFIX: Lazy = Lazy::new(|| env::var("MOMENTRY_REDIS_PREFIX").unwrap_or_else(|_| "momentry:".to_string())); +/// User data root path (sftpgo data directory) +/// This is the parent directory containing user directories like ./demo/, ./warren/, ./momentry/ +/// Example: /Users/accusys/momentry/var/sftpgo/data +pub static USER_DATA_ROOT: Lazy = Lazy::new(|| { + env::var("MOMENTRY_USER_DATA_ROOT") + .unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string()) +}); + pub mod processor { use super::*; diff --git a/src/core/db/postgres_db.rs b/src/core/db/postgres_db.rs index ac7146c..4a6772d 100644 --- a/src/core/db/postgres_db.rs +++ b/src/core/db/postgres_db.rs @@ -5,7 +5,7 @@ use sqlx::{postgres::PgPoolOptions, PgPool, Row}; use std::sync::Arc; use tokio::sync::RwLock; -use super::Database; +use super::{Database, QdrantDb}; use crate::core::chunk::types::{Chunk, ChunkRule, ChunkType}; #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -19,6 +19,88 @@ pub struct StorageStatus { pub qvector_chunk: bool, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum VideoStatus { + Pending, + Processing, + Completed, + Failed, +} + +impl VideoStatus { + pub fn as_str(&self) -> &'static str { + match self { + VideoStatus::Pending => "pending", + VideoStatus::Processing => "processing", + VideoStatus::Completed => "completed", + VideoStatus::Failed => "failed", + } + } + + pub fn from_db_str(s: &str) -> Option { + match s { + "pending" => Some(VideoStatus::Pending), + "processing" => Some(VideoStatus::Processing), + "completed" => Some(VideoStatus::Completed), + "failed" => Some(VideoStatus::Failed), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct VideoRow { + pub id: i32, + pub uuid: String, + pub file_path: String, + pub file_name: String, + pub duration: f64, + pub width: i32, + pub height: i32, + pub fps: f64, + pub probe_json: Option, + pub fs_video: bool, + pub fs_json: bool, + pub psql_chunk: bool, + pub pobject_chunk: bool, + pub mobject_chunk: bool, + pub pvector_chunk: bool, + pub qvector_chunk: bool, + pub status: String, + pub user_id: Option, + pub job_id: Option, +} + +impl From for VideoRecord { + fn from(row: VideoRow) -> Self { + VideoRecord { + id: row.id as i64, + uuid: row.uuid, + file_path: row.file_path, + file_name: row.file_name, + duration: row.duration, + width: row.width as u32, + height: row.height as u32, + fps: row.fps, + probe_json: row.probe_json, + storage: StorageStatus { + fs_video: row.fs_video, + fs_json: row.fs_json, + psql_chunk: row.psql_chunk, + pobject_chunk: row.pobject_chunk, + mobject_chunk: row.mobject_chunk, + pvector_chunk: row.pvector_chunk, + qvector_chunk: row.qvector_chunk, + }, + status: VideoStatus::from_db_str(&row.status).unwrap_or(VideoStatus::Pending), + user_id: row.user_id.map(|v| v as i64), + job_id: row.job_id.map(|v| v as i64), + created_at: String::new(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct VideoRecord { pub id: i64, @@ -31,6 +113,9 @@ pub struct VideoRecord { pub fps: f64, pub probe_json: Option, pub storage: StorageStatus, + pub status: VideoStatus, + pub user_id: Option, + pub job_id: Option, pub created_at: String, } @@ -67,6 +152,279 @@ pub struct Frame { pub created_at: String, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum MonitorJobStatus { + Pending, + Running, + Completed, + Failed, + Cancelled, +} + +impl MonitorJobStatus { + pub fn as_str(&self) -> &'static str { + match self { + MonitorJobStatus::Pending => "pending", + MonitorJobStatus::Running => "running", + MonitorJobStatus::Completed => "completed", + MonitorJobStatus::Failed => "failed", + MonitorJobStatus::Cancelled => "cancelled", + } + } + + pub fn from_db_str(s: &str) -> Option { + match s { + "pending" => Some(MonitorJobStatus::Pending), + "running" => Some(MonitorJobStatus::Running), + "completed" => Some(MonitorJobStatus::Completed), + "failed" => Some(MonitorJobStatus::Failed), + "cancelled" => Some(MonitorJobStatus::Cancelled), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonitorJob { + pub id: i32, + pub uuid: String, + pub video_path: Option, + pub status: MonitorJobStatus, + pub current_processor: Option, + pub progress_total: i32, + pub progress_current: i32, + pub error_count: i32, + pub last_error: Option, + pub started_at: Option, + pub updated_at: Option, + pub created_at: chrono::NaiveDateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MonitorJobStats { + pub pending: i32, + pub running: i32, + pub completed: i32, + pub failed: i32, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum ProcessorType { + Asr, + Cut, + Yolo, + Ocr, + Face, + Pose, + Asrx, +} + +impl ProcessorType { + pub fn as_str(&self) -> &'static str { + match self { + ProcessorType::Asr => "asr", + ProcessorType::Cut => "cut", + ProcessorType::Yolo => "yolo", + ProcessorType::Ocr => "ocr", + ProcessorType::Face => "face", + ProcessorType::Pose => "pose", + ProcessorType::Asrx => "asrx", + } + } + + pub fn from_db_str(s: &str) -> Option { + match s { + "asr" => Some(ProcessorType::Asr), + "cut" => Some(ProcessorType::Cut), + "yolo" => Some(ProcessorType::Yolo), + "ocr" => Some(ProcessorType::Ocr), + "face" => Some(ProcessorType::Face), + "pose" => Some(ProcessorType::Pose), + "asrx" => Some(ProcessorType::Asrx), + _ => None, + } + } + + pub fn all() -> Vec { + vec![ + ProcessorType::Asr, + ProcessorType::Cut, + ProcessorType::Yolo, + ProcessorType::Ocr, + ProcessorType::Face, + ProcessorType::Pose, + ProcessorType::Asrx, + ] + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum ProcessorJobStatus { + Pending, + Running, + Completed, + Failed, + Skipped, +} + +impl ProcessorJobStatus { + pub fn as_str(&self) -> &'static str { + match self { + ProcessorJobStatus::Pending => "pending", + ProcessorJobStatus::Running => "running", + ProcessorJobStatus::Completed => "completed", + ProcessorJobStatus::Failed => "failed", + ProcessorJobStatus::Skipped => "skipped", + } + } + + pub fn from_db_str(s: &str) -> Option { + match s { + "pending" => Some(ProcessorJobStatus::Pending), + "running" => Some(ProcessorJobStatus::Running), + "completed" => Some(ProcessorJobStatus::Completed), + "failed" => Some(ProcessorJobStatus::Failed), + "skipped" => Some(ProcessorJobStatus::Skipped), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProcessorResult { + pub id: i32, + pub job_id: i32, + pub processor_type: ProcessorType, + pub status: ProcessorJobStatus, + pub started_at: Option, + pub completed_at: Option, + pub duration_secs: Option, + pub error_message: Option, + pub output_data: Option, + pub retry_count: i32, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct ApiKeyRecord { + pub id: i32, + pub key_id: String, + pub key_hash: String, + pub key_prefix: String, + pub name: String, + pub key_type: String, + pub user_id: Option, + pub service_name: Option, + pub permissions: serde_json::Value, + pub expires_at: Option>, + pub last_used_at: Option>, + pub last_used_ip: Option, + pub usage_count: i64, + pub status: String, + pub rotation_required: bool, + pub rotation_reason: Option, + pub grace_period_end: Option>, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ApiKeyStats { + pub total_keys: i64, + pub active_keys: i64, + pub expired_keys: i64, + pub rotation_required: i64, + pub anomalies_last_24h: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct GiteaTokenRecord { + pub id: i32, + pub gitea_token_id: i64, + pub gitea_user: String, + pub token_name: String, + pub token_last_eight: String, + pub scopes: serde_json::Value, + pub api_key_id: Option, + pub last_verified: Option>, + pub created_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct N8nApiKeyRecord { + pub id: i32, + pub n8n_key_id: String, + pub label: String, + pub api_key_last_eight: String, + pub momentry_api_key_id: Option, + pub expires_at: Option>, + pub last_verified: Option>, + pub created_at: chrono::DateTime, +} + +/// Configuration for creating an API key +#[derive(Debug, Clone)] +pub struct CreateApiKeyConfig<'a> { + pub key_id: &'a str, + pub key_hash: &'a str, + pub key_prefix: &'a str, + pub name: &'a str, + pub key_type: &'a str, + pub user_id: Option, + pub service_name: Option<&'a str>, + pub permissions: &'a serde_json::Value, + pub expires_at: Option>, +} + +impl<'a> CreateApiKeyConfig<'a> { + pub fn new( + key_id: &'a str, + key_hash: &'a str, + key_prefix: &'a str, + name: &'a str, + key_type: &'a str, + ) -> Self { + static DEFAULT_PERMISSIONS: std::sync::LazyLock = + std::sync::LazyLock::new(|| serde_json::json!(["read", "write"])); + + Self { + key_id, + key_hash, + key_prefix, + name, + key_type, + user_id: None, + service_name: None, + permissions: &DEFAULT_PERMISSIONS, + expires_at: None, + } + } + + pub fn with_user_id(mut self, user_id: i64) -> Self { + self.user_id = Some(user_id); + self + } + + pub fn with_service_name(mut self, name: &'a str) -> Self { + self.service_name = Some(name); + self + } + + pub fn with_permissions(mut self, perms: &'a serde_json::Value) -> Self { + self.permissions = perms; + self + } + + pub fn with_expires_at(mut self, expires: chrono::DateTime) -> Self { + self.expires_at = Some(expires); + self + } +} + pub struct PostgresDb { pool: PgPool, cache: Arc>, @@ -81,9 +439,19 @@ pub struct PostgresCache { impl PostgresDb { pub async fn new(database_url: &str) -> Result { + let max_connections = std::env::var("DB_MAX_CONNECTIONS") + .unwrap_or_else(|_| "10".to_string()) + .parse::() + .unwrap_or(10); + + let acquire_timeout_secs = std::env::var("DB_ACQUIRE_TIMEOUT") + .unwrap_or_else(|_| "60".to_string()) + .parse::() + .unwrap_or(60); + let pool_options = PgPoolOptions::new() - .max_connections(10) - .acquire_timeout(std::time::Duration::from_secs(60)); + .max_connections(max_connections) + .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)); let pool = pool_options.connect(database_url).await?; @@ -96,11 +464,16 @@ impl PostgresDb { Ok(db) } + /// Get a reference to the connection pool + pub fn pool(&self) -> &PgPool { + &self.pool + } + pub async fn register_video(&self, record: &VideoRecord) -> Result { let result = sqlx::query( r#" - INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE) + INSERT INTO videos (uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, status, user_id, job_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE, $9, $10, $11) ON CONFLICT (uuid) DO UPDATE SET file_path = EXCLUDED.file_path, file_name = EXCLUDED.file_name, @@ -110,6 +483,9 @@ impl PostgresDb { fps = EXCLUDED.fps, probe_json = EXCLUDED.probe_json, fs_video = TRUE, + status = COALESCE(EXCLUDED.status, videos.status), + user_id = COALESCE(EXCLUDED.user_id, videos.user_id), + job_id = COALESCE(EXCLUDED.job_id, videos.job_id), updated_at = CURRENT_TIMESTAMP RETURNING id::bigint "# @@ -122,6 +498,9 @@ impl PostgresDb { .bind(record.height as i32) .bind(record.fps) .bind(&record.probe_json) + .bind(record.status.as_str()) + .bind(record.user_id) + .bind(record.job_id) .fetch_one(&self.pool) .await?; @@ -145,35 +524,15 @@ impl PostgresDb { } } - let result = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option, bool, bool, bool, bool, bool, bool, bool)>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos WHERE uuid = $1" + let result = sqlx::query_as::<_, VideoRow>( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM videos WHERE uuid = $1" ) .bind(uuid) .fetch_optional(&self.pool) .await?; - if let Some(r) = result { - let video = VideoRecord { - id: r.0 as i64, - uuid: r.1.clone(), - file_path: r.2, - file_name: r.3, - duration: r.4, - width: r.5 as u32, - height: r.6 as u32, - fps: r.7, - probe_json: r.8, - storage: StorageStatus { - fs_video: r.9, - fs_json: r.10, - psql_chunk: r.11, - pobject_chunk: r.12, - mobject_chunk: r.13, - pvector_chunk: r.14, - qvector_chunk: r.15, - }, - created_at: String::new(), - }; + if let Some(row) = result { + let video: VideoRecord = row.into(); // Update cache let mut cache = self.cache.write().await; @@ -186,36 +545,13 @@ impl PostgresDb { } pub async fn list_videos(&self) -> Result> { - let rows = sqlx::query_as::<_, (i32, String, String, String, f64, i32, i32, f64, Option, bool, bool, bool, bool, bool, bool, bool)>( - "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk FROM videos ORDER BY id DESC" + let rows = sqlx::query_as::<_, VideoRow>( + "SELECT id, uuid, file_path, file_name, duration, width, height, fps, probe_json, fs_video, fs_json, psql_chunk, pobject_chunk, mobject_chunk, pvector_chunk, qvector_chunk, status, user_id, job_id FROM videos ORDER BY id DESC" ) .fetch_all(&self.pool) .await?; - let videos: Vec = rows - .into_iter() - .map(|r| VideoRecord { - id: r.0 as i64, - uuid: r.1, - file_path: r.2, - file_name: r.3, - duration: r.4, - width: r.5 as u32, - height: r.6 as u32, - fps: r.7, - probe_json: r.8, - storage: StorageStatus { - fs_video: r.9, - fs_json: r.10, - psql_chunk: r.11, - pobject_chunk: r.12, - mobject_chunk: r.13, - pvector_chunk: r.14, - qvector_chunk: r.15, - }, - created_at: String::new(), - }) - .collect(); + let videos: Vec = rows.into_iter().map(|r| r.into()).collect(); Ok(videos) } @@ -283,6 +619,617 @@ impl PostgresDb { Ok(count) } + pub async fn create_monitor_job( + &self, + uuid: &str, + video_path: Option<&str>, + ) -> Result { + let row = sqlx::query( + r#" + INSERT INTO monitor_jobs (uuid, video_path, status) + VALUES ($1, $2, 'pending') + ON CONFLICT DO NOTHING + RETURNING id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + "# + ) + .bind(uuid) + .bind(video_path) + .fetch_one(&self.pool) + .await?; + + let status_str: String = row.get(3); + let status = + MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending); + + Ok(MonitorJob { + id: row.get(0), + uuid: row.get(1), + video_path: row.get(2), + status, + current_processor: row.get(4), + progress_total: row.get(5), + progress_current: row.get(6), + error_count: row.get(7), + last_error: row.get(8), + started_at: row.get(9), + updated_at: row.get(10), + created_at: row.get(11), + }) + } + + pub async fn update_video_job_id(&self, video_id: i64, job_id: i32) -> Result<()> { + sqlx::query( + r#" + UPDATE videos + SET job_id = $1, updated_at = CURRENT_TIMESTAMP + WHERE id = $2 + "#, + ) + .bind(job_id) + .bind(video_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_monitor_job_by_uuid(&self, uuid: &str) -> Result> { + let row = sqlx::query( + r#" + SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + FROM monitor_jobs WHERE uuid = $1 + "# + ) + .bind(uuid) + .fetch_optional(&self.pool) + .await?; + + if let Some(r) = row { + let status_str: String = r.get(3); + let status = + MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending); + + Ok(Some(MonitorJob { + id: r.get(0), + uuid: r.get(1), + video_path: r.get(2), + status, + current_processor: r.get(4), + progress_total: r.get(5), + progress_current: r.get(6), + error_count: r.get(7), + last_error: r.get(8), + started_at: r.get(9), + updated_at: r.get(10), + created_at: r.get(11), + })) + } else { + Ok(None) + } + } + + pub async fn list_monitor_jobs_by_status( + &self, + status: MonitorJobStatus, + ) -> Result> { + let rows = sqlx::query( + r#" + SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, error_count, last_error, started_at, updated_at, created_at + FROM monitor_jobs WHERE status = $1 ORDER BY created_at DESC + "# + ) + .bind(status.as_str()) + .fetch_all(&self.pool) + .await?; + + let jobs: Vec = rows + .into_iter() + .map(|r| { + let status_str: String = r.get(3); + let status = + MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending); + + MonitorJob { + id: r.get(0), + uuid: r.get(1), + video_path: r.get(2), + status, + current_processor: r.get(4), + progress_total: r.get(5), + progress_current: r.get(6), + error_count: r.get(7), + last_error: r.get(8), + started_at: r.get(9), + updated_at: r.get(10), + created_at: r.get(11), + } + }) + .collect(); + + Ok(jobs) + } + + pub async fn update_monitor_job_status( + &self, + uuid: &str, + status: MonitorJobStatus, + ) -> Result<()> { + let started_at_clause = if status == MonitorJobStatus::Running { + ", started_at = COALESCE(started_at, CURRENT_TIMESTAMP)" + } else { + "" + }; + + sqlx::query(&format!( + "UPDATE monitor_jobs SET status = $1, updated_at = CURRENT_TIMESTAMP{} WHERE uuid = $2", + started_at_clause + )) + .bind(status.as_str()) + .bind(uuid) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn update_monitor_job_progress( + &self, + uuid: &str, + current_processor: Option<&str>, + progress_current: i32, + progress_total: i32, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE monitor_jobs + SET current_processor = COALESCE($3, current_processor), + progress_current = $4, + progress_total = $5, + updated_at = CURRENT_TIMESTAMP + WHERE uuid = $1 + "#, + ) + .bind(uuid) + .bind(uuid) + .bind(current_processor) + .bind(progress_current) + .bind(progress_total) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn update_monitor_job_error(&self, uuid: &str, error: &str) -> Result<()> { + sqlx::query( + r#" + UPDATE monitor_jobs + SET error_count = error_count + 1, + last_error = $2, + updated_at = CURRENT_TIMESTAMP + WHERE uuid = $1 + "#, + ) + .bind(uuid) + .bind(error) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn delete_monitor_job(&self, uuid: &str) -> Result { + let result = sqlx::query("DELETE FROM monitor_jobs WHERE uuid = $1") + .bind(uuid) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected() > 0) + } + + pub async fn get_monitor_job_stats(&self) -> Result { + let pending: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'pending'") + .fetch_one(&self.pool) + .await?; + + let running: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'running'") + .fetch_one(&self.pool) + .await?; + + let completed: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'completed'") + .fetch_one(&self.pool) + .await?; + + let failed: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'failed'") + .fetch_one(&self.pool) + .await?; + + Ok(MonitorJobStats { + pending: pending as i32, + running: running as i32, + completed: completed as i32, + failed: failed as i32, + }) + } + + pub async fn create_api_key(&self, config: CreateApiKeyConfig<'_>) -> Result { + let result = sqlx::query( + r#" + INSERT INTO api_keys (key_id, key_hash, key_prefix, name, key_type, user_id, service_name, permissions, expires_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9) + RETURNING id + "#, + ) + .bind(config.key_id) + .bind(config.key_hash) + .bind(config.key_prefix) + .bind(config.name) + .bind(config.key_type) + .bind(config.user_id) + .bind(config.service_name) + .bind(config.permissions) + .bind(config.expires_at) + .fetch_one(&self.pool) + .await?; + + let id: i32 = result.get(0); + Ok(id as i64) + } + + pub async fn get_api_key_by_hash(&self, key_hash: &str) -> Result> { + let result = sqlx::query_as::<_, ApiKeyRecord>( + r#" + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM api_keys WHERE key_hash = $1 + "#, + ) + .bind(key_hash) + .fetch_optional(&self.pool) + .await?; + + Ok(result) + } + + pub async fn get_api_key_by_key_id(&self, key_id: &str) -> Result> { + let result = sqlx::query_as::<_, ApiKeyRecord>( + r#" + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM api_keys WHERE key_id = $1 + "#, + ) + .bind(key_id) + .fetch_optional(&self.pool) + .await?; + + Ok(result) + } + + pub async fn list_api_keys(&self) -> Result> { + let results = sqlx::query_as::<_, ApiKeyRecord>( + r#" + SELECT id, key_id, key_hash, key_prefix, name, key_type, user_id, service_name, + permissions, expires_at, last_used_at, last_used_ip, usage_count, status, + rotation_required, rotation_reason, grace_period_end, created_at, updated_at + FROM api_keys ORDER BY created_at DESC + "#, + ) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + + pub async fn update_api_key_usage(&self, key_id: &str, ip_address: Option<&str>) -> Result<()> { + sqlx::query( + r#" + UPDATE api_keys + SET last_used_at = CURRENT_TIMESTAMP, + last_used_ip = COALESCE($2, last_used_ip), + usage_count = usage_count + 1, + updated_at = CURRENT_TIMESTAMP + WHERE key_id = $1 + "#, + ) + .bind(key_id) + .bind(ip_address) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn require_api_key_rotation( + &self, + key_id: &str, + reason: &str, + grace_period_end: chrono::DateTime, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE api_keys + SET rotation_required = TRUE, + rotation_reason = $2, + grace_period_end = $3, + updated_at = CURRENT_TIMESTAMP + WHERE key_id = $1 + "#, + ) + .bind(key_id) + .bind(reason) + .bind(grace_period_end) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn revoke_api_key(&self, key_id: &str) -> Result<()> { + sqlx::query( + r#" + UPDATE api_keys + SET status = 'revoked', + updated_at = CURRENT_TIMESTAMP + WHERE key_id = $1 + "#, + ) + .bind(key_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + pub async fn log_api_key_audit( + &self, + key_id: &str, + action: &str, + actor: Option<&str>, + ip_address: Option<&str>, + user_agent: Option<&str>, + request_path: Option<&str>, + response_code: Option, + anomaly_type: Option<&str>, + details: Option<&serde_json::Value>, + ) -> Result<()> { + sqlx::query( + r#" + INSERT INTO api_key_audit_log (key_id, action, actor, ip_address, user_agent, request_path, response_code, anomaly_type, details) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb) + "#, + ) + .bind(key_id) + .bind(action) + .bind(actor) + .bind(ip_address) + .bind(user_agent) + .bind(request_path) + .bind(response_code) + .bind(anomaly_type) + .bind(details) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_api_key_stats(&self) -> Result { + let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM api_keys") + .fetch_one(&self.pool) + .await?; + + let active: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM api_keys WHERE status = 'active'") + .fetch_one(&self.pool) + .await?; + + let expired: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM api_keys WHERE expires_at < CURRENT_TIMESTAMP", + ) + .fetch_one(&self.pool) + .await?; + + let rotation_required: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM api_keys WHERE rotation_required = TRUE") + .fetch_one(&self.pool) + .await?; + + let anomalies_24h: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM api_key_anomalies WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'", + ) + .fetch_one(&self.pool) + .await?; + + Ok(ApiKeyStats { + total_keys: total, + active_keys: active, + expired_keys: expired, + rotation_required, + anomalies_last_24h: anomalies_24h, + }) + } + + pub async fn create_gitea_token( + &self, + gitea_token_id: i64, + gitea_user: &str, + token_name: &str, + token_last_eight: &str, + scopes: &serde_json::Value, + api_key_id: Option<&str>, + ) -> Result { + let result = sqlx::query( + r#" + INSERT INTO gitea_tokens (gitea_token_id, gitea_user, token_name, token_last_eight, scopes, api_key_id) + VALUES ($1, $2, $3, $4, $5::jsonb, $6) + RETURNING id + "#, + ) + .bind(gitea_token_id) + .bind(gitea_user) + .bind(token_name) + .bind(token_last_eight) + .bind(scopes) + .bind(api_key_id) + .fetch_one(&self.pool) + .await?; + + let id: i32 = result.get(0); + Ok(id as i64) + } + + pub async fn get_gitea_tokens_by_user( + &self, + gitea_user: &str, + ) -> Result> { + let results = sqlx::query_as::<_, GiteaTokenRecord>( + r#" + SELECT id, gitea_token_id, gitea_user, token_name, token_last_eight, scopes, api_key_id, last_verified, created_at + FROM gitea_tokens WHERE gitea_user = $1 ORDER BY created_at DESC + "#, + ) + .bind(gitea_user) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + + pub async fn get_gitea_token_by_name( + &self, + gitea_user: &str, + token_name: &str, + ) -> Result> { + let result = sqlx::query_as::<_, GiteaTokenRecord>( + r#" + SELECT id, gitea_token_id, gitea_user, token_name, token_last_eight, scopes, api_key_id, last_verified, created_at + FROM gitea_tokens WHERE gitea_user = $1 AND token_name = $2 + "#, + ) + .bind(gitea_user) + .bind(token_name) + .fetch_optional(&self.pool) + .await?; + + Ok(result) + } + + pub async fn delete_gitea_token(&self, gitea_user: &str, token_name: &str) -> Result<()> { + sqlx::query("DELETE FROM gitea_tokens WHERE gitea_user = $1 AND token_name = $2") + .bind(gitea_user) + .bind(token_name) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn update_gitea_token_verification( + &self, + gitea_user: &str, + token_name: &str, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE gitea_tokens + SET last_verified = CURRENT_TIMESTAMP + WHERE gitea_user = $1 AND token_name = $2 + "#, + ) + .bind(gitea_user) + .bind(token_name) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn create_n8n_api_key( + &self, + n8n_key_id: &str, + label: &str, + api_key_last_eight: &str, + momentry_api_key_id: Option<&str>, + expires_at: Option>, + ) -> Result { + let result = sqlx::query( + r#" + INSERT INTO n8n_api_keys (n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at) + VALUES ($1, $2, $3, $4, $5) + RETURNING id + "#, + ) + .bind(n8n_key_id) + .bind(label) + .bind(api_key_last_eight) + .bind(momentry_api_key_id) + .bind(expires_at) + .fetch_one(&self.pool) + .await?; + + let id: i32 = result.get(0); + Ok(id as i64) + } + + pub async fn get_n8n_api_keys(&self) -> Result> { + let results = sqlx::query_as::<_, N8nApiKeyRecord>( + r#" + SELECT id, n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at, last_verified, created_at + FROM n8n_api_keys ORDER BY created_at DESC + "#, + ) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } + + pub async fn get_n8n_api_key_by_label(&self, label: &str) -> Result> { + let result = sqlx::query_as::<_, N8nApiKeyRecord>( + r#" + SELECT id, n8n_key_id, label, api_key_last_eight, momentry_api_key_id, expires_at, last_verified, created_at + FROM n8n_api_keys WHERE label = $1 + "#, + ) + .bind(label) + .fetch_optional(&self.pool) + .await?; + + Ok(result) + } + + pub async fn delete_n8n_api_key(&self, label: &str) -> Result<()> { + sqlx::query("DELETE FROM n8n_api_keys WHERE label = $1") + .bind(label) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn update_n8n_api_key_verification(&self, label: &str) -> Result<()> { + sqlx::query( + r#" + UPDATE n8n_api_keys + SET last_verified = CURRENT_TIMESTAMP + WHERE label = $1 + "#, + ) + .bind(label) + .execute(&self.pool) + .await?; + + Ok(()) + } + async fn init_schema(&self) -> Result<()> { sqlx::query( r#" @@ -492,11 +1439,268 @@ impl PostgresDb { sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS pre_chunk_ids INTEGER[]") .execute(&self.pool) .await?; + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS parent_chunk_id VARCHAR(64)") + .execute(&self.pool) + .await?; + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS child_chunk_ids TEXT[]") + .execute(&self.pool) + .await?; + + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS search_vector TSVECTOR") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_chunks_search_vector ON chunks USING GIN(search_vector)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + "ALTER TABLE chunks ADD COLUMN IF NOT EXISTS fps DOUBLE PRECISION DEFAULT 24.0", + ) + .execute(&self.pool) + .await?; + + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS start_frame BIGINT DEFAULT 0") + .execute(&self.pool) + .await?; + + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS end_frame BIGINT DEFAULT 0") + .execute(&self.pool) + .await?; + + sqlx::query("ALTER TABLE chunks ADD COLUMN IF NOT EXISTS metadata JSONB") + .execute(&self.pool) + .await?; + + sqlx::query( + "ALTER TABLE chunks ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW()", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE OR REPLACE FUNCTION update_search_vector() RETURNS TRIGGER AS $func$ + BEGIN + NEW.search_vector := to_tsvector('english', COALESCE(NEW.text_content, '')); + RETURN NEW; + END; + $func$ LANGUAGE plpgsql", + ) + .execute(&self.pool) + .await?; + + sqlx::query("DROP TRIGGER IF EXISTS chunks_search_vector_trigger ON chunks") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE TRIGGER chunks_search_vector_trigger + BEFORE INSERT OR UPDATE ON chunks + FOR EACH ROW EXECUTE FUNCTION update_search_vector()", + ) + .execute(&self.pool) + .await?; sqlx::query("ALTER TABLE chunk_vectors ADD COLUMN IF NOT EXISTS file_id INTEGER REFERENCES videos(id)") .execute(&self.pool) .await?; + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS monitor_jobs ( + id SERIAL PRIMARY KEY, + uuid VARCHAR(16) NOT NULL, + video_path VARCHAR(512), + status VARCHAR(20) NOT NULL DEFAULT 'pending', + current_processor VARCHAR(20), + progress_total INT DEFAULT 0, + progress_current INT DEFAULT 0, + error_count INT DEFAULT 0, + last_error TEXT, + started_at TIMESTAMP, + updated_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_monitor_jobs_uuid ON monitor_jobs(uuid)") + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_monitor_jobs_status ON monitor_jobs(status)") + .execute(&self.pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_monitor_jobs_created_at ON monitor_jobs(created_at)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS api_keys ( + id SERIAL PRIMARY KEY, + key_id VARCHAR(48) UNIQUE NOT NULL, + key_hash VARCHAR(64) NOT NULL, + key_prefix VARCHAR(8) NOT NULL, + name VARCHAR(128) NOT NULL, + key_type VARCHAR(20) NOT NULL DEFAULT 'user', + user_id BIGINT, + service_name VARCHAR(64), + permissions JSONB DEFAULT '["read", "write"]', + expires_at TIMESTAMP, + last_used_at TIMESTAMP, + last_used_ip VARCHAR(45), + usage_count BIGINT DEFAULT 0, + status VARCHAR(20) NOT NULL DEFAULT 'active', + rotation_required BOOLEAN DEFAULT FALSE, + rotation_reason TEXT, + grace_period_end TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_api_keys_key_id ON api_keys(key_id)") + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(key_hash)") + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_api_keys_type ON api_keys(key_type)") + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_api_keys_status ON api_keys(status)") + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS api_key_audit_log ( + id SERIAL PRIMARY KEY, + key_id VARCHAR(32) NOT NULL, + action VARCHAR(50) NOT NULL, + actor VARCHAR(128), + ip_address VARCHAR(45), + user_agent TEXT, + request_path TEXT, + response_code INT, + anomaly_type VARCHAR(30), + details JSONB, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_audit_key_id ON api_key_audit_log(key_id)") + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_audit_action ON api_key_audit_log(action)") + .execute(&self.pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_audit_created_at ON api_key_audit_log(created_at)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS api_key_anomalies ( + id SERIAL PRIMARY KEY, + key_id VARCHAR(32) NOT NULL, + anomaly_type VARCHAR(30) NOT NULL, + severity VARCHAR(10) NOT NULL, + ip_address VARCHAR(45), + request_count INT, + error_count INT, + error_rate DOUBLE PRECISION, + unique_ips INT, + details JSONB, + resolved BOOLEAN DEFAULT FALSE, + resolved_at TIMESTAMP, + resolved_by VARCHAR(128), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_anomalies_key_id ON api_key_anomalies(key_id)") + .execute(&self.pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_anomalies_resolved ON api_key_anomalies(resolved)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS gitea_tokens ( + id SERIAL PRIMARY KEY, + gitea_token_id BIGINT NOT NULL, + gitea_user VARCHAR(128) NOT NULL, + token_name VARCHAR(128) NOT NULL, + token_last_eight VARCHAR(8) NOT NULL, + scopes JSONB DEFAULT '[]', + api_key_id VARCHAR(48), + last_verified TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(gitea_user, token_name) + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_gitea_tokens_user ON gitea_tokens(gitea_user)") + .execute(&self.pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_gitea_tokens_key_id ON gitea_tokens(api_key_id)", + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS n8n_api_keys ( + id SERIAL PRIMARY KEY, + n8n_key_id VARCHAR(64) UNIQUE NOT NULL, + label VARCHAR(100) NOT NULL, + api_key_last_eight VARCHAR(8) NOT NULL, + momentry_api_key_id VARCHAR(48), + expires_at TIMESTAMP WITH TIME ZONE, + last_verified TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ) + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_n8n_api_keys_label ON n8n_api_keys(label)") + .execute(&self.pool) + .await?; + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_n8n_api_keys_key_id ON n8n_api_keys(momentry_api_key_id)", + ) + .execute(&self.pool) + .await?; + Ok(()) } @@ -508,8 +1712,8 @@ impl PostgresDb { sqlx::query( r#" - INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16) + INSERT INTO chunks (file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb, $13::jsonb, $14, $15, $16, $17, $18) ON CONFLICT (uuid, chunk_id) DO UPDATE SET start_time = EXCLUDED.start_time, end_time = EXCLUDED.end_time, @@ -522,6 +1726,8 @@ impl PostgresDb { vector_id = EXCLUDED.vector_id, frame_count = EXCLUDED.frame_count, pre_chunk_ids = EXCLUDED.pre_chunk_ids, + parent_chunk_id = EXCLUDED.parent_chunk_id, + child_chunk_ids = EXCLUDED.child_chunk_ids, updated_at = CURRENT_TIMESTAMP "# ) @@ -541,6 +1747,8 @@ impl PostgresDb { .bind(&chunk.vector_id) .bind(chunk.frame_count) .bind(&chunk.pre_chunk_ids) + .bind(&chunk.parent_chunk_id) + .bind(&chunk.child_chunk_ids) .execute(&self.pool) .await?; @@ -549,7 +1757,7 @@ impl PostgresDb { pub async fn get_chunks_by_uuid(&self, uuid: &str) -> Result> { let rows = sqlx::query( - "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index" + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE uuid = $1 ORDER BY chunk_index" ) .bind(uuid) .fetch_all(&self.pool) @@ -565,16 +1773,17 @@ impl PostgresDb { "sentence" => ChunkType::Sentence, "cut" => ChunkType::Cut, "trace" => ChunkType::Trace, + "story" => ChunkType::Story, _ => ChunkType::TimeBased, }; let content: serde_json::Value = r.get(11); let metadata: Option = r.get(12); - // Get pre_chunk_ids - try direct Vec decode first let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + let parent_chunk_id: Option = r.try_get(16).ok().flatten(); + let child_chunk_ids: Vec = r.try_get(17).unwrap_or_default(); - // Extract rule from content let (rule, content_data) = if content.get("rule").is_some() { let rule_str = content .get("rule") @@ -612,6 +1821,8 @@ impl PostgresDb { vector_id: r.get("vector_id"), frame_count, pre_chunk_ids, + parent_chunk_id, + child_chunk_ids, } }) .collect(); @@ -619,6 +1830,78 @@ impl PostgresDb { Ok(chunks) } + pub async fn get_chunk_by_chunk_id(&self, chunk_id: &str) -> Result> { + let row = sqlx::query( + "SELECT COALESCE(file_id, 0) as file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, COALESCE(fps, 24.0) as fps, COALESCE(start_frame, 0) as start_frame, COALESCE(end_frame, 0) as end_frame, text_content, content, metadata, vector_id, COALESCE(frame_count, 0) as frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = $1" + ) + .bind(chunk_id) + .fetch_optional(&self.pool) + .await?; + + if let Some(r) = row { + let chunk_type_str: String = r.get(4); + let chunk_index: i32 = r.get(3); + let chunk_type = match chunk_type_str.as_str() { + "time" => ChunkType::TimeBased, + "sentence" => ChunkType::Sentence, + "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + "story" => ChunkType::Story, + _ => ChunkType::TimeBased, + }; + + let content: serde_json::Value = r.get(11); + let metadata: Option = r.get(12); + + let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + let parent_chunk_id: Option = r.try_get(16).ok().flatten(); + let child_chunk_ids: Vec = r.try_get(17).unwrap_or_default(); + + let (rule, content_data) = if content.get("rule").is_some() { + let rule_str = content + .get("rule") + .and_then(|v| v.as_str()) + .unwrap_or("rule_1"); + let rule = if rule_str == "rule_2" { + ChunkRule::Rule2 + } else { + ChunkRule::Rule1 + }; + let data = content.get("data").cloned().unwrap_or(content); + (rule, data) + } else { + (ChunkRule::Rule1, content) + }; + + let file_id: i32 = sqlx::Row::get(&r, "file_id"); + let frame_count: i32 = sqlx::Row::get(&r, "frame_count"); + + Ok(Some(Chunk { + file_id, + uuid: r.get("uuid"), + chunk_id: r.get("chunk_id"), + chunk_index: chunk_index as u32, + chunk_type, + rule, + start_time: r.get("start_time"), + end_time: r.get("end_time"), + fps: r.get("fps"), + start_frame: r.get("start_frame"), + end_frame: r.get("end_frame"), + text_content: r.get("text_content"), + content: content_data, + metadata, + vector_id: r.get("vector_id"), + frame_count, + pre_chunk_ids, + parent_chunk_id, + child_chunk_ids, + })) + } else { + Ok(None) + } + } + pub async fn store_pre_chunk(&self, pre_chunk: &PreChunk) -> Result { let row = sqlx::query( r#" @@ -733,7 +2016,7 @@ impl PostgresDb { end_time: f64, ) -> Result> { let rows = sqlx::query( - "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids + "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE file_id = $1 AND start_time >= $2 AND end_time <= $3 ORDER BY start_time" @@ -754,14 +2037,16 @@ impl PostgresDb { "sentence" => ChunkType::Sentence, "cut" => ChunkType::Cut, "trace" => ChunkType::Trace, + "story" => ChunkType::Story, _ => ChunkType::TimeBased, }; let content: serde_json::Value = r.get(11); let metadata: Option = r.get(12); - // Get pre_chunk_ids - try direct Vec decode let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + let parent_chunk_id: Option = r.try_get(16).ok().flatten(); + let child_chunk_ids: Vec = r.try_get(17).unwrap_or_default(); let (rule, content_data) = if content.get("rule").is_some() { let rule_str = content @@ -800,6 +2085,87 @@ impl PostgresDb { vector_id: r.get("vector_id"), frame_count, pre_chunk_ids, + parent_chunk_id, + child_chunk_ids, + } + }) + .collect(); + + Ok(chunks) + } + + pub async fn get_chunks_by_ids(&self, chunk_ids: &[String]) -> Result> { + if chunk_ids.is_empty() { + return Ok(vec![]); + } + + let rows = sqlx::query( + "SELECT file_id, uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, text_content, content, metadata, vector_id, frame_count, pre_chunk_ids, parent_chunk_id, child_chunk_ids FROM chunks WHERE chunk_id = ANY($1) ORDER BY chunk_index", + ) + .bind(chunk_ids) + .fetch_all(&self.pool) + .await?; + + let chunks: Vec = rows + .into_iter() + .map(|r| { + let chunk_type_str: String = r.get(4); + let chunk_index: i32 = r.get(3); + let chunk_type = match chunk_type_str.as_str() { + "time" => ChunkType::TimeBased, + "sentence" => ChunkType::Sentence, + "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + "story" => ChunkType::Story, + _ => ChunkType::TimeBased, + }; + + let content: serde_json::Value = r.get(11); + let metadata: Option = r.get(12); + + let pre_chunk_ids: Vec = r.try_get(15).unwrap_or_default(); + let parent_chunk_id: Option = r.try_get(16).ok().flatten(); + let child_chunk_ids: Vec = r.try_get(17).unwrap_or_default(); + + let (rule, content_data) = if content.get("rule").is_some() { + let rule_str = content + .get("rule") + .and_then(|v| v.as_str()) + .unwrap_or("rule_1"); + let rule = if rule_str == "rule_2" { + ChunkRule::Rule2 + } else { + ChunkRule::Rule1 + }; + let data = content.get("data").cloned().unwrap_or(content); + (rule, data) + } else { + (ChunkRule::Rule1, content) + }; + + let file_id: i32 = sqlx::Row::get(&r, "file_id"); + let frame_count: i32 = sqlx::Row::get(&r, "frame_count"); + + Chunk { + file_id, + uuid: r.get("uuid"), + chunk_id: r.get("chunk_id"), + chunk_index: chunk_index as u32, + chunk_type, + rule, + start_time: r.get("start_time"), + end_time: r.get("end_time"), + fps: r.get("fps"), + start_frame: r.get("start_frame"), + end_frame: r.get("end_frame"), + text_content: r.get("text_content"), + content: content_data, + metadata, + vector_id: r.get("vector_id"), + frame_count, + pre_chunk_ids, + parent_chunk_id, + child_chunk_ids, } }) .collect(); @@ -818,44 +2184,22 @@ impl PostgresDb { pub async fn store_vector(&self, chunk_id: &str, vector: &[f32], uuid: &str) -> Result<()> { let vector_json = serde_json::json!(vector); - let embedding_str = vector_json.to_string(); - // Clone for use in closure - let chunk_id = chunk_id.to_string(); - let uuid = uuid.to_string(); - - // Use blocking task - this needs to wait for result - let join_result = tokio::task::spawn_blocking(move || { - let output = std::process::Command::new("psql") - .args([ - "postgres://accusys@localhost:5432/momentry", - "-c", - &format!( - "INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) VALUES ('{}', '{}', 'sentence', '{}') ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding", - chunk_id, uuid, embedding_str.replace('\'', "''") - ) - ]) - .output(); - - (chunk_id, output) - }) - .await; - - match join_result { - Ok((cid, Ok(output))) => { - if !output.status.success() { - let err = String::from_utf8_lossy(&output.stderr); - tracing::error!("psql error for {}: {}", cid, err); - } - } - Ok((cid, Err(e))) => { - tracing::error!("psql output error for {}: {}", cid, e); - } - Err(e) => { - tracing::error!("join error: {}", e); - } - } + sqlx::query( + r#" + INSERT INTO chunk_vectors (chunk_id, uuid, chunk_type, embedding) + VALUES ($1, $2, 'sentence', $3::jsonb) + ON CONFLICT (chunk_id) DO UPDATE SET + embedding = EXCLUDED.embedding + "#, + ) + .bind(chunk_id) + .bind(uuid) + .bind(&vector_json) + .execute(&self.pool) + .await?; + tracing::info!("Stored vector for chunk: {}", chunk_id); Ok(()) } @@ -881,8 +2225,8 @@ impl PostgresDb { let query_pattern = format!("%{}%", query); let sql = match chunk_type { - Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index", - None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index", + Some(_) => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM chunks WHERE content->>'text' ILIKE $1 AND chunk_type = $2 ORDER BY chunk_index", + None => "SELECT uuid, chunk_id, chunk_index, chunk_type, start_time, end_time, fps, start_frame, end_frame, content, metadata, vector_id, parent_chunk_id, child_chunk_ids FROM chunks WHERE content->>'text' ILIKE $1 ORDER BY chunk_index", }; let chunks = if let Some(ct) = chunk_type { @@ -901,6 +2245,8 @@ impl PostgresDb { String, Option, Option, + Option, + Vec, ), >(sql) .bind(&query_pattern) @@ -923,6 +2269,8 @@ impl PostgresDb { String, Option, Option, + Option, + Vec, ), >(sql) .bind(&query_pattern) @@ -937,6 +2285,8 @@ impl PostgresDb { "time_based" => ChunkType::TimeBased, "sentence" => ChunkType::Sentence, "cut" => ChunkType::Cut, + "trace" => ChunkType::Trace, + "story" => ChunkType::Story, _ => ChunkType::TimeBased, }; @@ -964,19 +2314,677 @@ impl PostgresDb { vector_id: r.11, frame_count: 0, pre_chunk_ids: vec![], + parent_chunk_id: r.12, + child_chunk_ids: r.13, } }) .collect(); Ok(results) } + + pub async fn search_bm25( + &self, + query: &str, + uuid: Option<&str>, + limit: usize, + ) -> Result> { + let tsquery = self.prepare_tsquery(query)?; + + let sql = match uuid { + Some(_) => { + r#" + SELECT chunk_id, uuid, chunk_index, chunk_type, start_time, end_time, + text_content, ts_rank_cd(search_vector, $1) as bm25_score + FROM chunks + WHERE search_vector @@ $1 AND uuid = $2 + ORDER BY bm25_score DESC + LIMIT $3 + "# + } + None => { + r#" + SELECT chunk_id, uuid, chunk_index, chunk_type, start_time, end_time, + text_content, ts_rank_cd(search_vector, $1) as bm25_score + FROM chunks + WHERE search_vector @@ $1 + ORDER BY bm25_score DESC + LIMIT $2 + "# + } + }; + + let rows = if let Some(uuid) = uuid { + sqlx::query_as::<_, (String, String, i32, String, f64, f64, Option, f32)>(sql) + .bind(&tsquery) + .bind(uuid) + .bind(limit as i64) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::<_, (String, String, i32, String, f64, f64, Option, f32)>(sql) + .bind(&tsquery) + .bind(limit as i64) + .fetch_all(&self.pool) + .await? + }; + + let results: Vec = rows + .into_iter() + .map(|r| Bm25Result { + chunk_id: r.0, + uuid: r.1, + chunk_index: r.2 as u32, + chunk_type: r.3, + start_time: r.4, + end_time: r.5, + text: r.6.unwrap_or_default(), + bm25_score: r.7, + }) + .collect(); + + Ok(results) + } + + pub async fn hybrid_search( + &self, + query: &str, + query_vector: &[f32], + uuid: Option<&str>, + limit: usize, + vector_weight: f32, + bm25_weight: f32, + ) -> Result> { + let bm25_results = self.search_bm25(query, uuid, limit * 2).await?; + + let qdrant = QdrantDb::init().await?; + let vector_results = if let Some(uuid) = uuid { + let query_f64: Vec = query_vector.iter().map(|&x| x as f64).collect(); + qdrant.search_in_uuid(&query_f64, uuid, limit * 2).await? + } else { + qdrant.search(query_vector, limit * 2).await? + }; + + let mut combined: std::collections::HashMap = + std::collections::HashMap::new(); + + let max_bm25 = bm25_results + .first() + .map(|r| r.bm25_score) + .unwrap_or(1.0) + .max(0.001); + for r in &bm25_results { + let normalized_score = r.bm25_score / max_bm25; + let combined_score = (normalized_score * bm25_weight) as f64; + combined.insert( + r.chunk_id.clone(), + HybridSearchResult { + chunk_id: r.chunk_id.clone(), + uuid: r.uuid.clone(), + chunk_index: r.chunk_index, + chunk_type: r.chunk_type.clone(), + start_time: r.start_time, + end_time: r.end_time, + text: r.text.clone(), + vector_score: 0.0, + bm25_score: normalized_score as f64, + combined_score, + }, + ); + } + + let max_vector = vector_results + .first() + .map(|r| r.score) + .unwrap_or(1.0) + .max(0.001); + + let chunk_ids: Vec = vector_results.iter().map(|r| r.chunk_id.clone()).collect(); + let vector_chunks = self.get_chunks_by_ids(&chunk_ids).await?; + let chunk_map: std::collections::HashMap = vector_chunks + .iter() + .map(|c| (c.chunk_id.clone(), c)) + .collect(); + + for r in &vector_results { + let normalized_score = r.score / max_vector; + let combined_score = (normalized_score * vector_weight) as f64; + if let Some(existing) = combined.get_mut(&r.chunk_id) { + existing.vector_score = normalized_score as f64; + existing.combined_score += combined_score; + } else { + let chunk_data = chunk_map.get(&r.chunk_id); + combined.insert( + r.chunk_id.clone(), + HybridSearchResult { + chunk_id: r.chunk_id.clone(), + uuid: chunk_data.map(|c| c.uuid.clone()).unwrap_or_default(), + chunk_index: chunk_data.map(|c| c.chunk_index).unwrap_or(0), + chunk_type: chunk_data + .map(|c| c.chunk_type.as_str().to_string()) + .unwrap_or_default(), + start_time: chunk_data.map(|c| c.start_time).unwrap_or(0.0), + end_time: chunk_data.map(|c| c.end_time).unwrap_or(0.0), + text: chunk_data + .and_then(|c| c.text_content.clone()) + .unwrap_or_default(), + vector_score: normalized_score as f64, + bm25_score: 0.0, + combined_score, + }, + ); + } + } + + let mut results: Vec = combined.into_values().collect(); + results.sort_by(|a, b| { + b.combined_score + .partial_cmp(&a.combined_score) + .unwrap_or(std::cmp::Ordering::Equal) + }); + results.truncate(limit); + + Ok(results) + } + + fn prepare_tsquery_internal(&self, query: &str) -> Result { + let words: Vec = query + .split_whitespace() + .map(|w| { + let cleaned = w + .chars() + .filter(|c| c.is_alphanumeric()) + .collect::(); + if cleaned.is_empty() { + String::new() + } else { + format!("{}:*", cleaned.to_lowercase()) + } + }) + .filter(|w| !w.is_empty()) + .collect(); + + if words.is_empty() { + anyhow::bail!("Query contains no searchable terms"); + } + + Ok(words.join(" & ")) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Bm25Result { + pub chunk_id: String, + pub uuid: String, + pub chunk_index: u32, + pub chunk_type: String, + pub start_time: f64, + pub end_time: f64, + pub text: String, + pub bm25_score: f32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HybridSearchResult { + pub chunk_id: String, + pub uuid: String, + pub chunk_index: u32, + pub chunk_type: String, + pub start_time: f64, + pub end_time: f64, + pub text: String, + pub vector_score: f64, + pub bm25_score: f64, + pub combined_score: f64, +} + +impl PostgresDb { + pub fn prepare_tsquery(&self, query: &str) -> Result { + self.prepare_tsquery_internal(query) + } + + pub async fn get_pending_jobs(&self, limit: i32) -> Result> { + let rows = sqlx::query( + r#" + SELECT id, uuid, video_path, status, current_processor, progress_total, progress_current, + error_count, last_error, started_at, updated_at, created_at + FROM monitor_jobs + WHERE status = 'pending' + ORDER BY created_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + "# + ) + .bind(limit) + .fetch_all(&self.pool) + .await?; + + let jobs: Vec = rows + .into_iter() + .map(|r| { + let status_str: String = r.get(3); + let status = + MonitorJobStatus::from_db_str(&status_str).unwrap_or(MonitorJobStatus::Pending); + MonitorJob { + id: r.get(0), + uuid: r.get(1), + video_path: r.get(2), + status, + current_processor: r.get(4), + progress_total: r.get(5), + progress_current: r.get(6), + error_count: r.get(7), + last_error: r.get(8), + started_at: r.get(9), + updated_at: r.get(10), + created_at: r.get(11), + } + }) + .collect(); + + Ok(jobs) + } + + pub async fn update_job_status(&self, job_id: i32, status: MonitorJobStatus) -> Result<()> { + sqlx::query( + "UPDATE monitor_jobs SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", + ) + .bind(status.as_str()) + .bind(job_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn update_job_progress( + &self, + job_id: i32, + current_processor: Option<&str>, + progress_current: i32, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE monitor_jobs + SET current_processor = $1, progress_current = $2, updated_at = CURRENT_TIMESTAMP + WHERE id = $3 + "#, + ) + .bind(current_processor) + .bind(progress_current) + .bind(job_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn create_processor_result( + &self, + job_id: i32, + processor_type: ProcessorType, + ) -> Result { + let row = sqlx::query( + r#" + INSERT INTO processor_results (job_id, processor, status) + VALUES ($1, $2, 'pending') + RETURNING id + "#, + ) + .bind(job_id) + .bind(processor_type.as_str()) + .fetch_one(&self.pool) + .await?; + + let id: i32 = row.get(0); + Ok(id) + } + + pub async fn update_processor_result( + &self, + id: i32, + status: ProcessorJobStatus, + error_message: Option<&str>, + output_data: Option<&serde_json::Value>, + ) -> Result<()> { + sqlx::query( + r#" + UPDATE processor_results + SET status = $1, + error_message = $2, + output_data = $3, + completed_at = CASE WHEN $1 IN ('completed', 'failed', 'skipped') THEN CURRENT_TIMESTAMP ELSE completed_at END, + duration_secs = CASE WHEN $1 IN ('completed', 'failed', 'skipped') THEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at)) ELSE duration_secs END, + updated_at = CURRENT_TIMESTAMP + WHERE id = $4 + "#, + ) + .bind(status.as_str()) + .bind(error_message) + .bind(output_data) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn get_processor_results_by_job(&self, job_id: i32) -> Result> { + let rows = sqlx::query( + r#" + SELECT id, job_id, processor, status, started_at, completed_at, duration_secs, + error_message, output_data, retry_count, created_at, updated_at + FROM processor_results + WHERE job_id = $1 + ORDER BY created_at ASC + "#, + ) + .bind(job_id) + .fetch_all(&self.pool) + .await?; + + let results: Vec = rows + .into_iter() + .map(|r| { + let status_str: String = r.get(3); + let processor_type_str: String = r.get(2); + ProcessorResult { + id: r.get(0), + job_id: r.get(1), + processor_type: ProcessorType::from_db_str(&processor_type_str) + .unwrap_or(ProcessorType::Asr), + status: ProcessorJobStatus::from_db_str(&status_str) + .unwrap_or(ProcessorJobStatus::Pending), + started_at: r.get(4), + completed_at: r.get(5), + duration_secs: r.get(6), + error_message: r.get(7), + output_data: r.get(8), + retry_count: r.get(9), + created_at: r.get(10), + updated_at: r.get(11), + } + }) + .collect(); + + Ok(results) + } + + pub async fn get_video_status(&self, uuid: &str) -> Result> { + let result: Option = + sqlx::query_scalar("SELECT status FROM videos WHERE uuid = $1") + .bind(uuid) + .fetch_optional(&self.pool) + .await?; + + Ok(result.and_then(|s| VideoStatus::from_db_str(&s))) + } + + pub async fn update_video_status(&self, uuid: &str, status: VideoStatus) -> Result<()> { + sqlx::query( + "UPDATE videos SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE uuid = $2", + ) + .bind(status.as_str()) + .bind(uuid) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn get_running_job_count(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM monitor_jobs WHERE status = 'running'") + .fetch_one(&self.pool) + .await?; + Ok(count) + } } #[async_trait] impl Database for PostgresDb { async fn init() -> Result { - let database_url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string()); - Self::new(&database_url).await + Self::new(&crate::core::config::DATABASE_URL).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_video_record_serialization() { + let record = VideoRecord { + id: 1, + uuid: "test-uuid-123".to_string(), + file_path: "/path/to/video.mp4".to_string(), + file_name: "video.mp4".to_string(), + duration: 120.5, + width: 1920, + height: 1080, + fps: 30.0, + probe_json: Some("{}".to_string()), + storage: StorageStatus::default(), + status: VideoStatus::Pending, + user_id: None, + job_id: None, + created_at: "2024-01-01T00:00:00Z".to_string(), + }; + + let json = serde_json::to_string(&record).unwrap(); + assert!(json.contains("test-uuid-123")); + assert!(json.contains("video.mp4")); + } + + #[test] + fn test_chunk_type_serialization() { + let chunk_type = ChunkType::TimeBased; + let json = serde_json::to_string(&chunk_type).unwrap(); + assert!(json.contains("time_based")); + } + + #[test] + fn test_chunk_deserialization() { + let json = r#"{ + "file_id": 1, + "uuid": "test", + "chunk_id": "c1", + "chunk_index": 0, + "chunk_type": "time_based", + "rule": "rule1", + "start_time": 0.0, + "end_time": 10.0, + "fps": 30.0, + "start_frame": 0, + "end_frame": 300, + "content": {}, + "frame_count": 300, + "pre_chunk_ids": [], + "parent_chunk_id": null, + "child_chunk_ids": [] + }"#; + + let chunk: Chunk = serde_json::from_str(json).unwrap(); + assert_eq!(chunk.chunk_id, "c1"); + assert_eq!(chunk.chunk_type, ChunkType::TimeBased); + assert_eq!(chunk.rule, ChunkRule::Rule1); + } + + #[test] + fn test_storage_status_default() { + let status = StorageStatus::default(); + assert!(!status.fs_video); + assert!(!status.fs_json); + } + + #[test] + fn test_monitor_job_status_from_db_str() { + assert_eq!( + MonitorJobStatus::from_db_str("pending"), + Some(MonitorJobStatus::Pending) + ); + assert_eq!( + MonitorJobStatus::from_db_str("running"), + Some(MonitorJobStatus::Running) + ); + assert_eq!( + MonitorJobStatus::from_db_str("completed"), + Some(MonitorJobStatus::Completed) + ); + assert_eq!( + MonitorJobStatus::from_db_str("failed"), + Some(MonitorJobStatus::Failed) + ); + assert_eq!( + MonitorJobStatus::from_db_str("cancelled"), + Some(MonitorJobStatus::Cancelled) + ); + assert_eq!(MonitorJobStatus::from_db_str("unknown"), None); + } + + #[test] + fn test_monitor_job_serialization() { + let job = MonitorJob { + id: 1, + uuid: "test-uuid-456".to_string(), + video_path: Some("/path/to/video.mp4".to_string()), + status: MonitorJobStatus::Running, + current_processor: Some("asr".to_string()), + progress_total: 100, + progress_current: 50, + error_count: 0, + last_error: None, + started_at: Some("2024-01-01T10:00:00Z".to_string()), + updated_at: Some("2024-01-01T10:05:00Z".to_string()), + created_at: "2024-01-01T09:55:00Z".to_string(), + }; + + let json = serde_json::to_string(&job).unwrap(); + assert!(json.contains("test-uuid-456")); + assert!(json.contains("running")); + assert!(json.contains("asr")); + } + + #[test] + fn test_monitor_job_deserialization() { + let json = r#"{ + "id": 2, + "uuid": "abc123", + "video_path": "/video/test.mp4", + "status": "pending", + "current_processor": null, + "progress_total": 0, + "progress_current": 0, + "error_count": 0, + "last_error": null, + "started_at": null, + "updated_at": null, + "created_at": "2024-01-01T00:00:00Z" + }"#; + + let job: MonitorJob = serde_json::from_str(json).unwrap(); + assert_eq!(job.uuid, "abc123"); + assert_eq!(job.status, MonitorJobStatus::Pending); + assert!(job.current_processor.is_none()); + } + + #[test] + fn test_monitor_job_stats_serialization() { + let stats = MonitorJobStats { + pending: 5, + running: 2, + completed: 100, + failed: 3, + }; + + let json = serde_json::to_string(&stats).unwrap(); + assert!(json.contains("\"pending\":5")); + assert!(json.contains("\"running\":2")); + assert!(json.contains("\"completed\":100")); + assert!(json.contains("\"failed\":3")); + } + + #[test] + fn test_bm25_result_serialization() { + let result = Bm25Result { + chunk_id: "sentence_001".to_string(), + uuid: "test-uuid".to_string(), + chunk_index: 1, + chunk_type: "sentence".to_string(), + start_time: 0.0, + end_time: 5.0, + text: "Hello world".to_string(), + bm25_score: 0.75, + }; + + let json = serde_json::to_string(&result).unwrap(); + assert!(json.contains("sentence_001")); + assert!(json.contains("Hello world")); + assert!(json.contains("0.75")); + } + + #[test] + fn test_hybrid_search_result_serialization() { + let result = HybridSearchResult { + chunk_id: "sentence_001".to_string(), + uuid: "test-uuid".to_string(), + chunk_index: 1, + chunk_type: "sentence".to_string(), + start_time: 0.0, + end_time: 5.0, + text: "Hello world".to_string(), + vector_score: 0.85, + bm25_score: 0.75, + combined_score: 0.80, + }; + + let json = serde_json::to_string(&result).unwrap(); + assert!(json.contains("sentence_001")); + assert!(json.contains("0.85")); + assert!(json.contains("0.75")); + assert!(json.contains("0.8")); + } + + fn prepare_tsquery_test_helper(query: &str) -> Result { + let words: Vec = query + .split_whitespace() + .map(|w| { + let cleaned = w + .chars() + .filter(|c| c.is_alphanumeric()) + .collect::(); + if cleaned.is_empty() { + String::new() + } else { + format!("{}:*", cleaned.to_lowercase()) + } + }) + .filter(|w| !w.is_empty()) + .collect(); + + if words.is_empty() { + anyhow::bail!("Query contains no searchable terms"); + } + + Ok(words.join(" & ")) + } + + #[test] + fn test_prepare_tsquery() { + let query = "Hello World"; + let tsquery = prepare_tsquery_test_helper(query).unwrap(); + assert!(tsquery.contains("hello:*")); + assert!(tsquery.contains("world:*")); + assert!(tsquery.contains(" & ")); + } + + #[test] + fn test_prepare_tsquery_single_word() { + let query = "search"; + let tsquery = prepare_tsquery_test_helper(query).unwrap(); + assert_eq!(tsquery, "search:*"); + } + + #[test] + fn test_prepare_tsquery_empty_result() { + let query = " !!! "; + let result = prepare_tsquery_test_helper(query); + assert!(result.is_err()); } } diff --git a/src/core/db/redis_client.rs b/src/core/db/redis_client.rs index 98b0069..a3023f4 100644 --- a/src/core/db/redis_client.rs +++ b/src/core/db/redis_client.rs @@ -336,7 +336,7 @@ impl RedisClient { pub async fn update_worker_job_status( &self, uuid: &str, - job_id: i64, + job_id: i32, status: &str, current_processor: Option<&str>, progress: i32, @@ -401,7 +401,7 @@ impl RedisClient { } let status: String = conn.hget(&key, "status").await?; - let job_id: i64 = conn.hget(&key, "job_id").await?; + let job_id: i32 = conn.hget(&key, "job_id").await?; let current_processor: String = conn.hget(&key, "current_processor").await?; let progress_current: i32 = conn.hget(&key, "progress_current").await?; let progress_total: i32 = conn.hget(&key, "progress_total").await?; @@ -538,7 +538,7 @@ pub struct AnomalyAlertMessage { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerJobStatus { pub uuid: String, - pub job_id: i64, + pub job_id: i32, pub status: String, pub current_processor: String, pub progress_current: i32, @@ -549,7 +549,7 @@ pub struct WorkerJobStatus { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerJobInfo { pub uuid: String, - pub job_id: i64, + pub job_id: i32, pub status: String, pub progress_current: i32, pub progress_total: i32, diff --git a/src/core/storage/uuid.rs b/src/core/storage/uuid.rs index 649b79e..13f36ee 100644 --- a/src/core/storage/uuid.rs +++ b/src/core/storage/uuid.rs @@ -25,6 +25,41 @@ pub fn compute_uuid_from_path(full_path: &str) -> String { compute_uuid(&parent, &filename) } +/// Extract relative path from full path given data root +/// Returns (relative_path, filename) +pub fn extract_relative_path(full_path: &str, data_root: &str) -> (String, String) { + let full_path = PathBuf::from(full_path); + let data_root = PathBuf::from(data_root); + + // Canonicalize both paths + let full_canonical = full_path.canonicalize().unwrap_or(full_path.clone()); + let root_canonical = data_root.canonicalize().unwrap_or(data_root.clone()); + + // Try to strip the data root prefix + let relative = full_canonical + .strip_prefix(&root_canonical) + .unwrap_or(&full_canonical); + + // Separate into parent directory and filename + let filename = relative + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_default(); + + let parent = relative + .parent() + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_default(); + + (parent, filename) +} + +/// Compute UUID from full path using data root for relative path extraction +pub fn compute_uuid_from_path_with_root(full_path: &str, data_root: &str) -> String { + let (parent, filename) = extract_relative_path(full_path, data_root); + compute_uuid(&parent, &filename) +} + #[cfg(test)] mod tests { use super::*; @@ -41,4 +76,26 @@ mod tests { let uuid = compute_uuid_from_path("/Users/test/Videos/video.mp4"); assert_eq!(uuid.len(), 16); } + + #[test] + fn test_relative_path_extraction() { + let (parent, filename) = + extract_relative_path("/data/sftpgo/data/demo/video.mp4", "/data/sftpgo/data"); + assert_eq!(parent, "demo"); + assert_eq!(filename, "video.mp4"); + } + + #[test] + fn test_uuid_with_data_root() { + let uuid1 = compute_uuid_from_path_with_root( + "/data/sftpgo/data/demo/video.mp4", + "/data/sftpgo/data", + ); + let uuid2 = compute_uuid_from_path_with_root( + "/data/sftpgo/data/demo/video.mp4", + "/data/sftpgo/data", + ); + assert_eq!(uuid1, uuid2); + assert_eq!(uuid1.len(), 16); + } } diff --git a/src/worker/config.rs b/src/worker/config.rs new file mode 100644 index 0000000..31e6fbd --- /dev/null +++ b/src/worker/config.rs @@ -0,0 +1,77 @@ +use once_cell::sync::Lazy; +use std::env; + +#[derive(Debug, Clone)] +pub struct WorkerConfig { + pub max_concurrent: usize, + pub poll_interval_secs: u64, + pub enabled: bool, + pub batch_size: i32, + pub processor_timeout_secs: u64, +} + +impl Default for WorkerConfig { + fn default() -> Self { + Self { + max_concurrent: Self::max_concurrent_from_env(), + poll_interval_secs: Self::poll_interval_from_env(), + enabled: Self::enabled_from_env(), + batch_size: Self::batch_size_from_env(), + processor_timeout_secs: Self::timeout_from_env(), + } + } +} + +impl WorkerConfig { + fn max_concurrent_from_env() -> usize { + env::var("MOMENTRY_MAX_CONCURRENT") + .unwrap_or_else(|_| "2".to_string()) + .parse() + .unwrap_or(2) + } + + fn poll_interval_from_env() -> u64 { + env::var("MOMENTRY_POLL_INTERVAL") + .unwrap_or_else(|_| "5".to_string()) + .parse() + .unwrap_or(5) + } + + fn enabled_from_env() -> bool { + env::var("MOMENTRY_WORKER_ENABLED") + .unwrap_or_else(|_| "true".to_string()) + .parse() + .unwrap_or(true) + } + + fn batch_size_from_env() -> i32 { + env::var("MOMENTRY_WORKER_BATCH_SIZE") + .unwrap_or_else(|_| "10".to_string()) + .parse() + .unwrap_or(10) + } + + fn timeout_from_env() -> u64 { + env::var("MOMENTRY_WORKER_PROCESSOR_TIMEOUT") + .unwrap_or_else(|_| "3600".to_string()) + .parse() + .unwrap_or(3600) + } +} + +pub static WORKER_CONFIG: Lazy = Lazy::new(WorkerConfig::default); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_worker_config_default() { + let config = WorkerConfig::default(); + assert_eq!(config.max_concurrent, 2); + assert_eq!(config.poll_interval_secs, 5); + assert!(config.enabled); + assert_eq!(config.batch_size, 10); + assert_eq!(config.processor_timeout_secs, 3600); + } +} diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs new file mode 100644 index 0000000..cbb2c39 --- /dev/null +++ b/src/worker/job_worker.rs @@ -0,0 +1,184 @@ +use anyhow::Result; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{error, info, warn}; + +use crate::core::db::{MonitorJobStatus, PostgresDb, ProcessorType, RedisClient}; +use crate::worker::config::WorkerConfig; +use crate::worker::processor::{ProcessorPool, ProcessorTask}; + +pub struct JobWorker { + db: Arc, + redis: Arc, + config: WorkerConfig, + processor_pool: ProcessorPool, +} + +impl JobWorker { + pub fn new(db: Arc, redis: Arc, config: WorkerConfig) -> Self { + let processor_pool = ProcessorPool::new(db.clone(), redis.clone(), config.max_concurrent); + + Self { + db, + redis, + config, + processor_pool, + } + } + + pub async fn run(&self) -> Result<()> { + info!( + "Job Worker started with config: max_concurrent={}, poll_interval={}s", + self.config.max_concurrent, self.config.poll_interval_secs + ); + + loop { + if !self.config.enabled { + warn!("Worker is disabled, sleeping for 60s..."); + sleep(Duration::from_secs(60)).await; + continue; + } + + if let Err(e) = self.poll_and_process().await { + error!("Error during poll and process: {}", e); + } + + sleep(Duration::from_secs(self.config.poll_interval_secs)).await; + } + } + + async fn poll_and_process(&self) -> Result<()> { + let pending_jobs = self.db.get_pending_jobs(self.config.batch_size).await?; + + if pending_jobs.is_empty() { + return Ok(()); + } + + info!("Found {} pending jobs", pending_jobs.len()); + + for job in pending_jobs { + if !self.processor_pool.can_start().await { + info!("Max concurrent processors reached, waiting..."); + break; + } + + if let Err(e) = self.process_job(job).await { + error!("Failed to process job: {}", e); + } + } + + Ok(()) + } + + async fn process_job(&self, job: crate::core::db::MonitorJob) -> Result<()> { + info!("Processing job: {} ({})", job.uuid, job.id); + + let total_processors = ProcessorType::all().len() as i32; + + self.db + .update_job_status(job.id, MonitorJobStatus::Running) + .await?; + + self.redis + .update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processors) + .await?; + + for processor_type in ProcessorType::all() { + let processor_result_id = self + .db + .create_processor_result(job.id, processor_type) + .await?; + + self.redis + .update_worker_processor_status(&job.uuid, processor_type.as_str(), "pending", None) + .await?; + + let task = ProcessorTask { + job: job.clone(), + processor_type, + processor_result_id, + }; + + self.processor_pool.start_processor(task).await?; + } + + self.check_and_complete_job(job.id, &job.uuid).await?; + + Ok(()) + } + + async fn check_and_complete_job(&self, job_id: i32, uuid: &str) -> Result<()> { + let results = self.db.get_processor_results_by_job(job_id).await?; + + let all_completed = results.iter().all(|r| { + matches!( + r.status, + crate::core::db::ProcessorJobStatus::Completed + | crate::core::db::ProcessorJobStatus::Skipped + ) + }); + + let any_failed = results + .iter() + .any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Failed)); + + let completed_count = results + .iter() + .filter(|r| { + matches!( + r.status, + crate::core::db::ProcessorJobStatus::Completed + | crate::core::db::ProcessorJobStatus::Skipped + ) + }) + .count() as i32; + + if all_completed && !any_failed { + self.db + .update_job_status(job_id, MonitorJobStatus::Completed) + .await?; + + self.redis + .update_worker_job_status(uuid, job_id, "completed", None, completed_count, 7) + .await?; + + self.redis.delete_worker_job(uuid).await?; + + info!("Job {} completed successfully", job_id); + } else if any_failed { + self.db + .update_job_status(job_id, MonitorJobStatus::Failed) + .await?; + + self.redis + .update_worker_job_status(uuid, job_id, "failed", None, completed_count, 7) + .await?; + + warn!("Job {} completed with failures", job_id); + } else { + self.redis + .update_worker_job_status(uuid, job_id, "running", None, completed_count, 7) + .await?; + } + + Ok(()) + } + + pub async fn shutdown(&self) { + info!("Shutting down worker..."); + self.processor_pool.cancel_all().await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_worker_config() { + let config = WorkerConfig::default(); + assert!(config.enabled); + assert!(config.max_concurrent >= 1); + } +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs new file mode 100644 index 0000000..86b4cdf --- /dev/null +++ b/src/worker/mod.rs @@ -0,0 +1,6 @@ +pub mod config; +pub mod job_worker; +pub mod processor; + +pub use config::WorkerConfig; +pub use job_worker::JobWorker; diff --git a/src/worker/processor.rs b/src/worker/processor.rs new file mode 100644 index 0000000..9d1190e --- /dev/null +++ b/src/worker/processor.rs @@ -0,0 +1,354 @@ +use anyhow::{Context, Result}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tracing::{error, info}; + +use crate::core::db::RedisClient; +use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType}; + +#[derive(Debug, Clone)] +pub struct ProcessorTask { + pub job: MonitorJob, + pub processor_type: ProcessorType, + pub processor_result_id: i32, +} + +pub struct ProcessorPool { + db: Arc, + redis: Arc, + max_concurrent: usize, + running: Arc>>, + running_count: Arc>, +} + +struct ProcessorHandle { + #[allow(dead_code)] + processor_type: ProcessorType, + cancel_tx: mpsc::Sender<()>, +} + +impl ProcessorPool { + pub fn new(db: Arc, redis: Arc, max_concurrent: usize) -> Self { + Self { + db, + redis, + max_concurrent, + running: Arc::new(RwLock::new(HashMap::new())), + running_count: Arc::new(RwLock::new(0)), + } + } + + pub async fn can_start(&self) -> bool { + let count = *self.running_count.read().await; + count < self.max_concurrent + } + + pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> { + let (cancel_tx, cancel_rx) = mpsc::channel(1); + let job_id = task.job.id; + let processor_type = task.processor_type; + + { + let mut count = self.running_count.write().await; + if *count >= self.max_concurrent { + anyhow::bail!("Max concurrent processors reached"); + } + *count += 1; + } + + let running = self.running.clone(); + let running_count = self.running_count.clone(); + running.write().await.insert( + job_id, + ProcessorHandle { + processor_type, + cancel_tx, + }, + ); + + let db = self.db.clone(); + let redis = self.redis.clone(); + let job = task.job.clone(); + let processor_result_id = task.processor_result_id; + let processor_name = processor_type.as_str().to_string(); + + tokio::spawn(async move { + info!("Starting processor {} for job {}", processor_name, job.uuid); + + let _ = db + .update_processor_result( + processor_result_id, + ProcessorJobStatus::Running, + None, + None, + ) + .await; + + let _ = redis + .update_worker_processor_status(&job.uuid, &processor_name, "running", None) + .await; + + let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await; + + { + let mut running_guard = running.write().await; + running_guard.remove(&job_id); + let mut count_guard = running_count.write().await; + *count_guard -= 1; + } + + match result { + Ok(output) => { + info!( + "Processor {} completed for job {}", + processor_name, job.uuid + ); + let _ = db + .update_processor_result( + processor_result_id, + ProcessorJobStatus::Completed, + None, + Some(&output), + ) + .await; + + let _ = redis + .update_worker_processor_status( + &job.uuid, + &processor_name, + "completed", + None, + ) + .await; + } + Err(e) => { + error!( + "Processor {} failed for job {}: {}", + processor_name, job.uuid, e + ); + let _ = db + .update_processor_result( + processor_result_id, + ProcessorJobStatus::Failed, + Some(&e.to_string()), + None, + ) + .await; + + let _ = redis + .update_worker_processor_status( + &job.uuid, + &processor_name, + "failed", + Some(&e.to_string()), + ) + .await; + } + } + }); + + Ok(()) + } + + async fn run_processor( + db: &PostgresDb, + redis: &RedisClient, + job: &MonitorJob, + processor_type: ProcessorType, + mut cancel_rx: mpsc::Receiver<()>, + ) -> Result { + let video_path = job.video_path.as_ref().context("No video path in job")?; + + match processor_type { + ProcessorType::Asr => Self::run_asr(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Cut => Self::run_cut(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Yolo => Self::run_yolo(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Ocr => Self::run_ocr(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Face => Self::run_face(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Pose => Self::run_pose(db, redis, video_path, &mut cancel_rx).await, + ProcessorType::Asrx => Self::run_asrx(db, redis, video_path, &mut cancel_rx).await, + } + } + + async fn run_asr( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_ASR_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/asr.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("ASR script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_cut( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_CUT_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/cut.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("CUT script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_yolo( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_YOLO_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/yolo_processor.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("YOLO script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_ocr( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_OCR_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/ocr.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("OCR script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_face( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_FACE_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/face.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Face script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_pose( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_POSE_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/pose.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Pose script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + async fn run_asrx( + _db: &PostgresDb, + _redis: &RedisClient, + video_path: &str, + _cancel_rx: &mut mpsc::Receiver<()>, + ) -> Result { + let script_path = std::env::var("MOMENTRY_ASRX_SCRIPT") + .unwrap_or_else(|_| "/Users/accusys/momentry/scripts/asrx.py".to_string()); + + let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11") + .arg(&script_path) + .arg(video_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("ASRX script failed: {}", stderr); + } + + let result: serde_json::Value = serde_json::from_slice(&output.stdout)?; + Ok(result) + } + + pub async fn get_running_count(&self) -> usize { + *self.running_count.read().await + } + + pub async fn cancel_all(&self) { + let mut running = self.running.write().await; + for (_, handle) in running.drain() { + let _ = handle.cancel_tx.send(()).await; + } + let mut count = self.running_count.write().await; + *count = 0; + } +}