#!/usr/bin/env python3
"""
Test chunked transcription for full audio file.

The script cuts the input audio into fixed-length, back-to-back chunks with
ffmpeg, transcribes each chunk with faster-whisper, shifts the segment
timestamps back onto the timeline of the full file, and writes the merged
result to a JSON file.
"""

import sys
import time
import tempfile
import json
import shutil
import subprocess
import traceback
from pathlib import Path

# Input file, output file and chunk length used by main().
AUDIO_PATH = Path("/tmp/test_audio.wav")
OUTPUT_PATH = Path("test_output/full_chunked_transcription.json")
CHUNK_DURATION = 1800  # seconds (30 minutes); chunks do not overlap


def get_audio_duration(audio_path):
    """Get duration in seconds.

    Runs ``ffprobe`` on *audio_path* and parses the container duration it
    prints.

    Raises:
        ValueError: if ffprobe's output cannot be parsed as a number (for
            example because ffprobe failed and printed nothing). The message
            carries ffprobe's stderr to make the failure diagnosable.
        OSError: if the ``ffprobe`` executable cannot be started.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as err:
        detail = result.stderr.strip() or f"unexpected output {raw!r}"
        raise ValueError(
            f"ffprobe could not report a duration for {audio_path}: {detail}"
        ) from err


def extract_chunk(audio_path, start, duration, output_path):
    """Extract chunk using ffmpeg.

    Writes *duration* seconds of *audio_path*, beginning at *start* seconds,
    to *output_path* as 16 kHz mono 16-bit PCM audio, overwriting any
    existing file.

    Returns:
        True if ffmpeg exited successfully and left a non-empty file at
        *output_path*, otherwise False.
    """
    cmd = [
        "ffmpeg",
        # -ss is given before -i so ffmpeg seeks in the input instead of
        # decoding and discarding everything that precedes the chunk.
        "-ss", str(start),
        "-i", str(audio_path),
        "-t", str(duration),
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        str(output_path),
    ]
    result = subprocess.run(cmd, capture_output=True)
    if result.returncode != 0:
        # A failed run can still leave a partial file behind; do not treat
        # that as a usable chunk.
        return False
    return output_path.exists() and output_path.stat().st_size > 0


def transcribe_chunk(model, chunk_path, chunk_start):
    """Transcribe a chunk and return segments with absolute timestamps.

    Args:
        model: object exposing ``transcribe(path, beam_size=...)`` that
            returns ``(segments, info)``; each segment must provide
            ``start``, ``end`` and ``text``.
        chunk_path: path of the chunk audio file.
        chunk_start: offset of the chunk within the full recording, in
            seconds; added to every segment's start and end.

    Returns:
        ``(results, info)`` where *results* is a list of dicts with keys
        ``start``, ``end`` and ``text`` (whitespace-stripped), and *info* is
        passed through unchanged from the model.
    """
    segments, info = model.transcribe(str(chunk_path), beam_size=5)
    # NOTE(review): faster-whisper is understood to yield segments lazily, in
    # which case this iteration is where the decoding actually runs - confirm.
    results = [
        {
            "start": seg.start + chunk_start,
            "end": seg.end + chunk_start,
            "text": seg.text.strip(),
        }
        for seg in segments
    ]
    return results, info


def _plan_chunks(total_duration, chunk_duration):
    """Return back-to-back chunk descriptors covering 0..total_duration."""
    chunks = []
    start = 0
    while start < total_duration:
        end = min(start + chunk_duration, total_duration)
        chunks.append({"start": start, "end": end, "idx": len(chunks)})
        start = end
    return chunks


def main():
    """Transcribe AUDIO_PATH chunk by chunk and save the merged result.

    Exits with status 1 if the audio file is missing or its duration cannot
    be determined. A chunk that fails to extract or transcribe is reported
    and skipped; the remaining chunks are still processed.
    """
    audio_path = AUDIO_PATH

    if not audio_path.exists():
        print(f"Audio file not found: {audio_path}")
        sys.exit(1)

    try:
        total_duration = get_audio_duration(audio_path)
    except (OSError, ValueError) as err:
        print(f"Could not determine audio duration: {err}")
        sys.exit(1)
    print(f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)")

    chunks = _plan_chunks(total_duration, CHUNK_DURATION)
    print(f"Split into {len(chunks)} chunks")

    # Load model once. The import is kept local so the helpers above stay
    # importable without faster-whisper installed.
    print("Loading Whisper model...")
    from faster_whisper import WhisperModel

    model = WhisperModel("tiny", device="cpu", compute_type="int8")

    all_segments = []
    language = None
    language_prob = None

    temp_dir = Path(tempfile.mkdtemp(prefix="chunks_"))
    print(f"Temp directory: {temp_dir}")

    try:
        for chunk in chunks:
            chunk_path = temp_dir / f"chunk_{chunk['idx']}.wav"
            print(
                f"\nChunk {chunk['idx'] + 1}/{len(chunks)}: {chunk['start']:.1f}-{chunk['end']:.1f}"
            )

            try:
                # Extract chunk
                print(" Extracting chunk...")
                if not extract_chunk(
                    audio_path, chunk["start"], chunk["end"] - chunk["start"], chunk_path
                ):
                    print(" Failed to extract chunk, skipping")
                    continue

                # Transcribe (timed for reporting; no timeout is enforced)
                print(" Transcribing...")
                started = time.perf_counter()
                try:
                    segments, info = transcribe_chunk(model, chunk_path, chunk["start"])
                except Exception as e:
                    # Best effort: one bad chunk must not abort the whole run.
                    print(f" ERROR: {e}")
                    traceback.print_exc()
                    continue
                elapsed = time.perf_counter() - started
                print(f" → {len(segments)} segments in {elapsed:.1f}s")

                all_segments.extend(segments)

                # Language is taken from the first chunk that transcribes.
                if language is None:
                    language = info.language
                    language_prob = info.language_probability
            finally:
                # Clean up chunk file, including after a failed extraction.
                chunk_path.unlink(missing_ok=True)
    finally:
        # Clean up temp directory even if the loop was interrupted.
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Sort segments
    all_segments.sort(key=lambda x: x["start"])

    # Save results
    output = {
        "language": language or "unknown",
        "language_probability": language_prob or 0.0,
        "segments": all_segments,
        "total_segments": len(all_segments),
        "chunk_count": len(chunks),
    }

    OUTPUT_PATH.parent.mkdir(exist_ok=True, parents=True)
    with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)

    print("\nTranscription completed:")
    print(f" Total segments: {len(all_segments)}")
    # Use the defaulted values: language_prob is None when no chunk succeeded
    # and cannot be formatted with :.2f.
    print(
        f" Language: {output['language']} (prob {output['language_probability']:.2f})"
    )
    print(f" Results saved to: {OUTPUT_PATH}")


if __name__ == "__main__":
    main()