Files
momentry_core/scripts/utils/face_tracker.py

517 lines
18 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
Face Tracker - Track faces across frames using embedding similarity and bbox proximity.

Purpose:
    1. Assign a unique trace_id to each face across frames
    2. Track face movement across adjacent frames
    3. Output trace statistics (duration, path, confidence)

Algorithm:
    1. For the first frame: assign a new trace_id to each face
    2. For subsequent frames:
        - Calculate bbox overlap with the previous frame's faces
        - Calculate embedding cosine similarity
        - Match faces if both conditions are met
        - Assign the same trace_id if matched, a new trace_id if not

Matching Conditions:
    - bbox overlap > 0.3 (IoU)
    - embedding similarity > 0.7
    - OR a single condition exceeds its threshold (fallback)

Output:
    - face.json with trace_id added to each face
    - trace statistics report
"""
import json
import argparse
import numpy as np
from typing import Dict, List, Set
from collections import defaultdict
def calculate_bbox_iou(bbox1: Dict, bbox2: Dict) -> float:
    """
    Compute the Intersection over Union (IoU) of two axis-aligned boxes.

    Args:
        bbox1: mapping with "x", "y", "width" and "height" entries
        bbox2: mapping with the same entries

    Returns:
        Intersection area divided by union area; 0.0 when the boxes do not
        overlap (touching edges count as no overlap) or the union is empty.
    """
    left_a, top_a = bbox1["x"], bbox1["y"]
    width_a, height_a = bbox1["width"], bbox1["height"]
    left_b, top_b = bbox2["x"], bbox2["y"]
    width_b, height_b = bbox2["width"], bbox2["height"]

    # Corners of the candidate intersection rectangle.
    overlap_left = max(left_a, left_b)
    overlap_right = min(left_a + width_a, left_b + width_b)
    overlap_top = max(top_a, top_b)
    overlap_bottom = min(top_a + height_a, top_b + height_b)

    # An empty or degenerate intersection means there is no overlap at all.
    if overlap_right <= overlap_left or overlap_bottom <= overlap_top:
        return 0.0

    overlap_area = (overlap_right - overlap_left) * (overlap_bottom - overlap_top)
    combined_area = width_a * height_a + width_b * height_b - overlap_area
    if combined_area > 0:
        return overlap_area / combined_area
    return 0.0
def calculate_bbox_distance(bbox1: Dict, bbox2: Dict) -> float:
    """
    Measure how far apart the centers of two boxes are.

    Args:
        bbox1: mapping with "x", "y", "width" and "height" entries
        bbox2: mapping with the same entries

    Returns:
        Euclidean distance between the two box centers
    """
    center_a = (bbox1["x"] + bbox1["width"] / 2, bbox1["y"] + bbox1["height"] / 2)
    center_b = (bbox2["x"] + bbox2["width"] / 2, bbox2["y"] + bbox2["height"] / 2)
    delta_x = center_a[0] - center_b[0]
    delta_y = center_a[1] - center_b[1]
    return np.sqrt(delta_x ** 2 + delta_y ** 2)
def calculate_embedding_similarity(emb1: List[float], emb2: List[float]) -> float:
    """
    Compute the cosine similarity of two embedding vectors.

    Args:
        emb1: first embedding, or None
        emb2: second embedding, or None

    Returns:
        Cosine similarity (-1.0 - 1.0); 0.0 when either embedding is None
        or has zero length (norm of 0).
    """
    if emb1 is None or emb2 is None:
        return 0.0

    vec_a, vec_b = np.array(emb1), np.array(emb2)
    norm_a, norm_b = np.linalg.norm(vec_a), np.linalg.norm(vec_b)

    # A zero vector has no direction, so similarity is defined as 0.0 here.
    if norm_a == 0 or norm_b == 0:
        return 0.0

    return np.dot(vec_a, vec_b) / (norm_a * norm_b)
def _bbox_of(face: Dict) -> Dict:
    """Copy the bbox fields ("x", "y", "width", "height") of a face record."""
    return {
        "x": face["x"],
        "y": face["y"],
        "width": face["width"],
        "height": face["height"],
    }


def _touches_frame_edge(bbox: Dict, frame_width: int, frame_height: int, margin: int) -> bool:
    """Return True when bbox comes within `margin` pixels of any frame border."""
    return (
        bbox["x"] < margin
        or bbox["x"] + bbox["width"] > frame_width - margin
        or bbox["y"] < margin
        or bbox["y"] + bbox["height"] > frame_height - margin
    )


def match_faces(
    current_faces: List[Dict],
    previous_faces: List[Dict],
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    frame_gap: int = 1,
    cut_boundaries: Set[int] = None,
    prev_frame: int = None,
    curr_frame: int = None,
    frame_width: int = 1920,
    frame_height: int = 1080,
    edge_margin: int = 50,
) -> Dict[int, int]:
    """
    Match current frame faces to previous frame faces.

    Matching is greedy: current faces are processed in order and each one
    claims the best-scoring previous face that has not been claimed yet.

    Args:
        current_faces: Faces in current frame (each with "x", "y", "width",
            "height" and optionally "embedding")
        previous_faces: Faces in previous frame (same structure)
        iou_threshold: Minimum IoU for matching
        similarity_threshold: Minimum embedding similarity for matching
        distance_threshold: Maximum bbox center distance for matching
        use_embedding: Whether to use embedding similarity
        frame_gap: Number of frames between current and previous (1=adjacent)
        cut_boundaries: Set of frame numbers where scene cuts occur
        prev_frame: Previous frame number (for cut detection)
        curr_frame: Current frame number (for cut detection)
        frame_width: Frame width in pixels, used by the edge-exit check
            (default 1920, the value previously hard-coded)
        frame_height: Frame height in pixels, used by the edge-exit check
            (default 1080, the value previously hard-coded)
        edge_margin: Distance in pixels from a frame border within which a
            bbox counts as "at the edge" (default 50)

    Returns:
        Dict mapping current_face_index -> previous_face_index (or -1 if new)
    """
    if not previous_faces:
        return {i: -1 for i in range(len(current_faces))}

    # If a scene cut exists between prev and current frame, force all new traces
    if cut_boundaries and prev_frame is not None and curr_frame is not None:
        if any(prev_frame < cut <= curr_frame for cut in cut_boundaries):
            return {i: -1 for i in range(len(current_faces))}

    matches = {}
    used_prev = set()

    for curr_idx, curr_face in enumerate(current_faces):
        best_prev_idx = -1
        best_score = 0.0

        # Everything that depends only on the current face is computed once,
        # outside the loop over previous faces.
        curr_bbox = _bbox_of(curr_face)
        curr_emb = curr_face.get("embedding")
        curr_area = curr_bbox["width"] * curr_bbox["height"]
        curr_at_edge = _touches_frame_edge(curr_bbox, frame_width, frame_height, edge_margin)

        for prev_idx, prev_face in enumerate(previous_faces):
            if prev_idx in used_prev:
                continue

            prev_bbox = _bbox_of(prev_face)
            prev_emb = prev_face.get("embedding")

            iou = calculate_bbox_iou(curr_bbox, prev_bbox)
            distance = calculate_bbox_distance(curr_bbox, prev_bbox)

            # Similarity stays 0.0 when embeddings are disabled or missing.
            have_embeddings = bool(use_embedding and curr_emb and prev_emb)
            similarity = 0.0
            if have_embeddings:
                similarity = calculate_embedding_similarity(curr_emb, prev_emb)

            # Bbox size consistency check: sudden size change = different person
            prev_area = prev_bbox["width"] * prev_bbox["height"]
            area_ratio = max(curr_area, prev_area) / max(1, min(curr_area, prev_area))

            # Reject only if BOTH embedding AND IoU disagree
            # (different person + different position)
            if have_embeddings and similarity < 0.5 and iou < 0.3:
                continue

            # Reject if bbox size changes by more than 5x (e.g., far shot -> close-up)
            if area_ratio > 5.0 and similarity < 0.8:
                continue

            # Edge exit: if previous face was near frame edge and current face is not,
            # the old face likely exited and a new face appeared
            prev_at_edge = _touches_frame_edge(prev_bbox, frame_width, frame_height, edge_margin)
            if prev_at_edge and not curr_at_edge and similarity < 0.8:
                continue

            # Scoring rules are tried in order; the first one that fires wins.
            score = 0.0
            if iou > iou_threshold and similarity > similarity_threshold:
                score = iou + similarity
            elif iou > 0.5 and similarity > 0.65:
                score = iou * 1.5 + similarity * 0.5
            elif iou > 0.35 and distance < distance_threshold:
                score = iou * 2 - distance / 500
            elif similarity > 0.85:
                score = similarity * 2
            elif similarity > 0.75 and distance < distance_threshold:
                score = similarity - distance / 1000
            # Extra rule for non-adjacent frames (tracking lost and recovered).
            # NOTE(review): this branch is only reached when every rule above
            # failed, so it adds an acceptance path rather than tightening the
            # earlier ones - confirm that this is the intended behaviour.
            elif frame_gap > 1 and similarity > 0.8 and iou > 0.2:
                score = similarity + iou

            if score > best_score:
                best_score = score
                best_prev_idx = prev_idx

        if best_prev_idx >= 0 and best_score > 0:
            matches[curr_idx] = best_prev_idx
            used_prev.add(best_prev_idx)
        else:
            matches[curr_idx] = -1

    return matches
def _summarize_trace(trace_id: int, path: List[Dict], fps) -> Dict:
    """Build the summary record for one trace from its per-frame path entries."""
    duration_frames = path[-1]["frame"] - path[0]["frame"] + 1
    avg_confidence = sum(p["confidence"] for p in path) / len(path)
    pose_angles = [p["pose_angle"] for p in path]

    # Pose trace: full pose information for every appearance
    pose_trace = []
    for p in path:
        pose_info = p.get("pose_full") or {}
        pose_trace.append({
            "frame": p["frame"],
            "angle": pose_info.get("angle", "unknown"),
            "confidence": pose_info.get("confidence", 0.0),
            "pitch": pose_info.get("pitch", "neutral"),
            "features": pose_info.get("features", {}),
        })

    # Pose statistics
    pose_counts = defaultdict(int)
    pose_confidence_by_angle = defaultdict(list)
    for pose in pose_trace:
        pose_counts[pose["angle"]] += 1
        pose_confidence_by_angle[pose["angle"]].append(pose["confidence"])

    pose_statistics = {
        "distribution": dict(pose_counts),
        "avg_confidence_by_angle": {
            angle: round(sum(conf_list) / len(conf_list), 3)
            for angle, conf_list in pose_confidence_by_angle.items()
        },
        "dominant_angle": max(pose_counts.items(), key=lambda x: x[1])[0] if pose_counts else "unknown",
        "pose_count": len(pose_counts),
    }

    # Pose transitions: one event each time the angle changes between appearances
    pose_transitions = []
    prev_pose = None
    for pose in pose_trace:
        if prev_pose is not None and pose["angle"] != prev_pose["angle"]:
            pose_transitions.append({
                "frame": pose["frame"],
                "from_angle": prev_pose["angle"],
                "to_angle": pose["angle"],
                "transition_index": len(pose_transitions) + 1,
            })
        prev_pose = pose

    return {
        "trace_id": trace_id,
        "start_frame": path[0]["frame"],
        "end_frame": path[-1]["frame"],
        "duration_frames": duration_frames,
        # Without a usable fps the duration in seconds cannot be computed.
        "duration_seconds": duration_frames / fps if fps else 0.0,
        "total_appearances": len(path),
        "avg_confidence": avg_confidence,
        "pose_angles": pose_angles,
        "pose_trace": pose_trace,
        "pose_statistics": pose_statistics,
        "pose_transitions": pose_transitions,
        "path": path,
    }


def track_faces(
    face_data: Dict,
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    cut_boundaries: Set[int] = None,
) -> Dict:
    """
    Track faces across all frames.

    face_data is modified in place: every face gets a "trace_id", and the
    "traces" and metadata "trace_stats" entries are (re)written.

    Args:
        face_data: face.json data; reads "frames" (frame number -> {"faces": [...]})
            and metadata "fps"
        iou_threshold: IoU threshold for matching
        similarity_threshold: Embedding similarity threshold
        distance_threshold: Distance threshold for matching
        use_embedding: Whether to use embedding
        cut_boundaries: Set of frame numbers where scene cuts occur; passed
            through to match_faces so traces do not continue across a cut

    Returns:
        Updated face_data with trace_id added to each face. When there are no
        frames, face_data is returned unchanged.
    """
    frames = face_data.get("frames", {})
    if not frames:
        print("No frames found in face.json")
        return face_data

    sorted_frames = sorted(frames.items(), key=lambda x: int(x[0]))
    next_trace_id = 0
    traces = defaultdict(list)
    prev_faces = []
    prev_trace_ids = []
    prev_frame_num = None
    prev_face_frame = None  # last frame number that had actual faces

    print(f"\nTracking faces across {len(sorted_frames)} frames...")
    print(f"Parameters: iou={iou_threshold}, similarity={similarity_threshold}, distance={distance_threshold}")
    print()

    for frame_num_str, frame_data in sorted_frames:
        frame_num = int(frame_num_str)
        frame_gap = frame_num - prev_frame_num if prev_frame_num is not None else 1
        prev_frame_num = frame_num

        faces = frame_data.get("faces", [])
        if not faces:
            # A frame without faces ends every active trace.
            prev_faces = []
            prev_trace_ids = []
            continue

        matches = match_faces(
            faces,
            prev_faces,
            iou_threshold,
            similarity_threshold,
            distance_threshold,
            use_embedding,
            frame_gap,
            cut_boundaries,
            prev_face_frame,
            frame_num,
        )

        # Indexed by face position so the next frame can look up trace ids by
        # previous-face index without relying on dict iteration order.
        trace_ids = [None] * len(faces)
        for curr_idx, prev_idx in matches.items():
            if prev_idx >= 0:
                trace_id = prev_trace_ids[prev_idx]
            else:
                trace_id = next_trace_id
                next_trace_id += 1

            face = faces[curr_idx]
            face["trace_id"] = trace_id
            trace_ids[curr_idx] = trace_id

            pose = face.get("pose_angle") or {}
            traces[trace_id].append({
                "frame": frame_num,
                "face_index": curr_idx,
                "bbox": {
                    "x": face["x"],
                    "y": face["y"],
                    "width": face["width"],
                    "height": face["height"],
                },
                "confidence": face.get("confidence", 0.0),
                "pose_angle": pose.get("angle", "unknown"),
                "pose_full": pose,  # full pose information
            })

        prev_faces = faces
        prev_trace_ids = trace_ids
        prev_face_frame = frame_num

        if frame_num % 100 == 0:
            print(f"  Frame {frame_num}: {len(faces)} faces, {len(set(trace_ids))} active traces")

    # Metadata may be absent from the input; create it rather than failing
    # after all the tracking work has been done.
    metadata = face_data.setdefault("metadata", {})
    fps = metadata.get("fps")
    if not (isinstance(fps, (int, float)) and fps > 0):
        if traces:
            print(f"Warning: metadata fps missing or invalid ({fps!r}); duration_seconds set to 0.0")
        fps = None

    face_data["traces"] = {
        str(trace_id): _summarize_trace(trace_id, path, fps)
        for trace_id, path in traces.items()
    }

    metadata["trace_stats"] = {
        "total_traces": next_trace_id,
        "active_traces": len(traces),
        "long_traces": len([t for t in traces.values() if len(t) >= 2]),
    }
    return face_data
def analyze_traces(face_data: Dict) -> None:
    """
    Analyze and print trace statistics.

    Prints the trace totals, the ten longest traces (with pose distribution
    and up to three pose transitions each), the overall pose distribution and
    a duration histogram.

    Args:
        face_data: face data containing "traces" and, optionally, metadata
            "trace_stats" as written by track_faces
    """
    traces = face_data.get("traces", {})
    metadata = face_data.get("metadata", {})

    # A trace is "long" when it has at least two appearances.
    long_trace_count = sum(
        1 for trace in traces.values() if trace.get("total_appearances", 0) >= 2
    )

    print("\n" + "=" * 60)
    print("Face Trace Analysis")
    print("=" * 60)
    print(f"\nTotal traces: {metadata.get('trace_stats', {}).get('total_traces', 0)}")
    print(f"Long traces (>= 2 frames): {long_trace_count}")

    if not traces:
        return

    sorted_traces = sorted(traces.values(), key=lambda x: x["duration_frames"], reverse=True)

    print("\n=== Top 10 Longest Traces ===")
    for trace in sorted_traces[:10]:
        print(f"\nTrace {trace['trace_id']}:")
        print(f"  Frames: {trace['start_frame']} - {trace['end_frame']} ({trace['duration_frames']} frames)")
        print(f"  Duration: {trace['duration_seconds']:.2f} seconds")
        print(f"  Appearances: {trace['total_appearances']}")
        print(f"  Avg Confidence: {trace['avg_confidence']:.3f}")

        # Pose statistics
        trace_pose_stats = trace.get("pose_statistics", {})
        print(f"  Pose Distribution: {trace_pose_stats.get('distribution', {})}")
        print(f"  Dominant Angle: {trace_pose_stats.get('dominant_angle', 'unknown')}")

        # Pose transitions
        transitions = trace.get("pose_transitions", [])
        if transitions:
            print(f"  Pose Transitions: {len(transitions)} events")
            for t in transitions[:3]:  # show only the first 3
                print(f"    - Frame {t['frame']}: {t['from_angle']} -> {t['to_angle']}")

    # Pose counts summed over every appearance of every trace
    overall_pose_counts = defaultdict(int)
    for trace in traces.values():
        for pose in trace["pose_angles"]:
            overall_pose_counts[pose] += 1

    print("\n=== Pose Distribution in Traces ===")
    for pose, count in sorted(overall_pose_counts.items(), key=lambda x: x[1], reverse=True):
        print(f"  {pose}: {count}")

    duration_distribution = defaultdict(int)
    for trace in traces.values():
        d = trace["duration_frames"]
        if d <= 30:
            duration_distribution["short (<= 30 frames)"] += 1
        elif d <= 90:
            duration_distribution["medium (31-90 frames)"] += 1
        else:
            duration_distribution["long (> 90 frames)"] += 1

    print("\n=== Trace Duration Distribution ===")
    for duration, count in sorted(duration_distribution.items()):
        print(f"  {duration}: {count}")
def _default_output_path(face_json_path: str) -> str:
    """Derive "<name>_traced.json" next to the input without ever returning the input path."""
    import os  # local import: only needed for this path computation

    root, ext = os.path.splitext(face_json_path)
    if ext.lower() == ".json":
        return f"{root}_traced.json"
    # No .json extension to swap out: append instead, so the input file is
    # never overwritten.
    return f"{face_json_path}_traced.json"


def main():
    """
    Command-line entry point: load face.json, track faces, print the trace
    analysis and, unless --analyze-only is given, write the traced JSON.
    """
    parser = argparse.ArgumentParser(description="Track faces across frames")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--output", help="Output path (default: face_traced.json)")
    parser.add_argument("--iou-threshold", type=float, default=0.3, help="IoU threshold")
    parser.add_argument("--similarity-threshold", type=float, default=0.7, help="Embedding similarity threshold")
    parser.add_argument("--distance-threshold", type=float, default=100.0, help="Distance threshold")
    parser.add_argument("--no-embedding", action="store_true", help="Disable embedding matching")
    parser.add_argument("--cuts-json", help="Path to cut.json for scene-cut-aware tracking")
    parser.add_argument("--analyze-only", action="store_true", help="Only analyze, don't output")
    args = parser.parse_args()

    # Load cut boundaries if provided
    cut_boundaries = None
    if args.cuts_json:
        with open(args.cuts_json, encoding="utf-8") as f:
            cuts = json.load(f)
        # Only scenes starting after frame 0 are kept as cut boundaries.
        cut_boundaries = {s["start_frame"] for s in cuts.get("scenes", []) if s["start_frame"] > 0}
        print(f"  Cut boundaries loaded: {len(cut_boundaries)} cuts")

    print("=" * 60)
    print("Face Tracker")
    print("=" * 60)

    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    print(f"\nInput: {args.face_json}")
    print(f"Frames: {len(face_data.get('frames', {}))}")

    face_data = track_faces(
        face_data,
        iou_threshold=args.iou_threshold,
        similarity_threshold=args.similarity_threshold,
        distance_threshold=args.distance_threshold,
        use_embedding=not args.no_embedding,
        cut_boundaries=cut_boundaries,
    )
    analyze_traces(face_data)

    if not args.analyze_only:
        output_path = args.output or _default_output_path(args.face_json)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(face_data, f, indent=2)
        print(f"\n✅ Output saved to: {output_path}")


if __name__ == "__main__":
    main()