Files
momentry_core/quick_performance_test.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

304 lines
9.5 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
快速性能测试 - 验证合约合规处理器的 <5% 开销要求
Quick Performance Test - Verify <5% overhead requirement for contract-compliant processors
"""
import sys
import json
import os
import time
import subprocess
import statistics
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
# Test configuration - use small video for quick test
TEST_VIDEO = "/Users/accusys/test_video/BigBuckBunny_320x180.mp4"
TEST_OUTPUT_DIR = "/tmp/quick_performance_test"
NUM_RUNS = 2 # Reduced for quick test
WARMUP_RUNS = 0 # No warmup for quick test
# Only test ASR for quick benchmark
PROCESSORS = {
"asr": {
"legacy": "scripts/asr_processor.py",
"contract": "scripts/asr_processor_contract_v2.py",
"timeout": 120, # 2 minutes max
"args": ["--model-size", "tiny", "--device", "cpu"],
},
}
def prepare_test_environment():
"""准备测试环境"""
print("准备测试环境...")
# Create output directory
os.makedirs(TEST_OUTPUT_DIR, exist_ok=True)
# Check test video exists
if not os.path.exists(TEST_VIDEO):
print(f"错误: 测试视频不存在: {TEST_VIDEO}")
return False
print(f"测试视频: {TEST_VIDEO}")
print(f"输出目录: {TEST_OUTPUT_DIR}")
print(f"每个处理器运行次数: {NUM_RUNS} (热身: {WARMUP_RUNS})")
print()
return True
def run_processor(processor_type: str, version: str, run_id: int) -> Dict[str, Any]:
"""运行处理器并测量性能"""
processor_info = PROCESSORS[processor_type]
script_path = processor_info[version]
timeout = processor_info["timeout"]
args = processor_info.get("args", [])
# Prepare output file
output_file = os.path.join(
TEST_OUTPUT_DIR, f"{processor_type}_{version}_run{run_id}.json"
)
# Build command
cmd = [
"python3",
script_path,
TEST_VIDEO,
output_file,
"--uuid",
f"benchmark_{processor_type}_{version}_{run_id}",
"--timeout",
str(timeout),
] + args
print(f"运行: {processor_type.upper()} ({version}) - 运行 #{run_id}")
# Run processor
start_time = time.time()
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout + 30, # Add buffer
)
elapsed = time.time() - start_time
# Check if output file was created
output_exists = os.path.exists(output_file)
output_size = os.path.getsize(output_file) if output_exists else 0
return {
"success": result.returncode == 0,
"elapsed_time": elapsed,
"returncode": result.returncode,
"stdout": result.stdout[-200:] if result.stdout else "", # Last 200 chars
"stderr": result.stderr[-200:] if result.stderr else "", # Last 200 chars
"output_exists": output_exists,
"output_size": output_size,
}
except subprocess.TimeoutExpired:
elapsed = time.time() - start_time
return {
"success": False,
"elapsed_time": elapsed,
"returncode": -1,
"stdout": "",
"stderr": f"超时 ({timeout} 秒)",
"output_exists": False,
"output_size": 0,
}
except Exception as e:
elapsed = time.time() - start_time
return {
"success": False,
"elapsed_time": elapsed,
"returncode": -1,
"stdout": "",
"stderr": str(e),
"output_exists": False,
"output_size": 0,
}
def run_quick_test():
"""运行快速测试"""
print("=" * 80)
print("快速性能测试 - 合约合规处理器")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
print()
if not prepare_test_environment():
return
results = {}
# Test each processor
for processor_type in PROCESSORS:
print(f"\n测试 {processor_type.upper()} 处理器...")
print("-" * 40)
processor_results = {
"legacy": {"runs": [], "summary": {}},
"contract": {"runs": [], "summary": {}},
}
# Test both versions
for version in ["legacy", "contract"]:
print(f"\n版本: {version}")
# Actual test runs
run_times = []
successes = 0
for run in range(NUM_RUNS):
run_result = run_processor(processor_type, version, run)
processor_results[version]["runs"].append(run_result)
if run_result["success"]:
successes += 1
run_times.append(run_result["elapsed_time"])
print(
f" 运行 #{run}: {run_result['elapsed_time']:.1f} 秒 - ✅ 成功"
)
else:
print(
f" 运行 #{run}: {run_result['elapsed_time']:.1f} 秒 - ❌ 失败"
)
if run_result.get("stderr"):
print(f" 错误: {run_result['stderr'][:100]}...")
# Calculate statistics
if run_times:
processor_results[version]["summary"] = {
"success_rate": successes / NUM_RUNS,
"runs_completed": successes,
"total_runs": NUM_RUNS,
"min_time": min(run_times),
"max_time": max(run_times),
"avg_time": statistics.mean(run_times),
"median_time": statistics.median(run_times),
"std_dev": statistics.stdev(run_times) if len(run_times) > 1 else 0,
}
else:
processor_results[version]["summary"] = {
"success_rate": 0,
"runs_completed": 0,
"total_runs": NUM_RUNS,
"min_time": 0,
"max_time": 0,
"avg_time": 0,
"median_time": 0,
"std_dev": 0,
}
summary = processor_results[version]["summary"]
print(f" 总结: {summary['runs_completed']}/{summary['total_runs']} 成功")
if summary["runs_completed"] > 0:
print(f" 平均时间: {summary['avg_time']:.1f}")
print(
f" 时间范围: {summary['min_time']:.1f} - {summary['max_time']:.1f}"
)
results[processor_type] = processor_results
# Calculate overhead
legacy_avg = processor_results["legacy"]["summary"]["avg_time"]
contract_avg = processor_results["contract"]["summary"]["avg_time"]
if legacy_avg > 0 and contract_avg > 0:
overhead = ((contract_avg - legacy_avg) / legacy_avg) * 100
print(f"\n开销分析:")
print(f" 传统版本: {legacy_avg:.1f}")
print(f" 合约版本: {contract_avg:.1f}")
print(f" 开销: {overhead:.1f}%")
if overhead <= 5:
print(f" ✅ 通过: 开销 ≤ 5%")
else:
print(f" ❌ 失败: 开销 > 5%")
else:
print(f"\n⚠️ 无法计算开销: 缺少有效数据")
# Generate final report
print("\n" + "=" * 80)
print("快速测试完成报告")
print("=" * 80)
all_passed = True
overhead_results = {}
for processor_type, processor_results in results.items():
legacy_avg = processor_results["legacy"]["summary"]["avg_time"]
contract_avg = processor_results["contract"]["summary"]["avg_time"]
if legacy_avg > 0 and contract_avg > 0:
overhead = ((contract_avg - legacy_avg) / legacy_avg) * 100
passed = overhead <= 5
overhead_results[processor_type] = {
"legacy_avg": legacy_avg,
"contract_avg": contract_avg,
"overhead_percent": overhead,
"passed": passed,
}
status = "✅ 通过" if passed else "❌ 失败"
print(f"{processor_type.upper()}: {status} (开销: {overhead:.1f}%)")
if not passed:
all_passed = False
else:
print(f"{processor_type.upper()}: ⚠️ 数据不足")
all_passed = False
# Overall result
print("\n" + "=" * 80)
if all_passed:
print("🎉 处理器通过 <5% 开销要求!")
else:
print("⚠️ 处理器未通过开销要求")
# Save detailed results
report_file = os.path.join(
TEST_OUTPUT_DIR, f"quick_test_report_{int(time.time())}.json"
)
with open(report_file, "w") as f:
json.dump(
{
"timestamp": datetime.now().isoformat(),
"test_config": {
"test_video": TEST_VIDEO,
"num_runs": NUM_RUNS,
"warmup_runs": WARMUP_RUNS,
"processors_tested": list(PROCESSORS.keys()),
},
"results": results,
"overhead_analysis": overhead_results,
"overall_passed": all_passed,
},
f,
indent=2,
ensure_ascii=False,
)
print(f"\n详细报告保存到: {report_file}")
print("=" * 80)
return all_passed
if __name__ == "__main__":
success = run_quick_test()
sys.exit(0 if success else 1)