#!/opt/homebrew/bin/python3.11
"""
Pose Processor - Apple MPS Optimized Version
Uses YOLOv8 Pose with Apple Silicon MPS acceleration

Features:
- Automatic MPS/CPU fallback
- Metal GPU acceleration for inference
- YOLOv8 Pose model support
- Memory-optimized for unified memory architecture
"""

import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Dict

import cv2
import torch
from ultralytics import YOLO

# COCO keypoint names (17 keypoints)
KEYPOINT_NAMES = [
    "nose",
    "left_eye",
    "right_eye",
    "left_ear",
    "right_ear",
    "left_shoulder",
    "right_shoulder",
    "left_elbow",
    "right_elbow",
    "left_wrist",
    "right_wrist",
    "left_hip",
    "right_hip",
    "left_knee",
    "right_knee",
    "left_ankle",
    "right_ankle",
]

# Keypoint connections for skeleton visualization
KEYPOINT_CONNECTIONS = [
    ("left_shoulder", "right_shoulder"),
    ("left_shoulder", "left_elbow"),
    ("left_elbow", "left_wrist"),
    ("right_shoulder", "right_elbow"),
    ("right_elbow", "right_wrist"),
    ("left_shoulder", "left_hip"),
    ("right_shoulder", "right_hip"),
    ("left_hip", "right_hip"),
    ("left_hip", "left_knee"),
    ("left_knee", "left_ankle"),
    ("right_hip", "right_knee"),
    ("right_knee", "right_ankle"),
]


def _mps_available() -> bool:
    """Return True when this torch build exposes a usable MPS backend."""
    # Guard the attribute lookup: torch builds without the MPS backend
    # would otherwise raise AttributeError here.
    mps_backend = getattr(torch.backends, "mps", None)
    return mps_backend is not None and mps_backend.is_available()


def get_device() -> str:
    """Determine the best available device for inference.

    Returns:
        "mps" if the MPS backend is available, otherwise "cuda" if CUDA is
        available, otherwise "cpu".
    """
    if _mps_available():
        return "mps"
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"


def signal_handler(signum, frame):
    """Handle interrupt signals gracefully.

    Raises SystemExit(0); process_video_pose catches it to write the partial
    results before the interpreter exits.
    """
    print(f"\n[Pose] Received signal {signum}, saving results and exiting...")
    sys.exit(0)


def _save_results(result: Dict, output_path: str) -> None:
    """Write *result* as JSON to *output_path* atomically."""
    out_dir = os.path.dirname(os.path.abspath(output_path))
    os.makedirs(out_dir, exist_ok=True)
    # Write to a sibling temp file and rename over the target so that an
    # interrupt in the middle of a save never leaves a truncated JSON file
    # (which would make the next resume discard everything).
    tmp_path = f"{output_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    os.replace(tmp_path, output_path)


def _load_existing_frames(output_path: str) -> Dict:
    """Return the "frames" mapping of a previous run, or {} if unusable."""
    if not os.path.exists(output_path):
        return {}
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            frames = json.load(f).get("frames", {})
        if not isinstance(frames, dict):
            raise TypeError('"frames" is not an object')
        if frames:
            last_processed_frame = max(int(k) for k in frames)
            print(f"[Pose] Resuming from frame {last_processed_frame}")
        return frames
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # json.JSONDecodeError is a ValueError; a corrupt or foreign file
        # must not abort the run, but the user should know it was ignored.
        print(f"[Pose] Ignoring unusable previous results ({e}); starting fresh")
        return {}


def _extract_poses(kp_data, confidence: float) -> list:
    """Convert per-person keypoint rows into JSON-serializable pose dicts.

    Args:
        kp_data: Indexable as kp_data[person][keypoint] -> [x, y, confidence].
        confidence: Keypoints whose confidence is not above this are dropped.

    Returns:
        One dict per person that has at least one retained keypoint.
    """
    poses = []
    for person_idx, person in enumerate(kp_data):
        # zip() stops at the shorter input, which caps the output at the
        # 17 named COCO keypoints.
        person_keypoints = [
            {
                "name": name,
                "x": float(kp[0]),
                "y": float(kp[1]),
                "confidence": float(kp[2]),
            }
            for name, kp in zip(KEYPOINT_NAMES, person)
            # Keypoint: [x, y, confidence]
            if len(kp) >= 3 and kp[2] > confidence
        ]
        if person_keypoints:
            poses.append({"keypoints": person_keypoints, "person_id": person_idx})
    return poses


def process_video_pose(
    video_path: str,
    output_path: str,
    model_name: str = "yolov8n-pose",
    confidence: float = 0.5,
    device: str = "auto",
    sample_interval: int = 30,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for pose estimation with MPS acceleration

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        model_name: YOLO Pose model name (yolov8n-pose/s/m/l/x)
        confidence: Confidence threshold for keypoints
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        sample_interval: Process every N frames (must be >= 1)
        resume: Whether to merge in frames from an existing output file.
            Inference still starts at frame 0 (the streaming predictor is
            not seeked); frames saved by an earlier run are kept and
            overwritten as they are encountered again.
        save_interval: Auto-save interval in seconds (<= 0 disables it)

    Returns:
        Dictionary with pose estimation results and metadata

    Raises:
        ValueError: If sample_interval is smaller than 1.
    """
    if sample_interval < 1:
        # Fail before loading the model instead of hitting a
        # ZeroDivisionError inside the frame loop.
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Determine device
    if device == "auto":
        device = get_device()

    print(f"[Pose] Starting pose estimation with device: {device}")
    print(f"[Pose] Model: {model_name}, Confidence: {confidence}")

    # Load model
    print(f"[Pose] Loading model: {model_name}")
    model = YOLO(f"{model_name}.pt")

    # Move to device
    if device in ["mps", "cuda"]:
        model.to(device)

    # Get video info
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(
                f"[Pose] Warning: could not open {video_path} to read video metadata"
            )
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()

    print(f"[Pose] Video: {width}x{height} @ {fps:.2f} FPS, {total_frames} frames")

    # Initialize result structure, seeded with earlier frames when resuming
    result = {
        "video_path": video_path,
        "model": model_name,
        "device": device,
        "confidence_threshold": confidence,
        "processed_at": datetime.now().isoformat(),
        "keypoint_names": KEYPOINT_NAMES,
        "connections": KEYPOINT_CONNECTIONS,
        "frames": _load_existing_frames(output_path) if resume else {},
    }

    # Process video
    print(f"[Pose] Processing video: {video_path}")
    start_time = time.time()
    frame_count = 0
    pose_count = 0
    last_save_time = start_time

    try:
        # Use stream mode for memory efficiency
        results = model(
            video_path,
            conf=confidence,
            device=device,
            stream=True,
            imgsz=640,
            pose=True,
            verbose=False,
        )

        for idx, r in enumerate(results):
            # Skip frames based on sample_interval
            if idx % sample_interval != 0:
                continue

            # Get pose results
            keypoints = r.keypoints
            if keypoints is not None and len(keypoints) > 0:
                frame_poses = _extract_poses(
                    keypoints.data.cpu().numpy(), confidence
                )
                pose_count += len(frame_poses)

                if frame_poses:
                    result["frames"][str(idx)] = {
                        "timestamp": idx / fps if fps > 0 else 0,
                        "poses": frame_poses,
                    }

            frame_count += 1

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps_rate = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[Pose] Processed {frame_count} frames, {pose_count} poses, {fps_rate:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _save_results(result, output_path)
                last_save_time = time.time()
                print(f"[Pose] Auto-saved at frame {frame_count}")

    except (KeyboardInterrupt, SystemExit):
        # signal_handler exits via SystemExit, which `except Exception` does
        # not catch; persist what has been collected before propagating.
        _save_results(result, output_path)
        print(f"[Pose] Interrupted; partial results saved to: {output_path}")
        raise
    except Exception as e:
        print(f"[Pose] Error during processing: {e}")
        raise

    # Final save
    elapsed_time = time.time() - start_time
    avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0

    result["summary"] = {
        "total_frames": frame_count,
        "total_poses": pose_count,
        "processing_time": round(elapsed_time, 2),
        "average_fps": round(avg_fps, 2),
        "model": model_name,
        "device": device,
    }

    # Save final results
    _save_results(result, output_path)

    print(
        f"[Pose] Completed: {frame_count} frames, {pose_count} poses in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[Pose] Results saved to: {output_path}")

    return result


def benchmark_pose_models(video_path: str, num_frames: int = 100) -> Dict:
    """Benchmark different YOLO Pose models and devices.

    Args:
        video_path: Path to the video used as benchmark input
        num_frames: Maximum number of frames to run per model/device pair

    Returns:
        Mapping of "<model>_<device>" to a dict with "frames", "time"
        (seconds) and "fps". Combinations that raise are reported on stdout
        and left out of the mapping.
    """
    devices = ["cpu"]
    if _mps_available():
        devices.append("mps")
    if torch.cuda.is_available():
        devices.append("cuda")

    models = ["yolov8n-pose", "yolov8s-pose"]
    results = {}

    for model_name in models:
        for device in devices:
            print(f"[Pose] Benchmarking {model_name} on {device}...")

            model = YOLO(f"{model_name}.pt")
            if device != "cpu":
                model.to(device)

            start_time = time.time()
            count = 0

            try:
                stream = model(
                    video_path, device=device, stream=True, imgsz=320, pose=True
                )
                for idx, _ in enumerate(stream):
                    if idx >= num_frames:
                        break
                    count += 1
            except Exception as e:
                print(f"[Pose] Error: {e}")
                continue

            elapsed = time.time() - start_time
            fps = count / elapsed if elapsed > 0 else 0

            results[f"{model_name}_{device}"] = {
                "frames": count,
                "time": round(elapsed, 2),
                "fps": round(fps, 2),
            }

    return results


def main():
    """Parse command-line arguments and run processing or the benchmark."""
    parser = argparse.ArgumentParser(description="Pose Processor with MPS Support")
    parser.add_argument("--video", required=True, help="Input video path")
    # Only enforced for processing runs (see below); the benchmark never
    # writes an output file.
    parser.add_argument("--output", help="Output JSON path")
    parser.add_argument(
        "--model", default="yolov8n-pose", help="YOLO Pose model (yolov8n-pose/s/m/l/x)"
    )
    parser.add_argument(
        "--confidence", type=float, default=0.5, help="Confidence threshold"
    )
    parser.add_argument(
        "--device",
        default="auto",
        choices=["auto", "mps", "cuda", "cpu"],
        help="Device to use",
    )
    parser.add_argument(
        "--sample-interval", type=int, default=30, help="Process every N frames"
    )
    parser.add_argument(
        "--no-resume", action="store_true", help="Do not resume from existing results"
    )
    parser.add_argument(
        "--save-interval", type=int, default=30, help="Auto-save interval in seconds"
    )
    parser.add_argument(
        "--benchmark", action="store_true", help="Run benchmark instead of processing"
    )

    args = parser.parse_args()

    if args.benchmark:
        results = benchmark_pose_models(args.video)
        print("\n[Benchmark Results]")
        print(json.dumps(results, indent=2))
        return

    if not args.output:
        parser.error("--output is required unless --benchmark is given")
    if args.sample_interval < 1:
        parser.error("--sample-interval must be >= 1")

    process_video_pose(
        video_path=args.video,
        output_path=args.output,
        model_name=args.model,
        confidence=args.confidence,
        device=args.device,
        sample_interval=args.sample_interval,
        resume=not args.no_resume,
        save_interval=args.save_interval,
    )


if __name__ == "__main__":
    main()