#!/opt/homebrew/bin/python3.11
"""Test complete ASR processing of large file."""

import json
import os
import subprocess
import sys
import tempfile
import time
import traceback


def _as_text(output):
    """Return captured process output as ``str``.

    ``subprocess.TimeoutExpired`` may carry ``None``, ``bytes`` or ``str``
    in its ``stderr`` attribute (bytes can appear even when ``text=True``
    was requested), so all three shapes are normalised here.
    """
    if output is None:
        return ""
    if isinstance(output, bytes):
        return output.decode("utf-8", errors="replace")
    return output


def _print_tail(lines, count):
    """Print the non-blank entries among the last *count* items of *lines*."""
    for line in lines[-count:]:
        if line.strip():
            print(f" {line}")


def _find_error_lines(stderr_lines):
    """Return the lines that mention an error and are not debug output.

    Both substring tests are case-insensitive so that a line such as
    ``DEBUG: no error`` is not reported as an error.
    """
    found = []
    for line in stderr_lines:
        lowered = line.lower()
        if "error" in lowered and "debug" not in lowered:
            found.append(line)
    return found


def _count_gaps_and_overlaps(segments, tolerance=0.1):
    """Count adjacent segment pairs that are not contiguous.

    Args:
        segments: Sequence of dicts read with ``.get("start", 0)`` and
            ``.get("end", 0)``.
        tolerance: Slack, in the same unit as the segment times, below
            which a mismatch between one segment's end and the next
            segment's start is ignored.

    Returns:
        A ``(gaps, overlaps)`` tuple of counts.
    """
    gaps = 0
    overlaps = 0
    for previous, current in zip(segments, segments[1:]):
        previous_end = previous.get("end", 0)
        current_start = current.get("start", 0)
        if current_start > previous_end + tolerance:
            gaps += 1
        elif current_start < previous_end - tolerance:
            overlaps += 1
    return gaps, overlaps


def _report_output(output_path):
    """Load the JSON file at *output_path* and print a summary of it."""
    if not os.path.exists(output_path):
        print(f" Output file not found at {output_path}")
        return
    # The file is pre-created empty by the caller, so an empty file means
    # the processor never wrote its result; report that instead of letting
    # json.load() fail on it.
    if os.path.getsize(output_path) == 0:
        print(f" Output file is empty at {output_path}")
        return

    with open(output_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    segments = data.get("segments", [])
    print(f" Total segments: {len(segments)}")
    print(f" Language: {data.get('language')}")
    print(f" Language probability: {data.get('language_probability')}")

    # Check segment ordering
    if segments:
        first_start = segments[0].get("start", 0)
        last_end = segments[-1].get("end", 0)
        print(f" First segment start: {first_start:.1f}s")
        print(f" Last segment end: {last_end:.1f}s")
        print(f" Total transcription duration: {last_end:.1f}s")

        # Check for gaps or overlaps larger than 100ms
        gaps, overlaps = _count_gaps_and_overlaps(segments, tolerance=0.1)
        print(f" Gaps >100ms: {gaps}, Overlaps >100ms: {overlaps}")


def _report_success(proc, elapsed, output_path):
    """Print the summary for a run that exited with return code 0."""
    print(f"\n✓ Process completed successfully in {elapsed:.1f}s")
    print(f"Return code: {proc.returncode}")

    # splitlines() avoids the phantom empty entry that split("\n") leaves
    # after a trailing newline, which would skew the count and the tail.
    stderr_lines = proc.stderr.splitlines()
    print(f"Stderr lines: {len(stderr_lines)}")

    # Check for success patterns
    chunk_successes = [
        line for line in stderr_lines if "transcribe_chunk succeeded" in line
    ]
    print(f"Successful chunks: {len(chunk_successes)}")

    # Look for any errors
    errors = _find_error_lines(stderr_lines)
    if errors:
        print(f"Errors found: {len(errors)}")
        for err in errors[:5]:
            print(f" {err}")

    _report_output(output_path)

    # Print last 10 lines of stderr for debugging
    print("\nLast 10 lines of stderr:")
    _print_tail(stderr_lines, 10)


def _report_failure(proc, elapsed, partial_stderr):
    """Print diagnostics for a run that failed or hit the total timeout.

    Args:
        proc: The ``CompletedProcess``, or ``None`` when the run timed out.
        elapsed: Wall-clock seconds spent waiting for the process.
        partial_stderr: Stderr text recovered from the timeout exception;
            empty when nothing was captured.
    """
    print("\n✗ Process failed or killed")
    print(f"Elapsed: {elapsed:.1f}s")
    if proc is not None:
        print(f"Return code: {proc.returncode}")
        print("Last 20 lines of stderr:")
        _print_tail(proc.stderr.splitlines(), 20)
        return

    print("Process was killed due to timeout")
    if partial_stderr.strip():
        print("Last 20 lines of stderr:")
        _print_tail(partial_stderr.splitlines(), 20)


def test_complete():
    """Run ``scripts/asr_processor.py`` on a fixed video and print a report.

    The check is skipped with a message when the hard-coded test video is
    absent. Otherwise the processor is launched as a subprocess with a
    temporary JSON output path, a 600 second overall timeout and three
    environment overrides. The printed report covers the exit status,
    stderr statistics, and the segments found in the JSON output. The
    temporary output file is always removed afterwards.

    Returns:
        None. All results are reported on stdout; exceptions raised while
        running or reporting are printed with a traceback, not propagated.
    """
    test_video = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"

    if not os.path.exists(test_video):
        print(f"Test video not found: {test_video}")
        return

    # Only the path is needed: the file is closed immediately and left on
    # disk (delete=False) so the child process can write to it.
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
        output_path = f.name

    try:
        cmd = [
            "/opt/homebrew/bin/python3.11",
            "scripts/asr_processor.py",
            test_video,
            output_path,
            "--uuid",
            "test_complete",
            "--chunk-duration",
            "600",
        ]

        # NOTE(review): the effect of these variables is defined by the
        # processor script, which is not visible from this file.
        env_overrides = {
            "MOMENTRY_DISABLE_REDIS": "1",
            "ASR_DEBUG": "1",
            "MOMENTRY_ASR_CHUNK_TIMEOUT": "120",  # 2 minutes per chunk (generous)
        }
        env = os.environ.copy()
        env.update(env_overrides)

        print("Running ASR processor to completion...")
        print(f"Command: {' '.join(cmd)}")
        print(
            "Env: "
            + ", ".join(f"{key}={value}" for key, value in env_overrides.items())
        )
        print("-" * 60)

        started_at = time.time()

        # Run with generous timeout (10 minutes total for 12 chunks)
        timeout = 600  # 10 minutes
        partial_stderr = ""
        try:
            proc = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=timeout,
                env=env,
            )
        except subprocess.TimeoutExpired as exc:
            print(f"\n⏱️ TOTAL TIMEOUT after {timeout}s")
            proc = None
            # Keep whatever stderr was captured before the kill so the
            # timeout can still be diagnosed.
            partial_stderr = _as_text(exc.stderr)

        elapsed = time.time() - started_at

        if proc is not None and proc.returncode == 0:
            _report_success(proc, elapsed, output_path)
        else:
            _report_failure(proc, elapsed, partial_stderr)

    except Exception as e:
        # Top-level boundary of a diagnostic script: report and carry on
        # to the cleanup below rather than propagating.
        print(f"✗ Error: {e}")
        traceback.print_exc()
    finally:
        if os.path.exists(output_path):
            os.unlink(output_path)
            print("✓ Cleaned up output file")


if __name__ == "__main__":
    test_complete()