#!/usr/bin/python3.11
"""
ASR x Face Combination Statistics

For each ASR segment, count unique faces (person_ids) appearing during that
segment. Then aggregate: how many segments have 1 face, 2 faces, 3 faces, etc.
"""

import json
import os
from bisect import bisect_left, bisect_right
from collections import defaultdict

UUID = "384b0ff44aaaa1f1"
BASE_DIR = f"output/{UUID}"


def load_json(filepath):
    """Parse the JSON file at *filepath* and return the decoded object.

    The file is read as UTF-8 so the result does not depend on the locale.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


def build_asr_face_stats(uuid=None, base_dir=None):
    """Count the distinct faces visible during every ASR segment.

    Reads ``<base_dir>/<uuid>.asr.json`` (key ``segments``) and
    ``<base_dir>/<uuid>.face_clustered.json`` (key ``frames``).

    Args:
        uuid: Identifier used in the input file names. Defaults to the
            module-level ``UUID``.
        base_dir: Directory holding the input files. Defaults to the
            module-level ``BASE_DIR`` when *uuid* is not given, otherwise to
            ``output/<uuid>``.

    Returns:
        A ``(distribution, segment_details, total_segments)`` tuple:
        ``distribution`` maps a face count to the number of segments with
        that count, ``segment_details`` holds one dict per segment, and
        ``total_segments`` is the number of ASR segments read.
    """
    if base_dir is None:
        base_dir = BASE_DIR if uuid is None else os.path.join("output", uuid)
    if uuid is None:
        uuid = UUID

    print(f"📊 Building ASR x Face combination statistics for {uuid}...")

    # Load data
    asr_data = load_json(os.path.join(base_dir, f"{uuid}.asr.json"))
    face_data = load_json(os.path.join(base_dir, f"{uuid}.face_clustered.json"))

    segments = asr_data.get("segments", [])
    face_frames = face_data.get("frames", [])

    # Build face lookup: timestamp -> set of person_ids. Frames that share a
    # timestamp are merged instead of the later one replacing the earlier.
    face_by_time = defaultdict(set)
    for frame in face_frames:
        ts = frame.get("timestamp", 0)
        # `or []` tolerates an explicit null in the JSON.
        candidates = (face.get("person_id") for face in frame.get("faces") or [])
        # Truthiness filter kept from the original: missing/empty ids are skipped.
        face_by_time[ts].update(pid for pid in candidates if pid)

    # Sorted timestamps allow a binary search per segment.
    sorted_times = sorted(face_by_time)

    def get_faces_in_range(start, end):
        """Get all unique person_ids appearing in a time range (inclusive)."""
        lo = bisect_left(sorted_times, start)   # first ts >= start
        hi = bisect_right(sorted_times, end)    # first ts > end
        found = set()
        for ts in sorted_times[lo:hi]:
            found.update(face_by_time[ts])
        return found

    # Analyze each ASR segment
    face_count_dist = defaultdict(int)
    segment_details = []

    for seg in segments:
        start = seg.get("start", 0)
        end = seg.get("end", 0)
        text = seg.get("text") or ""

        pids = get_faces_in_range(start, end)
        face_count = len(pids)
        face_count_dist[face_count] += 1

        segment_details.append(
            {
                "start": start,
                "end": end,
                "text": text[:80],
                "face_count": face_count,
                # First 5 ids, sorted so repeated runs produce identical output.
                "person_ids": sorted(pids, key=str)[:5],
            }
        )

    return dict(face_count_dist), segment_details, len(segments)


def _pct(count, total):
    """Return *count* as a percentage of *total*; 0.0 when *total* is zero."""
    return count / total * 100 if total else 0.0


def print_stats(dist, total_segments, segment_details=None):
    """Print the face-count distribution, a summary and example segments.

    Args:
        dist: Mapping of face count -> number of segments.
        total_segments: Total number of ASR segments analysed.
        segment_details: Optional list of per-segment dicts as returned by
            ``build_asr_face_stats``; used only for the example section.
            When omitted, the example headings are printed with no entries.
    """
    details = segment_details or []

    print("\n" + "=" * 60)
    print("📈 ASR x Face Combination Statistics")
    print("=" * 60)

    print(f"\nTotal ASR segments: {total_segments}")
    print(f"\n{'Face Count':<12} {'Segments':>10} {'Percentage':>12}")
    print("-" * 40)

    for fc, count in sorted(dist.items(), key=lambda item: item[0]):
        pct = _pct(count, total_segments)
        print(f" {fc:>2} faces {count:>8} {pct:>6.1f}%")

    # Summary
    total_faces_sum = sum(fc * count for fc, count in dist.items())
    avg_faces = total_faces_sum / total_segments if total_segments > 0 else 0
    max_faces = max(dist) if dist else 0
    zero_faces = dist.get(0, 0)
    one_face = dist.get(1, 0)

    print("\n📊 Summary:")
    print(f" Average faces per segment: {avg_faces:.1f}")
    print(f" Max faces in a segment: {max_faces}")
    print(
        f" Segments with 0 faces: {zero_faces} ({_pct(zero_faces, total_segments):.1f}%)"
    )
    print(
        f" Segments with 1 face: {one_face} ({_pct(one_face, total_segments):.1f}%)"
    )
    print(f" Segments with 2+ faces: {total_segments - zero_faces - one_face}")

    def examples_with(face_count):
        """Return up to three segments having exactly *face_count* faces."""
        return [s for s in details if s["face_count"] == face_count][:3]

    # Show some example segments
    print("\n🔍 Example Segments:")

    print(" 0 faces:")
    for ex in examples_with(0):
        print(f" [{ex['start']:.0f}s-{ex['end']:.0f}s] {ex['text']}...")

    print(" 1 face:")
    for ex in examples_with(1):
        print(
            f" [{ex['start']:.0f}s-{ex['end']:.0f}s] {ex['person_ids'][0]}: {ex['text']}..."
        )

    print(" 3 faces:")
    for ex in examples_with(3):
        # str() keeps the join working even if ids are not strings.
        pids = ", ".join(map(str, ex["person_ids"]))
        print(f" [{ex['start']:.0f}s-{ex['end']:.0f}s] [{pids}] {ex['text']}...")


def main():
    """Build the statistics, print them and save them next to the inputs."""
    dist, segment_details, total = build_asr_face_stats()
    print_stats(dist, total, segment_details)

    # Save
    output_path = os.path.join(BASE_DIR, "asr_face_stats.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump({"distribution": dist, "segments": segment_details}, f, indent=2)
    print(f"\n💾 Saved: {output_path}")


if __name__ == "__main__":
    main()