#!/usr/bin/env python3
"""Test ASR processor on all video files in test_video directory.

For every video larger than 1 GB found in ``TEST_VIDEO_DIR`` the script runs
``scripts/asr_processor.py`` in a subprocess with a timeout derived from the
audio duration reported by ffprobe, parses the JSON the processor writes, and
prints a summary.  The exit status is 1 if any video that has audio failed
(including timeouts), otherwise 0.

Paths are relative to the current working directory, so the script must be
started from the directory that makes ``../test_video`` and
``scripts/asr_processor.py`` resolve.
"""

import sys
import os
import subprocess
import json
import tempfile
import time
import shutil
import signal

TEST_VIDEO_DIR = "../test_video"

# Interpreter and script used to launch the ASR processor.
ASR_PYTHON = "/opt/homebrew/bin/python3.11"
ASR_SCRIPT = "scripts/asr_processor.py"

# Seconds allowed for each individual ffprobe invocation.
FFPROBE_TIMEOUT = 5

# Configuration - timeout based on audio duration
SECONDS_PER_MINUTE_AUDIO = 30  # 30 seconds processing time per minute of audio
MAX_TIMEOUT = 3600  # 60 minutes max
MIN_TIMEOUT = 120  # 2 minutes min


def _run_ffprobe(args, video_path):
    """Run ffprobe with *args* on *video_path* and return its stripped stdout.

    Returns an empty string when ffprobe cannot be started, times out, exits
    with a non-zero status, or produces undecodable output.
    """
    cmd = ["ffprobe", "-v", "error", *args, video_path]
    try:
        probe = subprocess.run(
            cmd, capture_output=True, text=True, timeout=FFPROBE_TIMEOUT
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # SubprocessError covers the timeout, OSError a missing ffprobe binary,
        # ValueError (UnicodeDecodeError) output that cannot be decoded.
        return ""
    if probe.returncode != 0:
        return ""
    return probe.stdout.strip()


def has_audio_stream(video_path):
    """Return True if ffprobe reports at least one audio stream.

    Returns False both when there is no audio stream and when the probe
    itself fails, so callers should treat False as "not confirmed".
    """
    listing = _run_ffprobe(
        [
            "-select_streams",
            "a",
            "-show_entries",
            "stream=codec_type",
            "-of",
            "csv=p=0",
        ],
        video_path,
    )
    return bool(listing)


def get_audio_duration(video_path):
    """Get audio duration in seconds using ffprobe, return 0 if no audio or error."""
    if not has_audio_stream(video_path):
        return 0.0

    raw_duration = _run_ffprobe(
        [
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
        ],
        video_path,
    )
    try:
        duration = float(raw_duration)
    except ValueError:
        # Empty or non-numeric output: the caller falls back to the minimum
        # timeout.
        return 0.0
    return duration if duration > 0 else 0.0


def _read_asr_output(output_path):
    """Return ``(segment_count, language)`` parsed from the processor's JSON.

    Returns ``(0, "")`` when the file is missing or cannot be parsed.  A null
    or non-string ``language`` is normalised to a string so the summary's
    ``:5s`` format spec cannot fail.
    """
    if not os.path.exists(output_path):
        return 0, ""
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        segment_count = len(data.get("segments") or [])
        language = data.get("language") or ""
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # Best effort: an unreadable/malformed output counts as "no output",
        # but say so instead of swallowing the problem silently.
        print(f" Warning: could not parse ASR output: {e}")
        return 0, ""
    return segment_count, str(language)


if not os.path.isdir(TEST_VIDEO_DIR):
    print(f"Test video directory not found: {TEST_VIDEO_DIR}")
    sys.exit(1)

# List all video files (common extensions), sorted for a reproducible order
video_exts = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v"}
video_files = [
    os.path.join(TEST_VIDEO_DIR, f)
    for f in sorted(os.listdir(TEST_VIDEO_DIR))
    if os.path.splitext(f)[1].lower() in video_exts
]

if not video_files:
    print("No video files found")
    sys.exit(1)

print(f"Found {len(video_files)} video files:")
for vf in video_files:
    size = os.path.getsize(vf) / (1024**3)
    print(f" {os.path.basename(vf)} ({size:.2f} GB)")

results = []

for video_path in video_files:
    print("\n" + "=" * 60)
    print(f"Processing: {os.path.basename(video_path)}")
    size_gb = os.path.getsize(video_path) / (1024**3)

    # Skip files <= 1 GB (already tested in quick test)
    if size_gb <= 1.0:
        print(f" Skipping (size {size_gb:.2f} GB <= 1 GB)")
        continue

    # Probe the source itself for audio.  The ASR output alone cannot tell
    # "no audio" apart from "processor crashed or timed out before writing".
    probed_audio = has_audio_stream(video_path)

    # Get audio duration for timeout calculation
    audio_duration = get_audio_duration(video_path)
    audio_minutes = audio_duration / 60 if audio_duration > 0 else 0

    # Calculate timeout based on audio duration
    estimated_processing_time = audio_minutes * SECONDS_PER_MINUTE_AUDIO
    timeout = min(MAX_TIMEOUT, max(MIN_TIMEOUT, estimated_processing_time))

    print(
        f"Size: {size_gb:.2f} GB, Audio: {audio_duration:.0f}s ({audio_minutes:.1f} min)"
    )
    print(
        f"Estimated processing: {estimated_processing_time:.0f}s, Timeout: {timeout}s"
    )

    # Create temporary output
    temp_dir = tempfile.mkdtemp(prefix="asr_test_")
    output_path = os.path.join(temp_dir, "output.json")

    cmd = [
        ASR_PYTHON,
        ASR_SCRIPT,
        video_path,
        output_path,
        "--uuid",
        f"test_{os.path.basename(video_path)}",
    ]

    timeout_hit = False
    start = time.monotonic()
    try:
        try:
            # subprocess.run kills and reaps the child itself when the timeout
            # expires, and closes the pipes on every path.
            completed = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            elapsed = timeout
            success = False
            error_msg = f"Timeout after {timeout}s"
            timeout_hit = True
        except (OSError, subprocess.SubprocessError) as e:
            # e.g. the interpreter or script could not be launched.
            elapsed = time.monotonic() - start
            success = False
            error_msg = str(e)
        else:
            elapsed = time.monotonic() - start
            success = completed.returncode == 0
            if success:
                error_msg = ""
            else:
                # Never report a failure with an empty error message.
                error_msg = (
                    completed.stderr.strip()
                    or f"Exit code {completed.returncode}"
                )

        # Parse output if exists
        segments, language = _read_asr_output(output_path)
    finally:
        # Clean up even if something above raised unexpectedly
        shutil.rmtree(temp_dir, ignore_errors=True)

    # A video has audio if ffprobe found an audio stream, or (fallback when
    # the probe failed) if the processor produced segments or a language.
    has_audio = probed_audio or segments > 0 or language != ""

    result = {
        "file": os.path.basename(video_path),
        "size_gb": size_gb,
        "success": success,
        "timeout": timeout_hit,
        "elapsed": elapsed,
        "segments": segments,
        "language": language,
        "has_audio": has_audio,
        "error": error_msg[:200] if error_msg else "",
    }
    results.append(result)

    status = "✅ SUCCESS" if success else "❌ FAILED"
    if timeout_hit:
        status += " (TIMEOUT)"
    print(
        f" Result: {status}, {elapsed:.1f}s, {segments} segments, language: {language}"
    )
    if error_msg:
        print(f" Error: {error_msg}")

# Summary
print("\n" + "=" * 60)
print("TEST SUMMARY")
print("=" * 60)

success_count = sum(1 for r in results if r["success"])
timeout_count = sum(1 for r in results if r["timeout"])
no_audio_count = sum(1 for r in results if not r["has_audio"] and r["success"])

print(f"Total videos: {len(results)}")
print(f"Successful: {success_count}")
print(f"Failed: {len(results) - success_count}")
print(f"Timeouts: {timeout_count}")
print(f"No audio (skipped): {no_audio_count}")
print()

for r in results:
    status = "✅" if r["success"] else "❌"
    if r["timeout"]:
        status = "⏱️"
    print(
        f"{status} {r['file']:50s} {r['elapsed']:6.1f}s segs:{r['segments']:4d} lang:{r['language']:5s} {r['error']}"
    )

# Check for any failures not due to missing audio
failed = [r for r in results if not r["success"] and r["has_audio"]]
if failed:
    print("\n❌ FAILURES DETECTED (videos with audio):")
    for r in failed:
        print(f" {r['file']}: {r['error']}")
    sys.exit(1)
else:
    print("\n✅ All videos with audio processed successfully.")
    sys.exit(0)