- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
396 lines
13 KiB
Python
396 lines
13 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
性能基准测试 - 验证合约合规处理器的 <5% 开销要求
Performance Benchmark - Verify <5% overhead requirement for contract-compliant processors
"""
|
||
|
||
import sys
|
||
import json
|
||
import os
|
||
import time
|
||
import subprocess
|
||
import statistics
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, List, Any
|
||
|
||
# Test configuration
# The video and output locations default to the original fixed paths, but can
# be overridden through environment variables so the benchmark is not tied to
# one particular machine.
TEST_VIDEO = os.environ.get(
    "BENCHMARK_TEST_VIDEO", "/Users/accusys/test_video/BigBuckBunny_320x180.mp4"
)
TEST_OUTPUT_DIR = os.environ.get("BENCHMARK_OUTPUT_DIR", "/tmp/performance_benchmark")
NUM_RUNS = 3  # Number of measured runs per processor version
WARMUP_RUNS = 1  # Warmup runs per processor version (results discarded)

# Processors to test (legacy vs contract).
# Each entry maps a processor name to:
#   "legacy" / "contract": script path of the respective implementation
#   "timeout": per-run timeout in seconds, forwarded to the script
#   "args": extra command-line arguments appended to every invocation
PROCESSORS = {
    "asr": {
        "legacy": "scripts/asr_processor.py",
        "contract": "scripts/asr_processor_contract_v2.py",
        "timeout": 300,  # 5 minutes
        "args": ["--model-size", "tiny", "--device", "cpu"],
    },
    "ocr": {
        "legacy": "scripts/ocr_processor.py",
        "contract": "scripts/ocr_processor_contract_v1.py",
        "timeout": 600,  # 10 minutes
        "args": ["--languages", "en", "--confidence", "0.7"],
    },
    # Note: YOLO, Face, Pose require models and may take too long
    # We'll test the lighter processors first
}
|
||
|
||
|
||
def prepare_test_environment() -> bool:
    """Prepare the benchmark environment.

    Creates TEST_OUTPUT_DIR if needed and verifies that TEST_VIDEO refers to
    an existing regular file, then prints the active configuration.

    Returns:
        True when the benchmark can proceed, False when the test video is
        missing (or is not a regular file).
    """
    print("准备测试环境...")

    # Create output directory
    os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)

    # Check the test video; isfile (not exists) so that a directory sitting at
    # that path is rejected instead of being handed to the processors.
    if not os.path.isfile(TEST_VIDEO):
        print(f"错误: 测试视频不存在: {TEST_VIDEO}")
        return False

    print(f"测试视频: {TEST_VIDEO}")
    print(f"输出目录: {TEST_OUTPUT_DIR}")
    print(f"每个处理器运行次数: {NUM_RUNS} (热身: {WARMUP_RUNS})")
    print()

    return True
|
||
|
||
|
||
def _failed_run(elapsed: float, message: str) -> Dict[str, Any]:
    """Build the result record for a run that timed out or could not be executed."""
    return {
        "success": False,
        "elapsed_time": elapsed,
        "returncode": -1,
        "stdout": "",
        "stderr": message,
        "output_exists": False,
        "output_size": 0,
        "output_data": None,
    }


def _load_output_file(output_file: str):
    """Inspect a processor output file; return (exists, size_in_bytes, parsed_json)."""
    try:
        output_size = os.path.getsize(output_file)
    except OSError:
        # Missing (or unreadable) output is reported as "no output".
        return False, 0, None

    if output_size == 0:
        return True, 0, None

    try:
        with open(output_file, "r", encoding="utf-8") as f:
            return True, output_size, json.load(f)
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        return True, output_size, {"error": "Failed to parse output"}


def run_processor(processor_type: str, version: str, run_id: int) -> Dict[str, Any]:
    """Run one processor script on the test video and measure its wall-clock time.

    Args:
        processor_type: Key into PROCESSORS (e.g. "asr").
        version: Which implementation to run, "legacy" or "contract".
        run_id: Run index; used in the output file name and the --uuid value.

    Returns:
        A dict with the keys "success", "elapsed_time" (seconds), "returncode",
        "stdout" / "stderr" (last 500 characters each), "output_exists",
        "output_size" and "output_data" (parsed JSON output, or None).
        Timeouts and launch failures are reported with success=False and
        returncode=-1 rather than raised.
    """
    processor_info = PROCESSORS[processor_type]
    script_path = processor_info[version]
    timeout = processor_info["timeout"]
    args = processor_info.get("args", [])

    # Prepare output file
    output_file = os.path.join(
        TEST_OUTPUT_DIR, f"{processor_type}_{version}_run{run_id}.json"
    )

    # Build command. The child is started with the interpreter running this
    # script so it sees the same installed packages.
    cmd = [
        sys.executable or "python3",
        script_path,
        TEST_VIDEO,
        output_file,
        "--uuid",
        f"benchmark_{processor_type}_{version}_{run_id}",
        "--timeout",
        str(timeout),
        *args,
    ]

    print(f"运行: {processor_type.upper()} ({version}) - 运行 #{run_id}")
    print(f" 命令: {' '.join(cmd[:6])}...")

    # Warmup and measured runs reuse the same run ids, so a file left behind by
    # an earlier run must not be mistaken for this run's output.
    try:
        os.remove(output_file)
    except FileNotFoundError:
        pass
    except OSError as e:
        return _failed_run(0.0, str(e))

    # Run processor; perf_counter is monotonic, unlike time.time().
    start_time = time.perf_counter()
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout + 60,  # Buffer on top of the script's own timeout
        )
    except subprocess.TimeoutExpired:
        return _failed_run(time.perf_counter() - start_time, f"超时 ({timeout} 秒)")
    except Exception as e:
        # Any launch failure (missing interpreter, bad arguments, ...) is
        # reported as a failed run so the benchmark can continue.
        return _failed_run(time.perf_counter() - start_time, str(e))
    elapsed = time.perf_counter() - start_time

    output_exists, output_size, output_data = _load_output_file(output_file)

    return {
        "success": result.returncode == 0,
        "elapsed_time": elapsed,
        "returncode": result.returncode,
        "stdout": (result.stdout or "")[-500:],  # Last 500 chars
        "stderr": (result.stderr or "")[-500:],  # Last 500 chars
        "output_exists": output_exists,
        "output_size": output_size,
        "output_data": output_data,
    }
|
||
|
||
|
||
_MAX_OVERHEAD_PERCENT = 5  # Pass threshold for contract-vs-legacy overhead


def _summarize_runs(run_times: List[float], total_runs: int) -> Dict[str, Any]:
    """Aggregate the elapsed times of the successful runs into a summary record."""
    if not run_times:
        return {
            "success_rate": 0,
            "runs_completed": 0,
            "total_runs": total_runs,
            "min_time": 0,
            "max_time": 0,
            "avg_time": 0,
            "median_time": 0,
            "std_dev": 0,
        }

    return {
        "success_rate": len(run_times) / total_runs,
        "runs_completed": len(run_times),
        "total_runs": total_runs,
        "min_time": min(run_times),
        "max_time": max(run_times),
        "avg_time": statistics.mean(run_times),
        "median_time": statistics.median(run_times),
        # stdev needs at least two samples
        "std_dev": statistics.stdev(run_times) if len(run_times) > 1 else 0,
    }


def _overhead_percent(legacy_avg: float, contract_avg: float):
    """Return the contract overhead over legacy in percent, or None without valid data."""
    if legacy_avg > 0 and contract_avg > 0:
        return ((contract_avg - legacy_avg) / legacy_avg) * 100
    return None


def _benchmark_version(processor_type: str, version: str) -> Dict[str, Any]:
    """Run warmup and measured runs for one processor version; return its runs and summary."""
    print(f"\n版本: {version}")

    # Warmup runs (results discarded, failures only reported)
    if WARMUP_RUNS > 0:
        print(f" 热身运行 ({WARMUP_RUNS} 次)...")
        for warmup in range(WARMUP_RUNS):
            run_result = run_processor(processor_type, version, warmup)
            if not run_result["success"]:
                print(f" 热身失败: {run_result.get('stderr', '未知错误')}")

    # Measured runs; only successful runs contribute to the timing statistics
    runs = []
    run_times = []
    for run in range(NUM_RUNS):
        run_result = run_processor(processor_type, version, run)
        runs.append(run_result)

        if run_result["success"]:
            run_times.append(run_result["elapsed_time"])
            print(f" 运行 #{run}: {run_result['elapsed_time']:.1f} 秒 - ✅ 成功")
        else:
            print(f" 运行 #{run}: {run_result['elapsed_time']:.1f} 秒 - ❌ 失败")
            if run_result.get("stderr"):
                print(f" 错误: {run_result['stderr'][:100]}...")

    summary = _summarize_runs(run_times, NUM_RUNS)
    print(f" 总结: {summary['runs_completed']}/{summary['total_runs']} 成功")
    if summary["runs_completed"] > 0:
        print(f" 平均时间: {summary['avg_time']:.1f} 秒")
        print(f" 时间范围: {summary['min_time']:.1f} - {summary['max_time']:.1f} 秒")

    return {"runs": runs, "summary": summary}


def run_benchmark():
    """Run the full benchmark and report the contract-vs-legacy overhead.

    For every entry in PROCESSORS both the legacy and the contract version are
    executed (warmups first, then NUM_RUNS measured runs). The overhead is the
    relative difference of the average times of the successful runs. A JSON
    report with all run records is written to TEST_OUTPUT_DIR.

    Returns:
        True when every processor has valid data and an overhead of at most
        5%, otherwise False (including when the environment check fails).
    """
    print("=" * 80)
    print("性能基准测试 - 合约合规处理器")
    print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 80)
    print()

    if not prepare_test_environment():
        return False

    results = {}

    # Test each processor
    for processor_type in PROCESSORS:
        print(f"\n测试 {processor_type.upper()} 处理器...")
        print("-" * 40)

        processor_results = {}
        for version in ("legacy", "contract"):
            processor_results[version] = _benchmark_version(processor_type, version)
        results[processor_type] = processor_results

        # Per-processor overhead analysis
        legacy_avg = processor_results["legacy"]["summary"]["avg_time"]
        contract_avg = processor_results["contract"]["summary"]["avg_time"]
        overhead = _overhead_percent(legacy_avg, contract_avg)

        if overhead is not None:
            print("\n开销分析:")
            print(f" 传统版本: {legacy_avg:.1f} 秒")
            print(f" 合约版本: {contract_avg:.1f} 秒")
            print(f" 开销: {overhead:.1f}%")

            if overhead <= _MAX_OVERHEAD_PERCENT:
                print(" ✅ 通过: 开销 ≤ 5%")
            else:
                print(" ❌ 失败: 开销 > 5%")
        else:
            print("\n⚠️ 无法计算开销: 缺少有效数据")

    # Generate final report
    print("\n" + "=" * 80)
    print("基准测试完成报告")
    print("=" * 80)

    all_passed = True
    overhead_results = {}

    for processor_type, processor_results in results.items():
        legacy_avg = processor_results["legacy"]["summary"]["avg_time"]
        contract_avg = processor_results["contract"]["summary"]["avg_time"]
        overhead = _overhead_percent(legacy_avg, contract_avg)

        if overhead is None:
            # A processor without timing data counts as not passed.
            print(f"{processor_type.upper()}: ⚠️ 数据不足")
            all_passed = False
            continue

        passed = overhead <= _MAX_OVERHEAD_PERCENT
        overhead_results[processor_type] = {
            "legacy_avg": legacy_avg,
            "contract_avg": contract_avg,
            "overhead_percent": overhead,
            "passed": passed,
        }

        status = "✅ 通过" if passed else "❌ 失败"
        print(f"{processor_type.upper()}: {status} (开销: {overhead:.1f}%)")

        if not passed:
            all_passed = False

    # Overall result
    print("\n" + "=" * 80)
    if all_passed:
        print("🎉 所有处理器通过 <5% 开销要求!")
    else:
        print("⚠️ 部分处理器未通过开销要求")

    # Save detailed results. The report contains non-ASCII text and is dumped
    # with ensure_ascii=False, so the file encoding is fixed to UTF-8 instead
    # of depending on the locale.
    report_file = os.path.join(
        TEST_OUTPUT_DIR, f"benchmark_report_{int(time.time())}.json"
    )
    report = {
        "timestamp": datetime.now().isoformat(),
        "test_config": {
            "test_video": TEST_VIDEO,
            "num_runs": NUM_RUNS,
            "warmup_runs": WARMUP_RUNS,
            "processors_tested": list(PROCESSORS.keys()),
        },
        "results": results,
        "overhead_analysis": overhead_results,
        "overall_passed": all_passed,
    }
    with open(report_file, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\n详细报告保存到: {report_file}")
    print("=" * 80)

    return all_passed
|
||
|
||
|
||
def quick_smoke_test():
    """Quick smoke test: run the health check of each contract processor.

    Only the contract versions are checked (legacy scripts might not have a
    health check). Each script is invoked with --check-health and its stdout
    is, when possible, parsed as JSON with a "checks" list whose items carry a
    "name" and a "status".

    Returns:
        True when every health check command exited with status 0, otherwise
        False. Missing dependencies reported by a passing health check are
        printed as warnings and do not affect the return value.
    """
    print("快速冒烟测试...")
    print("-" * 40)

    test_processors = ["asr", "ocr"]  # Test lighter processors first
    acceptable = ("available", "optional")
    all_ok = True

    for processor_type in test_processors:
        print(f"\n测试 {processor_type.upper()}...")

        script_path = PROCESSORS[processor_type]["contract"]

        # Run health check (the scripts require dummy positional arguments)
        cmd = [
            sys.executable or "python3",
            script_path,
            "--check-health",
            "dummy.mp4",
            "dummy.json",
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except (subprocess.SubprocessError, OSError) as e:
            # Timeout or failure to launch the interpreter
            print(f" ❌ 测试失败: {e}")
            all_ok = False
            continue

        if result.returncode != 0:
            print(" ❌ 健康检查失败")
            print(f" 错误: {result.stderr[:100] if result.stderr else '未知错误'}")
            all_ok = False
            continue

        print(" ✅ 健康检查通过")

        # Try to parse health check output; anything that is not the expected
        # JSON structure is shown verbatim instead.
        try:
            health_data = json.loads(result.stdout)
            checks = health_data.get("checks", [])
            missing = [c for c in checks if c.get("status") not in acceptable]
        except (ValueError, TypeError, AttributeError):
            print(f" ℹ️ 健康检查输出: {result.stdout[:100]}...")
            continue

        if not missing:
            print(" ✅ 所有依赖可用")
        else:
            print(" ⚠️ 部分依赖缺失")
            for check in missing:
                print(f" 缺失: {check.get('name', '?')}")

    print("\n冒烟测试完成")
    return all_ok
|
||
|
||
|
||
if __name__ == "__main__":
    # "--smoke" as first argument runs the quick smoke test, anything else the
    # full benchmark. The process exit status reflects the outcome.
    if len(sys.argv) > 1 and sys.argv[1] == "--smoke":
        smoke_ok = quick_smoke_test()
        # quick_smoke_test may return None (no status reported); only an
        # explicit False is treated as a failure.
        sys.exit(1 if smoke_ok is False else 0)
    else:
        success = run_benchmark()
        sys.exit(0 if success else 1)
|