#!/usr/bin/env python3 """Benchmark ASR processor direct vs chunked transcription overhead.""" import sys import os import subprocess import json import tempfile import time import shutil import statistics # Use a small video clip for consistent benchmarking VIDEO_SOURCE = "../test_video/BigBuckBunny_320x180.mp4" # 10 minutes, 62MB if not os.path.exists(VIDEO_SOURCE): print(f"Video not found: {VIDEO_SOURCE}") sys.exit(1) # Create temporary directory for all test runs temp_dir = tempfile.mkdtemp(prefix="asr_bench_") print(f"Benchmark directory: {temp_dir}") def run_asr_mode(mode_name, max_direct_duration, chunk_duration=600): """Run ASR processor with given parameters, return timing and resource stats.""" clip_path = os.path.join(temp_dir, f"clip_{mode_name}.mp4") output_path = os.path.join(temp_dir, f"output_{mode_name}.json") # Copy source video to clip path (no transcoding) shutil.copy2(VIDEO_SOURCE, clip_path) env = os.environ.copy() env["MOMENTRY_ASR_MAX_DIRECT_DURATION"] = str(max_direct_duration) env["MOMENTRY_ASR_CHUNK_DURATION"] = str(chunk_duration) env["MOMENTRY_ASR_MODEL_SIZE"] = "tiny" env["MOMENTRY_ASR_COMPUTE_TYPE"] = "int8" cmd = [ "/opt/homebrew/bin/python3.11", "scripts/asr_processor.py", clip_path, output_path, "--uuid", f"bench_{mode_name}", ] # Start monitoring (external) import psutil start_time = time.time() proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env ) # Monitor CPU and memory of child process cpu_percents = [] memory_mbs = [] while True: try: p = psutil.Process(proc.pid) cpu = p.cpu_percent(interval=0.1) mem = p.memory_info().rss / (1024 * 1024) cpu_percents.append(cpu) memory_mbs.append(mem) except (psutil.NoSuchProcess, psutil.AccessDenied): break if proc.poll() is not None: # Process ended, wait a bit for final stats time.sleep(0.1) break stdout, stderr = proc.communicate(timeout=1) elapsed = time.time() - start_time returncode = proc.returncode # Read output segments = [] if os.path.exists(output_path): with open(output_path, "r") as f: data = json.load(f) segments = data.get("segments", []) # Clean up temporary files try: os.unlink(clip_path) os.unlink(output_path) except: pass return { "mode": mode_name, "elapsed": elapsed, "returncode": returncode, "segments": len(segments), "cpu_avg": statistics.mean(cpu_percents) if cpu_percents else 0, "cpu_max": max(cpu_percents) if cpu_percents else 0, "memory_avg": statistics.mean(memory_mbs) if memory_mbs else 0, "memory_max": max(memory_mbs) if memory_mbs else 0, "stderr": stderr.decode() if stderr else "", } try: # Run direct transcription (clip duration ~600s, max_direct=1800) print("Running direct transcription benchmark...") direct = run_asr_mode("direct", max_direct_duration=1800, chunk_duration=600) # Run chunked transcription (force chunked with max_direct=300, chunk=120) print("Running chunked transcription benchmark...") chunked = run_asr_mode("chunked", max_direct_duration=300, chunk_duration=120) # Calculate overhead overhead = (chunked["elapsed"] - direct["elapsed"]) / direct["elapsed"] * 100 # Print results print("\n" + "=" * 60) print("ASR PROCESSOR BENCHMARK RESULTS") print("=" * 60) print(f"Test video: {VIDEO_SOURCE}") print(f"Video duration: ~10 minutes (600 seconds)") print() print("Direct Transcription:") print(f" Time: {direct['elapsed']:.1f}s") print(f" Segments: {direct['segments']}") print(f" CPU avg/max: {direct['cpu_avg']:.1f}% / {direct['cpu_max']:.1f}%") print( f" Memory avg/max: {direct['memory_avg']:.1f} MB / {direct['memory_max']:.1f} MB" ) print() print("Chunked Transcription:") print(f" Time: {chunked['elapsed']:.1f}s") print(f" Segments: {chunked['segments']}") print(f" CPU avg/max: {chunked['cpu_avg']:.1f}% / {chunked['cpu_max']:.1f}%") print( f" Memory avg/max: {chunked['memory_avg']:.1f} MB / {chunked['memory_max']:.1f} MB" ) print() print("OVERHEAD ANALYSIS:") print(f" Time overhead: {overhead:.2f}%") if overhead <= 5: print(f" ✅ PASS: Overhead ≤5% requirement") else: print(f" ❌ FAIL: Overhead exceeds 5% limit") print() # Check for errors if direct["returncode"] != 0: print(f"WARNING: Direct transcription returned {direct['returncode']}") if chunked["returncode"] != 0: print(f"WARNING: Chunked transcription returned {chunked['returncode']}") except Exception as e: print(f"Benchmark failed: {e}") import traceback traceback.print_exc() finally: # Clean up directory shutil.rmtree(temp_dir, ignore_errors=True) print(f"Cleaned up {temp_dir}")