Files
momentry_core/docs_v1.0/RULE1_TRIGGER_MECHANISM.md
Warren 4d75b2e251 docs: update docs_v1.0/ documentation
- Fix markdown lint issues (MD030, MD047, MD051, MD028, MD005)
- Update AI agents, architecture, implementation docs
- Add new identity, face recognition, and API documentation
- Remove deprecated face/person API guides
2026-04-30 15:10:41 +08:00

8.0 KiB
Raw Permalink Blame History

Rule 1 启动机制分析

Date: 2026-04-28 20:10 Version: V4.0


启动方式概览

Rule 1 有两种启动机制:

方式 触发源 时机 文件
方式 A Processor 完成 自动触发 job_worker.rs
方式 B Jobs 表 Job Worker 轮询 job_runner.rs

方式 A: Processor 完成后自动触发

流程图

Processor 执行 (processor.rs)
  ↓
processor_results 表更新
  ↓
check_and_complete_job() (job_worker.rs)
  ↓
检查前提条件: has_asr && has_asrx
  ↓
tokio::spawn(execute_rule1)
  ↓
Rule 1 Chunking (rule1_ingest.rs)

前提条件检查

位置: src/worker/job_worker.rs:248-252

// 检查完成的处理器
let has_asr = completed_processors.iter().any(|p| p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
let has_face = completed_processors.iter().any(|p| p == "face");
let has_yolo = completed_processors.iter().any(|p| p == "yolo");

Rule 触发矩阵

Rule 前提条件 优先级 功能
Rule 1 has_asr && has_asrx P1 Sentence Chunking
Rule 3 has_cut && has_asr P1 Scene Chunking
Identity Agent has_face && has_asrx P3 Person Identity
5W1H Agent has_cut && has_asr P4 Story Summary

触发代码

位置: src/worker/job_worker.rs:260-281

if has_asr && has_asrx {
    info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion...");
    let db_clone = self.db.clone();
    let uuid_clone = uuid.to_string();
    tokio::spawn(async move {
        match db_clone.get_video_by_uuid(&uuid_clone).await {
            Ok(Some(video)) => {
                let fps = video.fps;
                match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
                    Ok(count) => info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count),
                    Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e),
                }
            }
            Ok(None) => error!("Video not found for chunking: {}", uuid_clone),
            Err(e) => error!("Failed to get video info for chunking: {}", e),
        }
    });
}

方式 B: Job Worker 轮询

流程图

Job Worker 启动 (job_runner.rs)
  ↓
轮询 jobs 表 (QUEUED 状态)
  ↓
原子更新 status = 'RUNNING'
  ↓
根据 rule 字段执行
  ↓
rule = "rule1" → execute_rule1()

Job 表结构

CREATE TABLE dev.jobs (
    id UUID PRIMARY KEY,
    asset_uuid VARCHAR(32) NOT NULL,
    processor_list TEXT[],
    assigned_processor_id UUID,
    rule VARCHAR(20),            -- Rule 标识
    status VARCHAR(20) DEFAULT 'QUEUED',
    total_frames BIGINT DEFAULT 0,
    processed_frames BIGINT DEFAULT 0,
    error_message TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Job 获取逻辑

位置: src/core/worker/job_runner.rs:47-62

let job_row: Option<(String, String, String, String, String, i64)> = sqlx::query_as(
    r#"
    UPDATE dev.jobs 
    SET status = 'RUNNING', updated_at = NOW()
    WHERE id = (
        SELECT id FROM dev.jobs 
        WHERE status = 'QUEUED' 
        ORDER BY created_at ASC 
        LIMIT 1 
        FOR UPDATE SKIP LOCKED  -- 防止并发竞争
    )
    RETURNING id::text, asset_uuid, rule, status, processor_list, total_frames
    "#,
)
.fetch_optional(&self.pool)
.await?;

Rule 执行逻辑

位置: src/core/worker/job_runner.rs:76-86

let result = match rule.as_str() {
    "rule1" => {
        let fps = self.get_asset_fps(&asset_uuid).await?;
        let db = PostgresDb::from_pool(self.pool.clone());
        chunk::rule1_ingest::execute_rule1(&db, &asset_uuid, fps).await
    }
    _ => {
        tracing::warn!("Unknown rule type: {}", rule);
        Ok(0)
    }
};

执行时机对比

场景 方式 A 方式 B
实时处理 Processor 完成后立即触发 依赖 Job Worker 轮询间隔
并发处理 多个视频可并行 串行处理(单 worker
重试机制 Processor 失败则不触发 Job 可重新 QUEUED
适用场景 自动化处理 手动触发/定时任务

当前状态分析

Jobs 表

SELECT id, asset_uuid, rule, status FROM dev.jobs WHERE rule IS NOT NULL;

-- Result:
id: 751d90b5... | asset_uuid: 384b0ff44aaaa1f14cb2cd63b3fea966 | rule: rule1 | status: QUEUED
id: 9e5df703... | asset_uuid: 384b0ff44aaaa1f14cb2cd63b3fea966 | rule: rule1 | status: QUEUED

问题: 2 个 Rule 1 Job 处于 QUEUED 状态,未被 Job Runner 执行

Processor Results 表

SELECT job_id, processor_type, status FROM dev.processor_results WHERE job_id IS NOT NULL;

-- Result:
job_id: 21 | processor_type: NULL | status: failed
job_id: 20 | processor_type: NULL | status: completed

问题: processor_type 为 NULL无法判断哪些处理器完成


问题诊断

问题 1: Job Worker 未启动

检查:

ps aux | grep momentry | grep worker

可能原因:

  • Job Worker 进程未运行
  • 仅运行 processor worker未运行 job worker

问题 2: Processor Results 缺少类型信息

影响:

  • completed_processors 无法正确构建
  • Rule 1 前提条件判断失败

解决方案: 修复 processor 执行时写入 processor_type:

// src/worker/processor.rs:300
// 确保写入 processor_type 到 processor_results

问题 3: 重复 Job

现象: 同一 asset_uuid 有 2 个 QUEUED job

原因: Job 创建逻辑未检查现有 Job


启动流程完整图

graph TD
    A[Video Registered] --> B[Job Created]
    B --> C{Job Type?}
    
    C -->|Processor Job| D[Processor Worker]
    C -->|Rule Job| E[Job Runner]
    
    D --> F[Execute Processor]
    F --> G[Update processor_results]
    G --> H[check_and_complete_job]
    
    H --> I{Check Prerequisites}
    I -->|has_asr && has_asrx| J[Trigger Rule 1]
    I -->|has_cut && has_asr| K[Trigger Rule 3]
    
    E --> L[Poll QUEUED Jobs]
    L --> M{rule == 'rule1'?}
    M -->|Yes| N[execute_rule1]
    
    J --> O[Rule 1 Ingestion]
    N --> O
    
    O --> P[Create Chunks]
    P --> Q[Store in chunks table]

启动参数

参数 来源 说明
file_uuid asset_uuid Video UUID
fps videos.fps 从 video record 获取
db PostgresDb Database connection

配置检查

Job Worker 配置

# 检查 Job Worker 是否运行
ps aux | grep "momentry worker"

# 检查 Processor Worker
ps aux | grep "momentry" | grep "worker" | grep "max-concurrent"

当前运行的 Worker

# 从之前的检查
accusys 309 ... target/release/momentry worker --max-concurrent 2

分析:

  • Processor Worker 正在运行max-concurrent 2
  • 但这是 Processor Worker不是 Job Worker
  • Job Runner (job_runner.rs) 是独立的 worker

解决方案

方案 1: 启动 Job Runner Worker

# 启动 Job Runner
cargo run --release -- worker --type job_runner --poll-interval 10

方案 2: 使用方式 A推荐

确保 Processor Worker 正确触发 Rule 1:

  1. 修复 processor_type 写入

    • processor.rs 执行完成后,正确写入 processor_type
    • 确保 processor_results 包含类型信息
  2. 检查前提条件逻辑

    • 确保 ASR + ASRX 都成功完成
    • 修复 ASRX chunks_produced = 0 问题

相关文件

文件 功能
src/worker/job_worker.rs Processor 完成后触发 Rule
src/core/worker/job_runner.rs Job Worker 轮询执行
src/core/chunk/rule1_ingest.rs Rule 1 执行逻辑
src/worker/processor.rs Processor 执行
migrations/003_job_worker.sql Job/processor_results 表

下一步

  1. 检查 Job Runner 是否运行
  2. 修复 processor_type 写入
  3. 清理重复 QUEUED jobs
  4. 重新运行 Rule 1