#!/usr/bin/env python3
"""
Test overlapping chunks to reduce segment loss at boundaries.

This modifies the chunk extraction to include overlap regions.
"""

import sys
import os
import json
import tempfile
import subprocess
import shutil
import time
from typing import List, Dict, Any, Tuple

VIDEO_PATH = "../test_video/BigBuckBunny_320x180.mp4"

# Segment counts quoted in the comparison report as coming from
# investigate_segment_diff.py.
BASELINE_DIRECT_SEGMENTS = 12
BASELINE_NO_OVERLAP_SEGMENTS = 4


def _validate_chunking(chunk_duration: float, overlap: float) -> None:
    """Reject chunking parameters that cannot produce a finite chunk list.

    Raises:
        ValueError: If ``chunk_duration`` is not positive or ``overlap`` is
            negative.
    """
    if chunk_duration <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")
    if overlap < 0:
        raise ValueError(f"overlap must be non-negative, got {overlap}")


def plan_chunk_windows(
    total_duration: float, chunk_duration: float, overlap: float
) -> List[Tuple[float, float]]:
    """Compute the ``(start, duration)`` window of every chunk.

    Chunk ``i`` starts at ``i * chunk_duration`` and lasts
    ``chunk_duration + overlap`` seconds, clipped to the end of the audio, so
    each chunk's tail covers the first ``overlap`` seconds of the next one.

    Args:
        total_duration: Length of the audio in seconds.
        chunk_duration: Stride between consecutive chunk starts, in seconds.
        overlap: Extra seconds appended to the end of every chunk.

    Returns:
        A list of ``(start, duration)`` tuples, empty when ``total_duration``
        is not positive.

    Raises:
        ValueError: If ``chunk_duration <= 0`` or ``overlap < 0``.
    """
    _validate_chunking(chunk_duration, overlap)

    windows = []
    idx = 0
    start = 0.0
    while start < total_duration:
        duration = min(chunk_duration + overlap, total_duration - start)
        windows.append((start, duration))
        idx += 1
        # Multiply rather than accumulate so rounding error cannot build up
        # and create a spurious sliver chunk at the very end.
        start = idx * chunk_duration
    return windows


def _probe_duration(audio_path: str) -> float:
    """Return the duration of ``audio_path`` in seconds as reported by ffprobe.

    Raises:
        ValueError: If ffprobe fails or prints something that is not a number.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    raw = result.stdout.strip()
    if result.returncode != 0 or not raw:
        raise ValueError(
            f"ffprobe could not determine duration of {audio_path}: "
            f"{result.stderr.strip()}"
        )
    return float(raw)


def extract_audio_with_overlap_chunks(
    audio_path: str, chunk_duration: float, overlap: float, temp_dir: str
) -> List[Dict[str, Any]]:
    """Extract audio chunks with overlap.

    Each chunk is written to ``temp_dir`` as ``chunk_NNNN.wav`` using ffmpeg
    (``pcm_s16le``, 16000 Hz, 1 channel).

    Args:
        audio_path: Path of the source audio file.
        chunk_duration: Stride between consecutive chunk starts, in seconds.
        overlap: Extra seconds appended to the end of every chunk.
        temp_dir: Directory that receives the chunk files.

    Returns:
        One dict per successfully written chunk with the keys ``path``,
        ``start``, ``end``, ``duration``, ``overlap`` and ``idx``. Chunks
        whose file is missing or empty after ffmpeg ran are skipped, so
        ``idx`` values may have gaps.

    Raises:
        ValueError: If the chunking parameters are invalid or the audio
            duration cannot be determined.
    """
    # Validate before any subprocess is started so bad arguments fail fast
    # instead of looping forever.
    _validate_chunking(chunk_duration, overlap)
    total_duration = _probe_duration(audio_path)

    chunks = []
    windows = plan_chunk_windows(total_duration, chunk_duration, overlap)
    for chunk_idx, (start, actual_duration) in enumerate(windows):
        chunk_file = os.path.join(temp_dir, f"chunk_{chunk_idx:04d}.wav")

        # Extract chunk
        extract_cmd = [
            "ffmpeg",
            "-i",
            audio_path,
            "-ss",
            str(start),
            "-t",
            str(actual_duration),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            chunk_file,
        ]
        subprocess.run(extract_cmd, capture_output=True)

        if os.path.exists(chunk_file) and os.path.getsize(chunk_file) > 0:
            chunks.append(
                {
                    "path": chunk_file,
                    "start": start,
                    "end": start + actual_duration,
                    "duration": actual_duration,
                    # First chunk has no previous chunk to overlap with
                    "overlap": overlap if chunk_idx > 0 else 0,
                    "idx": chunk_idx,
                }
            )
        else:
            # Best effort: keep going, but make the gap visible.
            print(
                f"Warning: failed to extract chunk {chunk_idx} "
                f"(start={start}s, duration={actual_duration}s)",
                file=sys.stderr,
            )

    return chunks


def deduplicate_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop segments that mostly overlap a segment that was already kept.

    Segments are visited in the given order. A segment counts as a duplicate
    when the time it shares with any previously kept segment exceeds 50% of
    its own duration.

    Args:
        segments: Dicts with at least ``start`` and ``end`` (seconds).

    Returns:
        A new list holding the kept segment dicts in input order.
    """
    kept = []
    for seg in segments:
        seg_duration = seg["end"] - seg["start"]
        duplicate = False
        for prev in kept:
            shared = min(seg["end"], prev["end"]) - max(seg["start"], prev["start"])
            # If more than 50% overlap, consider it duplicate
            if shared > 0 and shared > 0.5 * seg_duration:
                duplicate = True
                break
        if not duplicate:
            kept.append(seg)
    return kept


def transcribe_with_overlap(
    model_size: str = "tiny",
    compute_type: str = "int8",
    chunk_duration: float = 120.0,  # 2 minutes
    overlap: float = 10.0,  # 10 seconds overlap
) -> Dict[str, Any]:
    """Test transcription with overlapping chunks.

    Extracts the audio track of ``VIDEO_PATH``, splits it into overlapping
    chunks, transcribes each chunk with faster-whisper on CPU, shifts the
    segment timestamps by the chunk start and de-duplicates the result.

    Args:
        model_size: Model name passed to ``WhisperModel``.
        compute_type: ``compute_type`` passed to ``WhisperModel``.
        chunk_duration: Stride between consecutive chunk starts, in seconds.
        overlap: Extra seconds appended to the end of every chunk.

    Returns:
        On success a dict with ``chunk_duration``, ``overlap``,
        ``chunk_count``, ``total_segments_raw``, ``total_segments_dedup``,
        ``overlap_segments`` and ``segments`` (the de-duplicated list).
        On any failure a dict with a single ``error`` message.
    """
    temp_dir = tempfile.mkdtemp(prefix="asr_overlap_")

    try:
        # Fail before the expensive extraction/model load if the chunking
        # parameters could never work.
        _validate_chunking(chunk_duration, overlap)

        # Extract audio from video
        audio_path = os.path.join(temp_dir, "audio.wav")
        extract_cmd = [
            "ffmpeg",
            "-i",
            VIDEO_PATH,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        extraction = subprocess.run(extract_cmd, capture_output=True)

        # A non-zero exit can still leave a partial file behind, so check the
        # return code as well as the file's existence.
        if extraction.returncode != 0 or not os.path.exists(audio_path):
            return {"error": "Failed to extract audio"}

        # Import Whisper lazily so a missing package is reported as an error
        # result instead of breaking module import.
        from faster_whisper import WhisperModel

        model = WhisperModel(model_size, device="cpu", compute_type=compute_type)

        # Extract chunks with overlap
        chunks = extract_audio_with_overlap_chunks(
            audio_path, chunk_duration, overlap, temp_dir
        )

        print(f"Created {len(chunks)} chunks with {overlap}s overlap")

        all_segments = []

        for chunk in chunks:
            print(
                f"Transcribing chunk {chunk['idx']} ({chunk['start']:.1f}s-{chunk['end']:.1f}s)..."
            )

            segments, _info = model.transcribe(chunk["path"], beam_size=5)

            # End of the region this chunk shares with the previous chunk.
            overlap_end = chunk["start"] + chunk["overlap"]

            for segment in segments:
                # Adjust timestamp with chunk start
                adjusted_start = segment.start + chunk["start"]
                adjusted_end = segment.end + chunk["start"]

                # Overlap segments are kept here and de-duplicated below.
                all_segments.append(
                    {
                        "start": adjusted_start,
                        "end": adjusted_end,
                        "text": segment.text.strip(),
                        "chunk": chunk["idx"],
                        "in_overlap": chunk["idx"] > 0
                        and adjusted_start < overlap_end,
                    }
                )

        # Sort segments by start time
        all_segments.sort(key=lambda x: x["start"])

        # Simple deduplication: remove segments that are mostly overlapping
        deduplicated = deduplicate_segments(all_segments)

        print(
            f"Original segments: {len(all_segments)}, After deduplication: {len(deduplicated)}"
        )

        # Count segments in overlap regions
        overlap_segments = [s for s in all_segments if s.get("in_overlap", False)]
        print(f"Segments in overlap regions: {len(overlap_segments)}")

        return {
            "chunk_duration": chunk_duration,
            "overlap": overlap,
            "chunk_count": len(chunks),
            "total_segments_raw": len(all_segments),
            "total_segments_dedup": len(deduplicated),
            "overlap_segments": len(overlap_segments),
            "segments": deduplicated,
        }

    except Exception as e:
        # Top-level boundary of the experiment: report instead of crashing so
        # the remaining test cases still run.
        return {"error": str(e)}
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def main():
    """Run the overlap experiments and print a comparison report."""
    print("Testing overlapping chunks to improve segment accuracy at boundaries")
    print(f"Video: {os.path.basename(VIDEO_PATH)}")
    print("=" * 80)

    # Test cases: different overlap amounts
    test_cases = [
        {"chunk_duration": 120.0, "overlap": 0.0, "label": "2min chunks, no overlap"},
        {"chunk_duration": 120.0, "overlap": 5.0, "label": "2min chunks, 5s overlap"},
        {"chunk_duration": 120.0, "overlap": 10.0, "label": "2min chunks, 10s overlap"},
        {"chunk_duration": 120.0, "overlap": 15.0, "label": "2min chunks, 15s overlap"},
    ]

    results = []

    for test in test_cases:
        print(f"\nTesting: {test['label']}")
        print("-" * 40)

        result = transcribe_with_overlap(
            model_size="tiny",
            compute_type="int8",
            chunk_duration=test["chunk_duration"],
            overlap=test["overlap"],
        )

        if "error" in result:
            print(f" Error: {result['error']}")
            continue

        # Merge into a new dict so the test-case definitions stay untouched.
        results.append({**test, **result})

        print(f" Chunks: {result['chunk_count']}")
        print(
            f" Segments (raw/dedup): {result['total_segments_raw']}/{result['total_segments_dedup']}"
        )
        print(f" Overlap segments: {result['overlap_segments']}")

        # Show segment distribution
        if result["segments"]:
            print(" First few segments:")
            for seg in result["segments"][:5]:
                print(
                    f" {seg['start']:.1f}s-{seg['end']:.1f}s: {seg['text'][:40]}..."
                )

    # Comparison with direct transcription (baseline)
    print("\n" + "=" * 80)
    print("COMPARISON WITH BASELINE (from previous investigation)")
    print("=" * 80)

    print("\nBaseline results from investigate_segment_diff.py:")
    print(f" Direct transcription: {BASELINE_DIRECT_SEGMENTS} segments")
    print(f" 2min chunks (no overlap): {BASELINE_NO_OVERLAP_SEGMENTS} segments")

    print("\nOverlap test results:")
    for result in results:
        print(f" {result['label']}: {result.get('total_segments_dedup', 0)} segments")

    # Analyze effectiveness
    print("\n" + "=" * 80)
    print("ANALYSIS")
    print("=" * 80)

    if results:
        best = max(results, key=lambda x: x.get("total_segments_dedup", 0))
        print(
            f"\nBest configuration: {best['label']} with {best['total_segments_dedup']} segments"
        )

        # Prefer the no-overlap count measured in this run; fall back to the
        # previously recorded baseline when that case failed or found nothing
        # (which would otherwise divide by zero).
        measured = next((r for r in results if r["overlap"] == 0), None)
        baseline = measured["total_segments_dedup"] if measured else 0
        if baseline <= 0:
            baseline = BASELINE_NO_OVERLAP_SEGMENTS

        improvement = ((best["total_segments_dedup"] - baseline) / baseline) * 100
        print(f"Improvement over no overlap: {improvement:.1f}%")

    print("\nRecommendations:")
    print("1. Overlap of 10-15 seconds appears helpful for 2-minute chunks")
    print("2. Deduplication is necessary to avoid duplicate segments")
    print(
        "3. Even with overlap, small chunks may still miss segments due to context issues"
    )
    print("4. Consider larger chunk sizes (5+ minutes) as primary solution")


if __name__ == "__main__":
    main()