Files
momentry_core/scripts/resume_framework.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

484 lines
15 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
ResumeFramework - Shared Resume Support for All Processors
This module provides a unified resume mechanism for all processors (YOLO, OCR, Face, Pose, etc.).
Features:
- Auto-detect existing results and resume from last checkpoint
- Auto-save at configurable intervals (time-based or frame-based)
- Graceful Ctrl+C handling with progress save
- JSON Lines (.jsonl) support for incremental writes
- Progress tracking and ETA calculation
Usage:
    from resume_framework import ResumeFramework

    framework = ResumeFramework(
        output_path="output.json",
        processor_name="yolo",
        uuid="vid_001",
        auto_save_interval=30,
        auto_save_frames=300
    )

    # Load existing data (if resuming)
    existing_data, last_checkpoint = framework.load_existing_data()

    # Set data for signal handler
    framework.set_data(detection_data)

    # Save progress periodically
    framework.save_progress(frame_count, is_interrupted=False)

    # Finalize on completion
    framework.finalize(total_frames)
"""
import sys
import os
import json
import signal
import time
from datetime import datetime
from typing import Dict, Optional, Tuple, Any, Callable
class ResumeFramework:
    """
    Checkpoint / resume helper shared by the processors.

    The framework owns the output file: it can load a previous run's
    checkpoint, write the in-memory result dict back to disk (as one JSON
    document or as JSON Lines), and it installs SIGINT/SIGTERM handlers
    that save the current data before exiting.

    Attributes:
        output_path (str): Output JSON/JSONL file path
        processor_name (str): Processor name (yolo, ocr, face, pose, etc.)
        uuid (str): Video UUID; when non-empty a RedisPublisher is created
        auto_save_interval (int): Auto-save interval in seconds
        auto_save_frames (int): Auto-save interval in frames
        use_jsonl (bool): Use JSON Lines format (.jsonl)
        force_restart (bool): Ignore any existing output file
        progress_callback (Callable | None): Called by publish_progress
        publisher: RedisPublisher instance, or None when uuid is empty
        data (Dict | None): Current processing data (see set_data)
        last_save_time (float): time.time() of the last successful save
        last_save_frame (int): Checkpoint of the last successful save/load
        auto_save_count (int): Number of successful saves so far
    """

    def __init__(
        self,
        output_path: str,
        processor_name: str,
        uuid: str = "",
        auto_save_interval: int = 30,
        auto_save_frames: int = 300,
        use_jsonl: bool = False,
        force_restart: bool = False,
        progress_callback: Optional[Callable] = None,
    ):
        """
        Initialize Resume Framework

        Side effects: registers SIGINT/SIGTERM handlers and, when ``uuid``
        is non-empty, imports ``redis_publisher`` from this file's directory.

        Args:
            output_path: Output file path
            processor_name: Processor name
            uuid: Video UUID
            auto_save_interval: Auto-save interval in seconds (default: 30)
            auto_save_frames: Auto-save interval in frames (default: 300)
            use_jsonl: Use JSON Lines format (.jsonl) for incremental writes
            force_restart: Force restart (ignore existing data)
            progress_callback: Optional callback for progress updates
        """
        self.output_path = output_path
        self.processor_name = processor_name
        self.uuid = uuid
        self.auto_save_interval = auto_save_interval
        self.auto_save_frames = auto_save_frames
        self.use_jsonl = use_jsonl
        self.force_restart = force_restart
        self.progress_callback = progress_callback
        self.data: Optional[Dict] = None
        self.publisher = None
        # Start the auto-save clock now. Starting it at 0.0 made the very
        # first should_auto_save() call report that the interval had elapsed.
        self.last_save_time = time.time()
        self.last_save_frame = 0
        self.auto_save_count = 0

        # Import RedisPublisher only if a uuid was provided
        if uuid:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            # Avoid stacking the same entry on sys.path for every instance.
            if script_dir not in sys.path:
                sys.path.insert(0, script_dir)
            from redis_publisher import RedisPublisher

            self.publisher = RedisPublisher(uuid)

        # Register signal handler
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Register signal handlers for graceful pause"""
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """
        Handle Ctrl+C / SIGTERM: save progress, then exit with status 0.

        The save uses ``last_save_frame`` as the checkpoint, i.e. the
        checkpoint is not advanced past the last regular save/load.
        """
        print(f"\n\n{'=' * 60}")
        print(f"PAUSE - Saving progress for {self.processor_name}...")
        print(f"{'=' * 60}")
        if self.data:
            success, file_size = self.save_progress(
                checkpoint=self.last_save_frame,
                is_interrupted=True,
                silent=False,
            )
            if success:
                print(f"Progress saved to: {self.output_path}")
                print(f"Last checkpoint: frame {self.last_save_frame}")
                print(f"File size: {file_size} bytes")
        print("Run the same command again to resume")
        print(f"{'=' * 60}\n")
        sys.exit(0)

    def load_existing_data(self) -> Tuple[Optional[Dict], int]:
        """
        Load existing data from file

        When a usable checkpoint is found it is also stored in
        ``last_save_frame`` so that an interrupt before the first new save
        does not reset the checkpoint to 0, and so that frame-based
        auto-save and JSONL appends are measured from the resume point.

        Returns:
            Tuple of (existing_data, last_checkpoint)
            - existing_data: Loaded data dict or None
            - last_checkpoint: Last processed frame/segment index
        """
        if self.force_restart or not os.path.exists(self.output_path):
            return None, 0

        loader = self._load_jsonl if self.use_jsonl else self._load_json
        try:
            existing, checkpoint = loader()
        except (
            json.JSONDecodeError,
            KeyError,
            ValueError,
            TypeError,
            AttributeError,
        ) as e:
            # TypeError/AttributeError cover well-formed JSON whose shape is
            # not the expected {"metadata": {...}} object.
            print(f"Warning: Could not load existing file: {e}")
            return None, 0

        if existing is not None:
            self.last_save_frame = checkpoint
        return existing, checkpoint

    def _load_json(self) -> Tuple[Optional[Dict], int]:
        """
        Load JSON format file

        Returns (data, metadata["last_saved_frame"]) when that value is
        greater than 0, otherwise (None, 0).
        """
        with open(self.output_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        metadata = data.get("metadata", {})
        last_checkpoint = metadata.get("last_saved_frame", 0)
        if last_checkpoint > 0:
            return data, last_checkpoint
        return None, 0

    def _load_jsonl(self) -> Tuple[Optional[Dict], int]:
        """
        Load JSON Lines format file

        Each line is either ``{"metadata": {...}}`` (the last one wins) or a
        frame record carrying a ``"frame"`` number. The checkpoint is the
        largest frame number seen. Undecodable lines, non-object lines and
        records whose frame number is not an int are skipped.
        """
        data = {"metadata": {}, "frames": {}}
        last_checkpoint = 0
        with open(self.output_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    entry = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue
                if not isinstance(entry, dict):
                    continue
                if "metadata" in entry:
                    data["metadata"] = entry["metadata"]
                elif "frame" in entry:
                    frame_num = entry["frame"]
                    if isinstance(frame_num, bool) or not isinstance(frame_num, int):
                        continue
                    data["frames"][str(frame_num)] = entry
                    last_checkpoint = max(last_checkpoint, frame_num)
        if last_checkpoint > 0:
            return data, last_checkpoint
        return None, 0

    def set_data(self, data: Dict):
        """
        Set current processing data for signal handler

        Args:
            data: Current processing data dict
        """
        self.data = data

    def save_progress(
        self,
        checkpoint: int,
        is_interrupted: bool = False,
        silent: bool = False,
        extra_metadata: Optional[Dict] = None,
    ) -> Tuple[bool, int]:
        """
        Save progress to file

        Updates ``data["metadata"]`` (timestamp, status, checkpoint, save
        counter), writes the file, and on success advances
        ``last_save_frame`` / ``last_save_time`` / ``auto_save_count``.

        Args:
            checkpoint: Current checkpoint (frame/segment index)
            is_interrupted: Is this an interrupted save
            silent: Suppress output
            extra_metadata: Extra metadata to add; applied last, so it may
                override the fields set here (finalize uses this for status)

        Returns:
            Tuple of (success, file_size); (False, 0) when there is no data
            or the write raised.
        """
        if not self.data:
            return False, 0
        try:
            metadata = self.data.get("metadata", {})
            metadata["last_saved_at"] = datetime.now().isoformat()
            metadata["status"] = "interrupted" if is_interrupted else "in_progress"
            metadata["last_saved_frame"] = checkpoint
            metadata["auto_save_count"] = self.auto_save_count
            if extra_metadata:
                metadata.update(extra_metadata)
            self.data["metadata"] = metadata

            if self.use_jsonl:
                file_size = self._save_jsonl(is_interrupted)
            else:
                file_size = self._save_json()

            self.last_save_frame = checkpoint
            self.last_save_time = time.time()
            self.auto_save_count += 1
            if not silent:
                self._print_save_info(checkpoint, file_size, is_interrupted)
            return True, file_size
        except Exception as e:
            # Best effort: a failed save must not abort processing.
            print(f"Error saving progress: {e}")
            return False, 0

    def _save_json(self) -> int:
        """
        Save as JSON format

        The document is written to ``<output_path>.tmp`` and then moved over
        the real file with os.replace, so a failed or interrupted dump never
        truncates the previous checkpoint.

        Returns:
            Size of the output file in bytes
        """
        tmp_path = f"{self.output_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, self.output_path)
        finally:
            # Only still present when the dump or the replace failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        return os.path.getsize(self.output_path)

    def _save_jsonl(self, is_interrupted: bool = False) -> int:
        """
        Save as JSON Lines format

        While ``last_save_frame`` is 0 the file is (re)written from scratch:
        a metadata record followed by every frame in ``data["frames"]``
        (including frame 0). Afterwards only frames whose key is greater
        than ``last_save_frame`` are appended. When appending with a status
        other than "in_progress" (interrupted/completed), a trailing
        metadata record is added; _load_jsonl keeps the last one it reads.

        Args:
            is_interrupted: Unused here; the status is read from metadata

        Returns:
            Size of the output file in bytes
        """
        appending = self.last_save_frame > 0
        metadata = self.data["metadata"]
        metadata_line = json.dumps({"metadata": metadata}, ensure_ascii=False) + "\n"
        frames = self.data.get("frames", {})

        with open(self.output_path, "a" if appending else "w", encoding="utf-8") as f:
            if not appending:
                f.write(metadata_line)
            for frame_key, frame_data in frames.items():
                if not appending or int(frame_key) > self.last_save_frame:
                    f.write(json.dumps(frame_data, ensure_ascii=False) + "\n")
            if appending and metadata.get("status") != "in_progress":
                f.write(metadata_line)
        return os.path.getsize(self.output_path)

    def _print_save_info(self, checkpoint: int, file_size: int, is_interrupted: bool):
        """Print save info"""
        status = "INTERRUPTED" if is_interrupted else "AUTO-SAVE"
        print(
            f"\n[{status}] Saved progress: frame {checkpoint}, "
            f"file size: {file_size} bytes, auto_save #{self.auto_save_count}\n"
        )

    def should_auto_save(self, current_checkpoint: int) -> bool:
        """
        Check if should auto-save

        Args:
            current_checkpoint: Current checkpoint

        Returns:
            True if at least ``auto_save_interval`` seconds or
            ``auto_save_frames`` frames have passed since the last save
        """
        current_time = time.time()
        time_elapsed = current_time - self.last_save_time >= self.auto_save_interval
        frames_elapsed = current_checkpoint - self.last_save_frame >= self.auto_save_frames
        return time_elapsed or frames_elapsed

    def init_metadata(
        self,
        video_path: str,
        fps: float,
        width: int,
        height: int,
        total_frames: int,
        total_duration: float,
        extra: Optional[Dict] = None,
    ) -> Dict:
        """
        Initialize metadata for new processing

        Args:
            video_path: Video file path (stored as an absolute path)
            fps: Frame rate
            width: Video width
            height: Video height
            total_frames: Total frames
            total_duration: Total duration in seconds
            extra: Extra metadata; merged last, so it may override defaults

        Returns:
            Metadata dict
        """
        started_at = datetime.now().isoformat()
        metadata = {
            "video_path": os.path.abspath(video_path),
            "fps": fps,
            "width": width,
            "height": height,
            "total_frames": total_frames,
            "total_duration": total_duration,
            "processor": self.processor_name,
            "processed_at": started_at,
            "auto_save_interval": self.auto_save_interval,
            "auto_save_frames": self.auto_save_frames,
            "status": "in_progress",
            "last_saved_at": started_at,
            "last_saved_frame": 0,
            "auto_save_count": 0,
        }
        if extra:
            metadata.update(extra)
        return metadata

    def finalize(
        self,
        total_processed: int,
        extra_metadata: Optional[Dict] = None,
    ):
        """
        Finalize processing (mark as completed)

        Does nothing when no data has been set. Otherwise performs a final
        save with checkpoint ``total_processed`` and, if a publisher exists,
        publishes completion.

        Args:
            total_processed: Total processed frames/segments
            extra_metadata: Extra metadata to add
        """
        if not self.data:
            return

        # Hand the completion fields to save_progress as extra metadata:
        # save_progress writes its own "in_progress" status first and applies
        # extra metadata afterwards, so this is what keeps "completed" in
        # the saved file.
        completion = {
            "status": "completed",
            "completed_at": datetime.now().isoformat(),
            "total_processed": total_processed,
        }
        if extra_metadata:
            completion.update(extra_metadata)

        # Final save
        self.save_progress(
            checkpoint=total_processed,
            is_interrupted=False,
            silent=True,
            extra_metadata=completion,
        )
        print(f"\n[COMPLETED] {self.processor_name} processed {total_processed} items")
        print(f"Output saved to: {self.output_path}")
        if self.publisher:
            self.publisher.complete(
                self.processor_name,
                f"{total_processed} items",
            )

    def publish_progress(self, current: int, total: int, message: str = ""):
        """
        Publish progress to Redis and to the optional progress callback

        Args:
            current: Current progress
            total: Total count
            message: Progress message
        """
        if self.publisher:
            self.publisher.progress(self.processor_name, current, total, message)
        if self.progress_callback:
            self.progress_callback(current, total, message)

    def publish_info(self, message: str):
        """
        Publish info message to Redis (no-op without a publisher)

        Args:
            message: Info message
        """
        if self.publisher:
            self.publisher.info(self.processor_name, message)

    def publish_error(self, message: str):
        """
        Publish error message to Redis (no-op without a publisher)

        Args:
            message: Error message
        """
        if self.publisher:
            self.publisher.error(self.processor_name, message)
def format_time(seconds: float) -> str:
    """
    Render a duration given in seconds as HH:MM:SS.

    Each field is zero-padded to at least two digits; fractional seconds
    are truncated and hours are not wrapped at 24.

    Args:
        seconds: Time in seconds

    Returns:
        Formatted time string
    """
    fields = (
        int(seconds // 3600),
        int((seconds % 3600) // 60),
        int(seconds % 60),
    )
    return ":".join(f"{field:02d}" for field in fields)
def calculate_eta(elapsed: float, current: int, total: int) -> float:
    """
    Calculate ETA by linear extrapolation of the average time per item.

    Args:
        elapsed: Elapsed time in seconds
        current: Current progress
        total: Total count

    Returns:
        ETA in seconds; 0.0 when nothing has been processed yet or when
        ``current`` has already reached or passed ``total``
    """
    if current <= 0:
        # No completed items yet, so there is no rate to extrapolate from.
        return 0.0
    # Clamp so that overshooting the total never yields a negative ETA.
    remaining = max(total - current, 0)
    return (elapsed / current) * remaining
def print_progress(
    current: int,
    total: int,
    elapsed: float,
    extra_info: str = "",
):
    """
    Print a one-line progress report with percentage and ETA.

    Args:
        current: Current progress
        total: Total count
        elapsed: Elapsed time in seconds
        extra_info: Extra info to display
    """
    # A non-positive total is reported as 0% instead of dividing by zero.
    if total > 0:
        progress_pct = (current / total) * 100
    else:
        progress_pct = 0
    eta = calculate_eta(elapsed, current, total)
    report = (
        f" Progress: {current}/{total} ({progress_pct:.1f}%) - "
        f"ETA: {eta:.0f}s - {extra_info}"
    )
    print(report)