#!/usr/bin/env python3
"""
Test ASR v2 processor on large video.

Runs ``scripts/asr_processor_v2.py`` as a subprocess against a large test
video, enforces a wall-clock timeout, and prints a short summary of the
subprocess result and of the transcription JSON it wrote.  Exits with
status 0 when the subprocess succeeded, 1 otherwise.
"""

import json
import os
import signal
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path

# Candidate input videos, tried in order; the first one that exists is used.
VIDEO_CANDIDATES = (
    "../test_video/big_buck_bunny_480p_h264.mov",  # 238MB
    "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov",  # 2.2GB
)


def timeout_handler(signum, frame):
    """Signal-handler-shaped callback that raises ``TimeoutError``.

    The ``(signum, frame)`` parameters match what ``signal.signal`` passes to
    a handler; both are ignored.  Nothing in this file registers it.

    Raises:
        TimeoutError: always.
    """
    raise TimeoutError("Test timed out")


def run_with_timeout(cmd, timeout_sec):
    """Run command with timeout.

    Args:
        cmd: Command and arguments as a list of strings.
        timeout_sec: Seconds to wait before the process is killed.

    Returns:
        A ``(returncode, stdout, stderr, elapsed)`` tuple.  ``stdout`` and
        ``stderr`` are the captured text streams and ``elapsed`` is the run
        time in seconds.  When the timeout expires the process is killed, so
        ``returncode`` is non-zero and the streams hold whatever was written
        before the kill.
    """
    print(f"Running with timeout {timeout_sec}s: {' '.join(cmd)}")

    # Monotonic clock: elapsed time must not be affected by system clock
    # adjustments made while the (potentially long) subprocess runs.
    start_time = time.monotonic()
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        stdout, stderr = proc.communicate(timeout=timeout_sec)
    except subprocess.TimeoutExpired:
        # Kill, then communicate() again to reap the process and collect
        # the output produced before the kill.
        proc.kill()
        stdout, stderr = proc.communicate()
    elapsed = time.monotonic() - start_time

    return proc.returncode, stdout, stderr, elapsed


def _find_video():
    """Return the first existing path from VIDEO_CANDIDATES, or None."""
    primary, fallback = VIDEO_CANDIDATES
    if Path(primary).exists():
        return primary
    print(f"Video not found: {primary}")
    if Path(fallback).exists():
        return fallback
    print("Large video not found")
    return None


def _print_stderr_highlights(stderr, limit=10):
    """Print up to ``limit`` stderr lines mentioning errors, warnings or "ASR:"."""
    highlights = [
        line
        for line in stderr.split("\n")
        if "error" in line.lower() or "warning" in line.lower() or "ASR:" in line
    ]
    if highlights:
        print(" Stderr highlights:")
        for line in highlights[:limit]:
            print(f" {line}")


def _print_transcription_summary(output_path):
    """Print a summary of the transcription JSON stored at ``output_path``.

    Returns:
        True when the file was read and parsed as a JSON object, False when
        it could not be read or did not contain valid JSON (for example a
        file left half-written by a killed subprocess).
    """
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"\nCould not read transcription output {output_path}: {e}")
        return False
    if not isinstance(data, dict):
        print(f"\nUnexpected transcription output in {output_path}: not a JSON object")
        return False

    segments = data.get("segments") or []
    print("\nTranscription results:")
    print(f" Language: {data.get('language')}")
    print(f" Segments: {len(segments)}")
    print(f" Chunks: {data.get('chunk_count', 1)}")
    if segments:
        print(f" First segment: {segments[0].get('text', '')[:50]}...")
        print(f" Last segment: {segments[-1].get('text', '')[:50]}...")
    return True


def test_large_video():
    """Run the ASR v2 processor on a large video and report the outcome.

    Paths are relative to the current working directory.

    Returns:
        True when the processor exited with status 0 and any output file it
        left behind could be parsed; False when the video or script is
        missing, the run failed or timed out, or the output was unreadable.
    """
    video_path = _find_video()
    if video_path is None:
        return False

    output_dir = Path("test_output_v2")
    output_dir.mkdir(exist_ok=True, parents=True)
    output_path = output_dir / "asr_v2_large_output.json"

    script_path = Path("scripts/asr_processor_v2.py")
    if not script_path.exists():
        print(f"Script not found: {script_path}")
        return False

    cmd = [
        sys.executable,
        str(script_path),
        video_path,
        str(output_path),
        "--chunk-duration",
        "300",  # 5 minutes
        "--model-size",
        "tiny",
        "--compute-type",
        "int8",
    ]

    print(f"Testing large video: {video_path}")
    print(f"Size: {Path(video_path).stat().st_size / (1024 * 1024 * 1024):.2f} GB")

    # Run with 5-minute timeout (should be enough for chunked processing)
    timeout = 300  # 5 minutes
    try:
        returncode, stdout, stderr, elapsed = run_with_timeout(cmd, timeout)
    except Exception as e:
        # Top-level boundary of the script: report any launch failure
        # (e.g. OSError from Popen) as a failed test instead of a traceback.
        print(f"Error running test: {e}")
        return False

    print("\nResults:")
    print(f" Exit code: {returncode}")
    print(f" Elapsed time: {elapsed:.1f}s")
    print(f" Timeout: {timeout}s")

    if stdout:
        print(f" Stdout length: {len(stdout)} chars")
    if stderr:
        # Show warnings/errors
        _print_stderr_highlights(stderr)

    output_ok = True
    if output_path.exists():
        output_ok = _print_transcription_summary(output_path)

    return returncode == 0 and output_ok


# This is a manually-run script entry point: it returns a bool for sys.exit()
# and launches a multi-minute subprocess, so keep pytest from collecting it
# just because its name starts with "test_".
test_large_video.__test__ = False


if __name__ == "__main__":
    print("Testing ASR v2 on large video...")
    success = test_large_video()

    if success:
        print("\n✅ Test passed!")
    else:
        print("\n❌ Test failed or timed out")

    sys.exit(0 if success else 1)