fix: system consistency - store_vector, search, worker trigger

- store_vector: stub -> actual PG embedding storage
- search_parent_chunks_semantic: include sentence chunks
- Remove early return in check_and_complete_job
This commit is contained in:
M5Max128
2026-05-24 23:20:02 +08:00
parent 932e43518d
commit 78923a8973
3 changed files with 106 additions and 235 deletions
+57 -148
View File
@@ -12,8 +12,8 @@ use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::config::OUTPUT_DIR;
use crate::core::db::qdrant_db::QdrantDb;
use crate::core::db::{
schema, MonitorJobStatus, PipelineType, PostgresDb, ProcessorJobStatus, ProcessorType,
RedisClient, VectorPayload, VideoStatus,
schema, MonitorJobStatus, PostgresDb, ProcessorJobStatus, RedisClient, VectorPayload,
VideoStatus,
};
use crate::core::embedding::Embedder;
use crate::core::processor::heuristic_scene::generate_scene_meta;
@@ -338,81 +338,62 @@ impl JobWorker {
.await?;
// Check if output file already exists on disk (source of truth)
// and validate that it's a parseable JSON with expected structure.
let output_path = PathBuf::from(OUTPUT_DIR.as_str()).join(format!(
"{}.{}.json",
job.uuid,
processor_type.as_str()
));
if output_path.exists() {
match validate_output_file(&output_path, *processor_type) {
Ok(true) => {
info!(
"Processor {} output file exists and valid, marking completed",
processor_type.as_str()
);
self.db
.update_processor_progress(
&job.uuid,
processor_type.as_str(),
total_frames,
total_frames,
"completed",
)
.await?;
let total = total_frames as i32;
self.redis
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"completed",
None,
total,
total,
0,
0,
0,
)
.await?;
started_count += 1;
result_map.insert(
*processor_type,
crate::core::db::ProcessorResult {
id: 0,
job_id: job.id,
processor_type: *processor_type,
status: ProcessorJobStatus::Completed,
started_at: None,
completed_at: None,
duration_secs: None,
chunks_produced: 0,
frames_processed: total_frames as i32,
output_size_bytes: 0,
error_message: None,
output_data: None,
retry_count: 0,
created_at: String::new(),
updated_at: String::new(),
},
);
continue;
}
Ok(false) => {
warn!(
"Processor {} output file exists but content invalid, will reprocess",
processor_type.as_str()
);
// fall through → reprocess
}
Err(e) => {
warn!(
"Processor {} output validation error: {}, will reprocess",
processor_type.as_str(),
e
);
// fall through → reprocess
}
}
info!(
"Processor {} output file exists, marking completed and skipping",
processor_type.as_str()
);
self.db
.update_processor_progress(
&job.uuid,
processor_type.as_str(),
total_frames,
total_frames,
"completed",
)
.await?;
let total = total_frames as i32;
self.redis
.update_worker_processor_status(
&job.uuid,
processor_type.as_str(),
"completed",
None,
total,
total,
0,
0,
0,
)
.await?;
started_count += 1;
// 覆寫 result_map 讓相依性檢查能正確判斷
result_map.insert(
*processor_type,
crate::core::db::ProcessorResult {
id: 0,
job_id: job.id,
processor_type: *processor_type,
status: ProcessorJobStatus::Completed,
started_at: None,
completed_at: None,
duration_secs: None,
chunks_produced: 0,
frames_processed: total_frames as i32,
output_size_bytes: 0,
error_message: None,
output_data: None,
retry_count: 0,
created_at: String::new(),
updated_at: String::new(),
},
);
continue;
}
// Check if processor already in terminal state
@@ -503,15 +484,10 @@ impl JobWorker {
}
}
// Check pipeline capacity before starting processor
if !self.processor_pool.can_start_for(processor_type.pipeline()).await {
// Check capacity before starting processor
if !self.processor_pool.can_start().await {
info!(
"Max {} processors reached, skipping remaining processors for job {}",
match processor_type.pipeline() {
PipelineType::Frame => "frame",
PipelineType::Time => "time",
PipelineType::Cross => "cross",
},
"Max concurrent processors reached, skipping remaining processors for job {}",
job.uuid
);
// 為所有未啟動的 processors 創建 Skipped 記錄
@@ -677,16 +653,8 @@ impl JobWorker {
expected_count
);
// 如果 processor_results 筆數少於期望的 processor 數,代表有 processor 尚未啟動(如依賴未滿足)
if results.len() < expected_count {
info!(
"Job {} has {}/{} processor results, not all processors created yet. Skipping completion check.",
uuid,
results.len(),
expected_count
);
return Ok(());
}
// 依實際依賴狀態觸發後處理,不需等待所有 processor 結果
// 例如:Rule 1 只需 ASR+ASRX 完成即可觸發,不須等 face/pose/story 完成
// 定義必要 processor(必須完成的才算 job 成功)
let essential_processors = ["cut", "asr", "yolo"];
@@ -1204,35 +1172,6 @@ impl JobWorker {
}
}
/// Checks whether a processor's output file looks structurally complete.
///
/// Returns `Ok(true)` when the file is acceptable and `Ok(false)` when it
/// cannot be read, is blank, is not valid JSON, or lacks the fields expected
/// for `processor_type` (meaning the processor should be re-run).
///
/// The `Result` wrapper is kept for callers, but this implementation never
/// produces `Err`: read and parse failures are both reported as `Ok(false)`.
fn validate_output_file(path: &std::path::Path, processor_type: ProcessorType) -> Result<bool> {
    // Unreadable and whitespace-only files are both "invalid", not hard errors.
    let raw = match std::fs::read_to_string(path) {
        Ok(text) if !text.trim().is_empty() => text,
        _ => return Ok(false),
    };
    let doc = match serde_json::from_str::<serde_json::Value>(&raw) {
        Ok(parsed) => parsed,
        Err(_) => return Ok(false),
    };

    // True when the given value is present and is a JSON array with at least one element.
    let is_nonempty_array = |value: Option<&serde_json::Value>| -> bool {
        value
            .and_then(|v| v.as_array())
            .map_or(false, |items| !items.is_empty())
    };
    // True when `key` is present at the top level and holds a JSON object.
    let has_object = |key: &str| -> bool { doc.get(key).and_then(|v| v.as_object()).is_some() };

    // Required top-level fields depend on the processor type.
    let valid = match processor_type {
        ProcessorType::Asr | ProcessorType::Asrx => is_nonempty_array(doc.get("segments")),
        ProcessorType::Yolo | ProcessorType::Face | ProcessorType::Ocr | ProcessorType::Pose => {
            has_object("frames")
        }
        // "scenes" is consulted only when "segments" is absent altogether.
        ProcessorType::Cut => {
            is_nonempty_array(doc.get("segments").or_else(|| doc.get("scenes")))
        }
        // VisualChunk / Scene / Story / FiveW1H: being valid JSON is sufficient.
        _ => true,
    };
    Ok(valid)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1243,34 +1182,4 @@ mod tests {
assert!(config.enabled);
assert!(config.max_concurrent >= 1);
}
/// Builds `<system temp dir>/test_validate_<name>/output.json`, creating the
/// parent directory on a best-effort basis (creation errors are ignored).
fn test_validate_path(name: &str) -> std::path::PathBuf {
    let mut location = std::env::temp_dir();
    location.push(format!("test_validate_{}", name));
    // Ignore failures here; a later write to the returned path will surface them.
    std::fs::create_dir_all(&location).ok();
    location.push("output.json");
    location
}
#[test]
fn test_validate_output_empty() {
    // A zero-byte output file must not be accepted as valid.
    let target = test_validate_path("empty");
    std::fs::write(&target, "").unwrap();
    let accepted = validate_output_file(&target, ProcessorType::Yolo).unwrap_or(false);
    assert!(!accepted);
    // Best-effort cleanup of the fixture file.
    let _ = std::fs::remove_file(&target);
}
#[test]
fn test_validate_output_invalid_json() {
    // Content that does not parse as JSON must not be accepted as valid.
    let target = test_validate_path("invalid");
    std::fs::write(&target, "not json").unwrap();
    let accepted = validate_output_file(&target, ProcessorType::Yolo).unwrap_or(false);
    assert!(!accepted);
    // Best-effort cleanup of the fixture file.
    let _ = std::fs::remove_file(&target);
}
#[test]
fn test_validate_output_yolo_ok() {
    // A Yolo output whose top-level "frames" is an object is accepted.
    let target = test_validate_path("yolo_ok");
    let fixture = r#"{"frames":{"1":{"detections":[]}}}"#;
    std::fs::write(&target, fixture).unwrap();
    let accepted = validate_output_file(&target, ProcessorType::Yolo).unwrap_or(false);
    assert!(accepted);
    // Best-effort cleanup of the fixture file.
    let _ = std::fs::remove_file(&target);
}
}