feat: add job worker and duplicate registration check

Job Worker System:
- Add polling-based job worker (max 2 concurrent processors)
- Create monitor_jobs records when videos are registered
- Link videos.job_id to monitor_jobs
- Fix type mismatches (i32 vs i64) for database IDs

Duplicate Registration:
- Check if video already exists before registering
- Return existing video info with already_exists: true
- Use canonical path for UUID computation

USER_DATA_ROOT Configuration:
- Add MOMENTRY_USER_DATA_ROOT environment variable
- UUID computed from relative path (username/filename)
- Ensures consistent UUIDs when data root changes
This commit is contained in:
accusys
2026-03-25 02:50:31 +08:00
parent 2d787b7806
commit 786381ac67
9 changed files with 3669 additions and 229 deletions

View File

@@ -60,6 +60,14 @@ pub static SERVER_PORT: Lazy<u16> = Lazy::new(|| {
pub static REDIS_KEY_PREFIX: Lazy<String> =
Lazy::new(|| env::var("MOMENTRY_REDIS_PREFIX").unwrap_or_else(|_| "momentry:".to_string()));
/// User data root path (sftpgo data directory)
/// This is the parent directory containing user directories like ./demo/, ./warren/, ./momentry/
/// Example: /Users/accusys/momentry/var/sftpgo/data
pub static USER_DATA_ROOT: Lazy<String> = Lazy::new(|| {
env::var("MOMENTRY_USER_DATA_ROOT")
.unwrap_or_else(|_| "/Users/accusys/momentry/var/sftpgo/data".to_string())
});
pub mod processor {
use super::*;

File diff suppressed because it is too large Load Diff

View File

@@ -336,7 +336,7 @@ impl RedisClient {
pub async fn update_worker_job_status(
&self,
uuid: &str,
job_id: i64,
job_id: i32,
status: &str,
current_processor: Option<&str>,
progress: i32,
@@ -401,7 +401,7 @@ impl RedisClient {
}
let status: String = conn.hget(&key, "status").await?;
let job_id: i64 = conn.hget(&key, "job_id").await?;
let job_id: i32 = conn.hget(&key, "job_id").await?;
let current_processor: String = conn.hget(&key, "current_processor").await?;
let progress_current: i32 = conn.hget(&key, "progress_current").await?;
let progress_total: i32 = conn.hget(&key, "progress_total").await?;
@@ -538,7 +538,7 @@ pub struct AnomalyAlertMessage {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobStatus {
pub uuid: String,
pub job_id: i64,
pub job_id: i32,
pub status: String,
pub current_processor: String,
pub progress_current: i32,
@@ -549,7 +549,7 @@ pub struct WorkerJobStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobInfo {
pub uuid: String,
pub job_id: i64,
pub job_id: i32,
pub status: String,
pub progress_current: i32,
pub progress_total: i32,

View File

@@ -25,6 +25,41 @@ pub fn compute_uuid_from_path(full_path: &str) -> String {
compute_uuid(&parent, &filename)
}
/// Extract relative path from full path given data root
/// Returns (relative_path, filename)
pub fn extract_relative_path(full_path: &str, data_root: &str) -> (String, String) {
let full_path = PathBuf::from(full_path);
let data_root = PathBuf::from(data_root);
// Canonicalize both paths
let full_canonical = full_path.canonicalize().unwrap_or(full_path.clone());
let root_canonical = data_root.canonicalize().unwrap_or(data_root.clone());
// Try to strip the data root prefix
let relative = full_canonical
.strip_prefix(&root_canonical)
.unwrap_or(&full_canonical);
// Separate into parent directory and filename
let filename = relative
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default();
let parent = relative
.parent()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_default();
(parent, filename)
}
/// Compute UUID from full path using data root for relative path extraction
pub fn compute_uuid_from_path_with_root(full_path: &str, data_root: &str) -> String {
let (parent, filename) = extract_relative_path(full_path, data_root);
compute_uuid(&parent, &filename)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -41,4 +76,26 @@ mod tests {
let uuid = compute_uuid_from_path("/Users/test/Videos/video.mp4");
assert_eq!(uuid.len(), 16);
}
#[test]
fn test_relative_path_extraction() {
let (parent, filename) =
extract_relative_path("/data/sftpgo/data/demo/video.mp4", "/data/sftpgo/data");
assert_eq!(parent, "demo");
assert_eq!(filename, "video.mp4");
}
#[test]
fn test_uuid_with_data_root() {
let uuid1 = compute_uuid_from_path_with_root(
"/data/sftpgo/data/demo/video.mp4",
"/data/sftpgo/data",
);
let uuid2 = compute_uuid_from_path_with_root(
"/data/sftpgo/data/demo/video.mp4",
"/data/sftpgo/data",
);
assert_eq!(uuid1, uuid2);
assert_eq!(uuid1.len(), 16);
}
}