Rule 1 Startup Mechanism Analysis
Date: 2026-04-28 20:10 Version: V4.0
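Overview of startup methods
Rule 1 can be started in two ways:
| Method | Trigger source | Timing | File |
|---|---|---|---|
| Method A | Processor completion | Triggered automatically | job_worker.rs |
| Method B | jobs table | Job Worker polling | job_runner.rs |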
Method A: Automatic trigger after processors complete
Flow diagram
```text
Processor runs (processor.rs)
    ↓
processor_results table updated
    ↓
check_and_complete_job() (job_worker.rs)
    ↓
Check prerequisites: has_asr && has_asrx
    ↓
tokio::spawn(execute_rule1)
    ↓
Rule 1 Chunking (rule1_ingest.rs)
```
Prerequisite check
Location: src/worker/job_worker.rs:248-252
```rust
// Check which processors have completed
let has_asr = completed_processors.iter().any(|p| p == "asr");
let has_asrx = completed_processors.iter().any(|p| p == "asrx");
let has_cut = completed_processors.iter().any(|p| p == "cut");
let has_face = completed_processors.iter().any(|p| p == "face");
let has_yolo = completed_processors.iter().any(|p| p == "yolo");
```
Rule trigger matrix
| Rule | Prerequisites | Priority | Function |
|---|---|---|---|
| Rule 1 | `has_asr && has_asrx` | P1 | Sentence Chunking |
| Rule 3 | `has_cut && has_asr` | P1 | Scene Chunking |
| Identity Agent | `has_face && has_asrx` | P3 | Person Identity |
| 5W1H Agent | `has_cut && has_asr` | P4 | Story Summary |
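The matrix above can be expressed as data plus a single check. The following is a minimal sketch. It assumes completed processors are identified by the same lowercase names used in `job_worker.rs` ("asr", "asrx", "cut", "face"). The `Stage` enum and `triggered_stages` function are hypothetical and not part of the codebase. The sketch also makes one property of the table visible: Rule 3 and the 5W1H Agent share the same prerequisites, so they always become eligible together.

```rust
use std::collections::HashSet;

/// Downstream stages from the trigger matrix (hypothetical names).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Stage {
    Rule1SentenceChunking,
    Rule3SceneChunking,
    IdentityAgent,
    FiveW1HAgent,
}

/// Returns every stage whose prerequisite processors have all completed.
fn triggered_stages(completed_processors: &[String]) -> Vec<Stage> {
    let done: HashSet<&str> = completed_processors.iter().map(String::as_str).collect();

    // Each entry pairs a stage with the processors it requires.
    let matrix: [(Stage, &[&str]); 4] = [
        (Stage::Rule1SentenceChunking, &["asr", "asrx"]),
        (Stage::Rule3SceneChunking, &["cut", "asr"]),
        (Stage::IdentityAgent, &["face", "asrx"]),
        (Stage::FiveW1HAgent, &["cut", "asr"]),
    ];

    matrix
        .iter()
        .filter(|(_, prereqs)| prereqs.iter().all(|p| done.contains(p)))
        .map(|(stage, _)| *stage)
        .collect()
}

fn main() {
    let completed: Vec<String> = ["asr", "asrx", "face"].iter().map(|s| s.to_string()).collect();
    println!("{:?}", triggered_stages(&completed)); // [Rule1SentenceChunking, IdentityAgent]
}
```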
Trigger code
Location: src/worker/job_worker.rs:260-281
```rust
if has_asr && has_asrx {
    info!("📝 Prerequisites met for Rule 1 Chunking. Starting ingestion...");
    let db_clone = self.db.clone();
    let uuid_clone = uuid.to_string();
    tokio::spawn(async move {
        match db_clone.get_video_by_uuid(&uuid_clone).await {
            Ok(Some(video)) => {
                let fps = video.fps;
                match rule1_ingest::execute_rule1(&db_clone, &uuid_clone, fps).await {
                    Ok(count) => info!("✅ Rule 1 Ingestion completed: {} chunks inserted.", count),
                    Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e),
                }
            }
            Ok(None) => error!("Video not found for chunking: {}", uuid_clone),
            Err(e) => error!("Failed to get video info for chunking: {}", e),
        }
    });
}
```
Method B: Job Worker polling
Flow diagram
```text
Job Worker starts (job_runner.rs)
    ↓
Poll the jobs table (status = QUEUED)
    ↓
Atomically set status = 'RUNNING'
    ↓
Dispatch on the rule column
    ↓
rule = "rule1" → execute_rule1()
```
jobs table schema
```sql
CREATE TABLE dev.jobs (
    id UUID PRIMARY KEY,
    asset_uuid VARCHAR(32) NOT NULL,
    processor_list TEXT[],
    assigned_processor_id UUID,
    rule VARCHAR(20), -- rule identifier
    status VARCHAR(20) DEFAULT 'QUEUED',
    total_frames BIGINT DEFAULT 0,
    processed_frames BIGINT DEFAULT 0,
    error_message TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
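Job claim logic
Location: src/core/worker/job_runner.rs:47-62
```rust
// processor_list is a nullable TEXT[]; sqlx decodes it as Option<Vec<String>>,
// and decoding it into String fails at runtime. `rule` is nullable too: if jobs
// with rule = NULL can reach this query, that field also needs Option<String>.
let job_row: Option<(String, String, String, String, Option<Vec<String>>, i64)> = sqlx::query_as(
    r#"
    UPDATE dev.jobs
    SET status = 'RUNNING', updated_at = NOW()
    WHERE id = (
        SELECT id FROM dev.jobs
        WHERE status = 'QUEUED'
        ORDER BY created_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED -- skip rows already locked by another worker
    )
    RETURNING id::text, asset_uuid, rule, status, processor_list, total_frames
    "#,
)
.fetch_optional(&self.pool)
.await?;
```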
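The claim query relies on two properties. First, the UPDATE marks the row RUNNING in the same statement that selects it. Second, `FOR UPDATE SKIP LOCKED` makes concurrent workers pass over a row that another worker has already locked instead of waiting for it. The sketch below is only an in-memory analogue and assumes a single process. A `Mutex` stands in for the row lock; it is coarser, because workers wait on the mutex rather than skipping locked rows. `claim_next` (a hypothetical name) picks the oldest QUEUED job and marks it RUNNING inside one critical section, so no job is handed to two workers.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Queued,
    Running,
}

#[derive(Debug, Clone)]
struct Job {
    id: u32,
    created_at: u64, // stand-in for the created_at TIMESTAMPTZ column
    status: Status,
}

/// Picks the oldest QUEUED job and marks it RUNNING while holding the lock,
/// so selecting and updating the job form one atomic step.
fn claim_next(queue: &Mutex<Vec<Job>>) -> Option<Job> {
    let mut jobs = queue.lock().unwrap();
    let next = jobs
        .iter_mut()
        .filter(|j| j.status == Status::Queued)
        .min_by_key(|j| j.created_at)?; // ORDER BY created_at ASC LIMIT 1
    next.status = Status::Running;
    Some(next.clone())
}

fn main() {
    let jobs: Vec<Job> = (0..6u32)
        .map(|i| Job { id: i, created_at: u64::from(i), status: Status::Queued })
        .collect();
    let queue = Arc::new(Mutex::new(jobs));

    // Two workers drain the queue concurrently; each job is claimed exactly once.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                let mut claimed = Vec::new();
                while let Some(job) = claim_next(&q) {
                    claimed.push(job.id);
                }
                claimed
            })
        })
        .collect();

    let mut all: Vec<u32> = handles.into_iter().flat_map(|h| h.join().unwrap()).collect();
    all.sort();
    println!("{:?}", all); // [0, 1, 2, 3, 4, 5]
}
```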
Rule dispatch logic
Location: src/core/worker/job_runner.rs:76-86
```rust
let result = match rule.as_str() {
    "rule1" => {
        let fps = self.get_asset_fps(&asset_uuid).await?;
        let db = PostgresDb::from_pool(self.pool.clone());
        chunk::rule1_ingest::execute_rule1(&db, &asset_uuid, fps).await
    }
    // Unknown rules are logged and reported as success with 0 chunks.
    _ => {
        tracing::warn!("Unknown rule type: {}", rule);
        Ok(0)
    }
};
```
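In the dispatch above, an unknown `rule` value is logged and returns `Ok(0)`, so the job looks like a successful run that produced zero chunks. One possible variant, sketched below, parses the column into a typed enum. It then surfaces both unknown values and NULL (the schema allows `rule` to be NULL) as errors that the runner could record, for example in `error_message`. `Rule` and `parse_rule` are hypothetical names. This is a design alternative, not the current behavior.

```rust
use std::str::FromStr;

/// Hypothetical typed view of the `rule` column; "rule1" is the only value handled today.
#[derive(Debug, PartialEq)]
enum Rule {
    Rule1,
}

impl FromStr for Rule {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "rule1" => Ok(Rule::Rule1),
            other => Err(format!("Unknown rule type: {other}")),
        }
    }
}

/// Treats a NULL rule and an unrecognized rule as errors instead of Ok(0).
fn parse_rule(rule: Option<&str>) -> Result<Rule, String> {
    rule.ok_or_else(|| "job has no rule".to_string())?.parse()
}

fn main() {
    for raw in [Some("rule1"), Some("rule9"), None] {
        match parse_rule(raw) {
            Ok(Rule::Rule1) => println!("{raw:?}: run execute_rule1"),
            Err(e) => println!("{raw:?}: mark job failed ({e})"),
        }
    }
}
```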
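Execution timing comparison
| Aspect | Method A | Method B |
|---|---|---|
| Latency | Triggered immediately after processors complete | Depends on the Job Worker polling interval |
| Concurrency | Multiple videos can be processed in parallel | Serial (single worker) |
| Retry | Not triggered if a processor fails | The job can be reset to QUEUED |
| Typical use | Automated processing | Manual triggers / scheduled tasks |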
Current state analysis
jobs table
```sql
SELECT id, asset_uuid, rule, status FROM dev.jobs WHERE rule IS NOT NULL;
-- Result:
-- id: 751d90b5... | asset_uuid: 384b0ff44aaaa1f14cb2cd63b3fea966 | rule: rule1 | status: QUEUED
-- id: 9e5df703... | asset_uuid: 384b0ff44aaaa1f14cb2cd63b3fea966 | rule: rule1 | status: QUEUED
```
Problem: two Rule 1 jobs are in QUEUED status and have not been executed by the Job Runner.
processor_results table
```sql
SELECT job_id, processor_type, status FROM dev.processor_results WHERE job_id IS NOT NULL;
-- Result:
-- job_id: 21 | processor_type: NULL | status: failed
-- job_id: 20 | processor_type: NULL | status: completed
```
Problem: processor_type is NULL, so there is no way to tell which processors have completed.
Diagnosis
Issue 1: Job Worker not running
Check:
```bash
ps aux | grep momentry | grep worker
```
Possible causes:
- The Job Worker process is not running.
- Only the processor worker is running, not the job worker.
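Issue 2: processor_results lacks type information
Impact:
- `completed_processors` cannot be built correctly.
- The Rule 1 prerequisite check fails.

Fix: make the processor write processor_type into processor_results when it runs:
```rust
// src/worker/processor.rs:300
// Ensure processor_type is written to processor_results
```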
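The way a NULL processor_type blocks the trigger can be reproduced in isolation. The sketch below assumes `completed_processors` is built by keeping rows whose status is `completed` and reading their processor_type. The `ProcessorResult` struct and both functions are hypothetical stand-ins for the real query in `job_worker.rs`. A row with processor_type = NULL contributes no name, so `has_asr && has_asrx` stays false even though a processor finished.

```rust
/// One row of processor_results (hypothetical shape).
struct ProcessorResult {
    processor_type: Option<String>,
    status: String,
}

/// Builds the list of completed processor names from result rows.
fn completed_processors(rows: &[ProcessorResult]) -> Vec<String> {
    rows.iter()
        .filter(|r| r.status == "completed")
        // Rows whose processor_type is NULL are silently dropped here.
        .filter_map(|r| r.processor_type.clone())
        .collect()
}

/// Same condition as the Rule 1 trigger: both ASR and ASRX must be present.
fn rule1_ready(completed: &[String]) -> bool {
    let has = |name: &str| completed.iter().any(|p| p == name);
    has("asr") && has("asrx")
}

fn main() {
    // Mirrors the observed state: processor_type is NULL on every row.
    let observed = vec![
        ProcessorResult { processor_type: None, status: "failed".into() },
        ProcessorResult { processor_type: None, status: "completed".into() },
    ];
    println!("{}", rule1_ready(&completed_processors(&observed))); // false

    // Hypothetical rows after the fix: processor_type is written on completion.
    let fixed = vec![
        ProcessorResult { processor_type: Some("asr".into()), status: "completed".into() },
        ProcessorResult { processor_type: Some("asrx".into()), status: "completed".into() },
    ];
    println!("{}", rule1_ready(&completed_processors(&fixed))); // true
}
```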
Issue 3: Duplicate jobs
Symptom: the same asset_uuid has two QUEUED jobs.
Cause: the job creation logic does not check for existing jobs.
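A creation path that checks for an existing job first would avoid the duplicate. The sketch below is an in-memory illustration only. It assumes a job counts as a duplicate when another QUEUED or RUNNING job exists for the same (asset_uuid, rule) pair. `JobQueue`, `enqueue_if_absent`, and `finish` are hypothetical names. In PostgreSQL, the database itself could enforce the same rule. One way is a partial unique index on `(asset_uuid, rule)` restricted to `status IN ('QUEUED', 'RUNNING')`, combined with `INSERT ... ON CONFLICT DO NOTHING`. That approach also closes the race between two concurrent check-then-insert calls.

```rust
use std::collections::HashSet;

/// In-memory stand-in for the jobs table, tracking only active (QUEUED or RUNNING) jobs.
#[derive(Default)]
struct JobQueue {
    active: HashSet<(String, String)>, // (asset_uuid, rule)
    next_id: u64,
}

impl JobQueue {
    /// Creates a job unless an active one already exists for the same asset and rule.
    /// Returns the new job id, or None for a duplicate.
    fn enqueue_if_absent(&mut self, asset_uuid: &str, rule: &str) -> Option<u64> {
        // HashSet::insert returns false when the key is already present,
        // so the existence check and the insert happen in one step.
        if !self.active.insert((asset_uuid.to_string(), rule.to_string())) {
            return None;
        }
        self.next_id += 1;
        Some(self.next_id)
    }

    /// Called when a job reaches a terminal state, allowing the pair to be queued again.
    fn finish(&mut self, asset_uuid: &str, rule: &str) {
        self.active.remove(&(asset_uuid.to_string(), rule.to_string()));
    }
}

fn main() {
    let mut queue = JobQueue::default();
    let asset = "384b0ff44aaaa1f14cb2cd63b3fea966";
    println!("{:?}", queue.enqueue_if_absent(asset, "rule1")); // Some(1)
    println!("{:?}", queue.enqueue_if_absent(asset, "rule1")); // None (duplicate)
    queue.finish(asset, "rule1");
    println!("{:?}", queue.enqueue_if_absent(asset, "rule1")); // Some(2)
}
```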
Complete startup flow diagram
```mermaid
graph TD
    A[Video Registered] --> B[Job Created]
    B --> C{Job Type?}
    C -->|Processor Job| D[Processor Worker]
    C -->|Rule Job| E[Job Runner]
    D --> F[Execute Processor]
    F --> G[Update processor_results]
    G --> H[check_and_complete_job]
    H --> I{Check Prerequisites}
    I -->|has_asr && has_asrx| J[Trigger Rule 1]
    I -->|has_cut && has_asr| K[Trigger Rule 3]
    E --> L[Poll QUEUED Jobs]
    L --> M{rule == 'rule1'?}
    M -->|Yes| N[execute_rule1]
    J --> O[Rule 1 Ingestion]
    N --> O
    O --> P[Create Chunks]
    P --> Q[Store in chunks table]
```
Startup parameters
| Parameter | Source | Description |
|---|---|---|
| file_uuid | asset_uuid | Video UUID |
| fps | videos.fps | Read from the video record |
| db | PostgresDb | Database connection |
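Configuration check
Job Worker configuration
```bash
# Check whether the Job Worker is running
# (this pattern also matches the processor worker, e.g. "momentry worker --max-concurrent 2")
ps aux | grep "momentry worker"
# Check the Processor Worker
ps aux | grep "momentry" | grep "worker" | grep "max-concurrent"
```
Currently running workers
```bash
# From the earlier check
accusys 309 ... target/release/momentry worker --max-concurrent 2
```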
Analysis:
- The Processor Worker is running (max-concurrent 2).
- However, this is the Processor Worker, not the Job Worker.
- The Job Runner (job_runner.rs) is a separate worker.
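Solutions
Option 1: Start the Job Runner worker
```bash
# Start the Job Runner
cargo run --release -- worker --type job_runner --poll-interval 10
```
Option 2: Use Method A (recommended)
Make sure the Processor Worker triggers Rule 1 correctly:
- Fix the processor_type write
  - Write processor_type correctly when processor.rs finishes executing.
  - Make sure processor_results contains the type information.
- Check the prerequisite logic
  - Make sure both ASR and ASRX complete successfully.
  - Fix the ASRX chunks_produced = 0 problem.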
Related files
| File | Purpose |
|---|---|
| `src/worker/job_worker.rs` | Triggers rules after processors complete |
| `src/core/worker/job_runner.rs` | Job Worker polling and execution |
| `src/core/chunk/rule1_ingest.rs` | Rule 1 execution logic |
| `src/worker/processor.rs` | Processor execution |
| `migrations/003_job_worker.sql` | jobs / processor_results tables |
Next steps
- Check whether the Job Runner is running.
- Fix the processor_type write.
- Clean up the duplicate QUEUED jobs.
- Re-run Rule 1.