#!/usr/bin/env python3
"""Test chunk extraction and transcription separately.

Manual diagnostic script: extracts the audio track of a test video with
ffmpeg, splits it into fixed-length chunks, and transcribes the first one
or two chunks with a small faster-whisper model, printing timings for
every step. The temp directory is left on disk for inspection.
"""

import os
import subprocess
import sys
import tempfile
import time

# Preferred test video and the fallback used when the preferred one is absent.
PRIMARY_TEST_VIDEO = "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
FALLBACK_TEST_VIDEO = "../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.mov"

# Length of each chunk in seconds (10 minutes).
CHUNK_DURATION = 600


def calculate_chunks(duration, chunk_duration=CHUNK_DURATION):
    """Split ``[0, duration)`` into consecutive chunks.

    Args:
        duration: Total audio length in seconds.
        chunk_duration: Maximum length of one chunk in seconds.

    Returns:
        A list of dicts with keys ``start``, ``end``, ``duration`` and
        ``idx``. The last chunk is shortened to end exactly at
        ``duration``. The list is empty when ``duration`` is not positive.

    Raises:
        ValueError: If ``chunk_duration`` is not positive (the loop below
            would otherwise never advance).
    """
    if chunk_duration <= 0:
        raise ValueError("chunk_duration must be positive")

    chunks = []
    start_time = 0.0
    chunk_idx = 0
    while start_time < duration:
        chunk_end = min(start_time + chunk_duration, duration)
        chunks.append(
            {
                "start": start_time,
                "end": chunk_end,
                "duration": chunk_end - start_time,
                "idx": chunk_idx,
            }
        )
        start_time = chunk_end
        chunk_idx += 1
    return chunks


def probe_duration(audio_path):
    """Return the duration of ``audio_path`` in seconds via ffprobe.

    Returns ``None`` (after printing the reason) when ffprobe cannot be
    run, exits non-zero, or prints something that is not a number.
    """
    duration_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    try:
        result = subprocess.run(duration_cmd, capture_output=True, text=True)
    except OSError as exc:
        print(f"Error: could not run ffprobe: {exc}")
        return None
    if result.returncode != 0:
        print(f"Error: {result.stderr[:500]}")
        return None
    try:
        return float(result.stdout.strip())
    except ValueError:
        print(f"Error: unexpected ffprobe output: {result.stdout[:200]!r}")
        return None


def extract_chunk(audio_path, start, duration, output_path):
    """Extract a chunk of audio using ffmpeg.

    Args:
        audio_path: Source audio file.
        start: Offset of the chunk within the source, in seconds.
        duration: Length of the chunk, in seconds.
        output_path: Destination WAV file (overwritten if present).

    Returns:
        True when ffmpeg exited with 0 and produced a non-empty file,
        False otherwise (the failure reason is printed).
    """
    cmd = [
        "ffmpeg",
        "-i",
        audio_path,
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        output_path,
    ]
    print(f" Running: {' '.join(cmd)}")
    try:
        # text=True so the stderr excerpt below prints as text, not bytes.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # e.g. ffmpeg is not installed or not executable.
        print(f" Error: could not run ffmpeg: {exc}")
        return False

    success = (
        result.returncode == 0
        and os.path.exists(output_path)
        and os.path.getsize(output_path) > 0
    )
    if not success:
        print(f" Error: returncode={result.returncode}, stderr={result.stderr[:200]}")
    return success


def transcribe_chunk(model, chunk_path, chunk_start, chunk_idx, total_chunks):
    """Transcribe a single audio chunk.

    Args:
        model: Object exposing ``transcribe(path, beam_size=...)`` that
            returns ``(segments, info)``; here a faster-whisper model.
        chunk_path: Path of the chunk's audio file.
        chunk_start: Offset of the chunk in the full audio, in seconds;
            added to every segment timestamp.
        chunk_idx: Zero-based index of the chunk (for progress output).
        total_chunks: Total number of chunks (for progress output).

    Returns:
        ``(results, info)`` where ``results`` is a list of dicts with
        ``start``, ``end`` and ``text`` keys, and ``info`` is the second
        value returned by ``model.transcribe``.
    """
    print(f" Starting transcription of chunk {chunk_idx + 1}/{total_chunks}")
    started = time.time()

    segments, info = model.transcribe(chunk_path, beam_size=5)
    # Consume the segments inside the timed region so the reported time
    # covers the whole iteration, not just the transcribe() call.
    results = [
        {
            "start": segment.start + chunk_start,
            "end": segment.end + chunk_start,
            "text": segment.text.strip(),
        }
        for segment in segments
    ]

    elapsed = time.time() - started
    print(
        f" Chunk {chunk_idx + 1}/{total_chunks}: {len(results)} segments in {elapsed:.1f}s"
    )
    return results, info


def main():
    """Run the extraction/transcription smoke test; return the exit code."""
    # Test video
    test_video = PRIMARY_TEST_VIDEO
    if not os.path.exists(test_video):
        test_video = FALLBACK_TEST_VIDEO
    print(f"Testing: {test_video}")

    # Create temp directory (deliberately not cleaned up, see final message)
    temp_dir = tempfile.mkdtemp(prefix="asr_chunk_test_")
    print(f"Temp dir: {temp_dir}")

    # Extract audio first
    audio_path = os.path.join(temp_dir, "audio.wav")
    extract_cmd = [
        "ffmpeg",
        "-i",
        test_video,
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        audio_path,
    ]
    print("\n1. Extracting audio...")
    t0 = time.time()
    try:
        result = subprocess.run(extract_cmd, capture_output=True, text=True)
    except OSError as exc:
        print(f"Error: could not run ffmpeg: {exc}")
        return 1
    if result.returncode != 0:
        print(f"Error: {result.stderr[:500]}")
        return 1
    print(f"Audio extracted: {time.time() - t0:.1f}s")

    # Get duration
    duration = probe_duration(audio_path)
    if duration is None:
        return 1
    print(f"Audio duration: {duration:.1f}s")

    # Simulate the ASR processor chunk calculation
    chunks = calculate_chunks(duration, CHUNK_DURATION)
    print(f"\n2. Calculated {len(chunks)} chunks")
    if not chunks:
        # Zero-length audio: nothing to extract or transcribe.
        print("Error: audio has no duration, nothing to chunk")
        return 1

    # Create chunk directory
    chunk_temp_dir = os.path.join(temp_dir, "chunks")
    os.makedirs(chunk_temp_dir, exist_ok=True)
    print(f"Chunk directory: {chunk_temp_dir}")

    # Test first chunk
    print("\n3. Testing first chunk extraction and transcription...")
    chunk = chunks[0]
    chunk_path = os.path.join(chunk_temp_dir, f"chunk_{chunk['idx']:04d}.wav")
    print(
        f"Extracting chunk 0: start={chunk['start']:.1f}, duration={chunk['duration']:.1f}"
    )
    t0 = time.time()
    if not extract_chunk(audio_path, chunk["start"], chunk["duration"], chunk_path):
        print("Chunk extraction failed!")
        return 1
    print(f"Chunk extracted: {time.time() - t0:.1f}s")
    print(f"Chunk file size: {os.path.getsize(chunk_path) / (1024**2):.1f} MB")

    # Load Whisper model
    print("\n4. Loading Whisper model...")
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    # Imported lazily so the helpers above stay importable without the
    # third-party dependency installed.
    from faster_whisper import WhisperModel

    t0 = time.time()
    model = WhisperModel("tiny", device="cpu", compute_type="int8")
    print(f"Model loaded: {time.time() - t0:.1f}s")

    # Transcribe chunk
    print("\n5. Transcribing chunk...")
    t0 = time.time()
    _, info = transcribe_chunk(model, chunk_path, chunk["start"], 0, len(chunks))
    print(f"Total time for chunk transcription: {time.time() - t0:.1f}s")
    print(f"Language: {info.language} (prob {info.language_probability:.2f})")

    # Test second chunk to see if it also works
    if len(chunks) > 1:
        print("\n6. Testing second chunk...")
        chunk = chunks[1]
        chunk_path2 = os.path.join(chunk_temp_dir, f"chunk_{chunk['idx']:04d}.wav")
        print(
            f"Extracting chunk 1: start={chunk['start']:.1f}, duration={chunk['duration']:.1f}"
        )
        t0 = time.time()
        if extract_chunk(audio_path, chunk["start"], chunk["duration"], chunk_path2):
            print(f"Chunk extracted: {time.time() - t0:.1f}s")
            t0 = time.time()
            transcribe_chunk(model, chunk_path2, chunk["start"], 1, len(chunks))
            print(f"Total time: {time.time() - t0:.1f}s")
        else:
            print("Second chunk extraction failed")

    print(f"\nTemp directory preserved: {temp_dir}")
    return 0


if __name__ == "__main__":
    sys.exit(main())