Files
momentry_core/scripts/pose_processor.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

257 lines
7.7 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
Pose Processor - Pose Estimation with Resume Support
Uses YOLOv8 Pose via ultralytics (local model)
Resume Feature:
- Auto-detect existing results and resume from last frame
- Auto-save at configurable intervals (default: 30 seconds)
- Ctrl+C gracefully saves and exits
Note: YOLOv8 Pose offers a stream mode that is optimized for video processing,
but to support resume this script reads frames manually with OpenCV and runs
the model on one frame at a time.
"""
import sys
import json
import argparse
import os
import time
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
from resume_framework import ResumeFramework, format_time, print_progress
# Names assigned to keypoints by index: entry i labels the i-th keypoint of
# each detected person in process_pose(). After "nose", every body part
# appears as a left/right pair, left first.
# NOTE(review): presumably this is the keypoint order emitted by the
# yolov8n-pose model - confirm against the model documentation.
KEYPOINT_NAMES = ["nose"] + [
    f"{side}_{part}"
    for part in (
        "eye",
        "ear",
        "shoulder",
        "elbow",
        "wrist",
        "hip",
        "knee",
        "ankle",
    )
    for side in ("left", "right")
]
def _keypoint_rows(keypoints):
    """Return the raw keypoint values as nested Python lists.

    ``keypoints`` is whatever ``result.keypoints`` holds. When it exposes a
    ``data`` attribute the underlying array is read from there; iterating the
    wrapper object itself yields further wrappers whose ``len()`` is the
    number of persons, not the number of values per keypoint.
    """
    raw = getattr(keypoints, "data", keypoints)
    if hasattr(raw, "tolist"):
        raw = raw.tolist()
    return raw


def _build_persons(keypoints):
    """Convert one frame's keypoints into a list of person dictionaries.

    Each person is ``{"keypoints": [...], "bbox": {...}}``. A keypoint row
    with fewer than three values (no confidence) is skipped. The bounding box
    spans the keypoints whose confidence exceeds 0.3 and is all zeros when
    there are none.
    """
    persons = []
    for person_rows in _keypoint_rows(keypoints):
        named = []
        for idx, row in enumerate(person_rows):
            if len(row) < 3:
                continue
            named.append(
                {
                    "name": KEYPOINT_NAMES[idx]
                    if idx < len(KEYPOINT_NAMES)
                    else f"kp_{idx}",
                    "x": float(row[0]),
                    "y": float(row[1]),
                    "confidence": float(row[2]),
                }
            )

        confident = [kp for kp in named if kp["confidence"] > 0.3]
        if confident:
            xs = [kp["x"] for kp in confident]
            ys = [kp["y"] for kp in confident]
            bbox = {
                "x": int(min(xs)),
                "y": int(min(ys)),
                "width": int(max(xs) - min(xs)),
                "height": int(max(ys) - min(ys)),
            }
        else:
            bbox = {"x": 0, "y": 0, "width": 0, "height": 0}

        persons.append({"keypoints": named, "bbox": bbox})
    return persons


def process_pose(
    video_path: str,
    output_path: str,
    uuid: str = "",
    auto_save_interval: int = 30,
    auto_save_frames: int = 300,
    force_restart: bool = False,
):
    """Process video for pose estimation using YOLOv8 Pose with resume support.

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the JSON result file, handed to ResumeFramework.
        uuid: Identifier passed to ResumeFramework for progress publishing.
        auto_save_interval: Auto-save interval in seconds.
        auto_save_frames: Auto-save interval in frames.
        force_restart: When true, existing data is ignored and processing
            starts from the first frame.

    Returns:
        A dict with "metadata" and "frames". "frames" maps the 1-based frame
        number (as a string) to the persons found in that frame; a frame is
        stored when it contains at least one person or when its number is a
        multiple of 30. If ultralytics is missing or the video cannot be
        opened, a dict whose metadata has ``"status": "error"`` and whose
        "frames" is empty is returned instead.
    """
    framework = ResumeFramework(
        output_path=output_path,
        processor_name="pose",
        uuid=uuid,
        auto_save_interval=auto_save_interval,
        auto_save_frames=auto_save_frames,
        force_restart=force_restart,
    )
    framework.publish_info("POSE_START")

    try:
        from ultralytics import YOLO
    except ImportError:
        framework.publish_error("ultralytics not installed")
        result = {
            "metadata": {"status": "error", "error": "ultralytics not installed"},
            "frames": {},
        }
        with open(output_path, "w") as f:
            json.dump(result, f, indent=2)
        return result

    framework.publish_info("POSE_LOADING_MODEL")
    model = YOLO("yolov8n-pose.pt")

    import cv2

    # First pass over the file only reads the video properties.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        # Report the failure on the same channel as the missing-dependency
        # error. The output file is deliberately left untouched so existing
        # resume data is not overwritten.
        framework.publish_error(f"Cannot open video: {video_path}")
        print(f"Error: Cannot open video: {video_path}")
        return {"metadata": {"status": "error"}, "frames": {}}

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps if fps > 0 else 0
    cap.release()
    framework.publish_info(f"fps={fps}, frames={total_frames}")

    existing_data, last_checkpoint = framework.load_existing_data()
    # One truthiness test decides both the messages and the branch, so an
    # empty result can no longer announce a resume and then start fresh.
    resume_mode = bool(existing_data) and last_checkpoint > 0 and not force_restart

    if resume_mode:
        print(f"\nFound existing data: {output_path}")
        print(f"Last processed frame: {last_checkpoint}")
        print(f"Will resume from frame {last_checkpoint + 1}")
        pose_data = existing_data
        # Guard against resumed data that has no "frames" section yet.
        pose_data.setdefault("frames", {})
        frame_count = last_checkpoint
        processed_frames = {int(k) for k in pose_data["frames"]}
    else:
        pose_data = {
            "metadata": framework.init_metadata(
                video_path=video_path,
                fps=fps,
                width=width,
                height=height,
                total_frames=total_frames,
                total_duration=total_duration,
                extra={"model": "yolov8n-pose"},
            ),
            "frames": {},
        }
        frame_count = 0
        processed_frames = set()

    framework.set_data(pose_data)
    start_time = time.time()
    framework.last_save_time = start_time

    print(f"\nProcessing video: {total_frames} frames @ {fps:.2f} fps")
    print(f"Auto-save every {auto_save_interval}s or {auto_save_frames} frames")
    print(f"Resume from frame {frame_count + 1 if resume_mode else 1}")
    print()

    cap = cv2.VideoCapture(video_path)
    try:
        if resume_mode:
            # Frame numbers are 1-based while the capture position is 0-based,
            # so position `frame_count` is the frame after the checkpoint.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)

        while True:
            ok, frame = cap.read()
            if not ok:
                break

            frame_count += 1
            if frame_count in processed_frames:
                continue
            current_time = (frame_count - 1) / fps if fps > 0 else 0

            # NOTE(review): pose=True is forwarded unchanged from the original
            # call - confirm ultralytics accepts it at inference time.
            results = model(frame, conf=0.5, verbose=False, pose=True)
            result = results[0]

            persons = []
            if result.keypoints is not None:
                persons = _build_persons(result.keypoints)

            # Frames without people are recorded only every 30th frame.
            if persons or frame_count % 30 == 0:
                pose_data["frames"][str(frame_count)] = {
                    "frame_number": frame_count,
                    "time_seconds": round(current_time, 3),
                    "time_formatted": format_time(current_time),
                    "persons": persons,
                }
            processed_frames.add(frame_count)

            if frame_count % 500 == 0:
                elapsed = time.time() - start_time
                print_progress(
                    frame_count, total_frames, elapsed, f"{len(persons)} persons"
                )
                framework.publish_progress(
                    frame_count, total_frames, f"frame {frame_count}"
                )

            if framework.should_auto_save(frame_count):
                framework.save_progress(frame_count, silent=True)
    finally:
        # Release the capture even when inference raises or the run is
        # interrupted.
        cap.release()

    # Frames recorded by a previous run plus every frame analysed in this run.
    total_processed = len(processed_frames)
    framework.finalize(
        total_processed=total_processed,
        extra_metadata={"model": "yolov8n-pose"},
    )

    frames_with_poses = sum(
        1 for entry in pose_data["frames"].values() if entry.get("persons")
    )
    print(f"\nPose estimation completed: {total_processed} frames processed")
    print(f"Frames with poses: {frames_with_poses}")
    return pose_data
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus optional resume
    # settings, all forwarded to process_pose().
    cli = argparse.ArgumentParser(description="Pose Estimation with Resume Support")
    cli.add_argument("video_path", help="Path to video file")
    cli.add_argument("output_path", help="Output JSON path")
    cli.add_argument("--uuid", "-u", default="", help="UUID for Redis progress")
    cli.add_argument(
        "--auto-save-interval",
        "-a",
        type=int,
        default=30,
        help="Auto-save interval in seconds",
    )
    cli.add_argument(
        "--auto-save-frames",
        "-f",
        type=int,
        default=300,
        help="Auto-save interval in frames",
    )
    cli.add_argument(
        "--force-restart",
        "-r",
        action="store_true",
        help="Force restart (ignore existing data)",
    )
    opts = cli.parse_args()

    process_pose(
        video_path=opts.video_path,
        output_path=opts.output_path,
        uuid=opts.uuid,
        auto_save_interval=opts.auto_save_interval,
        auto_save_frames=opts.auto_save_frames,
        force_restart=opts.force_restart,
    )