feat: add processor state machine and alert mechanism

- Add ProcessorJobStatus enum (8 states: Idle/Waiting/Ready/Pending/Running/Completed/Failed/Skipped)
- Add processor_alerts table (migrations/034)
- Add emit_processor_alert() to redis_client.rs
- Add ConditionResult enum + check_dependencies() to job_worker.rs
This commit is contained in:
Accusys
2026-05-30 10:03:49 +08:00
parent 08167d73b2
commit 0d58a738a1
4 changed files with 475 additions and 221 deletions
+120 -148
View File
@@ -6,7 +6,6 @@ use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, info, warn};
use crate::api::five_w1h_agent_api::run_5w1h_agent;
use crate::api::identity_agent_api::run_identity_agent;
use crate::core::chunk::{rule1_ingest, rule3_ingest};
use crate::core::config::OUTPUT_DIR;
@@ -22,6 +21,29 @@ use crate::worker::processor::{ProcessorPool, ProcessorTask};
use crate::worker::resources::SystemResources;
use sqlx::PgPool;
enum ConditionResult {
Ready,
Waiting(Vec<crate::core::db::ProcessorType>),
}
fn check_dependencies(
processor: crate::core::db::ProcessorType,
completed: &[crate::core::db::ProcessorType],
) -> ConditionResult {
let deps = processor.dependencies();
let missing: Vec<_> = deps
.iter()
.filter(|d| !completed.contains(d))
.cloned()
.collect();
if missing.is_empty() {
ConditionResult::Ready
} else {
ConditionResult::Waiting(missing)
}
}
pub struct JobWorker {
db: Arc<PostgresDb>,
redis: Arc<RedisClient>,
@@ -250,38 +272,6 @@ impl JobWorker {
.collect()
};
// 長影片動態調整:若 CUT 場景過長,Face 需在 ASR 之前執行
if let Ok(Some(video)) = self.db.get_video_by_uuid(&job.uuid).await {
// 條件:cut_done 且場景數 <= 3 且最長場景 > 600s10分鐘)
if video.cut_done && video.cut_count <= 3 && video.cut_max_duration > 600.0 {
info!(
"[DYNAMIC] Long cut detected: {} scenes, max_dur={:.0}s for {}. Moving Face before ASR.",
video.cut_count, video.cut_max_duration, job.uuid
);
// 確保 Face 在 ASR 之前
if let Some(asr_pos) = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Asr))
{
if let Some(face_pos) = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Face))
{
if face_pos > asr_pos {
// 將 Face 移到 ASR 前面
let face = processors_to_run.remove(face_pos);
let insert_pos = processors_to_run
.iter()
.position(|p| matches!(p, crate::core::db::ProcessorType::Asr))
.unwrap();
processors_to_run.insert(insert_pos, face);
info!("[DYNAMIC] Reordered processors: Face now ahead of ASR");
}
}
}
}
}
let total_processor_types = processors_to_run.len() as i32;
// Get video total_frames for progress tracking
@@ -381,45 +371,73 @@ impl JobWorker {
// Load output file and store to pre_chunks
if let Ok(json_str) = std::fs::read_to_string(&output_path) {
let store_result = match processor_type {
crate::core::db::ProcessorType::Asr => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::AsrResult>(&json_str) {
ProcessorPool::store_asr_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
}
crate::core::db::ProcessorType::Asrx => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::AsrxResult>(&json_str) {
if let Ok(result) = serde_json::from_str::<
crate::core::processor::AsrxResult,
>(&json_str)
{
ProcessorPool::store_asrx_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
crate::core::db::ProcessorType::Cut => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::CutResult>(&json_str) {
if let Ok(result) =
serde_json::from_str::<crate::core::processor::CutResult>(&json_str)
{
ProcessorPool::store_cut_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
crate::core::db::ProcessorType::Yolo => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::YoloResult>(&json_str) {
if let Ok(result) = serde_json::from_str::<
crate::core::processor::YoloResult,
>(&json_str)
{
ProcessorPool::store_yolo_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
crate::core::db::ProcessorType::Ocr => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::OcrResult>(&json_str) {
if let Ok(result) =
serde_json::from_str::<crate::core::processor::OcrResult>(&json_str)
{
ProcessorPool::store_ocr_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
crate::core::db::ProcessorType::Face => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::FaceResult>(&json_str) {
if let Ok(result) = serde_json::from_str::<
crate::core::processor::FaceResult,
>(&json_str)
{
ProcessorPool::store_face_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
crate::core::db::ProcessorType::Pose => {
if let Ok(result) = serde_json::from_str::<crate::core::processor::PoseResult>(&json_str) {
if let Ok(result) = serde_json::from_str::<
crate::core::processor::PoseResult,
>(&json_str)
{
ProcessorPool::store_pose_chunks(&self.db, &job.uuid, &result).await
} else { Ok(()) }
} else {
Ok(())
}
}
_ => Ok(()),
};
if let Err(e) = store_result {
error!("Failed to store {} chunks for {}: {}", processor_type.as_str(), job.uuid, e);
error!(
"Failed to store {} chunks for {}: {}",
processor_type.as_str(),
job.uuid,
e
);
}
}
started_count += 1;
@@ -459,10 +477,28 @@ impl JobWorker {
continue;
}
ProcessorJobStatus::Failed => {
if result.retry_count >= 3 {
info!(
"Processor {} failed {} times, max retries reached (3), skipping",
processor_type.as_str(),
result.retry_count
);
started_count += 1;
continue;
}
info!(
"Processor {} previously failed, retrying",
processor_type.as_str()
"Processor {} previously failed (retry {}/3), retrying",
processor_type.as_str(),
result.retry_count + 1
);
let _ = sqlx::query(&format!(
"UPDATE {} SET retry_count = retry_count + 1 WHERE job_id = $1 AND processor = $2",
schema::table_name("processor_results")
))
.bind(job.id)
.bind(processor_type.as_str())
.execute(self.db.pool())
.await;
}
ProcessorJobStatus::Running => {
info!(
@@ -472,8 +508,11 @@ impl JobWorker {
started_count += 1;
continue;
}
// Skipped 不視為 terminal — 允許重新啟動
ProcessorJobStatus::Skipped | ProcessorJobStatus::Pending => {
ProcessorJobStatus::Idle
| ProcessorJobStatus::Waiting
| ProcessorJobStatus::Ready
| ProcessorJobStatus::Skipped
| ProcessorJobStatus::Pending => {
// Continue to start processor
}
}
@@ -531,6 +570,26 @@ impl JobWorker {
processor_type.as_str(),
deps.iter().map(|d| d.as_str()).collect::<Vec<_>>(),
);
let missing_deps: Vec<String> = deps
.iter()
.filter(|d| !matches!(
result_map.get(d).map(|r| &r.status),
Some(ProcessorJobStatus::Completed)
))
.map(|d| d.as_str().to_string())
.collect();
if let Err(e) = self
.redis
.emit_processor_alert(
&job.uuid,
processor_type.as_str(),
"dependency_not_met",
&format!("Waiting for: {}", missing_deps.join(", ")),
)
.await
{
error!("Failed to emit processor alert: {}", e);
}
continue;
}
}
@@ -669,21 +728,10 @@ impl JobWorker {
"SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' LIMIT 1"
));
let trace = check!(&format!("SELECT COUNT(DISTINCT trace_id) FROM {fd_t} WHERE file_uuid = '{fu}' AND trace_id IS NOT NULL"));
let tkg = check!(&format!(
"SELECT 1 FROM {} WHERE file_uuid = '{fu}' LIMIT 1",
schema::table_name("tkg_nodes")
));
let scene_meta = std::path::Path::new(&format!(
"{}/{fu}.scene_meta.json",
crate::core::config::OUTPUT_DIR.as_str()
))
.exists();
let five_w1h = check!(&format!("SELECT 1 FROM {chunk_t} WHERE file_uuid = '{fu}' AND chunk_type = 'cut' AND summary_text IS NOT NULL AND summary_text != '' LIMIT 1"));
let all_ok = rule1 && vector && rule3 && trace && tkg && scene_meta && five_w1h;
let all_ok = rule1 && vector && rule3 && trace;
if !all_ok {
tracing::info!(
"[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace} tkg={tkg} scene={scene_meta} 5w1h={five_w1h}"
"[Ingestion] waiting: rule1={rule1} vector={vector} rule3={rule3} trace={trace}"
);
}
all_ok
@@ -708,7 +756,7 @@ impl JobWorker {
// 例如:Rule 1 只需 ASR+ASRX 完成即可觸發,不須等 face/pose/story 完成
// 定義必要 processor(必須完成的才算 job 成功)
let essential_processors = ["cut", "asr", "yolo"];
let essential_processors = ["cut", "asrx", "yolo"];
let essential_completed = essential_processors.iter().all(|ep| {
results.iter().any(|r| {
@@ -763,20 +811,16 @@ impl JobWorker {
.map(|r| r.processor_type.as_str().to_string())
.collect();
// Check prerequisites for post-processing triggers
let has_asr = completed_processors.iter().any(|p| p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
let has_face = completed_processors.iter().any(|p| p == "face");
let has_yolo = completed_processors.iter().any(|p| p == "yolo");
// Update processor arrays in job record
self.db
.update_job_processors_arrays(job_id, completed_processors, failed_processors.clone())
.await?;
// 🚀 P1 Trigger: Rule 1 Chunking(僅需 ASR + ASRX
if has_asr && has_asrx {
if has_asrx {
info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
@@ -787,7 +831,6 @@ impl JobWorker {
match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
Ok(count) => {
info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count);
// Automatically vectorize new sentence chunks
if count > 0 {
info!(
"📝 Starting automatic vectorize for {} chunks...",
@@ -802,7 +845,6 @@ impl JobWorker {
);
}
}
// Phase 1 release: sentence chunk embedding 交付
info!("📦 Phase 1 release packaging...");
let executor = match crate::core::processor::PythonExecutor::new() {
Ok(ex) => ex,
@@ -836,10 +878,8 @@ impl JobWorker {
});
}
// Rule 3 / Trace / Identity 需要 all_completed(含非必要 processor 失敗也可)
if all_completed {
// 🚀 P1 Trigger: Rule 3 Scene Chunking
if has_cut && has_asr {
if has_cut {
info!("📝 Prerequisites met for Rule 3 Scene Chunking. Starting ingestion...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
@@ -881,16 +921,6 @@ impl JobWorker {
Ok(()) => {
info!("✅ Face trace + DB store completed for {}", uuid_clone);
// Sync face embeddings to Qdrant for ANN search
info!("📝 Syncing face embeddings to Qdrant...");
if let Err(e) =
crate::core::db::qdrant_db::sync_face_embeddings(&uuid_clone).await
{
error!("❌ Qdrant face sync failed for {}: {}", uuid_clone, e);
} else {
info!("✅ Qdrant face sync completed for {}", uuid_clone);
}
// Generate trace chunks from face_detections + ASR text
info!("📝 Generating trace chunks...");
match crate::core::chunk::trace_ingest::ingest_traces(
@@ -902,26 +932,6 @@ impl JobWorker {
Ok(n) => info!("✅ {} trace chunks created for {}", n, uuid_clone),
Err(e) => error!("❌ Trace chunk ingestion failed: {}", e),
}
// Build Temporal Knowledge Graph (TKG) — native Rust
info!("📝 Building TKG graph (Rust)...");
let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
.unwrap_or_else(|_| ".".to_string());
match crate::core::processor::tkg::build_tkg(
db_clone.as_ref(),
&uuid_clone,
&output_dir,
)
.await
{
Ok(r) => info!(
"✅ TKG built for {}: {} face, {} obj, {} spk, {} co, {} sf, {} ff edges",
uuid_clone,
r.face_trace_nodes, r.object_nodes, r.speaker_nodes,
r.co_occurrence_edges, r.speaker_face_edges, r.face_face_edges,
),
Err(e) => error!("❌ TKG build failed for {}: {}", uuid_clone, e),
}
}
Err(e) => {
error!("❌ Face trace + DB store failed for {}: {}", uuid_clone, e)
@@ -1011,47 +1021,9 @@ impl JobWorker {
});
}
// 🚀 P4 Trigger: 5W1H Agent (after Rule 3 completion)
if has_cut && has_asr {
info!("📝 Prerequisites met for 5W1H Agent. Starting...");
let db_clone = self.db.clone();
let uuid_clone = uuid.to_string();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
match run_5w1h_agent(&db_clone, &uuid_clone).await {
Ok(()) => {
info!("✅ 5W1H Agent completed for {}", uuid_clone);
// Phase 2 release: full pipeline 交付
info!("📦 Phase 2 release packaging...");
let executor = match crate::core::processor::PythonExecutor::new() {
Ok(ex) => ex,
Err(e) => {
error!("Failed PythonExecutor for release pack: {}", e);
return;
}
};
match executor
.run(
"release_pack.py",
&["--phase", "2", "--file-uuid", &uuid_clone],
None,
"RELEASE_P2",
Some(std::time::Duration::from_secs(120)),
)
.await
{
Ok(()) => info!("✅ Phase 2 release packaged for {}", uuid_clone),
Err(e) => error!("❌ Phase 2 release pack failed: {}", e),
}
}
Err(e) => error!("❌ 5W1H Agent failed for {}: {}", uuid_clone, e),
}
});
}
if !Self::ingestion_complete(self.db.pool(), uuid).await {
info!(
"Job {}: all 10 processors done, waiting for ingestion...",
"Job {}: all processors done, waiting for ingestion...",
job_id
);
return Ok(());