#!/opt/homebrew/bin/python3.11
"""
YOLO Processor - Object Detection with Resume Support
Uses YOLOv8 via ultralytics (local model)

Resume Feature (integrated from video_yolo_player):
- Auto-detect existing results and resume from last frame
- Auto-save at configurable intervals (default: 30 seconds)
- Ctrl+C gracefully saves and exits
"""

import sys
import json
import argparse
import itertools
import os
import signal
import time
from datetime import datetime
from typing import Dict, Optional, Set

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Redis progress reporting is only used when a UUID is supplied, so a missing
# publisher module must not prevent plain (non-Redis) runs or imports.
try:
    from redis_publisher import RedisPublisher
except ImportError as _exc:
    RedisPublisher = None
    _REDIS_IMPORT_ERROR: Optional[ImportError] = _exc
else:
    _REDIS_IMPORT_ERROR = None

YOLO_NAMES = [
    "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train",
    "truck", "boat", "traffic light", "fire hydrant", "stop sign",
    "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag",
    "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite",
    "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon",
    "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot",
    "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed",
    "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote",
    "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear",
    "hair drier", "toothbrush",
]

# Global state for signal handling
g_detection_data: Optional[Dict] = None
g_output_file: Optional[str] = None
g_auto_save_interval: int = 30
g_auto_save_frames: int = 300  # Save every N frames (in addition to time-based)

# Gives every save its own temp-file name, so a save started from the SIGINT
# handler can never share a file with the auto-save it interrupted.
_TMP_COUNTER = itertools.count()


def format_time(seconds: float) -> str:
    """Format seconds to HH:MM:SS"""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def _last_frame_number(detection_data: Dict) -> int:
    """Return the highest frame number stored in ``detection_data`` (0 if none)."""
    frames = detection_data.get("frames") or {}
    return max((int(key) for key in frames), default=0)


def load_existing_data(output_file: str) -> tuple[Optional[Dict], int]:
    """Load existing detection data from file.

    Args:
        output_file: Path of a JSON file written by ``save_detection_data``.

    Returns:
        ``(data, last_processed_frame)``. ``(None, 0)`` is returned when the
        file does not exist, cannot be read or parsed, is not a JSON object,
        or contains no frames.
    """
    if not os.path.exists(output_file):
        return None, 0

    try:
        with open(output_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError("top-level JSON value is not an object")
        frames = data.get("frames", {})
        if not isinstance(frames, dict):
            raise ValueError("'frames' is not an object")
        if frames:
            return data, max(int(k) for k in frames)
    except (OSError, KeyError, TypeError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Warning: Could not load existing file: {e}")

    return None, 0


def save_detection_data(
    output_file: str,
    detection_data: Dict,
    is_interrupted: bool = False,
    silent: bool = False,
    last_saved_frame: int = 0,
    is_completed: bool = False,
) -> tuple[bool, int]:
    """Save detection data to JSON file.

    The JSON is written to a temporary sibling file and then moved over
    ``output_file`` with ``os.replace``, so an interrupted save never leaves a
    truncated file behind.

    Args:
        output_file: Destination JSON path.
        detection_data: Dict with ``"metadata"`` and ``"frames"``; its
            metadata is updated in place before writing.
        is_interrupted: Record status ``"interrupted"`` (takes precedence).
        silent: When True the written file is not measured and the returned
            size is 0.
        last_saved_frame: Frame number stored as ``metadata.last_saved_frame``.
        is_completed: Record status ``"completed"`` instead of
            ``"in_progress"``.

    Returns:
        ``(success, file_size)``; ``(False, 0)`` if saving failed.
    """
    if is_interrupted:
        status = "interrupted"
    elif is_completed:
        status = "completed"
    else:
        status = "in_progress"

    tmp_path = f"{output_file}.{os.getpid()}.{next(_TMP_COUNTER)}.tmp"
    try:
        metadata = detection_data.get("metadata", {})
        metadata["last_saved_at"] = datetime.now().isoformat()
        metadata["status"] = status
        metadata["last_saved_frame"] = last_saved_frame
        detection_data["metadata"] = metadata

        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(detection_data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, output_file)

        if not silent:
            return True, os.path.getsize(output_file)
        return True, 0
    except Exception as e:
        # Best-effort: a failed auto-save must not abort a long detection run.
        print(f"Error saving data: {e}")
        return False, 0
    finally:
        # Also runs when SystemExit from the SIGINT handler unwinds through an
        # in-flight save; after a successful replace the temp file is gone.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


def signal_handler(signum, frame):
    """Handle Ctrl+C to pause and save progress, then exit with status 0."""
    print(f"\n\n{'=' * 60}")
    print("PAUSE - Saving progress...")
    print(f"{'=' * 60}")

    if g_detection_data and g_output_file:
        success, _ = save_detection_data(
            g_output_file,
            g_detection_data,
            is_interrupted=True,
            last_saved_frame=_last_frame_number(g_detection_data),
        )
        if success:
            print(f"Progress saved to: {g_output_file}")

    print("Run the same command again to resume")
    print(f"{'=' * 60}\n")
    sys.exit(0)


def get_detections_list(result, model) -> list:
    """Extract detection info as list of dicts.

    Args:
        result: A single detection result exposing ``boxes`` with ``xyxy``,
            ``conf`` and ``cls`` tensors (or ``boxes`` set to None).
        model: Unused; kept for interface compatibility.

    Returns:
        One dict per box with class id/name, confidence, corner coordinates
        and integer width/height. Class ids outside ``YOLO_NAMES`` are named
        ``"unknown"``.
    """
    detections = []
    if result.boxes is None:
        return detections

    boxes = result.boxes.xyxy.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy().astype(int)

    for box, conf, class_id in zip(boxes, confidences, class_ids):
        x1, y1, x2, y2 = box
        class_name = (
            YOLO_NAMES[class_id] if 0 <= class_id < len(YOLO_NAMES) else "unknown"
        )
        detections.append(
            {
                "class_id": int(class_id),
                "class_name": class_name,
                "confidence": float(conf),
                "x1": float(x1),
                "y1": float(y1),
                "x2": float(x2),
                "y2": float(y2),
                "width": int(x2 - x1),
                "height": int(y2 - y1),
            }
        )
    return detections


def _create_publisher(uuid: str):
    """Return a RedisPublisher for ``uuid``, or None when no UUID is given."""
    if not uuid:
        return None
    if RedisPublisher is None:
        raise ImportError(
            "redis_publisher is required when a UUID is given"
        ) from _REDIS_IMPORT_ERROR
    return RedisPublisher(uuid)


def _load_model(yolo_cls):
    """Load YOLOv8s (prefer CoreML for ANE acceleration, fallback to PyTorch)."""
    base_dir = os.path.dirname(os.path.abspath(__file__))
    model_path_mlpackage = os.path.join(base_dir, "..", "yolov8s.mlpackage")
    model_path_pt = os.path.join(base_dir, "..", "yolov8s.pt")

    if os.path.exists(model_path_mlpackage):
        model = yolo_cls(model_path_mlpackage)
        print("YOLO: CoreML model loaded (YOLOv8s, ANE accelerated)")
    elif os.path.exists(model_path_pt):
        model = yolo_cls(model_path_pt)
        print("YOLO: PyTorch model loaded (YOLOv8s)")
    else:
        model = yolo_cls("yolov8s.pt")  # will auto-download
    return model


def _run_detection(
    video_path: str,
    output_path: str,
    publisher,
    auto_save_interval: int,
    force_restart: bool,
    auto_save_frames: int,
) -> Dict:
    """Run the detection loop; see ``process_yolo`` for the contract."""
    global g_detection_data, g_output_file

    # Check for existing results (resume support)
    existing_data, last_processed_frame = load_existing_data(output_path)
    resume_mode = (
        existing_data is not None and last_processed_frame > 0 and not force_restart
    )

    if resume_mode:
        print(f"\nFound existing data: {output_path}")
        print(f"Last processed frame: {last_processed_frame}")
        print(f"Will resume from frame {last_processed_frame + 1}")

    try:
        from ultralytics import YOLO
    except ImportError:
        if publisher:
            publisher.error("yolo", "ultralytics not installed")
        result = {
            "metadata": {"status": "error", "error": "ultralytics not installed"},
            "frames": {},
        }
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        if publisher:
            publisher.complete("yolo", "0 frames")
        return result

    if publisher:
        publisher.info("yolo", "YOLO_LOADING_MODEL")

    model = _load_model(YOLO)

    # Get video info
    import cv2

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        message = f"Cannot open video: {video_path}"
        print(f"Error: {message}")
        if publisher:
            # Mirror the ultralytics-missing path so listeners are not left
            # waiting for a job that will never report.
            publisher.error("yolo", message)
            publisher.complete("yolo", "0 frames")
        return {"metadata": {"status": "error", "error": message}, "frames": {}}

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_duration = total_frames / fps if fps > 0 else 0

        if publisher:
            publisher.info("yolo", f"fps={fps}, total={total_frames}")
            publisher.progress("yolo", 0, total_frames, "Starting")

        # Initialize or load detection data
        if resume_mode and existing_data:
            detection_data = existing_data
            frame_count = last_processed_frame
            processed_frames: Set[int] = {
                int(k) for k in existing_data.get("frames", {})
            }
            # Frame numbers are 1-based, so 0-based position N is frame N + 1.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)
        else:
            # Initialize new detection data
            detection_data = {
                "metadata": {
                    "video_path": os.path.abspath(video_path),
                    "fps": fps,
                    "width": width,
                    "height": height,
                    "total_frames": total_frames,
                    "total_duration": total_duration,
                    "processed_at": datetime.now().isoformat(),
                    "auto_save_interval": auto_save_interval,
                    "auto_save_frames": auto_save_frames,
                    "status": "in_progress",
                    "last_saved_at": datetime.now().isoformat(),
                    "last_saved_frame": 0,
                },
                "frames": {},
            }
            frame_count = 0
            processed_frames = set()

        # Set global for signal handler
        g_detection_data = detection_data
        g_output_file = output_path

        start_time = time.time()
        last_save_time = start_time
        last_save_frame_count = frame_count  # Track which frame we last saved at
        auto_save_count = 0
        session_frames = 0  # Frames detected in this run (excludes resumed ones)

        print(f"\nProcessing video: {total_frames} frames @ {fps:.2f} fps")
        print(
            f"Auto-save every {auto_save_interval}s or {auto_save_frames} frames (whichever comes first)"
        )
        print(f"Resume from frame {frame_count + 1 if resume_mode else 1}")
        print()

        # Process frames
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            video_time = (frame_count - 1) / fps if fps > 0 else 0

            # Skip already processed frames in resume mode
            if frame_count in processed_frames:
                continue

            # Run YOLO detection
            results = model(frame, verbose=False)
            detections = get_detections_list(results[0], model)

            # Store detection data
            detection_data["frames"][str(frame_count)] = {
                "frame_number": frame_count,
                "time_seconds": round(video_time, 3),
                "time_formatted": format_time(video_time),
                "detections": detections,
            }
            processed_frames.add(frame_count)
            session_frames += 1

            # Progress indicator every 500 frames
            if frame_count % 500 == 0:
                elapsed = time.time() - start_time
                # Some containers report a frame count of 0; avoid dividing by it.
                progress = (
                    (frame_count / total_frames) * 100 if total_frames > 0 else 0.0
                )
                # The rate must come from this session only: resumed frames
                # cost no time in this run.
                remaining = max(total_frames - frame_count, 0)
                eta = (elapsed / session_frames) * remaining
                print(
                    f"  Progress: {frame_count}/{total_frames} ({progress:.1f}%) - "
                    f"ETA: {eta:.0f}s - {len(detections)} objects"
                )
                if publisher:
                    publisher.progress(
                        "yolo", frame_count, total_frames, f"frame {frame_count}"
                    )

            # Auto-save check (time-based OR frame-based)
            now = time.time()
            time_elapsed = now - last_save_time >= auto_save_interval
            frames_since_save = (
                frame_count - last_save_frame_count >= auto_save_frames
            )

            if time_elapsed or frames_since_save:
                success, _ = save_detection_data(
                    output_path,
                    detection_data,
                    is_interrupted=False,
                    silent=True,
                    last_saved_frame=frame_count,
                )
                if success:
                    auto_save_count += 1
                    reason = "time" if time_elapsed else "frames"
                    print(
                        f"  Auto-saved (#{auto_save_count}, {reason}): frame {last_save_frame_count}-{frame_count}"
                    )
                    last_save_time = now
                    last_save_frame_count = frame_count
    finally:
        cap.release()

    processing_time = time.time() - start_time

    # Update final metadata
    detection_counts = [
        len(record.get("detections", []))
        for record in detection_data.get("frames", {}).values()
    ]
    total_detections = sum(detection_counts)
    frames_with_detections = sum(1 for count in detection_counts if count)

    # A resumed file may lack a metadata section; create it if needed.
    metadata = detection_data.setdefault("metadata", {})
    metadata["completed_at"] = datetime.now().isoformat()
    metadata["processing_time"] = processing_time
    metadata["total_detections"] = total_detections
    metadata["auto_save_count"] = auto_save_count

    # Save final data; is_completed keeps the status from being reset to
    # "in_progress" by the save itself.
    save_detection_data(
        output_path,
        detection_data,
        is_interrupted=False,
        last_saved_frame=frame_count,
        is_completed=True,
    )

    # Print summary
    print(f"\n{'=' * 60}")
    print("YOLO Detection complete!")
    print(f"  Total frames processed: {frame_count}")
    print(f"  Frames with detections: {frames_with_detections}")
    print(f"  Total objects detected: {total_detections}")
    print(f"  Processing time: {processing_time:.1f}s")
    print(f"  Auto-saves: {auto_save_count}")
    print(f"  Output: {output_path}")
    print(f"{'=' * 60}")

    if publisher:
        publisher.complete("yolo", f"{frames_with_detections} frames with objects")

    return detection_data


def process_yolo(
    video_path: str,
    output_path: str,
    uuid: str = "",
    auto_save_interval: int = 30,
    force_restart: bool = False,
    auto_save_frames: int = 300,
):
    """Process video for object detection using YOLOv8 with resume support.

    Args:
        video_path: Video file to analyse.
        output_path: JSON file to write; if it already holds frames and
            ``force_restart`` is False, processing resumes after its last frame.
        uuid: When non-empty, progress is published through ``RedisPublisher``.
        auto_save_interval: Seconds between time-based auto-saves.
        force_restart: Ignore existing results and start from the first frame.
        auto_save_frames: Number of frames between frame-based auto-saves.

    Returns:
        The detection dict (``"metadata"`` and ``"frames"``). When ultralytics
        is missing or the video cannot be opened, a dict whose metadata status
        is ``"error"``.

    Raises:
        ImportError: ``uuid`` was given but ``redis_publisher`` is unavailable.
    """
    global g_detection_data, g_output_file, g_auto_save_interval, g_auto_save_frames
    g_auto_save_interval = auto_save_interval
    g_auto_save_frames = auto_save_frames
    # Drop state from any earlier call so Ctrl+C during model loading cannot
    # re-save a previous run's data.
    g_detection_data = None
    g_output_file = None

    publisher = _create_publisher(uuid)
    if publisher:
        publisher.info("yolo", "YOLO_START")

    # Set up signal handler for graceful pause
    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        return _run_detection(
            video_path,
            output_path,
            publisher,
            auto_save_interval,
            force_restart,
            auto_save_frames,
        )
    finally:
        # Leaving the handler installed would let a later Ctrl+C re-mark a
        # completed result file as "interrupted". signal.signal returns None
        # for handlers not installed from Python; those cannot be restored.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the YOLO processor."""
    parser = argparse.ArgumentParser(
        description="YOLO Object Detection with Resume Support"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--auto-save",
        type=int,
        default=30,
        help="Auto-save interval in seconds (default: 30)",
    )
    parser.add_argument(
        "--auto-save-frames",
        type=int,
        default=300,
        help="Auto-save after N frames (default: 300)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force restart from beginning (ignore existing data)",
    )
    return parser


def main(argv: Optional[list] = None) -> None:
    """Parse ``argv`` (default: ``sys.argv[1:]``) and run the processor."""
    args = build_arg_parser().parse_args(argv)
    process_yolo(
        args.video_path,
        args.output_path,
        args.uuid,
        args.auto_save,
        args.force,
        args.auto_save_frames,
    )


if __name__ == "__main__":
    main()