Files
momentry_core/test_chunked_full.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

168 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
Test chunked transcription for full audio file.
"""
import sys
import time
import tempfile
import json
import subprocess
from pathlib import Path
def get_audio_duration(audio_path):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        audio_path: Location of the file to probe (``str`` or ``Path``).

    Returns:
        The container-level duration (``format=duration``) as a float.

    Raises:
        ValueError: If ffprobe exits with a non-zero status, or prints
            something that cannot be parsed as a number. The message carries
            ffprobe's own diagnostics so the cause is visible.
        FileNotFoundError: Raised by ``subprocess.run`` when the ``ffprobe``
            executable cannot be found.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "csv=p=0",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Without this check a failed probe leaves stdout empty and the caller
    # only sees "could not convert string to float: ''".
    if result.returncode != 0:
        raise ValueError(
            f"ffprobe failed for {audio_path} "
            f"(exit code {result.returncode}): {result.stderr.strip()}"
        )

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError:
        raise ValueError(
            f"ffprobe returned no usable duration for {audio_path}: "
            f"{raw_duration!r}"
        ) from None
def extract_chunk(audio_path, start, duration, output_path):
    """Cut a time slice out of an audio file with ffmpeg.

    The slice is written as 16-bit little-endian PCM, resampled to 16 kHz
    and downmixed to one channel. An existing file at ``output_path`` is
    overwritten (``-y``).

    Args:
        audio_path: Source media file (``str`` or ``Path``).
        start: Offset of the slice from the beginning of the source, seconds.
        duration: Length of the slice, seconds.
        output_path: Destination file (``str`` or ``Path``).

    Returns:
        ``True`` only when ffmpeg exited successfully and the destination
        file exists and is non-empty; ``False`` otherwise.

    Raises:
        FileNotFoundError: Raised by ``subprocess.run`` when the ``ffmpeg``
            executable cannot be found.
    """
    # Accept plain strings as well as Path objects for the destination.
    output_path = Path(output_path)
    cmd = [
        "ffmpeg",
        "-i",
        str(audio_path),
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        str(output_path),
    ]
    # stdin is detached so ffmpeg can neither consume the caller's terminal
    # input nor block waiting on it.
    result = subprocess.run(cmd, capture_output=True, stdin=subprocess.DEVNULL)

    # A non-zero exit means the file on disk (if any) is stale or partial;
    # checking only for its existence would report that as success.
    if result.returncode != 0:
        return False
    return output_path.exists() and output_path.stat().st_size > 0
def transcribe_chunk(model, chunk_path, chunk_start):
    """Transcribe one chunk file and express its segments in whole-file time.

    Args:
        model: Object exposing ``transcribe(path, beam_size=...)`` that
            returns ``(segments, info)``; each segment must provide
            ``start``, ``end`` and ``text``.
        chunk_path: Location of the chunk audio file.
        chunk_start: Offset of this chunk within the full recording; it is
            added to every segment's ``start`` and ``end``.

    Returns:
        A ``(results, info)`` tuple: ``results`` is a list of dicts with
        keys ``start``, ``end`` and ``text`` (whitespace-stripped), and
        ``info`` is passed through unchanged from ``model.transcribe``.
    """
    raw_segments, info = model.transcribe(str(chunk_path), beam_size=5)
    # Iterating the segments is what builds the result list, so it must
    # finish before returning.
    shifted = [
        {
            "start": segment.start + chunk_start,
            "end": segment.end + chunk_start,
            "text": segment.text.strip(),
        }
        for segment in raw_segments
    ]
    return shifted, info
def main():
    """Transcribe ``/tmp/test_audio.wav`` in 30-minute chunks and save the result.

    Steps:
      1. Probe the total duration and split it into consecutive,
         non-overlapping 1800-second windows.
      2. Load the faster-whisper ``tiny`` model once (CPU, int8).
      3. For each window: extract it to a temporary WAV file, transcribe it,
         and collect the segments with whole-file timestamps. A window that
         fails to extract or transcribe is reported and skipped.
      4. Write the merged, start-sorted segments to
         ``test_output/full_chunked_transcription.json`` and print a summary.

    Exits with status 1 when the input file is missing or its duration
    cannot be determined.
    """
    import shutil
    import traceback

    audio_path = "/tmp/test_audio.wav"
    if not Path(audio_path).exists():
        print(f"Audio file not found: {audio_path}")
        sys.exit(1)

    try:
        total_duration = get_audio_duration(audio_path)
    except (OSError, ValueError) as e:
        # Missing ffprobe binary or unreadable/unparsable media: report it
        # cleanly rather than dumping a traceback.
        print(f"Could not determine audio duration: {e}")
        sys.exit(1)
    print(f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)")

    # Chunk settings: back-to-back windows, no overlap between neighbours.
    chunk_duration = 1800  # 30 minutes
    chunks = []
    start = 0
    while start < total_duration:
        chunk_end = min(start + chunk_duration, total_duration)
        chunks.append({"start": start, "end": chunk_end, "idx": len(chunks)})
        start = chunk_end
    print(f"Split into {len(chunks)} chunks")

    # Load model once. The import stays inside the function, as in the
    # original script, so it is only attempted after the input checks pass.
    print("Loading Whisper model...")
    from faster_whisper import WhisperModel

    model = WhisperModel("tiny", device="cpu", compute_type="int8")

    all_segments = []
    # Language is taken from the first chunk that transcribes successfully.
    language = None
    language_prob = None

    temp_dir = Path(tempfile.mkdtemp(prefix="chunks_"))
    print(f"Temp directory: {temp_dir}")
    try:
        for chunk in chunks:
            chunk_path = temp_dir / f"chunk_{chunk['idx']}.wav"
            print(
                f"\nChunk {chunk['idx'] + 1}/{len(chunks)}: {chunk['start']:.1f}-{chunk['end']:.1f}"
            )

            # Extract chunk
            print(" Extracting chunk...")
            if not extract_chunk(
                audio_path, chunk["start"], chunk["end"] - chunk["start"], chunk_path
            ):
                print(" Failed to extract chunk, skipping")
                continue

            # Transcribe (no timeout is enforced; a chunk runs to completion)
            print(" Transcribing...")
            start_time = time.time()
            try:
                segments, info = transcribe_chunk(model, chunk_path, chunk["start"])
            except Exception as e:
                # Best effort: one bad chunk must not abort the whole run.
                print(f" ERROR: {e}")
                traceback.print_exc()
            else:
                elapsed = time.time() - start_time
                print(f"{len(segments)} segments in {elapsed:.1f}s")
                all_segments.extend(segments)
                if language is None:
                    language = info.language
                    language_prob = info.language_probability
            finally:
                # Clean up chunk file
                chunk_path.unlink(missing_ok=True)
    finally:
        # Clean up temp directory even if the run is interrupted or crashes.
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Sort segments
    all_segments.sort(key=lambda x: x["start"])

    # Save results
    output = {
        "language": language or "unknown",
        "language_probability": language_prob or 0.0,
        "segments": all_segments,
        "total_segments": len(all_segments),
        "chunk_count": len(chunks),
    }
    output_path = Path("test_output/full_chunked_transcription.json")
    output_path.parent.mkdir(exist_ok=True, parents=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)

    # language_prob stays None when no chunk was transcribed; formatting it
    # with ":.2f" directly would raise TypeError at the very end of the run.
    prob_text = "n/a" if language_prob is None else f"{language_prob:.2f}"
    print("\nTranscription completed:")
    print(f" Total segments: {len(all_segments)}")
    print(f" Language: {language or 'unknown'} (prob {prob_text})")
    print(f" Results saved to: {output_path}")
# Run the chunked transcription only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()