Files
momentry_core/scripts/ocr_processor_mps.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

362 lines
11 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
OCR Processor - Apple MPS Optimized Version
Uses EasyOCR with Apple Silicon MPS acceleration
Falls back to CUDA, then CPU, if MPS is not available
Features:
- EasyOCR with MPS GPU support
- Apple MPS acceleration for image processing
- Memory-optimized for unified memory architecture
- Vision Framework fallback reserved for future expansion (not implemented in this file)
"""
import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
import torch
# Check for MPS availability
def get_device() -> str:
    """Return the preferred torch backend name: "mps", then "cuda", then "cpu"."""
    # Apple Silicon GPU wins when present; CUDA is only probed otherwise.
    if torch.backends.mps.is_available():
        return "mps"
    return "cuda" if torch.cuda.is_available() else "cpu"
def signal_handler(signum, frame):
    """Announce the received signal and terminate with exit status 0.

    The handler performs no saving itself; it raises ``SystemExit(0)`` so the
    interrupted code unwinds through its own ``finally``/``except`` blocks.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused).
    """
    notice = f"\n[OCR] Received signal {signum}, saving results and exiting..."
    print(notice)
    raise SystemExit(0)
def _write_json_atomic(path: str, payload: Dict) -> None:
    """Serialize *payload* to *path* through a temp file and an atomic rename.

    An interrupted write therefore never leaves a truncated JSON file that
    would make a later resume throw away every previously stored frame.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_path, path)


def _load_resume_frames(output_path: str) -> Tuple[Dict, int]:
    """Load the per-frame results written by a previous run.

    Returns:
        ``(frames, last_processed_frame)``. ``({}, 0)`` is returned when the
        file is missing, unreadable, not valid JSON, or does not hold a
        ``{frame_number: entry}`` mapping under ``"frames"``.
    """
    if not os.path.exists(output_path):
        return {}, 0
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            existing = json.load(f)
        frames = existing.get("frames") if isinstance(existing, dict) else None
        if not isinstance(frames, dict):
            # The "easyocr not installed" placeholder stores a list here;
            # reusing it would break the str-keyed assignments later on.
            return {}, 0
        last_frame = max((int(key) for key in frames), default=0)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and non-numeric frame keys.
        print(f"[OCR] Ignoring unusable previous results in {output_path}: {e}")
        return {}, 0
    return frames, last_frame


def process_video_ocr(
    video_path: str,
    output_path: str,
    languages: Optional[List[str]] = None,
    device: str = "auto",
    sample_interval: int = 30,
    confidence_threshold: float = 0.5,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for OCR with MPS acceleration

    Every ``sample_interval``-th frame is converted to RGB and passed to
    EasyOCR; detections at or above ``confidence_threshold`` are stored under
    ``result["frames"][str(frame_number)]`` together with the frame timestamp.
    Results are written to ``output_path`` periodically, when the run is
    interrupted by SIGTERM/SIGINT, and once more at the end with a summary.

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        languages: List of languages to recognize (defaults to ``["en"]``)
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        sample_interval: Process every N frames (must be >= 1)
        confidence_threshold: Minimum confidence threshold
        resume: Whether to resume from existing results
        save_interval: Auto-save interval in seconds (<= 0 disables auto-save)

    Returns:
        Dictionary with OCR results and metadata. If easyocr is not
        installed, a placeholder ``{"frame_count": 0, "fps": 0.0,
        "frames": []}`` is written and returned instead.

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
    """
    # Validate before any side effect; 0 would otherwise surface as a
    # ZeroDivisionError only after the model has been loaded.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")
    languages = ["en"] if languages is None else list(languages)

    # Set up signal handlers
    try:
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
    except ValueError:
        # signal.signal() is only allowed in the main thread.
        print("[OCR] Warning: not in main thread, signal handlers not installed")

    # Determine device
    if device == "auto":
        device = get_device()
    print(f"[OCR] Starting OCR processing with device: {device}")
    print(f"[OCR] Languages: {languages}, Confidence: {confidence_threshold}")

    try:
        import easyocr
    except ImportError:
        print("[OCR] Error: easyocr not installed")
        result = {"frame_count": 0, "fps": 0.0, "frames": []}
        _write_json_atomic(output_path, result)
        return result

    # Load EasyOCR reader with GPU setting based on device
    use_gpu = device in ("cuda", "mps")
    print(f"[OCR] Loading EasyOCR with GPU: {use_gpu}")
    reader = easyocr.Reader(languages, gpu=use_gpu, verbose=False)

    # Get video info
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Keep going (an empty result is still written) but say why.
        print(f"[OCR] Warning: could not open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()
    print(
        f"[OCR] Video: {video_width}x{video_height} @ {fps:.2f} FPS, {total_frames} frames"
    )

    # Load existing data if resuming
    frames: Dict = {}
    last_processed_frame = 0
    if resume:
        frames, last_processed_frame = _load_resume_frames(output_path)
        if frames:
            print(f"[OCR] Resuming from frame {last_processed_frame}")

    # Initialize result structure
    result = {
        "video_path": video_path,
        "languages": languages,
        "device": device,
        "confidence_threshold": confidence_threshold,
        "processed_at": datetime.now().isoformat(),
        "frames": frames,
    }

    # Process video
    print(f"[OCR] Processing video: {video_path}")
    start_time = time.time()
    frame_count = 0
    # Start from the resumed total so the summary matches result["frames"].
    text_count = sum(
        len(entry.get("texts") or [])
        for entry in frames.values()
        if isinstance(entry, dict)
    )
    last_save_time = start_time

    cap = cv2.VideoCapture(video_path)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            # Sample frames
            if frame_count % sample_interval != 0:
                continue
            # Skip already processed frames
            if frame_count <= last_processed_frame:
                continue

            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Run OCR; a failure on one frame must not abort the whole video.
            try:
                detections = reader.readtext(
                    frame_rgb,
                    text_threshold=confidence_threshold,
                    low_text=0.3,
                    link_threshold=0.3,
                )
            except Exception as e:
                print(f"[OCR] Error at frame {frame_count}: {e}")
                detections = []

            # Process detections
            frame_texts = []
            for bbox, text, confidence in detections:
                if float(confidence) < confidence_threshold:
                    continue
                # Collapse the detection polygon to an axis-aligned box.
                bbox_points = np.array(bbox).astype(int)
                x_coords = bbox_points[:, 0]
                y_coords = bbox_points[:, 1]
                x = int(np.min(x_coords))
                y = int(np.min(y_coords))
                frame_texts.append(
                    {
                        "x": x,
                        "y": y,
                        "width": int(np.max(x_coords) - x),
                        "height": int(np.max(y_coords) - y),
                        "text": text,
                        "confidence": float(confidence),
                        "rotation": 0,  # No rotation info from easyocr
                    }
                )

            if frame_texts:
                result["frames"][str(frame_count)] = {
                    "timestamp": timestamp,
                    "texts": frame_texts,
                }
                text_count += len(frame_texts)

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps_rate = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[OCR] Processed {frame_count} frames, {text_count} text regions, {fps_rate:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _write_json_atomic(output_path, result)
                last_save_time = time.time()
                print(f"[OCR] Auto-saved at frame {frame_count}")
    except (SystemExit, KeyboardInterrupt):
        # Raised via signal_handler: persist what we have so the promised
        # "saving results" actually happens and a later run can resume.
        _write_json_atomic(output_path, result)
        print(f"[OCR] Partial results saved to: {output_path}")
        raise
    except Exception as e:
        print(f"[OCR] Error during processing: {e}")
        raise
    finally:
        cap.release()

    # Final save
    elapsed_time = time.time() - start_time
    avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
    result["summary"] = {
        "total_frames": frame_count,
        "total_texts": text_count,
        "processing_time": round(elapsed_time, 2),
        "average_fps": round(avg_fps, 2),
        "device": device,
    }

    # Save final results
    _write_json_atomic(output_path, result)
    print(
        f"[OCR] Completed: {frame_count} frames, {text_count} text regions in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[OCR] Results saved to: {output_path}")
    return result
def benchmark_ocr_models(video_path: str, num_frames: int = 50) -> Dict:
    """Benchmark OCR processing on different devices

    For each available device ("cpu", plus "mps"/"cuda" when torch reports
    them) an EasyOCR reader is created and run on up to ``num_frames``
    consecutive frames from the start of the video.

    Args:
        video_path: Path to the video used as benchmark input.
        num_frames: Maximum number of frames to read per device.

    Returns:
        Mapping ``"ocr_<device>"`` -> dict with:
            ``detections``: total text regions returned by EasyOCR,
            ``frames``: number of frames actually processed,
            ``time``: seconds spent on frame decoding + OCR (model loading
                excluded so devices are compared on inference only),
            ``load_time``: seconds spent constructing the reader,
            ``fps``: frames processed per second of ``time``.
        Devices that fail are reported on stdout and left out; an empty dict
        is returned when easyocr cannot be imported.
    """
    devices = ["cpu"]
    if torch.backends.mps.is_available():
        devices.append("mps")
    if torch.cuda.is_available():
        devices.append("cuda")

    # Import once instead of once per device; nothing can run without it.
    try:
        import easyocr
    except ImportError as e:
        print(f"[OCR] Error: {e}")
        return {}

    languages = ["en"]
    results = {}
    for device in devices:
        print(f"[OCR] Benchmarking OCR on {device}...")
        detection_count = 0
        frames_done = 0
        cap = None
        try:
            load_start = time.time()
            reader = easyocr.Reader(
                languages, gpu=device in ("cuda", "mps"), verbose=False
            )
            load_time = time.time() - load_start

            cap = cv2.VideoCapture(video_path)
            start_time = time.time()
            for _ in range(num_frames):
                ret, frame = cap.read()
                if not ret:
                    break
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                detections = reader.readtext(
                    frame_rgb, text_threshold=0.5, low_text=0.3, link_threshold=0.3
                )
                detection_count += len(detections)
                frames_done += 1
            elapsed = time.time() - start_time
        except Exception as e:
            print(f"[OCR] Error: {e}")
            continue
        finally:
            # Release the capture even when OCR raised mid-benchmark.
            if cap is not None:
                cap.release()

        # Throughput is frames per second, not detections per second.
        fps = frames_done / elapsed if elapsed > 0 else 0
        results[f"ocr_{device}"] = {
            "detections": detection_count,
            "frames": frames_done,
            "time": round(elapsed, 2),
            "load_time": round(load_time, 2),
            "fps": round(fps, 2),
        }
    return results
def main():
    """Command-line entry point.

    Parses the arguments and either runs the device benchmark (``--benchmark``,
    which only needs ``--video``) or the full OCR pass, which additionally
    requires ``--output``. Invalid argument combinations terminate through
    ``argparse`` with exit status 2.
    """
    parser = argparse.ArgumentParser(description="OCR Processor with MPS Support")
    parser.add_argument("--video", required=True, help="Input video path")
    # Not marked required: benchmark mode never writes an output file.
    parser.add_argument(
        "--output", help="Output JSON path (required unless --benchmark)"
    )
    parser.add_argument(
        "--languages", nargs="+", default=["en"], help="Languages to recognize"
    )
    parser.add_argument(
        "--device",
        default="auto",
        choices=["auto", "mps", "cuda", "cpu"],
        help="Device to use",
    )
    parser.add_argument(
        "--sample-interval", type=int, default=30, help="Process every N frames"
    )
    parser.add_argument(
        "--confidence", type=float, default=0.5, help="Confidence threshold"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Do not resume from existing results"
    )
    parser.add_argument(
        "--save-interval", type=int, default=30, help="Auto-save interval in seconds"
    )
    parser.add_argument(
        "--benchmark", action="store_true", help="Run benchmark instead of processing"
    )
    args = parser.parse_args()

    if args.benchmark:
        results = benchmark_ocr_models(args.video)
        print("\n[Benchmark Results]")
        print(json.dumps(results, indent=2))
        return

    if not args.output:
        parser.error("the following arguments are required: --output")
    if args.sample_interval < 1:
        # A zero interval would otherwise crash with ZeroDivisionError mid-run.
        parser.error("--sample-interval must be >= 1")

    process_video_ocr(
        video_path=args.video,
        output_path=args.output,
        languages=args.languages,
        device=args.device,
        sample_interval=args.sample_interval,
        confidence_threshold=args.confidence,
        resume=not args.no_resume,
        save_interval=args.save_interval,
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()