#!/usr/bin/env python3
"""
Test ASR on large video files that may cause issues.
"""

import json
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
from pathlib import Path

import psutil

# Dividing by a power of two is exact in floating point, so these constants
# give the same values as chained "/ 1024" divisions.
_BYTES_PER_MB = 1024 ** 2
_BYTES_PER_GB = 1024 ** 3

# File extensions (lower-case) that main() treats as video files.
_VIDEO_SUFFIXES = frozenset({".mov", ".m4v", ".mp4", ".avi", ".mkv"})


def check_audio_stream(video_path):
    """Check if video has audio stream using ffprobe.

    Args:
        video_path: Path (or string) of the video file to probe.

    Returns:
        True if ffprobe lists at least one audio stream, False if it lists
        none. If ffprobe cannot be run or exits with an error, True is
        returned so that the ASR run is still attempted.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "a",
        "-show_entries",
        "stream=codec_type",
        "-of",
        "csv=p=0",
        str(video_path),
    ]
    try:
        probe = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a missing or failing ffprobe must not block the test.
        # Unlike a bare ``except``, this lets KeyboardInterrupt through.
        return True  # Assume audio exists if ffprobe fails
    return bool(probe.stdout.strip())


def monitor_resources(pid, interval=5, stop_event=None):
    """Monitor process resources and return statistics.

    Samples CPU percentage and resident memory of process ``pid`` until
    ``stop_event`` is set or the process can no longer be inspected.

    Args:
        pid: Process id to sample.
        interval: Seconds to pause between samples.
        stop_event: Optional ``threading.Event``; sampling stops once it is
            set. Without it, sampling continues until the process is gone
            or access is denied.

    Returns:
        A dict with ``cpu_avg``, ``cpu_max``, ``mem_avg_mb``, ``mem_max_mb``
        and ``samples``, or an empty dict if no sample was taken.
    """
    cpu_readings = []
    mem_readings = []

    try:
        target = psutil.Process(pid)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return {}

    while stop_event is None or not stop_event.is_set():
        try:
            # Read both values before storing either, so the two lists can
            # never end up with different lengths.
            cpu = target.cpu_percent(interval=0.1)
            rss_mb = target.memory_info().rss / _BYTES_PER_MB
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            break
        cpu_readings.append(cpu)
        mem_readings.append(rss_mb)

        if stop_event is not None:
            # Event.wait returns early when the event is set.
            stop_event.wait(interval)
        else:
            time.sleep(interval)

    if not cpu_readings:
        return {}
    return {
        "cpu_avg": sum(cpu_readings) / len(cpu_readings),
        "cpu_max": max(cpu_readings),
        "mem_avg_mb": sum(mem_readings) / len(mem_readings),
        "mem_max_mb": max(mem_readings),
        "samples": len(cpu_readings),
    }


def _collect_resources(pid, stop_event, sink):
    """Run monitor_resources and store its summary in the dict ``sink``."""
    sink.update(monitor_resources(pid, interval=10, stop_event=stop_event))


def _echo_stderr(stream):
    """Print each non-empty line read from ``stream`` with an [ASR] prefix."""
    try:
        for line in stream:
            line = line.rstrip()
            if line:
                print(f" [ASR] {line}")
    except (OSError, ValueError):
        # The pipe was closed underneath us during shutdown; nothing left
        # to echo.
        return


def _join_workers(stop_monitor, monitor_thread, stderr_thread):
    """Signal the monitor to stop and wait briefly for both helper threads."""
    stop_monitor.set()
    if monitor_thread is not None:
        monitor_thread.join(timeout=2)
    if stderr_thread is not None:
        stderr_thread.join(timeout=5)


def _kill_process_group(proc):
    """SIGKILL the process group of ``proc`` and wait up to 5s for it."""
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        print(" Sent SIGKILL to process group")
    except OSError as exc:
        print(f" Could not kill process group: {exc}")
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        print(" Process did not exit within 5s after SIGKILL")


def _record_outcome(result, returncode, output_path):
    """Fill ``result`` from the exit code and the ASR output file."""
    if returncode != 0:
        result["error"] = f"Process failed with exit code {returncode}"
        print(f" Error: Process failed with exit code {returncode}")
        return

    if not output_path.exists():
        result["error"] = "Output file not created"
        print(" Error: Output file not created")
        return

    with open(output_path, "r", encoding="utf-8") as f:
        asr_result = json.load(f)
    segments = len(asr_result.get("segments", []))
    language = asr_result.get("language", "unknown")
    result["segments"] = segments
    result["language"] = language
    result["success"] = True
    print(f" Success: {segments} segments, language: {language}")


def test_large_video(video_path, output_path, timeout_sec=1800):
    """Test ASR on a large video file with detailed monitoring.

    Runs ``scripts/asr_processor.py`` (located next to this file) on
    ``video_path`` in its own session, echoes its stderr, samples its
    resource usage and enforces a timeout.

    Args:
        video_path: ``pathlib.Path`` of the video to process.
        output_path: ``pathlib.Path`` passed to the ASR script as its output
            argument and read back as JSON after a successful run.
        timeout_sec: Seconds to wait before the process group is killed.

    Returns:
        A dict with ``video``, ``size_gb``, ``has_audio``, ``success``,
        ``duration``, ``segments``, ``error`` and ``resources``; ``language``
        is added on success.
    """
    size_gb = video_path.stat().st_size / _BYTES_PER_GB
    banner = "=" * 70
    print(f"\n{banner}")
    print(f"Testing large video: {video_path.name}")
    print(f"Size: {size_gb:.2f} GB")
    print(banner)

    # Check audio first
    print(" Checking audio stream...")
    has_audio = check_audio_stream(video_path)
    print(f" Has audio: {has_audio}")

    if not has_audio:
        print(" No audio stream - ASR will skip transcription")
        return {
            "video": video_path.name,
            "size_gb": size_gb,
            "has_audio": False,
            "success": True,
            "duration": 0,
            "segments": 0,
            "error": "No audio stream",
            "resources": {},
        }

    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]

    print(f" Command: {' '.join(cmd[:3])} ...")
    print(f" Timeout: {timeout_sec}s ({timeout_sec / 60:.1f} minutes)")

    result = {
        "video": video_path.name,
        "size_gb": size_gb,
        "has_audio": True,
        "success": False,
        "duration": 0,
        "segments": 0,
        "error": None,
        "resources": {},
    }

    start_time = time.time()
    stop_monitor = threading.Event()
    resources = {}
    proc = None
    monitor_thread = None
    stderr_thread = None

    try:
        # stdout is never consumed by this harness, so discard it: an
        # undrained PIPE could fill up and block the child while we sit in
        # wait(). start_new_session puts the child in its own process group
        # so the whole group can be killed on timeout. errors="replace"
        # keeps the stderr reader alive on undecodable bytes.
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
            start_new_session=True,
            bufsize=1,
        )
        print(f" Process started with PID: {proc.pid}")

        # Start resource monitoring in background
        monitor_thread = threading.Thread(
            target=_collect_resources,
            args=(proc.pid, stop_monitor, resources),
            daemon=True,
        )
        monitor_thread.start()

        # Read stderr in real-time
        stderr_thread = threading.Thread(
            target=_echo_stderr, args=(proc.stderr,), daemon=True
        )
        stderr_thread.start()

        # Wait for completion
        try:
            returncode = proc.wait(timeout=timeout_sec)
        except subprocess.TimeoutExpired:
            duration = time.time() - start_time
            result["duration"] = duration
            result["error"] = f"Timeout after {duration:.1f}s"
            print(f" ERROR: Timeout after {duration:.1f}s ({duration / 60:.1f} min)")
            # The still-running process group is killed in ``finally``.
        else:
            duration = time.time() - start_time
            result["duration"] = duration

            # Let the helper threads finish so the remaining [ASR] lines
            # are printed before the exit summary.
            _join_workers(stop_monitor, monitor_thread, stderr_thread)

            print(
                f" Process exited with code {returncode} after {duration:.1f}s ({duration / 60:.1f} min)"
            )
            _record_outcome(result, returncode, output_path)

    except Exception as e:
        result["error"] = str(e)
        print(f" Exception: {e}")
        traceback.print_exc()
        result["duration"] = time.time() - start_time

    finally:
        # Covers the timeout path and any unexpected exit (including
        # KeyboardInterrupt): never leave the detached child running.
        if proc is not None and proc.poll() is None:
            _kill_process_group(proc)
        _join_workers(stop_monitor, monitor_thread, stderr_thread)
        if proc is not None and proc.stderr is not None:
            proc.stderr.close()
        result["resources"] = dict(resources)

    return result


def main():
    """Run the ASR test on the two largest videos (>1GB) in ../test_video.

    Results are written to ``test_large_output/large_video_results.json``
    after every video and to ``test_large_output/final_report.json`` at the
    end; a summary is printed to stdout.
    """
    video_dir = Path("../test_video")
    test_dir = Path("test_large_output")
    test_dir.mkdir(exist_ok=True)

    if not video_dir.is_dir():
        print(f"Video directory not found: {video_dir}")
        return

    # Identify large video files (> 1GB)
    large_videos = []
    for f in video_dir.iterdir():
        if f.suffix.lower() not in _VIDEO_SUFFIXES or not f.is_file():
            continue
        size_gb = f.stat().st_size / _BYTES_PER_GB
        if size_gb > 1.0:  # Larger than 1GB
            large_videos.append((f, size_gb))

    if not large_videos:
        print("No large video files (>1GB) found.")
        return

    # Largest first; sorted once and reused for the listing and selection.
    large_videos.sort(key=lambda item: item[1], reverse=True)

    print(f"Found {len(large_videos)} large video files (>1GB):")
    for f, size in large_videos:
        print(f" - {f.name}: {size:.2f} GB")

    # Test the largest 2 files
    selected = [f for f, _ in large_videos[:2]]

    print(f"\nWill test {len(selected)} largest files:")
    for f in selected:
        print(f" - {f.name}")

    results = []
    intermediate_path = test_dir / "large_video_results.json"

    for index, video in enumerate(selected):
        output_file = test_dir / f"{video.stem}.asr.json"
        result = test_large_video(
            video, output_file, timeout_sec=2400
        )  # 40 minutes timeout
        results.append(result)

        # Save intermediate results
        with open(intermediate_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)

        # Wait between tests if there are more
        if index < len(selected) - 1:
            print("\n Waiting 30 seconds before next test...")
            time.sleep(30)

    # Summary
    print(f"\n{'=' * 70}")
    print("LARGE VIDEO TEST SUMMARY")
    print(f"{'=' * 70}")

    for r in results:
        status = "✓" if r["success"] else "✗"
        error_msg = f" - {r['error']}" if r["error"] else ""
        print(
            f"{status} {r['video']}: {r['duration']:.1f}s, {r.get('segments', 0)} segments{error_msg}"
        )

    success_count = sum(1 for r in results if r["success"])
    print(f"\nSuccess rate: {success_count}/{len(results)}")

    # Save final report
    report_path = test_dir / "final_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nDetailed results saved to: {report_path}")


if __name__ == "__main__":
    main()