#!/usr/bin/env python3 """Benchmark ASR with realistic chunk sizes.""" import sys import os import subprocess import json import tempfile import time import shutil import statistics VIDEO_SOURCE = "../test_video/BigBuckBunny_320x180.mp4" # 10 minutes, 62MB if not os.path.exists(VIDEO_SOURCE): print(f"Video not found: {VIDEO_SOURCE}") sys.exit(1) def run_asr_mode(mode_name, max_direct_duration, chunk_duration, description): """Run ASR processor with given parameters, return timing.""" clip_path = os.path.join(temp_dir, f"clip_{mode_name}.mp4") output_path = os.path.join(temp_dir, f"output_{mode_name}.json") # Copy source video to clip path shutil.copy2(VIDEO_SOURCE, clip_path) env = os.environ.copy() env["MOMENTRY_ASR_MAX_DIRECT_DURATION"] = str(max_direct_duration) env["MOMENTRY_ASR_CHUNK_DURATION"] = str(chunk_duration) env["MOMENTRY_ASR_MODEL_SIZE"] = "tiny" env["MOMENTRY_ASR_COMPUTE_TYPE"] = "int8" cmd = [ "/opt/homebrew/bin/python3.11", "scripts/asr_processor.py", clip_path, output_path, "--uuid", f"bench_{mode_name}", ] start_time = time.time() proc = subprocess.run(cmd, capture_output=True, env=env, text=True) elapsed = time.time() - start_time returncode = proc.returncode # Read output segments = [] language = "" if os.path.exists(output_path): with open(output_path, "r") as f: data = json.load(f) segments = data.get("segments", []) language = data.get("language", "") # Clean up try: os.unlink(clip_path) os.unlink(output_path) except: pass return { "mode": mode_name, "description": description, "elapsed": elapsed, "returncode": returncode, "segments": len(segments), "language": language, "stderr": proc.stderr[:200] if proc.stderr else "", } # Create temporary directory temp_dir = tempfile.mkdtemp(prefix="asr_bench_real_") print(f"Benchmark directory: {temp_dir}") try: # Test 1: Direct transcription (video is 10 min, max_direct=30 min) print("\n1. Direct transcription (max_direct=1800s, chunk=600s):") direct = run_asr_mode( "direct", max_direct_duration=1800, chunk_duration=600, description="Direct (video < 30min threshold)", ) print(f" Time: {direct['elapsed']:.1f}s, Segments: {direct['segments']}") # Test 2: Chunked with 1 chunk (force chunked but chunk size = video duration) print("\n2. Chunked with 1 chunk (max_direct=300s, chunk=600s):") chunked1 = run_asr_mode( "chunked1", max_direct_duration=300, chunk_duration=600, description="Chunked with 1 chunk (10 min)", ) print(f" Time: {chunked1['elapsed']:.1f}s, Segments: {chunked1['segments']}") # Test 3: Chunked with 2 chunks (5 min each) print("\n3. Chunked with 2 chunks (max_direct=300s, chunk=300s):") chunked2 = run_asr_mode( "chunked2", max_direct_duration=300, chunk_duration=300, description="Chunked with 2 chunks (5 min each)", ) print(f" Time: {chunked2['elapsed']:.1f}s, Segments: {chunked2['segments']}") # Test 4: Chunked with 5 chunks (2 min each) - worst case print("\n4. Chunked with 5 chunks (max_direct=300s, chunk=120s):") chunked5 = run_asr_mode( "chunked5", max_direct_duration=300, chunk_duration=120, description="Chunked with 5 chunks (2 min each)", ) print(f" Time: {chunked5['elapsed']:.1f}s, Segments: {chunked5['segments']}") # Calculate overheads print("\n" + "=" * 60) print("OVERHEAD ANALYSIS (compared to direct transcription)") print("=" * 60) for test in [chunked1, chunked2, chunked5]: if direct["elapsed"] > 0: overhead = (test["elapsed"] - direct["elapsed"]) / direct["elapsed"] * 100 status = "✅ ≤5%" if overhead <= 5 else "❌ >5%" print(f"\n{test['description']}:") print(f" Time: {test['elapsed']:.1f}s (direct: {direct['elapsed']:.1f}s)") print(f" Overhead: {overhead:.2f}% {status}") print(f" Segments: {test['segments']} (direct: {direct['segments']})") if test["segments"] != direct["segments"]: print(f" ⚠️ Segment count mismatch!") # Summary print("\n" + "=" * 60) print("SUMMARY") print("=" * 60) print(f"Video: {os.path.basename(VIDEO_SOURCE)} (~10 minutes)") print(f"\nKey finding: Overhead depends heavily on chunk count.") print(f"With realistic chunk sizes (10 min), overhead should be minimal.") except Exception as e: print(f"Benchmark failed: {e}") import traceback traceback.print_exc() finally: # Clean up directory shutil.rmtree(temp_dir, ignore_errors=True) print(f"\nCleaned up {temp_dir}")