Files
momentry_core/scripts/asr_benchmark_runner.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

697 lines
28 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
ASR Benchmark Runner - Automated Testing Script for ASR Processor Comparison
Version: 1.0.0
Purpose: Compare faster-whisper vs OpenAI whisper on CPU/MPS devices
Features:
1. Real-time timestamp recording (ISO 8601, microsecond precision)
2. Video-time frame calculation (start_frame, end_frame)
3. Independent file output for each test scheme
4. Memory monitoring with psutil
5. Log recording for each test
"""
import sys
import json
import os
import time
import subprocess
import argparse
import signal
import platform
import psutil
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
import traceback
# Directory that holds this runner; the ASR processor scripts named in
# SCHEMES are resolved relative to it.
SCRIPTS_DIR = Path(__file__).parent

# Default root for benchmark artefacts: <parent of scripts dir>/output/benchmark.
OUTPUT_DIR = SCRIPTS_DIR.parent.joinpath("output", "benchmark")

# Version identifiers for the output contract and for this runner.
CONTRACT_VERSION = "1.0"
RUNNER_VERSION = "1.0.0"
# Benchmark schemes keyed by a one-letter ID.  Each entry names the processor
# script to launch, the engine/model/device it represents (also used to build
# the result file name), the extra CLI arguments placed before the video and
# output paths, and environment overrides applied to the child process.
SCHEMES = {
    # faster-whisper baseline: the script is invoked without extra arguments.
    "A": {
        "name": "faster-whisper small CPU",
        "script": "asr_processor.py",
        "engine": "faster-whisper",
        "model": "small",
        "device": "cpu",
        "args": [],
        "env": {},
    },
    # OpenAI whisper variants: model size and device are passed on the CLI.
    "B": {
        "name": "OpenAI whisper small CPU",
        "script": "asr_processor_contract_v2.py",
        "engine": "whisper",
        "model": "small",
        "device": "cpu",
        "args": ["--model-size", "small", "--device", "cpu"],
        "env": {},
    },
    "C": {
        "name": "OpenAI whisper small MPS",
        "script": "asr_processor_contract_v2.py",
        "engine": "whisper",
        "model": "small",
        "device": "mps",
        "args": ["--model-size", "small", "--device", "mps"],
        "env": {"MOMENTRY_ASR_DEVICE": "mps"},
    },
    "D": {
        "name": "OpenAI whisper medium CPU",
        "script": "asr_processor_contract_v2.py",
        "engine": "whisper",
        "model": "medium",
        "device": "cpu",
        "args": ["--model-size", "medium", "--device", "cpu"],
        "env": {},
    },
    "E": {
        "name": "OpenAI whisper medium MPS",
        "script": "asr_processor_contract_v2.py",
        "engine": "whisper",
        "model": "medium",
        "device": "mps",
        "args": ["--model-size", "medium", "--device", "mps"],
        "env": {"MOMENTRY_ASR_DEVICE": "mps"},
    },
}
# Benchmark inputs keyed by a short name.  "path" is an absolute path on the
# benchmark machine, "output_dir" is the per-video subdirectory created under
# the runner's output directory, and "features" are descriptive tags that are
# copied into the saved video metadata file.
VIDEOS = {
    "charade": {
        "name": "Charade 1963",
        "path": "/Users/accusys/momentry/var/sftpgo/data/demo/Old_Time_Movie_Show_-_Charade_1963.HD.mov",
        "output_dir": "charade_1963",
        "features": ["multilingual", "movie_dialogue", "114_minutes"],
    },
    "exasan": {
        "name": "ExaSAN PCIe",
        "path": "/Users/accusys/momentry/var/sftpgo/data/demo/ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4",
        "output_dir": "exasan_pcie",
        "features": ["technical_terms", "professional_accent", "2_minutes"],
    },
}
class SignalHandler:
    """Remember that SIGTERM or SIGINT arrived so callers can stop cooperatively.

    The handler never exits the process itself; it only flips
    ``shutdown_requested``, which callers are expected to poll.
    """

    def __init__(self):
        # Becomes True once handle_signal has run at least once.
        self.shutdown_requested = False

    def setup(self):
        """Register ``handle_signal`` for SIGTERM and SIGINT."""
        for handled in (signal.SIGTERM, signal.SIGINT):
            signal.signal(handled, self.handle_signal)

    def handle_signal(self, signum, frame):
        """Print which signal was received and mark shutdown as requested.

        Any signal number other than SIGTERM is reported as SIGINT, since
        those are the only two signals ``setup`` registers.
        """
        if signum == signal.SIGTERM:
            signal_name = "SIGTERM"
        else:
            signal_name = "SIGINT"
        print(f"[RUNNER] Received {signal_name}, stopping...")
        self.shutdown_requested = True
def get_iso_timestamp() -> str:
    """Return the current time as an ISO 8601 string carrying the local UTC offset."""
    utc_now = datetime.now(timezone.utc)
    # astimezone() without arguments converts to the system's local timezone.
    local_now = utc_now.astimezone()
    return local_now.isoformat()
def _parse_frame_rate(rate: str) -> float:
    """Convert an ffprobe rate such as ``"30000/1001"`` or ``"25"`` to a float.

    A zero denominator (ffprobe prints ``"0/0"`` when the rate is unknown)
    yields ``0.0`` instead of raising ZeroDivisionError.
    """
    numerator, separator, denominator = rate.partition('/')
    if not separator:
        return float(rate)
    denominator_value = float(denominator)
    if denominator_value == 0:
        return 0.0
    return float(numerator) / denominator_value


def _parse_frame_count(value: Any) -> int:
    """Return *value* as an int, or 0 when it is missing or not numeric (e.g. ``"N/A"``)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def get_video_metadata(video_path: str) -> Dict[str, Any]:
    """Probe *video_path* with ffprobe and describe its first video stream.

    Args:
        video_path: Path of the media file handed to ffprobe.

    Returns:
        A dict with the keys ``path``, ``duration_seconds``, ``fps``,
        ``total_frames``, ``codec_type``, ``codec_name``, ``r_frame_rate``,
        ``avg_frame_rate`` and ``nb_frames``.  When the stream reports no
        usable frame count, it is estimated as ``int(duration * fps)``.

    Raises:
        RuntimeError: If ffprobe cannot be run, exits with an error, or its
            output cannot be interpreted (including "no video stream").  The
            underlying exception is attached as ``__cause__``.
    """
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration,format_name',
        '-show_entries', 'stream=codec_type,codec_name,r_frame_rate,avg_frame_rate,nb_frames',
        '-of', 'json',
        video_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)

        # Only the first stream whose codec_type is "video" is considered.
        video_stream = next(
            (s for s in data.get('streams', []) if s.get('codec_type') == 'video'),
            None,
        )
        if not video_stream:
            raise ValueError("No video stream found")

        fps_str = video_stream.get('r_frame_rate', video_stream.get('avg_frame_rate', '0/1'))
        fps = _parse_frame_rate(fps_str)
        nb_frames = _parse_frame_count(video_stream.get('nb_frames', 0))
        duration = float(data.get('format', {}).get('duration', 0))

        # Fall back to an estimate when the stream does not give a frame count.
        if nb_frames == 0 and fps > 0 and duration > 0:
            nb_frames = int(duration * fps)

        return {
            'path': video_path,
            'duration_seconds': duration,
            'fps': fps,
            'total_frames': nb_frames,
            'codec_type': video_stream.get('codec_type'),
            'codec_name': video_stream.get('codec_name'),
            'r_frame_rate': fps_str,
            'avg_frame_rate': video_stream.get('avg_frame_rate'),
            'nb_frames': nb_frames
        }
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"ffprobe failed: {e.stderr}") from e
    except Exception as e:
        # Covers a missing ffprobe binary, invalid JSON and malformed fields.
        raise RuntimeError(f"Failed to get video metadata: {e}") from e
def time_to_frame(seconds: float, fps: float) -> int:
    """Return the frame index nearest to *seconds* at *fps* frames per second.

    Uses Python's built-in ``round``, so exact halves go to the even frame.
    """
    frame_position = seconds * fps
    return int(round(frame_position))
def process_asr_output(asr_data: Dict[str, Any], video_fps: float) -> Dict[str, Any]:
    """Annotate ASR segments with frame positions and add frame totals.

    Every segment dict is updated in place with ``start_frame``,
    ``end_frame``, ``duration_seconds``, ``duration_frames`` and an ``id``
    equal to its position in the list (any ``id`` already present is
    overwritten).  ``asr_data`` itself is also modified and returned.

    Args:
        asr_data: Parsed ASR output; ``segments`` is read as a list of dicts
            whose ``start``/``end`` default to 0.0 when absent.
        video_fps: Frame rate used to convert times to frame indices.

    Returns:
        The same ``asr_data`` object with ``segments``,
        ``total_transcribed_frames`` and ``avg_segment_frames`` set.
    """
    # Treat a missing or null "segments" entry as an empty list.
    segments = asr_data.get('segments') or []
    total_frames = 0
    # enumerate gives the positional id directly; list.index() would rescan
    # the list for every segment and return the first *equal* dict, which is
    # the wrong position when two segments have identical contents.
    for position, segment in enumerate(segments):
        start = segment.get('start', 0.0)
        end = segment.get('end', 0.0)
        segment['start_frame'] = time_to_frame(start, video_fps)
        segment['end_frame'] = time_to_frame(end, video_fps)
        segment['duration_seconds'] = end - start
        segment['duration_frames'] = segment['end_frame'] - segment['start_frame']
        segment['id'] = position
        total_frames += segment['duration_frames']
    asr_data['segments'] = segments
    asr_data['total_transcribed_frames'] = total_frames
    asr_data['avg_segment_frames'] = total_frames / len(segments) if segments else 0
    return asr_data
class ASRBenchmarkRunner:
    """Run ASR benchmark schemes against the configured videos and report on them.

    A test launches one processor script from ``SCHEMES`` as a subprocess on
    one video from ``VIDEOS``, samples the child's memory and CPU usage with
    psutil while it runs, post-processes the JSON the script wrote, and then
    replaces that file with a wrapped result record.  Aggregated JSON and
    Markdown reports can be generated from ``self.results`` afterwards.
    """

    def __init__(self, output_dir: Path = OUTPUT_DIR, verbose: bool = False):
        """Create a runner and install the SIGTERM/SIGINT shutdown handler.

        Args:
            output_dir: Root directory for per-video results, logs and reports.
            verbose: When true, ``log`` prints timestamped messages.
        """
        self.output_dir = output_dir
        self.verbose = verbose
        self.signal_handler = SignalHandler()
        self.signal_handler.setup()
        self.results = []
        self.test_start_time = None
        self.test_end_time = None

    def log(self, message: str):
        """Print *message* prefixed with the current timestamp when verbose."""
        if self.verbose:
            timestamp = get_iso_timestamp()
            print(f"[{timestamp}] {message}")

    @staticmethod
    def _output_filename(scheme_id: str, scheme: Dict[str, Any]) -> str:
        """Return the result file name used for *scheme_id*."""
        return f"scheme_{scheme_id}_{scheme['engine']}_{scheme['model']}_{scheme['device']}.json"

    @staticmethod
    def _result_scheme_id(result: Dict[str, Any]) -> str:
        """Return a result's scheme ID from the top level or from ``file_info``.

        Result files written before the top-level key existed only carry the
        ID inside ``file_info``; failure records only carry it at top level.
        """
        return (
            result.get('scheme_id')
            or (result.get('file_info') or {}).get('scheme_id')
            or 'unknown'
        )

    @staticmethod
    def _result_error(result: Dict[str, Any]) -> str:
        """Return the error text of a failed result, whichever key holds it."""
        return result.get('error') or result.get('error_message') or 'Unknown error'

    @staticmethod
    def _average_metric(metrics_list: List[Dict[str, Any]], key: str) -> float:
        """Return the mean of ``metrics[key]`` (missing counts as 0) over the list."""
        if not metrics_list:
            return 0
        return sum(m.get(key, 0) for m in metrics_list) / len(metrics_list)

    @staticmethod
    def _stop_process(process: Optional[subprocess.Popen], grace_seconds: float = 5.0) -> None:
        """Terminate *process* if it is still running and reap it.

        Sends SIGTERM, waits up to *grace_seconds* while draining its pipes,
        then kills it if it has not exited.
        """
        if process is None or process.poll() is not None:
            return
        process.terminate()
        try:
            process.communicate(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            process.kill()
            process.communicate()

    def _run_and_monitor(self, cmd: List[str], env: Dict[str, str]) -> Tuple[str, str, int, List[float], List[float]]:
        """Run *cmd* to completion while sampling its resource usage.

        Returns:
            ``(stdout, stderr, returncode, memory_samples_mb, cpu_samples)``.

        Raises:
            RuntimeError: If the process cannot be started or monitored, or
                if shutdown is requested while it runs (the child is stopped
                first).
        """
        process = None
        memory_samples: List[float] = []
        cpu_samples: List[float] = []
        try:
            self.log(f"Running command: {' '.join(cmd)}")
            process = subprocess.Popen(
                cmd,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes in the child's output must not fail the test.
                errors="replace",
            )
            try:
                monitored = psutil.Process(process.pid)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The child already exited or cannot be inspected; run unmonitored.
                monitored = None

            while True:
                if self.signal_handler.shutdown_requested:
                    raise RuntimeError("Shutdown requested")
                if monitored is not None:
                    try:
                        mem_info = monitored.memory_info()
                        # Blocks for 0.5 s to measure CPU usage over that window.
                        cpu_percent = monitored.cpu_percent(interval=0.5)
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass
                    else:
                        memory_samples.append(mem_info.rss / 1024 / 1024)
                        cpu_samples.append(cpu_percent)
                try:
                    # communicate() drains both pipes while waiting, so a
                    # child that writes more than a pipe buffer cannot block
                    # forever; on timeout it can simply be retried.
                    stdout_data, stderr_data = process.communicate(timeout=1.0)
                    break
                except subprocess.TimeoutExpired:
                    continue
        except Exception as e:
            self._stop_process(process)
            raise RuntimeError(f"Process execution failed: {e}") from e
        return stdout_data, stderr_data, process.returncode, memory_samples, cpu_samples

    def run_single_test(self, scheme_id: str, video_key: str) -> Dict[str, Any]:
        """Run one scheme on one video and write its result and log files.

        Args:
            scheme_id: Key into ``SCHEMES``.
            video_key: Key into ``VIDEOS``.

        Returns:
            The result record that was also written to the scheme's JSON file.
            ``success`` reflects the subprocess exit code.

        Raises:
            ValueError: If either key is unknown.
            RuntimeError: If shutdown was requested, video metadata cannot be
                read, or the subprocess could not be run to completion.
        """
        scheme = SCHEMES.get(scheme_id)
        video_info = VIDEOS.get(video_key)
        if not scheme or not video_info:
            raise ValueError(f"Invalid scheme_id or video_key: {scheme_id}, {video_key}")
        if self.signal_handler.shutdown_requested:
            raise RuntimeError("Shutdown requested")

        video_dir = self.output_dir / video_info['output_dir']
        video_dir.mkdir(parents=True, exist_ok=True)
        video_metadata = get_video_metadata(video_info['path'])
        video_fps = video_metadata['fps']
        output_filename = self._output_filename(scheme_id, scheme)
        output_path = video_dir / output_filename
        log_path = video_dir / "logs" / f"scheme_{scheme_id}.log"
        # The logs directory is separate from video_dir and must exist
        # before the log file can be opened for writing.
        log_path.parent.mkdir(parents=True, exist_ok=True)

        test_id = f"{scheme_id}_{video_key}_{int(time.time())}"
        self.log(f"Starting test: {test_id}")
        self.log(f"Scheme: {scheme['name']}")
        self.log(f"Video: {video_info['name']}")
        self.log(f"FPS: {video_fps}, Total frames: {video_metadata['total_frames']}")

        test_start = get_iso_timestamp()
        start_time = time.time()

        # Command line: interpreter, script, scheme args, input video, output path.
        script_path = SCRIPTS_DIR / scheme['script']
        cmd = ['/opt/homebrew/bin/python3.11', str(script_path)]
        cmd.extend(scheme['args'])
        cmd.extend([video_info['path'], str(output_path)])
        env = os.environ.copy()
        env.update(scheme['env'])

        stdout_data, stderr_data, returncode, memory_samples, cpu_samples = self._run_and_monitor(cmd, env)

        end_time = time.time()
        test_end = get_iso_timestamp()
        wall_clock_duration = end_time - start_time

        peak_memory_mb = max(memory_samples, default=0)
        avg_memory_mb = sum(memory_samples) / len(memory_samples) if memory_samples else 0
        avg_cpu_percent = sum(cpu_samples) / len(cpu_samples) if cpu_samples else 0
        peak_cpu_percent = max(cpu_samples) if cpu_samples else 0

        with open(log_path, 'w', encoding='utf-8') as f:
            f.write(f"Test ID: {test_id}\n")
            f.write(f"Scheme: {scheme['name']}\n")
            f.write(f"Video: {video_info['name']}\n")
            f.write(f"Start: {test_start}\n")
            f.write(f"End: {test_end}\n")
            f.write(f"Duration: {wall_clock_duration:.3f}s\n")
            f.write(f"\n=== STDOUT ===\n{stdout_data}\n")
            f.write(f"\n=== STDERR ===\n{stderr_data}\n")

        success = returncode == 0
        metrics: Dict[str, Any] = {}
        asr_data_for_output = None
        if success and output_path.exists():
            try:
                with open(output_path, 'r', encoding='utf-8') as f:
                    asr_output = json.load(f)
                asr_output = process_asr_output(asr_output, video_fps)
                segments = asr_output.get('segments', [])
                total_duration = sum(s.get('duration_seconds', 0) for s in segments)
                metrics = {
                    'processing_time_seconds': wall_clock_duration,
                    'processing_speed_ratio': video_metadata['duration_seconds'] / wall_clock_duration if wall_clock_duration > 0 else 0,
                    'peak_memory_mb': peak_memory_mb,
                    'avg_memory_mb': avg_memory_mb,
                    'segments_count': len(segments),
                    'avg_segment_length_seconds': total_duration / len(segments) if segments else 0,
                    'avg_segment_frames': asr_output.get('avg_segment_frames', 0),
                    'total_transcribed_duration_seconds': total_duration,
                    'total_transcribed_frames': asr_output.get('total_transcribed_frames', 0),
                    'language_detected': asr_output.get('language', 'unknown'),
                    'language_probability': asr_output.get('language_probability', 0),
                    'cpu_avg_percent': avg_cpu_percent,
                    'cpu_peak_percent': peak_cpu_percent
                }
                asr_data_for_output = {
                    'language': asr_output.get('language'),
                    'language_probability': asr_output.get('language_probability'),
                    'segments': asr_output.get('segments', []),
                    'total_transcribed_frames': asr_output.get('total_transcribed_frames'),
                    'avg_segment_frames': asr_output.get('avg_segment_frames')
                }
            except Exception as e:
                # Best effort: an unparsable processor output still yields a
                # result record, with the parse error noted in the metrics.
                self.log(f"Failed to parse ASR output: {e}")
                metrics = {
                    'processing_time_seconds': wall_clock_duration,
                    'processing_speed_ratio': 0,
                    'peak_memory_mb': peak_memory_mb,
                    'avg_memory_mb': avg_memory_mb,
                    'error': str(e)
                }
                asr_data_for_output = None

        result = {
            # Top-level identifiers let the summaries group results without
            # digging into file_info.
            'scheme_id': scheme_id,
            'video_key': video_key,
            'file_info': {
                'filename': output_filename,
                'created_at': test_end,
                'test_id': test_id,
                'scheme_id': scheme_id,
                'scheme_name': scheme['name'],
                'video_name': video_info['name']
            },
            'video_metadata': video_metadata,
            'real_time': {
                'test_start': test_start,
                'test_end': test_end,
                'wall_clock_duration_seconds': wall_clock_duration
            },
            'metrics': metrics,
            'asr_output': asr_data_for_output,
            'resource_usage': {
                'cpu_avg_percent': avg_cpu_percent,
                'cpu_peak_percent': peak_cpu_percent,
                'peak_memory_mb': peak_memory_mb,
                'avg_memory_mb': avg_memory_mb
            },
            # Size of the file as it is on disk before it is replaced below.
            'output_file_size_bytes': output_path.stat().st_size if output_path.exists() else 0,
            'success': success,
            'error_message': stderr_data if not success else None
        }

        # The wrapped result replaces the processor's raw output file.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)

        self.log(f"Test completed: {test_id}")
        self.log(f"Duration: {wall_clock_duration:.3f}s, Speed: {metrics.get('processing_speed_ratio', 0):.2f}x")
        self.log(f"Segments: {metrics.get('segments_count', 0)}, Memory peak: {peak_memory_mb:.1f}MB")
        self.log(f"Output: {output_path}")
        return result

    def save_video_metadata_files(self, video_keys: Optional[List[str]] = None):
        """Write ``video_metadata.json`` into each video's output directory.

        Args:
            video_keys: Keys into ``VIDEOS`` to process; all videos when None.

        Raises:
            ValueError: If a key is not in ``VIDEOS``.
            RuntimeError: If metadata for a video cannot be read.
        """
        keys = list(VIDEOS.keys()) if video_keys is None else video_keys
        for video_key in keys:
            video_info = VIDEOS.get(video_key)
            if video_info is None:
                raise ValueError(f"Unknown video_key: {video_key}")
            video_dir = self.output_dir / video_info['output_dir']
            video_dir.mkdir(parents=True, exist_ok=True)
            metadata_path = video_dir / "video_metadata.json"
            video_metadata = get_video_metadata(video_info['path'])
            metadata = {
                'video_key': video_key,
                'name': video_info['name'],
                'path': video_info['path'],
                'features': video_info['features'],
                'metadata': video_metadata,
                'created_at': get_iso_timestamp()
            }
            with open(metadata_path, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            self.log(f"Saved video metadata: {metadata_path}")

    def run_all_tests(self, schemes: List[str] = None, videos: List[str] = None, skip_existing: bool = False) -> List[Dict[str, Any]]:
        """Run every requested scheme on every requested video.

        Args:
            schemes: Scheme IDs to run; all of ``SCHEMES`` when None.
            videos: Video keys to run; all of ``VIDEOS`` when None.
            skip_existing: Reuse a readable existing result file instead of
                re-running its test.

        Returns:
            ``self.results``: one record per test, including failure records
            for tests that raised.

        Raises:
            ValueError: If any scheme ID or video key is unknown; checked
                before any test starts.
        """
        if schemes is None:
            schemes = list(SCHEMES.keys())
        if videos is None:
            videos = list(VIDEOS.keys())

        # Reject bad names up front rather than failing part-way through.
        unknown_schemes = [s for s in schemes if s not in SCHEMES]
        unknown_videos = [v for v in videos if v not in VIDEOS]
        if unknown_schemes or unknown_videos:
            raise ValueError(
                f"Unknown scheme IDs {unknown_schemes} or video keys {unknown_videos}"
            )

        self.test_start_time = get_iso_timestamp()
        self.log(f"Benchmark started: {self.test_start_time}")
        # Only the selected videos are probed, so an unrelated missing file
        # cannot abort the run.
        self.save_video_metadata_files(videos)
        self.results = []

        test_plan = [(video_key, scheme_id) for video_key in videos for scheme_id in schemes]
        for video_key, scheme_id in test_plan:
            if self.signal_handler.shutdown_requested:
                self.log("Shutdown requested, stopping tests")
                break
            video_info = VIDEOS[video_key]
            scheme = SCHEMES[scheme_id]
            output_path = (
                self.output_dir
                / video_info['output_dir']
                / self._output_filename(scheme_id, scheme)
            )
            if skip_existing and output_path.exists():
                try:
                    with open(output_path, 'r', encoding='utf-8') as f:
                        result = json.load(f)
                except Exception as e:
                    # An unreadable file is not a usable result: re-run it.
                    self.log(f"Failed to load existing result, re-running: {e}")
                else:
                    self.log(f"Skipping existing: {output_path}")
                    self.results.append(result)
                    continue
            try:
                result = self.run_single_test(scheme_id, video_key)
                self.results.append(result)
            except Exception as e:
                self.log(f"Test failed: {scheme_id}/{video_key} - {e}")
                self.results.append({
                    'scheme_id': scheme_id,
                    'video_key': video_key,
                    'success': False,
                    'error': str(e),
                    'traceback': traceback.format_exc()
                })

        self.test_end_time = get_iso_timestamp()
        self.log(f"Benchmark completed: {self.test_end_time}")
        return self.results

    def generate_results_json(self) -> Path:
        """Write ``asr_benchmark_results.json`` summarising ``self.results``.

        Returns:
            Path of the written file.
        """
        results_path = self.output_dir / "asr_benchmark_results.json"
        successful_tests = [r for r in self.results if r.get('success', False)]
        failed_tests = [r for r in self.results if not r.get('success', False)]

        system_info = {
            'os': platform.system(),
            'os_version': platform.version(),
            'python_version': platform.python_version(),
            'cpu': platform.processor(),
            'machine': platform.machine(),
            'memory_total_gb': psutil.virtual_memory().total / (1024**3)
        }
        benchmark_metadata = {
            'benchmark_id': f"asr_comparison_{int(time.time())}",
            'benchmark_start': self.test_start_time,
            'benchmark_end': self.test_end_time,
            'total_tests': len(self.results),
            'successful_tests': len(successful_tests),
            'failed_tests': len(failed_tests),
            'runner_version': RUNNER_VERSION,
            'system_info': system_info
        }

        # Note: "avg_memory_mb" in both summaries is the mean of each test's
        # peak memory, not of its average memory.
        summary_by_scheme = {}
        for scheme_id in SCHEMES.keys():
            scheme_results = [r for r in successful_tests if self._result_scheme_id(r) == scheme_id]
            if scheme_results:
                metrics_list = [r.get('metrics', {}) for r in scheme_results]
                summary_by_scheme[scheme_id] = {
                    'avg_processing_time_seconds': self._average_metric(metrics_list, 'processing_time_seconds'),
                    'avg_speed_ratio': self._average_metric(metrics_list, 'processing_speed_ratio'),
                    'avg_memory_mb': self._average_metric(metrics_list, 'peak_memory_mb'),
                    'avg_segments_count': self._average_metric(metrics_list, 'segments_count')
                }

        summary_by_video = {}
        for video_key in VIDEOS.keys():
            video_name = VIDEOS[video_key]['name']
            # Match on the key, or on the display name for records that only
            # carry file_info.
            video_results = [
                r for r in successful_tests
                if r.get('video_key') == video_key
                or r.get('file_info', {}).get('video_name') == video_name
            ]
            if video_results:
                metrics_list = [r.get('metrics', {}) for r in video_results]
                summary_by_video[video_key] = {
                    'avg_processing_time_seconds': self._average_metric(metrics_list, 'processing_time_seconds'),
                    'avg_speed_ratio': self._average_metric(metrics_list, 'processing_speed_ratio'),
                    'avg_memory_mb': self._average_metric(metrics_list, 'peak_memory_mb')
                }

        results_data = {
            'benchmark_metadata': benchmark_metadata,
            'test_results': self.results,
            'summary_statistics': {
                'by_scheme': summary_by_scheme,
                'by_video': summary_by_video
            },
            'created_at': get_iso_timestamp()
        }
        with open(results_path, 'w', encoding='utf-8') as f:
            json.dump(results_data, f, indent=2, ensure_ascii=False)
        self.log(f"Saved results JSON: {results_path}")
        return results_path

    def generate_markdown_report(self) -> Path:
        """Write ``asr_benchmark_report.md`` describing ``self.results``.

        Returns:
            Path of the written file.
        """
        report_path = self.output_dir / "asr_benchmark_report.md"
        successful_tests = [r for r in self.results if r.get('success', False)]

        lines = [
            "# ASR Benchmark Automated Report",
            "",
            f"**Generated**: {get_iso_timestamp()}",
            f"**Total Tests**: {len(self.results)}",
            f"**Successful**: {len(successful_tests)}",
            f"**Failed**: {len(self.results) - len(successful_tests)}",
            "",
            "---",
            "",
            "## Test Results Summary",
            "",
            "### By Scheme",
            "",
            "| Scheme | Engine | Model | Device | Avg Time (s) | Avg Speed | Avg Memory (MB) | Avg Segments |",
            "|--------|--------|-------|--------|--------------|-----------|-----------------|---------------|",
        ]

        # Collect per-scheme metric lists from the successful tests.
        summary = {}
        for r in successful_tests:
            scheme_id = self._result_scheme_id(r)
            metrics = r.get('metrics', {})
            bucket = summary.setdefault(
                scheme_id, {'times': [], 'speeds': [], 'memories': [], 'segments': []}
            )
            bucket['times'].append(metrics.get('processing_time_seconds', 0))
            bucket['speeds'].append(metrics.get('processing_speed_ratio', 0))
            bucket['memories'].append(metrics.get('peak_memory_mb', 0))
            bucket['segments'].append(metrics.get('segments_count', 0))

        for scheme_id in sorted(summary.keys()):
            s = summary[scheme_id]
            scheme = SCHEMES.get(scheme_id, {})
            avg_time = sum(s['times']) / len(s['times'])
            avg_speed = sum(s['speeds']) / len(s['speeds'])
            avg_mem = sum(s['memories']) / len(s['memories'])
            avg_seg = sum(s['segments']) / len(s['segments'])
            lines.append(f"| {scheme_id} | {scheme.get('engine', 'N/A')} | {scheme.get('model', 'N/A')} | {scheme.get('device', 'N/A')} | {avg_time:.1f} | {avg_speed:.2f}x | {avg_mem:.1f} | {avg_seg:.0f} |")

        lines.append("")
        lines.append("### Detailed Results")
        lines.append("")
        for result in self.results:
            scheme_id = self._result_scheme_id(result)
            video_name = result.get('file_info', {}).get('video_name', result.get('video_key', 'unknown'))
            lines.append(f"#### {scheme_id} - {video_name}")
            lines.append("")
            if result.get('success', False):
                metrics = result.get('metrics', {})
                real_time = result.get('real_time', {})
                # The probability may be stored as null when the processor
                # does not report one; format it as 0 in that case.
                language_probability = metrics.get('language_probability') or 0
                lines.append("- **Status**: Success")
                lines.append(f"- **Start**: {real_time.get('test_start', 'N/A')}")
                lines.append(f"- **End**: {real_time.get('test_end', 'N/A')}")
                lines.append(f"- **Duration**: {metrics.get('processing_time_seconds', 0):.3f}s")
                lines.append(f"- **Speed**: {metrics.get('processing_speed_ratio', 0):.2f}x")
                lines.append(f"- **Segments**: {metrics.get('segments_count', 0)}")
                lines.append(f"- **Memory Peak**: {metrics.get('peak_memory_mb', 0):.1f}MB")
                lines.append(f"- **Language**: {metrics.get('language_detected', 'N/A')} ({language_probability:.2f})")
            else:
                lines.append("- **Status**: Failed")
                lines.append(f"- **Error**: {self._result_error(result)}")
            lines.append("")

        lines.append("---")
        lines.append("")
        lines.append("## Output Files")
        lines.append("")
        lines.append("All test outputs are saved in:")
        lines.append(f"- `{self.output_dir}/`")
        lines.append("")
        for video_key in VIDEOS.keys():
            video_dir = self.output_dir / VIDEOS[video_key]['output_dir']
            lines.append(f"### {VIDEOS[video_key]['name']}")
            lines.append(f"- `{video_dir}/`")
            for scheme_id in SCHEMES.keys():
                filename = self._output_filename(scheme_id, SCHEMES[scheme_id])
                lines.append(f"  - `{filename}`")
            lines.append("")

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))
        self.log(f"Saved markdown report: {report_path}")
        return report_path
def main():
    """Parse command-line options and run a single test or the full benchmark.

    With ``--single scheme_id,video_key`` one test is run and its result is
    printed as JSON.  Otherwise every requested scheme/video combination is
    run and the JSON and Markdown reports are generated.  Exits with status 1
    on a malformed ``--single`` value or any unhandled error, and 130 on
    KeyboardInterrupt.
    """
    parser = argparse.ArgumentParser(description='ASR Benchmark Runner')
    parser.add_argument('--output-dir', type=str, default=str(OUTPUT_DIR), help='Output directory')
    parser.add_argument('--schemes', type=str, default='A,B,C,D,E', help='Schemes to test (comma-separated)')
    parser.add_argument('--videos', type=str, default='charade,exasan', help='Videos to test (comma-separated)')
    parser.add_argument('--skip-existing', action='store_true', help='Skip existing output files')
    parser.add_argument('--verbose', action='store_true', help='Verbose output')
    parser.add_argument('--single', type=str, help='Run single test: scheme_id,video_key (e.g., A,charade)')
    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    runner = ASRBenchmarkRunner(output_dir=output_dir, verbose=args.verbose)

    try:
        if args.single:
            # Strip whitespace so "A, charade" is accepted, matching how
            # --schemes and --videos are parsed; reject empty parts.
            parts = [part.strip() for part in args.single.split(',')]
            if len(parts) != 2 or not all(parts):
                print("Error: --single format should be scheme_id,video_key")
                sys.exit(1)
            scheme_id, video_key = parts
            result = runner.run_single_test(scheme_id, video_key)
            print(json.dumps(result, indent=2, ensure_ascii=False))
        else:
            schemes = [s.strip() for s in args.schemes.split(',') if s.strip()]
            videos = [v.strip() for v in args.videos.split(',') if v.strip()]
            runner.run_all_tests(schemes=schemes, videos=videos, skip_existing=args.skip_existing)
            runner.generate_results_json()
            runner.generate_markdown_report()
            print("\nBenchmark completed!")
            print(f"Results: {output_dir / 'asr_benchmark_results.json'}")
            print(f"Report: {output_dir / 'asr_benchmark_report.md'}")
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        sys.exit(130)
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()