From 6507766ea2e83a45be74169fecca1eaf2149bc04 Mon Sep 17 00:00:00 2001 From: Accusys Date: Thu, 2 Jul 2026 13:44:45 +0800 Subject: [PATCH] fix: Qdrant collection name + PipelineProgress accumulation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - scan.rs: rule1 collection 'momentry_public_rule1_v2' → 'momentry_rule1' - progress.rs: publish_pipeline_progress now reads existing progress and merges stages --- src/api/scan.rs | 6 +++++- src/core/progress.rs | 29 +++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/api/scan.rs b/src/api/scan.rs index dd36e74..a852c83 100644 --- a/src/api/scan.rs +++ b/src/api/scan.rs @@ -801,7 +801,11 @@ async fn get_file_stats( // Text chunk stats (rule1 collection) let schema = std::env::var("DATABASE_SCHEMA").unwrap_or_else(|_| "dev".to_string()); - let rule1_collection = format!("momentry_{}_rule1_v2", schema); + let rule1_collection = if schema == "public" { + "momentry_rule1".to_string() + } else { + format!("momentry_{}_rule1_v2", schema) + }; let text_filter = json!({ "must": [{"key": "file_uuid", "match": {"value": file_uuid}}] }); diff --git a/src/core/progress.rs b/src/core/progress.rs index 4515d8c..dad7bf5 100644 --- a/src/core/progress.rs +++ b/src/core/progress.rs @@ -518,7 +518,7 @@ pub async fn get_progress( } } -/// Publish pipeline progress to Redis +/// Publish pipeline progress to Redis (accumulates with existing progress) pub async fn publish_pipeline_progress( redis: &RedisClient, file_uuid: &str, @@ -530,7 +530,32 @@ pub async fn publish_pipeline_progress( file_uuid ); if let Ok(mut conn) = redis.get_conn().await { - let json = serde_json::to_string(progress).unwrap_or_default(); + // Try to read existing progress first + let existing: Option = redis::cmd("GET") + .arg(&key) + .query_async(&mut conn) + .await + .ok() + .and_then(|s: String| serde_json::from_str(&s).ok()); + + let merged = if let Some(mut existing) = existing { + // Merge: update stages from new progress onto existing + for new_stage in &progress.stages { + if new_stage.status == "completed" || new_stage.progress > 0.0 { + if let Some(existing_stage) = existing.stages.iter_mut().find(|s| s.name == new_stage.name) { + existing_stage.status = new_stage.status.clone(); + existing_stage.progress = new_stage.progress; + existing_stage.detail = new_stage.detail.clone(); + } + } + } + existing.recalculate_overall(); + existing + } else { + progress.clone() + }; + + let json = serde_json::to_string(&merged).unwrap_or_default(); let _: Result<(), _> = redis::cmd("SET") .arg(&[&key, &json]) .query_async(&mut conn)