- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
166 lines
4.8 KiB
Python
166 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Quick test of ASR on all video files with short timeout."""
|
|
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import tempfile
|
|
import time
|
|
|
|
TEST_VIDEO_DIR = "/Users/accusys/test_video"
|
|
if not os.path.isdir(TEST_VIDEO_DIR):
|
|
print(f"Test video directory not found: {TEST_VIDEO_DIR}")
|
|
sys.exit(1)
|
|
|
|
# List all video files
|
|
video_exts = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".m4v"}
|
|
video_files = []
|
|
for f in os.listdir(TEST_VIDEO_DIR):
|
|
if os.path.splitext(f)[1].lower() in video_exts:
|
|
video_files.append(os.path.join(TEST_VIDEO_DIR, f))
|
|
|
|
if not video_files:
|
|
print("No video files found")
|
|
sys.exit(1)
|
|
|
|
print(f"Found {len(video_files)} video files")
|
|
print("Will test each with 120 second timeout")
|
|
print()
|
|
|
|
results = []
|
|
|
|
for i, video_path in enumerate(video_files):
|
|
print(f"\n{i + 1}/{len(video_files)}: {os.path.basename(video_path)}")
|
|
size_mb = os.path.getsize(video_path) / (1024**2)
|
|
print(f" Size: {size_mb:.1f} MB")
|
|
|
|
# Skip if > 1GB (will take too long for quick test)
|
|
if size_mb > 1000:
|
|
print(f" ⏭️ Skipping (large file > 1GB)")
|
|
results.append(
|
|
{
|
|
"file": os.path.basename(video_path),
|
|
"size_mb": size_mb,
|
|
"skipped": True,
|
|
"reason": "large file > 1GB",
|
|
}
|
|
)
|
|
continue
|
|
|
|
# Create temporary output
|
|
temp_dir = tempfile.mkdtemp(prefix="asr_quick_")
|
|
output_path = os.path.join(temp_dir, "output.json")
|
|
|
|
cmd = [
|
|
"/opt/homebrew/bin/python3.11",
|
|
"scripts/asr_processor.py",
|
|
video_path,
|
|
output_path,
|
|
"--model-size",
|
|
"tiny",
|
|
]
|
|
|
|
start = time.time()
|
|
timeout = 120 # 2 minutes
|
|
try:
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
elapsed = time.time() - start
|
|
success = proc.returncode == 0
|
|
error_msg = proc.stderr[:100] if proc.stderr else ""
|
|
timeout_hit = False
|
|
except subprocess.TimeoutExpired:
|
|
elapsed = timeout
|
|
success = False
|
|
error_msg = f"Timeout after {timeout}s"
|
|
timeout_hit = True
|
|
except Exception as e:
|
|
elapsed = time.time() - start
|
|
success = False
|
|
error_msg = str(e)
|
|
timeout_hit = False
|
|
|
|
# Parse output if exists
|
|
segments = 0
|
|
language = ""
|
|
if os.path.exists(output_path):
|
|
try:
|
|
with open(output_path, "r") as f:
|
|
data = json.load(f)
|
|
segments = len(data.get("segments", []))
|
|
language = data.get("language", "")
|
|
except:
|
|
pass
|
|
|
|
# Clean up
|
|
import shutil
|
|
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
has_audio = segments > 0 or (language != "" and language != "")
|
|
|
|
result = {
|
|
"file": os.path.basename(video_path),
|
|
"size_mb": size_mb,
|
|
"success": success,
|
|
"timeout": timeout_hit,
|
|
"elapsed": elapsed,
|
|
"segments": segments,
|
|
"language": language,
|
|
"has_audio": has_audio,
|
|
"error": error_msg[:100],
|
|
"skipped": False,
|
|
}
|
|
results.append(result)
|
|
|
|
status = "✅ SUCCESS" if success else "❌ FAILED"
|
|
if timeout_hit:
|
|
status += " (TIMEOUT)"
|
|
print(f" Result: {status}, {elapsed:.1f}s, {segments} segments")
|
|
|
|
# Summary
|
|
print("\n" + "=" * 60)
|
|
print("QUICK TEST SUMMARY")
|
|
print("=" * 60)
|
|
success_count = sum(1 for r in results if not r.get("skipped", False) and r["success"])
|
|
timeout_count = sum(1 for r in results if not r.get("skipped", False) and r["timeout"])
|
|
skipped_count = sum(1 for r in results if r.get("skipped", False))
|
|
no_audio_count = sum(
|
|
1
|
|
for r in results
|
|
if not r.get("skipped", False) and not r["has_audio"] and r["success"]
|
|
)
|
|
|
|
print(f"Total videos: {len(results)}")
|
|
print(f"Tested: {len(results) - skipped_count}")
|
|
print(f"Skipped (large): {skipped_count}")
|
|
print(f"Successful: {success_count}")
|
|
print(f"Failed: {len(results) - success_count - skipped_count}")
|
|
print(f"Timeouts: {timeout_count}")
|
|
print(f"No audio (skipped): {no_audio_count}")
|
|
print()
|
|
|
|
for r in results:
|
|
if r.get("skipped"):
|
|
print(f"⏭️ {r['file']:50s} SKIPPED: {r['reason']}")
|
|
else:
|
|
status = "✅" if r["success"] else "❌"
|
|
if r["timeout"]:
|
|
status = "⏱️"
|
|
print(
|
|
f"{status} {r['file']:50s} {r['elapsed']:6.1f}s segs:{r['segments']:4d} lang:{r['language']:5s}"
|
|
)
|
|
|
|
# Check for any failures not due to missing audio
|
|
failed = [
|
|
r for r in results if not r.get("skipped") and not r["success"] and r["has_audio"]
|
|
]
|
|
if failed:
|
|
print("\n❌ FAILURES DETECTED (videos with audio):")
|
|
for r in failed:
|
|
print(f" {r['file']}: {r['error']}")
|
|
sys.exit(1)
|
|
else:
|
|
print("\n✅ All tested videos with audio processed successfully.")
|
|
sys.exit(0)
|