#!/opt/homebrew/bin/python3.11
"""
OCR Processor - Apple MPS Optimized Version
Uses EasyOCR with Apple Silicon MPS acceleration
Falls back to CPU if MPS not available

Features:
- EasyOCR with MPS GPU support
- Apple MPS acceleration for image processing
- Memory-optimized for unified memory architecture
- Vision Framework fallback for future expansion
"""

import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import torch


# Check for MPS availability
def get_device() -> str:
    """Determine the best available device for processing.

    Returns:
        "mps" if the PyTorch MPS backend is available, otherwise "cuda" if
        CUDA is available, otherwise "cpu".
    """
    if torch.backends.mps.is_available():
        return "mps"
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"


def signal_handler(signum, frame):
    """Handle interrupt signals gracefully.

    Calls ``sys.exit(0)``, which raises ``SystemExit`` in the main thread.
    ``process_video_ocr`` catches that exception to write the partial results
    before the exit continues.
    """
    print(f"\n[OCR] Received signal {signum}, saving results and exiting...")
    sys.exit(0)


def _save_results(result: Dict[str, Any], output_path: str) -> None:
    """Write *result* as JSON to *output_path* via a temp file + rename."""
    # Writing to a sibling file and renaming means an interrupt in the middle
    # of a save can never leave a truncated (un-resumable) output file behind.
    tmp_path = f"{output_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    os.replace(tmp_path, output_path)


def _load_existing_frames(output_path: str) -> Dict[str, Any]:
    """Return the ``frames`` mapping of a previous run, or ``{}`` if unusable."""
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            existing = json.load(f)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"[OCR] Ignoring unreadable existing results: {e}")
        return {}

    frames = existing.get("frames") if isinstance(existing, dict) else None
    # The "easyocr not installed" fallback below writes ``frames`` as a list;
    # only a dict keyed by frame number can be resumed from.
    return frames if isinstance(frames, dict) else {}


def _last_frame_index(frames: Dict[str, Any]) -> int:
    """Return the highest numeric frame key in *frames*, or 0 if there is none."""
    return max((int(key) for key in frames if key.isdigit()), default=0)


def _count_texts(frames: Dict[str, Any]) -> int:
    """Return the number of text regions stored across all *frames* entries."""
    return sum(
        len(entry["texts"])
        for entry in frames.values()
        if isinstance(entry, dict) and isinstance(entry.get("texts"), list)
    )


def _detections_to_texts(
    detections, confidence_threshold: float
) -> List[Dict[str, Any]]:
    """Convert EasyOCR ``(bbox, text, confidence)`` tuples to result dicts."""
    texts = []
    for bbox, text, confidence in detections:
        confidence = float(confidence)
        if confidence >= confidence_threshold:
            # Reduce the bbox points to an axis-aligned rectangle
            points = np.array(bbox).astype(int)
            x_coords = points[:, 0]
            y_coords = points[:, 1]
            x = int(np.min(x_coords))
            y = int(np.min(y_coords))
            texts.append(
                {
                    "x": x,
                    "y": y,
                    "width": int(np.max(x_coords) - x),
                    "height": int(np.max(y_coords) - y),
                    "text": text,
                    "confidence": confidence,
                    "rotation": 0,  # No rotation info from easyocr
                }
            )
    return texts


def process_video_ocr(
    video_path: str,
    output_path: str,
    languages: Optional[List[str]] = None,
    device: str = "auto",
    sample_interval: int = 30,
    confidence_threshold: float = 0.5,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for OCR with MPS acceleration

    Installs ``signal_handler`` for SIGTERM and SIGINT as a side effect.

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        languages: List of languages to recognize (defaults to ["en"])
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        sample_interval: Process every N frames (must be >= 1)
        confidence_threshold: Minimum confidence threshold
        resume: Whether to resume from existing results
        save_interval: Auto-save interval in seconds (<= 0 disables auto-save)

    Returns:
        Dictionary with OCR results and metadata. If easyocr is not
        installed, a minimal placeholder dictionary is written and returned
        instead.

    Raises:
        ValueError: If sample_interval is smaller than 1.
    """
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")
    if languages is None:
        languages = ["en"]

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Determine device
    if device == "auto":
        device = get_device()

    print(f"[OCR] Starting OCR processing with device: {device}")
    print(f"[OCR] Languages: {languages}, Confidence: {confidence_threshold}")

    try:
        import easyocr
    except ImportError:
        print("[OCR] Error: easyocr not installed")
        result = {"frame_count": 0, "fps": 0.0, "frames": []}
        _save_results(result, output_path)
        return result

    # Load EasyOCR reader with GPU setting based on device
    use_gpu = device in ["cuda", "mps"]
    print(f"[OCR] Loading EasyOCR with GPU: {use_gpu}")
    reader = easyocr.Reader(languages, gpu=use_gpu, verbose=False)

    # Get video info
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"[OCR] Warning: could not open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    cap.release()

    print(
        f"[OCR] Video: {video_width}x{video_height} @ {fps:.2f} FPS, {total_frames} frames"
    )

    # Load existing data if resuming
    resumed_frames: Dict[str, Any] = {}
    last_processed_frame = 0
    if resume and os.path.exists(output_path):
        resumed_frames = _load_existing_frames(output_path)
        last_processed_frame = _last_frame_index(resumed_frames)
        if last_processed_frame:
            print(f"[OCR] Resuming from frame {last_processed_frame}")

    # Initialize result structure
    result = {
        "video_path": video_path,
        "languages": languages,
        "device": device,
        "confidence_threshold": confidence_threshold,
        "processed_at": datetime.now().isoformat(),
        "frames": resumed_frames,
    }

    # Process video
    print(f"[OCR] Processing video: {video_path}")
    start_time = time.time()
    frame_count = 0
    # Start from the resumed regions so the summary matches the saved frames.
    text_count = _count_texts(resumed_frames)
    last_save_time = start_time

    cap = cv2.VideoCapture(video_path)

    try:
        while True:
            # grab() advances the stream; the frame is only retrieved (and
            # converted) below when it is actually going to be OCR'd.
            if not cap.grab():
                break

            frame_count += 1

            # Sample frames
            if frame_count % sample_interval != 0:
                continue

            # Skip already processed frames
            if frame_count <= last_processed_frame:
                continue

            ret, frame = cap.retrieve()
            if not ret:
                # Treat a failed decode like a failed read: the frame was not
                # obtained, so it does not count.
                frame_count -= 1
                break

            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Run OCR
            try:
                detections = reader.readtext(
                    frame_rgb,
                    text_threshold=confidence_threshold,
                    low_text=0.3,
                    link_threshold=0.3,
                )
            except Exception as e:
                print(f"[OCR] Error at frame {frame_count}: {e}")
                detections = []

            # Process detections
            frame_texts = _detections_to_texts(detections, confidence_threshold)

            if frame_texts:
                result["frames"][str(frame_count)] = {
                    "timestamp": timestamp,
                    "texts": frame_texts,
                }
                text_count += len(frame_texts)

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps_rate = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[OCR] Processed {frame_count} frames, {text_count} text regions, {fps_rate:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _save_results(result, output_path)
                last_save_time = time.time()
                print(f"[OCR] Auto-saved at frame {frame_count}")

    except (KeyboardInterrupt, SystemExit):
        # signal_handler exits via SystemExit; persist what has been collected
        # so far so the next run can resume from it, then let the exit proceed.
        _save_results(result, output_path)
        print(f"[OCR] Partial results saved to: {output_path}")
        raise
    except Exception as e:
        print(f"[OCR] Error during processing: {e}")
        raise
    finally:
        cap.release()

    # Final save
    elapsed_time = time.time() - start_time
    avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0

    result["summary"] = {
        "total_frames": frame_count,
        "total_texts": text_count,
        "processing_time": round(elapsed_time, 2),
        "average_fps": round(avg_fps, 2),
        "device": device,
    }

    # Save final results
    _save_results(result, output_path)

    print(
        f"[OCR] Completed: {frame_count} frames, {text_count} text regions in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[OCR] Results saved to: {output_path}")

    return result


def benchmark_ocr_models(video_path: str, num_frames: int = 50) -> Dict:
    """Benchmark OCR processing on different devices.

    Runs EasyOCR over the first ``num_frames`` frames of the video on "cpu"
    and on each of "mps"/"cuda" that PyTorch reports as available.

    Args:
        video_path: Path to the video used as benchmark input
        num_frames: Maximum number of frames to run OCR on per device

    Returns:
        Mapping of ``"ocr_<device>"`` to a dict with ``detections`` (total
        detections), ``frames`` (frames processed), ``time`` (seconds spent
        reading and OCR-ing frames, model loading excluded) and ``fps``
        (frames per second). Devices that raise are reported and omitted.
    """
    devices = ["cpu"]
    if torch.backends.mps.is_available():
        devices.append("mps")
    if torch.cuda.is_available():
        devices.append("cuda")

    languages = ["en"]
    results = {}

    for device in devices:
        print(f"[OCR] Benchmarking OCR on {device}...")

        detection_count = 0
        frames_done = 0

        try:
            import easyocr

            reader = easyocr.Reader(
                languages, gpu=device in ["cuda", "mps"], verbose=False
            )

            cap = cv2.VideoCapture(video_path)
            # Start timing only once the model is loaded, so the figure
            # reflects per-frame throughput rather than one-off setup.
            start_time = time.time()
            try:
                for _ in range(num_frames):
                    ret, frame = cap.read()
                    if not ret:
                        break

                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    detections = reader.readtext(
                        frame_rgb, text_threshold=0.5, low_text=0.3, link_threshold=0.3
                    )
                    detection_count += len(detections)
                    frames_done += 1
            finally:
                cap.release()
            elapsed = time.time() - start_time

        except Exception as e:
            print(f"[OCR] Error: {e}")
            continue

        fps = frames_done / elapsed if elapsed > 0 else 0

        results[f"ocr_{device}"] = {
            "detections": detection_count,
            "frames": frames_done,
            "time": round(elapsed, 2),
            "fps": round(fps, 2),
        }

    return results


def main():
    """Parse command-line arguments and run OCR processing or the benchmark."""
    parser = argparse.ArgumentParser(description="OCR Processor with MPS Support")
    parser.add_argument("--video", required=True, help="Input video path")
    parser.add_argument("--output", required=True, help="Output JSON path")
    parser.add_argument(
        "--languages", nargs="+", default=["en"], help="Languages to recognize"
    )
    parser.add_argument(
        "--device",
        default="auto",
        choices=["auto", "mps", "cuda", "cpu"],
        help="Device to use",
    )
    parser.add_argument(
        "--sample-interval", type=int, default=30, help="Process every N frames"
    )
    parser.add_argument(
        "--confidence", type=float, default=0.5, help="Confidence threshold"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Do not resume from existing results"
    )
    parser.add_argument(
        "--save-interval", type=int, default=30, help="Auto-save interval in seconds"
    )
    parser.add_argument(
        "--benchmark", action="store_true", help="Run benchmark instead of processing"
    )

    args = parser.parse_args()

    if args.benchmark:
        results = benchmark_ocr_models(args.video)
        print("\n[Benchmark Results]")
        print(json.dumps(results, indent=2))
    else:
        process_video_ocr(
            video_path=args.video,
            output_path=args.output,
            languages=args.languages,
            device=args.device,
            sample_interval=args.sample_interval,
            confidence_threshold=args.confidence,
            resume=not args.no_resume,
            save_interval=args.save_interval,
        )


if __name__ == "__main__":
    main()