Files
momentry_core/scripts/yolo_processor_mps.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

407 lines
11 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
YOLO Processor - Apple MPS Optimized Version
Uses YOLOv8 via ultralytics with Apple Silicon MPS acceleration
Features:
- Automatic device selection (MPS, then CUDA, then CPU)
- Metal GPU acceleration for inference
- Batch size option (accepted, but not currently forwarded to the model)
- Memory-optimized for unified memory architecture
"""
import sys
import json
import argparse
import os
import signal
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import torch
from ultralytics import YOLO
# Class-id -> label lookup table (80 entries). process_video_yolo indexes
# this list with the integer class id of each detected box to fill the
# "class" field of a detection; ids beyond the end of the list are reported
# as "class_<id>" instead.
# NOTE(review): several labels use older spellings ("motorbike", "aeroplane",
# "sofa", "pottedplant", "diningtable", "tvmonitor"). These may differ from
# the label names bundled with the loaded model weights -- confirm which
# spelling downstream consumers expect before changing any entry.
YOLO_NAMES = [
"person",
"bicycle",
"car",
"motorbike",
"aeroplane",
"bus",
"train",
"truck",
"boat",
"traffic light",
"fire hydrant",
"stop sign",
"parking meter",
"bench",
"bird",
"cat",
"dog",
"horse",
"sheep",
"cow",
"elephant",
"bear",
"zebra",
"giraffe",
"backpack",
"umbrella",
"handbag",
"tie",
"suitcase",
"frisbee",
"skis",
"snowboard",
"sports ball",
"kite",
"baseball bat",
"baseball glove",
"skateboard",
"surfboard",
"tennis racket",
"bottle",
"wine glass",
"cup",
"fork",
"knife",
"spoon",
"bowl",
"banana",
"apple",
"sandwich",
"orange",
"broccoli",
"carrot",
"hot dog",
"pizza",
"donut",
"cake",
"chair",
"sofa",
"pottedplant",
"bed",
"diningtable",
"toilet",
"tvmonitor",
"laptop",
"mouse",
"remote",
"keyboard",
"cell phone",
"microwave",
"oven",
"toaster",
"sink",
"refrigerator",
"book",
"clock",
"vase",
"scissors",
"teddy bear",
"hair drier",
"toothbrush",
]
def get_device() -> str:
    """Return the name of the preferred torch backend.

    Backends are probed in priority order: Apple MPS first, then CUDA.
    The first one that reports itself available wins; if neither does,
    inference falls back to the CPU.

    Returns:
        One of ``"mps"``, ``"cuda"`` or ``"cpu"``.
    """
    accelerator_probes = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    for backend_name, is_available in accelerator_probes:
        if is_available():
            return backend_name
    return "cpu"
def signal_handler(signum, frame):
    """Announce the received signal and terminate with exit status 0.

    The handler itself only prints a notice and raises ``SystemExit``; it
    performs no saving of its own, so any persistence depends on the cleanup
    done by the code that was interrupted.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame active when the signal arrived (unused).
    """
    notice = f"\n[YOLO] Received signal {signum}, saving results and exiting..."
    print(notice)
    raise SystemExit(0)
# Frame rate assumed when turning a frame index into a timestamp. The source
# video's real frame rate is not read anywhere in this module, so timestamps
# are only exact for 30 fps input.
_ASSUMED_FPS = 30.0


def _load_existing_frames(output_path: str) -> Dict:
    """Return the ``frames`` mapping saved by a previous run, or ``{}``."""
    if not os.path.exists(output_path):
        return {}
    try:
        with open(output_path, "r") as f:
            previous = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"[YOLO] Ignoring unreadable previous results ({exc}); starting fresh")
        return {}
    frames = previous.get("frames") if isinstance(previous, dict) else None
    return frames if isinstance(frames, dict) else {}


def _write_results(output_path: str, result: Dict) -> None:
    """Write *result* as JSON through a temp file, then swap it into place."""
    # A crash or signal in the middle of the dump must not truncate the
    # previous good file, because that file is what a later resume reads.
    tmp_path = f"{output_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(result, f, indent=2)
    os.replace(tmp_path, output_path)


def _boxes_to_detections(boxes) -> List[Dict]:
    """Convert one frame's boxes into plain, JSON-serialisable dicts."""
    # One device-to-host transfer per attribute instead of three per box.
    corners = boxes.xyxy.cpu().numpy()
    scores = boxes.conf.cpu().tolist()
    class_ids = boxes.cls.cpu().tolist()

    detections = []
    for (x1, y1, x2, y2), score, raw_cls in zip(corners, scores, class_ids):
        cls = int(raw_cls)
        detections.append(
            {
                "x": int(x1),
                "y": int(y1),
                "width": int(x2 - x1),
                "height": int(y2 - y1),
                "confidence": round(float(score), 4),
                "class": YOLO_NAMES[cls]
                if cls < len(YOLO_NAMES)
                else f"class_{cls}",
                "class_id": cls,
            }
        )
    return detections


def process_video_yolo(
    video_path: str,
    output_path: str,
    model_name: str = "yolov8n",
    confidence: float = 0.25,
    iou_threshold: float = 0.45,
    device: str = "auto",
    batch_size: int = 8,
    skip_frames: int = 1,
    resume: bool = True,
    save_interval: int = 30,
) -> Dict:
    """
    Process video for YOLO object detection with MPS acceleration

    Results are written to ``output_path`` periodically and once more when
    the function leaves the processing loop for any reason (normal
    completion, an exception, or the ``SystemExit`` raised by the signal
    handler), so an interrupted run keeps what it has gathered so far.

    Only frames that contain at least one detection are stored under
    ``frames``; ``summary.total_frames`` counts every frame that was examined.

    Args:
        video_path: Path to input video file
        output_path: Path to output JSON file
        model_name: YOLO model name (yolov8n, yolov8s, yolov8m, yolov8l, yolov8x)
        confidence: Confidence threshold for detections
        iou_threshold: IoU threshold for NMS
        device: Device to use ('auto', 'mps', 'cuda', 'cpu')
        batch_size: Accepted for interface compatibility; not currently
            forwarded to the model call
        skip_frames: Process every N frames (1 = all frames); must be >= 1
        resume: Whether to merge in frames from an existing output file.
            Inference is still run from the start of the video; previously
            stored frames are kept unless they are re-detected.
        save_interval: Save results every N seconds (0 or less disables
            periodic saves)

    Returns:
        Dictionary with detection results and metadata

    Raises:
        ValueError: If ``skip_frames`` is smaller than 1.
    """
    # Validate before touching signals or loading weights so that a bad
    # argument fails fast instead of surfacing as ZeroDivisionError mid-run.
    if skip_frames < 1:
        raise ValueError(f"skip_frames must be >= 1, got {skip_frames}")

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Determine device
    if device == "auto":
        device = get_device()
    print(f"[YOLO] Starting YOLO processing with device: {device}")
    print(f"[YOLO] Model: {model_name}, Confidence: {confidence}, IoU: {iou_threshold}")

    # Load model
    print(f"[YOLO] Loading model: {model_name}")
    model = YOLO(f"{model_name}.pt")

    # Move to device
    if device in ("mps", "cuda"):
        model.to(device)

    # Initialize result structure
    result = {
        "video_path": video_path,
        "model": model_name,
        "device": device,
        "confidence_threshold": confidence,
        "iou_threshold": iou_threshold,
        "processed_at": datetime.now().isoformat(),
        "frames": {},
    }

    # Merge frames from a previous run if resuming
    if resume:
        previous_frames = _load_existing_frames(output_path)
        if previous_frames:
            result["frames"] = previous_frames
            # Tolerate keys that are not frame numbers rather than crashing.
            frame_numbers = [int(k) for k in previous_frames if str(k).isdigit()]
            if frame_numbers:
                print(f"[YOLO] Resuming from frame {max(frame_numbers)}")

    # Process video
    print(f"[YOLO] Processing video: {video_path}")
    start_time = time.time()
    frame_count = 0
    detection_count = 0
    last_save_time = start_time

    try:
        # Use stream mode for memory efficiency
        results = model(
            video_path,
            conf=confidence,
            iou=iou_threshold,
            device=device,
            stream=True,
            imgsz=640,  # Smaller size for faster processing
            verbose=False,
        )
        for idx, r in enumerate(results):
            # Skip frames based on skip_frames setting
            if idx % skip_frames != 0:
                continue

            boxes = r.boxes
            if boxes is not None and len(boxes) > 0:
                frame_detections = _boxes_to_detections(boxes)
                detection_count += len(frame_detections)
                result["frames"][str(idx)] = {
                    # Column 4 of the raw box data is the box confidence, not
                    # a time, so derive the timestamp from the frame index.
                    "timestamp": idx / _ASSUMED_FPS,
                    "detections": frame_detections,
                }
            frame_count += 1

            # Progress reporting
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps = frame_count / elapsed if elapsed > 0 else 0
                print(
                    f"[YOLO] Processed {frame_count} frames, {detection_count} detections, {fps:.1f} FPS"
                )

            # Periodic save
            if save_interval > 0 and time.time() - last_save_time > save_interval:
                _write_results(output_path, result)
                last_save_time = time.time()
                print(f"[YOLO] Auto-saved at frame {frame_count}")
    except Exception as e:
        print(f"[YOLO] Error during processing: {e}")
        raise
    finally:
        # Runs for SystemExit/KeyboardInterrupt too, which `except Exception`
        # does not see; this is what makes the signal handler's "saving
        # results" message true.
        elapsed_time = time.time() - start_time
        avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
        result["summary"] = {
            "total_frames": frame_count,
            "total_detections": detection_count,
            "processing_time": round(elapsed_time, 2),
            "average_fps": round(avg_fps, 2),
            "device": device,
        }
        _write_results(output_path, result)

    print(
        f"[YOLO] Completed: {frame_count} frames, {detection_count} detections in {elapsed_time:.1f}s ({avg_fps:.1f} FPS)"
    )
    print(f"[YOLO] Results saved to: {output_path}")
    return result
def benchmark_models(video_path: str, num_frames: int = 100) -> Dict:
    """Time a fixed set of YOLO models on every available device.

    Each of ``yolov8n``, ``yolov8s`` and ``yolov8m`` is loaded fresh for each
    device (CPU always, plus MPS and CUDA when torch reports them available)
    and streamed over the video at ``imgsz=320`` until ``num_frames`` frames
    have been counted or the video ends. A combination that raises during
    inference is reported on stdout and left out of the result.

    Args:
        video_path: Path to the video used as benchmark input.
        num_frames: Maximum number of frames to count per combination.

    Returns:
        Mapping of ``"<model>_<device>"`` to a dict with ``frames`` (frames
        counted), ``time`` (seconds, rounded to 2 places) and ``fps``.
    """
    candidate_devices = ["cpu"]
    if torch.backends.mps.is_available():
        candidate_devices.append("mps")
    if torch.cuda.is_available():
        candidate_devices.append("cuda")

    timings = {}
    for model_name in ("yolov8n", "yolov8s", "yolov8m"):
        for device in candidate_devices:
            print(f"[YOLO] Benchmarking {model_name} on {device}...")
            model = YOLO(f"{model_name}.pt")
            if device != "cpu":
                model.to(device)

            began = time.time()
            processed = 0
            try:
                frame_stream = model(
                    video_path, device=device, stream=True, imgsz=320
                )
                for frame_no, _ in enumerate(frame_stream):
                    if frame_no >= num_frames:
                        break
                    processed += 1
            except Exception as e:
                print(f"[YOLO] Error: {e}")
                continue

            duration = time.time() - began
            rate = processed / duration if duration > 0 else 0
            timings[f"{model_name}_{device}"] = {
                "frames": processed,
                "time": round(duration, 2),
                "fps": round(rate, 2),
            }
    return timings
def main():
    """Parse command-line options and run processing or the benchmark.

    ``--video`` and ``--output`` are always required. With ``--benchmark``
    the benchmark results are printed as JSON; otherwise the video is
    processed and the results are written to the ``--output`` path.
    """
    parser = argparse.ArgumentParser(description="YOLO Processor with MPS Support")

    # (flags, keyword options) in the order they should appear in --help.
    option_specs = [
        (("--video",), {"required": True, "help": "Input video path"}),
        (("--output",), {"required": True, "help": "Output JSON path"}),
        (("--model",), {"default": "yolov8n", "help": "YOLO model (yolov8n/s/m/l/x)"}),
        (
            ("--confidence",),
            {"type": float, "default": 0.25, "help": "Confidence threshold"},
        ),
        (("--iou",), {"type": float, "default": 0.45, "help": "IoU threshold for NMS"}),
        (
            ("--device",),
            {
                "default": "auto",
                "choices": ["auto", "mps", "cuda", "cpu"],
                "help": "Device to use",
            },
        ),
        (
            ("--batch-size",),
            {"type": int, "default": 8, "help": "Batch size for processing"},
        ),
        (
            ("--skip-frames",),
            {"type": int, "default": 1, "help": "Process every N frames"},
        ),
        (
            ("--no-resume",),
            {"action": "store_true", "help": "Do not resume from existing results"},
        ),
        (
            ("--save-interval",),
            {"type": int, "default": 30, "help": "Auto-save interval in seconds"},
        ),
        (
            ("--benchmark",),
            {"action": "store_true", "help": "Run benchmark instead of processing"},
        ),
    ]
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()

    if not args.benchmark:
        process_video_yolo(
            video_path=args.video,
            output_path=args.output,
            model_name=args.model,
            confidence=args.confidence,
            iou_threshold=args.iou,
            device=args.device,
            batch_size=args.batch_size,
            skip_frames=args.skip_frames,
            resume=not args.no_resume,
            save_interval=args.save_interval,
        )
        return

    results = benchmark_models(args.video)
    print("\n[Benchmark Results]")
    print(json.dumps(results, indent=2))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()