- VIDEO_REGISTRATION.md: Add Probe API reference and comparison table - JOB_WORKER_IMPLEMENTATION_PLAN.md: Update status to implemented - MOMENTRY_CORE_MONITORING.md: Add Job Worker monitoring section - monitor_jobs and processor_results table docs - Worker status check commands - Redis job monitoring commands
23 KiB
23 KiB
Job Worker 實作計畫
| 項目 | 內容 |
|---|---|
| 建立者 | Warren / OpenCode |
| 建立時間 | 2026-03-24 |
| 文件版本 | V1.1 |
| 狀態 | ✅ 已實作 |
版本歷史
| 版本 | 日期 | 目的 | 操作人 |
|---|---|---|---|
| V1.0 | 2026-03-24 | 建立實作計畫 | OpenCode |
| V1.1 | 2026-03-25 | 實作完成,更新狀態 | OpenCode |
實作狀態
✅ 已完成
| 元件 | 檔案 | 狀態 |
|---|---|---|
| MonitorJob 結構 | src/core/db/postgres_db.rs |
✅ |
| ProcessorResult 結構 | src/core/db/postgres_db.rs |
✅ |
| Worker 配置 | src/worker/config.rs |
✅ |
| Job Worker | src/worker/job_worker.rs |
✅ |
| Processor Pool | src/worker/processor.rs |
✅ |
| Worker 模組 | src/worker/mod.rs |
✅ |
| PostgreSQL 表格 | monitor_jobs, processor_results |
✅ |
| 類型修復 | i32, NaiveDateTime |
✅ |
待整合
| 項目 | 說明 |
|---|---|
| Worker 服務啟動 | 需要加入 launchd plist |
| 監控整合 | 需要加入 MOMENTRY_CORE_MONITORING.md |
| 備份涵蓋 | 需要確認備份包含新表格 |
1. 設計決策
1.1 確認的設計決策
| 項目 | 決策 | 理由 |
|---|---|---|
| 觸發方式 | 輪詢(Job Worker) | 暫無可靠的 API 觸發機制 |
| 並行處理 | 最多 2 個 | 可根據 CPU/GPU 能力調整 |
| 失敗處理 | 獨立模組,部分完成可接續 | 任何模組失敗都產出狀態記錄 |
| Worker 啟動 | 獨立進程 | 隔離、易管理 |
| 並行上限調整 | 環境變數 + 預設值 | 靈活、可調整 |
| 狀態同步 | PostgreSQL + Redis | 可靠 + 即時 |
1.2 環境變數
| 變數 | 預設值 | 說明 |
|---|---|---|
MOMENTRY_MAX_CONCURRENT |
2 | 最大並行 processor 數 |
MOMENTRY_POLL_INTERVAL |
5 | 輪詢間隔(秒) |
MOMENTRY_WORKER_ENABLED |
true | 是否啟用 worker |
2. 系統架構
2.1 完整流程圖
┌─────────────────────────────────────────────────────────────────────────┐
│ 檔案註冊觸發處理流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. SFTPGo 上傳 │
│ │ │
│ ▼ │
│ 2. Hook 呼叫 Register API │
│ │ │
│ ▼ │
│ 3. Register API │
│ ├─► ffprobe 提取 metadata │
│ ├─► 寫入 videos 表 │
│ └─► 建立 monitor_jobs 記錄 (status=pending) │
│ │ │
│ ▼ │
│ 4. Job Worker (獨立進程,輪詢機制) │
│ ├─► 輪詢 pending jobs │
│ ├─► 檢查 videos 表 fs_json 決定需要處理什麼 │
│ ├─► 並行執行 processors (最多 2 個) │
│ └─► 更新 videos, monitor_jobs, processor_results 表 │
│ │ │
│ ▼ │
│ 5. 處理結果 │
│ ├─► 更新 videos 表 (fs_json, psql_chunk, qvector_chunk) │
│ ├─► 更新 monitor_jobs 表 (status, progress) │
│ ├─► 更新 processor_results 表 (每個模組狀態) │
│ └─► Redis Pub/Sub 即時進度 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 Job Worker 架構
┌─────────────────────────────────────────────────────────────────────┐
│ Job Worker 架構 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PostgreSQL │ ───▶ │ Worker │ ───▶ │ Processor │ │
│ │ Job Queue │ │ Loop │ │ Pool │ │
│ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Video State │ │ Processor 1 │ │
│ │ Check │ │ (ASR/YOLO) │ │
│ └─────────────┘ ├─────────────┤ │
│ │ Processor 2 │ │
│ │ (CUT/OCR) │ │
│ └─────────────┘ │
│ │
│ Redis ──── Pub/Sub ──── 即時進度 │
│ │
└─────────────────────────────────────────────────────────────────────┘
3. 資料庫結構
3.1 Migration 檔案
檔案: migrations/003_job_worker.sql
-- ================================================================
-- Migration 003: Job Worker System
-- ================================================================
-- 3.1.1 更新 videos 表
ALTER TABLE videos ADD COLUMN IF NOT EXISTS status VARCHAR(20) DEFAULT 'pending';
ALTER TABLE videos ADD COLUMN IF NOT EXISTS user_id BIGINT;
ALTER TABLE videos ADD COLUMN IF NOT EXISTS job_id INTEGER REFERENCES monitor_jobs(id);
COMMENT ON COLUMN videos.status IS 'pending, processing, completed, failed';
COMMENT ON COLUMN videos.user_id IS 'WordPress user ID';
COMMENT ON COLUMN videos.job_id IS 'Associated monitor_jobs ID';
-- 3.1.2 更新 monitor_jobs 表
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS video_id BIGINT REFERENCES videos(id);
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS user_id BIGINT;
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS processors VARCHAR(20)[];
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS completed_processors VARCHAR(20)[];
ALTER TABLE monitor_jobs ADD COLUMN IF NOT EXISTS failed_processors VARCHAR(20)[];
COMMENT ON COLUMN monitor_jobs.processors IS 'Processors to run: asr, cut, yolo, ocr, face, pose, asrx';
COMMENT ON COLUMN monitor_jobs.completed_processors IS 'Successfully completed processors';
COMMENT ON COLUMN monitor_jobs.failed_processors IS 'Failed processors';
-- 3.1.3 新增 processor_results 表
CREATE TABLE IF NOT EXISTS processor_results (
id SERIAL PRIMARY KEY,
job_id INTEGER REFERENCES monitor_jobs(id) ON DELETE CASCADE,
video_id BIGINT REFERENCES videos(id) ON DELETE CASCADE,
processor VARCHAR(20) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
output_path TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
progress_total INT DEFAULT 0,
progress_current INT DEFAULT 0,
last_checkpoint JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(job_id, processor)
);
CREATE INDEX IF NOT EXISTS idx_processor_results_job ON processor_results(job_id);
CREATE INDEX IF NOT EXISTS idx_processor_results_video ON processor_results(video_id);
CREATE INDEX IF NOT EXISTS idx_processor_results_status ON processor_results(status);
COMMENT ON TABLE processor_results IS 'Tracks individual processor execution status';
COMMENT ON COLUMN processor_results.status IS 'pending, running, completed, failed, skipped';
-- 3.1.4 更新 videos 表標記欄位用途
COMMENT ON COLUMN videos.fs_video IS 'Video file exists on filesystem';
COMMENT ON COLUMN videos.fs_json IS 'All processor JSON files generated';
COMMENT ON COLUMN videos.fs_chunks IS 'Chunk files generated';
COMMENT ON COLUMN videos.fs_vectors IS 'Vector files generated';
COMMENT ON COLUMN videos.psql_chunk IS 'Chunks stored in PostgreSQL';
COMMENT ON COLUMN videos.pvector_chunk IS 'Vectors stored in PostgreSQL';
COMMENT ON COLUMN videos.qvector_chunk IS 'Vectors stored in Qdrant';
3.2 表關係圖
videos monitor_jobs
┌──────────────────────┐ ┌──────────────────────┐
│ id (PK) │◄────────│ video_id (FK) │
│ uuid │ │ user_id │
│ status │ │ processors[] │
│ fs_video │ │ completed_processors[]│
│ fs_json │ │ failed_processors[] │
│ job_id (FK)─────────┼────────►│ status │
│ user_id │ │ id (PK) │
└──────────────────────┘ └──────────────────────┘
│
│
processor_results
┌──────────────────────┐
│ job_id (FK) │
│ video_id (FK) │
│ processor │
│ status │
│ progress_current │
│ last_checkpoint │
│ id (PK) │
└──────────────────────┘
4. 模組並行策略
4.1 模組分類
| 模組 | 資源需求 | 獨立性 | 建議並行 |
|---|---|---|---|
| ASR | GPU/CPU | 高 | ✅ 可並行 |
| CUT | CPU | 高 | ✅ 可並行 |
| YOLO | GPU | 中 | ✅ 可並行 |
| OCR | GPU/CPU | 高 | ✅ 可並行 |
| Face | GPU | 中 | ✅ 可並行 |
| Pose | GPU | 中 | ✅ 可並行 |
| ASRX | GPU/CPU | 高 | ✅ 可並行 |
4.2 建議並行組合
| 組合 | 模組 1 | 模組 2 | 說明 |
|---|---|---|---|
| GPU+CPU | YOLO/Pose/Face | ASR/CUT/OCR | 平衡負載 |
| 雙GPU | YOLO | Pose | 雙 GPU 卡片 |
| 雙CPU | ASR | CUT/OCR | 無 GPU 時 |
4.3 Worker 配置
// src/worker/config.rs
#[derive(Debug, Clone)]
pub struct WorkerConfig {
pub max_concurrent: usize, // 預設 2
pub poll_interval_secs: u64, // 預設 5
pub enabled: bool, // 預設 true
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
max_concurrent: 2,
poll_interval_secs: 5,
enabled: true,
}
}
}
impl WorkerConfig {
pub fn from_env() -> Self {
Self {
max_concurrent: std::env::var("MOMENTRY_MAX_CONCURRENT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(2),
poll_interval_secs: std::env::var("MOMENTRY_POLL_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5),
enabled: std::env::var("MOMENTRY_WORKER_ENABLED")
.ok()
.map(|v| v != "false")
.unwrap_or(true),
}
}
}
5. 失敗處理機制
5.1 設計原則
每個模組獨立處理:
- 成功 → 產出完整 .json,status=completed
- 失敗 → 產出 .json 包含 error 狀態,status=failed
- 部分完成 → 可從 checkpoint 繼續,status=running
5.2 Processor 輸出格式
{
"processor": "asr",
"status": "completed|failed|partial",
"completed_at": "2026-03-24T12:00:00Z",
"result": { ... },
"error": null,
"last_checkpoint": {
"frame": 5000,
"timestamp": 180.5
}
}
5.3 失敗處理流程
async fn run_processor(&self, module: &str, video: &Video) -> Result<()> {
let output_path = self.get_output_path(video, module);
match self.execute_processor(module, video, &output_path).await {
Ok(result) => {
// 成功:更新狀態
self.db.update_processor_status(job_id, module, "completed").await?;
self.publish_progress(job_id, module, 100).await?;
}
Err(e) => {
// 失敗:仍然保存部分結果
let partial_result = self.get_partial_result(&output_path);
self.db.update_processor_status(job_id, module, "failed").await?;
self.db.save_error_message(job_id, module, &e.to_string()).await?;
// 記錄錯誤但不中斷其他模組
tracing::warn!("Processor {} failed: {}", module, e);
}
}
Ok(())
}
6. 實作結構
6.1 目錄結構
src/
├── worker/
│ ├── mod.rs # Worker 模組導出
│ ├── config.rs # Worker 配置
│ ├── worker.rs # Worker 主邏輯
│ ├── processor.rs # Processor 執行器
│ ├── queue.rs # Job 佇列管理
│ └── progress.rs # 進度追蹤
├── api/
│ └── server.rs # 更新 Register API
└── main.rs # 新增 worker 命令
6.2 核心模組
6.2.1 Worker Config (src/worker/config.rs)
pub struct WorkerConfig {
pub max_concurrent: usize,
pub poll_interval_secs: u64,
pub enabled: bool,
}
impl WorkerConfig {
pub fn from_env() -> Self { ... }
}
6.2.2 Worker Loop (src/worker/worker.rs)
pub struct JobWorker {
db: PostgresDb,
redis: RedisCache,
config: WorkerConfig,
semaphore: Arc<Semaphore>,
}
impl JobWorker {
pub async fn run(&self) -> Result<()> {
loop {
if self.config.enabled {
self.process_pending_jobs().await?;
}
tokio::time::sleep(Duration::from_secs(self.config.poll_interval_secs)).await;
}
}
async fn process_pending_jobs(&self) -> Result<()> {
// 1. 檢查並發數
// 2. 取得 pending jobs
// 3. 分配給 worker pool
// 4. 並行執行 processors
}
}
6.2.3 Processor Pool (src/worker/processor.rs)
pub struct ProcessorPool {
max_concurrent: usize,
}
impl ProcessorPool {
pub async fn execute(&self, job: &Job, video: &Video) -> Result<ProcessorResult> {
// 根據 videos 表決定需要執行哪些 processor
// 並行執行最多 2 個
// 處理失敗但不中斷其他 processor
}
}
7. API 端點設計
7.1 新增端點
| 端點 | 方法 | 說明 |
|---|---|---|
/api/v1/jobs |
GET | 列出所有 jobs |
/api/v1/jobs/:uuid |
GET | 取得特定 job 詳細 |
/api/v1/jobs/:uuid/retry |
POST | 重試失敗的 processor |
/api/v1/jobs/:uuid/cancel |
POST | 取消 job |
7.2 端點詳情
GET /api/v1/jobs
Response:
{
"jobs": [
{
"id": 1,
"uuid": "abc123def456",
"status": "running",
"progress": 60,
"processors": ["asr", "cut", "yolo", "ocr", "face", "pose"],
"completed": ["asr", "cut", "yolo"],
"failed": []
}
]
}
GET /api/v1/jobs/:uuid
Response:
{
"id": 1,
"uuid": "abc123def456",
"video_id": 10,
"status": "running",
"processors": {
"asr": {"status": "completed", "progress": 100},
"cut": {"status": "completed", "progress": 100},
"yolo": {"status": "running", "progress": 45, "current": 5000, "total": 11000},
"ocr": {"status": "pending"},
"face": {"status": "pending"},
"pose": {"status": "pending"}
},
"created_at": "2026-03-24T12:00:00Z",
"started_at": "2026-03-24T12:01:00Z"
}
8. Redis Key 設計
8.1 現有 Key 保持
momentry:job:{uuid} # Job Hash
momentry:job:{uuid}:processor:{name} # Processor Hash
momentry:progress:{uuid} # Pub/Sub Channel
momentry:jobs:active # Set: 運行中 UUIDs
momentry:jobs:completed # Set: 完成 UUIDs
momentry:jobs:failed # Set: 失敗 UUIDs
8.2 進度更新時序
Processor 執行
│
├─► 每秒更新 Redis Hash (即時)
│
├─► 每 10% 或完成時更新 PostgreSQL (持久)
│
└─► 失敗時立即更新 PostgreSQL (錯誤記錄)
9. 實作順序
Phase 1: 資料庫遷移
| 任務 | 說明 |
|---|---|
| 1.1 | 建立 migrations/003_job_worker.sql |
| 1.2 | 更新 postgres_db.rs 對應的 struct |
| 1.3 | 執行 migration 驗證 |
Phase 2: Worker 框架
| 任務 | 說明 |
|---|---|
| 2.1 | 建立 src/worker/mod.rs |
| 2.2 | 建立 src/worker/config.rs |
| 2.3 | 建立 src/worker/worker.rs |
| 2.4 | 建立 src/worker/processor.rs |
Phase 3: Register API 整合
| 任務 | 說明 |
|---|---|
| 3.1 | 修改 src/api/server.rs 的 register 函數 |
| 3.2 | 加入建立 monitor_jobs 的邏輯 |
| 3.3 | 更新 videos 表 status 欄位 |
Phase 4: Processor 執行
| 任務 | 說明 |
|---|---|
| 4.1 | 實作 processor 並行執行(最多 2 個) |
| 4.2 | 實作失敗處理(保存部分結果) |
| 4.3 | 實作 checkpoint 恢復 |
Phase 5: 進度追蹤
| 任務 | 說明 |
|---|---|
| 5.1 | Redis Pub/Sub 整合 |
| 5.2 | PostgreSQL 定期同步 |
| 5.3 | API 進度端點更新 |
Phase 6: API 端點
| 任務 | 說明 |
|---|---|
| 6.1 | GET /api/v1/jobs |
| 6.2 | GET /api/v1/jobs/:uuid |
| 6.3 | POST /api/v1/jobs/:uuid/retry |
| 6.4 | POST /api/v1/jobs/:uuid/cancel |
Phase 7: CLI 命令
| 任務 | 說明 |
|---|---|
| 7.1 | cargo run -- worker 命令 |
| 7.2 | Worker 啟動/停止/狀態顯示 |
| 7.3 | launchd plist 設定 |
Phase 8: 測試
| 任務 | 說明 |
|---|---|
| 8.1 | 單元測試 |
| 8.2 | 端到端測試 |
| 8.3 | 失敗處理測試 |
| 8.4 | 並行執行測試 |
10. CLI 命令
10.1 Worker 命令
# 啟動 worker
cargo run -- worker
# 顯示 worker 幫助
cargo run -- worker --help
10.2 環境變數
# Worker 配置
export MOMENTRY_MAX_CONCURRENT=2
export MOMENTRY_POLL_INTERVAL=5
export MOMENTRY_WORKER_ENABLED=true
# 現有環境變數
export DATABASE_URL=postgres://accusys@localhost:5432/momentry
export REDIS_URL=redis://:accusys@localhost:6379
11. 預估工時
| Phase | 任務 | 預估工時 |
|---|---|---|
| 1 | 資料庫遷移 | 2h |
| 2 | Worker 框架 | 4h |
| 3 | Register API 整合 | 2h |
| 4 | Processor 執行 | 4h |
| 5 | 進度追蹤 | 2h |
| 6 | API 端點 | 3h |
| 7 | CLI 命令 | 2h |
| 8 | 測試 | 4h |
| 總計 | 23h |
12. 參考文件
| 文件 | 用途 |
|---|---|
docs/MOMENTRY_CORE_MONITORING.md |
監控系統規範 |
docs/MOMENTRY_CORE_REDIS_KEYS.md |
Redis Key 設計 |
docs/PROCESSING_PIPELINE.md |
處理流程 |
docs/CHUNK_DESIGN.md |
資料庫設計 |
docs/API_REFERENCE.md |
API 參考 |
13. 附錄
A. 狀態機
┌──────────────┐
│ PENDING │
└──────┬───────┘
│ register 後
▼
┌──────────────┐
┌─────▶│ PROCESSING │◀──────┐
│ └──────┬───────┘ │
│ │ │
部分失敗 all completed 全部失敗
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ PARTIAL │ │COMPLETED │ │ FAILED │
└──────────┘ └──────────┘ └──────────┘
B. videos 表 status 欄位
| 值 | 說明 |
|---|---|
pending |
已註冊,等待處理 |
processing |
處理中 |
completed |
所有處理完成 |
failed |
處理失敗 |
C. processor_results 表 status 欄位
| 值 | 說明 |
|---|---|
pending |
等待執行 |
running |
執行中 |
completed |
執行成功 |
failed |
執行失敗 |
skipped |
跳過(如檔案已存在) |