#!/usr/bin/env python3
"""
Chunked transcription to handle large audio files.

The input is split into fixed-length 16 kHz mono WAV chunks with ffmpeg, each
chunk is transcribed with a faster-whisper model, and the per-chunk segment
timestamps are shifted back onto the timeline of the original file.
"""

import argparse
import json
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np  # noqa: F401 - not used in this module; existing import kept as-is

PathLike = Union[str, Path]


def _probe_duration(input_path: PathLike) -> float:
    """Return the duration of *input_path* in seconds as reported by ffprobe.

    Raises:
        ValueError: If ffprobe's output cannot be parsed as a number (for
            example because ffprobe failed to open the file).
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "csv=p=0",
        str(input_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as err:
        # Surface ffprobe's own diagnostics instead of a bare float() error.
        detail = result.stderr.strip() or f"unexpected ffprobe output {raw!r}"
        raise ValueError(
            f"Could not determine duration of {input_path}: {detail}"
        ) from err


def split_audio(
    input_path: PathLike,
    chunk_duration: float = 1800,
    output_dir: Optional[PathLike] = None,
) -> Tuple[List[Dict[str, Any]], Path]:
    """Split audio into chunks using ffmpeg.

    Args:
        input_path: Audio file to split.
        chunk_duration: Length of each chunk in seconds; must be positive.
        output_dir: Directory that receives the chunk files. When ``None`` a
            fresh temporary directory is created; the caller is responsible
            for removing it.

    Returns:
        A ``(chunks, output_dir)`` pair. Each chunk is a dict with the keys
        ``path``, ``start_time`` and ``end_time`` (seconds relative to the
        start of the input file).

    Raises:
        ValueError: If ``chunk_duration`` is not positive, or if the duration
            of the input cannot be determined.
    """
    # A non-positive step would never advance `start` and loop forever.
    if chunk_duration <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")

    # Probe before creating any directory so a probe failure leaves nothing
    # behind on disk.
    total_duration = _probe_duration(input_path)

    if output_dir is None:
        output_dir = Path(tempfile.mkdtemp(prefix="audio_chunks_"))
    else:
        output_dir = Path(output_dir)
        output_dir.mkdir(exist_ok=True, parents=True)

    print(
        f"Total audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)"
    )
    print(f"Splitting into {chunk_duration}s chunks...")

    chunks = []
    start = 0
    chunk_idx = 0

    while start < total_duration:
        chunk_path = output_dir / f"chunk_{chunk_idx:04d}.wav"
        cmd = [
            "ffmpeg",
            "-i",
            str(input_path),
            "-ss",
            str(start),
            "-t",
            str(chunk_duration),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            str(chunk_path),
        ]
        # Give ffmpeg no stdin so it cannot consume input meant for this
        # process (or its parent shell).
        subprocess.run(cmd, capture_output=True, stdin=subprocess.DEVNULL)

        if chunk_path.exists() and chunk_path.stat().st_size > 0:
            chunks.append(
                {
                    "path": chunk_path,
                    "start_time": start,
                    "end_time": min(start + chunk_duration, total_duration),
                }
            )
        else:
            print(f"Warning: Chunk {chunk_idx} may be empty")

        start += chunk_duration
        chunk_idx += 1

    print(f"Created {len(chunks)} chunks in {output_dir}")
    return chunks, output_dir


def transcribe_chunk(
    chunk_info: Dict[str, Any], model: Any, chunk_idx: int, total_chunks: int
) -> Tuple[List[Dict[str, Any]], Any]:
    """Transcribe a single chunk.

    Args:
        chunk_info: Chunk descriptor with ``path``, ``start_time`` and
            ``end_time`` keys, as produced by :func:`split_audio`.
        model: Object exposing ``transcribe(path, beam_size=...)`` that
            returns ``(segments, info)``; each segment must have ``start``,
            ``end`` and ``text`` attributes.
        chunk_idx: Zero-based index of this chunk (used for progress output).
        total_chunks: Total number of chunks (used for progress output).

    Returns:
        ``(results, info)`` where ``results`` is a list of dicts with
        ``start``, ``end`` and ``text`` keys, and ``info`` is passed through
        unchanged from the model.
    """
    print(
        f"[{chunk_idx + 1}/{total_chunks}] Transcribing chunk {chunk_info['start_time']:.1f}-{chunk_info['end_time']:.1f}"
    )

    start_time = time.time()
    segments, info = model.transcribe(str(chunk_info["path"]), beam_size=5)

    # Segment times are relative to the chunk; shift them by the chunk's
    # start so they refer to the original file.
    offset = chunk_info["start_time"]
    results = [
        {
            "start": segment.start + offset,
            "end": segment.end + offset,
            "text": segment.text.strip(),
        }
        for segment in segments
    ]

    elapsed = time.time() - start_time
    print(f" → {len(results)} segments in {elapsed:.1f}s")

    return results, info


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the script."""
    parser = argparse.ArgumentParser(description="Chunked transcription")
    parser.add_argument("audio_path", help="Audio file path")
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=1800,
        help="Chunk duration in seconds (default: 1800 = 30 min)",
    )
    parser.add_argument("--model-size", default="tiny", help="Whisper model size")
    parser.add_argument("--compute-type", default="int8", help="Compute type")
    parser.add_argument(
        "--output", "-o", default="chunked_transcription.json", help="Output JSON path"
    )
    return parser


def main() -> None:
    """Split the input audio, transcribe every chunk and write a JSON report.

    Exits with status 1 when the input file is missing, the audio cannot be
    split, or no chunks were produced. Chunks that fail to transcribe are
    reported and skipped rather than aborting the run.
    """
    args = _build_parser().parse_args()

    audio_path = Path(args.audio_path)
    if not audio_path.exists():
        print(f"Error: File not found: {audio_path}")
        sys.exit(1)

    print(f"Chunked Transcription for {audio_path}")
    print(f"Model: {args.model_size}, Compute: {args.compute_type}")
    print(
        f"Chunk duration: {args.chunk_duration}s ({args.chunk_duration / 60:.1f} min)"
    )

    # Own the scratch directory here so it is removed on every exit path,
    # including sys.exit() and unexpected exceptions.
    temp_dir = Path(tempfile.mkdtemp(prefix="audio_chunks_"))
    try:
        # Split audio
        try:
            chunks, _ = split_audio(
                audio_path, chunk_duration=args.chunk_duration, output_dir=temp_dir
            )
        except (ValueError, OSError) as err:
            print(f"Error: {err}")
            sys.exit(1)

        if not chunks:
            print("No chunks created")
            sys.exit(1)

        # Load model once
        print("Loading Whisper model...")
        # Imported lazily so argument and input errors are reported without
        # first loading the transcription library.
        from faster_whisper import WhisperModel

        model_start = time.time()
        model = WhisperModel(
            args.model_size, device="cpu", compute_type=args.compute_type
        )
        print(f"Model loaded in {time.time() - model_start:.1f}s")

        # Process each chunk
        all_segments = []
        language = None
        language_prob = None
        failed_chunks = 0

        for i, chunk in enumerate(chunks):
            try:
                segments, info = transcribe_chunk(chunk, model, i, len(chunks))
            except Exception as e:
                # Best effort: one bad chunk must not discard the others.
                failed_chunks += 1
                print(f"Error transcribing chunk {i}: {e}")
                traceback.print_exc()
                continue

            all_segments.extend(segments)
            # Report the language detected on the first successful chunk.
            if language is None:
                language = info.language
                language_prob = info.language_probability

        if failed_chunks:
            print(
                f"Warning: {failed_chunks} of {len(chunks)} chunks failed to transcribe"
            )

        # Sort segments by start time
        all_segments.sort(key=lambda x: x["start"])

        # Save results
        output = {
            "language": language or "unknown",
            "language_probability": language_prob or 0.0,
            "segments": all_segments,
            "chunk_count": len(chunks),
            "chunk_duration": args.chunk_duration,
            "total_segments": len(all_segments),
        }

        output_path = Path(args.output)
        output_path.parent.mkdir(exist_ok=True, parents=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2)

        print("\nTranscription completed:")
        print(f" Total segments: {len(all_segments)}")
        print(
            f" Language: {output['language']} (prob {output['language_probability']:.2f})"
        )
        print(f" Results saved to: {output_path}")
    finally:
        # Cleanup temp directory
        shutil.rmtree(temp_dir, ignore_errors=True)


if __name__ == "__main__":
    main()