Files
momentry_core/test_overlap_chunks.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

303 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Test overlapping chunks to reduce segment loss at boundaries.
This modifies the chunk extraction to include overlap regions.
"""
import sys
import os
import json
import tempfile
import subprocess
import shutil
import time
from typing import List, Dict, Any
# Sample video used as the transcription input. The path is relative, so it is
# resolved against the current working directory when the script runs.
VIDEO_PATH = "../test_video/BigBuckBunny_320x180.mp4"
def extract_audio_with_overlap_chunks(
    audio_path: str, chunk_duration: float, overlap: float, temp_dir: str
) -> List[Dict[str, Any]]:
    """Split an audio file into WAV chunks whose tails overlap the next chunk.

    A new chunk starts every ``chunk_duration`` seconds and lasts
    ``chunk_duration + overlap`` seconds (clamped to the end of the audio), so
    the last ``overlap`` seconds of one chunk are repeated at the start of the
    following chunk. Each chunk is written by ``ffmpeg`` into ``temp_dir`` as
    16 kHz mono 16-bit PCM.

    Args:
        audio_path: Input audio file readable by ffprobe/ffmpeg.
        chunk_duration: Seconds between consecutive chunk starts; must be > 0.
        overlap: Extra seconds appended to every chunk; must be >= 0.
        temp_dir: Directory that receives the ``chunk_NNNN.wav`` files.

    Returns:
        One dict per successfully extracted chunk with the keys ``path``,
        ``start``, ``end``, ``duration``, ``overlap`` (0 for the first chunk,
        which has no predecessor to overlap with) and ``idx``. Chunks whose
        extraction fails are skipped, so ``idx`` values may have gaps.

    Raises:
        ValueError: If ``chunk_duration`` is not positive, ``overlap`` is
            negative, or ffprobe does not report a numeric duration.
    """
    # Validate before touching any subprocess: a non-positive stride would
    # never advance ``start`` and the loop below would spin forever.
    if not chunk_duration > 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration!r}")
    if overlap < 0:
        raise ValueError(f"overlap must be non-negative, got {overlap!r}")

    # Get total duration
    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True)
    try:
        total_duration = float(probe.stdout.strip())
    except ValueError as err:
        detail = probe.stderr.strip() or probe.stdout.strip() or "no output"
        raise ValueError(
            f"ffprobe could not determine the duration of {audio_path!r}: {detail}"
        ) from err

    chunks: List[Dict[str, Any]] = []
    start = 0.0
    chunk_idx = 0
    while start < total_duration:
        # The chunk runs past its nominal end by ``overlap`` seconds, but
        # never past the end of the audio.
        actual_duration = min(chunk_duration + overlap, total_duration - start)
        chunk_file = os.path.join(temp_dir, f"chunk_{chunk_idx:04d}.wav")

        # Extract chunk
        extract_cmd = [
            "ffmpeg",
            "-i",
            audio_path,
            "-ss",
            str(start),
            "-t",
            str(actual_duration),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            chunk_file,
        ]
        extraction = subprocess.run(extract_cmd, capture_output=True)

        extracted_ok = (
            extraction.returncode == 0
            and os.path.exists(chunk_file)
            and os.path.getsize(chunk_file) > 0
        )
        if extracted_ok:
            chunks.append(
                {
                    "path": chunk_file,
                    "start": start,
                    "end": start + actual_duration,
                    "duration": actual_duration,
                    # First chunk has no previous overlap
                    "overlap": overlap if chunk_idx > 0 else 0,
                    "idx": chunk_idx,
                }
            )
        else:
            # Best effort: keep going with the remaining chunks, but do not
            # hide the failure.
            print(
                f"Warning: failed to extract chunk {chunk_idx} at {start:.1f}s",
                file=sys.stderr,
            )

        # Advance by the stride only; the overlap is the part of this chunk
        # that extends beyond the next chunk's start.
        start += chunk_duration
        chunk_idx += 1

    return chunks
def transcribe_with_overlap(
    model_size: str = "tiny",
    compute_type: str = "int8",
    chunk_duration: float = 120.0,  # 2 minutes
    overlap: float = 10.0,  # 10 seconds overlap
) -> Dict[str, Any]:
    """Transcribe the sample video in overlapping chunks and de-duplicate.

    The audio track of ``VIDEO_PATH`` is extracted with ffmpeg into a
    temporary directory, split by ``extract_audio_with_overlap_chunks`` and
    each chunk is transcribed on CPU with faster-whisper. Segment timestamps
    are shifted by the chunk start so they refer to the full recording, then
    segments that mostly coincide with an earlier one are dropped.

    Args:
        model_size: Model name passed to ``WhisperModel``.
        compute_type: Compute type passed to ``WhisperModel``.
        chunk_duration: Seconds between consecutive chunk starts.
        overlap: Seconds by which each chunk extends into the next one.

    Returns:
        On success a dict with ``chunk_duration``, ``overlap``,
        ``chunk_count``, ``total_segments_raw``, ``total_segments_dedup``,
        ``overlap_segments`` and ``segments`` (the de-duplicated list).
        On any failure a dict with the single key ``error``; exceptions are
        not propagated. The temporary directory is always removed.
    """
    temp_dir = tempfile.mkdtemp(prefix="asr_overlap_")
    try:
        # Extract audio from video
        audio_path = os.path.join(temp_dir, "audio.wav")
        extract_cmd = [
            "ffmpeg",
            "-i",
            VIDEO_PATH,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        extraction = subprocess.run(extract_cmd, capture_output=True)
        # A non-zero exit status or an empty file means there is nothing
        # usable to transcribe, even if ffmpeg left a file behind.
        if (
            extraction.returncode != 0
            or not os.path.exists(audio_path)
            or os.path.getsize(audio_path) == 0
        ):
            return {"error": "Failed to extract audio"}

        # Imported lazily so a missing package is reported through the
        # ``error`` result instead of failing at module import time.
        from faster_whisper import WhisperModel

        model = WhisperModel(model_size, device="cpu", compute_type=compute_type)

        # Extract chunks with overlap
        chunks = extract_audio_with_overlap_chunks(
            audio_path, chunk_duration, overlap, temp_dir
        )
        print(f"Created {len(chunks)} chunks with {overlap}s overlap")

        all_segments = []
        for chunk in chunks:
            print(
                f"Transcribing chunk {chunk['idx']} ({chunk['start']:.1f}s-{chunk['end']:.1f}s)..."
            )
            segments, _info = model.transcribe(chunk["path"], beam_size=5)

            # The head of every chunk after the first repeats audio that the
            # previous chunk already covered.
            has_previous = chunk["idx"] > 0
            overlap_boundary = chunk["start"] + chunk["overlap"]

            for segment in segments:
                # Adjust timestamp with chunk start
                adjusted_start = segment.start + chunk["start"]
                adjusted_end = segment.end + chunk["start"]
                all_segments.append(
                    {
                        "start": adjusted_start,
                        "end": adjusted_end,
                        "text": segment.text.strip(),
                        "chunk": chunk["idx"],
                        "in_overlap": has_previous
                        and adjusted_start < overlap_boundary,
                    }
                )

        # Sort segments by start time
        all_segments.sort(key=lambda x: x["start"])

        deduplicated = _deduplicate_segments(all_segments)
        print(
            f"Original segments: {len(all_segments)}, After deduplication: {len(deduplicated)}"
        )

        # Count segments in overlap regions
        overlap_segments = [s for s in all_segments if s["in_overlap"]]
        print(f"Segments in overlap regions: {len(overlap_segments)}")

        return {
            "chunk_duration": chunk_duration,
            "overlap": overlap,
            "chunk_count": len(chunks),
            "total_segments_raw": len(all_segments),
            "total_segments_dedup": len(deduplicated),
            "overlap_segments": len(overlap_segments),
            "segments": deduplicated,
        }
    except Exception as e:
        # Boundary of the experiment: report the failure to the caller as data.
        return {"error": str(e)}
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def _deduplicate_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop segments more than half covered in time by an earlier kept one.

    ``segments`` is expected in start-time order; the first segment of any
    overlapping group is the one that survives. Text is not compared.
    """
    kept: List[Dict[str, Any]] = []
    for seg in segments:
        seg_duration = seg["end"] - seg["start"]
        duplicate = False
        for prev in kept:
            shared = min(seg["end"], prev["end"]) - max(seg["start"], prev["start"])
            # If more than 50% overlap, consider it duplicate
            if shared > 0 and shared > 0.5 * seg_duration:
                duplicate = True
                break
        if not duplicate:
            kept.append(seg)
    return kept
def main():
    """Run every overlap configuration and print a comparison report.

    Each configuration is transcribed with the "tiny" int8 model; failed runs
    are reported and left out of the summary. The summary compares the
    de-duplicated segment counts with fixed baseline figures.
    """
    rule = "=" * 80

    print("Testing overlapping chunks to improve segment accuracy at boundaries")
    print(f"Video: {os.path.basename(VIDEO_PATH)}")
    print(rule)

    # Test cases: different overlap amounts
    test_cases = [
        {"chunk_duration": 120.0, "overlap": 0.0, "label": "2min chunks, no overlap"},
        {"chunk_duration": 120.0, "overlap": 5.0, "label": "2min chunks, 5s overlap"},
        {"chunk_duration": 120.0, "overlap": 10.0, "label": "2min chunks, 10s overlap"},
        {"chunk_duration": 120.0, "overlap": 15.0, "label": "2min chunks, 15s overlap"},
    ]

    results = []
    for case in test_cases:
        print(f"\nTesting: {case['label']}")
        print("-" * 40)

        outcome = transcribe_with_overlap(
            model_size="tiny",
            compute_type="int8",
            chunk_duration=case["chunk_duration"],
            overlap=case["overlap"],
        )
        if "error" in outcome:
            print(f" Error: {outcome['error']}")
            continue

        # Merge the measurements into the case so the label travels with them.
        case.update(outcome)
        results.append(case)

        raw_count = outcome["total_segments_raw"]
        dedup_count = outcome["total_segments_dedup"]
        print(f" Chunks: {outcome['chunk_count']}")
        print(f" Segments (raw/dedup): {raw_count}/{dedup_count}")
        print(f" Overlap segments: {outcome['overlap_segments']}")

        # Show segment distribution
        preview = outcome["segments"]
        if preview:
            print(" First few segments:")
            for seg in preview[:5]:
                print(
                    f" {seg['start']:.1f}s-{seg['end']:.1f}s: {seg['text'][:40]}..."
                )

    # Comparison with direct transcription (baseline)
    print("\n" + rule)
    print("COMPARISON WITH BASELINE (from previous investigation)")
    print(rule)
    print("\nBaseline results from investigate_segment_diff.py:")
    print(" Direct transcription: 12 segments")
    print(" 2min chunks (no overlap): 4 segments")
    print("\nOverlap test results:")
    for entry in results:
        print(f" {entry['label']}: {entry.get('total_segments_dedup', 0)} segments")

    # Analyze effectiveness
    print("\n" + rule)
    print("ANALYSIS")
    print(rule)
    if results:
        best = max(results, key=lambda r: r.get("total_segments_dedup", 0))
        best_count = best["total_segments_dedup"]
        print(f"\nBest configuration: {best['label']} with {best_count} segments")
        # Compared to 4 segments without overlap
        improvement = (best_count - 4) / 4 * 100
        print(f"Improvement over no overlap: {improvement:.1f}%")

    for line in (
        "\nRecommendations:",
        "1. Overlap of 10-15 seconds appears helpful for 2-minute chunks",
        "2. Deduplication is necessary to avoid duplicate segments",
        "3. Even with overlap, small chunks may still miss segments due to context issues",
        "4. Consider larger chunk sizes (5+ minutes) as primary solution",
    ):
        print(line)
# Run the experiment only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()