- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
109 lines
2.9 KiB
Python
109 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test transcription with increasing durations to find hang threshold.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import threading
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
|
|
def extract_segment(audio_path, duration, output_path=None):
|
|
"""Extract first N seconds of audio."""
|
|
if output_path is None:
|
|
output_path = Path(tempfile.mktemp(suffix=".wav"))
|
|
cmd = [
|
|
"ffmpeg",
|
|
"-i",
|
|
str(audio_path),
|
|
"-t",
|
|
str(duration),
|
|
"-acodec",
|
|
"pcm_s16le",
|
|
"-ar",
|
|
"16000",
|
|
"-ac",
|
|
"1",
|
|
"-y",
|
|
str(output_path),
|
|
]
|
|
subprocess.run(cmd, capture_output=True)
|
|
return output_path
|
|
|
|
|
|
def test_duration(audio_path, duration, model, timeout=60):
|
|
"""Test transcription of audio segment with given duration."""
|
|
print(f"Testing duration {duration}s...")
|
|
segment_path = extract_segment(audio_path, duration)
|
|
|
|
result = {"success": False, "segments": 0, "time": 0, "error": None}
|
|
start = time.time()
|
|
|
|
def transcribe():
|
|
try:
|
|
segments, info = model.transcribe(str(segment_path), beam_size=5)
|
|
segments = list(segments) # Force processing
|
|
result["segments"] = len(segments)
|
|
result["language"] = info.language
|
|
result["success"] = True
|
|
except Exception as e:
|
|
result["error"] = str(e)
|
|
|
|
thread = threading.Thread(target=transcribe)
|
|
thread.start()
|
|
thread.join(timeout)
|
|
|
|
if thread.is_alive():
|
|
result["error"] = f"Timeout after {timeout}s"
|
|
# Can't interrupt, but we'll return
|
|
# Kill the model? Not possible. We'll just exit this test.
|
|
print(f" → TIMEOUT")
|
|
# Clean up
|
|
segment_path.unlink(missing_ok=True)
|
|
return result
|
|
|
|
elapsed = time.time() - start
|
|
result["time"] = elapsed
|
|
print(f" → {result['segments']} segments in {elapsed:.1f}s")
|
|
|
|
segment_path.unlink(missing_ok=True)
|
|
return result
|
|
|
|
|
|
def main():
|
|
audio_path = "/tmp/test_audio.wav"
|
|
if not Path(audio_path).exists():
|
|
print(f"Audio file not found: {audio_path}")
|
|
sys.exit(1)
|
|
|
|
print("Loading Whisper model (tiny, int8)...")
|
|
from faster_whisper import WhisperModel
|
|
|
|
model = WhisperModel("tiny", device="cpu", compute_type="int8")
|
|
print("Model loaded.")
|
|
|
|
durations = [30, 60, 120, 180, 240, 300, 600, 900, 1200]
|
|
results = {}
|
|
|
|
for dur in durations:
|
|
result = test_duration(audio_path, dur, model, timeout=120)
|
|
results[dur] = result
|
|
if not result["success"]:
|
|
print(f"FAILED at duration {dur}s: {result['error']}")
|
|
break
|
|
|
|
print("\n=== Results ===")
|
|
for dur, res in results.items():
|
|
if res["success"]:
|
|
print(f"{dur:4d}s: {res['segments']:3d} segments, {res['time']:6.1f}s")
|
|
else:
|
|
print(f"{dur:4d}s: FAILED - {res['error']}")
|
|
break
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|