- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
303 lines
9.7 KiB
Python
303 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test overlapping chunks to reduce segment loss at boundaries.
|
|
This modifies the chunk extraction to include overlap regions.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import tempfile
|
|
import subprocess
|
|
import shutil
|
|
import time
|
|
from typing import List, Dict, Any
|
|
|
|
# Input video used by every test case in this script. The path is relative,
# so it is resolved against the current working directory of the process.
VIDEO_PATH = "../test_video/BigBuckBunny_320x180.mp4"
|
|
|
|
|
|
def extract_audio_with_overlap_chunks(
    audio_path: str, chunk_duration: float, overlap: float, temp_dir: str
) -> List[Dict[str, Any]]:
    """Split an audio file into fixed-stride WAV chunks with trailing overlap.

    Chunk ``i`` starts at ``i * chunk_duration`` and lasts
    ``chunk_duration + overlap`` seconds, clamped to the end of the file.
    The first ``overlap`` seconds of every chunk after the first are therefore
    also covered by the tail of the previous chunk.

    Args:
        audio_path: Audio file to split (read by ffprobe and ffmpeg).
        chunk_duration: Distance between consecutive chunk starts, in
            seconds. Must be greater than zero.
        overlap: Extra seconds appended to the end of each chunk. Must not
            be negative.
        temp_dir: Directory that receives the ``chunk_NNNN.wav`` files.

    Returns:
        One dict per chunk that ffmpeg wrote successfully, with the keys
        ``path``, ``start``, ``end``, ``duration``, ``overlap`` (0 for the
        first chunk, which has no predecessor) and ``idx``.

    Raises:
        ValueError: If ``chunk_duration`` is not positive or ``overlap`` is
            negative.
        RuntimeError: If ffprobe cannot report the duration of ``audio_path``.
    """
    # Validate before touching any external tool: a non-positive stride would
    # never advance ``start`` and the loop below would not terminate.
    if not chunk_duration > 0:
        raise ValueError(f"chunk_duration must be > 0, got {chunk_duration!r}")
    if not overlap >= 0:
        raise ValueError(f"overlap must be >= 0, got {overlap!r}")

    # Get total duration
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {audio_path!r}: {result.stderr.strip()}"
        )
    try:
        total_duration = float(result.stdout.strip())
    except ValueError as err:
        raise RuntimeError(
            f"ffprobe returned no usable duration for {audio_path!r}: "
            f"{result.stdout.strip()!r}"
        ) from err

    chunks: List[Dict[str, Any]] = []
    window = chunk_duration + overlap  # full length of a non-final chunk
    start = 0.0
    chunk_idx = 0

    while start < total_duration:
        remaining = total_duration - start
        actual_duration = min(window, remaining)

        chunk_file = os.path.join(temp_dir, f"chunk_{chunk_idx:04d}.wav")

        # Extract chunk as 16 kHz mono 16-bit PCM
        extract_cmd = [
            "ffmpeg",
            "-i",
            audio_path,
            "-ss",
            str(start),
            "-t",
            str(actual_duration),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            chunk_file,
        ]
        extraction = subprocess.run(extract_cmd, capture_output=True)

        # Best effort: a chunk that failed to extract is skipped rather than
        # aborting the run. The exit status is checked as well so a partially
        # written file is not mistaken for a valid chunk.
        if (
            extraction.returncode == 0
            and os.path.exists(chunk_file)
            and os.path.getsize(chunk_file) > 0
        ):
            chunks.append(
                {
                    "path": chunk_file,
                    "start": start,
                    "end": start + actual_duration,
                    "duration": actual_duration,
                    # First chunk has no previous chunk to overlap with
                    "overlap": overlap if chunk_idx > 0 else 0,
                    "idx": chunk_idx,
                }
            )

        # This chunk already reaches the end of the file, so any further
        # chunk would lie entirely inside its overlap tail.
        if window >= remaining:
            break

        # Advance by the stride only; the overlap comes from each chunk
        # extending past the start of the next one.
        start += chunk_duration
        chunk_idx += 1

    return chunks
|
|
|
|
|
|
def _deduplicate_segments(
    segments: List[Dict[str, Any]], min_overlap_ratio: float = 0.5
) -> List[Dict[str, Any]]:
    """Drop segments that are mostly covered by an already kept segment.

    ``segments`` is walked in order. A segment is discarded when the time it
    shares with any previously kept segment exceeds ``min_overlap_ratio`` of
    its own duration.
    """
    kept: List[Dict[str, Any]] = []
    for seg in segments:
        seg_duration = seg["end"] - seg["start"]
        is_duplicate = False
        for prev in kept:
            shared = min(seg["end"], prev["end"]) - max(seg["start"], prev["start"])
            if shared > 0 and shared > min_overlap_ratio * seg_duration:
                is_duplicate = True
                break
        if not is_duplicate:
            kept.append(seg)
    return kept


def transcribe_with_overlap(
    model_size: str = "tiny",
    compute_type: str = "int8",
    chunk_duration: float = 120.0,  # 2 minutes
    overlap: float = 10.0,  # 10 seconds overlap
) -> Dict[str, Any]:
    """Transcribe ``VIDEO_PATH`` in overlapping chunks and deduplicate segments.

    The audio track is extracted to a temporary 16 kHz mono WAV file, split
    with ``extract_audio_with_overlap_chunks``, and each chunk is transcribed
    with a faster-whisper model on the CPU. Segment timestamps are shifted by
    the chunk start, sorted, and segments that mostly overlap an earlier one
    are removed. The temporary directory is always deleted.

    Args:
        model_size: Model name passed to ``WhisperModel``.
        compute_type: Compute type passed to ``WhisperModel``.
        chunk_duration: Distance between chunk starts, in seconds.
        overlap: Extra seconds appended to each chunk.

    Returns:
        On success, a dict with ``chunk_duration``, ``overlap``,
        ``chunk_count``, ``total_segments_raw``, ``total_segments_dedup``,
        ``overlap_segments`` and ``segments`` (the deduplicated list).
        On any failure, ``{"error": <message>}``; exceptions are not raised
        to the caller.
    """
    temp_dir = tempfile.mkdtemp(prefix="asr_overlap_")

    try:
        # Extract audio from video
        audio_path = os.path.join(temp_dir, "audio.wav")
        extract_cmd = [
            "ffmpeg",
            "-i",
            VIDEO_PATH,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        extraction = subprocess.run(extract_cmd, capture_output=True)

        # A file that merely exists is not enough: ffmpeg can leave an empty
        # or truncated output behind when it fails.
        if (
            extraction.returncode != 0
            or not os.path.exists(audio_path)
            or os.path.getsize(audio_path) == 0
        ):
            return {"error": "Failed to extract audio"}

        # Imported lazily so a missing faster-whisper install is reported
        # through the error dict instead of breaking module import.
        from faster_whisper import WhisperModel

        model = WhisperModel(model_size, device="cpu", compute_type=compute_type)

        # Extract chunks with overlap
        chunks = extract_audio_with_overlap_chunks(
            audio_path, chunk_duration, overlap, temp_dir
        )
        print(f"Created {len(chunks)} chunks with {overlap}s overlap")

        all_segments = []

        for chunk in chunks:
            print(
                f"Transcribing chunk {chunk['idx']} ({chunk['start']:.1f}s-{chunk['end']:.1f}s)..."
            )

            segments, _info = model.transcribe(chunk["path"], beam_size=5)

            # End of the region this chunk shares with the previous chunk.
            overlap_end = chunk["start"] + chunk["overlap"]

            for segment in segments:
                # Shift chunk-relative timestamps onto the full timeline.
                adjusted_start = segment.start + chunk["start"]
                adjusted_end = segment.end + chunk["start"]

                # Segments in the overlap region are kept here and only
                # flagged; duplicates are removed after sorting.
                all_segments.append(
                    {
                        "start": adjusted_start,
                        "end": adjusted_end,
                        "text": segment.text.strip(),
                        "chunk": chunk["idx"],
                        "in_overlap": chunk["idx"] > 0
                        and adjusted_start < overlap_end,
                    }
                )

        # Sort segments by start time
        all_segments.sort(key=lambda x: x["start"])

        # Simple deduplication: remove segments that are mostly overlapping
        deduplicated = _deduplicate_segments(all_segments, min_overlap_ratio=0.5)

        print(
            f"Original segments: {len(all_segments)}, After deduplication: {len(deduplicated)}"
        )

        # Count segments in overlap regions
        overlap_segment_count = sum(
            1 for s in all_segments if s.get("in_overlap", False)
        )
        print(f"Segments in overlap regions: {overlap_segment_count}")

        return {
            "chunk_duration": chunk_duration,
            "overlap": overlap,
            "chunk_count": len(chunks),
            "total_segments_raw": len(all_segments),
            "total_segments_dedup": len(deduplicated),
            "overlap_segments": overlap_segment_count,
            "segments": deduplicated,
        }

    except Exception as e:
        # Script-level boundary: report the failure to the caller as data so
        # the remaining test cases can still run.
        return {"error": str(e)}
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
|
def main():
    """Run the overlap experiment for several overlap sizes and print a report.

    Each configuration is transcribed with ``transcribe_with_overlap``;
    failed runs are reported and skipped. The report compares the
    deduplicated segment counts against the hard-coded baseline figures from
    an earlier investigation (12 segments direct, 4 with 2-minute chunks).
    """
    rule = "=" * 80

    print("Testing overlapping chunks to improve segment accuracy at boundaries")
    print(f"Video: {os.path.basename(VIDEO_PATH)}")
    print(rule)

    # Test cases: same 2-minute stride, different overlap amounts
    overlap_variants = [
        (0.0, "2min chunks, no overlap"),
        (5.0, "2min chunks, 5s overlap"),
        (10.0, "2min chunks, 10s overlap"),
        (15.0, "2min chunks, 15s overlap"),
    ]
    test_cases = [
        {"chunk_duration": 120.0, "overlap": seconds, "label": label}
        for seconds, label in overlap_variants
    ]

    results = []

    for case in test_cases:
        print(f"\nTesting: {case['label']}")
        print("-" * 40)

        outcome = transcribe_with_overlap(
            model_size="tiny",
            compute_type="int8",
            chunk_duration=case["chunk_duration"],
            overlap=case["overlap"],
        )

        if "error" in outcome:
            print(f" Error: {outcome['error']}")
            continue

        # Merge the measurements into the case so the label travels with them.
        case.update(outcome)
        results.append(case)

        raw_count = outcome["total_segments_raw"]
        dedup_count = outcome["total_segments_dedup"]
        print(f" Chunks: {outcome['chunk_count']}")
        print(f" Segments (raw/dedup): {raw_count}/{dedup_count}")
        print(f" Overlap segments: {outcome['overlap_segments']}")

        # Show segment distribution
        preview = outcome["segments"][:5]
        if preview:
            print(" First few segments:")
            for seg in preview:
                snippet = seg["text"][:40]
                print(f" {seg['start']:.1f}s-{seg['end']:.1f}s: {snippet}...")

    # Comparison with direct transcription (baseline)
    print("\n" + rule)
    print("COMPARISON WITH BASELINE (from previous investigation)")
    print(rule)

    print("\nBaseline results from investigate_segment_diff.py:")
    print(" Direct transcription: 12 segments")
    print(" 2min chunks (no overlap): 4 segments")

    print("\nOverlap test results:")
    for entry in results:
        print(f" {entry['label']}: {entry.get('total_segments_dedup', 0)} segments")

    # Analyze effectiveness
    print("\n" + rule)
    print("ANALYSIS")
    print(rule)

    if results:
        best = max(results, key=lambda r: r.get("total_segments_dedup", 0))
        best_count = best["total_segments_dedup"]
        print(f"\nBest configuration: {best['label']} with {best_count} segments")

        # Compared to 4 segments without overlap
        improvement = ((best_count - 4) / 4) * 100
        print(f"Improvement over no overlap: {improvement:.1f}%")

    print("\nRecommendations:")
    for line in (
        "1. Overlap of 10-15 seconds appears helpful for 2-minute chunks",
        "2. Deduplication is necessary to avoid duplicate segments",
        "3. Even with overlap, small chunks may still miss segments due to context issues",
        "4. Consider larger chunk sizes (5+ minutes) as primary solution",
    ):
        print(line)
|
|
|
|
|
|
# Run the overlap experiment only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|