- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
168 lines
4.5 KiB
Python
168 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test chunked transcription for full audio file.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
|
|
def get_audio_duration(audio_path):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        audio_path: Path (str or path-like) of the file to inspect.

    Returns:
        The ``format=duration`` value printed by ffprobe, as a float.

    Raises:
        ValueError: If ffprobe does not print a parseable number, which is
            what happens when it fails (missing/unreadable/unsupported file).
            The message carries ffprobe's exit code and stderr.
        FileNotFoundError: If the ``ffprobe`` executable cannot be found.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "csv=p=0",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError:
        # A bare float("") error says nothing about the real cause, so
        # re-raise the same exception type with ffprobe's own diagnostics.
        raise ValueError(
            f"ffprobe could not determine the duration of {audio_path} "
            f"(exit code {result.returncode}, output {raw_duration!r}): "
            f"{result.stderr.strip()}"
        ) from None
|
|
|
|
|
|
def extract_chunk(audio_path, start, duration, output_path):
    """Cut a slice out of an audio file with ffmpeg.

    The slice is re-encoded as signed 16-bit PCM, 16 kHz, mono.

    Args:
        audio_path: Source media file (str or path-like).
        start: Offset of the slice from the beginning of the source, passed
            to ffmpeg's ``-ss`` option.
        duration: Length of the slice, passed to ffmpeg's ``-t`` option.
        output_path: Destination file (str or path-like). An existing file
            is overwritten (``-y``).

    Returns:
        True only if ffmpeg exited successfully and the destination file
        exists and is non-empty; False otherwise.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # Accept plain strings as well as Path objects for the destination.
    output_path = Path(output_path)
    cmd = [
        "ffmpeg",
        "-i",
        str(audio_path),
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        str(output_path),
    ]
    result = subprocess.run(cmd, capture_output=True)
    # A failed run can still leave a partial, non-empty file behind, so the
    # exit status must be checked before trusting what is on disk.
    if result.returncode != 0:
        return False
    return output_path.exists() and output_path.stat().st_size > 0
|
|
|
|
|
|
def transcribe_chunk(model, chunk_path, chunk_start):
    """Transcribe one chunk and shift its segment times by the chunk offset.

    Args:
        model: Object exposing ``transcribe(path, beam_size=...)`` that
            returns ``(segments, info)``; each segment must provide
            ``start``, ``end`` and ``text`` attributes.
        chunk_path: Location of the chunk's audio file.
        chunk_start: Offset of this chunk within the full recording; it is
            added to every segment's start and end.

    Returns:
        A ``(results, info)`` tuple: ``results`` is a list of dicts with
        ``"start"``, ``"end"`` and whitespace-stripped ``"text"``; ``info``
        is passed through from the model unchanged.
    """
    raw_segments, info = model.transcribe(str(chunk_path), beam_size=5)
    shifted = [
        {
            "start": piece.start + chunk_start,
            "end": piece.end + chunk_start,
            "text": piece.text.strip(),
        }
        for piece in raw_segments
    ]
    return shifted, info
|
|
|
|
|
|
def main():
    """Transcribe /tmp/test_audio.wav in 30-minute chunks and save the result.

    Steps:
      1. Exit with status 1 if the input file does not exist.
      2. Split the recording into contiguous, non-overlapping chunks.
      3. Extract each chunk to a temporary directory, transcribe it, and
         collect the segments with timestamps relative to the full file.
      4. Write the merged result to
         ``test_output/full_chunked_transcription.json`` (relative to the
         current working directory) and print a short summary.

    A chunk that cannot be extracted or transcribed is reported and skipped
    so the remaining chunks are still processed. The language recorded in
    the output is the one reported for the first chunk that transcribed
    successfully.
    """
    # Only this entry point needs these, so they are imported locally.
    import shutil
    import traceback

    audio_path = "/tmp/test_audio.wav"
    if not Path(audio_path).exists():
        print(f"Audio file not found: {audio_path}")
        sys.exit(1)

    total_duration = get_audio_duration(audio_path)
    print(f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)")

    # Chunk plan: back-to-back windows with no overlap; the last one is
    # shortened to end exactly at the end of the recording.
    chunk_duration = 1800  # 30 minutes
    chunks = []
    start = 0
    while start < total_duration:
        chunk_end = min(start + chunk_duration, total_duration)
        chunks.append({"start": start, "end": chunk_end, "idx": len(chunks)})
        start = chunk_end

    print(f"Split into {len(chunks)} chunks")

    # Load the model once and reuse it for every chunk. The import is
    # deferred so the missing-file check above runs before it.
    print("Loading Whisper model...")
    from faster_whisper import WhisperModel

    model = WhisperModel("tiny", device="cpu", compute_type="int8")

    all_segments = []
    language = None
    language_prob = None
    failed_chunks = 0

    temp_dir = Path(tempfile.mkdtemp(prefix="chunks_"))
    print(f"Temp directory: {temp_dir}")

    try:
        for chunk in chunks:
            chunk_path = temp_dir / f"chunk_{chunk['idx']}.wav"
            print(
                f"\nChunk {chunk['idx'] + 1}/{len(chunks)}: {chunk['start']:.1f}-{chunk['end']:.1f}"
            )

            try:
                print(" Extracting chunk...")
                if not extract_chunk(
                    audio_path, chunk["start"], chunk["end"] - chunk["start"], chunk_path
                ):
                    print(" Failed to extract chunk, skipping")
                    failed_chunks += 1
                    continue

                # Transcribe and time the call (no timeout is enforced).
                print(" Transcribing...")
                start_time = time.time()
                try:
                    segments, info = transcribe_chunk(model, chunk_path, chunk["start"])
                    elapsed = time.time() - start_time
                    print(f" → {len(segments)} segments in {elapsed:.1f}s")
                    all_segments.extend(segments)
                    if language is None:
                        language = info.language
                        language_prob = info.language_probability
                except Exception as e:
                    # Deliberately broad: one bad chunk must not abort the
                    # whole run, but the failure is reported in full.
                    failed_chunks += 1
                    print(f" ERROR: {e}")
                    traceback.print_exc()
            finally:
                # Remove the chunk file on every path, including the
                # extract-failure `continue` above.
                chunk_path.unlink(missing_ok=True)
    finally:
        # Always remove the temp directory, even on an unexpected error.
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Restore chronological order across chunks.
    all_segments.sort(key=lambda x: x["start"])

    output = {
        "language": language or "unknown",
        "language_probability": language_prob or 0.0,
        "segments": all_segments,
        "total_segments": len(all_segments),
        "chunk_count": len(chunks),
    }

    output_path = Path("test_output/full_chunked_transcription.json")
    output_path.parent.mkdir(exist_ok=True, parents=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)

    print("\nTranscription completed:")
    print(f" Total segments: {len(all_segments)}")
    if failed_chunks:
        print(f" Failed chunks: {failed_chunks}/{len(chunks)}")
    # Print the defaulted values written to the JSON: language_prob is None
    # when no chunk was transcribed, and formatting None with :.2f raises.
    print(
        f" Language: {output['language']} "
        f"(prob {output['language_probability']:.2f})"
    )
    print(f" Results saved to: {output_path}")
|
|
|
|
|
|
# Run the chunked-transcription test only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
|