feat: add job worker and duplicate registration check

Job Worker System:
- Add polling-based job worker (max 2 concurrent processors)
- Create monitor_jobs records when videos are registered
- Link videos.job_id to monitor_jobs
- Fix type mismatches (i32 vs i64) for database IDs

Duplicate Registration:
- Check if video already exists before registering
- Return existing video info with already_exists: true
- Use canonical path for UUID computation

USER_DATA_ROOT Configuration:
- Add MOMENTRY_USER_DATA_ROOT environment variable
- UUID computed from relative path (username/filename)
- Ensures consistent UUIDs when data root changes
This commit is contained in:
accusys
2026-03-25 02:50:31 +08:00
parent cd0f952aeb
commit 12a7b59232
9 changed files with 3669 additions and 229 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -60,6 +60,14 @@ pub static SERVER_PORT: Lazy<u16> = Lazy::new(|| {
pub static REDIS_KEY_PREFIX: Lazy<String> =
Lazy::new(|| env::var("MOMENTRY_REDIS_PREFIX").unwrap_or_else(|_| "momentry:".to_string()));
/// User data root path (sftpgo data directory)
/// This is the parent directory containing user directories like ./demo/, ./warren/, ./momentry/
/// Example: /Users/accusys/momentry/var/sftpgo/data
pub static USER_DATA_ROOT: Lazy<String> = Lazy::new(|| {
env::var("MOMENTRY_USER_DATA_ROOT")
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string())
});
pub mod processor {
use super::*;

File diff suppressed because it is too large Load Diff

View File

@@ -336,7 +336,7 @@ impl RedisClient {
pub async fn update_worker_job_status(
&self,
uuid: &str,
job_id: i64,
job_id: i32,
status: &str,
current_processor: Option<&str>,
progress: i32,
@@ -401,7 +401,7 @@ impl RedisClient {
}
let status: String = conn.hget(&key, "status").await?;
let job_id: i64 = conn.hget(&key, "job_id").await?;
let job_id: i32 = conn.hget(&key, "job_id").await?;
let current_processor: String = conn.hget(&key, "current_processor").await?;
let progress_current: i32 = conn.hget(&key, "progress_current").await?;
let progress_total: i32 = conn.hget(&key, "progress_total").await?;
@@ -538,7 +538,7 @@ pub struct AnomalyAlertMessage {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobStatus {
pub uuid: String,
pub job_id: i64,
pub job_id: i32,
pub status: String,
pub current_processor: String,
pub progress_current: i32,
@@ -549,7 +549,7 @@ pub struct WorkerJobStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobInfo {
pub uuid: String,
pub job_id: i64,
pub job_id: i32,
pub status: String,
pub progress_current: i32,
pub progress_total: i32,

View File

@@ -25,6 +25,41 @@ pub fn compute_uuid_from_path(full_path: &str) -> String {
compute_uuid(&parent, &filename)
}
/// Extract relative path from full path given data root
/// Returns (relative_path, filename)
pub fn extract_relative_path(full_path: &str, data_root: &str) -> (String, String) {
let full_path = PathBuf::from(full_path);
let data_root = PathBuf::from(data_root);
// Canonicalize both paths
let full_canonical = full_path.canonicalize().unwrap_or(full_path.clone());
let root_canonical = data_root.canonicalize().unwrap_or(data_root.clone());
// Try to strip the data root prefix
let relative = full_canonical
.strip_prefix(&root_canonical)
.unwrap_or(&full_canonical);
// Separate into parent directory and filename
let filename = relative
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let parent = relative
.parent()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_default();
(parent, filename)
}
/// Compute UUID from full path using data root for relative path extraction
pub fn compute_uuid_from_path_with_root(full_path: &str, data_root: &str) -> String {
let (parent, filename) = extract_relative_path(full_path, data_root);
compute_uuid(&parent, &filename)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -41,4 +76,26 @@ mod tests {
let uuid = compute_uuid_from_path("/Users/test/Videos/video.mp4");
assert_eq!(uuid.len(), 16);
}
#[test]
fn test_relative_path_extraction() {
let (parent, filename) =
extract_relative_path("/data/sftpgo/data/demo/video.mp4", "/data/sftpgo/data");
assert_eq!(parent, "demo");
assert_eq!(filename, "video.mp4");
}
#[test]
fn test_uuid_with_data_root() {
let uuid1 = compute_uuid_from_path_with_root(
"/data/sftpgo/data/demo/video.mp4",
"/data/sftpgo/data",
);
let uuid2 = compute_uuid_from_path_with_root(
"/data/sftpgo/data/demo/video.mp4",
"/data/sftpgo/data",
);
assert_eq!(uuid1, uuid2);
assert_eq!(uuid1.len(), 16);
}
}

77
src/worker/config.rs Normal file
View File

@@ -0,0 +1,77 @@
use once_cell::sync::Lazy;
use std::env;
#[derive(Debug, Clone)]
pub struct WorkerConfig {
pub max_concurrent: usize,
pub poll_interval_secs: u64,
pub enabled: bool,
pub batch_size: i32,
pub processor_timeout_secs: u64,
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
max_concurrent: Self::max_concurrent_from_env(),
poll_interval_secs: Self::poll_interval_from_env(),
enabled: Self::enabled_from_env(),
batch_size: Self::batch_size_from_env(),
processor_timeout_secs: Self::timeout_from_env(),
}
}
}
impl WorkerConfig {
fn max_concurrent_from_env() -> usize {
env::var("MOMENTRY_MAX_CONCURRENT")
.unwrap_or_else(|_| "2".to_string())
.parse()
.unwrap_or(2)
}
fn poll_interval_from_env() -> u64 {
env::var("MOMENTRY_POLL_INTERVAL")
.unwrap_or_else(|_| "5".to_string())
.parse()
.unwrap_or(5)
}
fn enabled_from_env() -> bool {
env::var("MOMENTRY_WORKER_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true)
}
fn batch_size_from_env() -> i32 {
env::var("MOMENTRY_WORKER_BATCH_SIZE")
.unwrap_or_else(|_| "10".to_string())
.parse()
.unwrap_or(10)
}
fn timeout_from_env() -> u64 {
env::var("MOMENTRY_WORKER_PROCESSOR_TIMEOUT")
.unwrap_or_else(|_| "3600".to_string())
.parse()
.unwrap_or(3600)
}
}
pub static WORKER_CONFIG: Lazy<WorkerConfig> = Lazy::new(WorkerConfig::default);
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_worker_config_default() {
let config = WorkerConfig::default();
assert_eq!(config.max_concurrent, 2);
assert_eq!(config.poll_interval_secs, 5);
assert!(config.enabled);
assert_eq!(config.batch_size, 10);
assert_eq!(config.processor_timeout_secs, 3600);
}
}

184
src/worker/job_worker.rs Normal file
View File

@@ -0,0 +1,184 @@
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, info, warn};
use crate::core::db::{MonitorJobStatus, PostgresDb, ProcessorType, RedisClient};
use crate::worker::config::WorkerConfig;
use crate::worker::processor::{ProcessorPool, ProcessorTask};
pub struct JobWorker {
db: Arc<PostgresDb>,
redis: Arc<RedisClient>,
config: WorkerConfig,
processor_pool: ProcessorPool,
}
impl JobWorker {
pub fn new(db: Arc<PostgresDb>, redis: Arc<RedisClient>, config: WorkerConfig) -> Self {
let processor_pool = ProcessorPool::new(db.clone(), redis.clone(), config.max_concurrent);
Self {
db,
redis,
config,
processor_pool,
}
}
pub async fn run(&self) -> Result<()> {
info!(
"Job Worker started with config: max_concurrent={}, poll_interval={}s",
self.config.max_concurrent, self.config.poll_interval_secs
);
loop {
if !self.config.enabled {
warn!("Worker is disabled, sleeping for 60s...");
sleep(Duration::from_secs(60)).await;
continue;
}
if let Err(e) = self.poll_and_process().await {
error!("Error during poll and process: {}", e);
}
sleep(Duration::from_secs(self.config.poll_interval_secs)).await;
}
}
async fn poll_and_process(&self) -> Result<()> {
let pending_jobs = self.db.get_pending_jobs(self.config.batch_size).await?;
if pending_jobs.is_empty() {
return Ok(());
}
info!("Found {} pending jobs", pending_jobs.len());
for job in pending_jobs {
if !self.processor_pool.can_start().await {
info!("Max concurrent processors reached, waiting...");
break;
}
if let Err(e) = self.process_job(job).await {
error!("Failed to process job: {}", e);
}
}
Ok(())
}
async fn process_job(&self, job: crate::core::db::MonitorJob) -> Result<()> {
info!("Processing job: {} ({})", job.uuid, job.id);
let total_processors = ProcessorType::all().len() as i32;
self.db
.update_job_status(job.id, MonitorJobStatus::Running)
.await?;
self.redis
.update_worker_job_status(&job.uuid, job.id, "running", None, 0, total_processors)
.await?;
for processor_type in ProcessorType::all() {
let processor_result_id = self
.db
.create_processor_result(job.id, processor_type)
.await?;
self.redis
.update_worker_processor_status(&job.uuid, processor_type.as_str(), "pending", None)
.await?;
let task = ProcessorTask {
job: job.clone(),
processor_type,
processor_result_id,
};
self.processor_pool.start_processor(task).await?;
}
self.check_and_complete_job(job.id, &job.uuid).await?;
Ok(())
}
async fn check_and_complete_job(&self, job_id: i32, uuid: &str) -> Result<()> {
let results = self.db.get_processor_results_by_job(job_id).await?;
let all_completed = results.iter().all(|r| {
matches!(
r.status,
crate::core::db::ProcessorJobStatus::Completed
| crate::core::db::ProcessorJobStatus::Skipped
)
});
let any_failed = results
.iter()
.any(|r| matches!(r.status, crate::core::db::ProcessorJobStatus::Failed));
let completed_count = results
.iter()
.filter(|r| {
matches!(
r.status,
crate::core::db::ProcessorJobStatus::Completed
| crate::core::db::ProcessorJobStatus::Skipped
)
})
.count() as i32;
if all_completed && !any_failed {
self.db
.update_job_status(job_id, MonitorJobStatus::Completed)
.await?;
self.redis
.update_worker_job_status(uuid, job_id, "completed", None, completed_count, 7)
.await?;
self.redis.delete_worker_job(uuid).await?;
info!("Job {} completed successfully", job_id);
} else if any_failed {
self.db
.update_job_status(job_id, MonitorJobStatus::Failed)
.await?;
self.redis
.update_worker_job_status(uuid, job_id, "failed", None, completed_count, 7)
.await?;
warn!("Job {} completed with failures", job_id);
} else {
self.redis
.update_worker_job_status(uuid, job_id, "running", None, completed_count, 7)
.await?;
}
Ok(())
}
pub async fn shutdown(&self) {
info!("Shutting down worker...");
self.processor_pool.cancel_all().await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_worker_config() {
let config = WorkerConfig::default();
assert!(config.enabled);
assert!(config.max_concurrent >= 1);
}
}

6
src/worker/mod.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod config;
pub mod job_worker;
pub mod processor;
pub use config::WorkerConfig;
pub use job_worker::JobWorker;

354
src/worker/processor.rs Normal file
View File

@@ -0,0 +1,354 @@
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info};
use crate::core::db::RedisClient;
use crate::core::db::{MonitorJob, PostgresDb, ProcessorJobStatus, ProcessorType};
#[derive(Debug, Clone)]
pub struct ProcessorTask {
pub job: MonitorJob,
pub processor_type: ProcessorType,
pub processor_result_id: i32,
}
pub struct ProcessorPool {
db: Arc<PostgresDb>,
redis: Arc<RedisClient>,
max_concurrent: usize,
running: Arc<RwLock<HashMap<i32, ProcessorHandle>>>,
running_count: Arc<RwLock<usize>>,
}
struct ProcessorHandle {
#[allow(dead_code)]
processor_type: ProcessorType,
cancel_tx: mpsc::Sender<()>,
}
impl ProcessorPool {
pub fn new(db: Arc<PostgresDb>, redis: Arc<RedisClient>, max_concurrent: usize) -> Self {
Self {
db,
redis,
max_concurrent,
running: Arc::new(RwLock::new(HashMap::new())),
running_count: Arc::new(RwLock::new(0)),
}
}
pub async fn can_start(&self) -> bool {
let count = *self.running_count.read().await;
count < self.max_concurrent
}
pub async fn start_processor(&self, task: ProcessorTask) -> Result<()> {
let (cancel_tx, cancel_rx) = mpsc::channel(1);
let job_id = task.job.id;
let processor_type = task.processor_type;
{
let mut count = self.running_count.write().await;
if *count >= self.max_concurrent {
anyhow::bail!("Max concurrent processors reached");
}
*count += 1;
}
let running = self.running.clone();
let running_count = self.running_count.clone();
running.write().await.insert(
job_id,
ProcessorHandle {
processor_type,
cancel_tx,
},
);
let db = self.db.clone();
let redis = self.redis.clone();
let job = task.job.clone();
let processor_result_id = task.processor_result_id;
let processor_name = processor_type.as_str().to_string();
tokio::spawn(async move {
info!("Starting processor {} for job {}", processor_name, job.uuid);
let _ = db
.update_processor_result(
processor_result_id,
ProcessorJobStatus::Running,
None,
None,
)
.await;
let _ = redis
.update_worker_processor_status(&job.uuid, &processor_name, "running", None)
.await;
let result = Self::run_processor(&db, &redis, &job, processor_type, cancel_rx).await;
{
let mut running_guard = running.write().await;
running_guard.remove(&job_id);
let mut count_guard = running_count.write().await;
*count_guard -= 1;
}
match result {
Ok(output) => {
info!(
"Processor {} completed for job {}",
processor_name, job.uuid
);
let _ = db
.update_processor_result(
processor_result_id,
ProcessorJobStatus::Completed,
None,
Some(&output),
)
.await;
let _ = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"completed",
None,
)
.await;
}
Err(e) => {
error!(
"Processor {} failed for job {}: {}",
processor_name, job.uuid, e
);
let _ = db
.update_processor_result(
processor_result_id,
ProcessorJobStatus::Failed,
Some(&e.to_string()),
None,
)
.await;
let _ = redis
.update_worker_processor_status(
&job.uuid,
&processor_name,
"failed",
Some(&e.to_string()),
)
.await;
}
}
});
Ok(())
}
async fn run_processor(
db: &PostgresDb,
redis: &RedisClient,
job: &MonitorJob,
processor_type: ProcessorType,
mut cancel_rx: mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let video_path = job.video_path.as_ref().context("No video path in job")?;
match processor_type {
ProcessorType::Asr => Self::run_asr(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Cut => Self::run_cut(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Yolo => Self::run_yolo(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Ocr => Self::run_ocr(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Face => Self::run_face(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Pose => Self::run_pose(db, redis, video_path, &mut cancel_rx).await,
ProcessorType::Asrx => Self::run_asrx(db, redis, video_path, &mut cancel_rx).await,
}
}
async fn run_asr(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_ASR_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/asr.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ASR script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_cut(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_CUT_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/cut.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("CUT script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_yolo(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_YOLO_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/yolo_processor.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("YOLO script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_ocr(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_OCR_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/ocr.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("OCR script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_face(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_FACE_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/face.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Face script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_pose(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_POSE_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/pose.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Pose script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
async fn run_asrx(
_db: &PostgresDb,
_redis: &RedisClient,
video_path: &str,
_cancel_rx: &mut mpsc::Receiver<()>,
) -> Result<serde_json::Value> {
let script_path = std::env::var("MOMENTRY_ASRX_SCRIPT")
.unwrap_or_else(|_| "/Users/accusys/momentry/scripts/asrx.py".to_string());
let output = tokio::process::Command::new("/opt/homebrew/bin/python3.11")
.arg(&script_path)
.arg(video_path)
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ASRX script failed: {}", stderr);
}
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(result)
}
pub async fn get_running_count(&self) -> usize {
*self.running_count.read().await
}
pub async fn cancel_all(&self) {
let mut running = self.running.write().await;
for (_, handle) in running.drain() {
let _ = handle.cancel_tx.send(()).await;
}
let mut count = self.running_count.write().await;
*count = 0;
}
}