Files
momentry_core/test_full_audio.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

181 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""
Test transcription of full audio file with progress monitoring.
"""
import sys
import time
import threading
import warnings
import psutil
from pathlib import Path
# Promoting warnings to exceptions is intentionally left disabled: a warning
# raised by urllib3 would otherwise be turned into an error.
# warnings.filterwarnings("error")  # Convert warnings to exceptions
def monitor_memory(pid, interval=1, stop_event=None):
    """Sample the resident memory (RSS) of a process until asked to stop.

    Args:
        pid: ID of the process to observe.
        interval: Seconds to wait between two samples.
        stop_event: Optional ``threading.Event``. Sampling ends as soon as it
            is set. When omitted, sampling only ends once the process no
            longer exists.

    Returns:
        A list of ``(timestamp, rss_mb)`` tuples, where ``timestamp`` comes
        from ``time.time()`` and ``rss_mb`` is the RSS divided by 1024 twice.
    """
    samples = []
    proc = None
    while stop_event is None or not stop_event.is_set():
        try:
            if proc is None:
                # Build the handle once and reuse it for every sample.
                proc = psutil.Process(pid)
            rss_mb = proc.memory_info().rss / 1024 / 1024
            samples.append((time.time(), rss_mb))
        except psutil.NoSuchProcess:
            # The target is gone; no later sample can succeed.
            break
        except psutil.Error:
            # Transient psutil failure (e.g. access denied): skip this sample
            # but keep monitoring, without hiding unrelated programming errors.
            pass
        if stop_event is None:
            time.sleep(interval)
        else:
            # Event.wait returns early when the event is set, so the caller
            # does not have to wait out a full interval on shutdown.
            stop_event.wait(interval)
    return samples
def transcribe_full(
    audio_path, model_size="tiny", compute_type="int8", timeout_per_segment=30
):
    """Transcribe an audio file segment by segment with progress output.

    The model is loaded on CPU. While segments are consumed, a background
    thread samples this process's resident memory every two seconds. An
    exception raised while iterating the segments is printed together with
    its traceback, and the segments collected so far are still returned.

    Args:
        audio_path: Path of the audio file to transcribe.
        model_size: Model name handed to ``WhisperModel``.
        compute_type: Compute type handed to ``WhisperModel``.
        timeout_per_segment: Number of seconds after which a slow segment
            triggers a warning. Processing is never interrupted.

    Returns:
        A tuple ``(results, info)``. ``results`` is a list of dicts with the
        keys ``"start"``, ``"end"`` and ``"text"``; ``info`` is the second
        value returned by ``model.transcribe``.
    """
    # Imported here rather than at module level, so importing this module
    # does not by itself require faster_whisper.
    from faster_whisper import WhisperModel

    print(f"Loading model {model_size} ({compute_type})...")
    start = time.time()
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type)
    print(f"Model loaded in {time.time() - start:.1f}s")

    print(f"Starting transcription of {audio_path}...")
    print(f"File size: {Path(audio_path).stat().st_size / 1024 / 1024:.1f} MB")

    # NOTE(review): faster-whisper documents `segments` as a lazy generator,
    # i.e. the actual work happens inside the loop below - confirm against
    # the installed version.
    segments, info = model.transcribe(audio_path, beam_size=5)

    # Background sampling of this process's memory usage.
    stop_event = threading.Event()
    mem_samples = []

    def monitor():
        proc = psutil.Process()
        while not stop_event.is_set():
            try:
                rss_mb = proc.memory_info().rss / 1024 / 1024
                mem_samples.append((time.time(), rss_mb))
            except psutil.Error:
                # Best effort: a failed sample must not stop the monitor.
                pass
            # Unlike time.sleep, this returns as soon as the event is set.
            stop_event.wait(2)

    monitor_thread = threading.Thread(target=monitor, daemon=True)
    monitor_thread.start()

    results = []
    start_time = time.time()
    last_segment_time = start_time
    try:
        for index, segment in enumerate(segments, start=1):
            segment_time = time.time()
            elapsed = segment_time - last_segment_time
            last_segment_time = segment_time

            results.append(
                {
                    "start": segment.start,
                    "end": segment.end,
                    "text": segment.text.strip(),
                }
            )

            # Latest memory sample, or 0 until the monitor has produced one.
            current_mem = mem_samples[-1][1] if mem_samples else 0
            print(
                f"[{index}] {segment.start:.1f}-{segment.end:.1f} ({elapsed:.1f}s, mem: {current_mem:.1f} MB): {segment.text[:80]}..."
            )

            # A slow segment is only reported; transcription continues.
            if elapsed > timeout_per_segment:
                print(
                    f"WARNING: Segment {index} took {elapsed:.1f}s > {timeout_per_segment}s timeout"
                )

        total_time = time.time() - start_time
        print(f"Transcription completed in {total_time:.1f}s")
        print(f"Total segments: {len(results)}")
        print(
            f"Average time per segment: {total_time / len(results) if results else 0:.2f}s"
        )
    except Exception as e:
        # Deliberate best effort: report the failure and keep partial results.
        print(f"Error during transcription: {e}")
        import traceback

        traceback.print_exc()
    finally:
        stop_event.set()
        monitor_thread.join(timeout=5)

    if mem_samples:
        peak_mem = max(sample[1] for sample in mem_samples)
        avg_mem = sum(sample[1] for sample in mem_samples) / len(mem_samples)
        print(f"Memory usage: peak {peak_mem:.1f} MB, average {avg_mem:.1f} MB")

    return results, info
def main():
    """Run the full-audio transcription test and store the result as JSON.

    Transcribes ``/tmp/test_audio.wav`` in a worker thread with an overall
    limit of 10 minutes. On success the language information and segments are
    written to ``test_output/full_audio_transcription.json``.

    Exits with status 1 when the audio file is missing, when the time limit
    is exceeded, or when the worker finished without producing results.
    """
    audio_path = "/tmp/test_audio.wav"
    if not Path(audio_path).exists():
        print(f"Audio file not found: {audio_path}")
        sys.exit(1)

    print("Testing full audio transcription")
    print("Audio duration: 1:54:39 (approx)")

    start = time.time()
    results = None
    info = None

    def run_transcribe():
        nonlocal results, info
        results, info = transcribe_full(audio_path, timeout_per_segment=60)

    # The worker must be a daemon thread: the interpreter waits for every
    # non-daemon thread at shutdown, so sys.exit() after a timeout would
    # otherwise block until the transcription has finished anyway.
    thread = threading.Thread(target=run_transcribe, daemon=True)
    thread.start()
    thread.join(timeout=600)  # total limit of 10 minutes

    if thread.is_alive():
        print("\nTIMEOUT: Transcription took longer than 10 minutes")
        # The worker cannot be interrupted, but as a daemon it does not keep
        # the process alive.
        sys.exit(1)

    if results is not None:
        print(f"\nSuccessfully transcribed {len(results)} segments")
        print(f"Language: {info.language} (prob {info.language_probability:.2f})")

        # Save results
        import json

        output_path = Path("test_output/full_audio_transcription.json")
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "language": info.language,
                    "language_probability": info.language_probability,
                    "segments": results,
                },
                f,
                indent=2,
            )
        print(f"Results saved to {output_path}")
    else:
        # The worker ended without assigning results, i.e. it raised.
        print("\nTranscription failed: no results were produced")

    print(f"Total execution time: {time.time() - start:.1f}s")

    if results is None:
        # Report the failure to the caller instead of exiting with status 0.
        sys.exit(1)
# Run the transcription test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()