#!/usr/bin/env python3
"""
Investigate segment count differences between direct and chunked transcription.
Analyze timestamps, durations, and text to understand why segment counts differ.
"""

import sys
import os
import json
import tempfile
import subprocess
import shutil
import time
from typing import List, Dict, Any, Tuple
import statistics

VIDEO_PATH = "../test_video/BigBuckBunny_320x180.mp4"  # 10 min, 62MB


def _segment_stats(segments: List[Dict]) -> Dict[str, Any]:
    """Summarise segment durations (count, total, mean, min, max)."""
    durations = [seg["end"] - seg["start"] for seg in segments]
    if not durations:
        return {
            "count": 0,
            "total_duration": 0,
            "avg_duration": 0,
            "min_duration": 0,
            "max_duration": 0,
        }
    return {
        "count": len(durations),
        "total_duration": sum(durations),
        "avg_duration": statistics.mean(durations),
        "min_duration": min(durations),
        "max_duration": max(durations),
    }


def run_transcription(
    mode_name: str, max_direct: int, chunk_dur: int
) -> Dict[str, Any]:
    """Run transcription with given parameters and return detailed results.

    Args:
        mode_name: Label for this run; used in the temp-dir prefix and the
            ``--uuid`` argument passed to the processor.
        max_direct: Value exported as ``MOMENTRY_ASR_MAX_DIRECT_DURATION``.
        chunk_dur: Value exported as ``MOMENTRY_ASR_CHUNK_DURATION``.

    Returns:
        A dict with the run label, the processing mode / chunk count /
        language / segments read from the processor's JSON output, duration
        statistics, wall-clock time of the processor call, its return code
        and the first 500 characters of its stderr. When no readable output
        file is produced, ``processing_mode`` is ``"failed"`` and the segment
        list is empty.
    """
    temp_dir = tempfile.mkdtemp(prefix=f"asr_invest_{mode_name}_")
    # try/finally so the scratch directory is removed even when a subprocess
    # cannot be started or the output cannot be read.
    try:
        output_path = os.path.join(temp_dir, "output.json")
        audio_path = os.path.join(temp_dir, "audio.wav")

        # Extract audio first
        # NOTE(review): the extracted WAV is not passed to the processor
        # command below (it receives VIDEO_PATH) and the ffmpeg result is not
        # checked; the step is kept to preserve existing behaviour.
        extract_cmd = [
            "ffmpeg",
            "-i",
            VIDEO_PATH,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        subprocess.run(extract_cmd, capture_output=True)

        # Set environment for ASR processor
        env = os.environ.copy()
        env["MOMENTRY_ASR_MAX_DIRECT_DURATION"] = str(max_direct)
        env["MOMENTRY_ASR_CHUNK_DURATION"] = str(chunk_dur)
        env["MOMENTRY_ASR_MODEL_SIZE"] = "tiny"
        env["MOMENTRY_ASR_COMPUTE_TYPE"] = "int8"

        # Interpreter and script paths are hard-coded; the script path is
        # relative to the current working directory.
        cmd = [
            "/opt/homebrew/bin/python3.11",
            "scripts/asr_processor.py",
            VIDEO_PATH,
            output_path,
            "--uuid",
            f"invest_{mode_name}",
        ]

        # perf_counter is monotonic, so the measurement is immune to
        # wall-clock adjustments during a long run.
        start = time.perf_counter()
        proc = subprocess.run(cmd, capture_output=True, env=env, text=True)
        elapsed = time.perf_counter() - start

        # Load results; a missing or unparsable file both count as "failed".
        segments: List[Dict] = []
        language = ""
        mode = "failed"
        chunk_count = 0
        data = None
        if os.path.exists(output_path):
            try:
                with open(output_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError, e.g. a
                # partially written file left behind by a crashed processor.
                data = None
        if data is not None:
            segments = data.get("segments", [])
            language = data.get("language", "")
            mode = data.get("processing_mode", "unknown")
            chunk_count = data.get("chunk_count", 1)

        # Calculate segment statistics
        stats = _segment_stats(segments)
    finally:
        # Clean up
        shutil.rmtree(temp_dir, ignore_errors=True)

    return {
        "mode_name": mode_name,
        "processing_mode": mode,
        "chunk_count": chunk_count,
        "chunk_duration": chunk_dur,
        "elapsed": elapsed,
        "language": language,
        "segment_count": len(segments),
        "segments": segments,
        "segment_stats": stats,
        "returncode": proc.returncode,
        "stderr": proc.stderr[:500] if proc.stderr else "",
    }


def analyze_segment_overlap(
    segments1: List[Dict], segments2: List[Dict], tolerance: float = 0.5
) -> Dict[str, Any]:
    """Analyze overlap between two segment lists based on timestamps.

    For every segment in ``segments1`` the segment of ``segments2`` with the
    largest temporal overlap is selected (the first one wins on ties). The
    pair counts as a match when that overlap is at least ``tolerance``
    seconds. Several segments of ``segments1`` may match the same segment of
    ``segments2``.

    Args:
        segments1: Dicts with ``start``, ``end`` and ``text`` keys.
        segments2: Dicts with ``start``, ``end`` and ``text`` keys.
        tolerance: Minimum overlap, in the same unit as the timestamps.

    Returns:
        A dict with the ``matches`` list, the unmatched segments of each
        input (``only_in_1`` / ``only_in_2``) and the three counts.
    """
    matches = []
    only_in_1 = []
    # Matched segments are tracked by object identity: a duplicated segment
    # in segments2 that was never selected must still be reported as unique,
    # and set membership avoids rescanning ``matches`` for every segment.
    matched_ids = set()

    # For each segment in list1, find closest match in list2
    for s1 in segments1:
        best_match = None
        best_overlap = 0
        for s2 in segments2:
            # A non-positive value means the two intervals do not overlap;
            # it can never exceed best_overlap, which starts at 0.
            overlap = min(s1["end"], s2["end"]) - max(s1["start"], s2["start"])
            if overlap > best_overlap:
                best_overlap = overlap
                best_match = s2

        if best_match is not None and best_overlap >= tolerance:
            matches.append(
                {
                    "segment1": s1,
                    "segment2": best_match,
                    "overlap": best_overlap,
                    "text_diff": s1["text"] != best_match["text"],
                }
            )
            matched_ids.add(id(best_match))
        else:
            only_in_1.append(s1)

    # Find segments only in list2
    only_in_2 = [s2 for s2 in segments2 if id(s2) not in matched_ids]

    return {
        "matches": matches,
        "only_in_1": only_in_1,
        "only_in_2": only_in_2,
        "match_count": len(matches),
        "unique_to_1": len(only_in_1),
        "unique_to_2": len(only_in_2),
    }


def analyze_chunk_boundaries(
    chunk_results: Dict[str, Any], chunk_duration: float
) -> Dict[str, Any]:
    """Analyze segments near chunk boundaries.

    Boundaries are placed at every multiple of ``chunk_duration`` between
    consecutive chunks. A segment is reported, once, for the first boundary
    that lies within one second of its start or end.

    Args:
        chunk_results: Dict with ``chunk_count`` and ``segments`` keys, as
            returned by ``run_transcription``.
        chunk_duration: Length of one chunk in seconds.

    Returns:
        A dict with ``boundaries``, ``segments_near_boundary`` (list of
        dicts holding the segment, the boundary and the signed distances)
        and ``count_near_boundary``. The single-chunk result has the same
        keys with empty values, plus the legacy ``boundary_issues`` key.
    """
    if chunk_results["chunk_count"] <= 1:
        # Same shape as the regular result so callers can read
        # "count_near_boundary" unconditionally.
        return {
            "boundaries": [],
            "segments_near_boundary": [],
            "count_near_boundary": 0,
            "boundary_issues": [],
        }

    boundaries = [
        (i + 1) * chunk_duration for i in range(chunk_results["chunk_count"] - 1)
    ]

    segments_near_boundary = []
    boundary_tolerance = 1.0  # 1 second tolerance

    for segment in chunk_results["segments"]:
        for boundary in boundaries:
            if (
                abs(segment["start"] - boundary) < boundary_tolerance
                or abs(segment["end"] - boundary) < boundary_tolerance
            ):
                segments_near_boundary.append(
                    {
                        "segment": segment,
                        "boundary": boundary,
                        "distance_to_start": segment["start"] - boundary,
                        "distance_to_end": segment["end"] - boundary,
                    }
                )
                # Report each segment at most once.
                break

    return {
        "boundaries": boundaries,
        "segments_near_boundary": segments_near_boundary,
        "count_near_boundary": len(segments_near_boundary),
    }


def print_segment_comparison(title: str, segments: List[Dict]):
    """Print segment details for comparison.

    Each line shows the index, start/end time, duration and the first 60
    characters of the segment text.
    """
    print(f"\n{title} ({len(segments)} segments):")
    print("-" * 80)
    for i, seg in enumerate(segments):
        print(
            f"{i:3d}: {seg['start']:7.2f}s - {seg['end']:7.2f}s "
            f"(dur:{seg['end'] - seg['start']:5.2f}s): {seg['text'][:60]}"
        )


def main():
    """Run every transcription mode and print a comparison report."""
    print(
        "Investigating segment count differences between direct and chunked transcription"
    )
    print(f"Video: {os.path.basename(VIDEO_PATH)}")
    print("=" * 80)

    # Run different transcription modes
    modes = [
        ("direct", 1800, 600),  # Direct (30 min max, 10 min chunk size)
        ("chunked_10min", 300, 600),  # 1 chunk (10 min)
        ("chunked_5min", 300, 300),  # 2 chunks (5 min each)
        ("chunked_2min", 300, 120),  # 5 chunks (2 min each)
    ]

    results = {}
    for mode_name, max_direct, chunk_dur in modes:
        print(
            f"\nRunning {mode_name} (max_direct={max_direct}s, chunk={chunk_dur}s)..."
        )
        result = run_transcription(mode_name, max_direct, chunk_dur)
        results[mode_name] = result

        # Surface processor failures; otherwise a failed run is
        # indistinguishable from a run that found no speech.
        if result["returncode"] != 0:
            print(
                f" ERROR: processor exited with code {result['returncode']}",
                file=sys.stderr,
            )
            if result["stderr"]:
                print(f" stderr: {result['stderr']}", file=sys.stderr)

        print(f" Mode: {result['processing_mode']}, Chunks: {result['chunk_count']}")
        print(f" Segments: {result['segment_count']}, Language: {result['language']}")
        print(f" Time: {result['elapsed']:.1f}s")
        print(
            f" Segment stats: avg={result['segment_stats']['avg_duration']:.2f}s, "
            f"min={result['segment_stats']['min_duration']:.2f}s, "
            f"max={result['segment_stats']['max_duration']:.2f}s"
        )

    # Compare direct with each chunked mode
    direct_result = results["direct"]
    direct_segments = direct_result["segments"]

    print("\n" + "=" * 80)
    print("COMPARISON WITH DIRECT TRANSCRIPTION")
    print("=" * 80)

    for mode_name in ["chunked_10min", "chunked_5min", "chunked_2min"]:
        chunk_result = results[mode_name]
        chunk_segments = chunk_result["segments"]

        print(
            f"\n{direct_result['segment_count']} direct vs {chunk_result['segment_count']} {mode_name} segments"
        )
        print(
            f"Chunk size: {chunk_result['chunk_duration']}s, Chunks: {chunk_result['chunk_count']}"
        )

        # Analyze overlap
        overlap = analyze_segment_overlap(direct_segments, chunk_segments)
        print(
            f" Matches: {overlap['match_count']}, Unique to direct: {overlap['unique_to_1']}, Unique to chunked: {overlap['unique_to_2']}"
        )

        # Print unique segments if any
        if overlap["unique_to_1"] > 0:
            print(" Segments only in direct transcription:")
            for seg in overlap["only_in_1"][:5]:  # Show first 5
                print(
                    f" {seg['start']:.2f}s-{seg['end']:.2f}s: {seg['text'][:50]}..."
                )
            if overlap["unique_to_1"] > 5:
                print(f" ... and {overlap['unique_to_1'] - 5} more")

        if overlap["unique_to_2"] > 0:
            print(f" Segments only in {mode_name}:")
            for seg in overlap["only_in_2"][:5]:
                print(
                    f" {seg['start']:.2f}s-{seg['end']:.2f}s: {seg['text'][:50]}..."
                )
            if overlap["unique_to_2"] > 5:
                print(f" ... and {overlap['unique_to_2'] - 5} more")

        # Analyze chunk boundary issues for chunked modes
        if chunk_result["chunk_count"] > 1:
            boundary_analysis = analyze_chunk_boundaries(
                chunk_result, chunk_result["chunk_duration"]
            )
            if boundary_analysis["count_near_boundary"] > 0:
                print(
                    f" ⚠️ {boundary_analysis['count_near_boundary']} segments near chunk boundaries"
                )
                for item in boundary_analysis["segments_near_boundary"][:3]:
                    seg = item["segment"]
                    print(
                        f" At {item['boundary']:.1f}s: {seg['start']:.2f}s-{seg['end']:.2f}s "
                        f"(dist: {item['distance_to_start']:.2f}s)"
                    )

    # Detailed segment comparison
    print("\n" + "=" * 80)
    print("DETAILED SEGMENT COMPARISON")
    print("=" * 80)

    print_segment_comparison("Direct Transcription", direct_segments)
    print_segment_comparison(
        "Chunked (10min chunks)", results["chunked_10min"]["segments"]
    )

    # Analyze segment duration distribution
    print("\n" + "=" * 80)
    print("SEGMENT DURATION ANALYSIS")
    print("=" * 80)

    for mode_name, result in results.items():
        stats = result["segment_stats"]
        if stats["count"] > 0:
            print(f"\n{mode_name}:")
            print(f" Total segments: {stats['count']}")
            print(f" Avg duration: {stats['avg_duration']:.2f}s")
            print(f" Min duration: {stats['min_duration']:.2f}s")
            print(f" Max duration: {stats['max_duration']:.2f}s")
            print(f" Total speech duration: {stats['total_duration']:.2f}s")

    # Summary of findings
    print("\n" + "=" * 80)
    print("SUMMARY OF FINDINGS")
    print("=" * 80)

    print("\n1. Segment count decreases dramatically with smaller chunks:")
    for mode_name, result in results.items():
        print(f" {mode_name:15s}: {result['segment_count']:3d} segments")

    print("\n2. Potential causes:")
    print(" - Small chunks (2min) may not provide enough context for Whisper")
    print(" - Speech near chunk boundaries may be cut off")
    print(
        " - Whisper's VAD (voice activity detection) may behave differently on short clips"
    )
    print(" - Model initialization/context window effects")

    print("\n3. Recommendations:")
    print(" - Use larger chunk sizes (≥5 minutes) for better accuracy")
    print(" - Consider overlapping chunks to avoid boundary issues")
    print(" - For critical applications, prefer direct transcription when possible")
    print(" - Test with different Whisper model sizes (tiny vs. base vs. small)")


if __name__ == "__main__":
    main()