#!/usr/bin/env python3
"""
Simple ASR test script - test a few video files with detailed logging.
"""

import json
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
from pathlib import Path

# File suffixes (lower-case) that main() treats as video inputs.
VIDEO_SUFFIXES = (".mp4", ".mov", ".avi", ".mkv")


def _echo_stream(stream, prefix):
    """Print every line of *stream* as it arrives, tagged with *prefix*."""
    with stream:
        for line in stream:
            print(f"{prefix}{line.rstrip()}")
            sys.stdout.flush()


def _collect_stream(stream, lines):
    """Drain *stream* into the list *lines* so the child never blocks on a full pipe."""
    with stream:
        lines.extend(stream)


def _kill_process_group(proc):
    """Best-effort SIGKILL of the whole process group that *proc* leads."""
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
    except ProcessLookupError:
        # The process already exited between the timeout and the kill.
        pass
    except OSError as exc:
        print(f" Warning: could not kill process group: {exc}")
        proc.kill()


def run_asr_on_video(video_path, output_path, timeout_sec=600, script_path=None):
    """Run the ASR processor on one video with a timeout and live stderr logging.

    The processor is started as ``<python> <script> <video_path> <output_path>``
    in its own session, so a timeout can kill the whole process group.

    Args:
        video_path: Path of the video passed to the processor.
        output_path: Path where the processor is expected to write its JSON.
        timeout_sec: Seconds to wait before the process group is killed.
        script_path: Processor script to run. Defaults to
            ``scripts/asr_processor.py`` next to this file.

    Returns:
        A ``(success, duration_seconds, segment_count)`` tuple. ``success`` is
        True only when the process exits with code 0 and the output file
        exists and parses as JSON; otherwise ``segment_count`` is 0.
    """
    if script_path is None:
        script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]

    print(f" Command: {' '.join(cmd)}")
    print(f" Timeout: {timeout_sec}s")

    start_time = time.time()

    try:
        # start_new_session gives the child its own process group (the same
        # effect as preexec_fn=os.setsid) without running Python code between
        # fork and exec, which is unsafe once threads exist.
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            start_new_session=True,
        )

        # Both pipes must be drained while we wait: an unread pipe fills up,
        # the child blocks on write, and wait() would run into the timeout.
        stdout_lines = []
        readers = [
            threading.Thread(
                target=_echo_stream, args=(proc.stderr, " [stderr] "), daemon=True
            ),
            threading.Thread(
                target=_collect_stream, args=(proc.stdout, stdout_lines), daemon=True
            ),
        ]
        for reader in readers:
            reader.start()

        try:
            returncode = proc.wait(timeout=timeout_sec)
        except subprocess.TimeoutExpired:
            duration = time.time() - start_time
            print(f" ERROR: Process timed out after {duration:.1f}s")
            _kill_process_group(proc)
            proc.wait(timeout=5)
            for reader in readers:
                reader.join(timeout=5)
            return False, duration, 0

        duration = time.time() - start_time

        # Let the readers reach EOF so no trailing output is lost; the join is
        # bounded in case a stray grandchild still holds the pipes open.
        for reader in readers:
            reader.join(timeout=5)

        stdout = "".join(stdout_lines)
        if stdout:
            print(f" [stdout] {stdout.strip()}")

        print(f" Process exited with code {returncode} after {duration:.1f}s")

        if returncode != 0:
            print(f" Error: Process failed with exit code {returncode}")
            return False, duration, 0

        # Verify output file
        if not os.path.exists(output_path):
            print(f" Error: Output file not created: {output_path}")
            return False, duration, 0

        with open(output_path, "r", encoding="utf-8") as f:
            result = json.load(f)
        segments = len(result.get("segments", []))
        language = result.get("language", "unknown")
        print(f" Success: {segments} segments, language: {language}")
        return True, duration, segments

    except Exception as e:
        # Top-level boundary for one test run: report and mark it failed so
        # the remaining videos are still processed.
        print(f" Exception: {e}")
        traceback.print_exc()
        return False, time.time() - start_time, 0


def main():
    """Run the ASR processor on the three smallest videos and save a summary."""
    video_dir = Path("../test_video")
    if not video_dir.is_dir():
        sys.exit(f"Error: video directory not found: {video_dir}")

    test_dir = Path("test_output_simple")
    test_dir.mkdir(exist_ok=True)

    # Select a few test files (small to medium): sort by size, take first 3.
    video_files = [
        f
        for f in video_dir.iterdir()
        if f.is_file() and f.suffix.lower() in VIDEO_SUFFIXES
    ]
    video_files.sort(key=lambda p: p.stat().st_size)
    selected = video_files[:3]

    print(f"Testing {len(selected)} video files (sorted by size):")
    for vf in selected:
        print(f" - {vf.name}: {vf.stat().st_size / 1024 / 1024:.1f} MB")

    results = []

    for i, video in enumerate(selected, 1):
        print(f"\n{'=' * 60}")
        print(f"Test {i}/{len(selected)}: {video.name}")
        print(f"{'=' * 60}")

        output_file = test_dir / f"{video.stem}.asr.json"

        success, duration, segments = run_asr_on_video(
            video, output_file, timeout_sec=300
        )

        results.append(
            {
                "video": video.name,
                "size_mb": video.stat().st_size / 1024 / 1024,
                "success": success,
                "duration": duration,
                "segments": segments,
                "output_file": str(output_file),
            }
        )

        # Small delay between tests
        if i < len(selected):
            print(" Waiting 5 seconds before next test...")
            time.sleep(5)

    # Summary
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")

    for r in results:
        status = "✓" if r["success"] else "✗"
        print(f"{status} {r['video']}: {r['duration']:.1f}s, {r['segments']} segments")

    success_count = sum(1 for r in results if r["success"])
    print(f"\nSuccess rate: {success_count}/{len(results)}")

    # Save results
    with open(test_dir / "test_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\nDetailed results saved to: {test_dir}/test_results.json")


if __name__ == "__main__":
    main()