fix: Rule 1/TKG trigger conditions + essential_failed guard

- Rule 1 trigger: has_asr_or_asrx → has_asrx (wait for ASRX pre_chunks)
- P3/P4 triggers: has_asr_or_asrx → has_asrx (need ASRX data)
- Add essential_failed check: job only fails if essential processor fails
- P2/P3/P4 triggers: all_completed → has_face && has_asrx
- Add publish_pipeline_progress calls at each pipeline stage
This commit is contained in:
Accusys
2026-07-02 13:31:38 +08:00
parent 3eabd45882
commit 64f29d614b
+13 -5
View File
@@ -1237,6 +1237,13 @@ impl JobWorker {
// 定義必要 processor(必須完成的才算 job 成功)
let essential_processors = ["cut", "asr", "asrx", "yolo"];
let essential_failed = essential_processors.iter().any(|ep| {
results.iter().any(|r| {
r.processor_type.as_str() == *ep
&& matches!(r.status, crate::core::db::ProcessorJobStatus::Failed)
})
});
let essential_completed = essential_processors.iter().all(|ep| {
results.iter().any(|r| {
r.processor_type.as_str() == *ep
@@ -1436,6 +1443,7 @@ impl JobWorker {
let has_asr_or_asrx = completed_processors
.iter()
.any(|p| p == "asrx" || p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
let has_face = completed_processors.iter().any(|p| p == "face");
let has_yolo = completed_processors.iter().any(|p| p == "yolo");
@@ -1444,7 +1452,7 @@ impl JobWorker {
.update_job_processors_arrays(job_id, completed_processors, failed_processors.clone())
.await?;
if has_asr_or_asrx {
if has_asrx {
// Guard: only spawn Rule 1 if sentence chunks don't exist yet
let chunk_t = schema::table_name("chunk");
let already_spawned: bool = sqlx::query_scalar::<_, i32>(&format!(
@@ -1532,7 +1540,7 @@ impl JobWorker {
}
}
if all_completed {
if has_face && has_asrx {
let mut pp = PipelineProgress::new(uuid);
pp.update_stage("processors", 1.0, "completed", None);
publish_pipeline_progress(self.redis.as_ref(), uuid, &pp).await;
@@ -1723,7 +1731,7 @@ impl JobWorker {
}
// 🚀 P3 Trigger: Identity Agent (Face + ASRX)
if has_face && has_asr_or_asrx {
if has_face && has_asrx {
info!("📝 Prerequisites met for Identity Agent. Starting analysis...");
let db_clone = self.db.clone();
let redis_clone = self.redis.clone();
@@ -1742,7 +1750,7 @@ impl JobWorker {
}
// 🚀 P4 Trigger: TKG Build (Face + ASRX) → then Rule2 ingestion
if has_face && has_asr_or_asrx {
if has_face && has_asrx {
info!("📝 Prerequisites met for TKG Build. Starting graph construction...");
let db_clone = self.db.clone();
let redis_clone = self.redis.clone();
@@ -1845,7 +1853,7 @@ impl JobWorker {
job_id,
failed_processors.len()
);
} else if any_failed {
} else if essential_failed {
self.db
.update_job_status(job_id, MonitorJobStatus::Failed)
.await?;