From d4c26deae257fc1d82b88fe5159563421116ade8 Mon Sep 17 00:00:00 2001 From: Accusys Date: Thu, 2 Jul 2026 15:11:25 +0800 Subject: [PATCH] fix: pipeline progress computed from DB state instead of Redis - get_pipeline_progress_handler now queries actual DB counts - Fixed processor_results query (requires JOIN with monitor_jobs) - Card progress bar and right-click content now consistent --- src/api/scan.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 6 deletions(-) diff --git a/src/api/scan.rs b/src/api/scan.rs index ec67d0a..8fcd1b1 100644 --- a/src/api/scan.rs +++ b/src/api/scan.rs @@ -944,10 +944,82 @@ async fn get_pipeline_progress_handler( State(state): State, Path(file_uuid): Path, ) -> Result, StatusCode> { - let redis_lock = state.redis_cache.get_client().await; - let redis_guard = redis_lock.read().await; - let pipeline = crate::core::progress::get_pipeline_progress(&*redis_guard, &file_uuid) - .await - .unwrap_or_else(|| crate::core::progress::PipelineProgress::new(&file_uuid)); - Ok(Json(pipeline)) + let pool = state.db.pool(); + let chunk_table = schema::table_name("chunk"); + let tkg_nodes_table = schema::table_name("tkg_nodes"); + let tkg_edges_table = schema::table_name("tkg_edges"); + let pr_table = schema::table_name("processor_results"); + let mj_table = schema::table_name("monitor_jobs"); + + // Compute actual progress from DB state + let sentence_count: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {chunk_table} WHERE file_uuid = $1 AND chunk_type = 'sentence'") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let sentence_embedded: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {chunk_table} WHERE file_uuid = $1 AND chunk_type = 'sentence' AND embedding IS NOT NULL") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let face_traced: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {pr_table} pr JOIN {mj_table} mj ON pr.job_id = mj.id WHERE mj.uuid = $1 AND pr.processor = 'face' AND pr.status = 'completed'") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let tkg_node_count: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {tkg_nodes_table} WHERE file_uuid = $1") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let tkg_edge_count: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {tkg_edges_table} WHERE file_uuid = $1") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let relationship_count: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {chunk_table} WHERE file_uuid = $1 AND chunk_type = 'relationship'") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + let asrx_completed: i64 = sqlx::query_scalar::<_, i64>( + &format!("SELECT COUNT(*) FROM {pr_table} pr JOIN {mj_table} mj ON pr.job_id = mj.id WHERE mj.uuid = $1 AND pr.processor = 'asrx' AND pr.status = 'completed'") + ).bind(&file_uuid).fetch_one(pool).await.unwrap_or(0); + + // Determine processor completion + let processors_done = asrx_completed > 0; + + let mut pp = crate::core::progress::PipelineProgress::new(&file_uuid); + + if processors_done { + pp.update_stage("processors", 1.0, "completed", None); + } + if sentence_count > 0 { + let detail = if sentence_embedded > 0 { + Some(format!("{} chunks, {} embedded", sentence_count, sentence_embedded)) + } else { + Some(format!("{} chunks", sentence_count)) + }; + pp.update_stage("rule1_ingestion", 1.0, "completed", detail); + } + if face_traced > 0 { + pp.update_stage("face_tracing", 1.0, "completed", None); + } + if tkg_node_count > 0 { + pp.update_stage("tkg_nodes", 1.0, "completed", Some(format!("{} nodes", tkg_node_count))); + } + if tkg_edge_count > 0 { + pp.update_stage("tkg_edges", 1.0, "completed", Some(format!("{} edges", tkg_edge_count))); + } + if relationship_count > 0 { + pp.update_stage("rule2_ingestion", 1.0, "completed", Some(format!("{} chunks", relationship_count))); + } + + // Check identity agent from _seeds + use crate::core::db::qdrant_db::QdrantDb; + use serde_json::json; + let qdrant = QdrantDb::new(); + let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); + let seeds_collection = if schema == "public" { "momentry_public_speaker" } else { &format!("momentry_{}_speaker", schema) }; + let seeds_filter = json!({"must": [{"key": "file_uuid", "match": {"value": &file_uuid}}]}); + let seed_points = qdrant.scroll_all_points("_seeds", seeds_filter, 100).await.unwrap_or_default(); + if !seed_points.is_empty() { + pp.update_stage("identity_agent", 1.0, "completed", Some(format!("{} seeds", seed_points.len()))); + } + + Ok(Json(pp)) }