Files
momentry_core/scripts/yolo_benchmark_runner.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

273 lines
8.9 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
YOLO Processor Benchmark Runner
測試 YOLO CPU vs MPS 性能對比
測試版本:
A. yolo_processor.py (CPU)
B. yolo_processor_mps.py (MPS Metal GPU)
C. yolo_processor_contract_v1.py (Contract)
測試指標:
- 處理時間
- 內存峰值 (MB)
- 檢測物件數
- 檢測類別數
- 輸出大小 (KB)
"""
import os
import sys
import json
import time
import subprocess
import shutil
from pathlib import Path
from datetime import datetime
SCRIPTS_DIR = Path(__file__).parent
OUTPUT_DIR = SCRIPTS_DIR.parent / "output" / "benchmark" / "yolo_processor"
def get_video_info(video_path):
"""獲取視頻基本信息"""
cmd = [
"ffprobe",
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
video_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
video_stream = next((s for s in data["streams"] if s["codec_type"] == "video"), None)
return {
"duration": float(data["format"].get("duration", 0)),
"size_mb": int(data["format"].get("size", 0)) / 1024 / 1024,
"width": video_stream.get("width", 0) if video_stream else 0,
"height": video_stream.get("height", 0) if video_stream else 0,
"fps": video_stream.get("r_frame_rate", "0/1") if video_stream else "0/1",
"total_frames": int(video_stream.get("nb_frames", 0)) if video_stream else 0
}
except Exception as e:
print(f"獲取視頻信息失敗: {e}")
return {}
def get_memory_peak(pid):
"""獲取進程內存峰值"""
try:
cmd = ["ps", "-p", str(pid), "-o", "rss="]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
return int(result.stdout.strip()) / 1024
except:
pass
return 0
def run_processor(script_name, video_path, output_path, uuid="", sample_interval=30):
"""運行指定 YOLO processor"""
script_path = SCRIPTS_DIR / script_name
if not script_path.exists():
print(f"❌ 腳本不存在: {script_path}")
return None
# 不同处理器使用不同参数格式
if script_name == "yolo_processor_mps.py":
cmd = [sys.executable, str(script_path), "--video", video_path, "--output", output_path]
if uuid:
cmd.extend(["--uuid", uuid])
cmd.extend(["--device", "mps"]) # 强制使用 MPS
elif script_name == "yolo_processor_contract_v1.py":
cmd = [sys.executable, str(script_path), video_path, output_path, "--uuid", uuid if uuid else "bench"]
else:
cmd = [sys.executable, str(script_path), video_path, output_path]
if uuid:
cmd.extend(["--uuid", uuid])
print(f"\n執行: {script_name}")
print(f"命令: {' '.join(cmd)}")
start_time = time.time()
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
peak_memory = 0
while process.poll() is None:
mem = get_memory_peak(process.pid)
if mem > peak_memory:
peak_memory = mem
time.sleep(0.5)
stdout, stderr = process.communicate()
elapsed_time = time.time() - start_time
if process.returncode != 0:
print(f"❌ 处理失败: {stderr}")
return None
if os.path.exists(output_path):
with open(output_path) as f:
result = json.load(f)
# 分析输出
frames_data = result.get("frames", [])
if isinstance(frames_data, dict):
frames_data = list(frames_data.values())
total_objects = 0
classes_detected = set()
for frame in frames_data:
if isinstance(frame, dict):
detections = frame.get("detections", [])
for det in detections:
total_objects += 1
class_name = det.get("class_name", det.get("class", "unknown"))
classes_detected.add(class_name)
file_size_kb = os.path.getsize(output_path) / 1024
return {
"elapsed_time": elapsed_time,
"peak_memory_mb": peak_memory,
"total_frames": len(frames_data),
"total_objects": total_objects,
"classes_detected": list(classes_detected),
"class_count": len(classes_detected),
"file_size_kb": file_size_kb,
"stdout": stdout,
"stderr": stderr
}
return None
def main():
print("=" * 80)
print("YOLO Processor Benchmark 測試")
print("=" * 80)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
video_uuid = "ac625815183a21e1"
video_path = "/Users/accusys/momentry/var/sftpgo/data/demo/Gamma Carry Saves the World..mp4"
if not os.path.exists(video_path):
print(f"❌ 測試視頻不存在: {video_path}")
sys.exit(1)
video_info = get_video_info(video_path)
print(f"\n測試視頻:")
print(f" UUID: {video_uuid}")
print(f" 文件: {video_info.get('size_mb', 0):.1f} MB")
print(f" 時長: {video_info.get('duration', 0):.1f}")
print(f" 分辨率: {video_info.get('width', 0)}x{video_info.get('height', 0)}")
print(f" FPS: {video_info.get('fps', 'unknown')}")
print(f" 总帧数: {video_info.get('total_frames', 0)}")
processors = [
("A", "yolo_processor.py", "YOLOv8n CPU"),
("B", "yolo_processor_mps.py", "YOLOv8n MPS (Metal)"),
("C", "yolo_processor_contract_v1.py", "Contract v1"),
]
results = []
for scheme_id, script_name, description in processors:
print(f"\n{'=' * 80}")
print(f"方案 {scheme_id}: {description}")
print(f"{'=' * 80}")
output_path = OUTPUT_DIR / f"scheme_{scheme_id}_{script_name.replace('.py', '.json')}"
if os.path.exists(output_path):
os.remove(output_path)
result = run_processor(
script_name,
video_path,
str(output_path),
uuid=f"yolo_bench_{scheme_id}",
sample_interval=30
)
if result:
duration = video_info.get("duration", 0)
speed = duration / result["elapsed_time"] if result["elapsed_time"] > 0 else 0
results.append({
"scheme": scheme_id,
"script": script_name,
"description": description,
"elapsed_time": result["elapsed_time"],
"peak_memory_mb": result["peak_memory_mb"],
"total_frames": result["total_frames"],
"total_objects": result["total_objects"],
"class_count": result["class_count"],
"classes_detected": result["classes_detected"],
"file_size_kb": result["file_size_kb"],
"speed_ratio": speed
})
print(f"\n✅ 处理完成:")
print(f" 时间: {result['elapsed_time']:.2f}")
print(f" 速度: {speed:.2f}x 实时倍速")
print(f" 内存峰值: {result['peak_memory_mb']:.1f} MB")
print(f" 处理帧数: {result['total_frames']}")
print(f" 检测物件: {result['total_objects']}")
print(f" 检测类别: {result['class_count']}")
print(f" 输出大小: {result['file_size_kb']:.1f} KB")
if result['classes_detected']:
print(f" 类别列表: {', '.join(result['classes_detected'][:10])}")
else:
print(f"❌ 方案 {scheme_id} 处理失败")
results.append({
"scheme": scheme_id,
"script": script_name,
"description": description,
"error": "processing failed"
})
report = {
"test_date": datetime.now().isoformat(),
"video_info": video_info,
"video_uuid": video_uuid,
"results": results
}
report_path = OUTPUT_DIR / "YOLO_BENCHMARK_REPORT.json"
with open(report_path, "w") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n{'=' * 80}")
print("测试报告已保存:")
print(f" {report_path}")
print(f"{'=' * 80}")
print("\n【对比总结】")
print(f"\n| 方案 | 脚本 | 时间(秒) | 速度 | 内存(MB) | 物件数 | 类别数 |")
print("|------|------|---------|------|---------|--------|--------|")
for r in results:
if "error" not in r:
print(f"| {r['scheme']} | {r['script']} | {r['elapsed_time']:.2f} | {r['speed_ratio']:.2f}x | {r['peak_memory_mb']:.1f} | {r['total_objects']} | {r['class_count']} |")
else:
print(f"| {r['scheme']} | {r['script']} | - | - | - | - | ❌ |")
if __name__ == "__main__":
main()