Files
momentry_core/scripts/ocr_benchmark_runner.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

281 lines
9.5 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
OCR Processor Benchmark Runner
测试 OCR 文字辨识的性能和质量
测试版本:
A. ocr_processor.py (EasyOCR CPU + Resume)
B. ocr_processor_mps.py (EasyOCR MPS)
C. ocr_processor_contract_v1.py (Contract v1.0)
测试指标:
- 处理时间
- 内存峰值 (MB)
- 检测帧数
- 检测文字数
- 平均置信度
- 空帧率
"""
import os
import sys
import json
import time
import subprocess
from pathlib import Path
from datetime import datetime
# Directory containing this script; processor scripts are resolved relative to it.
SCRIPTS_DIR = Path(__file__).parent
# Directory that receives the per-scheme JSON outputs and the benchmark report.
OUTPUT_DIR = SCRIPTS_DIR.parent / "output" / "benchmark" / "ocr_processor"
def get_memory_peak(pid):
    """Return the current resident set size (RSS) of process *pid* in MB.

    Despite the name, one call reports only the RSS at the moment of the
    call, as printed by ``ps -p <pid> -o rss=`` and divided by 1024.
    Callers obtain a peak by polling repeatedly and keeping the maximum.

    Args:
        pid: ID of the process to inspect.

    Returns:
        The RSS as a float, or 0 when it cannot be determined (``ps``
        could not be started, exited with a non-zero status, or printed
        something that is not an integer).
    """
    cmd = ["ps", "-p", str(pid), "-o", "rss="]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError):
        # ``ps`` is missing or could not be spawned; sampling is best-effort.
        return 0
    if result.returncode != 0:
        # Typically: the process no longer exists.
        return 0
    try:
        return int(result.stdout.strip()) / 1024
    except ValueError:
        # Empty or non-numeric output, e.g. the process exited mid-query.
        return 0
def _build_command(script_name, script_path, video_path, output_path, languages, uuid):
    """Assemble the argv list for one OCR processor variant."""
    if script_name == "ocr_processor_mps.py":
        # Scheme B: named --video/--output options plus an explicit language list.
        cmd = [
            sys.executable, str(script_path),
            "--video", video_path,
            "--output", output_path,
            "--languages", *languages,
            "--sample-interval", "30",
            "--confidence", "0.5",
        ]
        if uuid:
            # NOTE(review): "--device auto" is only passed when a uuid is
            # given; this looks unintentional but is kept as-is - confirm.
            cmd.extend(["--device", "auto"])
        return cmd

    # Schemes A and C both take the video and output paths positionally.
    cmd = [sys.executable, str(script_path), video_path, output_path]
    if uuid:
        cmd.extend(["--uuid", uuid])
    if script_name == "ocr_processor_contract_v1.py":
        # Scheme C: contract version.
        cmd.extend(["--confidence", "0.5"])
    else:
        # Scheme A: default processor, no language arguments are passed.
        cmd.extend(["--sample-interval", "30"])
    return cmd


def _summarize_frames(frames):
    """Compute frame/text/confidence statistics from the "frames" payload."""
    # The payload may be a mapping of frame entries or a plain list of them.
    frames_list = list(frames.values()) if isinstance(frames, dict) else frames
    total_frames = len(frames_list)
    total_texts = 0
    empty_frames = 0
    confidences = []
    for frame_data in frames_list:
        texts = frame_data.get("texts", [])
        if not texts:
            empty_frames += 1
            continue
        total_texts += len(texts)
        confidences.extend(text.get("confidence", 0) for text in texts)
    return {
        "total_frames": total_frames,
        "total_texts": total_texts,
        "avg_confidence": sum(confidences) / len(confidences) if confidences else 0,
        "empty_frame_rate": empty_frames / total_frames if total_frames > 0 else 0,
        "avg_texts_per_frame": total_texts / total_frames if total_frames > 0 else 0,
        "empty_frames": empty_frames,
    }


def run_processor(script_name, video_path, output_path, languages=None, uuid="", extra_args=None):
    """Run one OCR processor script and collect timing, memory and quality metrics.

    Args:
        script_name: File name of the processor script inside SCRIPTS_DIR.
        video_path: Path of the input video, passed to the script as a string.
        output_path: Path of the JSON file the script is expected to write.
        languages: Language codes; only forwarded to ``ocr_processor_mps.py``.
            Defaults to ``["en"]`` when omitted.
        uuid: Optional identifier; when non-empty it adds ``--uuid`` (schemes
            A and C) or ``--device auto`` (scheme B) to the command.
        extra_args: Accepted for interface compatibility; it is currently
            NOT appended to the command line.

    Returns:
        A dict with the keys ``elapsed_time``, ``peak_memory_mb``,
        ``total_frames``, ``total_texts``, ``avg_confidence``,
        ``empty_frame_rate``, ``avg_texts_per_frame``, ``empty_frames``,
        ``file_size_kb``, ``stdout`` and ``stderr``; or ``None`` when the
        script is missing, exits with a non-zero status, or does not leave a
        readable JSON file at *output_path*.
    """
    script_path = SCRIPTS_DIR / script_name
    if not script_path.exists():
        print(f"❌ 脚本不存在: {script_path}")
        return None

    if languages is None:
        languages = ["en"]
    cmd = _build_command(script_name, script_path, video_path, output_path, languages, uuid)

    print(f"\n执行: {script_name}")
    print(f"命令: {' '.join(cmd)}")

    # perf_counter is monotonic, so the measurement is immune to clock changes.
    start_time = time.perf_counter()
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # Sample memory roughly every 0.5 s while draining the pipes.  Using
    # communicate(timeout=...) instead of poll()+sleep() prevents a deadlock
    # when the child writes more than the OS pipe buffer can hold; retrying
    # after TimeoutExpired does not lose any output.
    peak_memory = 0
    while True:
        peak_memory = max(peak_memory, get_memory_peak(process.pid))
        try:
            stdout, stderr = process.communicate(timeout=0.5)
        except subprocess.TimeoutExpired:
            continue
        break
    elapsed_time = time.perf_counter() - start_time

    if process.returncode != 0:
        print(f"❌ 处理失败: {stderr[:500]}")
        return None

    if not os.path.exists(output_path):
        return None

    try:
        # OCR output may contain non-ASCII text; do not rely on the locale.
        with open(output_path, encoding="utf-8") as f:
            result = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ 无法读取输出文件: {exc}")
        return None

    stats = _summarize_frames(result.get("frames", {}))
    return {
        "elapsed_time": elapsed_time,
        "peak_memory_mb": peak_memory,
        "total_frames": stats["total_frames"],
        "total_texts": stats["total_texts"],
        "avg_confidence": stats["avg_confidence"],
        "empty_frame_rate": stats["empty_frame_rate"],
        "avg_texts_per_frame": stats["avg_texts_per_frame"],
        "empty_frames": stats["empty_frames"],
        "file_size_kb": os.path.getsize(output_path) / 1024,
        "stdout": stdout,
        "stderr": stderr,
    }
def main():
    """Benchmark every OCR processor variant on the demo video and report results.

    Creates OUTPUT_DIR, prints basic ffprobe information about the test
    video, runs schemes A, B and C through ``run_processor``, writes
    ``OCR_BENCHMARK_REPORT.json`` into OUTPUT_DIR and prints a Markdown
    comparison table.  Exits with status 1 when the test video is missing.
    """
    print("=" * 80)
    print("OCR Processor Benchmark 测试")
    print("=" * 80)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Test video.
    video_path = "/Users/accusys/momentry/var/sftpgo/data/demo/Gamma Carry Saves the World..mp4"
    if not os.path.exists(video_path):
        print(f"❌ 测试视频不存在: {video_path}")
        sys.exit(1)

    # Query video information; failure here is only a warning.
    cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        video_path,
    ]
    try:
        probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe.stdout)
        video_stream = next(
            (s for s in video_info["streams"] if s["codec_type"] == "video"),
            None,
        )
        print("\n测试视频:")
        print(f"  文件: {float(video_info['format'].get('size', 0)) / 1024 / 1024:.1f} MB")
        print(f"  时长: {float(video_info['format'].get('duration', 0)):.1f}")
        if video_stream is None:
            # No video stream reported: nothing to print for resolution/FPS.
            print("⚠️ 无法获取视频信息")
        else:
            print(f"  分辨率: {video_stream.get('width', 0)}x{video_stream.get('height', 0)}")
            print(f"  FPS: {video_stream.get('r_frame_rate', 'unknown')}")
    except (OSError, subprocess.SubprocessError, ValueError, KeyError, TypeError):
        # ffprobe missing or failing, unparsable JSON, or unexpected structure.
        print("⚠️ 无法获取视频信息")

    # Test languages (multi-language selection).
    languages = ["en", "ch_sim", "ja"]

    processors = [
        ("A", "ocr_processor.py", "EasyOCR CPU + Resume", ["en"]),  # scheme A is run with "en" only
        ("B", "ocr_processor_mps.py", "EasyOCR MPS", languages),
        ("C", "ocr_processor_contract_v1.py", "Contract v1.0", languages),
    ]

    # Metrics copied from run_processor's result into the report, in order.
    metric_keys = (
        "elapsed_time",
        "peak_memory_mb",
        "total_frames",
        "total_texts",
        "avg_confidence",
        "empty_frame_rate",
        "avg_texts_per_frame",
        "empty_frames",
        "file_size_kb",
    )

    results = []
    for scheme_id, script_name, description, langs in processors:
        print(f"\n{'=' * 80}")
        print(f"方案 {scheme_id}: {description}")
        print(f"{'=' * 80}")
        print(f"语言: {langs}")

        output_path = OUTPUT_DIR / f"scheme_{scheme_id}_{script_name.replace('.py', '.json')}"
        # Remove stale output so a failed run cannot be mistaken for a fresh one.
        if os.path.exists(output_path):
            os.remove(output_path)

        result = run_processor(
            script_name,
            video_path,
            str(output_path),
            languages=langs,
            uuid=f"ocr_bench_{scheme_id}",
            extra_args=["--sample-interval", "30"],
        )

        entry = {
            "scheme": scheme_id,
            "script": script_name,
            "description": description,
            "languages": langs,
        }
        if result:
            entry.update({key: result[key] for key in metric_keys})
            results.append(entry)
            print("\n✅ 处理完成:")
            print(f"  时间: {result['elapsed_time']:.2f}")
            print(f"  内存峰值: {result['peak_memory_mb']:.1f} MB")
            print(f"  检测帧数: {result['total_frames']}")
            print(f"  检测文字数: {result['total_texts']}")
            print(f"  平均置信度: {result['avg_confidence']:.2f}")
            print(f"  空帧率: {result['empty_frame_rate']*100:.1f}%")
            print(f"  每帧平均文字: {result['avg_texts_per_frame']:.1f}")
            print(f"  输出大小: {result['file_size_kb']:.1f} KB")
        else:
            print(f"❌ 方案 {scheme_id} 处理失败")
            entry["error"] = "processing failed"
            results.append(entry)

    # Save the report.
    report = {
        "test_date": datetime.now().isoformat(),
        "video_path": video_path,
        "languages": languages,
        "results": results,
    }

    report_path = OUTPUT_DIR / "OCR_BENCHMARK_REPORT.json"
    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding.
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\n{'=' * 80}")
    print("测试报告已保存:")
    print(f"  {report_path}")
    print(f"{'=' * 80}")

    print("\n【对比总结】")
    print("\n| 方案 | 脚本 | 语言 | 时间(秒) | 内存(MB) | 帧数 | 文字数 | 置信度 | 空帧率 |")
    print("|------|------|------|---------|---------|------|--------|--------|--------|")
    for r in results:
        langs_str = ",".join(r["languages"])
        if "error" not in r:
            print(f"| {r['scheme']} | {r['script']} | {langs_str} | {r['elapsed_time']:.2f} | {r['peak_memory_mb']:.1f} | {r['total_frames']} | {r['total_texts']} | {r['avg_confidence']:.2f} | {r['empty_frame_rate']*100:.1f}% |")
        else:
            print(f"| {r['scheme']} | {r['script']} | {langs_str} | - | - | - | - | - | - |")
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()