#!/usr/bin/env python3
"""
Test ASR on first 60 minutes of the largest video file.

This tests chunked transcription with multiple chunks (6 chunks of 10 minutes).
The script exits with status 0 when the test passes and 1 when it fails.
"""

import json
import os
import subprocess
import sys
import tempfile
import time

# Machine-specific defaults. They are module-level constants so they can be
# found (and adjusted) in one place instead of being buried in the test body.
LARGE_VIDEO_PATH = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
ASR_PYTHON = "/opt/homebrew/bin/python3.11"

SEGMENT_SECONDS = 3600  # first 60 minutes of the video
ASR_TIMEOUT_SECONDS = 1200  # 20 minutes max for the 60-minute segment
EXPECTED_CHUNKS = 6  # 60 minutes split into 10-minute chunks
STDERR_TAIL_LINES = 20


def extract_segment(input_path, start_time, duration, output_path):
    """Extract a segment from video using ffmpeg.

    Args:
        input_path: Path of the source video.
        start_time: Offset of the segment start, in seconds.
        duration: Length of the segment, in seconds.
        output_path: Where to write the segment (overwritten if present).

    Returns:
        True if ffmpeg exited with status 0 and ``output_path`` exists,
        False otherwise (including when ffmpeg cannot be started at all).
    """
    cmd = [
        "ffmpeg",
        "-i", input_path,
        "-ss", str(start_time),
        "-t", str(duration),
        "-c", "copy",  # Copy codec (no re-encoding)
        "-y",
        output_path,
    ]
    try:
        # stdin is detached so ffmpeg cannot consume the caller's terminal
        # input or block waiting on it.
        result = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True)
    except OSError as exc:
        # ffmpeg missing or not executable: report failure instead of crashing.
        print(f"Could not run ffmpeg: {exc}")
        return False
    return result.returncode == 0 and os.path.exists(output_path)


def _print_stderr_tail(stderr, max_lines=STDERR_TAIL_LINES):
    """Print the last ``max_lines`` lines of ``stderr``; no-op when it is empty."""
    if not stderr:
        return
    print(f"STDERR (last {max_lines} lines):")
    for line in stderr.strip().split("\n")[-max_lines:]:
        print(f"  {line}")


def _summarize_output(output_path):
    """Load the ASR JSON result, print a summary, and report the chunk count.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If the file is not valid JSON or is not a JSON object.
    """
    with open(output_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError(
            f"expected a JSON object in ASR output, got {type(data).__name__}"
        )

    print(f"Success! Processing mode: {data.get('processing_mode', 'unknown')}")
    print(f"Chunk count: {data.get('chunk_count', 1)}")
    print(f"Segments: {len(data.get('segments', []))}")

    # Verify chunk count (should be 6 for 60 min with 10 min chunks).
    # A mismatch is reported as a warning only; it does not fail the test.
    actual_chunks = data.get("chunk_count", 1)
    if actual_chunks == EXPECTED_CHUNKS:
        print(f"✅ Correct chunk count: {actual_chunks}")
    else:
        print(
            f"⚠️ Unexpected chunk count: {actual_chunks} (expected: {EXPECTED_CHUNKS})"
        )


def _run_and_check_asr(cmd, output_path, timeout):
    """Run the ASR command and validate its result.

    Returns:
        True if the process exited with status 0 within ``timeout`` seconds
        and wrote a readable JSON object to ``output_path``; False on
        timeout, non-zero exit status, or missing output file.
    """
    started = time.monotonic()
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    try:
        _, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        print(f"ASR timed out after {timeout} seconds")
        proc.kill()
        # Second communicate() reaps the killed process and drains its pipes.
        _, stderr = proc.communicate()
        _print_stderr_tail(stderr)
        return False

    elapsed = time.monotonic() - started
    print(f"ASR completed in {elapsed:.2f} seconds")
    print(f"Return code: {proc.returncode}")

    if proc.returncode != 0:
        # A leftover output file from a crashed run must not count as success.
        print("ASR exited with a non-zero return code")
        _print_stderr_tail(stderr)
        return False

    if not os.path.exists(output_path):
        print("Output file not created")
        _print_stderr_tail(stderr)
        return False

    _summarize_output(output_path)
    return True


def test_60min_segment(large_video=LARGE_VIDEO_PATH):
    """Test ASR on 60-minute segment of large video.

    Extracts the first 60 minutes of ``large_video`` into a temporary
    directory, runs ``scripts/asr_processor.py`` (located next to this file)
    on it with the ``tiny`` model, and checks the JSON result.

    Args:
        large_video: Path of the source video. Defaults to the machine-specific
            ``LARGE_VIDEO_PATH``.

    Returns:
        True if the ASR run succeeded and produced a readable result,
        False otherwise. Failures are printed, never raised.
    """
    if not os.path.exists(large_video):
        print(f"Large video not found: {large_video}")
        return False

    with tempfile.TemporaryDirectory() as temp_dir:
        # Extract first 60 minutes (3600 seconds) - should create 6 chunks of 10 min each
        segment_path = os.path.join(temp_dir, "segment_60min.mov")
        output_path = os.path.join(temp_dir, "output.json")

        print(f"Extracting 60-minute segment from {os.path.basename(large_video)}...")
        if not extract_segment(large_video, 0, SEGMENT_SECONDS, segment_path):
            print("Failed to extract segment")
            return False

        print(f"Segment created: {os.path.getsize(segment_path) / (1024**3):.2f} GB")

        # Run ASR with timeout (20 minutes max for 60-minute segment)
        script_path = os.path.join(
            os.path.dirname(__file__), "scripts", "asr_processor.py"
        )
        cmd = [
            ASR_PYTHON,
            script_path,
            segment_path,
            output_path,
            "--model-size",
            "tiny",
            # Use default chunk duration (600s = 10 min)
        ]

        print(
            f"Running ASR (timeout: {ASR_TIMEOUT_SECONDS} seconds = "
            f"{ASR_TIMEOUT_SECONDS // 60} minutes)..."
        )
        try:
            return _run_and_check_asr(cmd, output_path, ASR_TIMEOUT_SECONDS)
        except Exception as e:
            # Top-level test boundary: any unexpected failure (interpreter not
            # found, unreadable or malformed output, ...) is a test failure,
            # not a traceback.
            print(f"Error: {e}")
            return False


if __name__ == "__main__":
    print("Testing ASR on 60-minute segment of large video")
    print("This should trigger chunked transcription with 6 chunks (10 min each).\n")

    success = test_60min_segment()

    if success:
        print("\n✅ 60-minute segment test PASSED")
        sys.exit(0)
    else:
        print("\n❌ 60-minute segment test FAILED")
        sys.exit(1)