#!/opt/homebrew/bin/python3.11
"""
OCR Processor Benchmark Runner

Benchmarks the speed and output quality of the OCR text-recognition
processors.

Variants under test:
    A. ocr_processor.py (EasyOCR CPU + Resume)
    B. ocr_processor_mps.py (EasyOCR MPS)
    C. ocr_processor_contract_v1.py (Contract v1.0)

Metrics collected:
    - Processing time
    - Peak memory (MB)
    - Number of frames in the output
    - Number of detected texts
    - Average confidence
    - Empty-frame rate
"""

import os
import sys
import json
import time
import subprocess
from pathlib import Path
from datetime import datetime

SCRIPTS_DIR = Path(__file__).parent
OUTPUT_DIR = SCRIPTS_DIR.parent / "output" / "benchmark" / "ocr_processor"

# Video used when no path is given on the command line.
DEFAULT_VIDEO_PATH = "/Users/accusys/momentry/var/sftpgo/data/demo/Gamma Carry Saves the World..mp4"

# Seconds between two memory samples of the running child process.
_MEMORY_POLL_INTERVAL = 0.5


def get_memory_peak(pid):
    """Return the current resident set size of process *pid* in MB.

    Despite the name, this is a single point-in-time sample obtained from
    ``ps -o rss=`` (which reports kilobytes); the caller is responsible for
    keeping the maximum over several samples.

    Returns 0 when ``ps`` is unavailable, exits with a non-zero status
    (e.g. the process no longer exists), or prints something that is not
    an integer.
    """
    cmd = ["ps", "-p", str(pid), "-o", "rss="]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            return int(result.stdout.strip()) / 1024
    except (OSError, ValueError):
        # Best-effort metric: a missing `ps` or unparsable output must not
        # abort the benchmark.
        pass
    return 0


def _build_command(script_path, script_name, video_path, output_path, languages, uuid):
    """Build the argv list for one processor, following that script's CLI style."""
    cmd = [sys.executable, str(script_path)]

    if script_name == "ocr_processor_mps.py":
        # Variant B: named --video/--output options plus a language list.
        cmd += ["--video", video_path]
        cmd += ["--output", output_path]
        cmd += ["--languages", *languages]
        cmd += ["--sample-interval", "30"]
        cmd += ["--confidence", "0.5"]
        if uuid:
            # NOTE(review): --device is only passed when a uuid is given;
            # this coupling looks unintentional — confirm before changing.
            cmd += ["--device", "auto"]
    elif script_name == "ocr_processor_contract_v1.py":
        # Variant C: contract version, positional video/output arguments.
        cmd += [video_path, output_path]
        if uuid:
            cmd += ["--uuid", uuid]
        cmd += ["--confidence", "0.5"]
    else:
        # Variant A (default): no multi-language argument is passed.
        cmd += [video_path, output_path]
        if uuid:
            cmd += ["--uuid", uuid]
        cmd += ["--sample-interval", "30"]

    return cmd


def _summarize_output(output_path):
    """Compute quality metrics from a processor's JSON output file.

    Returns a dict of metrics, or None when the file cannot be read or is
    not valid JSON.
    """
    try:
        with open(output_path, encoding="utf-8") as f:
            result = json.load(f)
    except (OSError, ValueError) as exc:
        print(f"❌ 无法解析输出: {exc}")
        return None

    # "frames" may be either a mapping (key -> frame) or a plain list.
    frames = result.get("frames", {})
    frames_list = list(frames.values()) if isinstance(frames, dict) else (frames or [])

    total_frames = len(frames_list)
    total_texts = 0
    empty_frames = 0
    confidences = []

    for frame_data in frames_list:
        texts = frame_data.get("texts", [])
        if not texts:
            empty_frames += 1
            continue
        total_texts += len(texts)
        confidences.extend(text.get("confidence", 0) for text in texts)

    return {
        "total_frames": total_frames,
        "total_texts": total_texts,
        "avg_confidence": sum(confidences) / len(confidences) if confidences else 0,
        "empty_frame_rate": empty_frames / total_frames if total_frames > 0 else 0,
        "avg_texts_per_frame": total_texts / total_frames if total_frames > 0 else 0,
        "empty_frames": empty_frames,
        "file_size_kb": os.path.getsize(output_path) / 1024,
    }


def run_processor(script_name, video_path, output_path, languages=None, uuid="", extra_args=None):
    """Run one OCR processor script and collect timing, memory and quality metrics.

    Args:
        script_name: File name of the processor script inside SCRIPTS_DIR.
        video_path: Path of the video handed to the processor.
        output_path: Path where the processor is expected to write its JSON.
        languages: Language codes; defaults to ``["en"]``. Only forwarded to
            ``ocr_processor_mps.py``.
        uuid: Optional job identifier forwarded where the script's branch
            uses it.
        extra_args: Accepted for interface compatibility.
            NOTE(review): never forwarded to the child process — confirm
            whether it should be.

    Returns:
        A dict with ``elapsed_time``, ``peak_memory_mb``, the output-file
        metrics, and the child's ``stdout``/``stderr``; or None when the
        script is missing, exits non-zero, or produces no readable output.
    """
    languages = ["en"] if languages is None else list(languages)

    script_path = SCRIPTS_DIR / script_name
    if not script_path.exists():
        print(f"❌ 脚本不存在: {script_path}")
        return None

    cmd = _build_command(script_path, script_name, video_path, output_path, languages, uuid)

    print(f"\n执行: {script_name}")
    print(f"命令: {' '.join(cmd)}")

    start_time = time.perf_counter()
    peak_memory = 0

    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    ) as process:
        try:
            while True:
                peak_memory = max(peak_memory, get_memory_peak(process.pid))
                try:
                    # communicate() drains both pipes while waiting, so a
                    # chatty child cannot block on a full pipe buffer;
                    # retrying after a timeout does not lose any output.
                    stdout, stderr = process.communicate(timeout=_MEMORY_POLL_INTERVAL)
                    break
                except subprocess.TimeoutExpired:
                    continue
        except BaseException:
            # Do not leave an orphaned OCR process behind (e.g. on Ctrl-C).
            process.kill()
            raise

    elapsed_time = time.perf_counter() - start_time

    if process.returncode != 0:
        print(f"❌ 处理失败: {stderr[:500]}")
        return None

    if not os.path.exists(output_path):
        return None

    metrics = _summarize_output(output_path)
    if metrics is None:
        return None

    return {
        "elapsed_time": elapsed_time,
        "peak_memory_mb": peak_memory,
        **metrics,
        "stdout": stdout,
        "stderr": stderr,
    }


def _print_video_info(video_path):
    """Print size, duration, resolution and frame rate of the video via ffprobe."""
    cmd = [
        "ffprobe", "-v", "quiet",
        "-print_format", "json",
        "-show_format", "-show_streams",
        video_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(result.stdout)
        video_stream = next((s for s in video_info["streams"] if s["codec_type"] == "video"), None)
        size_mb = float(video_info['format'].get('size', 0)) / 1024 / 1024
        duration = float(video_info['format'].get('duration', 0))
    except (OSError, subprocess.CalledProcessError, ValueError, KeyError, TypeError, AttributeError):
        video_stream = None

    # Covers both a failed probe and a file without any video stream.
    if video_stream is None:
        print("⚠️ 无法获取视频信息")
        return

    print("\n测试视频:")
    print(f" 文件: {size_mb:.1f} MB")
    print(f" 时长: {duration:.1f} 秒")
    print(f" 分辨率: {video_stream.get('width', 0)}x{video_stream.get('height', 0)}")
    print(f" FPS: {video_stream.get('r_frame_rate', 'unknown')}")


def _print_summary_table(results):
    """Print a Markdown comparison table with one row per benchmarked variant."""
    print("\n【对比总结】")
    print("\n| 方案 | 脚本 | 语言 | 时间(秒) | 内存(MB) | 帧数 | 文字数 | 置信度 | 空帧率 |")
    print("|------|------|------|---------|---------|------|--------|--------|--------|")

    for r in results:
        langs_str = ",".join(r["languages"])
        if "error" not in r:
            print(f"| {r['scheme']} | {r['script']} | {langs_str} | {r['elapsed_time']:.2f} | {r['peak_memory_mb']:.1f} | {r['total_frames']} | {r['total_texts']} | {r['avg_confidence']:.2f} | {r['empty_frame_rate']*100:.1f}% |")
        else:
            print(f"| {r['scheme']} | {r['script']} | {langs_str} | - | - | - | - | - | - |")


def main():
    """Benchmark every OCR variant on one video and write a JSON report.

    The video path is taken from the first command-line argument when one
    is given, otherwise DEFAULT_VIDEO_PATH is used. Exits with status 1 if
    the video does not exist.
    """
    print("=" * 80)
    print("OCR Processor Benchmark 测试")
    print("=" * 80)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Test video
    video_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_VIDEO_PATH

    if not os.path.exists(video_path):
        print(f"❌ 测试视频不存在: {video_path}")
        sys.exit(1)

    # Video information
    _print_video_info(video_path)

    # Test languages (multi-language selection)
    languages = ["en", "ch_sim", "ja"]

    # NOTE(review): languages are recorded per variant in the report, but
    # only variant B actually receives them on its command line.
    processors = [
        ("A", "ocr_processor.py", "EasyOCR CPU + Resume", ["en"]),  # variant A supports "en" only
        ("B", "ocr_processor_mps.py", "EasyOCR MPS", languages),
        ("C", "ocr_processor_contract_v1.py", "Contract v1.0", languages),
    ]

    results = []

    for scheme_id, script_name, description, langs in processors:
        print(f"\n{'=' * 80}")
        print(f"方案 {scheme_id}: {description}")
        print(f"{'=' * 80}")
        print(f"语言: {langs}")

        output_path = OUTPUT_DIR / f"scheme_{scheme_id}_{script_name.replace('.py', '.json')}"

        # Remove stale output so an old file is never mistaken for a fresh result.
        output_path.unlink(missing_ok=True)

        result = run_processor(
            script_name,
            video_path,
            str(output_path),
            languages=langs,
            uuid=f"ocr_bench_{scheme_id}",
            extra_args=["--sample-interval", "30"]
        )

        entry = {
            "scheme": scheme_id,
            "script": script_name,
            "description": description,
            "languages": langs,
        }

        if result:
            # Keep every metric but leave the raw process output out of the report.
            entry.update(
                (key, value) for key, value in result.items()
                if key not in ("stdout", "stderr")
            )
            results.append(entry)

            print("\n✅ 处理完成:")
            print(f" 时间: {result['elapsed_time']:.2f}秒")
            print(f" 内存峰值: {result['peak_memory_mb']:.1f} MB")
            print(f" 检测帧数: {result['total_frames']}")
            print(f" 检测文字数: {result['total_texts']}")
            print(f" 平均置信度: {result['avg_confidence']:.2f}")
            print(f" 空帧率: {result['empty_frame_rate']*100:.1f}%")
            print(f" 每帧平均文字: {result['avg_texts_per_frame']:.1f}")
            print(f" 输出大小: {result['file_size_kb']:.1f} KB")
        else:
            print(f"❌ 方案 {scheme_id} 处理失败")
            entry["error"] = "processing failed"
            results.append(entry)

    # Save the report
    report = {
        "test_date": datetime.now().isoformat(),
        "video_path": video_path,
        "languages": languages,
        "results": results,
    }

    report_path = OUTPUT_DIR / "OCR_BENCHMARK_REPORT.json"
    # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding.
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\n{'=' * 80}")
    print("测试报告已保存:")
    print(f" {report_path}")
    print(f"{'=' * 80}")

    _print_summary_table(results)


if __name__ == "__main__":
    main()