#!/opt/homebrew/bin/python3.11
"""Test transcription of a chunk from large video."""

import os
import subprocess
import sys
import tempfile
import time
import traceback
from typing import Optional

# Put this script's directory first on the module search path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Video used when no path is given on the command line or to main().
DEFAULT_VIDEO_PATH = (
    "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
)


def extract_chunk(
    audio_path: str, start: float, duration: float, chunk_path: str
) -> bool:
    """Extract a single chunk.

    Runs ffmpeg to copy ``duration`` seconds of ``audio_path`` starting at
    ``start`` seconds into ``chunk_path`` as 16 kHz mono signed 16-bit PCM,
    overwriting ``chunk_path`` if it exists.

    Args:
        audio_path: Path of the input file handed to ffmpeg.
        start: Offset in seconds at which the chunk begins.
        duration: Length of the chunk in seconds.
        chunk_path: Path of the output file.

    Returns:
        True if ffmpeg exited with status 0 and ``chunk_path`` exists and is
        non-empty. False otherwise, including when ffmpeg cannot be launched
        or does not finish within 30 seconds.
    """
    cmd = [
        "ffmpeg",
        "-i", audio_path,
        "-ss", str(start),
        "-t", str(duration),
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        chunk_path,
    ]
    try:
        # stdin is detached so ffmpeg cannot consume the caller's input.
        result = subprocess.run(
            cmd, stdin=subprocess.DEVNULL, capture_output=True, timeout=30
        )
    except (OSError, subprocess.TimeoutExpired):
        # Missing ffmpeg binary or a hung run both count as a failed chunk.
        return False
    return (
        result.returncode == 0
        and os.path.exists(chunk_path)
        and os.path.getsize(chunk_path) > 0
    )


def _remove_if_exists(path: Optional[str]) -> None:
    """Delete ``path`` if it was assigned and still exists on disk."""
    if path and os.path.exists(path):
        os.unlink(path)


def main(video_path: Optional[str] = None) -> None:
    """Extract the first 60 seconds of a video's audio and transcribe it.

    Steps: extract the audio track to a temporary file, cut the first 60
    seconds into a second temporary file, load the "tiny" faster_whisper
    model on CPU, transcribe the chunk, and print timing, detected language
    and the first five segments. Progress and failures are reported with
    print(); nothing is returned or raised for the handled failure cases.

    Args:
        video_path: Video to read. Defaults to ``DEFAULT_VIDEO_PATH``.
    """
    if video_path is None:
        video_path = DEFAULT_VIDEO_PATH

    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return

    # Both temp paths are tracked from the start so the finally clause can
    # remove whatever was created, no matter where the run stops.
    audio_path = None
    chunk_path = None
    try:
        # delete=False: only the reserved name is needed; ffmpeg writes the
        # file itself after the handle is closed.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            audio_path = f.name

        # Extract audio
        print("Extracting audio from video...")
        cmd = [
            "ffmpeg",
            "-i", video_path,
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "16000",
            "-ac", "1",
            "-y",
            audio_path,
        ]
        try:
            result = subprocess.run(
                cmd, stdin=subprocess.DEVNULL, capture_output=True, timeout=60
            )
        except (OSError, subprocess.TimeoutExpired) as e:
            print(f"Audio extraction failed: {e}")
            return

        if result.returncode != 0:
            # ffmpeg prints its banner first, so the tail of stderr is where
            # the actual error message is found. Decode leniently so odd
            # bytes in the output cannot raise here.
            stderr_text = result.stderr.decode(errors="replace")
            print(f"Audio extraction failed: {stderr_text[-200:]}")
            return

        print(f"Audio extracted: {os.path.getsize(audio_path)} bytes")

        # Extract first chunk (60 seconds)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            chunk_path = f.name

        if not extract_chunk(audio_path, 0, 60, chunk_path):
            print("Failed to extract chunk")
            return

        print(f"Chunk extracted: {os.path.getsize(chunk_path)} bytes")

        # Load Whisper model
        print("Loading Whisper model...")
        try:
            # Imported here so a missing package is reported, not fatal at
            # module import time.
            from faster_whisper import WhisperModel

            model = WhisperModel("tiny", device="cpu", compute_type="int8")
            print("Model loaded")
        except ImportError as e:
            print(f"Failed to import faster_whisper: {e}")
            return
        except Exception as e:
            print(f"Failed to load model: {e}")
            return

        # Try transcription
        print("Transcribing chunk...")
        start_time = time.perf_counter()
        try:
            # Use beam_size=5 like in ASR processor
            segments, info = model.transcribe(chunk_path, beam_size=5)
            elapsed = time.perf_counter() - start_time
            print(f"Transcription initiated in {elapsed:.2f}s")

            # Convert generator to list (actual transcription happens here)
            print("Converting segments to list...")
            segments_list = list(segments)
            total_elapsed = time.perf_counter() - start_time
            print(f"Transcription completed in {total_elapsed:.2f}s")
            print(f"Segments: {len(segments_list)}")
            print(
                f"Language: {info.language}, "
                f"Probability: {info.language_probability}"
            )

            for i, segment in enumerate(segments_list[:5]):
                print(
                    f"Segment {i}: {segment.start:.2f}s - "
                    f"{segment.end:.2f}s: {segment.text}"
                )
        except Exception as e:
            # Top-level boundary of a diagnostic script: report and continue
            # to cleanup rather than crash.
            print(f"Transcription failed: {e}")
            traceback.print_exc()
    finally:
        _remove_if_exists(chunk_path)
        _remove_if_exists(audio_path)
        print("Cleaned up temporary files")


if __name__ == "__main__":
    # Optional first argument overrides the default video path.
    main(sys.argv[1] if len(sys.argv) > 1 else None)