use axum::{ body::Body, extract::{Path, Query, State}, http::{header, StatusCode}, response::{IntoResponse, Json, Response}, routing::{get, post}, Router, }; use serde::{Deserialize, Serialize}; use crate::core::db::PostgresDb; pub fn trace_agent_routes() -> Router { Router::new() .route("/api/v1/file/:file_uuid/traces", post(list_traces_sorted)) .route( "/api/v1/file/:file_uuid/trace/:trace_id/faces", get(list_trace_faces), ) .route( "/api/v1/file/:file_uuid/trace/:trace_id/representative-face", get(get_representative_face), ) .route( "/api/v1/file/:file_uuid/trace/:trace_id/thumbnail", get(get_trace_thumbnail), ) .route( "/api/v1/file/:file_uuid/stranger/:stranger_id/representative-face", get(get_stranger_representative_face), ) .route( "/api/v1/file/:file_uuid/stranger/:stranger_id/thumbnail", get(get_stranger_thumbnail), ) .route( "/api/v1/file/:file_uuid/identities/:identity_uuid_a/co-occur-with/:identity_uuid_b", get(get_cooccurrence), ) .route("/api/v1/file/:file_uuid/tkg/rebuild", post(rebuild_tkg)) .route("/api/v1/file/:file_uuid/rule2", post(ingest_rule2)) .route( "/api/v1/file/:file_uuid/representative-frame", get(get_representative_frame), ) .route("/api/v1/file/:file_uuid/tkg/nodes", post(query_tkg_nodes)) .route("/api/v1/file/:file_uuid/tkg/edges", post(query_tkg_edges)) .route( "/api/v1/file/:file_uuid/tkg/node/:node_id", get(get_tkg_node_detail), ) } #[derive(Debug, Deserialize)] struct TracesRequest { min_faces: Option, sort_by: Option, page: Option, page_size: Option, limit: Option, min_confidence: Option, max_confidence: Option, } #[derive(Debug, Serialize)] struct TraceInfo { trace_id: i32, face_count: i64, start_frame: i64, end_frame: i64, start_time: f64, end_time: f64, duration_sec: f64, avg_confidence: f64, sample_face_id: Option, } #[derive(Debug, Serialize)] struct TracesResponse { success: bool, file_uuid: String, fps: f64, total_traces: i64, total_faces: i64, page: i64, page_size: i64, traces: Vec, } async fn list_traces_sorted( 
State(state): State, Path(file_uuid): Path, Json(req): Json, ) -> Result, (StatusCode, String)> { let min_faces = req.min_faces.unwrap_or(1); let sort = req.sort_by.as_deref().unwrap_or("first_appearance"); let page = req.page.unwrap_or(1).max(1); let page_size = req.page_size.unwrap_or(50).max(1).min(500); let hard_limit = req.limit.unwrap_or(500); let effective_limit = hard_limit.min(page_size); let db_offset = (page - 1) * page_size; let min_confidence = req.min_confidence.unwrap_or(0.0); let max_confidence = req.max_confidence.unwrap_or(1.0); let order_clause = match sort { "face_count" => "face_count DESC", "duration" => "duration_sec DESC", _ => "start_frame ASC", }; let fps: f64 = sqlx::query_scalar(&format!( "SELECT COALESCE(fps, 24.0) FROM {} WHERE file_uuid = $1", crate::core::db::schema::table_name("videos") )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .unwrap_or(24.0); let query = format!( "SELECT tt.*, fd.id AS sample_face_id FROM ( SELECT trace_id::int AS trace_id, COUNT(*) AS face_count, MIN(frame_number)::bigint AS start_frame, MAX(frame_number)::bigint AS end_frame, (MAX(frame_number) - MIN(frame_number))::float8 AS duration_sec, AVG(confidence)::float8 AS avg_confidence FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL AND confidence >= $5 AND confidence <= $6 GROUP BY trace_id HAVING COUNT(*) >= $2 ORDER BY {} LIMIT $3 OFFSET $4 ) tt LEFT JOIN LATERAL ( SELECT id FROM {} WHERE trace_id = tt.trace_id AND file_uuid = $1 ORDER BY confidence DESC LIMIT 1 ) fd ON true", crate::core::db::schema::table_name("face_detections"), order_clause, crate::core::db::schema::table_name("face_detections"), ); let rows: Vec<(i32, i64, i64, i64, f64, f64, Option)> = sqlx::query_as(&query) .bind(&file_uuid) .bind(min_faces) .bind(effective_limit) .bind(db_offset) .bind(min_confidence) .bind(max_confidence) .fetch_all(state.db.pool()) .await .map_err(|e| 
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let traces: Vec = rows .into_iter() .map(|(tid, fc, sf, ef, dur, conf, fid)| TraceInfo { trace_id: tid, face_count: fc, start_frame: sf, end_frame: ef, start_time: sf as f64 / fps, end_time: ef as f64 / fps, duration_sec: dur / fps, avg_confidence: conf, sample_face_id: fid.map(|v| v.to_string()), }) .collect(); let (total_traces, total_faces): (i64, i64) = sqlx::query_as( &format!("SELECT COUNT(DISTINCT trace_id), COUNT(*) FROM {} WHERE file_uuid = $1 AND trace_id IS NOT NULL", crate::core::db::schema::table_name("face_detections")) ) .bind(&file_uuid) .fetch_one(state.db.pool()) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(Json(TracesResponse { success: true, file_uuid, fps, total_traces, total_faces, page, page_size, traces, })) } // ── Individual face detections for a trace ── #[derive(Debug, Deserialize)] struct TraceFacesQuery { page: Option, page_size: Option, limit: Option, offset: Option, interpolate: Option, } #[derive(Debug, Serialize)] struct TraceFaceItem { id: i32, start_frame: i64, end_frame: i64, start_time: f64, end_time: f64, x: Option, y: Option, width: Option, height: Option, confidence: f64, interpolated: bool, } #[derive(Debug, Serialize)] struct TraceFacesResponse { success: bool, file_uuid: String, trace_id: i32, fps: f64, total: i64, faces: Vec, } fn lerp_i32(a: Option, b: Option, t: f64) -> Option { match (a, b) { (Some(av), Some(bv)) => Some((av as f64 + (bv - av) as f64 * t).round() as i32), _ => None, } } async fn list_trace_faces( State(state): State, Path((file_uuid, trace_id)): Path<(String, i32)>, Query(q): Query, ) -> Result, (StatusCode, String)> { let limit = q.limit.unwrap_or(200).min(1000); // Support both page/page_size and offset; page/page_size takes precedence let offset = if q.page.is_some() || q.page_size.is_some() { let p = q.page.unwrap_or(1).max(1); let ps = q.page_size.unwrap_or(200).max(1).min(1000); (p - 1) * ps } else { 
q.offset.unwrap_or(0) }; let interpolate = q.interpolate.unwrap_or(false); let fps: f64 = sqlx::query_scalar(&format!( "SELECT COALESCE(fps, 24.0) FROM {} WHERE file_uuid = $1", crate::core::db::schema::table_name("videos") )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .unwrap_or(24.0); let total_detected: i64 = sqlx::query_scalar(&format!( "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND trace_id = $2", crate::core::db::schema::table_name("face_detections") )) .bind(&file_uuid) .bind(trace_id) .fetch_one(state.db.pool()) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let rows: Vec<( i32, i64, Option, Option, Option, Option, f32, )> = sqlx::query_as(&format!( "SELECT id, frame_number, x, y, width, height, confidence::float4 \ FROM {} WHERE file_uuid = $1 AND trace_id = $2 \ ORDER BY frame_number ASC LIMIT $3 OFFSET $4", crate::core::db::schema::table_name("face_detections") )) .bind(&file_uuid) .bind(trace_id) .bind(limit) .bind(offset) .fetch_all(state.db.pool()) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let mut faces: Vec = Vec::new(); for (i, (id, frame, x, y, w, h, conf)) in rows.iter().enumerate() { let cur = (x, y, w, h); // Add interpolated frames between previous and current detection if interpolate && i > 0 { let prev = &rows[i - 1]; let prev_frame = prev.1; let gap = frame - prev_frame; if gap > 1 { for mid in 1..gap { let t = mid as f64 / gap as f64; let mid_x = lerp_i32(prev.2, *x, t); let mid_y = lerp_i32(prev.3, *y, t); let mid_w = lerp_i32(prev.4, *w, t); let mid_h = lerp_i32(prev.5, *h, t); let mid_frame = prev_frame + mid; let mt = (mid_frame as f64 / fps * 10.0).round() / 10.0; faces.push(TraceFaceItem { id: 0, start_frame: mid_frame, end_frame: mid_frame, start_time: mt, end_time: mt, x: mid_x, y: mid_y, width: mid_w, height: mid_h, confidence: 0.0, interpolated: true, }); } } } // Add the real detection let 
frame_val = *frame; let ft = (frame_val as f64 / fps * 10.0).round() / 10.0; faces.push(TraceFaceItem { id: *id, start_frame: frame_val, end_frame: frame_val, start_time: ft, end_time: ft, x: *x, y: *y, width: *w, height: *h, confidence: *conf as f64, interpolated: false, }); } let total = if interpolate && faces.len() as i64 > total_detected { faces.len() as i64 } else { total_detected }; Ok(Json(TraceFacesResponse { success: true, file_uuid, trace_id, fps, total, faces, })) } #[derive(Debug, Serialize)] struct RepFaceBbox { x: i32, y: i32, width: i32, height: i32, } #[derive(Debug, Serialize)] struct RepFaceResult { frame_number: i64, timestamp_secs: f64, bbox: RepFaceBbox, confidence: f64, quality_score: f64, blur_score: f64, } #[derive(Debug, Serialize)] struct RepFaceResponse { success: bool, file_uuid: String, trace_id: i32, face_count: i64, representative: RepFaceResult, } struct RepFaceSelection { frame: i64, x: i32, y: i32, w: i32, h: i32, conf: f64, blur: f64, score: f64, video_path: String, fps: f64, face_count: i64, } async fn select_rep_face( pool: &sqlx::PgPool, file_uuid: &str, trace_id: i32, err_fn: F, ) -> Result where F: Fn(anyhow::Error) -> T, { use crate::core::db::schema; let fd_table = schema::table_name("face_detections"); let video_table = schema::table_name("videos"); let fps: f64 = sqlx::query_scalar(&format!( "SELECT COALESCE(fps, 25.0) FROM {} WHERE file_uuid = $1", video_table )) .bind(file_uuid) .fetch_optional(pool) .await .map_err(|e| err_fn(anyhow::anyhow!("{}", e)))? 
.unwrap_or(25.0); let face_count: (i64,) = sqlx::query_as(&format!( "SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND trace_id = $2", fd_table )) .bind(file_uuid) .bind(trace_id) .fetch_one(pool) .await .map_err(|e| err_fn(anyhow::anyhow!("{}", e)))?; struct Candidate { frame: i64, x: i32, y: i32, w: i32, h: i32, conf: f64, score: f64, } let rows = sqlx::query_as::<_, (i64, i32, i32, i32, i32, f64)>(&format!( "SELECT frame_number::bigint, x, y, width, height, confidence::float8 \ FROM {} WHERE file_uuid = $1 AND trace_id = $2 AND confidence > 0.7 \ AND ((metadata->>'qc_ok')::boolean IS NULL OR (metadata->>'qc_ok')::boolean = true) \ ORDER BY (width::float8 * height::float8) * confidence::float8 DESC LIMIT 10", fd_table )) .bind(file_uuid) .bind(trace_id) .fetch_all(pool) .await .map_err(|e| err_fn(anyhow::anyhow!("{}", e)))?; if rows.is_empty() { return Err(err_fn(anyhow::anyhow!("No suitable face found"))); } let candidates: Vec = rows .into_iter() .map(|(frame, x, y, w, h, conf)| { let score = (w as f64 * h as f64) * conf; Candidate { frame, x, y, w, h, conf, score, } }) .collect(); let video_path: String = sqlx::query_scalar(&format!( "SELECT file_path FROM {} WHERE file_uuid = $1", video_table )) .bind(file_uuid) .fetch_optional(pool) .await .map_err(|e| err_fn(anyhow::anyhow!("{}", e)))? 
.ok_or_else(|| err_fn(anyhow::anyhow!("Video not found")))?; let mut best = candidates[0].frame; let mut best_blur = f64::MAX; let mut best_idx = 0usize; for (i, c) in candidates.iter().enumerate() { let seek = c.frame as f64 / fps; if let Ok(output) = tokio::process::Command::new("ffmpeg") .args([ "-ss", &format!("{:.2}", seek), "-i", &video_path, "-vframes", "1", "-vf", &format!("crop={}:{}:{}:{},blurdetect", c.w, c.h, c.x, c.y), "-f", "null", "-", ]) .output() .await { let stderr = String::from_utf8_lossy(&output.stderr); for line in stderr.lines() { if let Some(blur_str) = line.split("blur mean: ").nth(1) { if let Ok(blur) = blur_str.trim().parse::() { if blur < best_blur { best_blur = blur; best = c.frame; best_idx = i; } } } } } } let chosen = &candidates[best_idx]; Ok(RepFaceSelection { frame: chosen.frame, x: chosen.x, y: chosen.y, w: chosen.w, h: chosen.h, conf: chosen.conf, blur: best_blur, score: chosen.score, video_path, fps, face_count: face_count.0, }) } async fn get_representative_face( State(state): State, Path((file_uuid, trace_id)): Path<(String, i32)>, ) -> Result, (StatusCode, Json)> { get_representative_face_inner(&state, &file_uuid, trace_id).await } async fn get_representative_face_inner( state: &crate::api::types::AppState, file_uuid: &str, trace_id: i32, ) -> Result, (StatusCode, Json)> { let sel = select_rep_face(state.db.pool(), file_uuid, trace_id, |e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) }) .await?; Ok(Json(RepFaceResponse { success: true, file_uuid: file_uuid.to_string(), trace_id, face_count: sel.face_count, representative: RepFaceResult { frame_number: sel.frame, timestamp_secs: sel.frame as f64 / sel.fps, bbox: RepFaceBbox { x: sel.x, y: sel.y, width: sel.w, height: sel.h, }, confidence: sel.conf, quality_score: sel.score, blur_score: sel.blur, }, })) } async fn get_trace_thumbnail( State(state): State, Path((file_uuid, trace_id)): Path<(String, i32)>, ) -> Result)> { 
get_trace_thumbnail_inner(&state, &file_uuid, trace_id).await } async fn get_trace_thumbnail_inner( state: &crate::api::types::AppState, file_uuid: &str, trace_id: i32, ) -> Result)> { // Step 1: Check for pre-stored face crops in .faces/{file_uuid}/{trace_id}/ // For trace_id=0 (untracked/stranger), check unbound directory instead let output_dir = crate::core::config::OUTPUT_DIR.as_str(); let trace_id_str = trace_id.to_string(); let trace_dir_name = if trace_id == 0 { "unbound" } else { &trace_id_str }; let trace_dir = std::path::PathBuf::from(output_dir) .join(".faces") .join(&file_uuid) .join(trace_dir_name); if trace_dir.exists() { // Find any cached face crop in this trace directory if let Ok(mut entries) = std::fs::read_dir(&trace_dir) { while let Some(Ok(entry)) = entries.next() { let path = entry.path(); if path.extension().map_or(false, |e| e == "jpg") { tracing::info!( "[trace_thumbnail] Using cached face crop: {}", path.display() ); let bytes = tokio::fs::read(&path).await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })?; // Validate cached JPEG crate::core::thumbnail::validator::validate_jpeg(&bytes).map_err(|e| { tracing::warn!("[trace_thumbnail] Cached JPEG validation failed: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Invalid cached JPEG"})), ) })?; return Ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "image/jpeg") .header(header::CACHE_CONTROL, "public, max-age=86400") .body(Body::from(bytes)) .unwrap()); } } } } // Step 2: Fallback to ffmpeg on-demand extraction let sel = select_rep_face(state.db.pool(), &file_uuid, trace_id, |e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) }) .await?; let seek = sel.frame as f64 / sel.fps; let tmp = std::env::temp_dir().join(format!("trace_{}_{}.jpg", file_uuid, trace_id)); tracing::debug!( "[trace_thumbnail] Fallback to ffmpeg for trace {} 
frame {}", trace_id, sel.frame ); let status = tokio::process::Command::new("ffmpeg") .args([ "-ss", &format!("{:.2}", seek), "-i", &sel.video_path, "-vframes", "1", "-vf", &format!("crop={}:{}:{}:{},scale=320:320", sel.w, sel.h, sel.x, sel.y), "-q:v", "2", "-y", &tmp.to_string_lossy().to_string(), ]) .output() .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })?; if !status.status.success() { return Err(( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "FFmpeg failed"})), )); } let bytes = tokio::fs::read(&tmp).await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })?; crate::core::thumbnail::validator::validate_jpeg(&bytes).map_err(|e| { tracing::warn!("[trace_thumbnail] JPEG validation failed: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Invalid JPEG output"})), ) })?; let _ = tokio::fs::remove_file(&tmp).await; Ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "image/jpeg") .header(header::CACHE_CONTROL, "public, max-age=86400") .body(Body::from(bytes)) .unwrap()) } #[derive(Debug, Serialize)] struct CoOccurIdentity { identity_uuid: String, name: String, trace_id: i32, } #[derive(Debug, Serialize)] struct CoOccurRepFace { frame_number: i64, bbox: RepFaceBbox, confidence: f64, thumbnail_url: String, } #[derive(Debug, Serialize)] struct CoOccurrence { frame_number: i64, timestamp_secs: f64, total_cooccurrence_frames: i64, representative_face_a: Option, representative_face_b: Option, } #[derive(Debug, Serialize)] struct CoOccurResponse { success: bool, file_uuid: String, identity_a: CoOccurIdentity, identity_b: CoOccurIdentity, first_cooccurrence: CoOccurrence, } async fn get_cooccurrence( State(state): State, Path((file_uuid, identity_uuid_a, identity_uuid_b)): Path<(String, String, String)>, ) -> Result, (StatusCode, Json)> { use crate::core::db::schema; let 
id_table = schema::table_name("identities"); let fd_table = schema::table_name("face_detections"); // Stage 1: Get identity names and IDs let id_a = sqlx::query_as::<_, (i32, String)>(&format!( "SELECT id, name FROM {} WHERE uuid::text = $1 OR REPLACE(uuid::text, '-', '') = $1", id_table )) .bind(&identity_uuid_a) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? .ok_or_else(|| { ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Identity A not found"})), ) })?; let id_b = sqlx::query_as::<_, (i32, String)>(&format!( "SELECT id, name FROM {} WHERE uuid::text = $1 OR REPLACE(uuid::text, '-', '') = $1", id_table )) .bind(&identity_uuid_b) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? .ok_or_else(|| { ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Identity B not found"})), ) })?; // Stage 2: Find first frame where both identity_ids appear let cooccur: Option<(i64,)> = sqlx::query_as(&format!( "SELECT MIN(fd.frame_number)::bigint FROM {} fd \ WHERE fd.file_uuid = $1 AND fd.identity_id = $2 \ AND fd.frame_number IN ( \ SELECT frame_number FROM {} \ WHERE file_uuid = $1 AND identity_id = $3 \ )", fd_table, fd_table )) .bind(&file_uuid) .bind(id_a.0) .bind(id_b.0) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })?; let (first_frame,) = cooccur.ok_or_else(|| { (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "These two identities never appear together in this file"}))) })?; // Get fps for timestamp let video_table = schema::table_name("videos"); let fps: f64 = sqlx::query_scalar(&format!( "SELECT COALESCE(fps, 25.0) FROM {} WHERE file_uuid = $1", video_table )) .bind(&file_uuid) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( 
StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? .unwrap_or(25.0); // Stage 3: Get trace_ids for both at this frame let trace_a: Option<(i32,)> = sqlx::query_as( &format!("SELECT trace_id FROM {} WHERE file_uuid = $1 AND frame_number = $2 AND identity_id = $3 AND trace_id IS NOT NULL LIMIT 1", fd_table) ) .bind(&file_uuid).bind(first_frame).bind(id_a.0) .fetch_optional(state.db.pool()).await .map_err(|e| { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))) })?; let trace_b: Option<(i32,)> = sqlx::query_as( &format!("SELECT trace_id FROM {} WHERE file_uuid = $1 AND frame_number = $2 AND identity_id = $3 AND trace_id IS NOT NULL LIMIT 1", fd_table) ) .bind(&file_uuid).bind(first_frame).bind(id_b.0) .fetch_optional(state.db.pool()).await .map_err(|e| { (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))) })?; // Stage 4: Get representative faces for both traces (reusing select_rep_face) let rep_a = if let Some((tid,)) = trace_a { select_rep_face(state.db.pool(), &file_uuid, tid, |e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) }) .await .ok() .map(|sel| CoOccurRepFace { frame_number: sel.frame, bbox: RepFaceBbox { x: sel.x, y: sel.y, width: sel.w, height: sel.h, }, confidence: sel.conf, thumbnail_url: format!("/api/v1/file/{}/trace/{}/thumbnail", file_uuid, tid), }) } else { None }; let rep_b = if let Some((tid,)) = trace_b { select_rep_face(state.db.pool(), &file_uuid, tid, |e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) }) .await .ok() .map(|sel| CoOccurRepFace { frame_number: sel.frame, bbox: RepFaceBbox { x: sel.x, y: sel.y, width: sel.w, height: sel.h, }, confidence: sel.conf, thumbnail_url: format!("/api/v1/file/{}/trace/{}/thumbnail", file_uuid, tid), }) } else { None }; // Total co-occurrence frames (from TKG if available, otherwise from face_detections) 
let total_cooccurrence_frames: i64 = sqlx::query_scalar(&format!( "SELECT COUNT(DISTINCT fd.frame_number)::bigint FROM {} fd \ WHERE fd.file_uuid = $1 AND fd.identity_id = $2 \ AND fd.frame_number IN ( \ SELECT frame_number FROM {} \ WHERE file_uuid = $1 AND identity_id = $3 \ )", fd_table, fd_table )) .bind(&file_uuid) .bind(id_a.0) .bind(id_b.0) .fetch_one(state.db.pool()) .await .unwrap_or(0); Ok(Json(CoOccurResponse { success: true, file_uuid, identity_a: CoOccurIdentity { identity_uuid: identity_uuid_a, name: id_a.1, trace_id: trace_a.map(|t| t.0).unwrap_or(0), }, identity_b: CoOccurIdentity { identity_uuid: identity_uuid_b, name: id_b.1, trace_id: trace_b.map(|t| t.0).unwrap_or(0), }, first_cooccurrence: CoOccurrence { frame_number: first_frame, timestamp_secs: first_frame as f64 / fps, total_cooccurrence_frames, representative_face_a: rep_a, representative_face_b: rep_b, }, })) } use crate::core::config::OUTPUT_DIR; #[derive(Serialize)] struct TkgRebuildResponse { success: bool, file_uuid: String, result: Option, error: Option, } async fn rebuild_tkg( State(state): State, Path(file_uuid): Path, ) -> Json { use crate::core::chunk::rule2_ingest::ingest_rule2; use tracing::info; let result = crate::core::processor::tkg::build_tkg(&state.db, &file_uuid, &OUTPUT_DIR).await; match result { Ok(r) => { let total_edges = r.speaker_face_edges + r.mutual_gaze_edges + r.face_face_edges + r.co_occurrence_edges + r.has_appearance_edges + r.wears_edges; if total_edges > 0 { info!( "[TKG] {} relationship edges found, triggering Rule 2 ingestion...", total_edges ); match ingest_rule2(state.db.pool(), &file_uuid).await { Ok(count) => info!("[TKG] Rule 2 created {} relationship chunks", count), Err(e) => info!("[TKG] Rule 2 ingestion failed: {}", e), } } Json(TkgRebuildResponse { success: true, file_uuid, result: Some(serde_json::json!({ "face_track_nodes": r.face_track_nodes, "gaze_track_nodes": r.gaze_track_nodes, "lip_track_nodes": r.lip_track_nodes, "text_region_nodes": 
r.text_region_nodes, "appearance_trace_nodes": r.appearance_trace_nodes, "accessory_nodes": r.accessory_nodes, "object_nodes": r.object_nodes, "hand_nodes": r.hand_nodes, "speaker_nodes": r.speaker_nodes, "co_occurrence_edges": r.co_occurrence_edges, "speaker_face_edges": r.speaker_face_edges, "face_face_edges": r.face_face_edges, "mutual_gaze_edges": r.mutual_gaze_edges, "lip_sync_edges": r.lip_sync_edges, "has_appearance_edges": r.has_appearance_edges, "wears_edges": r.wears_edges, "hand_object_edges": r.hand_object_edges, })), error: None, }) } Err(e) => Json(TkgRebuildResponse { success: false, file_uuid, result: None, error: Some(e.to_string()), }), } } // ── Representative Frame (JSON) ─────────────────────────────────── use crate::core::processor::tkg; #[derive(Serialize)] struct RepFrameResponse { success: bool, file_uuid: String, frame_number: i64, timestamp_secs: f64, face_quality: f64, main_identities: Vec, traces: Vec, } async fn get_representative_frame( State(state): State, Path(file_uuid): Path, ) -> Result, (StatusCode, Json)> { let result = tkg::query_auto_representative_frame(state.db.pool(), &file_uuid) .await .map_err(|e| { ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": e.to_string()})), ) })?; let fps = query_fps(state.db.pool(), &file_uuid).await; Ok(Json(RepFrameResponse { success: true, file_uuid, frame_number: result.frame_number, timestamp_secs: result.frame_number as f64 / fps, face_quality: result.face_quality, main_identities: result.main_identities, traces: result.traces, })) } async fn query_fps(pool: &sqlx::PgPool, file_uuid: &str) -> f64 { use crate::core::db::schema; let video_table = schema::table_name("videos"); sqlx::query_scalar(&format!( "SELECT COALESCE(fps, 25.0) FROM {} WHERE file_uuid = $1", video_table )) .bind(file_uuid) .fetch_optional(pool) .await .ok() .flatten() .unwrap_or(25.0) } async fn get_stranger_representative_face( State(state): State, Path((file_uuid, stranger_id)): Path<(String, i32)>, ) -> 
Result, (StatusCode, Json)> { let faces_table = crate::core::db::schema::table_name("face_detections"); let trace_id: i32 = sqlx::query_scalar(&format!( "SELECT trace_id FROM {} WHERE file_uuid = $1 AND stranger_id = $2 LIMIT 1", faces_table )) .bind(&file_uuid) .bind(stranger_id) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? .ok_or(( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Stranger not found"})), ))?; get_representative_face_inner(&state, &file_uuid, trace_id).await } async fn get_stranger_thumbnail( State(state): State, Path((file_uuid, stranger_id)): Path<(String, i32)>, ) -> Result)> { let faces_table = crate::core::db::schema::table_name("face_detections"); let trace_id: i32 = sqlx::query_scalar(&format!( "SELECT trace_id FROM {} WHERE file_uuid = $1 AND stranger_id = $2 LIMIT 1", faces_table )) .bind(&file_uuid) .bind(stranger_id) .fetch_optional(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? 
.ok_or(( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Stranger not found"})), ))?; get_trace_thumbnail_inner(&state, &file_uuid, trace_id).await } // ── TKG Node/Edge Query APIs ───────────────────────────────────── fn t(name: &str) -> String { let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); if schema == "public" { name.to_string() } else { format!("{}.{}", schema, name) } } #[derive(Debug, Deserialize)] struct TkgNodesRequest { node_type: Option, page: Option, page_size: Option, } #[derive(Debug, Serialize)] struct TkgNodeInfo { id: i64, node_type: String, external_id: String, label: String, properties: serde_json::Value, } #[derive(Debug, Serialize)] struct TkgNodesResponse { success: bool, file_uuid: String, total: i64, page: i64, page_size: i64, nodes: Vec, } async fn query_tkg_nodes( State(state): State, Path(file_uuid): Path, Json(req): Json, ) -> Result, (StatusCode, Json)> { let nodes_table = t("tkg_nodes"); let page = req.page.unwrap_or(1).max(1); let page_size = req.page_size.unwrap_or(100).max(1).min(500); let offset = (page - 1) * page_size; let (where_clause, count_args, query_args) = if let Some(ref node_type) = req.node_type { ( "WHERE file_uuid = $1 AND node_type = $2".to_string(), vec![serde_json::json!([&file_uuid, node_type])], vec![serde_json::json!([ &file_uuid, node_type, page_size, offset ])], ) } else { ( "WHERE file_uuid = $1".to_string(), vec![serde_json::json!([&file_uuid])], vec![serde_json::json!([&file_uuid, page_size, offset])], ) }; let total: i64 = if let Some(ref node_type) = req.node_type { sqlx::query_scalar(&format!( "SELECT COUNT(*) FROM {} {}", nodes_table, where_clause )) .bind(&file_uuid) .bind(node_type) .fetch_one(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? 
} else { sqlx::query_scalar(&format!( "SELECT COUNT(*) FROM {} {}", nodes_table, where_clause )) .bind(&file_uuid) .fetch_one(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? }; let query = format!( "SELECT id, node_type, external_id, label, properties FROM {} {} ORDER BY id LIMIT ${} OFFSET ${}", nodes_table, where_clause, if req.node_type.is_some() { 3 } else { 2 }, if req.node_type.is_some() { 4 } else { 3 } ); let rows: Vec<(i64, String, String, String, serde_json::Value)> = if let Some(ref node_type) = req.node_type { sqlx::query_as(&query) .bind(&file_uuid) .bind(node_type) .bind(page_size) .bind(offset) .fetch_all(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? } else { sqlx::query_as(&query) .bind(&file_uuid) .bind(page_size) .bind(offset) .fetch_all(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? 
}; let nodes = rows .into_iter() .map( |(id, node_type, external_id, label, properties)| TkgNodeInfo { id, node_type, external_id, label, properties, }, ) .collect(); Ok(Json(TkgNodesResponse { success: true, file_uuid, total, page, page_size, nodes, })) } #[derive(Debug, Deserialize)] struct TkgEdgesRequest { edge_type: Option, source_type: Option, target_type: Option, page: Option, page_size: Option, } #[derive(Debug, Serialize)] struct TkgEdgeInfo { id: i64, edge_type: String, source_node_id: i64, target_node_id: i64, properties: serde_json::Value, } #[derive(Debug, Serialize)] struct TkgEdgesResponse { success: bool, file_uuid: String, total: i64, page: i64, page_size: i64, edges: Vec, } async fn query_tkg_edges( State(state): State, Path(file_uuid): Path, Json(req): Json, ) -> Result, (StatusCode, Json)> { let edges_table = t("tkg_edges"); let nodes_table = t("tkg_nodes"); let page = req.page.unwrap_or(1).max(1); let page_size = req.page_size.unwrap_or(100).max(1).min(500); let offset = (page - 1) * page_size; let mut conditions = vec!["e.file_uuid = $1".to_string()]; let mut param_idx = 2i32; let mut joins = String::new(); if let Some(ref edge_type) = req.edge_type { conditions.push(format!("e.edge_type = ${}", param_idx)); param_idx += 1; } if req.source_type.is_some() || req.target_type.is_some() { joins = format!( " JOIN {} sn ON e.source_node_id = sn.id JOIN {} tn ON e.target_node_id = tn.id", nodes_table, nodes_table ); } if let Some(ref source_type) = req.source_type { conditions.push(format!("sn.node_type = ${}", param_idx)); param_idx += 1; } if let Some(ref target_type) = req.target_type { conditions.push(format!("tn.node_type = ${}", param_idx)); param_idx += 1; } let where_clause = conditions.join(" AND "); let count_query = format!( "SELECT COUNT(*) FROM {} e {} WHERE {}", edges_table, joins, where_clause ); let total: i64 = { let mut q = sqlx::query_scalar::<_, i64>(&count_query).bind(&file_uuid); if let Some(ref edge_type) = req.edge_type { q = 
q.bind(edge_type); } if let Some(ref source_type) = req.source_type { q = q.bind(source_type); } if let Some(ref target_type) = req.target_type { q = q.bind(target_type); } q.fetch_one(state.db.pool()).await.map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? }; let query = format!( "SELECT e.id, e.edge_type, e.source_node_id, e.target_node_id, e.properties FROM {} e {} WHERE {} ORDER BY e.id LIMIT ${} OFFSET ${}", edges_table, joins, where_clause, param_idx, param_idx + 1 ); let rows: Vec<(i64, String, i64, i64, serde_json::Value)> = { let mut q = sqlx::query_as(&query).bind(&file_uuid); if let Some(ref edge_type) = req.edge_type { q = q.bind(edge_type); } if let Some(ref source_type) = req.source_type { q = q.bind(source_type); } if let Some(ref target_type) = req.target_type { q = q.bind(target_type); } q.bind(page_size) .bind(offset) .fetch_all(state.db.pool()) .await .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()})), ) })? 
};
    let edges = rows
        .into_iter()
        .map(
            |(id, edge_type, source_node_id, target_node_id, properties)| TkgEdgeInfo {
                id,
                edge_type,
                source_node_id,
                target_node_id,
                properties,
            },
        )
        .collect();
    Ok(Json(TkgEdgesResponse {
        success: true,
        file_uuid,
        total,
        page,
        page_size,
        edges,
    }))
}

/// A TKG node bundled with the edges that touch it.
#[derive(Debug, Serialize)]
struct TkgNodeWithEdges {
    /// The node itself.
    node: TkgNodeInfo,
    /// Edges whose `target_node_id` is this node.
    incoming_edges: Vec<TkgEdgeInfo>,
    /// Edges whose `source_node_id` is this node.
    outgoing_edges: Vec<TkgEdgeInfo>,
}

/// JSON body returned by [`get_tkg_node_detail`].
#[derive(Debug, Serialize)]
struct TkgNodeDetailResponse {
    /// `true` when the node was found and loaded.
    success: bool,
    /// File identifier echoed back from the request path.
    file_uuid: String,
    /// The node and its edges; `None` when `success` is `false`.
    node: Option<TkgNodeWithEdges>,
    /// Failure description; `None` when `success` is `true`.
    error: Option<String>,
}

/// Returns a single TKG node of a file together with its incoming and
/// outgoing edges.
///
/// The handler always answers with a JSON body:
/// - node found: `success: true` and `node` populated;
/// - no matching row: `success: false`, `error: "Node not found"`;
/// - node lookup query failed: `success: false` and `error` carries the
///   database error text (it is also logged), so an outage is no longer
///   reported as a missing node.
///
/// Edge lookups are best-effort: if one of them fails, the failure is logged
/// and the corresponding list is returned empty.
async fn get_tkg_node_detail(
    // NOTE(review): the type argument of `State` is missing in this copy of the
    // file and the application state type is not visible from here; it is left
    // as found — restore it to the state type the router is built with.
    State(state): State,
    Path((file_uuid, node_id)): Path<(String, i64)>,
) -> Json<TkgNodeDetailResponse> {
    let nodes_table = t("tkg_nodes");
    let edges_table = t("tkg_edges");

    let lookup: Result<Option<(i64, String, String, String, serde_json::Value)>, sqlx::Error> =
        sqlx::query_as(&format!(
            "SELECT id, node_type, external_id, label, properties FROM {} WHERE file_uuid = $1 AND id = $2",
            nodes_table
        ))
        .bind(&file_uuid)
        .bind(node_id)
        .fetch_optional(state.db.pool())
        .await;

    let (id, node_type, external_id, label, properties) = match lookup {
        Ok(Some(row)) => row,
        Ok(None) => {
            return Json(TkgNodeDetailResponse {
                success: false,
                file_uuid,
                node: None,
                error: Some("Node not found".to_string()),
            });
        }
        Err(e) => {
            // Keep a query failure distinguishable from a missing node.
            tracing::warn!(
                "[TKG] node lookup failed for file {} node {}: {}",
                file_uuid,
                node_id,
                e
            );
            return Json(TkgNodeDetailResponse {
                success: false,
                file_uuid,
                node: None,
                error: Some(e.to_string()),
            });
        }
    };

    // Both edge queries differ only in the column compared against the node id.
    let edge_sql = |column: &str| {
        format!(
            "SELECT id, edge_type, source_node_id, target_node_id, properties FROM {} WHERE file_uuid = $1 AND {} = $2",
            edges_table, column
        )
    };
    // Shared row -> `TkgEdgeInfo` conversion for both directions.
    let into_edge =
        |(id, edge_type, source_node_id, target_node_id, properties)| TkgEdgeInfo {
            id,
            edge_type,
            source_node_id,
            target_node_id,
            properties,
        };

    let incoming: Vec<TkgEdgeInfo> = sqlx::query_as(&edge_sql("target_node_id"))
        .bind(&file_uuid)
        .bind(node_id)
        .fetch_all(state.db.pool())
        .await
        .unwrap_or_else(|e| {
            tracing::warn!(
                "[TKG] incoming edge query failed for file {} node {}: {}",
                file_uuid,
                node_id,
                e
            );
            Vec::new()
        })
        .into_iter()
        .map(into_edge)
        .collect();

    let outgoing: Vec<TkgEdgeInfo> = sqlx::query_as(&edge_sql("source_node_id"))
        .bind(&file_uuid)
        .bind(node_id)
        .fetch_all(state.db.pool())
        .await
        .unwrap_or_else(|e| {
            tracing::warn!(
                "[TKG] outgoing edge query failed for file {} node {}: {}",
                file_uuid,
                node_id,
                e
            );
            Vec::new()
        })
        .into_iter()
        .map(into_edge)
        .collect();

    Json(TkgNodeDetailResponse {
        success: true,
        file_uuid,
        node: Some(TkgNodeWithEdges {
            node: TkgNodeInfo {
                id,
                node_type,
                external_id,
                label,
                properties,
            },
            incoming_edges: incoming,
            outgoing_edges: outgoing,
        }),
        error: None,
    })
}

// ── Rule 2 Ingest ───────────────────────────────────────────────────

/// JSON body returned by [`ingest_rule2`].
#[derive(Debug, Serialize)]
struct IngestRule2Response {
    /// `true` when the Rule 2 ingestion step itself succeeded.
    success: bool,
    /// File identifier echoed back from the request path.
    file_uuid: String,
    /// Chunk count reported by the ingestion step; `0` when it failed.
    rule2_chunks: i64,
    /// Number of chunks that were embedded and written to both vector stores;
    /// `None` when ingestion failed and vectorization was not attempted.
    vectorized_chunks: Option<i64>,
    /// Ingestion error text; `None` on success.
    error: Option<String>,
}

/// Runs Rule 2 ingestion for a file, then embeds the resulting relationship
/// chunks that do not have an embedding yet.
///
/// # Returns
/// - `Ok` with `success: true` once ingestion succeeded; `vectorized_chunks`
///   counts the chunks for which embedding, `store_vector` and the Qdrant
///   upsert all succeeded. Per-chunk failures are logged and skipped.
/// - `Ok` with `success: false` and `error` set when ingestion itself failed.
///
/// # Errors
/// Returns `500 Internal Server Error` with `{"error": …}` when the query that
/// selects the chunks to vectorize fails.
async fn ingest_rule2(
    // NOTE(review): the type argument of `State` is missing in this copy of the
    // file and the application state type is not visible from here; it is left
    // as found — restore it to the state type the router is built with.
    State(state): State,
    Path(file_uuid): Path<String>,
) -> Result<Json<IngestRule2Response>, (StatusCode, Json<serde_json::Value>)> {
    // Aliased so the core routine does not shadow this handler's own name.
    use crate::core::chunk::rule2_ingest::ingest_rule2 as run_rule2_ingest;
    use crate::core::db::qdrant_db::{QdrantDb, VectorPayload};
    use crate::core::db::schema;
    use crate::core::embedding::Embedder;
    use tracing::{info, warn};

    /// Embedding model used for relationship chunks.
    const RULE2_EMBEDDING_MODEL: &str = "embeddinggemma-300m";

    let rule2_chunks = match run_rule2_ingest(state.db.pool(), &file_uuid).await {
        Ok(count) => count,
        Err(e) => {
            return Ok(Json(IngestRule2Response {
                success: false,
                file_uuid,
                rule2_chunks: 0,
                vectorized_chunks: None,
                error: Some(e.to_string()),
            }));
        }
    };

    info!(
        "[Rule2API] {} relationship chunks created for {}",
        rule2_chunks, file_uuid
    );

    // Auto-vectorize relationship chunks
    let embedder = Embedder::new(RULE2_EMBEDDING_MODEL.to_string());
    let qdrant = QdrantDb::new();
    let pool = state.db.pool();
    let chunk_table = schema::table_name("chunk");

    let rows: Vec<(String, String, i64, i64, f64, f64)> = sqlx::query_as(&format!(
        "SELECT chunk_id, text_content, start_frame, end_frame, start_time, end_time \
         FROM {} WHERE file_uuid = $1 AND chunk_type = 'relationship' \
         AND embedding IS NULL AND (text_content IS NOT NULL AND text_content != '')",
        chunk_table
    ))
    .bind(&file_uuid)
    .fetch_all(pool)
    .await
    .map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"error": e.to_string()})),
        )
    })?;

    let mut vectorized = 0i64;
    for (chunk_id, text, start_frame, end_frame, start_time, end_time) in rows {
        // The query already filters empty text; kept as a cheap guard.
        if text.is_empty() {
            continue;
        }

        let vector = match embedder.embed_document(&text).await {
            Ok(vector) => vector,
            Err(e) => {
                warn!("[Rule2API] embedding failed for chunk {}: {:?}", chunk_id, e);
                continue;
            }
        };

        if let Err(e) = state.db.store_vector(&chunk_id, &vector, &file_uuid).await {
            warn!(
                "[Rule2API] storing vector failed for chunk {}: {:?}",
                chunk_id, e
            );
            continue;
        }

        let payload = VectorPayload {
            file_uuid: file_uuid.clone(),
            chunk_id: chunk_id.clone(),
            chunk_type: "relationship".to_string(),
            start_frame,
            end_frame,
            start_time,
            end_time,
            text: Some(text),
        };

        // A chunk only counts once it is present in both stores.
        match qdrant.upsert_vector(&chunk_id, &vector, payload).await {
            Ok(_) => vectorized += 1,
            Err(e) => warn!(
                "[Rule2API] Qdrant upsert failed for chunk {}: {:?}",
                chunk_id, e
            ),
        }
    }

    Ok(Json(IngestRule2Response {
        success: true,
        file_uuid,
        rule2_chunks: rule2_chunks as i64,
        vectorized_chunks: Some(vectorized),
        error: None,
    }))
}