Files
momentry_core/test_faster_whisper_debug.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

156 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""
Debug faster_whisper hanging issue.
"""
import sys
import time
import threading
import subprocess
from pathlib import Path
def run_with_timeout(func, args=(), timeout=60):
    """Run ``func(*args)`` in a worker thread, waiting at most ``timeout`` seconds.

    Args:
        func: Callable to execute.
        args: Positional arguments passed to ``func``.
        timeout: Maximum number of seconds to wait for the worker to finish.

    Returns:
        A dict with the keys ``success`` (bool), ``error`` (str or None) and
        ``output`` (the return value of ``func``, or None). When ``func``
        raised, ``error`` holds ``str(exception)`` and an extra ``traceback``
        key holds the formatted traceback. When the wait timed out, ``error``
        is ``"Timeout after {timeout}s"``.
    """
    import traceback

    result = {"success": False, "error": None, "output": None}

    def wrapper():
        try:
            result["output"] = func(*args)
            result["success"] = True
        except Exception as e:
            result["error"] = str(e)
            result["traceback"] = traceback.format_exc()

    # A Python thread cannot be interrupted. Marking it as a daemon at least
    # stops a genuinely hung call from keeping the interpreter alive after
    # the caller has given up on it.
    thread = threading.Thread(target=wrapper, daemon=True)
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        # Return a detached snapshot: the worker is still running and may yet
        # write into ``result``, which must not retroactively turn a reported
        # timeout into a success (or overwrite the timeout message).
        return {
            "success": False,
            "error": f"Timeout after {timeout}s",
            "output": None,
        }
    return result
def _trim_audio(audio_path, max_duration):
    """Write the first ``max_duration`` seconds of ``audio_path`` to a new WAV file.

    The clip is re-encoded with ffmpeg as 16 kHz mono 16-bit PCM and stored
    under a unique name next to the source file, so that concurrent or
    repeated runs never overwrite each other's input.

    Returns:
        ``Path`` of the trimmed file; the caller is responsible for deleting it.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    import tempfile

    with tempfile.NamedTemporaryFile(
        prefix=f"trim_{max_duration}s_",
        suffix=".wav",
        dir=str(Path(audio_path).parent),
        delete=False,
    ) as handle:
        tmp_path = Path(handle.name)

    cmd = [
        "ffmpeg",
        "-i",
        str(audio_path),
        "-t",
        str(max_duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",  # overwrite the empty placeholder created above
        str(tmp_path),
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode != 0:
            stderr_tail = proc.stderr.decode(errors="replace").strip()[-500:]
            raise RuntimeError(
                f"ffmpeg failed with exit code {proc.returncode}: {stderr_tail}"
            )
    except Exception:
        # Do not leave the placeholder behind when trimming did not succeed.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise
    return tmp_path


def test_transcribe_audio(
    audio_path, model_size="tiny", compute_type="float32", max_duration=None
):
    """Load a faster-whisper model on CPU and transcribe the start of an audio file.

    Prints progress and timing information along the way and stops after at
    most three segments, since the goal is to find out where processing hangs
    rather than to produce a full transcript.

    Args:
        audio_path: Path of the audio file to transcribe.
        model_size: Model name passed to ``WhisperModel``.
        compute_type: ``compute_type`` passed to ``WhisperModel``.
        max_duration: When truthy, only the first ``max_duration`` seconds are
            transcribed (a trimmed temporary copy is created with ffmpeg).

    Returns:
        True when iteration over the segments finished without an exception,
        False when an exception was raised while iterating.

    Raises:
        RuntimeError: If trimming with ffmpeg fails.
    """
    from faster_whisper import WhisperModel

    print(f"Loading model {model_size} with compute_type={compute_type}...")
    start = time.time()
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type)
    print(f"Model loaded in {time.time() - start:.1f}s")

    # Optional: trim audio if max_duration specified
    tmp_path = None
    if max_duration:
        tmp_path = _trim_audio(audio_path, max_duration)
        audio_path = tmp_path
        print(f"Trimmed audio to {max_duration}s: {tmp_path}")

    try:
        print("Starting transcription...")
        start = time.time()
        # NOTE(review): per the faster-whisper docs ``segments`` is a lazy
        # generator, so the decoding work presumably happens in the loop
        # below rather than in this call. No timeout is applied here; callers
        # that need one must wrap this whole function.
        segments, info = model.transcribe(str(audio_path), beam_size=5)

        print("Getting first segment...")
        segment_count = 0
        try:
            for segment in segments:
                segment_count += 1
                print(
                    f"Segment {segment_count}: {segment.start:.1f}-{segment.end:.1f}: {segment.text[:80]}"
                )
                if segment_count >= 3:
                    break
                # Break early for test
                if max_duration and segment.end > max_duration:
                    break
        except Exception as e:
            import traceback

            print(f"Error during iteration: {e}")
            traceback.print_exc()
            return False

        print(f"Transcription iteration done. Total segments: {segment_count}")
        print(
            f"Detected language: {info.language} (prob {info.language_probability:.2f})"
        )
        print(f"Time elapsed: {time.time() - start:.1f}s")
        return True
    finally:
        # Remove the trimmed copy on every exit path, including errors.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass
def main():
    """Prepare the test audio file and run the transcription experiments.

    If ``/tmp/test_audio.wav`` does not exist it is extracted with ffmpeg from
    a sample video (path relative to the current working directory). Each
    experiment then runs ``test_transcribe_audio`` through
    ``run_with_timeout`` and prints the resulting dict.

    Returns:
        0 when the experiments were run, 1 when the input audio could not be
        prepared.
    """
    audio_path = "/tmp/test_audio.wav"
    if not Path(audio_path).exists():
        print(f"Audio file not found: {audio_path}")
        # Extract from large video
        video_path = "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
        print(f"Extracting audio from {video_path}...")
        cmd = [
            "ffmpeg",
            "-i",
            video_path,
            "-vn",
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        try:
            proc = subprocess.run(cmd, capture_output=True)
        except FileNotFoundError:
            print("ffmpeg executable not found on PATH", file=sys.stderr)
            return 1
        # Without this check every experiment below would run against a
        # missing or truncated file and report confusing errors.
        if proc.returncode != 0 or not Path(audio_path).exists():
            stderr_tail = proc.stderr.decode(errors="replace").strip()[-500:]
            print(
                f"ffmpeg failed with exit code {proc.returncode}: {stderr_tail}",
                file=sys.stderr,
            )
            return 1
        print(f"Audio extracted to {audio_path}")

    # (title, model size, compute type, seconds of audio, timeout in seconds)
    experiments = [
        ("Test 1: tiny, float32, 30 seconds", "tiny", "float32", 30, 120),
        ("Test 2: tiny, int8, 30 seconds", "tiny", "int8", 30, 120),
        ("Test 3: small, float32, 30 seconds", "small", "float32", 30, 180),
    ]
    for title, model_size, compute_type, max_duration, timeout in experiments:
        print(f"\n=== {title} ===")
        result = run_with_timeout(
            test_transcribe_audio,
            (audio_path, model_size, compute_type, max_duration),
            timeout=timeout,
        )
        print(f"Result: {result}")
    return 0


if __name__ == "__main__":
    sys.exit(main())