#!/opt/homebrew/bin/python3.11 """ Pose Processor Wrapper Calls Swift Vision Framework pose (swift_pose) with fallback to YOLOv8 Pose. Uses VNDetectHumanBodyPoseRequest with ANE acceleration. """ import re import sys import json import os import subprocess import argparse sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from redis_publisher import RedisPublisher SWIFT_POSE_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "swift_processors/.build/debug/swift_pose" ) SWIFT_POSE_ALT = os.path.join( os.path.dirname(os.path.abspath(__file__)), "swift_processors/.build/arm64-apple-macosx/debug/swift_pose" ) SWIFT_POSE_PROGRESS_RE = re.compile(r"\[SwiftPose\] Progress:\s*(\d+)%") def process_pose( video_path: str, output_path: str, uuid: str = "", sample_interval: int = 3, # Changed from 30 to match Face publisher: RedisPublisher = None, target_frames: list = None, ) -> dict: # Check if pose.json or pose.json.tmp already exists (from swift_face_pose) # executor.rs renames output to .json.tmp before running Python script tmp_path = output_path.replace('.json', '.json.tmp') source_path = None if os.path.exists(output_path): source_path = output_path print(f"[Pose] Output exists from swift_face_pose: {output_path}", file=sys.stderr) elif os.path.exists(tmp_path): source_path = tmp_path print(f"[Pose] Temp output exists from swift_face_pose: {tmp_path}", file=sys.stderr) if source_path: with open(source_path) as f: data = json.load(f) detected_frames = len(data.get('frames', [])) print(f"[Pose] Loaded {detected_frames} detected frames", file=sys.stderr) # When target_frames is provided (8Hz sampling), skip interpolation # Swift already outputs at sample_interval=3, matching 8Hz for 24fps if target_frames is not None: print(f"[Pose] 8Hz mode: returning {detected_frames} frames without interpolation", file=sys.stderr) if publisher: publisher.progress("pose", 100, 100, f"{detected_frames} frames (8Hz, no interpolation)") return data # Interpolate keypoints for all frames interpolated_data = interpolate_pose(data, video_path) # Write interpolated output with open(output_path, 'w') as f: json.dump(interpolated_data, f) # Delete .json.tmp file so executor.rs won't restore it if os.path.exists(tmp_path): os.remove(tmp_path) print(f"[Pose] Deleted temp file: {tmp_path}", file=sys.stderr) total_frames = len(interpolated_data.get('frames', [])) print(f"[Pose] Interpolated to {total_frames} frames", file=sys.stderr) if publisher: publisher.progress("pose", 100, 100, f"Interpolated {total_frames} frames") return interpolated_data swift_bin = SWIFT_POSE_PATH if not os.path.exists(swift_bin): swift_bin = SWIFT_POSE_ALT if not os.path.exists(swift_bin): print("[Pose] Swift binary not found, using YOLOv8 fallback", file=sys.stderr) if publisher: publisher.error("pose", "Swift binary not found, using fallback") return _fallback(video_path, output_path, uuid, sample_interval) cmd = [swift_bin, video_path, output_path, "--sample-interval", str(sample_interval), "--uuid", uuid] print(f"[Pose] Running Swift Pose (Vision Framework)", file=sys.stderr) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) last_pct = -1 for line in proc.stdout: line = line.strip() m = SWIFT_POSE_PROGRESS_RE.search(line) if m: pct = int(m.group(1)) if pct > last_pct: last_pct = pct print(f"[Pose] Progress: {pct}%", file=sys.stderr) if publisher: publisher.progress("pose", pct, 100, f"{pct}%") elif line: print(f" {line}", file=sys.stderr) stderr_output = proc.stderr.read() if stderr_output: print(stderr_output.strip(), file=sys.stderr) proc.wait() if proc.returncode != 0 or not os.path.exists(output_path): print(f"[Pose] Swift Pose failed (exit={proc.returncode}), falling back to YOLOv8", file=sys.stderr) if publisher: publisher.error("pose", f"Swift Pose failed, using fallback") return _fallback(video_path, output_path, uuid, sample_interval) with open(output_path) as f: return json.load(f) def interpolate_pose(detected_data: dict, video_path: str) -> dict: """Interpolate keypoints for all frames between detected frames""" import cv2 import numpy as np cap = cv2.VideoCapture(video_path) total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = detected_data.get('fps', 30.0) detected_frames = detected_data.get('frames', []) if not detected_frames: cap.release() return detected_data # Build frame index map frame_map = {f['frame']: f for f in detected_frames} detected_frame_nums = sorted(frame_map.keys()) print(f"[Pose] Interpolating from {len(detected_frame_nums)} detected frames to {total_video_frames} total frames", file=sys.stderr) # Get all persons from detected frames (assume same person tracking) all_persons = {} for f in detected_frames: for i, p in enumerate(f.get('persons', [])): if i not in all_persons: all_persons[i] = [] all_persons[i].append((f['frame'], p)) # Interpolate each person's keypoints for each frame interpolated_frames = [] for frame_num in range(total_video_frames): ts = frame_num / fps persons_in_frame = [] for person_id, person_frames in all_persons.items(): # Find closest detected frames before and after before = None after = None for fn, p in person_frames: if fn <= frame_num: before = (fn, p) if fn >= frame_num and after is None: after = (fn, p) if before is None and after is None: continue # Interpolate keypoints interpolated_keypoints = [] bbox = None if before and after and before[0] != after[0]: # Linear interpolation t0, t1 = before[0], after[0] t = (frame_num - t0) / (t1 - t0) if t1 != t0 else 0 kp_before = before[1].get('keypoints', []) kp_after = after[1].get('keypoints', []) bbox_before = before[1].get('bbox', {}) bbox_after = after[1].get('bbox', {}) # Interpolate keypoints for i in range(max(len(kp_before), len(kp_after))): kp0 = kp_before[i] if i < len(kp_before) else kp_after[i] kp1 = kp_after[i] if i < len(kp_after) else kp_before[i] x = kp0['x'] + t * (kp1['x'] - kp0['x']) y = kp0['y'] + t * (kp1['y'] - kp0['y']) c = kp0['confidence'] + t * (kp1['confidence'] - kp0['confidence']) interpolated_keypoints.append({ 'name': kp0['name'], 'x': x, 'y': y, 'confidence': c }) # Interpolate bbox if bbox_before and bbox_after: bbox = { 'x': int(bbox_before['x'] + t * (bbox_after['x'] - bbox_before['x'])), 'y': int(bbox_before['y'] + t * (bbox_after['y'] - bbox_before['y'])), 'width': int(bbox_before['width'] + t * (bbox_after['width'] - bbox_before['width'])), 'height': int(bbox_before['height'] + t * (bbox_after['height'] - bbox_before['height'])) } elif before: # Use before frame's data interpolated_keypoints = before[1].get('keypoints', []) bbox = before[1].get('bbox', {}) elif after: # Use after frame's data interpolated_keypoints = after[1].get('keypoints', []) bbox = after[1].get('bbox', {}) if bbox and bbox.get('width', 0) > 0 and bbox.get('height', 0) > 0: persons_in_frame.append({ 'keypoints': interpolated_keypoints, 'bbox': bbox }) if persons_in_frame: interpolated_frames.append({ 'frame': frame_num, 'timestamp': ts, 'persons': persons_in_frame }) cap.release() return { 'frame_count': len(interpolated_frames), 'fps': fps, 'frames': interpolated_frames } def _fallback(video_path, output_path, uuid, sample_interval): """Fallback to YOLOv8 Pose""" from ultralytics import YOLO import cv2 model = YOLO("yolov8n-pose.pt") cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_count = 0 frames = [] while cap.isOpened(): ret, frame = cap.read() if not ret: break if frame_count % sample_interval == 0: ts = frame_count / fps if fps > 0 else 0 results = model(frame, verbose=False, device="cpu") persons = [] for r in results: if r.keypoints is None: continue for kp_data in r.keypoints: kps = kp_data.xy[0].cpu().numpy() if hasattr(kp_data, 'xy') else [] confs = kp_data.conf[0].cpu().numpy() if hasattr(kp_data, 'conf') else [] keypoints = [] names = ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"] for j, name in enumerate(names): if j < len(kps): x, y = float(kps[j][0]), float(kps[j][1]) c = float(confs[j]) if j < len(confs) else 0 keypoints.append({"name": name, "x": x, "y": y, "confidence": c}) if keypoints: xs = [k["x"] for k in keypoints if k["confidence"] > 0.1] ys = [k["y"] for k in keypoints if k["confidence"] > 0.1] bbox = {"x": int(min(xs)), "y": int(min(ys)), "width": int(max(xs)-min(xs)), "height": int(max(ys)-min(ys))} if xs else {"x": 0, "y": 0, "width": 0, "height": 0} persons.append({"keypoints": keypoints, "bbox": bbox}) if persons: frames.append({"frame": frame_count, "timestamp": ts, "persons": persons}) frame_count += 1 cap.release() result = {"frame_count": len(frames), "fps": fps, "frames": frames} with open(output_path, "w") as f: json.dump(result, f, indent=2) return result if __name__ == "__main__": parser = argparse.ArgumentParser(description="Pose Processor (Swift Vision)") parser.add_argument("video_path") parser.add_argument("output_path") parser.add_argument("--uuid", "-u", default="") parser.add_argument("--sample-interval", type=int, default=3) # Changed from 30 to match Face parser.add_argument("--frames", type=str, default=None, help="Comma-separated frame numbers for 8Hz sampling") args = parser.parse_args() target_frames = None if args.frames: target_frames = [int(f) for f in args.frames.split(",") if f.strip()] print(f"[Pose] 8Hz target frames: {len(target_frames)} frames", file=sys.stderr) publisher = RedisPublisher(args.uuid) if args.uuid else None if publisher: publisher.info("pose", "POSE_START") result = process_pose(args.video_path, args.output_path, args.uuid, args.sample_interval, publisher, target_frames) with open(args.output_path, "w") as f: json.dump(result, f, indent=2) print(f"Pose: {len(result.get('frames', []))} frames with poses") if publisher: publisher.complete("pose", f"{len(result.get('frames',[]))} frames")