#!/usr/bin/env python3
"""Minimal test to isolate the hang issue.

Runs a fixed sequence of steps against one hard-coded test video and prints
the outcome of each one, so the step that hangs can be identified:

1. Extract the audio track to a 16 kHz mono 16-bit PCM WAV with ffmpeg.
2. Read the audio duration with ffprobe.
3. Cut the first 60 seconds into a separate chunk.
4. Load the faster_whisper "tiny" model on CPU.
5. Transcribe the 60-second chunk.
6. Transcribe the full audio in a daemon thread, giving up after 30 seconds.

The temporary working directory is left on disk for inspection. The test
video path is relative to the current working directory.
"""

import os
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from typing import List, Optional, Sequence, Tuple

# Test video
TEST_VIDEO = "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"

# Length of the test chunk cut from the start of the audio, in seconds.
CHUNK_SECONDS = 60

# How long step 6 waits for the full transcription before giving up, in seconds.
FULL_TRANSCRIPTION_TIMEOUT = 30

# Only this many leading characters of a tool's stderr are printed on failure.
STDERR_PREVIEW_CHARS = 500


def build_extract_command(
    source: str, destination: str, trim_args: Sequence[str] = ()
) -> List[str]:
    """Return the ffmpeg argv that writes *source* as 16 kHz mono PCM WAV.

    Args:
        source: Input media file.
        destination: Output WAV path (overwritten because of ``-y``).
        trim_args: Extra options placed right after the input, e.g.
            ``("-ss", "0", "-t", "60")`` to keep only the first minute.
    """
    return [
        "ffmpeg",
        "-i",
        source,
        *trim_args,
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        destination,
    ]


def run_command(cmd: List[str]) -> Tuple[subprocess.CompletedProcess, float]:
    """Run *cmd* with captured text output; return (process, elapsed seconds)."""
    start = time.time()
    # stdin is detached so ffmpeg cannot block on, or consume, the caller's
    # standard input; the point of this script is to find a different hang.
    completed = subprocess.run(
        cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL
    )
    return completed, time.time() - start


def parse_duration(text: str) -> Optional[float]:
    """Parse ffprobe's duration output; return None if it is not a number."""
    try:
        return float(text.strip())
    except ValueError:
        return None


def extract_wav(
    source: str, destination: str, label: str, trim_args: Sequence[str] = ()
) -> bool:
    """Run one ffmpeg extraction and report on it.

    Args:
        source: Input media file.
        destination: Output WAV path.
        label: Lower-case noun used in the messages ("audio" or "chunk").
        trim_args: Passed through to :func:`build_extract_command`.

    Returns:
        True on success, False if ffmpeg exited with a non-zero status.
    """
    cmd = build_extract_command(source, destination, trim_args)
    print(f"Command: {' '.join(cmd)}")
    completed, elapsed = run_command(cmd)
    if completed.returncode != 0:
        print(f"Error extracting {label}: {completed.stderr[:STDERR_PREVIEW_CHARS]}")
        return False

    title = label.capitalize()
    print(f"{title} extraction successful: {elapsed:.1f}s")
    print(f"{title} file size: {os.path.getsize(destination) / (1024**2):.1f} MB")
    return True


def report_duration(audio_path: str) -> None:
    """Print the duration of *audio_path* as reported by ffprobe."""
    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    probe, _ = run_command(probe_cmd)
    if probe.returncode != 0:
        print(f"Error getting duration: {probe.stderr[:STDERR_PREVIEW_CHARS]}")
        return

    duration = parse_duration(probe.stdout)
    if duration is None:
        # ffprobe can exit 0 and still print something non-numeric.
        print(
            "Error getting duration: unexpected ffprobe output "
            f"{probe.stdout.strip()!r}"
        )
        return
    print(f"Audio duration: {duration:.1f}s ({duration / 60:.1f} min)")


def load_model():
    """Import faster_whisper and load the "tiny" int8 CPU model.

    Returns:
        The loaded model, or None if the import or the load failed (the
        error is printed).
    """
    # Make modules that live next to this script importable.
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    try:
        from faster_whisper import WhisperModel

        print("faster_whisper import successful")
        start = time.time()
        model = WhisperModel("tiny", device="cpu", compute_type="int8")
        elapsed = time.time() - start
        print(f"Model loaded successfully: {elapsed:.1f}s")
        return model
    except Exception as e:
        print(f"Error loading model: {e}")
        return None


def transcribe_chunk(model, chunk_path: str) -> None:
    """Transcribe the short chunk and print a summary of the result."""
    try:
        start = time.time()
        segments, info = model.transcribe(chunk_path, beam_size=5)
        # Convert to list to force evaluation. The timer is stopped only
        # afterwards: the segments are produced lazily, so stopping it
        # before this line would leave the decoding work out of the figure.
        segments = list(segments)
        elapsed = time.time() - start

        print(f"Transcription successful: {elapsed:.1f}s")
        print(
            f"Detected language: {info.language} "
            f"(prob {info.language_probability:.2f})"
        )
        print(f"Number of segments: {len(segments)}")
        for i, segment in enumerate(segments[:3]):  # Show first 3 segments
            print(
                f" Segment {i}: {segment.start:.1f}s - {segment.end:.1f}s: "
                f"{segment.text[:50]}..."
            )
        if len(segments) > 3:
            print(f" ... and {len(segments) - 3} more segments")
    except Exception as e:
        print(f"Error transcribing: {e}")
        traceback.print_exc()


class TranscriptionResult:
    """Holder filled in by the worker thread of the full transcription."""

    def __init__(self) -> None:
        self.segments = []  # materialised segments, once transcription ends
        self.info = None  # second value returned by model.transcribe
        self.error = None  # exception raised inside the worker, if any


def transcribe_in_thread(
    model, audio_path: str, timeout: float
) -> Tuple[TranscriptionResult, bool]:
    """Transcribe *audio_path* in a daemon thread, waiting at most *timeout*.

    Returns:
        ``(outcome, timed_out)``. When ``timed_out`` is True the worker is
        still running and ``outcome`` has not been filled in yet.
    """
    outcome = TranscriptionResult()

    def worker() -> None:
        try:
            segs, inf = model.transcribe(audio_path, beam_size=5)
            outcome.segments = list(segs)
            outcome.info = inf
        except Exception as e:
            outcome.error = e

    # Daemon thread: a stuck transcription must not keep the process alive.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    return outcome, thread.is_alive()


def transcribe_full_audio(model, audio_path: str) -> None:
    """Run the full-file transcription with a timeout and print the outcome."""
    try:
        start = time.time()
        outcome, timed_out = transcribe_in_thread(
            model, audio_path, FULL_TRANSCRIPTION_TIMEOUT
        )
        if timed_out:
            print(
                f"Full transcription timed out after {FULL_TRANSCRIPTION_TIMEOUT} "
                "seconds (expected for large file)"
            )
        elif outcome.error is not None:
            print(f"Transcription error: {outcome.error}")
        else:
            elapsed = time.time() - start
            print(f"Full transcription successful (unexpected!): {elapsed:.1f}s")
            print(f"Segments: {len(outcome.segments)}")
    except Exception as e:
        print(f"Error in full transcription test: {e}")


def main() -> int:
    """Run all steps in order; return the process exit status."""
    if not os.path.exists(TEST_VIDEO):
        print(f"Test video not found: {TEST_VIDEO}")
        return 1

    print(f"Testing: {TEST_VIDEO}")
    print(f"Size: {os.path.getsize(TEST_VIDEO) / (1024**3):.2f} GB")

    # Create temp directory
    temp_dir = tempfile.mkdtemp(prefix="asr_minimal_")
    print(f"Temp dir: {temp_dir}")

    # Step 1: Extract audio
    audio_path = os.path.join(temp_dir, "audio.wav")
    print(f"\n1. Extracting audio to {audio_path}...")
    if not extract_wav(TEST_VIDEO, audio_path, "audio"):
        return 1

    # Step 2: Get audio duration
    print("\n2. Getting audio duration...")
    report_duration(audio_path)

    # Step 3: Extract first 60 seconds as a test chunk
    chunk_path = os.path.join(temp_dir, "chunk_0000.wav")
    print(f"\n3. Extracting first {CHUNK_SECONDS} seconds to {chunk_path}...")
    trim_args = ("-ss", "0", "-t", str(CHUNK_SECONDS))
    if not extract_wav(audio_path, chunk_path, "chunk", trim_args):
        return 1

    # Step 4: Try to load Whisper model
    print("\n4. Testing Whisper model load...")
    model = load_model()
    if model is None:
        return 1

    # Step 5: Try to transcribe the chunk
    print("\n5. Transcribing chunk...")
    transcribe_chunk(model, chunk_path)

    # Step 6: Try to transcribe the full audio (should hang for large files)
    print("\n6. Testing full audio transcription (should hang for large files)...")
    transcribe_full_audio(model, audio_path)

    print(f"\nTemp directory preserved: {temp_dir}")
    return 0


if __name__ == "__main__":
    sys.exit(main())