#!/usr/bin/env python3
"""
ASR Baseline Test Script

Test ASR processor on ../test_video files and collect performance metrics.

Usage:
    python <this script> [VIDEO_DIR [OUTPUT_DIR]]

VIDEO_DIR defaults to ``../test_video`` and OUTPUT_DIR to ``test_output``
(both resolved against the current working directory). For every video the
ASR processor is run once (in-process when ``scripts.asr_processor`` can be
imported, otherwise as a subprocess) and wall time, exit status, average CPU
and peak RSS are recorded to JSON reports in OUTPUT_DIR.
"""

import os
import sys
import time
import json
import subprocess
import tempfile
import threading
import traceback
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import signal

# psutil only feeds the CPU/RAM figures; the benchmark itself can still run
# (with zeroed resource metrics) when it is not installed.
try:
    import psutil
except ImportError:
    psutil = None
    print("Warning: psutil is not installed; CPU/RAM metrics will not be collected.")

# Add scripts directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Try to import the ASR processor module
try:
    from scripts.asr_processor import run_asr

    DIRECT_CALL = True
except ImportError as e:
    print(f"Warning: Could not import asr_processor directly: {e}")
    print("Will use subprocess call instead.")
    DIRECT_CALL = False


VIDEO_EXTENSIONS = frozenset(
    {".mp4", ".avi", ".mov", ".mkv", ".flv", ".webm", ".m4v", ".wmv"}
)
DEFAULT_VIDEO_DIR = "../test_video"
DEFAULT_OUTPUT_DIR = "test_output"
BYTES_PER_MB = 1024 * 1024
SUBPROCESS_TIMEOUT_S = 300.0  # hard wall-clock limit for one subprocess run
SAMPLE_INTERVAL_S = 2.0  # spacing of CPU/RSS samples in subprocess mode
POLL_INTERVAL_S = 0.5  # how often the child is polled / its pipes drained


def get_video_files(video_dir: str) -> List[Path]:
    """Return the video files directly inside *video_dir*, smallest first.

    Extensions are matched case-insensitively against ``VIDEO_EXTENSIONS`` and
    only regular files are returned. Files of equal size are ordered by name so
    the result is deterministic.

    Raises:
        FileNotFoundError: if *video_dir* is not an existing directory.
    """
    video_dir_path = Path(video_dir)
    if not video_dir_path.is_dir():
        raise FileNotFoundError(f"Video directory not found: {video_dir}")

    videos = [
        entry
        for entry in video_dir_path.iterdir()
        if entry.is_file() and entry.suffix.lower() in VIDEO_EXTENSIONS
    ]
    # Small files first so quick failures surface early.
    return sorted(videos, key=lambda p: (p.stat().st_size, p.name))


def monitor_process(pid: int, interval: float = 1.0) -> Dict:
    """Take one blocking resource sample of process *pid*.

    Blocks for *interval* seconds while CPU usage is measured.

    Returns:
        A dict with ``cpu_percent``, ``rss_mb`` and ``vms_mb``, or an empty
        dict when the process is gone, inaccessible, or psutil is unavailable.
    """
    if psutil is None:
        return {}
    try:
        proc = psutil.Process(pid)
        cpu_percent = proc.cpu_percent(interval=interval)
        memory_info = proc.memory_info()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return {}
    return {
        "cpu_percent": cpu_percent,
        "rss_mb": memory_info.rss / BYTES_PER_MB,
        "vms_mb": memory_info.vms / BYTES_PER_MB,
    }


def _open_monitor(pid: Optional[int] = None):
    """Return a primed psutil handle for *pid* (current process if None), or None."""
    if psutil is None:
        return None
    try:
        handle = psutil.Process(pid)
        # The first non-blocking cpu_percent() call only establishes the
        # baseline (it returns a meaningless 0.0), so make it here and discard.
        handle.cpu_percent(interval=None)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return None
    return handle


def _record_sample(
    handle, cpu_readings: List[float], memory_readings: List[float], interval=None
) -> bool:
    """Append one CPU%/RSS-MB sample from *handle*; return False if it cannot be read."""
    try:
        cpu_readings.append(handle.cpu_percent(interval=interval))
        memory_readings.append(handle.memory_info().rss / BYTES_PER_MB)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False
    return True


def _apply_resource_metrics(
    metrics: Dict, cpu_readings: List[float], memory_readings: List[float]
) -> None:
    """Fold collected samples into ``avg_cpu_percent`` / ``peak_rss_mb`` of *metrics*."""
    if cpu_readings:
        metrics["avg_cpu_percent"] = sum(cpu_readings) / len(cpu_readings)
    if memory_readings:
        metrics["peak_rss_mb"] = max(memory_readings)


def _terminate_process_group(proc: subprocess.Popen) -> str:
    """Kill *proc* (and its process group where supported), reap it, return its stderr."""
    try:
        if hasattr(os, "killpg"):
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        else:
            proc.kill()
    except OSError:
        # Already exited, or not ours to signal - nothing more to do here.
        pass
    try:
        # Reaps the child and closes the pipes so no zombie/fd is left behind.
        _stdout, stderr = proc.communicate(timeout=5)
    except (subprocess.TimeoutExpired, OSError, ValueError):
        return ""
    return stderr or ""


def run_asr_subprocess(
    video_path: Path, output_path: Path, uuid: str = ""
) -> Tuple[bool, Dict]:
    """Run ASR processor via subprocess and collect metrics.

    Launches ``scripts/asr_processor.py`` (next to this file) with the current
    interpreter, passing *video_path*, *output_path* and, when non-empty,
    ``--uuid <uuid>``. The child is killed once it exceeds
    ``SUBPROCESS_TIMEOUT_S``.

    Returns:
        ``(success, metrics)`` where *metrics* holds ``success``, ``duration``
        (seconds), ``error``, ``peak_rss_mb``, ``avg_cpu_percent``,
        ``exit_code`` and ``stderr``. Resource figures stay 0 when the run is
        shorter than one sampling interval or psutil is unavailable.
    """
    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]
    if uuid:
        cmd.extend(["--uuid", uuid])

    start_time = time.time()
    metrics = {
        "success": False,
        "duration": 0,
        "error": None,
        "peak_rss_mb": 0,
        "avg_cpu_percent": 0,
        "exit_code": None,
        "stderr": "",
    }
    cpu_readings: List[float] = []
    memory_readings: List[float] = []
    proc: Optional[subprocess.Popen] = None

    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            start_new_session=True,  # own process group, so the whole tree can be killed
        )
        # One handle for the whole run: cpu_percent() measures since the
        # previous call on the *same* object.
        ps_proc = _open_monitor(proc.pid)

        deadline = time.monotonic() + SUBPROCESS_TIMEOUT_S
        next_sample = time.monotonic() + SAMPLE_INTERVAL_S
        while True:
            try:
                # communicate() keeps draining stdout/stderr while waiting, so
                # the child can never block on a full pipe; retrying after a
                # timeout does not lose output.
                _stdout, stderr = proc.communicate(timeout=POLL_INTERVAL_S)
                break
            except subprocess.TimeoutExpired:
                now = time.monotonic()
                if now >= deadline:
                    raise
                if ps_proc is not None and now >= next_sample:
                    if not _record_sample(ps_proc, cpu_readings, memory_readings):
                        ps_proc = None  # process no longer readable; stop sampling
                    next_sample = now + SAMPLE_INTERVAL_S

        metrics["duration"] = time.time() - start_time
        metrics["exit_code"] = proc.returncode
        metrics["stderr"] = (stderr or "").strip()

        if proc.returncode == 0:
            metrics["success"] = True
        else:
            metrics["error"] = f"Process exited with code {proc.returncode}"

    except subprocess.TimeoutExpired:
        metrics["error"] = (
            f"Process timed out after {SUBPROCESS_TIMEOUT_S / 60:g} minutes"
        )
        metrics["duration"] = time.time() - start_time

    except Exception as e:
        metrics["error"] = str(e)
        metrics["duration"] = time.time() - start_time

    finally:
        # Covers timeout, unexpected errors and Ctrl-C: the child runs in its
        # own session and would otherwise keep running after we return.
        if proc is not None and proc.poll() is None:
            metrics["stderr"] = _terminate_process_group(proc).strip()
            metrics["exit_code"] = proc.returncode

    _apply_resource_metrics(metrics, cpu_readings, memory_readings)
    return metrics["success"], metrics


def run_asr_direct(
    video_path: Path, output_path: Path, uuid: str = ""
) -> Tuple[bool, Dict]:
    """Run ASR processor by directly calling the function.

    Calls ``scripts.asr_processor.run_asr(video, output, uuid)`` in this
    process while a background thread samples this process's CPU and RSS
    roughly every 0.5 s.

    Returns:
        ``(success, metrics)`` where *metrics* holds ``success``, ``duration``
        (seconds), ``error``, ``peak_rss_mb`` and ``avg_cpu_percent``. Any
        exception raised by ``run_asr`` is reported in ``error`` (and its
        traceback printed) rather than propagated.
    """
    import scripts.asr_processor as asr_module

    start_time = time.time()
    metrics = {
        "success": False,
        "duration": 0,
        "error": None,
        "peak_rss_mb": 0,
        "avg_cpu_percent": 0,
    }
    cpu_readings: List[float] = []
    memory_readings: List[float] = []
    stop_monitor = threading.Event()
    ps_proc = _open_monitor()  # this process

    def monitor_thread():
        # cpu_percent(interval=0.5) blocks, which also paces the loop.
        while not stop_monitor.is_set():
            if not _record_sample(
                ps_proc, cpu_readings, memory_readings, interval=0.5
            ):
                break

    monitor = None
    if ps_proc is not None:
        monitor = threading.Thread(target=monitor_thread, daemon=True)
        monitor.start()

    try:
        asr_module.run_asr(str(video_path), str(output_path), uuid)
        metrics["success"] = True
    except Exception as e:
        metrics["error"] = str(e)
        traceback.print_exc()
    finally:
        # Take the time before joining so the monitor's shutdown latency is
        # not billed to the ASR run, and always stop the sampler - also when
        # run_asr raised.
        metrics["duration"] = time.time() - start_time
        stop_monitor.set()
        if monitor is not None:
            monitor.join(timeout=2.0)

    _apply_resource_metrics(metrics, cpu_readings, memory_readings)
    return metrics["success"], metrics


def _read_asr_summary(output_file: Path) -> Dict:
    """Return ``segments``/``language`` read from an ASR JSON file, or a ``json_error``."""
    try:
        with open(output_file, "r", encoding="utf-8") as f:
            result = json.load(f)
        return {
            "segments": len(result.get("segments", [])),
            "language": result.get("language", "unknown"),
        }
    except (OSError, ValueError, TypeError, AttributeError) as e:
        # Unreadable file, invalid JSON/encoding, or an unexpected JSON shape.
        return {"json_error": str(e)}


def test_video(video_path: Path, test_dir: Path, index: int, total: int) -> Dict:
    """Test ASR on a single video file.

    Runs the processor (direct call when ``DIRECT_CALL`` is true, subprocess
    otherwise), writing to ``<test_dir>/<video stem>.asr.json``, prints a
    one-video summary and returns the run metrics merged with ``video``,
    ``video_size_mb`` and ``output_exists``. When the output file exists its
    segment count and language are added (or ``json_error`` if it cannot be
    parsed).
    """
    size_mb = video_path.stat().st_size / BYTES_PER_MB
    print(f"\n{'=' * 60}")
    print(f"Testing [{index}/{total}]: {video_path.name}")
    print(f"Size: {size_mb:.1f} MB")
    print(f"Path: {video_path}")

    # Create output file path
    output_file = test_dir / f"{video_path.stem}.asr.json"

    # Run ASR
    runner = run_asr_direct if DIRECT_CALL else run_asr_subprocess
    success, metrics = runner(video_path, output_file, uuid="")

    # Check if output was created
    output_exists = output_file.exists()
    if output_exists:
        metrics.update(_read_asr_summary(output_file))

    # Compile test result
    test_result = {
        "video": video_path.name,
        "video_size_mb": size_mb,
        "success": success,
        "output_exists": output_exists,
        **metrics,
    }

    # Print summary
    if success:
        print(
            f"✓ SUCCESS: {metrics['duration']:.1f}s, {metrics.get('segments', 0)} segments"
        )
        if "peak_rss_mb" in metrics:
            print(
                f"  Peak RAM: {metrics['peak_rss_mb']:.1f} MB, Avg CPU: {metrics['avg_cpu_percent']:.1f}%"
            )
    else:
        print(f"✗ FAILED: {metrics.get('error', 'Unknown error')}")

    return test_result


# The name matches pytest's test_* pattern, but this is a plain helper that
# takes real arguments; keep pytest from collecting it.
test_video.__test__ = False


def _write_json(path: Path, data) -> None:
    """Write *data* to *path* as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)


def main(video_dir: str = DEFAULT_VIDEO_DIR, output_dir: str = DEFAULT_OUTPUT_DIR):
    """Main test function.

    Runs :func:`test_video` on every video in *video_dir*, saving
    ``results.json`` after each video and ``detailed_report.json`` at the end
    (both inside *output_dir*), then prints a summary. Exits with status 1
    when *video_dir* does not exist. Ctrl-C stops the loop but still produces
    the summary and report for the videos processed so far.
    """
    if not os.path.exists(video_dir):
        print(f"Error: Video directory '{video_dir}' not found.")
        sys.exit(1)

    # Create test directory for outputs
    test_dir = Path(output_dir)
    test_dir.mkdir(parents=True, exist_ok=True)

    # Get video files
    videos = get_video_files(video_dir)
    if not videos:
        print(f"No video files found in {video_dir}")
        return

    print(f"Found {len(videos)} video files in {video_dir}")
    print("Starting ASR baseline tests...")

    results = []
    for i, video in enumerate(videos, 1):
        try:
            result = test_video(video, test_dir, i, len(videos))
        except KeyboardInterrupt:
            print("\n\nTest interrupted by user.")
            break
        except Exception as e:
            print(f"\nUnexpected error testing {video.name}: {e}")
            traceback.print_exc()
            # Keep the video in the report instead of silently dropping it.
            result = {
                "video": video.name,
                "success": False,
                "output_exists": False,
                "duration": 0,
                "error": f"Unexpected error: {e}",
            }
        results.append(result)

        # Save intermediate results; a failed save must not abort the run.
        try:
            _write_json(test_dir / "results.json", results)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: could not save intermediate results: {e}")

    # Generate summary report
    print(f"\n{'=' * 60}")
    print("TEST SUMMARY")
    print(f"{'=' * 60}")

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]

    print(f"Total videos: {len(results)}")
    print(f"Successful: {len(successful)}")
    print(f"Failed: {len(failed)}")

    if successful:
        avg_duration = sum(r["duration"] for r in successful) / len(successful)
        avg_segments = sum(r.get("segments", 0) for r in successful) / len(successful)
        print(f"\nAverage duration: {avg_duration:.1f}s")
        print(f"Average segments: {avg_segments:.1f}")

    if failed:
        print("\nFailed videos:")
        for r in failed:
            print(f"  - {r['video']}: {r.get('error', 'Unknown error')}")

    # Save detailed report
    report_path = test_dir / "detailed_report.json"
    _write_json(report_path, results)

    print(f"\nDetailed results saved to: {report_path}")
    print(f"Test outputs saved to: {test_dir}/")


if __name__ == "__main__":
    # Optional positional overrides: VIDEO_DIR, then OUTPUT_DIR.
    main(*sys.argv[1:3])