Files
momentry_core/scripts/face_count_comparison.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

260 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Face Detection Count Comparison
对比四个版本在同一帧上的检测数量差异
"""
import json
import sys
from pathlib import Path
from collections import defaultdict
def load_results(filepath):
"""加载检测结果"""
with open(filepath) as f:
data = json.load(f)
results = {}
# 处理不同的帧格式
frames = data.get('frames', {})
if isinstance(frames, dict):
# InsightFace, MediaPipe格式: {"750": {...}, "900": {...}}
for frame_num, frame_data in frames.items():
if isinstance(frame_data, dict):
faces = frame_data.get('faces', [])
results[int(frame_num)] = {
'count': len(faces),
'faces': faces,
'timestamp': frame_data.get('timestamp', frame_data.get('time_seconds', 0))
}
elif isinstance(frames, list):
# OpenCV格式: [{...}, {...}]
for frame_data in frames:
if isinstance(frame_data, dict):
frame_num = frame_data.get('frame', 0)
faces = frame_data.get('faces', [])
results[frame_num] = {
'count': len(faces),
'faces': faces,
'timestamp': frame_data.get('timestamp', 0)
}
return results
def compare_frames(results_a, results_b, results_c):
"""对比同一帧的检测结果"""
# 找出所有检测到的帧号
all_frames = set()
all_frames.update(results_a.keys())
all_frames.update(results_b.keys())
all_frames.update(results_c.keys())
comparison = []
for frame_num in sorted(all_frames):
a_count = results_a.get(frame_num, {}).get('count', 0)
b_count = results_b.get(frame_num, {}).get('count', 0)
c_count = results_c.get(frame_num, {}).get('count', 0)
# 只对比检测数量不同的帧
if not (a_count == b_count == c_count):
comparison.append({
'frame': frame_num,
'timestamp': results_a.get(frame_num, {}).get('timestamp',
results_b.get(frame_num, {}).get('timestamp',
results_c.get(frame_num, {}).get('timestamp', 0))),
'insightface': a_count,
'mediapipe': b_count,
'opencv': c_count,
'max': max(a_count, b_count, c_count),
'min': min(a_count, b_count, c_count),
'diff': max(a_count, b_count, c_count) - min(a_count, b_count, c_count)
})
return comparison
def analyze_detection_distribution(results_a, results_b, results_c):
"""分析检测分布"""
stats = {
'insightface': {
'total_faces': sum(r['count'] for r in results_a.values()),
'total_frames': len(results_a),
'avg_per_frame': 0,
'frames_with_faces': len([r for r in results_a.values() if r['count'] > 0]),
'frames_no_faces': len([r for r in results_a.values() if r['count'] == 0]),
'max_faces': max(r['count'] for r in results_a.values()) if results_a else 0,
},
'mediapipe': {
'total_faces': sum(r['count'] for r in results_b.values()),
'total_frames': len(results_b),
'avg_per_frame': 0,
'frames_with_faces': len([r for r in results_b.values() if r['count'] > 0]),
'frames_no_faces': len([r for r in results_b.values() if r['count'] == 0]),
'max_faces': max(r['count'] for r in results_b.values()) if results_b else 0,
},
'opencv': {
'total_faces': sum(r['count'] for r in results_c.values()),
'total_frames': len(results_c),
'avg_per_frame': 0,
'frames_with_faces': len([r for r in results_c.values() if r['count'] > 0]),
'frames_no_faces': len([r for r in results_c.values() if r['count'] == 0]),
'max_faces': max(r['count'] for r in results_c.values()) if results_c else 0,
}
}
for key in stats:
if stats[key]['total_frames'] > 0:
stats[key]['avg_per_frame'] = stats[key]['total_faces'] / stats[key]['frames_with_faces']
return stats
def find_missed_frames(results_a, results_b, results_c):
"""找出被漏检的帧"""
all_frames = set()
all_frames.update(results_a.keys())
all_frames.update(results_b.keys())
all_frames.update(results_c.keys())
missed = []
for frame_num in sorted(all_frames):
a = results_a.get(frame_num, {}).get('count', 0)
b = results_b.get(frame_num, {}).get('count', 0)
c = results_c.get(frame_num, {}).get('count', 0)
# 某个版本完全漏检检测到0张
if a > 0 and b == 0:
missed.append({
'frame': frame_num,
'missed_by': 'MediaPipe',
'insightface_count': a,
'opencv_count': c
})
if a > 0 and c == 0:
missed.append({
'frame': frame_num,
'missed_by': 'OpenCV',
'insightface_count': a,
'mediapipe_count': b
})
if (a > 0 or c > 0) and b == 0:
missed.append({
'frame': frame_num,
'missed_by': 'MediaPipe',
'others_count': max(a, c)
})
return missed
def main():
benchmark_dir = Path('/Users/accusys/momentry_core_0.1/output/benchmark/face_processor')
# 加载四个版本的结果
print("=" * 80)
print("Face Detection Count Comparison")
print("=" * 80)
print()
results_a = load_results(benchmark_dir / 'scheme_A_fixed.json')
results_b = load_results(benchmark_dir / 'scheme_B_mediapipe_fixed.json')
results_c = load_results(benchmark_dir / 'scheme_C_face_processor_optimized.json')
# results_d = load_results(benchmark_dir / 'scheme_D_face_processor_contract_v1.json')
print("【检测结果统计】")
print()
stats = analyze_detection_distribution(results_a, results_b, results_c)
print(f"| 版本 | 总人脸数 | 检测帧数 | 有人脸帧 | 无人脸帧 | 平均每帧 | 最多人脸 |")
print("|------|---------|---------|---------|---------|---------|---------|")
for name, s in stats.items():
print(f"| {name} | {s['total_faces']} | {s['total_frames']} | {s['frames_with_faces']} | {s['frames_no_faces']} | {s['avg_per_frame']:.2f} | {s['max_faces']} |")
print()
print("【检测数量差异对比】")
print()
comparison = compare_frames(results_a, results_b, results_c)
print(f"共有 {len(comparison)} 帧检测数量不同")
print()
print(f"| 帧号 | 时间(秒) | InsightFace | MediaPipe | OpenCV | 最大差异 |")
print("|------|---------|------------|----------|--------|---------|")
for item in comparison[:30]: # 只显示前30帧
print(f"| {item['frame']} | {item['timestamp']:.2f} | {item['insightface']} | {item['mediapipe']} | {item['opencv']} | {item['diff']} |")
if len(comparison) > 30:
print(f"| ... | ... | ... | ... | ... | ... |")
print(f"| 共 {len(comparison)} 帧有差异 |")
print()
print("【漏检分析】")
print()
missed = find_missed_frames(results_a, results_b, results_c)
mediapipe_missed = [m for m in missed if m.get('missed_by') == 'MediaPipe']
opencv_missed = [m for m in missed if m.get('missed_by') == 'OpenCV']
print(f"MediaPipe漏检帧数: {len(mediapipe_missed)}")
print(f"OpenCV漏检帧数: {len(opencv_missed)}")
print()
if mediapipe_missed:
print("MediaPipe漏检详情前10帧:")
print(f"| 帧号 | InsightFace检测 | OpenCV检测 |")
print("|------|----------------|-----------|")
for m in mediapipe_missed[:10]:
print(f"| {m['frame']} | {m.get('insightface_count', m.get('others_count', '?'))} | {m.get('opencv_count', '?')} |")
print()
print("【检测率分析】")
print()
baseline = stats['insightface']['total_faces']
print(f"以InsightFace为基准{baseline}张人脸):")
print()
print(f"| 版本 | 检测数 | 检测率 | 漏检数 |")
print("|------|--------|--------|--------|")
for name, s in stats.items():
rate = s['total_faces'] / baseline * 100 if baseline > 0 else 0
missed_count = baseline - s['total_faces']
print(f"| {name} | {s['total_faces']} | {rate:.1f}% | {missed_count} |")
print()
print("=" * 80)
print("对比完成")
print("=" * 80)
# 保存详细对比结果
output = {
'stats': stats,
'comparison': comparison,
'missed_frames': missed,
'summary': {
'baseline_faces': baseline,
'mediapipe_detection_rate': stats['mediapipe']['total_faces'] / baseline * 100 if baseline > 0 else 0,
'opencv_detection_rate': stats['opencv']['total_faces'] / baseline * 100 if baseline > 0 else 0,
}
}
output_path = benchmark_dir / 'FACE_COUNT_COMPARISON.json'
with open(output_path, 'w') as f:
json.dump(output, f, indent=2, ensure_ascii=False)
print(f"\n详细对比已保存: {output_path}")
if __name__ == '__main__':
main()