Files
momentry_core/test_asr_simple.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

169 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""
Simple ASR test script - test a few video files with detailed logging.
"""
import os
import sys
import time
import json
import subprocess
import signal
from pathlib import Path
def run_asr_on_video(video_path, output_path, timeout_sec=600):
    """Run the ASR processor script on one video and validate its JSON output.

    Launches ``scripts/asr_processor.py`` (resolved relative to this file) as
    a child process in its own session, echoes the child's stderr line by
    line while it runs, and enforces a wall-clock timeout. Relies on POSIX
    process groups (``os.killpg``) to terminate the child on timeout.

    Args:
        video_path: Input video path, passed to the script as its first
            argument.
        output_path: Path where the script is expected to write its JSON
            result, passed as the second argument.
        timeout_sec: Maximum number of seconds to wait for the child before
            its whole process group is killed.

    Returns:
        A ``(success, duration, segments)`` tuple. ``success`` is True only
        when the child exits with code 0 and the output file exists and can
        be loaded as JSON. ``duration`` is the elapsed wall-clock time in
        seconds. ``segments`` is the length of the ``"segments"`` list in the
        output, or 0 on any failure.
    """
    # Local imports: neither module is imported at file level.
    import threading
    import traceback

    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]
    print(f" Command: {' '.join(cmd)}")
    print(f" Timeout: {timeout_sec}s")

    def _echo_lines(stream, prefix):
        # Print each line as soon as the child emits it so progress is visible.
        for line in iter(stream.readline, ""):
            print(f"{prefix}{line.rstrip()}")
            sys.stdout.flush()

    def _collect_lines(stream, sink):
        # Keep draining the pipe so the child can never block on a full buffer.
        for line in iter(stream.readline, ""):
            sink.append(line)

    stdout_lines = []
    readers = []  # (thread, stream) pairs that were actually started
    start_time = time.time()
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",  # undecodable child output must not kill a reader
            bufsize=1,
            # Own session/process group so a timeout can kill every descendant;
            # preferred over preexec_fn, which is unsafe in threaded programs.
            start_new_session=True,
        )

        # Both pipes are drained concurrently; waiting on the process while a
        # pipe is left unread can deadlock once the OS pipe buffer fills up.
        for target, stream, extra in (
            (_echo_lines, proc.stderr, " [stderr] "),
            (_collect_lines, proc.stdout, stdout_lines),
        ):
            thread = threading.Thread(
                target=target, args=(stream, extra), daemon=True
            )
            thread.start()
            readers.append((thread, stream))

        try:
            returncode = proc.wait(timeout=timeout_sec)
        except subprocess.TimeoutExpired:
            duration = time.time() - start_time
            print(f" ERROR: Process timed out after {duration:.1f}s")
            try:
                # Kill the entire process group, not just the direct child.
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except OSError:
                # Group already gone or not signalable: fall back to the child.
                proc.kill()
            proc.wait(timeout=5)
            return False, duration, 0

        duration = time.time() - start_time

        # Let the readers reach EOF so all child output is accounted for.
        for thread, _ in readers:
            thread.join(timeout=5)

        stdout = "".join(stdout_lines)
        if stdout:
            print(f" [stdout] {stdout.strip()}")
        print(f" Process exited with code {returncode} after {duration:.1f}s")

        if returncode != 0:
            print(f" Error: Process failed with exit code {returncode}")
            return False, duration, 0

        if not os.path.exists(output_path):
            print(f" Error: Output file not created: {output_path}")
            return False, duration, 0

        # Verify the output file is loadable and summarize it.
        with open(output_path, "r", encoding="utf-8") as f:
            result = json.load(f)
        segments = len(result.get("segments", []))
        language = result.get("language", "unknown")
        print(f" Success: {segments} segments, language: {language}")
        return True, duration, segments
    except Exception as e:
        # Top-level boundary for a test helper: report and signal failure.
        print(f" Exception: {e}")
        traceback.print_exc()
        return False, time.time() - start_time, 0
    finally:
        for thread, stream in readers:
            thread.join(timeout=5)
            # Only close a pipe once its reader is done with it; closing a
            # stream another thread is still blocked on could hang.
            if not thread.is_alive():
                stream.close()
def main():
    """Run the ASR processor on the three smallest test videos and report.

    Looks for video files (``.mp4``, ``.mov``, ``.avi``, ``.mkv``) in
    ``../test_video`` relative to the current working directory, runs
    ``run_asr_on_video`` on the three smallest with a 300-second timeout
    each, prints a summary, and writes the per-video results to
    ``test_output_simple/test_results.json``.

    If the video directory does not exist, an error is printed and nothing
    else is done.
    """
    video_dir = Path("../test_video")
    if not video_dir.is_dir():
        print(f"Error: video directory not found: {video_dir}")
        return

    test_dir = Path("test_output_simple")
    test_dir.mkdir(exist_ok=True)

    # Select a few test files (small to medium); skip directories that merely
    # carry a video-like suffix.
    video_suffixes = {".mp4", ".mov", ".avi", ".mkv"}
    video_files = [
        p
        for p in video_dir.iterdir()
        if p.is_file() and p.suffix.lower() in video_suffixes
    ]

    # Sort by size and take first 3
    selected = sorted(video_files, key=lambda p: p.stat().st_size)[:3]

    print(f"Testing {len(selected)} video files (sorted by size):")
    for vf in selected:
        print(f" - {vf.name}: {vf.stat().st_size / 1024 / 1024:.1f} MB")

    results = []
    for i, video in enumerate(selected, 1):
        print(f"\n{'=' * 60}")
        print(f"Test {i}/{len(selected)}: {video.name}")
        print(f"{'=' * 60}")

        output_file = test_dir / f"{video.stem}.asr.json"
        success, duration, segments = run_asr_on_video(
            video, output_file, timeout_sec=300
        )
        results.append(
            {
                "video": video.name,
                "size_mb": video.stat().st_size / 1024 / 1024,
                "success": success,
                "duration": duration,
                "segments": segments,
                "output_file": str(output_file),
            }
        )

        # Small delay between tests
        if i < len(selected):
            print(" Waiting 5 seconds before next test...")
            time.sleep(5)

    # Summary
    print(f"\n{'=' * 60}")
    print("SUMMARY")
    print(f"{'=' * 60}")
    for r in results:
        # Distinct markers so passed and failed runs can be told apart.
        status = "[OK]" if r["success"] else "[FAIL]"
        print(f"{status} {r['video']}: {r['duration']:.1f}s, {r['segments']} segments")

    success_count = sum(1 for r in results if r["success"])
    print(f"\nSuccess rate: {success_count}/{len(results)}")

    # Save results
    with open(test_dir / "test_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nDetailed results saved to: {test_dir}/test_results.json")
# Script entry point: run the tests only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()