- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
484 lines
15 KiB
Python
484 lines
15 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
ResumeFramework - Shared Resume Support for All Processors

This module provides a unified resume mechanism for all processors (YOLO, OCR, Face, Pose, etc.).

Features:
- Auto-detect existing results and resume from the last checkpoint
- Auto-save at configurable intervals (time-based or frame-based)
- Graceful Ctrl+C / SIGTERM handling that saves progress before exiting
- JSON Lines (.jsonl) support for incremental writes
- Progress tracking and ETA calculation

Usage:
    from resume_framework import ResumeFramework

    framework = ResumeFramework(
        output_path="output.json",
        processor_name="yolo",
        uuid="vid_001",
        auto_save_interval=30,
        auto_save_frames=300
    )

    # Load existing data (if resuming)
    existing_data, last_checkpoint = framework.load_existing_data()

    # Set data for signal handler
    framework.set_data(detection_data)

    # Save progress periodically
    if framework.should_auto_save(frame_count):
        framework.save_progress(frame_count, is_interrupted=False)

    # Finalize on completion
    framework.finalize(total_frames)
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import signal
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Optional, Tuple, Any, Callable
|
|
|
|
|
|
class ResumeFramework:
    """
    Resume Framework for Processors

    Holds a reference to the processor's result dict and writes it to
    ``output_path`` on demand, on a time/frame schedule, on SIGINT/SIGTERM,
    and on completion, so a later run can resume from the last checkpoint.

    Attributes:
        output_path (str): Output JSON/JSONL file path
        processor_name (str): Processor name (yolo, ocr, face, pose, etc.)
        uuid (str): Video UUID
        auto_save_interval (int): Auto-save interval in seconds
        auto_save_frames (int): Auto-save interval in frames
        publisher (RedisPublisher): Redis publisher for progress updates
            (None when no uuid is given)
        data (Dict): Current processing data
        use_jsonl (bool): Use JSON Lines format (.jsonl)
        last_save_time (float): time.time() of the last save, or of construction
        last_save_frame (int): Checkpoint of the last save, or the checkpoint
            loaded by load_existing_data() when resuming
        auto_save_count (int): Number of successful saves so far
    """

    def __init__(
        self,
        output_path: str,
        processor_name: str,
        uuid: str = "",
        auto_save_interval: int = 30,
        auto_save_frames: int = 300,
        use_jsonl: bool = False,
        force_restart: bool = False,
        progress_callback: Optional[Callable] = None,
    ):
        """
        Initialize Resume Framework

        Args:
            output_path: Output file path
            processor_name: Processor name
            uuid: Video UUID; when non-empty a RedisPublisher is created for it
            auto_save_interval: Auto-save interval in seconds (default: 30)
            auto_save_frames: Auto-save interval in frames (default: 300)
            use_jsonl: Use JSON Lines format (.jsonl) for incremental writes
            force_restart: Force restart (ignore existing data)
            progress_callback: Optional callback(current, total, message) for
                progress updates

        Side effects:
            Installs this instance's handler for SIGINT and SIGTERM.
        """
        self.output_path = output_path
        self.processor_name = processor_name
        self.uuid = uuid
        self.auto_save_interval = auto_save_interval
        self.auto_save_frames = auto_save_frames
        self.use_jsonl = use_jsonl
        self.force_restart = force_restart
        self.progress_callback = progress_callback

        self.data: Optional[Dict] = None
        self.publisher = None
        # Start the time-based auto-save clock now. Starting it at 0.0 made the
        # very first should_auto_save() call return True unconditionally.
        self.last_save_time = time.time()
        self.last_save_frame = 0
        self.auto_save_count = 0

        # Import RedisPublisher (a sibling module of this file) only when needed
        if uuid:
            sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
            from redis_publisher import RedisPublisher
            self.publisher = RedisPublisher(uuid)

        self._register_signal_handler()

    def _register_signal_handler(self):
        """Register signal handlers for graceful pause.

        signal.signal() must be called from the main thread; it raises
        ValueError otherwise.
        """
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle Ctrl+C / SIGTERM: save current data as 'interrupted', then exit(0)."""
        print(f"\n\n{'=' * 60}")
        print(f"PAUSE - Saving progress for {self.processor_name}...")
        print(f"{'=' * 60}")

        if self.data:
            # Only the last explicitly saved (or resumed) checkpoint is known
            # here; the processor's in-flight position is not tracked.
            success, file_size = self.save_progress(
                checkpoint=self.last_save_frame,
                is_interrupted=True,
                silent=False,
            )
            if success:
                print(f"Progress saved to: {self.output_path}")
                print(f"Last checkpoint: frame {self.last_save_frame}")
                print(f"File size: {file_size} bytes")
                print("Run the same command again to resume")
            else:
                print("Progress could not be saved (see error above)")

        print(f"{'=' * 60}\n")
        sys.exit(0)

    def load_existing_data(self) -> Tuple[Optional[Dict], int]:
        """
        Load existing data from file

        On success, last_save_frame is set to the loaded checkpoint. This lets
        frame-based auto-save, the signal handler and JSONL appends continue
        from where the previous run stopped.

        Returns:
            Tuple of (existing_data, last_checkpoint)
            - existing_data: Loaded data dict or None
            - last_checkpoint: Last processed frame/segment index (0 if none)
        """
        if self.force_restart or not os.path.exists(self.output_path):
            return None, 0

        try:
            loader = self._load_jsonl if self.use_jsonl else self._load_json
            data, last_checkpoint = loader()
        except (OSError, json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
            print(f"Warning: Could not load existing file: {e}")
            return None, 0

        if data is not None:
            # Without this, an interrupt before the first new save would
            # write last_saved_frame=0 and the next run would discard the file.
            self.last_save_frame = last_checkpoint
        return data, last_checkpoint

    def _load_json(self) -> Tuple[Optional[Dict], int]:
        """Load JSON format file; checkpoint comes from metadata.last_saved_frame.

        Raises:
            ValueError: if the file is not a JSON object, or its metadata is
                not a JSON object (JSONDecodeError is a ValueError too).
        """
        with open(self.output_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
        metadata = data.get("metadata", {})
        if not isinstance(metadata, dict):
            raise ValueError("'metadata' is not a JSON object")

        last_checkpoint = metadata.get("last_saved_frame", 0)
        if last_checkpoint > 0:
            return data, last_checkpoint
        return None, 0

    def _load_jsonl(self) -> Tuple[Optional[Dict], int]:
        """Load JSON Lines format file.

        Each line is either {"metadata": {...}} or a frame entry with a
        "frame" key. The last metadata line wins, and the checkpoint is the
        highest frame number seen. Undecodable or non-object lines (e.g. a
        partially written last line) are skipped.
        """
        data = {"metadata": {}, "frames": {}}
        last_checkpoint = 0

        with open(self.output_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    entry = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue
                if not isinstance(entry, dict):
                    continue
                if "metadata" in entry:
                    data["metadata"] = entry["metadata"]
                elif "frame" in entry:
                    frame_num = entry["frame"]
                    data["frames"][str(frame_num)] = entry
                    last_checkpoint = max(last_checkpoint, frame_num)

        if last_checkpoint > 0:
            return data, last_checkpoint
        return None, 0

    def set_data(self, data: Dict):
        """
        Set current processing data for signal handler

        Args:
            data: Current processing data dict (kept by reference; its
                "metadata" entry is updated in place on every save)
        """
        self.data = data

    def save_progress(
        self,
        checkpoint: int,
        is_interrupted: bool = False,
        silent: bool = False,
        extra_metadata: Optional[Dict] = None,
    ) -> Tuple[bool, int]:
        """
        Save progress to file

        Stamps data["metadata"] with last_saved_at, status ("interrupted" or
        "in_progress"), last_saved_frame and auto_save_count (the number of
        saves completed before this one). It then applies extra_metadata,
        which can therefore override any of those fields.

        Args:
            checkpoint: Current checkpoint (frame/segment index)
            is_interrupted: Is this an interrupted save
            silent: Suppress output
            extra_metadata: Extra metadata to add

        Returns:
            Tuple of (success, file_size); (False, 0) when there is no data
            or the write failed (the error is printed, not raised).
        """
        if not self.data:
            return False, 0

        try:
            metadata = self.data.get("metadata", {})
            metadata["last_saved_at"] = datetime.now().isoformat()
            metadata["status"] = "interrupted" if is_interrupted else "in_progress"
            metadata["last_saved_frame"] = checkpoint
            metadata["auto_save_count"] = self.auto_save_count

            if extra_metadata:
                metadata.update(extra_metadata)

            self.data["metadata"] = metadata

            if self.use_jsonl:
                file_size = self._save_jsonl(is_interrupted)
            else:
                file_size = self._save_json()

            self.last_save_frame = checkpoint
            self.last_save_time = time.time()
            self.auto_save_count += 1

            if not silent:
                self._print_save_info(checkpoint, file_size, is_interrupted)

            return True, file_size
        except Exception as e:
            # Best-effort: a failed save must not abort the processing run.
            print(f"Error saving progress: {e}")
            return False, 0

    def _save_json(self) -> int:
        """Save as JSON format, atomically; return the resulting file size.

        The data is written to a uniquely named temp file next to the output
        and then moved into place with os.replace(). A crash, or a re-entrant
        save from the signal handler, therefore never leaves a truncated file
        behind.
        """
        tmp_path = f"{self.output_path}.{os.getpid()}.{os.urandom(4).hex()}.tmp"
        try:
            with open(tmp_path, "x", encoding="utf-8") as f:
                json.dump(self.data, f, indent=2, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.output_path)
        except BaseException:
            # Best-effort cleanup of the temp file; the original error wins.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return os.path.getsize(self.output_path)

    def _save_jsonl(self, is_interrupted: bool = False) -> int:
        """
        Save as JSON Lines format; return the resulting file size.

        The first save (last_save_frame == 0) rewrites the file: one metadata
        line followed by every frame. Later saves append only frames newer
        than last_save_frame, followed by a fresh metadata line, so status and
        checkpoint changes (e.g. "completed") persist. _load_jsonl keeps the
        last metadata line.

        is_interrupted is accepted for interface compatibility and unused.
        """
        appending = self.last_save_frame > 0
        metadata_line = json.dumps(
            {"metadata": self.data["metadata"]}, ensure_ascii=False
        ) + "\n"

        with open(self.output_path, "a" if appending else "w", encoding="utf-8") as f:
            if not appending:
                f.write(metadata_line)

            for frame_key, frame_data in self.data.get("frames", {}).items():
                if int(frame_key) > self.last_save_frame:
                    f.write(json.dumps(frame_data, ensure_ascii=False) + "\n")

            if appending:
                f.write(metadata_line)

        return os.path.getsize(self.output_path)

    def _print_save_info(self, checkpoint: int, file_size: int, is_interrupted: bool):
        """Print save info"""
        status = "INTERRUPTED" if is_interrupted else "AUTO-SAVE"
        print(
            f"\n[{status}] Saved progress: frame {checkpoint}, "
            f"file size: {file_size} bytes, auto_save #{self.auto_save_count}\n"
        )

    def should_auto_save(self, current_checkpoint: int) -> bool:
        """
        Check if should auto-save

        Args:
            current_checkpoint: Current checkpoint

        Returns:
            True if at least auto_save_interval seconds have passed since the
            last save (or construction), or at least auto_save_frames
            checkpoints have been processed since last_save_frame.
        """
        time_elapsed = time.time() - self.last_save_time >= self.auto_save_interval
        frames_elapsed = current_checkpoint - self.last_save_frame >= self.auto_save_frames
        return time_elapsed or frames_elapsed

    def init_metadata(
        self,
        video_path: str,
        fps: float,
        width: int,
        height: int,
        total_frames: int,
        total_duration: float,
        extra: Optional[Dict] = None,
    ) -> Dict:
        """
        Initialize metadata for new processing

        Args:
            video_path: Video file path (stored as an absolute path)
            fps: Frame rate
            width: Video width
            height: Video height
            total_frames: Total frames
            total_duration: Total duration in seconds
            extra: Extra metadata (may override any default key)

        Returns:
            Metadata dict
        """
        now = datetime.now().isoformat()
        metadata = {
            "video_path": os.path.abspath(video_path),
            "fps": fps,
            "width": width,
            "height": height,
            "total_frames": total_frames,
            "total_duration": total_duration,
            "processor": self.processor_name,
            "processed_at": now,
            "auto_save_interval": self.auto_save_interval,
            "auto_save_frames": self.auto_save_frames,
            "status": "in_progress",
            "last_saved_at": now,
            "last_saved_frame": 0,
            "auto_save_count": 0,
        }

        if extra:
            metadata.update(extra)

        return metadata

    def finalize(
        self,
        total_processed: int,
        extra_metadata: Optional[Dict] = None,
    ):
        """
        Finalize processing (mark as completed) and persist it

        Args:
            total_processed: Total processed frames/segments; also recorded as
                the final last_saved_frame
            extra_metadata: Extra metadata to add (may override status)
        """
        if not self.data:
            return

        completion = {
            "status": "completed",
            "completed_at": datetime.now().isoformat(),
            "total_processed": total_processed,
        }
        if extra_metadata:
            completion.update(extra_metadata)

        # save_progress() resets metadata["status"] to "in_progress" before it
        # applies extra_metadata. Passing the completion fields through
        # extra_metadata is what makes "completed" actually reach the file.
        success, _ = self.save_progress(
            checkpoint=total_processed,
            is_interrupted=False,
            silent=True,
            extra_metadata=completion,
        )

        print(f"\n[COMPLETED] {self.processor_name} processed {total_processed} items")
        if success:
            print(f"Output saved to: {self.output_path}")
        else:
            print(f"Warning: final save to {self.output_path} failed")

        if self.publisher:
            self.publisher.complete(
                self.processor_name,
                f"{total_processed} items"
            )

    def publish_progress(self, current: int, total: int, message: str = ""):
        """
        Publish progress to Redis and to progress_callback, when configured

        Args:
            current: Current progress
            total: Total count
            message: Progress message
        """
        if self.publisher:
            self.publisher.progress(self.processor_name, current, total, message)

        if self.progress_callback:
            self.progress_callback(current, total, message)

    def publish_info(self, message: str):
        """
        Publish info message to Redis (no-op without a publisher)

        Args:
            message: Info message
        """
        if self.publisher:
            self.publisher.info(self.processor_name, message)

    def publish_error(self, message: str):
        """
        Publish error message to Redis (no-op without a publisher)

        Args:
            message: Error message
        """
        if self.publisher:
            self.publisher.error(self.processor_name, message)
|
|
|
|
|
def format_time(seconds: float) -> str:
    """
    Format seconds to HH:MM:SS

    Fractional seconds are truncated. Hours are not capped (e.g. "100:00:00").
    Negative durations get a leading "-" instead of the previous wrap-around
    output such as "-1:59:59".

    Args:
        seconds: Time in seconds

    Returns:
        Formatted time string
    """
    whole = int(abs(seconds))
    # No sign for values that truncate to zero (e.g. -0.4 -> "00:00:00").
    sign = "-" if seconds < 0 and whole else ""
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"
|
|
|
|
|
|
def calculate_eta(elapsed: float, current: int, total: int) -> float:
    """
    Calculate ETA from the average rate observed so far

    Args:
        elapsed: Elapsed time in seconds
        current: Current progress
        total: Total count

    Returns:
        Estimated seconds remaining, never negative. Returns 0 when nothing
        has been processed yet (no rate to extrapolate from). Also returns 0
        when current has reached or passed total, e.g. when the reported
        total was too low; previously that case produced a negative ETA.
    """
    if current <= 0 or current >= total:
        return 0.0
    return (elapsed / current) * (total - current)
|
|
|
|
|
|
def print_progress(
    current: int,
    total: int,
    elapsed: float,
    extra_info: str = "",
):
    """
    Print a one-line progress indicator

    Output looks like " Progress: 5/10 (50.0%) - ETA: 10s", followed by
    " - <extra_info>" only when extra_info is non-empty. Previously the
    default empty extra_info left a dangling " - " at the end of the line.

    Args:
        current: Current progress
        total: Total count (percentage shown as 0 when total <= 0)
        elapsed: Elapsed time in seconds
        extra_info: Extra info to display
    """
    progress_pct = (current / total) * 100 if total > 0 else 0
    eta = calculate_eta(elapsed, current, total)

    line = f" Progress: {current}/{total} ({progress_pct:.1f}%) - ETA: {eta:.0f}s"
    if extra_info:
        line += f" - {extra_info}"
    print(line)