#!/opt/homebrew/bin/python3.11
"""
Face Processor - Apple MPS Optimized Version
Uses MediaPipe with Metal GPU acceleration for face detection
Falls back to OpenCV Haar Cascade if MediaPipe not available

Features:
- MediaPipe Face Detection with Metal GPU acceleration
- OpenCV Haar Cascade fallback
- Apple MPS support for image processing
- Memory-optimized for unified memory architecture
"""

import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Dict, List

import cv2
import numpy as np
import torch

MEDIAPIPE_AVAILABLE = False
try:
    import mediapipe as mp
    from mediapipe.tasks import python
    from mediapipe.tasks.python import vision

    MEDIAPIPE_AVAILABLE = True
except ImportError:
    print("[Face] MediaPipe not available, will use OpenCV fallback")


# MediaPipe face detection solution
class MediaPipeFaceDetector:
    """Face detector backed by the MediaPipe Tasks ``FaceDetector`` API.

    The detector is created in single-image mode (``RunningMode.IMAGE``)
    from the ``blaze_face_short_range`` model, which is downloaded on first
    use.

    NOTE(review): only ``model_asset_path`` is passed to ``BaseOptions``; no
    GPU delegate is requested here, so whether inference runs on the GPU is
    left to MediaPipe's defaults. ``use_mps`` is recorded and logged but is
    not consulted by ``detect``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, device: str = "auto", min_confidence: float = 0.5):
        """Create the underlying MediaPipe detector.

        Args:
            device: Requested device ('auto', 'mps', ...). Only used to
                derive the ``use_mps`` flag.
            min_confidence: Passed to MediaPipe as
                ``min_detection_confidence``.

        Raises:
            RuntimeError: If MediaPipe could not be imported, or the model
                could not be obtained from any source.
        """
        self.device = device
        self.min_confidence = min_confidence

        if not MEDIAPIPE_AVAILABLE:
            raise RuntimeError("MediaPipe not available")

        # Download model if needed
        model_path = self._download_model()

        base_options = python.BaseOptions(model_asset_path=model_path)

        # Parameter names follow the MediaPipe Tasks FaceDetectorOptions API
        # (the original author verified them against MediaPipe v0.10.33).
        options = vision.FaceDetectorOptions(
            base_options=base_options,
            running_mode=vision.RunningMode.IMAGE,
            min_detection_confidence=min_confidence,
            min_suppression_threshold=0.3,
        )

        self.detector = vision.FaceDetector.create_from_options(options)

        # True when MPS was requested explicitly, or when 'auto' was
        # requested and torch reports an MPS backend.
        self.use_mps = device == "mps" or (
            device == "auto" and torch.backends.mps.is_available()
        )

        print(f"[Face] MediaPipe initialized with MPS: {self.use_mps}")

    def _download_model(self) -> str:
        """Return a local path to the face detection model, downloading it if needed.

        The model is cached under ``~/.mediapipe/models``. Each candidate URL
        is tried in order; if all fail, a copy bundled inside the installed
        ``mediapipe`` package is used when present.

        Returns:
            Path to a ``.tflite`` model file.

        Raises:
            RuntimeError: If the model is not cached, cannot be downloaded
                and no bundled copy exists.
        """
        import urllib.request

        model_name = "blaze_face_short_range.tflite"
        model_dir = os.path.expanduser("~/.mediapipe/models")
        model_path = os.path.join(model_dir, model_name)

        if os.path.exists(model_path):
            return model_path

        print(f"[Face] Downloading MediaPipe model: {model_name}")
        os.makedirs(model_dir, exist_ok=True)

        # MediaPipe official model URL (correct path)
        model_urls = [
            "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/1/blaze_face_short_range.tflite",
            "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float32/1/blaze_face_short_range.tflite",
        ]

        # Download to a side file and rename on success. Writing directly to
        # model_path would leave a truncated file behind after an interrupted
        # transfer, and the existence check above would then accept it
        # forever.
        partial_path = model_path + ".part"

        for model_url in model_urls:
            try:
                print(f"[Face] Trying URL: {model_url}")
                urllib.request.urlretrieve(model_url, partial_path)
                os.replace(partial_path, model_path)
                print(f"[Face] Model downloaded to: {model_path}")
                return model_path
            except Exception as e:  # best effort: fall through to next source
                print(f"[Face] Failed: {e}")
                try:
                    os.remove(partial_path)
                except OSError:
                    pass  # nothing was written, or it is already gone

        # All URLs failed, check if model exists in package
        mp_dir = os.path.dirname(mp.__file__)
        alt_path = os.path.join(mp_dir, "models", model_name)
        if os.path.exists(alt_path):
            print(f"[Face] Using fallback model: {alt_path}")
            return alt_path

        raise RuntimeError("Could not download MediaPipe model from any source")

    def detect(self, frame: np.ndarray) -> List[Dict]:
        """Detect faces in a single frame.

        Args:
            frame: Image converted with ``cv2.COLOR_BGR2RGB`` before
                detection, i.e. expected in OpenCV's BGR channel order.

        Returns:
            One dict per detection with integer ``x``, ``y``, ``width``,
            ``height`` taken from the MediaPipe bounding box, and a float
            ``confidence`` (the first category score, or 0.5 when MediaPipe
            reports no categories).
        """
        # Convert frame to MediaPipe Image
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

        # Run detection
        detection_result = self.detector.detect(mp_image)

        faces = []
        for detection in detection_result.detections:
            bbox = detection.bounding_box
            categories = detection.categories
            score = categories[0].score if categories else 0.5

            faces.append(
                {
                    "x": int(bbox.origin_x),
                    "y": int(bbox.origin_y),
                    "width": int(bbox.width),
                    "height": int(bbox.height),
                    "confidence": float(score),
                }
            )

        return faces
# OpenCV Haar Cascade fallback
class OpenCVFaceDetector:
    """Face detector backed by OpenCV's frontal-face Haar cascade."""

    def __init__(self, min_confidence: float = 0.5):
        """Load the bundled ``haarcascade_frontalface_default.xml`` cascade.

        Args:
            min_confidence: Stored for interface parity with the MediaPipe
                detector; the cascade itself produces no scores, so it is
                not used for filtering.

        Raises:
            RuntimeError: If the cascade file could not be loaded.
        """
        self.min_confidence = min_confidence

        cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        self.face_cascade = cv2.CascadeClassifier(cascade_path)

        if self.face_cascade.empty():
            raise RuntimeError("Failed to load Haar Cascade")

        print("[Face] OpenCV Haar Cascade initialized")

    def detect(self, frame: np.ndarray) -> List[Dict]:
        """Detect faces in a BGR frame using the Haar cascade.

        Returns:
            One dict per detection with integer ``x``, ``y``, ``width``,
            ``height`` and a fixed ``confidence`` of 0.7.
        """
        gray = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))

        boxes = self.face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
        )

        return [
            {
                "x": int(x),
                "y": int(y),
                "width": int(w),
                "height": int(h),
                "confidence": 0.7,  # Haar Cascade doesn't provide confidence
            }
            for x, y, w, h in boxes
        ]


def get_device() -> str:
    """Return the preferred torch device name: 'mps', then 'cuda', then 'cpu'."""
    if torch.backends.mps.is_available():
        return "mps"
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"


def signal_handler(signum, frame):
    """Turn SIGTERM/SIGINT into a clean ``SystemExit(0)``.

    The handler itself saves nothing: raising ``SystemExit`` unwinds
    ``process_video_face``, whose ``finally`` clause persists the partial
    results before the process exits.
    """
    print(f"\n[Face] Received signal {signum}, saving results and exiting...")
    sys.exit(0)


def _save_results(result: Dict, output_path: str) -> None:
    """Write *result* as JSON to *output_path* via a temp file and rename.

    Overwriting the file in place would leave truncated JSON behind if the
    process is killed mid-write, and a later resume would then discard all
    earlier progress.
    """
    tmp_path = f"{output_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    os.replace(tmp_path, output_path)


def _load_resume_state(output_path: str) -> tuple:
    """Read a previous results file and return ``(frames, last_frame)``.

    ``last_frame`` is the larger of the stored ``last_processed_frame``
    checkpoint and the highest numeric frame key. An unreadable or malformed
    file yields ``({}, 0)`` so processing starts from scratch.
    """
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            existing = json.load(f)
    except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
        print(f"[Face] Ignoring unreadable results file {output_path}: {e}")
        return {}, 0

    frames = existing.get("frames") if isinstance(existing, dict) else None
    if not isinstance(frames, dict):
        return {}, 0

    last_frame = max((int(key) for key in frames if key.isdigit()), default=0)

    # Frames without faces have no entry in ``frames``; the checkpoint is
    # what lets a resume skip them instead of re-running detection.
    checkpoint = existing.get("last_processed_frame")
    if isinstance(checkpoint, int) and not isinstance(checkpoint, bool):
        last_frame = max(last_frame, checkpoint)

    return frames, last_frame


def _count_faces(frames: Dict) -> int:
    """Return the number of face entries stored across *frames*."""
    total = 0
    for entry in frames.values():
        faces = entry.get("faces") if isinstance(entry, dict) else None
        if isinstance(faces, list):
            total += len(faces)
    return total


def process_video_face(
    video_path: str,
    output_path: str,
    use_mediapipe: bool = True,
    min_confidence: float = 0.5,
    device: str = "auto",
    sample_interval: int = 30,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for face detection with MPS acceleration

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        use_mediapipe: Whether to use MediaPipe (faster, more accurate)
        min_confidence: Minimum confidence threshold
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        sample_interval: Process every N frames (must be >= 1)
        resume: Whether to resume from existing results
        save_interval: Auto-save interval in seconds (<= 0 disables it)

    Returns:
        Dictionary with face detection results and metadata

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        RuntimeError: If the video cannot be opened, or no detector could
            be initialised.

    Side effects:
        Installs SIGTERM/SIGINT handlers and writes ``output_path``. If the
        run is interrupted or fails, the results gathered so far are still
        written so a later call with ``resume=True`` can continue.
    """
    # Fail early: a zero interval would otherwise surface as a
    # ZeroDivisionError from inside the frame loop.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Determine device
    if device == "auto":
        device = get_device()

    print(f"[Face] Starting face detection with device: {device}")
    print(f"[Face] Use MediaPipe: {use_mediapipe}, Confidence: {min_confidence}")

    # Initialize detector: MediaPipe when requested and usable, else OpenCV
    detector = None
    detector_name = "OpenCV"
    if use_mediapipe and MEDIAPIPE_AVAILABLE:
        try:
            detector = MediaPipeFaceDetector(
                device=device, min_confidence=min_confidence
            )
            detector_name = "MediaPipe"
        except Exception as e:
            print(f"[Face] MediaPipe failed: {e}, falling back to OpenCV")
    if detector is None:
        detector = OpenCVFaceDetector(min_confidence=min_confidence)

    print(f"[Face] Using detector: {detector_name}")

    # Load existing data if resuming. This happens before the result dict is
    # built so an interrupted run can never overwrite earlier progress with
    # an empty frame set.
    frames: Dict = {}
    last_processed_frame = 0
    if resume and os.path.exists(output_path):
        frames, last_processed_frame = _load_resume_state(output_path)
        if last_processed_frame:
            print(f"[Face] Resuming from frame {last_processed_frame}")

    # Initialize result structure
    result = {
        "video_path": video_path,
        "detector": detector_name,
        "device": device,
        "min_confidence": min_confidence,
        "processed_at": datetime.now().isoformat(),
        "frames": frames,
        "last_processed_frame": last_processed_frame,
    }

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Could not open video: {video_path}")

    start_time = time.time()
    last_save_time = start_time
    frame_count = 0
    # Include detections carried over from a resumed run so the summary
    # matches what is stored in result["frames"].
    detection_count = _count_faces(frames)
    completed = False

    try:
        # Get video info
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"[Face] Video: {width}x{height} @ {fps:.2f} FPS, {total_frames} frames")
        print(f"[Face] Processing video: {video_path}")

        # grab() advances the stream without decoding; only sampled frames
        # pay for retrieve(). Together they are equivalent to read().
        while cap.grab():
            frame_count += 1

            # Sample frames and skip those already processed
            if (
                frame_count % sample_interval != 0
                or frame_count <= last_processed_frame
            ):
                continue

            ret, frame = cap.retrieve()
            if not ret:
                break

            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            # Detect faces; a failure on one frame must not abort the run
            try:
                faces = detector.detect(frame)
            except Exception as e:
                print(f"[Face] Error at frame {frame_count}: {e}")
                faces = []

            if faces:
                result["frames"][str(frame_count)] = {
                    "timestamp": timestamp,
                    "faces": faces,
                }
                detection_count += len(faces)

            # Checkpoint every sampled frame, including ones with no faces
            # or a detection error, so resume does not revisit them.
            result["last_processed_frame"] = frame_count

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps_rate = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[Face] Processed {frame_count} frames, {detection_count} faces, {fps_rate:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _save_results(result, output_path)
                last_save_time = time.time()
                print(f"[Face] Auto-saved at frame {frame_count}")

        completed = True

    except Exception as e:
        print(f"[Face] Error during processing: {e}")
        raise

    finally:
        cap.release()
        if not completed:
            # Reached on errors and on SystemExit/KeyboardInterrupt raised by
            # the signal handler: keep what has been computed so far.
            try:
                _save_results(result, output_path)
                print(f"[Face] Partial results saved to: {output_path}")
            except OSError as save_err:
                print(f"[Face] Could not save partial results: {save_err}")

    # Final save
    elapsed_time = time.time() - start_time
    avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0

    result["summary"] = {
        "total_frames": frame_count,
        "total_detections": detection_count,
        "processing_time": round(elapsed_time, 2),
        "average_fps": round(avg_fps, 2),
        "detector": detector_name,
        "device": device,
    }

    _save_results(result, output_path)

    print(
        f"[Face] Completed: {frame_count} frames, {detection_count} faces in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[Face] Results saved to: {output_path}")

    return result


def main():
    """Parse command-line arguments and run ``process_video_face``."""
    parser = argparse.ArgumentParser(description="Face Processor with MPS Support")
    parser.add_argument("--video", required=True, help="Input video path")
    parser.add_argument("--output", required=True, help="Output JSON path")
    parser.add_argument(
        "--no-mediapipe", action="store_true", help="Use OpenCV instead of MediaPipe"
    )
    parser.add_argument(
        "--confidence", type=float, default=0.5, help="Minimum confidence threshold"
    )
    parser.add_argument(
        "--device",
        default="auto",
        choices=["auto", "mps", "cuda", "cpu"],
        help="Device to use",
    )
    parser.add_argument(
        "--sample-interval", type=int, default=30, help="Process every N frames"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Do not resume from existing results"
    )
    parser.add_argument(
        "--save-interval", type=int, default=30, help="Auto-save interval in seconds"
    )

    args = parser.parse_args()

    process_video_face(
        video_path=args.video,
        output_path=args.output,
        use_mediapipe=not args.no_mediapipe,
        min_confidence=args.confidence,
        device=args.device,
        sample_interval=args.sample_interval,
        resume=not args.no_resume,
        save_interval=args.save_interval,
    )


if __name__ == "__main__":
    main()