Files
momentry_core/scripts/integrated_body_action_decoder.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

439 lines
16 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Integrated Body Action Decoder - Combine InsightFace + MediaPipe Holistic
Purpose:
1. Combine InsightFace pose_angle (existing) with MediaPipe holistic
2. Generate complete body action timeline
3. Support trace-based analysis
Input:
- face.json (InsightFace: embedding, pose_angle)
- holistic.json (MediaPipe: face_mesh, pose, hands)
Output:
- Integrated action data with all body parts
"""
import sys
import json
import argparse
import numpy as np
from typing import Dict, List
from collections import defaultdict
from pathlib import Path
class IntegratedBodyActionDecoder:
    """Decode per-frame body actions from InsightFace + MediaPipe Holistic data.

    The decoder runs no model itself. It reads action labels that are already
    present in the two input JSON structures, re-emits them as uniform action
    records grouped by body part, and derives a few combined actions from
    labels that occur together in the same frame.
    """

    def __init__(self):
        # Action thresholds.
        # NOTE(review): no method of this class reads these tables; the action
        # labels are taken pre-computed from the input JSON. Presumably they
        # mirror the cut-offs used by the upstream processors - confirm before
        # relying on them.
        self.EAR_THRESHOLDS = {
            "closed": 0.15,
            "squint": 0.25,
            "wide_open": 0.4,
        }
        self.MAR_THRESHOLDS = {
            "closed": 0.2,
            "slightly_open": 0.3,
            "open": 0.5,
            "yawn": 0.7,
        }
        self.ELBOW_ANGLE_THRESHOLDS = {
            "fold": 90,
            "extend": 150,
        }
        self.KNEE_ANGLE_THRESHOLDS = {
            "knee_bend": 120,
            "standing": 160,
        }

    def decode_frame_actions(
        self,
        face_data: Dict,
        holistic_data: Dict,
    ) -> Dict:
        """Decode all actions for a single frame.

        Any section that is missing *or* explicitly ``None`` (``pose_angle``,
        ``face_mesh``, ``pose``, ``hands``, or an individual hand) is treated
        as "nothing detected" and simply contributes no actions.

        Args:
            face_data: InsightFace record for one face (reads ``pose_angle``).
                May be empty or ``None``.
            holistic_data: MediaPipe record for one person (reads
                ``face_mesh``, ``pose`` and ``hands``). May be empty or ``None``.

        Returns:
            Dict with the keys ``face``, ``eyes``, ``mouth``, ``arms``,
            ``hands``, ``legs`` and ``combined``; each maps to a list of
            action dicts that carry at least ``action`` and ``description``.
        """
        actions = {
            "face": [],
            "eyes": [],
            "mouth": [],
            "arms": [],
            "hands": [],
            "legs": [],
            "combined": [],
        }

        face_data = face_data or {}
        holistic_data = holistic_data or {}

        # Sections may be present with a null value (e.g. no body detected in
        # the frame), so normalise them to empty dicts once, up front.
        face_mesh = holistic_data.get("face_mesh") or {}
        pose = holistic_data.get("pose") or {}
        hands = holistic_data.get("hands") or {}

        # 1. Face pose (from InsightFace)
        pose_angle = face_data.get("pose_angle")
        if pose_angle is not None:
            angle = pose_angle.get("angle", "unknown")
            confidence = pose_angle.get("confidence", 0.0)
            actions["face"].append({
                "action": f"pose_{angle}",
                "description": f"Face pose: {angle}",
                "confidence": confidence,
                "source": "insightface",
            })

        # 2. Eye actions (from MediaPipe face_mesh)
        eye_features = face_mesh.get("eye_features") or {}
        eye_action = eye_features.get("eye_action", "unknown")
        ear = eye_features.get("avg_ear", 0)
        gaze = eye_features.get("gaze_direction", "center")
        if eye_action != "unknown":
            actions["eyes"].append({
                "action": f"eye_{eye_action}",
                "description": f"Eye: {eye_action} (EAR: {ear:.3f})",
                "ear": ear,
                "gaze": gaze,
                "source": "mediapipe_face_mesh",
            })
        # A centred gaze is the default and is not reported as an action.
        if gaze != "center":
            actions["eyes"].append({
                "action": f"gaze_{gaze}",
                "description": f"Gaze: looking {gaze}",
                "source": "mediapipe_face_mesh",
            })

        # 3. Mouth actions (from MediaPipe face_mesh)
        mouth_features = face_mesh.get("mouth_features") or {}
        mouth_action = mouth_features.get("mouth_action", "unknown")
        mar = mouth_features.get("mar", 0)
        if mouth_action != "unknown":
            actions["mouth"].append({
                "action": f"mouth_{mouth_action}",
                "description": f"Mouth: {mouth_action} (MAR: {mar:.3f})",
                "mar": mar,
                "source": "mediapipe_face_mesh",
            })

        # 4. Arm actions (from MediaPipe pose)
        arm_features = pose.get("arm_features") or {}
        for side in ("left", "right"):
            arm_action = arm_features.get(f"{side}_arm_action", "unknown")
            elbow_angle = arm_features.get(f"{side}_elbow_angle", 0)
            if arm_action != "unknown":
                actions["arms"].append({
                    "action": f"{side}_arm_{arm_action}",
                    "description": (
                        f"{side.capitalize()} arm: {arm_action} "
                        f"(angle: {elbow_angle:.1f}°)"
                    ),
                    "angle": elbow_angle,
                    "source": "mediapipe_pose",
                })
        if arm_features.get("cross_arms", False):
            actions["arms"].append({
                "action": "cross_arms",
                "description": "Arms crossed",
                "source": "mediapipe_pose",
            })

        # 5. Hand actions (from MediaPipe hands)
        for hand_type in ("left", "right"):
            hand_data = hands.get(hand_type)
            if not hand_data:
                continue
            gesture = hand_data.get("gesture", "unknown")
            num_fingers = hand_data.get("num_fingers_extended", 0)
            if gesture != "unknown":
                actions["hands"].append({
                    "action": f"{hand_type}_hand_{gesture}",
                    "description": (
                        f"{hand_type.capitalize()} hand: {gesture} "
                        f"({num_fingers} fingers)"
                    ),
                    "num_fingers_extended": num_fingers,
                    "source": "mediapipe_hands",
                })

        # 6. Leg actions (from MediaPipe pose)
        leg_features = pose.get("leg_features") or {}
        leg_action = leg_features.get("leg_action", "unknown")
        if leg_action != "unknown":
            actions["legs"].append({
                "action": f"leg_{leg_action}",
                "description": f"Leg: {leg_action}",
                "source": "mediapipe_pose",
            })

        # 7. Combined actions (derived from everything collected above)
        actions["combined"] = self._detect_combined_actions(actions)
        return actions

    def _detect_combined_actions(self, actions: Dict) -> List[Dict]:
        """Detect combined actions from labels of multiple body parts.

        Args:
            actions: Dict mapping a category name to a list of action dicts,
                as built by ``decode_frame_actions``.

        Returns:
            List of combined action dicts (``action``, ``description``,
            ``components``), in the fixed order thinking / defensive / surprise.
        """
        # A set gives O(1) membership tests for the rule checks below.
        detected = {
            act["action"]
            for action_list in actions.values()
            for act in action_list
        }

        combined = []

        # Thinking: face tilted down + left hand pointing
        if "pose_tilted_down" in detected and "left_hand_pointing" in detected:
            combined.append({
                "action": "thinking_pose",
                "description": "Thinking pose (looking down + pointing)",
                "components": ["pose_tilted_down", "left_hand_pointing"],
            })

        # Defensive: crossed arms + frontal face pose
        if "cross_arms" in detected and "pose_frontal" in detected:
            combined.append({
                "action": "defensive_pose",
                "description": "Defensive pose (crossed arms + frontal)",
                "components": ["cross_arms", "pose_frontal"],
            })

        # Surprise: open mouth + wide-open eyes
        if "mouth_open" in detected and "eye_wide_open" in detected:
            combined.append({
                "action": "surprise_expression",
                "description": "Surprise expression (wide eyes + open mouth)",
                "components": ["eye_wide_open", "mouth_open"],
            })

        return combined

    def integrate_and_decode(
        self,
        face_json_path: str,
        holistic_json_path: str,
    ) -> Dict:
        """Integrate face.json + holistic.json and decode actions per frame.

        Only frames whose key appears in both files are processed, and only
        the first face / first person of each frame is decoded. A frame whose
        ``faces`` or ``persons`` list is empty is decoded as "nothing
        detected" instead of raising ``IndexError``.

        Args:
            face_json_path: Path to face.json (InsightFace).
            holistic_json_path: Path to holistic.json (MediaPipe).

        Returns:
            Dict with ``metadata``, ``frames`` (keyed by the original frame
            key) and ``action_summary`` (action name -> number of frames).

        Raises:
            OSError: If either file cannot be opened.
            json.JSONDecodeError: If either file is not valid JSON.
            ValueError: If a common frame key is not an integer string.
        """
        # Load face.json
        with open(face_json_path, encoding="utf-8") as f:
            face_data = json.load(f)
        # Load holistic.json
        with open(holistic_json_path, encoding="utf-8") as f:
            holistic_data = json.load(f)

        face_frames = face_data.get("frames", {})
        holistic_frames = holistic_data.get("frames", {})

        # Only frames present in both sources can be integrated.
        common_frames = set(face_frames) & set(holistic_frames)
        print(f"Face frames: {len(face_frames)}")
        print(f"Holistic frames: {len(holistic_frames)}")
        print(f"Common frames: {len(common_frames)}")
        print()

        frames_out = {}
        summary = defaultdict(int)

        for frame_num in sorted(common_frames, key=int):
            face_frame = face_frames[frame_num] or {}
            holistic_frame = holistic_frames[frame_num] or {}

            # First face/person only. `or [{}]` also covers an empty list,
            # which a `.get(key, [{}])` default would not.
            face_person = (face_frame.get("faces") or [{}])[0] or {}
            holistic_person = (holistic_frame.get("persons") or [{}])[0] or {}

            actions = self.decode_frame_actions(face_person, holistic_person)

            face_mesh = holistic_person.get("face_mesh") or {}
            pose = holistic_person.get("pose") or {}
            hands = holistic_person.get("hands") or {}
            arm_features = pose.get("arm_features", {})
            embedding = face_person.get("embedding")

            frames_out[frame_num] = {
                "frame_number": int(frame_num),
                "actions": actions,
                "insightface_data": {
                    "pose_angle": face_person.get("pose_angle"),
                    # Only the first 10 values, to keep the output small.
                    "embedding": embedding[:10] if embedding else None,
                },
                "mediapipe_data": {
                    "eye_action": face_mesh.get("eye_features", {}).get("eye_action"),
                    "mouth_action": face_mesh.get("mouth_features", {}).get("mouth_action"),
                    "left_arm_action": arm_features.get("left_arm_action"),
                    "right_arm_action": arm_features.get("right_arm_action"),
                    "leg_action": pose.get("leg_features", {}).get("leg_action"),
                    "left_hand_gesture": (hands.get("left") or {}).get("gesture"),
                    "right_hand_gesture": (hands.get("right") or {}).get("gesture"),
                },
            }

            # Update summary: one count per occurrence of an action in a frame.
            for action_list in actions.values():
                for act in action_list:
                    summary[act["action"]] += 1

        return {
            "metadata": {
                "face_source": face_json_path,
                "holistic_source": holistic_json_path,
                "total_frames": len(common_frames),
                "sources": ["insightface", "mediapipe_holistic"],
            },
            "frames": frames_out,
            # Plain dict so the result is directly JSON-serialisable.
            "action_summary": dict(summary),
        }

    def print_action_report(self, integrated_data: Dict) -> None:
        """Print a human-readable report of an integration result to stdout.

        Args:
            integrated_data: Dict as returned by ``integrate_and_decode``
                (reads ``metadata``, ``action_summary`` and ``frames``).
        """
        rule = "=" * 70

        print("\n" + rule)
        print("Integrated Body Action Decoder Report")
        print(rule)
        print(f"\nTotal frames: {integrated_data['metadata']['total_frames']}")
        print(f"Sources: {', '.join(integrated_data['metadata']['sources'])}")
        print("\n" + rule)
        print("Action Summary")
        print(rule)

        summary = integrated_data["action_summary"]

        # Anything that matches none of these prefixes is reported as "Combined".
        known_prefixes = (
            "pose_", "eye_", "gaze_", "mouth_",
            "left_arm_", "right_arm_",
            "left_hand_", "right_hand_",
            "leg_", "cross_arms",
        )

        # Group by category
        categories = {
            "Face": [k for k in summary if k.startswith("pose_")],
            "Eyes": [k for k in summary if k.startswith(("eye_", "gaze_"))],
            "Mouth": [k for k in summary if k.startswith("mouth_")],
            "Arms": [
                k for k in summary
                if k.startswith(("left_arm_", "right_arm_")) or k == "cross_arms"
            ],
            "Hands": [k for k in summary if k.startswith(("left_hand_", "right_hand_"))],
            "Legs": [k for k in summary if k.startswith("leg_")],
            "Combined": [k for k in summary if not k.startswith(known_prefixes)],
        }

        for category, action_keys in categories.items():
            if action_keys:
                print(f"\n{category} Actions:")
                for action in sorted(action_keys):
                    count = summary[action]
                    print(f" {action}: {count} times")

        print("\n" + rule)
        print("Sample Frame Actions")
        print(rule)

        # Show the first 3 frames in numeric frame order.
        ordered_frames = sorted(
            integrated_data["frames"].items(),
            key=lambda item: int(item[0]),
        )
        for frame_num, frame_data in ordered_frames[:3]:
            print(f"\nFrame {frame_num}:")
            for category, action_list in frame_data["actions"].items():
                if action_list:
                    action_names = [a["action"] for a in action_list]
                    print(f" {category}: {', '.join(action_names)}")
def main():
    """Command-line entry point for the integrated body action decoder.

    With ``--frame N`` the actions of that single frame are decoded and
    printed. Otherwise every frame common to both inputs is decoded, a report
    is printed, and the result is written to ``--output-json`` when given.
    """
    parser = argparse.ArgumentParser(description="Integrated Body Action Decoder")
    parser.add_argument("--face-json", required=True, help="Path to face.json (InsightFace)")
    parser.add_argument("--holistic-json", required=True, help="Path to holistic.json (MediaPipe)")
    parser.add_argument("--output-json", help="Output JSON path")
    parser.add_argument("--frame", type=int, help="Analyze single frame")
    args = parser.parse_args()

    print("=" * 70)
    print("Integrated Body Action Decoder")
    print("=" * 70)

    decoder = IntegratedBodyActionDecoder()

    # Compare against None rather than relying on truthiness: `--frame 0` is a
    # valid request and must not fall through to "process all frames".
    if args.frame is not None:
        # Load single frame
        with open(args.face_json, encoding="utf-8") as f:
            face_data = json.load(f)
        with open(args.holistic_json, encoding="utf-8") as f:
            holistic_data = json.load(f)

        frame_num = str(args.frame)
        face_frames = face_data.get("frames", {})
        holistic_frames = holistic_data.get("frames", {})

        if frame_num in face_frames and frame_num in holistic_frames:
            # First face/person only; an empty or missing detection list is
            # decoded as "nothing detected" instead of raising IndexError.
            face_person = ((face_frames[frame_num] or {}).get("faces") or [{}])[0]
            holistic_person = ((holistic_frames[frame_num] or {}).get("persons") or [{}])[0]
            actions = decoder.decode_frame_actions(face_person, holistic_person)

            print(f"\n=== Frame {frame_num} Actions ===")
            for category, action_list in actions.items():
                if action_list:
                    print(f"\n{category.upper()}:")
                    for act in action_list:
                        print(f" {act['action']}: {act['description']}")
        else:
            print(f"❌ Frame {frame_num} not found in both files")
    else:
        # Process all frames
        integrated_data = decoder.integrate_and_decode(
            args.face_json,
            args.holistic_json,
        )
        decoder.print_action_report(integrated_data)

        if args.output_json:
            with open(args.output_json, "w", encoding="utf-8") as f:
                json.dump(integrated_data, f, indent=2)
            print(f"\n✅ Output saved to: {args.output_json}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()