#!/usr/bin/env python3
"""
Test ASR on the first 31 minutes of a large video file.

The segment is deliberately just over 30 minutes (1860 s > 1800 s) so that the
ASR processor is expected to take its chunked-transcription path.
"""

import json
import os
import subprocess
import sys
import tempfile
import time

# Source video the segment is cut from.
LARGE_VIDEO = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
# Interpreter used to launch the ASR processor script.
ASR_PYTHON = "/opt/homebrew/bin/python3.11"
# 31 minutes: just over the 1800 s threshold that should trigger chunking.
SEGMENT_SECONDS = 1860
# Upper bound on how long the ASR subprocess may run.
ASR_TIMEOUT_SECONDS = 600
# How many trailing stderr lines to show when ASR produces no output file.
STDERR_TAIL_LINES = 20


def extract_segment(input_path, start_time, duration, output_path):
    """Extract a segment from a video using ffmpeg (stream copy, no re-encode).

    Args:
        input_path: Path of the source video.
        start_time: Offset in seconds at which the segment starts.
        duration: Length of the segment in seconds.
        output_path: Where the extracted segment is written (overwritten if
            it already exists).

    Returns:
        True if ffmpeg exited with status 0 and ``output_path`` exists,
        False otherwise -- including when ffmpeg cannot be launched at all.
    """
    cmd = [
        "ffmpeg",
        "-i", input_path,
        "-ss", str(start_time),
        "-t", str(duration),
        "-c", "copy",  # Copy codec (no re-encoding)
        "-y",
        output_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as e:
        # ffmpeg missing or not executable: report failure like any other
        # extraction error instead of crashing the caller.
        print(f"Could not run ffmpeg: {e}")
        return False
    return result.returncode == 0 and os.path.exists(output_path)


def _print_stderr_tail(stderr, max_lines=STDERR_TAIL_LINES):
    """Print the last ``max_lines`` lines of captured stderr, if any."""
    if not stderr:
        return
    print(f"STDERR (last {max_lines} lines):")
    for line in stderr.strip().split("\n")[-max_lines:]:
        print(f"  {line}")


def test_large_segment():
    """Test ASR on a 31-minute segment of the large video.

    Cuts the first ``SEGMENT_SECONDS`` seconds of ``LARGE_VIDEO`` into a
    temporary directory, runs ``scripts/asr_processor.py`` on it with the
    ``tiny`` model, and prints a summary of the JSON result.

    Returns:
        True if the ASR processor produced a readable JSON output file,
        False on any failure (missing video, extraction failure, timeout,
        launch error, missing or unreadable output).
    """
    if not os.path.exists(LARGE_VIDEO):
        print(f"Large video not found: {LARGE_VIDEO}")
        return False

    with tempfile.TemporaryDirectory() as temp_dir:
        segment_path = os.path.join(temp_dir, "segment_31min.mov")
        output_path = os.path.join(temp_dir, "output.json")

        print(f"Extracting 31-minute segment from {os.path.basename(LARGE_VIDEO)}...")
        if not extract_segment(LARGE_VIDEO, 0, SEGMENT_SECONDS, segment_path):
            print("Failed to extract segment")
            return False

        print(f"Segment created: {os.path.getsize(segment_path) / (1024**3):.2f} GB")

        # Resolve relative to this file's absolute location so the script is
        # found regardless of the current working directory.
        script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "scripts", "asr_processor.py"
        )
        cmd = [
            ASR_PYTHON,
            script_path,
            segment_path,
            output_path,
            "--model-size",
            "tiny",
        ]

        print(f"Running ASR (timeout: {ASR_TIMEOUT_SECONDS} seconds)...")
        started = time.monotonic()
        try:
            # subprocess.run kills and reaps the child itself on timeout.
            completed = subprocess.run(
                cmd, capture_output=True, text=True, timeout=ASR_TIMEOUT_SECONDS
            )
        except subprocess.TimeoutExpired:
            print(f"ASR timed out after {ASR_TIMEOUT_SECONDS} seconds")
            return False
        except (OSError, ValueError) as e:
            # Interpreter/script could not be launched, or output undecodable.
            print(f"Error: {e}")
            return False

        elapsed = time.monotonic() - started
        print(f"ASR completed in {elapsed:.2f} seconds")
        print(f"Return code: {completed.returncode}")

        if not os.path.exists(output_path):
            print("Output file not created")
            _print_stderr_tail(completed.stderr)
            return False

        try:
            with open(output_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error: {e}")
            return False

        if not isinstance(data, dict):
            print(f"Error: expected a JSON object, got {type(data).__name__}")
            return False

        print(f"Success! Processing mode: {data.get('processing_mode', 'unknown')}")
        print(f"Chunk count: {data.get('chunk_count', 1)}")
        print(f"Segments: {len(data.get('segments', []))}")
        return True


if __name__ == "__main__":
    print("Testing ASR on 31-minute segment of large video")
    print(
        "This should trigger chunked transcription (just over 30-minute threshold).\n"
    )

    success = test_large_segment()

    if success:
        print("\n✅ Large segment test PASSED")
        sys.exit(0)
    else:
        print("\n❌ Large segment test FAILED")
        sys.exit(1)