Files
momentry_core/test_minimal_asr.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

195 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""Minimal test to isolate the hang issue."""
import sys
import os
import tempfile
import subprocess
import time
# Sample clip under test; the relative path is resolved against the current
# working directory.
test_video = "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
video_missing = not os.path.exists(test_video)
if video_missing:
    # Nothing to test without the input file.
    print(f"Test video not found: {test_video}")
    sys.exit(1)

print(f"Testing: {test_video}")
video_size_gb = os.path.getsize(test_video) / (1024**3)
print(f"Size: {video_size_gb:.2f} GB")

# Scratch directory that receives the extracted audio and the test chunk.
temp_dir = tempfile.mkdtemp(prefix="asr_minimal_")
print(f"Temp dir: {temp_dir}")
# Step 1: Extract audio
# Decode the video's audio into a 16 kHz, mono, 16-bit PCM WAV file inside the
# scratch directory ("-y" overwrites any existing output).
audio_path = os.path.join(temp_dir, "audio.wav")
print(f"\n1. Extracting audio to {audio_path}...")
extract_cmd = [
    "ffmpeg",
    "-i",
    test_video,
    "-acodec",
    "pcm_s16le",
    "-ar",
    "16000",
    "-ac",
    "1",
    "-y",
    audio_path,
]
print(f"Command: {' '.join(extract_cmd)}")
start = time.time()
result = subprocess.run(extract_cmd, capture_output=True, text=True)
elapsed = time.time() - start
if result.returncode != 0:
    # ffmpeg writes its version/configuration banner at the start of stderr
    # and the actual failure reason at the end, so show the tail: the first
    # 500 characters are usually just the banner.
    print(f"Error extracting audio: {result.stderr[-500:]}")
    sys.exit(1)
print(f"Audio extraction successful: {elapsed:.1f}s")
audio_size_mb = os.path.getsize(audio_path) / (1024**2)
print(f"Audio file size: {audio_size_mb:.1f} MB")
# Step 2: Get audio duration
# Ask ffprobe for the container duration as a bare number (no key, no wrapper).
print("\n2. Getting audio duration...")
duration_cmd = [
    "ffprobe",
    "-v",
    "error",
    "-show_entries",
    "format=duration",
    "-of",
    "default=noprint_wrappers=1:nokey=1",
    audio_path,
]
result = subprocess.run(duration_cmd, capture_output=True, text=True)
# Fallback value used whenever the duration cannot be determined.
duration = 0
if result.returncode != 0:
    print(f"Error getting duration: {result.stderr[:500]}")
else:
    raw_duration = result.stdout.strip()
    try:
        duration = float(raw_duration)
    except ValueError:
        # A zero exit status does not guarantee numeric output (it may be
        # empty or a placeholder such as "N/A"); report it instead of letting
        # the ValueError abort the rest of the diagnostic run.
        print(f"Error getting duration: unexpected ffprobe output {raw_duration!r}")
    else:
        print(f"Audio duration: {duration:.1f}s ({duration / 60:.1f} min)")
# Step 3: Extract first 60 seconds as a test chunk
# Cut seconds 0-60 of the extracted WAV into its own file, keeping the same
# 16 kHz / mono / 16-bit PCM format ("-y" overwrites any existing output).
chunk_path = os.path.join(temp_dir, "chunk_0000.wav")
print(f"\n3. Extracting first 60 seconds to {chunk_path}...")
chunk_cmd = [
    "ffmpeg",
    "-i",
    audio_path,
    "-ss",
    "0",
    "-t",
    "60",
    "-acodec",
    "pcm_s16le",
    "-ar",
    "16000",
    "-ac",
    "1",
    "-y",
    chunk_path,
]
print(f"Command: {' '.join(chunk_cmd)}")
start = time.time()
result = subprocess.run(chunk_cmd, capture_output=True, text=True)
elapsed = time.time() - start
if result.returncode != 0:
    # ffmpeg writes its version/configuration banner at the start of stderr
    # and the actual failure reason at the end, so show the tail: the first
    # 500 characters are usually just the banner.
    print(f"Error extracting chunk: {result.stderr[-500:]}")
    sys.exit(1)
print(f"Chunk extraction successful: {elapsed:.1f}s")
chunk_size_mb = os.path.getsize(chunk_path) / (1024**2)
print(f"Chunk file size: {chunk_size_mb:.1f} MB")
# Step 4: Try to load Whisper model
print("\n4. Testing Whisper model load...")
# Put this script's own directory at the front of the module search path.
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_dir)
try:
    from faster_whisper import WhisperModel

    print("faster_whisper import successful")
    load_started = time.time()
    # Smallest model, CPU-only, int8 weights.
    model = WhisperModel("tiny", device="cpu", compute_type="int8")
    load_seconds = time.time() - load_started
    print(f"Model loaded successfully: {load_seconds:.1f}s")
except Exception as exc:
    # Covers both a failed import and a failed model construction; the
    # remaining steps need the model, so stop here.
    print(f"Error loading model: {exc}")
    sys.exit(1)
# Step 5: Try to transcribe the chunk
print("\n5. Transcribing chunk...")
try:
    start = time.time()
    segments, info = model.transcribe(chunk_path, beam_size=5)
    # transcribe() returns the segments as a lazy generator: the decoding work
    # only happens while it is consumed. Materialise it BEFORE stopping the
    # clock, otherwise the reported time excludes the transcription itself.
    segments = list(segments)
    elapsed = time.time() - start
    print(f"Transcription successful: {elapsed:.1f}s")
    print(f"Detected language: {info.language} (prob {info.language_probability:.2f})")
    print(f"Number of segments: {len(segments)}")
    for i, segment in enumerate(segments[:3]):  # Show first 3 segments
        print(
            f" Segment {i}: {segment.start:.1f}s - {segment.end:.1f}s: {segment.text[:50]}..."
        )
    if len(segments) > 3:
        print(f" ... and {len(segments) - 3} more segments")
except Exception as e:
    # Best-effort diagnostic: report the failure with a full traceback and let
    # the script carry on rather than exiting.
    print(f"Error transcribing: {e}")
    import traceback

    traceback.print_exc()
# Step 6: Try to transcribe the full audio (should hang for large files)
print("\n6. Testing full audio transcription (should hang for large files)...")
try:
    start = time.time()
    # The transcription runs on a worker thread so the main thread can stop
    # waiting for it after a fixed timeout.
    import threading

    class TranscriptionResult:
        """Mutable holder the worker thread fills with its outcome."""

        def __init__(self):
            self.segments = []
            self.info = None
            self.error = None

    result = TranscriptionResult()

    def transcribe_with_timeout():
        """Transcribe the full audio file, storing the output or the error on ``result``."""
        try:
            lazy_segments, audio_info = model.transcribe(audio_path, beam_size=5)
            # Consuming the generator is what forces the transcription to run.
            result.segments = list(lazy_segments)
            result.info = audio_info
        except Exception as exc:
            result.error = exc

    # Daemon thread: a transcription that never finishes cannot keep the
    # interpreter alive once the script reaches its end.
    worker = threading.Thread(target=transcribe_with_timeout, daemon=True)
    worker.start()
    worker.join(timeout=30)  # 30 second timeout

    timed_out = worker.is_alive()
    if timed_out:
        print("Full transcription timed out after 30 seconds (expected for large file)")
    elif result.error:
        print(f"Transcription error: {result.error}")
    else:
        total_seconds = time.time() - start
        print(f"Full transcription successful (unexpected!): {total_seconds:.1f}s")
        print(f"Segments: {len(result.segments)}")
except Exception as e:
    print(f"Error in full transcription test: {e}")

# The scratch directory is left on disk for manual inspection.
print(f"\nTemp directory preserved: {temp_dir}")