- Add separate momentry_playground binary with distinct configuration
  - Production (momentry): Port 3002, Redis prefix 'momentry:'
  - Development (momentry_playground): Port 3003, Redis prefix 'momentry_dev:'
- Add SERVER_PORT and REDIS_KEY_PREFIX config via environment variables
- Replace all hardcoded Redis key prefixes with configurable values
- Create .env.development for playground environment settings
- Update .env with production defaults
- Add dotenv dependency for environment file loading

Configuration isolation allows running both binaries simultaneously without port conflicts or Redis key collisions.
3209 lines
134 KiB
Rust
3209 lines
134 KiB
Rust
use anyhow::{Context, Result};
|
|
use clap::{Parser, Subcommand};
|
|
use futures_util::StreamExt;
|
|
use std::path::Path;
|
|
use std::str;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use momentry_core::core::api_key::{ApiKeyService, ApiKeyType};
|
|
use momentry_core::core::chunk::types::{Chunk, ChunkRule, ChunkType};
|
|
use momentry_core::core::db::Database;
|
|
use momentry_core::ui::progress::{ProcessorType, ProgressState, ProgressUi};
|
|
use momentry_core::{
|
|
Embedder, OutputDir, PostgresDb, QdrantDb, RedisClient, VectorPayload, VideoRecord, VideoStatus,
|
|
};
|
|
|
|
fn parse_key_type(s: Option<&str>) -> ApiKeyType {
|
|
match s.map(|s| s.to_lowercase()).as_deref() {
|
|
Some("system") => ApiKeyType::System,
|
|
Some("user") => ApiKeyType::User,
|
|
Some("service") => ApiKeyType::Service,
|
|
Some("integration") => ApiKeyType::Integration,
|
|
Some("emergency") => ApiKeyType::Emergency,
|
|
_ => ApiKeyType::User,
|
|
}
|
|
}
|
|
|
|
/// What to do with one processing module for a given video.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProcessingDecision {
    /// Run the module normally.
    Process,
    /// Do not run the module.
    SkipComplete,
    /// Run the module, continuing from an earlier partial result.
    ResumePartial,
    /// Run the module again from scratch.
    ForceReprocess,
}

impl ProcessingDecision {
    /// Returns `true` for every decision that leads to the module being run,
    /// i.e. everything except [`ProcessingDecision::SkipComplete`].
    pub fn should_process(&self) -> bool {
        match self {
            Self::SkipComplete => false,
            Self::Process | Self::ResumePartial | Self::ForceReprocess => true,
        }
    }

    /// Returns `true` only for [`ProcessingDecision::ResumePartial`].
    pub fn should_resume(&self) -> bool {
        *self == Self::ResumePartial
    }
}
|
|
|
|
/// Point-in-time snapshot of host CPU, memory and GPU availability.
///
/// Built by [`SystemResources::check`], which shells out to `top`, `sysctl`,
/// `vm_stat`, `nvidia-smi` and `system_profiler`. Every probe is best-effort:
/// a tool that is missing or prints something unexpected yields a fallback
/// value instead of an error.
#[derive(Debug, Clone)]
pub struct SystemResources {
    /// Idle CPU percentage; `50.0` when it could not be measured.
    pub cpu_idle_percent: f64,
    /// Free plus inactive memory, in MiB.
    pub memory_available_mb: u64,
    /// Total physical memory, in MiB (`0` when unknown).
    pub memory_total_mb: u64,
    /// Share of total memory that is not available, as a percentage.
    pub memory_used_percent: f64,
    /// Whether any supported GPU was detected.
    pub gpu_available: bool,
    /// Kind of GPU detected; only meaningful when `gpu_available` is `true`.
    pub gpu_type: GpuType,
    /// GPU utilisation percentage when it could be read.
    pub gpu_utilization: Option<f64>,
}

/// GPU families the resource check can detect.
#[derive(Debug, Clone, Copy)]
pub enum GpuType {
    /// Detected through a successful `nvidia-smi` run.
    Nvidia,
    /// Detected through `system_profiler` output mentioning Metal or Apple.
    AppleMps,
}

impl SystemResources {
    /// Idle percentage assumed when `top` is unavailable or unparsable.
    const FALLBACK_CPU_IDLE: f64 = 50.0;
    /// Page size assumed when `vm_stat` does not report one.
    const DEFAULT_PAGE_SIZE_BYTES: u64 = 4096;
    /// Bytes per MiB.
    const BYTES_PER_MB: u64 = 1024 * 1024;

    /// Probes the host and returns a fresh snapshot.
    pub fn check() -> Self {
        let cpu_idle = Self::get_cpu_idle();
        let (mem_available, mem_total) = Self::get_memory_info();

        // The `mem_available <= mem_total` guard keeps the subtraction from
        // underflowing when the two probes disagree.
        let mem_used_pct = if mem_total > 0 && mem_available <= mem_total {
            ((mem_total - mem_available) as f64 / mem_total as f64) * 100.0
        } else if mem_total > 0 {
            100.0
        } else {
            0.0
        };
        let (gpu_available, gpu_type, gpu_util) = Self::get_gpu_info();

        Self {
            cpu_idle_percent: cpu_idle,
            memory_available_mb: mem_available,
            memory_total_mb: mem_total,
            memory_used_percent: mem_used_pct,
            gpu_available,
            gpu_type,
            gpu_utilization: gpu_util,
        }
    }

    /// Returns `true` when at least 30% of the CPU is idle and available
    /// memory is at least both `required_memory_mb` and 4096 MiB.
    pub fn can_parallel(&self, required_memory_mb: u64) -> bool {
        const MIN_CPU_IDLE: f64 = 30.0;
        const MIN_MEMORY_MB: u64 = 4096;

        self.cpu_idle_percent >= MIN_CPU_IDLE
            && self.memory_available_mb >= required_memory_mb
            && self.memory_available_mb >= MIN_MEMORY_MB
    }

    /// Lists module names suggested for parallel execution: `yolo` when a GPU
    /// is available, plus `ocr`, `face` and `pose` when at least 8192 MiB of
    /// memory is available.
    pub fn recommend_parallel_modules(&self) -> Vec<&'static str> {
        let mut recommended = Vec::new();

        if self.gpu_available {
            recommended.push("yolo");
        }

        if self.memory_available_mb >= 8192 {
            recommended.extend(["ocr", "face", "pose"]);
        }

        recommended
    }

    /// Runs `top -l 1 -n 1` and returns the idle CPU percentage, or
    /// [`Self::FALLBACK_CPU_IDLE`] when the command fails or cannot be parsed.
    fn get_cpu_idle() -> f64 {
        use std::process::Command;

        Command::new("top")
            .args(["-l", "1", "-n", "1"])
            .output()
            .ok()
            .and_then(|o| Self::parse_cpu_idle(&String::from_utf8_lossy(&o.stdout)))
            .unwrap_or(Self::FALLBACK_CPU_IDLE)
    }

    /// Extracts the idle percentage from `top` output.
    ///
    /// Scans every line mentioning `idle` and returns the first percentage found.
    fn parse_cpu_idle(top_output: &str) -> Option<f64> {
        top_output
            .lines()
            .filter(|line| line.contains("idle"))
            .find_map(Self::parse_idle_from_line)
    }

    /// Parses one line of the form `... 81.96% idle` or `... 81.96%idle`.
    ///
    /// The percentage and the `idle` label are usually separate
    /// whitespace-delimited tokens, so the token preceding `idle` is inspected
    /// as well as the fused `NN%idle` spelling.
    fn parse_idle_from_line(line: &str) -> Option<f64> {
        let mut previous: Option<&str> = None;

        for token in line.split_whitespace() {
            let fused = token
                .strip_suffix("%idle")
                .and_then(|pct| pct.parse::<f64>().ok());
            if fused.is_some() {
                return fused;
            }

            if token.trim_end_matches(',') == "idle" {
                let split = previous
                    .and_then(|pct| pct.strip_suffix('%'))
                    .and_then(|pct| pct.parse::<f64>().ok());
                if split.is_some() {
                    return split;
                }
            }

            previous = Some(token);
        }

        None
    }

    /// Returns `(available_mb, total_mb)`.
    ///
    /// Total comes from `sysctl hw.memsize`; available is the free plus
    /// inactive pages reported by `vm_stat`. When `sysctl` cannot be spawned
    /// both values are `0`; when `vm_stat` cannot be spawned a quarter of the
    /// total is assumed to be available.
    fn get_memory_info() -> (u64, u64) {
        use std::process::Command;

        let sysctl = match Command::new("sysctl").args(["hw.memsize"]).output() {
            Ok(o) => o,
            Err(_) => return (0, 0),
        };
        let total = Self::parse_total_memory_mb(&String::from_utf8_lossy(&sysctl.stdout));

        let available = match Command::new("vm_stat").output() {
            Ok(v) => Self::parse_vm_stat_available_mb(&String::from_utf8_lossy(&v.stdout)),
            Err(_) => total / 4,
        };

        (available, total)
    }

    /// Parses `hw.memsize: <bytes>` into MiB; `0` when the value is missing or
    /// not a number.
    fn parse_total_memory_mb(sysctl_output: &str) -> u64 {
        sysctl_output
            .split_whitespace()
            .nth(1)
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(0)
            / Self::BYTES_PER_MB
    }

    /// Converts `vm_stat` output into available MiB (free + inactive pages).
    ///
    /// The page size is read from the `(page size of N bytes)` header rather
    /// than assumed, because it differs between machines; a hard-coded 4096
    /// under-reports memory wherever pages are larger. Falls back to
    /// [`Self::DEFAULT_PAGE_SIZE_BYTES`] when the header is absent.
    fn parse_vm_stat_available_mb(vm_stat_output: &str) -> u64 {
        let mut page_size = Self::DEFAULT_PAGE_SIZE_BYTES;
        let mut free_pages: u64 = 0;
        let mut inactive_pages: u64 = 0;

        for line in vm_stat_output.lines() {
            if let Some(rest) = line.split("page size of").nth(1) {
                if let Some(size) = rest
                    .split_whitespace()
                    .next()
                    .and_then(|v| v.parse::<u64>().ok())
                    .filter(|&size| size > 0)
                {
                    page_size = size;
                }
            } else if line.contains("Pages free:") {
                free_pages = Self::parse_page_count(line);
            } else if line.contains("Pages inactive:") {
                inactive_pages = Self::parse_page_count(line);
            }
        }

        free_pages
            .saturating_add(inactive_pages)
            .saturating_mul(page_size)
            / Self::BYTES_PER_MB
    }

    /// Reads the trailing page count of a `vm_stat` line such as
    /// `Pages free:    12345.`; `0` when it cannot be parsed.
    fn parse_page_count(line: &str) -> u64 {
        line.split_whitespace()
            .last()
            .and_then(|v| v.trim_end_matches('.').parse().ok())
            .unwrap_or(0)
    }

    /// Returns `(gpu_available, gpu_type, utilization)`.
    ///
    /// A successful `nvidia-smi` run wins; otherwise `system_profiler` output
    /// mentioning `Metal` or `Apple` is reported as [`GpuType::AppleMps`] with
    /// a utilisation of `0.0`. With no GPU the type is a placeholder and must
    /// be ignored.
    fn get_gpu_info() -> (bool, GpuType, Option<f64>) {
        use std::process::Command;

        // Check NVIDIA GPU
        let nvidia_output = Command::new("nvidia-smi")
            .args([
                "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits",
            ])
            .output();

        if let Ok(o) = nvidia_output {
            if o.status.success() {
                let util = Self::parse_gpu_utilization(&String::from_utf8_lossy(&o.stdout));
                return (true, GpuType::Nvidia, util);
            }
        }

        // Check Apple MPS (Metal Performance Shaders)
        let mps_output = Command::new("system_profiler")
            .args(["SPDisplaysDataType", "-detailLevel", "mini"])
            .output();

        if let Ok(o) = mps_output {
            let s = String::from_utf8_lossy(&o.stdout);
            if s.contains("Metal") || s.contains("Apple") {
                return (true, GpuType::AppleMps, Some(0.0));
            }
        }

        (false, GpuType::Nvidia, None)
    }

    /// Parses the utilisation of the first GPU listed by `nvidia-smi`.
    ///
    /// Only the first non-empty line is read, so output with one line per GPU
    /// still yields a value instead of failing to parse as a single number.
    fn parse_gpu_utilization(nvidia_smi_output: &str) -> Option<f64> {
        nvidia_smi_output
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())
            .and_then(|line| line.parse::<f64>().ok())
    }
}
|
|
|
|
impl std::fmt::Display for SystemResources {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(
|
|
f,
|
|
"CPU: {:.1}% idle, Memory: {:.1}GB/{:.1}GB ({:.0}% used), GPU: {}",
|
|
self.cpu_idle_percent,
|
|
self.memory_available_mb as f64 / 1024.0,
|
|
self.memory_total_mb as f64 / 1024.0,
|
|
self.memory_used_percent,
|
|
if self.gpu_available {
|
|
format!("{:.0}% utilized", self.gpu_utilization.unwrap_or(0.0))
|
|
} else {
|
|
"N/A".to_string()
|
|
}
|
|
)
|
|
}
|
|
}
|
|
|
|
fn decide_processing(json_path: &Path, force: bool, resume: bool) -> ProcessingDecision {
|
|
if !json_path.exists() {
|
|
return ProcessingDecision::Process;
|
|
}
|
|
|
|
if force {
|
|
return ProcessingDecision::ForceReprocess;
|
|
}
|
|
|
|
if resume {
|
|
return ProcessingDecision::ResumePartial;
|
|
}
|
|
|
|
match check_json_completeness(json_path) {
|
|
JsonCompleteness::Complete => ProcessingDecision::SkipComplete,
|
|
JsonCompleteness::Partial { current, total } => {
|
|
eprintln!("\n⚠️ Found incomplete JSON file: {}", json_path.display());
|
|
eprintln!(
|
|
" Progress: {}/{} ({:.1}%)",
|
|
current,
|
|
total,
|
|
(current as f64 / total as f64) * 100.0
|
|
);
|
|
eprintln!(" Use --resume to continue from checkpoint");
|
|
eprintln!(" Use --force to reprocess from scratch");
|
|
ProcessingDecision::SkipComplete
|
|
}
|
|
JsonCompleteness::Empty => ProcessingDecision::Process,
|
|
}
|
|
}
|
|
|
|
/// How far along a module's JSON output file is.
#[derive(Debug, Clone, PartialEq)]
pub enum JsonCompleteness {
    /// The output is considered finished.
    Complete,
    /// The output records progress that has not reached its total.
    Partial {
        /// Units of work recorded as done.
        current: u32,
        /// Units of work expected overall.
        total: u32,
    },
    /// There is no usable output.
    Empty,
}
|
|
|
|
fn check_json_completeness(json_path: &Path) -> JsonCompleteness {
|
|
let content = match std::fs::read_to_string(json_path) {
|
|
Ok(c) => c,
|
|
Err(_) => return JsonCompleteness::Empty,
|
|
};
|
|
|
|
if content.trim().is_empty() {
|
|
return JsonCompleteness::Empty;
|
|
}
|
|
|
|
let json: serde_json::Value = match serde_json::from_str(&content) {
|
|
Ok(v) => v,
|
|
Err(_) => return JsonCompleteness::Empty,
|
|
};
|
|
|
|
match json.get("segments") {
|
|
Some(serde_json::Value::Array(arr)) if !arr.is_empty() => JsonCompleteness::Complete,
|
|
Some(serde_json::Value::Object(obj)) => {
|
|
let current = obj.get("current").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
|
|
let total = obj.get("total").and_then(|v| v.as_u64()).unwrap_or(0) as u32;
|
|
if total > 0 && current < total {
|
|
JsonCompleteness::Partial { current, total }
|
|
} else {
|
|
JsonCompleteness::Complete
|
|
}
|
|
}
|
|
_ => JsonCompleteness::Complete,
|
|
}
|
|
}
|
|
|
|
async fn process_asr_module(
|
|
asr_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Asr).start(1);
|
|
}
|
|
let asr_result = momentry_core::core::processor::process_asr(
|
|
video_path,
|
|
asr_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let asr_json = serde_json::to_string_pretty(&asr_result)?;
|
|
std::fs::write(asr_path, &asr_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "asr.json");
|
|
println!(" ✓ ASR saved: {} segments", asr_result.segments.len());
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Asr)
|
|
.complete(&format!("{} segments", asr_result.segments.len()));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_cut_module(
|
|
cut_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Cut).start(1);
|
|
}
|
|
let cut_result = momentry_core::core::processor::process_cut(
|
|
video_path,
|
|
cut_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let cut_json = serde_json::to_string_pretty(&cut_result)?;
|
|
std::fs::write(cut_path, &cut_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "cut.json");
|
|
println!(" ✓ CUT saved: {} scenes", cut_result.scenes.len());
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Cut)
|
|
.complete(&format!("{} scenes", cut_result.scenes.len()));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_asrx_module(
|
|
asrx_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Asrx).start(1);
|
|
}
|
|
let asrx_result = momentry_core::core::processor::process_asrx(
|
|
video_path,
|
|
asrx_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let asrx_json = serde_json::to_string_pretty(&asrx_result)?;
|
|
std::fs::write(asrx_path, &asrx_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "asrx.json");
|
|
println!(" ✓ ASRX saved: {} segments", asrx_result.segments.len());
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Asrx)
|
|
.complete(&format!("{} segments", asrx_result.segments.len()));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_yolo_module(
|
|
yolo_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Yolo).start(1);
|
|
}
|
|
let yolo_result = momentry_core::core::processor::process_yolo(
|
|
video_path,
|
|
yolo_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let yolo_json = serde_json::to_string_pretty(&yolo_result)?;
|
|
std::fs::write(yolo_path, &yolo_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "yolo.json");
|
|
println!(" ✓ YOLO saved: {} frames", yolo_result.frame_count);
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Yolo)
|
|
.complete(&format!("{} frames", yolo_result.frame_count));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_ocr_module(
|
|
ocr_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Ocr).start(1);
|
|
}
|
|
let ocr_result = momentry_core::core::processor::process_ocr(
|
|
video_path,
|
|
ocr_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let ocr_json = serde_json::to_string_pretty(&ocr_result)?;
|
|
std::fs::write(ocr_path, &ocr_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "ocr.json");
|
|
println!(
|
|
" ✓ OCR saved: {} frames with text",
|
|
ocr_result.frames.len()
|
|
);
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Ocr)
|
|
.complete(&format!("{} frames", ocr_result.frames.len()));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_face_module(
|
|
face_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Face).start(1);
|
|
}
|
|
let face_result = momentry_core::core::processor::process_face(
|
|
video_path,
|
|
face_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let face_json = serde_json::to_string_pretty(&face_result)?;
|
|
std::fs::write(face_path, &face_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "face.json");
|
|
println!(" ✓ Face saved: {} frames", face_result.frames.len());
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Face)
|
|
.complete(&format!("{} frames", face_result.frames.len()));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_pose_module(
|
|
pose_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Pose).start(1);
|
|
}
|
|
let pose_result = momentry_core::core::processor::process_pose(
|
|
video_path,
|
|
pose_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let pose_json = serde_json::to_string_pretty(&pose_result)?;
|
|
std::fs::write(pose_path, &pose_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "pose.json");
|
|
println!(" ✓ Pose saved: {} frames", pose_result.frames.len());
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Pose)
|
|
.complete(&format!("{} frames", pose_result.frames.len()));
|
|
state.stop();
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_story_module(
|
|
story_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Story).start(1);
|
|
}
|
|
let story_result = momentry_core::core::processor::process_story(
|
|
video_path,
|
|
story_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let story_json = serde_json::to_string_pretty(&story_result)?;
|
|
std::fs::write(story_path, &story_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "story.json");
|
|
println!(
|
|
" ✓ Story saved: {} parent chunks, {} child chunks",
|
|
story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks
|
|
);
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Story).complete(&format!(
|
|
"{} parents, {} children",
|
|
story_result.stats.total_parent_chunks, story_result.stats.total_child_chunks
|
|
));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn process_caption_module(
|
|
caption_path: &Path,
|
|
video_path: &str,
|
|
uuid: &str,
|
|
progress_state: &Arc<Mutex<ProgressState>>,
|
|
ui: &Arc<Mutex<Option<ProgressUi>>>,
|
|
) -> anyhow::Result<()> {
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state.get_processor(ProcessorType::Caption).start(1);
|
|
}
|
|
let caption_result = momentry_core::core::processor::process_caption(
|
|
video_path,
|
|
caption_path.to_str().unwrap(),
|
|
Some(uuid),
|
|
)
|
|
.await?;
|
|
let caption_json = serde_json::to_string_pretty(&caption_result)?;
|
|
std::fs::write(caption_path, &caption_json)?;
|
|
let output_dir = OutputDir::new();
|
|
let _ = output_dir.backup_file(uuid, "caption.json");
|
|
println!(" ✓ Caption saved: {} frames", caption_result.total_frames);
|
|
{
|
|
let mut state = progress_state.lock().unwrap();
|
|
state
|
|
.get_processor(ProcessorType::Caption)
|
|
.complete(&format!("{} frames", caption_result.total_frames));
|
|
}
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Parser)]
|
|
#[command(name = "momentry")]
|
|
#[command(about = "Digital asset management system with video analysis and RAG")]
|
|
struct Cli {
|
|
#[command(subcommand)]
|
|
command: Commands,
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum Commands {
|
|
/// Register a video file
|
|
Register {
|
|
/// Video file path or URL
|
|
path: String,
|
|
},
|
|
/// Process video (generate all JSON files)
|
|
Process {
|
|
/// UUID or path
|
|
target: String,
|
|
/// Modules to process (comma separated: asr,cut,asrx,yolo,ocr,face,pose,story,caption)
|
|
/// If not specified, processes all modules
|
|
#[arg(short, long, value_delimiter = ',')]
|
|
modules: Option<Vec<String>>,
|
|
/// Modules to process via cloud (comma separated)
|
|
/// Example: --cloud asr,yolo
|
|
#[arg(long, value_delimiter = ',')]
|
|
cloud: Option<Vec<String>>,
|
|
/// Force reprocess even if JSON exists (skip completeness check)
|
|
#[arg(long, default_value = "false")]
|
|
force: bool,
|
|
/// Resume from last checkpoint if processing was interrupted
|
|
#[arg(long, default_value = "false")]
|
|
resume: bool,
|
|
},
|
|
/// Generate chunks and store in database
|
|
Chunk {
|
|
/// UUID
|
|
uuid: String,
|
|
},
|
|
/// Generate story for cut scenes
|
|
Story {
|
|
/// UUID
|
|
uuid: String,
|
|
},
|
|
/// Vectorize chunks
|
|
Vectorize {
|
|
/// UUID (or 'all' for all)
|
|
uuid: String,
|
|
},
|
|
/// Play video with overlays
|
|
Play {
|
|
/// Video path or UUID
|
|
target: String,
|
|
},
|
|
/// Start watching directories
|
|
Watch {
|
|
/// Directories to watch (comma separated)
|
|
directories: Option<String>,
|
|
},
|
|
/// Check system resources and recommend processing strategy
|
|
System {
|
|
/// Show detailed GPU info (NVIDIA/MPS)
|
|
#[arg(long)]
|
|
gpu: bool,
|
|
},
|
|
/// Start API server
|
|
Server {
|
|
/// Host
|
|
#[arg(long, default_value = "127.0.0.1")]
|
|
host: String,
|
|
/// Port (defaults to MOMENTRY_SERVER_PORT env var, or 3003 for development)
|
|
#[arg(long)]
|
|
port: Option<u16>,
|
|
},
|
|
/// Start job worker
|
|
Worker {
|
|
/// Max concurrent processors
|
|
#[arg(long)]
|
|
max_concurrent: Option<usize>,
|
|
/// Poll interval in seconds
|
|
#[arg(long)]
|
|
poll_interval: Option<u64>,
|
|
/// Batch size
|
|
#[arg(long)]
|
|
batch_size: Option<i32>,
|
|
},
|
|
/// Query using RAG
|
|
Query {
|
|
/// Query text
|
|
query: String,
|
|
},
|
|
/// Lookup UUID from path
|
|
Lookup {
|
|
/// File path
|
|
path: String,
|
|
},
|
|
/// Resolve path from UUID
|
|
Resolve {
|
|
/// UUID
|
|
uuid: String,
|
|
},
|
|
/// Generate thumbnails for videos
|
|
Thumbnails {
|
|
/// UUID (optional, generates for all if not specified)
|
|
uuid: Option<String>,
|
|
/// Number of thumbnails per video
|
|
#[arg(short, long, default_value = "6")]
|
|
count: u32,
|
|
},
|
|
/// Show storage status report
|
|
Status {
|
|
/// UUID (optional, shows all if not specified)
|
|
uuid: Option<String>,
|
|
},
|
|
/// Manage output backups
|
|
Backup {
|
|
/// Action: list, cleanup
|
|
action: String,
|
|
/// Days to keep (for cleanup)
|
|
days: Option<u32>,
|
|
},
|
|
/// Manage API keys
|
|
ApiKey {
|
|
/// Action: create, list, validate, revoke, rotate, stats
|
|
#[arg(value_enum)]
|
|
action: ApiKeyAction,
|
|
/// Key name (for create)
|
|
name: Option<String>,
|
|
/// Key type (system, user, service, integration, emergency)
|
|
#[arg(long)]
|
|
key_type: Option<String>,
|
|
/// TTL in days (for create)
|
|
#[arg(long)]
|
|
ttl: Option<i64>,
|
|
/// API key to validate/revoke
|
|
#[arg(long)]
|
|
key: Option<String>,
|
|
},
|
|
/// Manage Gitea API tokens
|
|
Gitea {
|
|
/// Action: create, list, delete, verify
|
|
#[arg(value_enum)]
|
|
action: GiteaAction,
|
|
/// Gitea username
|
|
#[arg(long)]
|
|
username: Option<String>,
|
|
/// Gitea password (for create/list/delete)
|
|
#[arg(long)]
|
|
password: Option<String>,
|
|
/// Token name (for create/delete)
|
|
#[arg(long)]
|
|
token_name: Option<String>,
|
|
/// Token scopes (comma separated: read:repository,write:issue)
|
|
#[arg(long)]
|
|
scopes: Option<String>,
|
|
},
|
|
/// Manage n8n API keys
|
|
N8n {
|
|
/// Action: create, list, delete, verify
|
|
#[arg(value_enum)]
|
|
action: N8nAction,
|
|
/// n8n API key (for create/list/delete)
|
|
#[arg(long)]
|
|
api_key: Option<String>,
|
|
/// API key label (for create/delete)
|
|
#[arg(long)]
|
|
label: Option<String>,
|
|
/// Expiration days (for create)
|
|
#[arg(long)]
|
|
expires_in_days: Option<i64>,
|
|
},
|
|
}
|
|
|
|
#[derive(clap::ValueEnum, Clone)]
|
|
enum ApiKeyAction {
|
|
Create,
|
|
List,
|
|
Validate,
|
|
Revoke,
|
|
Rotate,
|
|
Stats,
|
|
}
|
|
|
|
#[derive(clap::ValueEnum, Clone)]
|
|
enum GiteaAction {
|
|
Create,
|
|
List,
|
|
Delete,
|
|
Verify,
|
|
}
|
|
|
|
#[derive(clap::ValueEnum, Clone)]
|
|
enum N8nAction {
|
|
Create,
|
|
List,
|
|
Delete,
|
|
Verify,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
// Load development environment first
|
|
dotenv::from_filename(".env.development").ok();
|
|
|
|
tracing_subscriber::fmt::init();
|
|
|
|
tracing::info!("Starting momentry_playground (development binary)");
|
|
tracing::info!("Port: {}", *momentry_core::core::config::SERVER_PORT);
|
|
tracing::info!(
|
|
"Redis prefix: {}",
|
|
*momentry_core::core::config::REDIS_KEY_PREFIX
|
|
);
|
|
|
|
let cli = Cli::parse();
|
|
|
|
match cli.command {
|
|
Commands::Register { path } => {
|
|
println!("Registering: {}", path);
|
|
|
|
// Compute UUID
|
|
let uuid = momentry_core::uuid::compute_uuid_from_path(&path);
|
|
println!("UUID: {}", uuid);
|
|
|
|
// Run ffprobe
|
|
let probe_result = momentry_core::core::probe::probe_video(&path)?;
|
|
|
|
println!("\nVideo probe results:");
|
|
let duration = probe_result
|
|
.format
|
|
.duration
|
|
.as_ref()
|
|
.and_then(|s| s.parse::<f64>().ok())
|
|
.unwrap_or(0.0);
|
|
println!(" Duration: {}s", duration);
|
|
if let Some(size) = &probe_result.format.size {
|
|
println!(" Size: {}", size);
|
|
}
|
|
|
|
let mut width = 0u32;
|
|
let mut height = 0u32;
|
|
let mut fps = 0.0;
|
|
|
|
for stream in &probe_result.streams {
|
|
if stream.codec_type.as_deref() == Some("video") {
|
|
width = stream.width.unwrap_or(0);
|
|
height = stream.height.unwrap_or(0);
|
|
if let Some(fps_str) = &stream.r_frame_rate {
|
|
if let Some((num, den)) = fps_str.split_once('/') {
|
|
if let (Ok(n), Ok(d)) = (num.parse::<f64>(), den.parse::<f64>()) {
|
|
if d > 0.0 {
|
|
fps = n / d;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
println!(" Video: {}x{}", width, height);
|
|
if let Some(fps) = &stream.r_frame_rate {
|
|
println!(" FPS: {}", fps);
|
|
}
|
|
}
|
|
if stream.codec_type.as_deref() == Some("audio") {
|
|
println!(" Audio: {} channels", stream.channels.unwrap_or(0));
|
|
if let Some(sr) = &stream.sample_rate {
|
|
println!(" Sample Rate: {}", sr);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Save probe JSON to file
|
|
let file_manager = momentry_core::FileManager::new(std::path::PathBuf::from("."));
|
|
let json_str = serde_json::to_string_pretty(&probe_result)?;
|
|
let json_path = file_manager.save_json(&uuid, "probe", &json_str)?;
|
|
println!("\nProbe JSON saved to: {:?}", json_path);
|
|
|
|
// Store in PostgreSQL
|
|
println!("\nStoring in database...");
|
|
let db = PostgresDb::init().await?;
|
|
let file_path = Path::new(&path)
|
|
.canonicalize()
|
|
.map(|p| p.to_string_lossy().to_string())
|
|
.unwrap_or_else(|_| path.clone());
|
|
let file_name = Path::new(&path)
|
|
.file_name()
|
|
.map(|n| n.to_string_lossy().to_string())
|
|
.unwrap_or_default();
|
|
|
|
let record = VideoRecord {
|
|
id: 0,
|
|
uuid: uuid.clone(),
|
|
file_path,
|
|
file_name,
|
|
duration,
|
|
width,
|
|
height,
|
|
fps,
|
|
probe_json: Some(json_str),
|
|
storage: Default::default(),
|
|
status: VideoStatus::Pending,
|
|
user_id: None,
|
|
job_id: None,
|
|
created_at: String::new(),
|
|
};
|
|
|
|
let video_id = db.register_video(&record).await?;
|
|
println!("Video registered with ID: {}", video_id);
|
|
|
|
Ok(())
|
|
}
|
|
Commands::Process {
|
|
target,
|
|
modules,
|
|
cloud,
|
|
force,
|
|
resume,
|
|
} => {
|
|
println!("Processing: {}", target);
|
|
println!(" force: {}, resume: {}", force, resume);
|
|
|
|
// Parse selected modules
|
|
let selected_modules: Option<Vec<ProcessorType>> = modules.as_ref().map(|m| {
|
|
m.iter()
|
|
.filter_map(|name| {
|
|
let name_lower = name.to_lowercase();
|
|
match name_lower.as_str() {
|
|
"asr" => Some(ProcessorType::Asr),
|
|
"cut" => Some(ProcessorType::Cut),
|
|
"asrx" => Some(ProcessorType::Asrx),
|
|
"yolo" => Some(ProcessorType::Yolo),
|
|
"ocr" => Some(ProcessorType::Ocr),
|
|
"face" => Some(ProcessorType::Face),
|
|
"pose" => Some(ProcessorType::Pose),
|
|
"story" => Some(ProcessorType::Story),
|
|
"caption" => Some(ProcessorType::Caption),
|
|
_ => {
|
|
eprintln!("Unknown module: {}", name);
|
|
None
|
|
}
|
|
}
|
|
})
|
|
.collect()
|
|
});
|
|
|
|
// Parse cloud modules
|
|
let cloud_modules: Vec<ProcessorType> = cloud
|
|
.as_ref()
|
|
.map(|c| {
|
|
c.iter()
|
|
.filter_map(|name| {
|
|
let name_lower = name.to_lowercase();
|
|
match name_lower.as_str() {
|
|
"asr" => Some(ProcessorType::Asr),
|
|
"cut" => Some(ProcessorType::Cut),
|
|
"asrx" => Some(ProcessorType::Asrx),
|
|
"yolo" => Some(ProcessorType::Yolo),
|
|
"ocr" => Some(ProcessorType::Ocr),
|
|
"face" => Some(ProcessorType::Face),
|
|
"pose" => Some(ProcessorType::Pose),
|
|
"story" => Some(ProcessorType::Story),
|
|
"caption" => Some(ProcessorType::Caption),
|
|
_ => {
|
|
eprintln!("Unknown cloud module: {}", name);
|
|
None
|
|
}
|
|
}
|
|
})
|
|
.collect()
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
if let Some(ref mods) = selected_modules {
|
|
println!(
|
|
" Modules: {}",
|
|
mods.iter()
|
|
.map(|m| m.to_string())
|
|
.collect::<Vec<_>>()
|
|
.join(", ")
|
|
);
|
|
} else {
|
|
println!(" Modules: ALL");
|
|
}
|
|
|
|
if !cloud_modules.is_empty() {
|
|
println!(
|
|
" Cloud: {}",
|
|
cloud_modules
|
|
.iter()
|
|
.map(|m| m.to_string())
|
|
.collect::<Vec<_>>()
|
|
.join(", ")
|
|
);
|
|
}
|
|
|
|
let processing_mode = if force {
|
|
"FORCE (reprocess all)"
|
|
} else if resume {
|
|
"RESUME (continue from checkpoint)"
|
|
} else {
|
|
"SMART (skip complete, resume partial)"
|
|
};
|
|
println!(" Mode: {}", processing_mode);
|
|
|
|
// Compute UUID if path is given
|
|
let uuid = if target.len() == 16 && !target.contains('/') {
|
|
target.clone()
|
|
} else {
|
|
momentry_core::uuid::compute_uuid_from_path(&target)
|
|
};
|
|
|
|
// Get video from database
|
|
let db = PostgresDb::init().await?;
|
|
let video = db
|
|
.get_video_by_uuid(&uuid)
|
|
.await?
|
|
.ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;
|
|
|
|
let video_path = &video.file_path;
|
|
let video_name = video.file_name.clone();
|
|
let _file_manager = momentry_core::FileManager::new(std::path::PathBuf::from("."));
|
|
|
|
// Initialize output directory
|
|
let output_dir = OutputDir::new();
|
|
output_dir.ensure_dir()?;
|
|
println!("Output directory: {:?}", output_dir.get_base_path());
|
|
|
|
// Initialize progress UI
|
|
let progress_state = Arc::new(Mutex::new(ProgressState::new(&video_name)));
|
|
progress_state.lock().unwrap().start();
|
|
|
|
// Helper closure to check if a module should be processed
|
|
let should_process = |module: ProcessorType| -> bool {
|
|
selected_modules
|
|
.as_ref()
|
|
.map(|mods| mods.contains(&module))
|
|
.unwrap_or(true)
|
|
};
|
|
|
|
// Helper closure to check if a module should run in the cloud
|
|
let is_cloud = |module: ProcessorType| -> bool { cloud_modules.contains(&module) };
|
|
|
|
// Create UI and wrap in Arc for sharing with Redis subscriber
|
|
let ui = Arc::new(Mutex::new(ProgressUi::new(&video_name).ok()));
|
|
if let Some(ref mut ui) = *ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
|
|
// Spawn Redis subscriber for real-time progress updates
|
|
let redis_progress_state = progress_state.clone();
|
|
let redis_ui = ui.clone();
|
|
let redis_uuid = uuid.clone();
|
|
let redis_handle = tokio::spawn(async move {
|
|
if let Ok(redis_client) = momentry_core::core::db::RedisClient::new() {
|
|
loop {
|
|
if let Ok(mut pubsub) = redis_client.subscribe_progress(&redis_uuid).await {
|
|
let mut stream = pubsub.on_message();
|
|
while let Some(msg) = stream.next().await {
|
|
if let Ok(payload) = msg.get_payload::<String>() {
|
|
if let Ok(progress_msg) =
|
|
serde_json::from_str::<
|
|
momentry_core::core::db::ProgressMessage,
|
|
>(&payload)
|
|
{
|
|
let mut state = redis_progress_state.lock().unwrap();
|
|
state.update_from_redis(
|
|
&progress_msg.msg_type,
|
|
&progress_msg.processor,
|
|
progress_msg.data.current,
|
|
progress_msg.data.total,
|
|
progress_msg.data.message.as_deref(),
|
|
);
|
|
|
|
// Store progress in Redis Hash for HTTP API
|
|
let uuid = progress_msg.uuid.clone();
|
|
let processor = progress_msg.processor.clone();
|
|
let msg_type = progress_msg.msg_type.clone();
|
|
let current = progress_msg.data.current;
|
|
let total = progress_msg.data.total;
|
|
let message = progress_msg.data.message.clone();
|
|
|
|
tokio::spawn(async move {
|
|
if let Ok(redis_client) =
|
|
momentry_core::core::db::RedisClient::new()
|
|
{
|
|
if let Ok(mut conn) = redis_client.get_conn().await
|
|
{
|
|
let prefix = momentry_core::core::config::REDIS_KEY_PREFIX.as_str();
|
|
let key = format!(
|
|
"{}job:{}:processor:{}",
|
|
prefix, uuid, processor
|
|
);
|
|
let _: () = redis::cmd("HSET")
|
|
.arg(&key)
|
|
.arg("status")
|
|
.arg(&msg_type)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or(());
|
|
if let Some(c) = current {
|
|
let _: () = redis::cmd("HSET")
|
|
.arg(&key)
|
|
.arg("current")
|
|
.arg(c)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or(());
|
|
}
|
|
if let Some(t) = total {
|
|
let _: () = redis::cmd("HSET")
|
|
.arg(&key)
|
|
.arg("total")
|
|
.arg(t)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or(());
|
|
}
|
|
if let Some(ref m) = message {
|
|
let _: () = redis::cmd("HSET")
|
|
.arg(&key)
|
|
.arg("message")
|
|
.arg(m)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or(());
|
|
}
|
|
let _: () = redis::cmd("EXPIRE")
|
|
.arg(&key)
|
|
.arg(86400i64)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or(());
|
|
}
|
|
}
|
|
});
|
|
|
|
// Trigger UI render on progress update
|
|
if let Some(ref mut ui) = *redis_ui.lock().unwrap() {
|
|
let _ = ui.render();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Process ASR (Automatic Speech Recognition)
|
|
if should_process(ProcessorType::Asr) {
|
|
let asr_path = output_dir.get_output_path(&uuid, "asr.json");
|
|
let decision = decide_processing(&asr_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nASR: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nASR: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&asr_path).ok();
|
|
if is_cloud(ProcessorType::Asr) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nASR: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Asr) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Asr) {
|
|
println!("\nASR: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nASR: ⚙️ Processing...");
|
|
process_asr_module(&asr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update storage status
|
|
db.update_storage_status(&uuid, "fs_json", true).await?;
|
|
|
|
// Process CUT (scene detection)
|
|
if should_process(ProcessorType::Cut) {
|
|
let cut_path = output_dir.get_output_path(&uuid, "cut.json");
|
|
let decision = decide_processing(&cut_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nCUT: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nCUT: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&cut_path).ok();
|
|
if is_cloud(ProcessorType::Cut) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nCUT: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Cut) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Cut) {
|
|
println!("\nCUT: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nCUT: ⚙️ Processing...");
|
|
process_cut_module(&cut_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process ASRX (speaker diarization)
|
|
if should_process(ProcessorType::Asrx) {
|
|
let asrx_path = output_dir.get_output_path(&uuid, "asrx.json");
|
|
let decision = decide_processing(&asrx_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nASRX: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nASRX: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&asrx_path).ok();
|
|
if is_cloud(ProcessorType::Asrx) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_asrx_module(
|
|
&asrx_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nASRX: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Asrx) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_asrx_module(
|
|
&asrx_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Asrx) {
|
|
println!("\nASRX: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nASRX: ⚙️ Processing...");
|
|
process_asrx_module(
|
|
&asrx_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process YOLO (object detection)
|
|
if should_process(ProcessorType::Yolo) {
|
|
let yolo_path = output_dir.get_output_path(&uuid, "yolo.json");
|
|
let decision = decide_processing(&yolo_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nYOLO: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nYOLO: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&yolo_path).ok();
|
|
if is_cloud(ProcessorType::Yolo) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_yolo_module(
|
|
&yolo_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nYOLO: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Yolo) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_yolo_module(
|
|
&yolo_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Yolo) {
|
|
println!("\nYOLO: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nYOLO: ⚙️ Processing...");
|
|
process_yolo_module(
|
|
&yolo_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process OCR (text recognition)
|
|
if should_process(ProcessorType::Ocr) {
|
|
let ocr_path = output_dir.get_output_path(&uuid, "ocr.json");
|
|
let decision = decide_processing(&ocr_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nOCR: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nOCR: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&ocr_path).ok();
|
|
if is_cloud(ProcessorType::Ocr) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nOCR: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Ocr) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Ocr) {
|
|
println!("\nOCR: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nOCR: ⚙️ Processing...");
|
|
process_ocr_module(&ocr_path, video_path, &uuid, &progress_state, &ui)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process Face (face detection)
|
|
if should_process(ProcessorType::Face) {
|
|
let face_path = output_dir.get_output_path(&uuid, "face.json");
|
|
let decision = decide_processing(&face_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nFace: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nFace: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&face_path).ok();
|
|
if is_cloud(ProcessorType::Face) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_face_module(
|
|
&face_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nFace: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Face) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_face_module(
|
|
&face_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Face) {
|
|
println!("\nFace: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nFace: ⚙️ Processing...");
|
|
process_face_module(
|
|
&face_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process Pose (pose estimation)
|
|
if should_process(ProcessorType::Pose) {
|
|
let pose_path = output_dir.get_output_path(&uuid, "pose.json");
|
|
let decision = decide_processing(&pose_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nPose: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nPose: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&pose_path).ok();
|
|
if is_cloud(ProcessorType::Pose) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_pose_module(
|
|
&pose_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nPose: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Pose) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_pose_module(
|
|
&pose_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Pose) {
|
|
println!("\nPose: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nPose: ⚙️ Processing...");
|
|
process_pose_module(
|
|
&pose_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process Story (video narrative)
|
|
if should_process(ProcessorType::Story) {
|
|
let story_path = output_dir.get_output_path(&uuid, "story.json");
|
|
let decision = decide_processing(&story_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nStory: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nStory: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&story_path).ok();
|
|
if is_cloud(ProcessorType::Story) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_story_module(
|
|
&story_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nStory: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Story) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_story_module(
|
|
&story_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Story) {
|
|
println!("\nStory: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nStory: ⚙️ Processing...");
|
|
process_story_module(
|
|
&story_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process Caption (image captions)
|
|
if should_process(ProcessorType::Caption) {
|
|
let caption_path = output_dir.get_output_path(&uuid, "caption.json");
|
|
let decision = decide_processing(&caption_path, force, resume);
|
|
|
|
match decision {
|
|
ProcessingDecision::SkipComplete => {
|
|
println!("\nCaption: ✓ Already complete, skipping");
|
|
}
|
|
ProcessingDecision::ForceReprocess => {
|
|
println!("\nCaption: ⟳ Force reprocessing from scratch...");
|
|
std::fs::remove_file(&caption_path).ok();
|
|
if is_cloud(ProcessorType::Caption) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_caption_module(
|
|
&caption_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::ResumePartial => {
|
|
println!("\nCaption: ↻ Resuming from checkpoint...");
|
|
if is_cloud(ProcessorType::Caption) {
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
process_caption_module(
|
|
&caption_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
ProcessingDecision::Process => {
|
|
if is_cloud(ProcessorType::Caption) {
|
|
println!("\nCaption: ☁️ Running via cloud...");
|
|
println!(" [Cloud processing not implemented yet - run locally]");
|
|
} else {
|
|
println!("\nCaption: ⚙️ Processing...");
|
|
process_caption_module(
|
|
&caption_path,
|
|
video_path,
|
|
&uuid,
|
|
&progress_state,
|
|
&ui,
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO: Store pre_chunks and frames to database
|
|
|
|
// Stop Redis subscriber
|
|
redis_handle.abort();
|
|
|
|
println!("\n✓ Process stage completed!");
|
|
if should_process(ProcessorType::Asr) {
|
|
let path = output_dir.get_output_path(&uuid, "asr.json");
|
|
println!(" - ASR JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Cut) {
|
|
let path = output_dir.get_output_path(&uuid, "cut.json");
|
|
println!(" - CUT JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Asrx) {
|
|
let path = output_dir.get_output_path(&uuid, "asrx.json");
|
|
println!(" - ASRX JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Yolo) {
|
|
let path = output_dir.get_output_path(&uuid, "yolo.json");
|
|
println!(" - YOLO JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Ocr) {
|
|
let path = output_dir.get_output_path(&uuid, "ocr.json");
|
|
println!(" - OCR JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Face) {
|
|
let path = output_dir.get_output_path(&uuid, "face.json");
|
|
println!(" - Face JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Pose) {
|
|
let path = output_dir.get_output_path(&uuid, "pose.json");
|
|
println!(" - Pose JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Story) {
|
|
let path = output_dir.get_output_path(&uuid, "story.json");
|
|
println!(" - Story JSON: {}", path.display());
|
|
}
|
|
if should_process(ProcessorType::Caption) {
|
|
let path = output_dir.get_output_path(&uuid, "caption.json");
|
|
println!(" - Caption JSON: {}", path.display());
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
Commands::Chunk { uuid } => {
    // Chunk stage for one video.
    //
    // Reads `<uuid>.{asr,cut,yolo,ocr,face}.json` (paths are relative, so they
    // resolve against the current working directory), then:
    //   1. stores pre_chunks (ASR sentences, CUT scenes, fixed 10 s windows),
    //   2. stores one merged frame row per frame number seen in YOLO/OCR/Face,
    //   3. builds and stores rule_1 chunks linked to their pre_chunks,
    //   4. marks the `psql_chunk` storage status.
    //
    // ASR and CUT are required; YOLO, OCR and Face are optional and fall back
    // to an empty result with a warning.
    use momentry_core::core::db::postgres_db::{Frame, PreChunk};
    use momentry_core::core::processor::asr::AsrResult;
    use momentry_core::core::processor::cut::CutResult;
    use momentry_core::core::processor::face::FaceResult;
    use momentry_core::core::processor::ocr::OcrResult;
    use momentry_core::core::processor::yolo::YoloResult;

    println!("Chunking: {}", uuid);

    let db = PostgresDb::init().await?;
    let video = db
        .get_video_by_uuid(&uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;

    let file_id = video.id;
    let fps = video.fps;

    // `Chunk::new` takes an i32 file id. Convert once, up front, so an
    // out-of-range id is an error before anything is written instead of a
    // silently truncated foreign key.
    let chunk_file_id =
        i32::try_from(file_id).context("Video id does not fit in i32 for chunk storage")?;

    // ========== Read all JSON files ==========

    // Read ASR JSON (required)
    let asr_path = format!("{}.asr.json", uuid);
    let asr_json = std::fs::read_to_string(&asr_path)
        .context("ASR file not found. Run 'process' first.")?;
    let asr_result: AsrResult = serde_json::from_str(&asr_json)
        .with_context(|| format!("Failed to parse ASR JSON: {}", asr_path))?;
    println!("Loaded ASR: {} segments", asr_result.segments.len());

    // Read CUT JSON (required)
    let cut_path = format!("{}.cut.json", uuid);
    let cut_json = std::fs::read_to_string(&cut_path)
        .context("CUT file not found. Run 'process' first.")?;
    let cut_result: CutResult = serde_json::from_str(&cut_json)
        .with_context(|| format!("Failed to parse CUT JSON: {}", cut_path))?;
    println!("Loaded CUT: {} scenes", cut_result.scenes.len());

    // Read YOLO JSON (optional)
    let yolo_path = format!("{}.yolo.json", uuid);
    let yolo_result = match std::fs::read_to_string(&yolo_path) {
        Ok(yolo_json) => match serde_json::from_str::<YoloResult>(&yolo_json) {
            Ok(result) => {
                println!("Loaded YOLO: {} frames", result.frames.len());
                Some(result)
            }
            Err(e) => {
                println!("Warning: Failed to parse YOLO JSON: {}. Skipping YOLO.", e);
                None
            }
        },
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            println!("Warning: YOLO file not found. Skipping YOLO.");
            None
        }
        // Permission problems, invalid UTF-8, etc. are not "not found".
        Err(e) => {
            println!(
                "Warning: Failed to read YOLO file {}: {}. Skipping YOLO.",
                yolo_path, e
            );
            None
        }
    }
    .unwrap_or_else(|| YoloResult {
        frame_count: 0,
        fps: 0.0,
        frames: vec![],
    });

    // Read OCR JSON (optional)
    let ocr_path = format!("{}.ocr.json", uuid);
    let ocr_result = match std::fs::read_to_string(&ocr_path) {
        Ok(ocr_json) => match serde_json::from_str::<OcrResult>(&ocr_json) {
            Ok(result) => {
                println!("Loaded OCR: {} frames", result.frames.len());
                Some(result)
            }
            Err(e) => {
                println!("Warning: Failed to parse OCR JSON: {}. Skipping OCR.", e);
                None
            }
        },
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            println!("Warning: OCR file not found. Skipping OCR.");
            None
        }
        Err(e) => {
            println!(
                "Warning: Failed to read OCR file {}: {}. Skipping OCR.",
                ocr_path, e
            );
            None
        }
    }
    .unwrap_or_else(|| OcrResult {
        frame_count: 0,
        fps: 0.0,
        frames: vec![],
    });

    // Read Face JSON (optional)
    let face_path = format!("{}.face.json", uuid);
    let face_result = match std::fs::read_to_string(&face_path) {
        Ok(face_json) => match serde_json::from_str::<FaceResult>(&face_json) {
            Ok(result) => {
                println!("Loaded Face: {} frames", result.frames.len());
                Some(result)
            }
            Err(e) => {
                println!("Warning: Failed to parse Face JSON: {}. Skipping Face.", e);
                None
            }
        },
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            println!("Warning: Face file not found. Skipping Face.");
            None
        }
        Err(e) => {
            println!(
                "Warning: Failed to read Face file {}: {}. Skipping Face.",
                face_path, e
            );
            None
        }
    }
    .unwrap_or_else(|| FaceResult {
        frame_count: 0,
        fps: 0.0,
        frames: vec![],
    });

    // Frame timestamps are computed as frame_number / fps. Reject an unusable
    // fps before any row is written rather than storing inf/NaN timestamps.
    let has_frames = !(yolo_result.frames.is_empty()
        && ocr_result.frames.is_empty()
        && face_result.frames.is_empty());
    if has_frames && !(fps.is_finite() && fps > 0.0) {
        anyhow::bail!(
            "Video {} has invalid fps {}; cannot compute frame timestamps",
            uuid,
            fps
        );
    }

    // ========== Store pre_chunks (from ASR, CUT) ==========

    println!("\nStoring pre_chunks...");

    // Store ASR sentence pre_chunks (one per segment, ids kept in segment order)
    let mut asr_pre_chunk_ids = Vec::with_capacity(asr_result.segments.len());
    for seg in &asr_result.segments {
        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "asr".to_string(),
            source_file: Some(asr_path.clone()),
            chunk_type: "sentence".to_string(),
            start_time: seg.start,
            end_time: seg.end,
            start_frame: (seg.start * fps) as i64,
            end_frame: (seg.end * fps) as i64,
            fps,
            raw_json: serde_json::json!({"text": seg.text}),
            text_content: Some(seg.text.clone()),
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        asr_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
    }

    // Store CUT scene pre_chunks (one per scene, ids kept in scene order)
    let mut cut_pre_chunk_ids = Vec::with_capacity(cut_result.scenes.len());
    for scene in &cut_result.scenes {
        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "cut".to_string(),
            source_file: Some(cut_path.clone()),
            chunk_type: "cut".to_string(),
            start_time: scene.start_time,
            end_time: scene.end_time,
            start_frame: scene.start_frame as i64,
            end_frame: scene.end_frame as i64,
            fps,
            raw_json: serde_json::json!({
                "scene_number": scene.scene_number,
            }),
            text_content: None,
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        cut_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
    }

    // Store time-based pre_chunks (every 10 seconds; the last window is
    // clamped to the video duration)
    let duration = video.duration;
    let mut time_pre_chunk_ids = Vec::new();
    let mut time_start = 0.0;
    while time_start < duration {
        let time_end = (time_start + 10.0).min(duration);

        let pre_chunk = PreChunk {
            id: 0,
            file_id,
            source_type: "time".to_string(),
            source_file: None,
            chunk_type: "time".to_string(),
            start_time: time_start,
            end_time: time_end,
            start_frame: (time_start * fps) as i64,
            end_frame: (time_end * fps) as i64,
            fps,
            raw_json: serde_json::json!({"interval": 10.0}),
            text_content: None,
            processed: false,
            chunk_id: None,
            created_at: String::new(),
        };
        time_pre_chunk_ids.push(db.store_pre_chunk(&pre_chunk).await?);
        time_start = time_end;
    }

    println!(
        "Stored pre_chunks: {} asr + {} cut + {} time",
        asr_result.segments.len(),
        cut_result.scenes.len(),
        time_pre_chunk_ids.len()
    );

    // ========== Store frames (from YOLO, OCR, Face) ==========

    println!("\nStoring frames...");

    // Index YOLO, OCR, Face results by frame number. The maps borrow from the
    // loaded results, so no frame is cloned; on a duplicate frame number the
    // later entry wins.
    let yolo_by_frame: std::collections::HashMap<u64, _> =
        yolo_result.frames.iter().map(|f| (f.frame, f)).collect();
    let ocr_by_frame: std::collections::HashMap<u64, _> =
        ocr_result.frames.iter().map(|f| (f.frame, f)).collect();
    let face_by_frame: std::collections::HashMap<u64, _> =
        face_result.frames.iter().map(|f| (f.frame, f)).collect();

    // Union of every frame number that has data from at least one processor
    let mut all_frames: Vec<u64> = yolo_by_frame
        .keys()
        .chain(ocr_by_frame.keys())
        .chain(face_by_frame.keys())
        .copied()
        .collect();
    all_frames.sort_unstable();
    all_frames.dedup();

    // Store frames (merge data from YOLO, OCR, Face)
    for frame_num in &all_frames {
        let frame = Frame {
            id: 0,
            file_id,
            frame_number: *frame_num as i64,
            timestamp: (*frame_num as f64) / fps,
            fps,
            yolo_objects: yolo_by_frame
                .get(frame_num)
                .map(|f| serde_json::json!(&f.objects)),
            ocr_results: ocr_by_frame
                .get(frame_num)
                .map(|f| serde_json::json!(&f.texts)),
            face_results: face_by_frame
                .get(frame_num)
                .map(|f| serde_json::json!(&f.faces)),
            frame_path: None,
            created_at: String::new(),
        };
        db.store_frame(&frame).await?;
    }

    println!("Stored {} frames", all_frames.len());

    // ========== Create chunks ==========

    println!("\nCreating chunks...");

    // Rule 1: Direct conversion (sentence pre_chunk -> sentence chunk).
    // Ids were pushed one per segment above, so zipping pairs them exactly.
    let mut sentence_chunks = Vec::with_capacity(asr_result.segments.len());
    for (i, (seg, &pre_chunk_id)) in asr_result
        .segments
        .iter()
        .zip(&asr_pre_chunk_ids)
        .enumerate()
    {
        let linked_id = i32::try_from(pre_chunk_id)
            .context("ASR pre_chunk id does not fit in i32")?;
        let chunk = Chunk::new(
            chunk_file_id,
            uuid.clone(),
            i as u32,
            ChunkType::Sentence,
            ChunkRule::Rule1,
            seg.start,
            seg.end,
            fps,
            serde_json::json!({
                "text": seg.text,
            }),
        )
        .with_text_content(seg.text.clone())
        .with_pre_chunk_ids(vec![linked_id]);
        sentence_chunks.push(chunk);
    }

    // Rule 1: CUT chunks
    let mut cut_chunks = Vec::with_capacity(cut_result.scenes.len());
    for (i, (scene, &pre_chunk_id)) in cut_result
        .scenes
        .iter()
        .zip(&cut_pre_chunk_ids)
        .enumerate()
    {
        let linked_id = i32::try_from(pre_chunk_id)
            .context("CUT pre_chunk id does not fit in i32")?;
        let chunk = Chunk::new(
            chunk_file_id,
            uuid.clone(),
            i as u32,
            ChunkType::Cut,
            ChunkRule::Rule1,
            scene.start_time,
            scene.end_time,
            fps,
            serde_json::json!({
                "scene_number": scene.scene_number,
            }),
        )
        .with_pre_chunk_ids(vec![linked_id]);
        cut_chunks.push(chunk);
    }

    // Rule 1: Time-based chunks
    let splitter = momentry_core::core::chunk::ChunkSplitter::new(10.0);
    let mut time_chunks = Vec::new();
    let time_chunk_list = splitter.split_time_based(&uuid, video.duration);
    for (i, tc) in time_chunk_list.iter().enumerate() {
        // The splitter and the pre_chunk loop above compute their windows
        // independently. If the splitter yields more windows, leave the chunk
        // unlinked instead of pointing it at a non-existent pre_chunk id 0.
        let linked_ids = match time_pre_chunk_ids.get(i) {
            Some(&id) => {
                vec![i32::try_from(id).context("time pre_chunk id does not fit in i32")?]
            }
            None => {
                println!(
                    "Warning: time-based chunk {} has no matching pre_chunk; leaving it unlinked.",
                    i
                );
                Vec::new()
            }
        };
        let chunk = Chunk::new(
            chunk_file_id,
            uuid.clone(),
            i as u32,
            ChunkType::TimeBased,
            ChunkRule::Rule1,
            tc.start_time,
            tc.end_time,
            fps,
            serde_json::json!({"interval": 10.0}),
        )
        .with_pre_chunk_ids(linked_ids);
        time_chunks.push(chunk);
    }

    // Store chunks
    println!(
        "Storing {} sentence chunks (rule_1)...",
        sentence_chunks.len()
    );
    for chunk in &sentence_chunks {
        db.store_chunk(chunk).await?;
    }

    println!("Storing {} cut chunks (rule_1)...", cut_chunks.len());
    for chunk in &cut_chunks {
        db.store_chunk(chunk).await?;
    }

    println!(
        "Storing {} time-based chunks (rule_1)...",
        time_chunks.len()
    );
    for chunk in &time_chunks {
        db.store_chunk(chunk).await?;
    }

    let total_chunks = sentence_chunks.len() + cut_chunks.len() + time_chunks.len();

    // Update storage status
    db.update_storage_status(&uuid, "psql_chunk", true).await?;

    println!("\n✓ Chunk stage completed!");
    println!(
        " - pre_chunks: {} (asr + cut + time)",
        asr_result.segments.len() + cut_result.scenes.len() + time_pre_chunk_ids.len()
    );
    println!(" - frames: {}", all_frames.len());
    println!(" - chunks: {} (sentence + cut + time_based)", total_chunks);

    Ok(())
}
|
|
Commands::Story { uuid } => {
    // Story stage for one video.
    //
    // Picks the "scenes" to narrate (cut chunks, or sentence chunks with text
    // when no cut chunks exist) and, for each scene, prints a plain-text
    // summary built from the sentence chunks and frame rows stored within the
    // scene's time range widened by 5 seconds on each side.
    println!("Generating story for: {}", uuid);

    let db = PostgresDb::init().await?;
    let video = db
        .get_video_by_uuid(&uuid)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?;

    let file_id = video.id;
    let duration = video.duration;

    // Get all chunks
    let all_chunks = db.get_chunks_by_uuid(&uuid).await?;

    // Try cut chunks first, fall back to sentence chunks
    let mut story_chunks: Vec<&Chunk> = all_chunks
        .iter()
        .filter(|c| c.chunk_type == ChunkType::Cut)
        .collect();

    let story_type = if story_chunks.is_empty() {
        // Fall back to sentence chunks
        story_chunks = all_chunks
            .iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence && c.text_content.is_some())
            .collect();
        "sentence"
    } else {
        "cut"
    };

    if story_chunks.is_empty() {
        println!("No story chunks found. Run 'chunk' command first.");
        return Ok(());
    }

    println!("Found {} {} scenes", story_chunks.len(), story_type);

    // Generate story for each scene
    for (i, story_chunk) in story_chunks.iter().enumerate() {
        println!("\n=== Scene {} ===", i + 1);
        println!(
            "Time: {:.2}s - {:.2}s",
            story_chunk.start_time, story_chunk.end_time
        );

        // Get context: expand time range by 5 seconds before and after,
        // clamped to [0, duration]
        let context_start = (story_chunk.start_time - 5.0).max(0.0);
        let context_end = (story_chunk.end_time + 5.0).min(duration);

        // Get chunks in context range (sentence chunks with ASR text)
        let context_chunks = db
            .get_chunks_by_time_range(file_id, context_start, context_end)
            .await?;

        // Get frames in context range
        let context_frames = db
            .get_frames_by_time_range(file_id, context_start, context_end)
            .await?;

        // Build story
        let mut story = String::new();
        story.push_str(&format!(
            "Scene {} ({:.1}s - {:.1}s)\n\n",
            i + 1,
            story_chunk.start_time,
            story_chunk.end_time
        ));

        // Add audio/text content
        let sentence_chunks: Vec<&Chunk> = context_chunks
            .iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence)
            .collect();

        if !sentence_chunks.is_empty() {
            story.push_str("【Speech】\n");
            for sc in &sentence_chunks {
                if let Some(text) = &sc.text_content {
                    story.push_str(&format!(" - {}\n", text));
                }
            }
            story.push('\n');
        }

        // Aggregate YOLO objects: number of context frames in which each
        // class appears. A class is counted once per frame (even when the
        // frame holds several detections of it) so the figure matches the
        // "(N frames)" label printed below.
        let mut object_frames: std::collections::HashMap<&str, u32> =
            std::collections::HashMap::new();
        for frame in &context_frames {
            if let Some(detections) = frame.yolo_objects.as_ref().and_then(|v| v.as_array()) {
                let classes_in_frame: std::collections::HashSet<&str> = detections
                    .iter()
                    .filter_map(|obj| obj.get("class_name").and_then(|v| v.as_str()))
                    .collect();
                for class_name in classes_in_frame {
                    *object_frames.entry(class_name).or_insert(0) += 1;
                }
            }
        }

        if !object_frames.is_empty() {
            story.push_str("【Objects】\n");
            let mut sorted_objects: Vec<(&str, u32)> =
                object_frames.iter().map(|(name, count)| (*name, *count)).collect();
            // Most frequent first; ties broken by class name so the output
            // does not depend on HashMap iteration order.
            sorted_objects.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
            for (obj, count) in sorted_objects.iter().take(10) {
                story.push_str(&format!(" - {} ({} frames)\n", obj, count));
            }
            story.push('\n');
        }

        // Aggregate OCR text: distinct strings in first-seen order. On-screen
        // text usually persists across many frames; without de-duplication
        // the 10-line listing below would repeat the same string.
        let mut seen_texts: std::collections::HashSet<&str> =
            std::collections::HashSet::new();
        let mut all_texts: Vec<&str> = Vec::new();
        for frame in &context_frames {
            if let Some(entries) = frame.ocr_results.as_ref().and_then(|v| v.as_array()) {
                for text in entries
                    .iter()
                    .filter_map(|entry| entry.get("text").and_then(|v| v.as_str()))
                {
                    // Length is in bytes (UTF-8), not characters; entries of
                    // 2 bytes or fewer are dropped.
                    if text.len() > 2 && seen_texts.insert(text) {
                        all_texts.push(text);
                    }
                }
            }
        }

        if !all_texts.is_empty() {
            story.push_str("【Text in video】\n");
            for txt in all_texts.iter().take(10) {
                story.push_str(&format!(" - {}\n", txt));
            }
            story.push('\n');
        }

        // Aggregate faces: total face detections summed over context frames
        let face_count: usize = context_frames
            .iter()
            .filter_map(|frame| frame.face_results.as_ref().and_then(|v| v.as_array()))
            .map(|faces| faces.len())
            .sum();

        if face_count > 0 {
            story.push_str(&format!(
                "【Faces】\n - {} face(s) detected\n\n",
                face_count
            ));
        }

        println!("{}", story);
    }

    Ok(())
}
|
|
Commands::Vectorize { uuid } => {
    // Embeds the sentence chunks of the requested video(s) and stores each vector
    // in PostgreSQL and Qdrant. `uuid` is either one video UUID or the literal
    // "all", which expands to every video returned by `list_videos`.
    println!("Vectorizing: {}", uuid);

    let pg = PostgresDb::init()
        .await
        .context("Failed to init PostgreSQL")?;
    let qdrant = QdrantDb::init().await.context("Failed to init Qdrant")?;
    let embedder = Embedder::new("nomic-embed-text:v1.5".to_string());

    // Previously the "all" target did no work and still reported success.
    let process_all = uuid == "all";
    let targets: Vec<String> = if process_all {
        pg.list_videos()
            .await?
            .into_iter()
            .map(|video| video.uuid)
            .collect()
    } else {
        vec![uuid.to_string()]
    };

    let mut vectorized_videos = 0usize;

    for target in &targets {
        let target = target.as_str();

        let chunks = pg.get_chunks_by_uuid(target).await?;
        let sentence_chunks: Vec<_> = chunks
            .into_iter()
            .filter(|c| c.chunk_type == ChunkType::Sentence)
            .collect();

        println!(
            "Found {} sentence chunks for {}",
            sentence_chunks.len(),
            target
        );

        // Number of chunks whose vector reached both stores for this video.
        let mut stored_count = 0usize;

        for chunk in sentence_chunks {
            let text = chunk
                .content
                .get("text")
                .and_then(|v| v.as_str())
                .unwrap_or("");

            // Chunks without text have nothing to embed.
            if text.is_empty() {
                continue;
            }

            print!("Embedding chunk {}... ", chunk.chunk_id);

            match embedder.embed_document(text).await {
                Ok(vector) => {
                    // NOTE(review): the Qdrant point is upserted under `chunk_id`
                    // while `vector_id` recorded in PostgreSQL is "<uuid>_<chunk_id>";
                    // confirm the two identifiers are meant to differ.
                    let vector_id = format!("{}_{}", chunk.uuid, chunk.chunk_id);

                    if let Err(e) =
                        pg.store_vector(&chunk.chunk_id, &vector, &chunk.uuid).await
                    {
                        eprintln!("store_vector error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }

                    let qdrant_payload = VectorPayload {
                        uuid: chunk.uuid.clone(),
                        chunk_id: chunk.chunk_id.clone(),
                        chunk_type: "sentence".to_string(),
                        start_time: chunk.start_time,
                        end_time: chunk.end_time,
                        text: Some(text.to_string()),
                    };
                    if let Err(e) = qdrant
                        .upsert_vector(&chunk.chunk_id, &vector, qdrant_payload)
                        .await
                    {
                        eprintln!("upsert_vector error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }

                    if let Err(e) = pg.update_vector_id(&chunk.chunk_id, &vector_id).await {
                        eprintln!("update_vector_id error for {}: {}", chunk.chunk_id, e);
                        continue;
                    }

                    stored_count += 1;
                    println!("done ({} dims)", vector.len());
                }
                Err(e) => {
                    println!("failed: {}", e);
                }
            }
        }

        // Only update storage status if vectors were actually stored
        if stored_count > 0 {
            pg.update_storage_status(target, "pvector_chunk", true)
                .await?;
            pg.update_storage_status(target, "qvector_chunk", true)
                .await?;
            vectorized_videos += 1;
            println!(
                "\n✓ Vectorize stage completed for {}! ({} vectors stored)",
                target, stored_count
            );
        } else {
            println!(
                "\n✗ Vectorize stage failed for {}! (0 vectors stored)",
                target
            );
        }
    }

    if process_all {
        println!(
            "\n✓ Vectorize stage completed for all videos! ({} of {} videos vectorized)",
            vectorized_videos,
            targets.len()
        );
    }
    Ok(())
}
|
|
Commands::Play { target } => {
    // Playback is not implemented: the command only echoes its target and succeeds.
    println!("Playing: {}", target);
    // TODO: Implement play
    Ok(())
}
|
|
Commands::Watch { directories } => {
    // Directory watching is not implemented: the command only echoes the
    // requested directories (Debug-formatted) and succeeds.
    println!("Starting watcher: {:?}", directories);
    // TODO: Implement watch
    Ok(())
}
|
|
Commands::System { gpu } => {
    // Prints a boxed report of CPU, memory and GPU availability, followed by the
    // recommended processing mode. With `gpu` set, also dumps the output of
    // `system_profiler SPDisplaysDataType`.
    //
    // Every row is padded at runtime to one shared width, so the right border
    // stays aligned no matter how many digits the interpolated values take.
    const INNER_WIDTH: usize = 62;
    const TEXT_WIDTH: usize = INNER_WIDTH - 2;

    let resources = SystemResources::check();
    let rule = "═".repeat(INNER_WIDTH);
    let row = |text: &str| println!("║ {:<w$} ║", text, w = TEXT_WIDTH);

    println!("╔{}╗", rule);
    println!("║ {:^w$} ║", "System Resources Report", w = TEXT_WIDTH);
    println!("╠{}╣", rule);
    row(&format!("CPU: {:.1}% idle", resources.cpu_idle_percent));
    row(&format!(
        "Memory: {:.1}GB / {:.1}GB available ({:.0}% used)",
        resources.memory_available_mb as f64 / 1024.0,
        resources.memory_total_mb as f64 / 1024.0,
        resources.memory_used_percent
    ));

    if resources.gpu_available {
        match resources.gpu_type {
            GpuType::Nvidia => {
                // A missing utilization reading is reported as 0%.
                let util = resources.gpu_utilization.unwrap_or(0.0);
                row(&format!("GPU: NVIDIA - {:.0}% utilized", util));
            }
            GpuType::AppleMps => {
                row("GPU: Apple MPS (Metal) - available");
            }
        }
    } else {
        row("GPU: None detected");
    }
    println!("╠{}╣", rule);

    if resources.can_parallel(4096) {
        row("Mode: PARALLEL - Can run multiple modules together");
        row(&format!(
            "Recommended modules: {}",
            resources.recommend_parallel_modules().join(", ")
        ));
    } else {
        row("Mode: SEQUENTIAL - Low resources, run one at a time");
    }
    println!("╚{}╝", rule);

    if gpu {
        println!("\n=== GPU Details ===");
        let output = std::process::Command::new("system_profiler")
            .args(["SPDisplaysDataType", "-detailLevel", "mini"])
            .output();
        // Report a missing or failing `system_profiler` instead of printing
        // nothing; the command itself still succeeds.
        match output {
            Ok(o) => {
                println!("{}", String::from_utf8_lossy(&o.stdout));
                if !o.status.success() {
                    eprintln!(
                        "system_profiler exited with {}: {}",
                        o.status,
                        String::from_utf8_lossy(&o.stderr)
                    );
                }
            }
            Err(e) => {
                eprintln!("Failed to run system_profiler: {}", e);
            }
        }
    }

    Ok(())
}
|
|
Commands::Server { host, port } => {
    // An explicit port wins; otherwise use the configured SERVER_PORT value.
    let listen_port = match port {
        Some(explicit) => explicit,
        None => *momentry_core::core::config::SERVER_PORT,
    };
    momentry_core::api::start_server(&host, listen_port).await?;
    Ok(())
}
|
|
Commands::Worker {
    max_concurrent,
    poll_interval,
    batch_size,
} => {
    use momentry_core::worker::{JobWorker, WorkerConfig};

    // Unset CLI options fall back to 2 concurrent jobs, a 5 second poll interval
    // and batches of 10; the processor timeout is fixed at 3600 seconds.
    let config = WorkerConfig {
        enabled: true,
        max_concurrent: max_concurrent.unwrap_or(2),
        poll_interval_secs: poll_interval.unwrap_or(5),
        batch_size: batch_size.unwrap_or(10),
        processor_timeout_secs: 3600,
    };

    let db = Arc::new(PostgresDb::init().await?);
    let redis = Arc::new(RedisClient::new()?);
    let worker = JobWorker::new(db, redis, config.clone());

    println!(
        "Starting worker with max_concurrent={}, poll_interval={}s",
        config.max_concurrent, config.poll_interval_secs
    );

    worker.run().await?;
    Ok(())
}
|
|
Commands::Query { query } => {
    // Querying is not implemented: the command only echoes the query and succeeds.
    println!("Query: {}", query);
    // TODO: Implement query
    Ok(())
}
|
|
Commands::Lookup { path } => {
    // Show the UUID that `compute_uuid_from_path` derives for the given path.
    let derived = momentry_core::uuid::compute_uuid_from_path(&path);
    println!("Path: {}\nUUID: {}", path, derived);
    Ok(())
}
|
|
Commands::Resolve { uuid } => {
    // Reverse lookup (UUID -> path) is not implemented: the command echoes the
    // UUID, says so, and succeeds.
    println!("Resolving UUID: {}", uuid);
    // TODO: Look up path from UUID in database
    println!("(Database lookup not implemented yet)");
    Ok(())
}
|
|
Commands::Thumbnails { uuid, count } => {
    // Generates thumbnails for one video (when `uuid` is given) or for every
    // video in the database, writing into the relative "thumbnails" directory.
    // Per-video failures are reported and counted; they do not abort the run.
    let db = PostgresDb::init().await?;

    let videos = if let Some(ref uuid) = uuid {
        vec![db
            .get_video_by_uuid(uuid)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Video not found: {}", uuid))?]
    } else {
        db.list_videos().await?
    };

    let output_dir = std::path::PathBuf::from("thumbnails");
    let extractor = momentry_core::ThumbnailExtractor::new(output_dir, count);

    let mut succeeded = 0usize;
    let mut failed = 0usize;

    for video in videos {
        println!(
            "\nGenerating thumbnails for: {} ({})",
            video.file_name, video.uuid
        );

        match extractor.get_or_create(&video.file_path, &video.uuid) {
            Ok(result) => {
                println!(" Generated {} thumbnails", result.count);
                succeeded += 1;
            }
            Err(e) => {
                println!(" Error: {}", e);
                failed += 1;
            }
        }
    }

    // Only claim success when every video succeeded; the summary used to be
    // printed unconditionally, even after errors.
    if failed == 0 {
        println!("\nThumbnails generated successfully!");
    } else {
        println!(
            "\nThumbnail generation finished with {} failure(s) ({} succeeded)",
            failed, succeeded
        );
    }
    Ok(())
}
|
|
Commands::Status { uuid } => {
    // Prints a per-video storage status table (filesystem, PostgreSQL chunks,
    // PostgreSQL vectors, Qdrant vectors) for one video or for all videos,
    // followed by a legend of the storage types.
    //
    // Layout: one name column plus seven status columns. Borders and the
    // separator are generated from the column widths so they match the rows.
    const NAME_WIDTH: usize = 32;
    const CELL_WIDTH: usize = 8;
    const STATUS_COLUMNS: usize = 7;
    // File names longer than this many characters are shortened to "..." + tail.
    const MAX_NAME_CHARS: usize = 30;
    const NAME_TAIL_CHARS: usize = 27;
    // Each cell renders as " value " and is preceded by one separator character.
    const INNER_WIDTH: usize = (NAME_WIDTH + 2) + STATUS_COLUMNS * (CELL_WIDTH + 3);
    const TEXT_WIDTH: usize = INNER_WIDTH - 2;
    const LEGEND: [(&str, &str); 7] = [
        ("FS_Video", "Video file on filesystem"),
        ("FS_JSON", "JSON files (probe, ASR, YOLO, etc.)"),
        ("PSQL_Chunk", "Chunks stored in PostgreSQL"),
        ("PObject", "Chunks as JSON objects in PostgreSQL (future)"),
        ("MObject", "Chunks as JSON objects in MongoDB (future)"),
        ("PVector", "Vectors in PostgreSQL"),
        ("QVector", "Vectors in Qdrant"),
    ];

    let db = PostgresDb::init().await?;

    let videos = if let Some(ref u) = uuid {
        vec![db
            .get_video_by_uuid(u)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Video not found: {}", u))?]
    } else {
        db.list_videos().await?
    };

    let heavy_rule = "═".repeat(INNER_WIDTH);
    let column_rule = format!(
        "{}{}",
        "─".repeat(NAME_WIDTH + 2),
        format!("╪{}", "─".repeat(CELL_WIDTH + 2)).repeat(STATUS_COLUMNS)
    );

    println!("\n╔{}╗", heavy_rule);
    // NOTE(review): assumes the emoji renders two terminal cells wide, hence
    // one cell less padding — confirm on the target terminals.
    println!(
        "║ {:^w$} ║",
        "📊 Storage Status Report",
        w = TEXT_WIDTH - 1
    );
    println!("╠{}╣", heavy_rule);
    println!(
        "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║",
        "Video", "FS", "FS", "PSQL", "PObj", "MObj", "PVec", "QVec"
    );
    println!(
        "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║",
        "", "Video", "JSON", "Chunk", "Chunk", "Chunk", "Chunk", "Chunk"
    );
    println!("╠{}╣", column_rule);

    for video in videos {
        // Count lookups are best-effort: a failed query is shown as zero.
        let (sentence_count, time_count) =
            db.get_chunk_count(&video.uuid).await.unwrap_or((0, 0));
        let vector_count = db.get_vector_count(&video.uuid).await.unwrap_or(0);
        let total_chunks = sentence_count + time_count;

        let psql_status = if total_chunks > 0 { "✓" } else { "-" };
        // "✓" = at least one vector per chunk, "◐" = partially vectorized.
        let pvec_status = if vector_count > 0 && total_chunks > 0 {
            if vector_count >= total_chunks {
                "✓"
            } else {
                "◐"
            }
        } else {
            "-"
        };
        let qvec_status = if video.storage.qvector_chunk {
            "✓"
        } else {
            "-"
        };

        // Shorten by characters, not bytes: slicing a `String` at a byte offset
        // inside a multi-byte UTF-8 character panics (e.g. CJK file names).
        let name_chars = video.file_name.chars().count();
        let file_name = if name_chars > MAX_NAME_CHARS {
            let tail: String = video
                .file_name
                .chars()
                .skip(name_chars - NAME_TAIL_CHARS)
                .collect();
            format!("...{}", tail)
        } else {
            video.file_name
        };

        // PObj / MObj are not implemented yet and are always shown as "-".
        println!(
            "║ {:32} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} │ {:8} ║",
            file_name,
            if video.storage.fs_video { "✓" } else { "✗" },
            if video.storage.fs_json { "✓" } else { "-" },
            psql_status,
            "-",
            "-",
            pvec_status,
            qvec_status
        );
    }

    println!("╠{}╣", heavy_rule);
    println!("║ {:<w$} ║", "Storage Types:", w = TEXT_WIDTH);
    for (label, meaning) in LEGEND.iter() {
        println!(
            "║ {:<w$} ║",
            format!("  {:<10} - {}", label, meaning),
            w = TEXT_WIDTH
        );
    }
    println!("╚{}╝", heavy_rule);
    Ok(())
}
|
|
Commands::Backup { action, days } => {
    // Backup maintenance. `action` is one of "list", "cleanup" or "verify";
    // any other value prints a hint and the command still succeeds.
    let output_dir = OutputDir::new();
    output_dir.ensure_dir()?;

    println!("\n📁 Backup directory: {:?}", output_dir.get_backup_dir());

    match action.as_str() {
        "list" => {
            let backups = output_dir.list_backups()?;
            println!("\n📦 Available backups:");
            if backups.is_empty() {
                println!(" (no backups found)");
            }
            // Iterating an empty list prints nothing, so no `else` is needed.
            for entry in &backups {
                println!(" - {}", entry.filename);
            }
            println!("\nTotal: {} backup(s)", backups.len());
        }
        "cleanup" => {
            // Without an explicit age, backups older than 30 days are removed.
            let max_age_days = days.unwrap_or(30);
            let removed = output_dir.cleanup_old_backups(max_age_days)?;
            println!(
                "\n🗑️ Cleaned up {} old backup(s) (older than {} days)",
                removed, max_age_days
            );
        }
        "verify" => {
            println!("\n🔍 Verifying backups...");
            let backups = output_dir.list_backups()?;
            let mut verified = 0;
            let mut failed = 0;
            for entry in &backups {
                let passed = match output_dir.verify_backup(&entry.path) {
                    Ok(true) => {
                        println!(" ✓ {}", entry.filename);
                        true
                    }
                    Ok(false) => {
                        println!(" ✗ {} (missing checksum)", entry.filename);
                        false
                    }
                    Err(e) => {
                        println!(" ✗ {} ({})", entry.filename, e);
                        false
                    }
                };
                if passed {
                    verified += 1;
                } else {
                    failed += 1;
                }
            }
            println!("\nVerified: {} OK, {} failed", verified, failed);
        }
        _ => {
            println!("\n⚠️ Unknown action: {}", action);
            println!("Available actions: list, cleanup, verify");
        }
    }
    Ok(())
}
|
|
Commands::ApiKey {
|
|
action,
|
|
name,
|
|
key_type,
|
|
ttl,
|
|
key,
|
|
} => {
|
|
let db = PostgresDb::init().await?;
|
|
let db_url = std::env::var("DATABASE_URL")
|
|
.unwrap_or_else(|_| "postgres://accusys@localhost:5432/momentry".to_string());
|
|
|
|
let service = ApiKeyService::new(db_url);
|
|
|
|
match action {
|
|
ApiKeyAction::Create => {
    // Generates a new key through the key service, records its hash in the
    // database, and prints the plaintext key once. A failed database write is
    // reported as a warning; the key is still shown.
    let name = name.unwrap_or_else(|| "unnamed-key".to_string());
    let kt = parse_key_type(key_type.as_deref());
    let request = momentry_core::core::api_key::CreateApiKeyRequest {
        name: name.clone(),
        key_type: kt,
        user_id: None,
        service_name: None,
        permissions: vec!["read".to_string(), "write".to_string()],
        ttl_days: ttl,
    };

    match service.create_key(request) {
        Err(e) => {
            eprintln!("\n❌ Failed to create API key: {}", e);
        }
        Ok(response) => {
            let key_hash = service.hash_key(&response.key);
            // NOTE(review): `serde_json::to_string` yields a JSON document, so a
            // unit variant presumably serializes with surrounding double quotes,
            // unlike the unquoted "user" fallback — confirm the stored format.
            let key_type_str =
                serde_json::to_string(&kt).unwrap_or_else(|_| "user".to_string());
            let permissions = serde_json::json!(["read", "write"]);

            let record = momentry_core::core::db::CreateApiKeyConfig::new(
                &response.key_id,
                &key_hash,
                kt.prefix(),
                &name,
                &key_type_str,
            )
            .with_permissions(&permissions)
            .with_expires_at(response.expires_at);

            let stored = db.create_api_key(record).await;
            if let Err(e) = stored {
                eprintln!(
                    "\n⚠️ Key generated but failed to store in database: {}",
                    e
                );
            }

            println!("\n✅ API Key created successfully!");
            println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
            println!("│ ⚠️ IMPORTANT: Save this key now - it will not be shown again! │");
            println!("└─────────────────────────────────────────────────────────────────────────────┘");
            println!("\nKey ID: {}", response.key_id);
            println!("API Key: {}", response.key);
            println!("Expires: {}", response.expires_at);
            if !response.warning.is_empty() {
                println!("\n⚠️ {}", response.warning);
            }
        }
    }
}
|
|
// Lists every stored API key as a table (status, name, type, usage count,
// last use). Names longer than 20 characters are cut to their first 17.
ApiKeyAction::List => match db.list_api_keys().await {
    Ok(keys) => {
        println!("\n📋 API Key List");
        if keys.is_empty() {
            println!(" (no API keys found)");
        } else {
            println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
            println!(
                "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │",
                "Status", "Name", "Type", "Usage", "Last Used"
            );
            println!("├────────────────────────────────────────────────────────────────────────────┤");
            for k in &keys {
                let status = if k.status == "active" {
                    "✓ active"
                } else {
                    &k.status
                };
                let last_used = k
                    .last_used_at
                    .map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
                    .unwrap_or_else(|| "never".to_string());
                // Cut on a character boundary: `&k.name[..17]` panics when byte
                // 17 falls inside a multi-byte UTF-8 character. Length is
                // measured in characters, which is also what `{:20}` pads by.
                let shown_name: &str = if k.name.chars().count() > 20 {
                    let cut = k
                        .name
                        .char_indices()
                        .nth(17)
                        .map_or(k.name.len(), |(idx, _)| idx);
                    &k.name[..cut]
                } else {
                    &k.name[..]
                };
                println!(
                    "│ {:8} │ {:20} │ {:12} │ {:8} │ {:15} │",
                    status, shown_name, k.key_type, k.usage_count, last_used
                );
            }
            println!("└────────────────────────────────────────────────────────────────────────────┘");
            println!("\nTotal: {} key(s)", keys.len());
        }
    }
    Err(e) => {
        eprintln!("\n❌ Failed to list API keys: {}", e);
    }
},
|
|
ApiKeyAction::Validate => {
    // Looks the key up by its hash and reports whether it is active. A
    // successful validation also bumps the usage counter, best-effort.
    let api_key =
        key.ok_or_else(|| anyhow::anyhow!("--key required for validate"))?;
    let key_hash = service.hash_key(&api_key);

    match db.get_api_key_by_hash(&key_hash).await {
        Ok(Some(record)) if record.status == "active" => {
            // The result of the usage update is deliberately discarded.
            db.update_api_key_usage(&record.key_id, None).await.ok();
            println!("\n✅ API Key is valid");
            println!("Key ID: {}", record.key_id);
            println!("Name: {}", record.name);
            println!("Type: {}", record.key_type);
            // `record` was read before the update, hence the +1.
            println!("Usage: {} times", record.usage_count + 1);
            if record.rotation_required {
                println!(
                    "⚠️ Rotation required: {}",
                    record.rotation_reason.as_deref().unwrap_or("unknown")
                );
            }
        }
        Ok(Some(record)) => {
            println!("\n❌ API Key is {}", record.status);
        }
        Ok(None) => {
            println!("\n❌ API Key is invalid or not found");
        }
        Err(e) => {
            eprintln!("\n❌ Validation error: {}", e);
        }
    }
}
|
|
ApiKeyAction::Revoke => {
    // Revokes the key whose id is extracted from the supplied key string.
    let key = key.ok_or_else(|| anyhow::anyhow!("--key required for revoke"))?;
    let key_id = service.extract_key_id(&key);
    let outcome = db.revoke_api_key(&key_id).await;
    if let Err(e) = outcome {
        eprintln!("\n❌ Failed to revoke API key: {}", e);
    } else {
        println!("\n🔴 API Key {} revoked successfully", key_id);
    }
}
|
|
ApiKeyAction::Rotate => {
    // Flags a key as requiring rotation. The grace period is computed from
    // the key type given on the command line (see `parse_key_type` for the
    // fallback when it is missing or unrecognised).
    let key = key.ok_or_else(|| anyhow::anyhow!("--key required for rotate"))?;
    let key_id = service.extract_key_id(&key);
    let requested_type = parse_key_type(key_type.as_deref());
    let grace_period_end = service.calculate_grace_period_end(requested_type);

    let outcome = db
        .require_api_key_rotation(&key_id, "manual rotation requested", grace_period_end)
        .await;
    match outcome {
        Err(e) => {
            eprintln!("\n❌ Rotation request failed: {}", e);
        }
        Ok(_) => {
            println!("\n🔄 Rotation requested for key: {}", key_id);
            println!("Grace period ends: {}", grace_period_end);
        }
    }
}
|
|
ApiKeyAction::Stats => {
    // Prints aggregate key statistics from the database, then the anomaly
    // detection thresholds from the key service configuration. The thresholds
    // are printed even when the statistics query fails.
    let box_bottom = "└─────────────────────────────────────────┘";

    match db.get_api_key_stats().await {
        Err(e) => {
            eprintln!("\n⚠️ Failed to get stats: {}", e);
        }
        Ok(stats) => {
            println!("\n📊 API Key Statistics");
            println!("\n┌─────────────────────────────────────────┐");
            println!("│ Total Keys: {:5} │", stats.total_keys);
            println!("│ Active Keys: {:5} │", stats.active_keys);
            println!("│ Expired Keys: {:5} │", stats.expired_keys);
            println!("│ Rotation Required: {:4} │", stats.rotation_required);
            println!("│ Anomalies (24h): {:5} │", stats.anomalies_last_24h);
            println!("{}", box_bottom);
        }
    }

    let thresholds = service.get_config();
    println!("\n┌─────────────────────────────────────────┐");
    println!("│ Anomaly Detection Thresholds │");
    println!("├─────────────────────────────────────────┤");
    println!(
        "│ Requests/minute: {:5} │",
        thresholds.requests_per_minute_threshold
    );
    println!(
        "│ Requests/hour: {:5} │",
        thresholds.requests_per_hour_threshold
    );
    // The threshold is multiplied by 100 for display as a percentage.
    println!(
        "│ Error rate: {:5.1}% │",
        thresholds.error_rate_threshold * 100.0
    );
    println!(
        "│ Unique IPs/hour: {:5} │",
        thresholds.unique_ips_per_hour_threshold
    );
    println!(
        "│ Lockout threshold: {:5} │",
        thresholds.lockout_threshold
    );
    println!("{}", box_bottom);
}
|
|
}
|
|
Ok(())
|
|
}
|
|
Commands::Gitea {
|
|
action,
|
|
username,
|
|
password,
|
|
token_name,
|
|
scopes,
|
|
} => {
|
|
use momentry_core::core::api_key::gitea::{
|
|
CreateGiteaTokenRequest, GiteaClient, GiteaScope,
|
|
};
|
|
|
|
let db = PostgresDb::init().await?;
|
|
let gitea = GiteaClient::new()?;
|
|
|
|
match action {
|
|
GiteaAction::Create => {
    // Creates a Gitea access token for `username`, records its metadata in
    // the local database, and prints the token once.
    //
    // `--scopes` is a comma-separated list; when omitted, the token gets
    // read + write repository scopes.
    let username = username
        .ok_or_else(|| anyhow::anyhow!("--username required for create"))?;
    let password = password
        .ok_or_else(|| anyhow::anyhow!("--password required for create"))?;
    let token_name = token_name
        .ok_or_else(|| anyhow::anyhow!("--token-name required for create"))?;

    // Reject unknown scopes instead of silently dropping them: a typo used
    // to produce a token with fewer scopes than requested, without warning.
    // Empty segments (e.g. a trailing comma) are still ignored.
    let scopes_vec: Vec<GiteaScope> = match scopes {
        Some(list) => list
            .split(',')
            .map(|raw| raw.trim())
            .filter(|raw| !raw.is_empty())
            .map(|raw| {
                raw.parse::<GiteaScope>()
                    .map_err(|_| anyhow::anyhow!("invalid Gitea scope: '{}'", raw))
            })
            .collect::<Result<Vec<_>, _>>()?,
        None => vec![GiteaScope::ReadRepository, GiteaScope::WriteRepository],
    };

    let request = CreateGiteaTokenRequest {
        username: username.clone(),
        password,
        token_name: token_name.clone(),
        scopes: scopes_vec.clone(),
    };

    match gitea.create_token(&request).await {
        Ok(response) => {
            // Best-effort: the token already exists on the Gitea side, so a
            // local storage failure is a warning and the token is still shown.
            if let Err(e) = db
                .create_gitea_token(
                    response.id,
                    &username,
                    &token_name,
                    &response.token_last_eight,
                    &serde_json::json!(scopes_vec
                        .iter()
                        .map(|s| s.as_str())
                        .collect::<Vec<_>>()),
                    None,
                )
                .await
            {
                eprintln!("\n⚠️ Token created but failed to store: {}", e);
            }

            println!("\n✅ Gitea Token created successfully!");
            println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
            println!("│ ⚠️ IMPORTANT: Save this token now - it will not be shown again! │");
            println!("└─────────────────────────────────────────────────────────────────────────────┘");
            println!("\nToken ID: {}", response.id);
            println!("Token Name: {}", response.name);
            println!("SHA1: {}", response.sha1);
            println!("Last 8: {}", response.token_last_eight);
            println!("\nAuthorization Header:");
            println!(" Authorization: token {}", response.sha1);
        }
        Err(e) => {
            eprintln!("\n❌ Failed to create Gitea token: {}", e);
        }
    }
}
|
|
GiteaAction::List => {
    // Lists the user's tokens as reported by Gitea. The "Registered" column
    // shows whether a token with the same name is in the local database.
    let username =
        username.ok_or_else(|| anyhow::anyhow!("--username required for list"))?;
    let password =
        password.ok_or_else(|| anyhow::anyhow!("--password required for list"))?;

    match gitea.list_tokens(&username, &password).await {
        Ok(tokens) => {
            println!("\n📋 Gitea Tokens for user: {}", username);
            if tokens.is_empty() {
                println!(" (no tokens found)");
            } else {
                println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
                println!("│ ID │ Name │ Last 8 │ Registered │");
                println!("├────────────────────────────────────────────────────────────────────────────┤");
                for token in &tokens {
                    // A failed lookup is shown the same as "not registered".
                    let registered = db
                        .get_gitea_token_by_name(&username, &token.name)
                        .await
                        .ok()
                        .flatten()
                        .map(|_| "✓")
                        .unwrap_or("-");
                    // Cut on a character boundary: `&token.name[..17]` panics
                    // when byte 17 falls inside a multi-byte UTF-8 character.
                    let shown_name: &str = if token.name.chars().count() > 20 {
                        let cut = token
                            .name
                            .char_indices()
                            .nth(17)
                            .map_or(token.name.len(), |(idx, _)| idx);
                        &token.name[..cut]
                    } else {
                        &token.name[..]
                    };
                    println!(
                        "│ {:8} │ {:20} │ {:9} │ {:27} │",
                        token.id, shown_name, token.token_last_eight, registered
                    );
                }
                println!("└────────────────────────────────────────────────────────────────────────────┘");
                println!("\nTotal: {} token(s)", tokens.len());
            }
        }
        Err(e) => {
            eprintln!("\n❌ Failed to list Gitea tokens: {}", e);
        }
    }
}
|
|
GiteaAction::Delete => {
    // Deletes the named token on Gitea, then removes the local record.
    let username = username
        .ok_or_else(|| anyhow::anyhow!("--username required for delete"))?;
    let password = password
        .ok_or_else(|| anyhow::anyhow!("--password required for delete"))?;
    let token_name = token_name
        .ok_or_else(|| anyhow::anyhow!("--token-name required for delete"))?;

    match gitea.delete_token(&username, &password, &token_name).await {
        Ok(_) => {
            // The remote deletion already succeeded, so a local failure is
            // only a warning. It used to be discarded, which left a stale
            // record without any notice.
            if let Err(e) = db.delete_gitea_token(&username, &token_name).await {
                eprintln!(
                    "\n⚠️ Token deleted on Gitea but failed to remove local record: {}",
                    e
                );
            }
            println!("\n🗑️ Token '{}' deleted successfully", token_name);
        }
        Err(e) => {
            eprintln!("\n❌ Failed to delete Gitea token: {}", e);
        }
    }
}
|
|
GiteaAction::Verify => {
    // Shows the locally stored record for a token. Only the local database
    // is consulted; Gitea is not contacted here.
    //
    // `--username` is now required, like the other Gitea actions. It used to
    // default to the placeholder user "unknown", so omitting it produced a
    // misleading "not found" instead of a usage error.
    let username = username
        .ok_or_else(|| anyhow::anyhow!("--username required for verify"))?;
    let token_name = token_name
        .ok_or_else(|| anyhow::anyhow!("--token-name required for verify"))?;

    let record = db
        .get_gitea_token_by_name(&username, &token_name)
        .await?;

    match record {
        Some(r) => {
            println!("\n📋 Gitea Token: {}", r.token_name);
            println!(" User: {}", r.gitea_user);
            println!(" Token ID: {}", r.gitea_token_id);
            println!(" Last 8: {}", r.token_last_eight);
            println!(" Scopes: {}", r.scopes);
            println!(" Created: {}", r.created_at);
            if let Some(verified) = r.last_verified {
                println!(" Last Verified: {}", verified);
            } else {
                println!(" Last Verified: never");
            }
        }
        None => {
            println!("\n❌ Token not found in local database");
        }
    }
}
|
|
}
|
|
Ok(())
|
|
}
|
|
Commands::N8n {
|
|
action,
|
|
api_key,
|
|
label,
|
|
expires_in_days,
|
|
} => {
|
|
use momentry_core::core::api_key::n8n::{
|
|
extract_last_eight, CreateN8nApiKeyRequest, N8nClient,
|
|
};
|
|
|
|
let db = PostgresDb::init().await?;
|
|
|
|
match action {
|
|
N8nAction::Create => {
    // Uses an existing n8n API key to create a new one with the given label,
    // records its metadata locally, and prints the new key once.
    let api_key_value = api_key.ok_or_else(|| {
        anyhow::anyhow!("--api-key required for create (existing n8n API key)")
    })?;
    let label =
        label.ok_or_else(|| anyhow::anyhow!("--label required for create"))?;

    let n8n = N8nClient::new(api_key_value)?;

    // No expiry is sent unless `expires_in_days` was given.
    let expires_at = expires_in_days
        .map(|lifetime_days| chrono::Utc::now() + chrono::Duration::days(lifetime_days));

    let request = CreateN8nApiKeyRequest {
        label: label.clone(),
        expires_at,
    };

    match n8n.create_api_key(&request).await {
        Err(e) => {
            eprintln!("\n❌ Failed to create n8n API key: {}", e);
        }
        Ok(response) => {
            // Only the last eight characters of the key are stored locally.
            let last_eight = extract_last_eight(&response.api_key);
            let stored = db
                .create_n8n_api_key(
                    &response.id,
                    &label,
                    &last_eight,
                    None,
                    response.expires_at,
                )
                .await;
            if let Err(e) = stored {
                eprintln!("\n⚠️ API key created but failed to store: {}", e);
            }

            println!("\n✅ n8n API Key created successfully!");
            println!("\n┌─────────────────────────────────────────────────────────────────────────────┐");
            println!("│ ⚠️ IMPORTANT: Save this API key now - it will not be shown again! │");
            println!("└─────────────────────────────────────────────────────────────────────────────┘");
            println!("\nKey ID: {}", response.id);
            println!("Label: {}", response.label);
            println!("API Key: {}", response.api_key);
            println!("\nUsage:");
            println!(" curl -H 'X-N8N-API-KEY: {}' https://n8n.momentry.ddns.net/api/v1/workflows", response.api_key);
        }
    }
}
|
|
N8nAction::List => {
    // Lists the API keys reported by n8n (label and id). Labels longer than
    // 27 characters are cut to their first 24.
    let api_key_value =
        api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for list"))?;

    let n8n = N8nClient::new(api_key_value)?;

    match n8n.list_api_keys().await {
        Ok(keys) => {
            println!("\n📋 n8n API Keys");
            if keys.is_empty() {
                println!(" (no API keys found)");
            } else {
                println!("\n┌────────────────────────────────────────────────────────────────────────────┐");
                println!("│ Label │ ID │");
                println!("├────────────────────────────────────────────────────────────────────────────┤");
                for key in &keys {
                    // Cut on a character boundary: `&key.label[..24]` panics
                    // when byte 24 falls inside a multi-byte UTF-8 character.
                    let shown_label: &str = if key.label.chars().count() > 27 {
                        let cut = key
                            .label
                            .char_indices()
                            .nth(24)
                            .map_or(key.label.len(), |(idx, _)| idx);
                        &key.label[..cut]
                    } else {
                        &key.label[..]
                    };
                    println!("│ {:27} │ {:39} │", shown_label, key.id);
                }
                println!("└────────────────────────────────────────────────────────────────────────────┘");
                println!("\nTotal: {} key(s)", keys.len());
            }
        }
        Err(e) => {
            eprintln!("\n❌ Failed to list n8n API keys: {}", e);
        }
    }
}
|
|
N8nAction::Delete => {
    // Deletes an n8n API key by label. The label is resolved to the n8n key
    // id through the local database, so keys that were never recorded locally
    // cannot be deleted this way.
    let api_key_value =
        api_key.ok_or_else(|| anyhow::anyhow!("--api-key required for delete"))?;
    let label =
        label.ok_or_else(|| anyhow::anyhow!("--label required for delete"))?;

    let record = db.get_n8n_api_key_by_label(&label).await?;
    if let Some(r) = record {
        let n8n = N8nClient::new(api_key_value)?;
        match n8n.delete_api_key(&r.n8n_key_id).await {
            Ok(_) => {
                // The remote deletion already succeeded, so a local failure
                // is only a warning. It used to be discarded, which left a
                // stale record without any notice.
                if let Err(e) = db.delete_n8n_api_key(&label).await {
                    eprintln!(
                        "\n⚠️ API key deleted on n8n but failed to remove local record: {}",
                        e
                    );
                }
                println!("\n🗑️ API key '{}' deleted successfully", label);
            }
            Err(e) => {
                eprintln!("\n❌ Failed to delete n8n API key: {}", e);
            }
        }
    } else {
        println!("\n❌ API key '{}' not found in local database", label);
    }
}
|
|
N8nAction::Verify => {
    // Shows the locally stored record for an n8n API key label. Only the
    // local database is consulted; n8n is not contacted here.
    let label =
        label.ok_or_else(|| anyhow::anyhow!("--label required for verify"))?;

    if let Some(r) = db.get_n8n_api_key_by_label(&label).await? {
        println!("\n📋 n8n API Key: {}", r.label);
        println!(" Key ID: {}", r.n8n_key_id);
        println!(" Last 8: {}", r.api_key_last_eight);
        println!(" Created: {}", r.created_at);
        if let Some(expires) = r.expires_at {
            println!(" Expires: {}", expires);
        }
        match r.last_verified {
            Some(verified) => println!(" Last Verified: {}", verified),
            None => println!(" Last Verified: never"),
        }
    } else {
        println!("\n❌ API key not found in local database");
    }
}
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|