- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
169 lines
5.1 KiB
Python
169 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple ASR test script - test a few video files with detailed logging.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import subprocess
|
|
import signal
|
|
from pathlib import Path
|
|
|
|
|
|
def run_asr_on_video(video_path, output_path, timeout_sec=600):
    """Run the ASR processor on one video and validate its JSON output.

    Launches ``scripts/asr_processor.py`` (resolved relative to this file) in
    its own session so the whole process group can be killed on timeout. The
    child's stderr is echoed line by line while it runs; its stdout is drained
    concurrently and printed once the process has exited.

    Args:
        video_path: Path of the video file handed to the processor.
        output_path: Path where the processor is expected to write its JSON
            result.
        timeout_sec: Maximum number of seconds to wait for the processor.

    Returns:
        A ``(success, duration, segments)`` tuple: ``success`` is True only
        when the processor exited with code 0 and the output file could be
        read, ``duration`` is the elapsed wall-clock time in seconds, and
        ``segments`` is the number of entries under ``"segments"`` in the
        output (0 on any failure).
    """
    # Imported locally because only this function needs them.
    import threading
    import traceback

    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]

    print(f" Command: {' '.join(cmd)}")
    print(f" Timeout: {timeout_sec}s")

    stdout_lines = []

    def pump(stream, on_line):
        # Each reader thread owns its pipe: read until EOF, then close it.
        with stream:
            for line in iter(stream.readline, ""):
                on_line(line)

    def echo_stderr(line):
        print(f" [stderr] {line.rstrip()}")
        sys.stdout.flush()

    start_time = time.time()

    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Undecodable bytes in tool output must not kill a reader thread.
            errors="replace",
            bufsize=1,
            # Same effect as preexec_fn=os.setsid, but safe to use with threads.
            start_new_session=True,
        )

        # Both pipes must be drained while the child runs; an unread pipe
        # fills up and blocks the child until the timeout fires.
        readers = [
            threading.Thread(
                target=pump, args=(proc.stderr, echo_stderr), daemon=True
            ),
            threading.Thread(
                target=pump, args=(proc.stdout, stdout_lines.append), daemon=True
            ),
        ]
        for reader in readers:
            reader.start()

        try:
            returncode = proc.wait(timeout=timeout_sec)
        except subprocess.TimeoutExpired:
            duration = time.time() - start_time
            print(f" ERROR: Process timed out after {duration:.1f}s")

            # Kill the entire process group, not just the direct child.
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except OSError as exc:
                # Best effort: the group may already have exited.
                print(f" Warning: could not kill process group: {exc}")

            proc.wait(timeout=5)
            return False, duration, 0

        duration = time.time() - start_time

        # Let the readers reach EOF so no trailing output is lost. The join
        # is bounded in case a stray grandchild keeps a pipe open.
        for reader in readers:
            reader.join(timeout=5)

        stdout = "".join(stdout_lines)
        if stdout:
            print(f" [stdout] {stdout.strip()}")

        print(f" Process exited with code {returncode} after {duration:.1f}s")

        if returncode != 0:
            print(f" Error: Process failed with exit code {returncode}")
            return False, duration, 0

        if not os.path.exists(output_path):
            print(f" Error: Output file not created: {output_path}")
            return False, duration, 0

        # Verify the output file is readable JSON and summarize it.
        with open(output_path, "r", encoding="utf-8") as f:
            result = json.load(f)
        segments = len(result.get("segments", []))
        language = result.get("language", "unknown")
        print(f" Success: {segments} segments, language: {language}")
        return True, duration, segments

    except Exception as e:
        # Top-level boundary for a single test run: report and mark as failed.
        print(f" Exception: {e}")
        traceback.print_exc()
        return False, time.time() - start_time, 0
|
|
|
|
|
|
def main():
    """Run the ASR test over the three smallest videos and report the outcome.

    Collects video files (.mp4/.mov/.avi/.mkv) from ``../test_video``, picks
    the three smallest by file size, runs ``run_asr_on_video`` on each with a
    300-second timeout (pausing 5 seconds between runs), prints a summary and
    writes the per-video results to ``test_output_simple/test_results.json``.
    """
    video_dir = "../test_video"
    test_dir = Path("test_output_simple")
    test_dir.mkdir(exist_ok=True)

    # Gather candidate videos, then keep the three smallest.
    video_suffixes = [".mp4", ".mov", ".avi", ".mkv"]
    candidates = [
        entry
        for entry in Path(video_dir).iterdir()
        if entry.suffix.lower() in video_suffixes
    ]
    selected = sorted(candidates, key=lambda p: p.stat().st_size)[:3]
    total = len(selected)

    print(f"Testing {total} video files (sorted by size):")
    for candidate in selected:
        megabytes = candidate.stat().st_size / 1024 / 1024
        print(f" - {candidate.name}: {megabytes:.1f} MB")

    banner = "=" * 60
    results = []
    for index, video in enumerate(selected, start=1):
        print(f"\n{banner}")
        print(f"Test {index}/{total}: {video.name}")
        print(banner)

        output_file = test_dir / f"{video.stem}.asr.json"
        success, duration, segments = run_asr_on_video(
            video, output_file, timeout_sec=300
        )

        record = {
            "video": video.name,
            "size_mb": video.stat().st_size / 1024 / 1024,
            "success": success,
            "duration": duration,
            "segments": segments,
            "output_file": str(output_file),
        }
        results.append(record)

        # Short pause between runs, skipped after the last one.
        if index < total:
            print(" Waiting 5 seconds before next test...")
            time.sleep(5)

    # Summary
    print(f"\n{banner}")
    print("SUMMARY")
    print(banner)

    for entry in results:
        if entry["success"]:
            mark = "✓"
        else:
            mark = "✗"
        print(
            f"{mark} {entry['video']}: {entry['duration']:.1f}s, "
            f"{entry['segments']} segments"
        )

    passed = len([entry for entry in results if entry["success"]])
    print(f"\nSuccess rate: {passed}/{len(results)}")

    # Persist the detailed results next to the ASR outputs.
    results_path = test_dir / "test_results.json"
    with open(results_path, "w") as f:
        json.dump(results, f, indent=2)

    print(f"\nDetailed results saved to: {test_dir}/test_results.json")
|
|
|
|
|
|
# Script entry point: run the test only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|