Files
momentry_core/test_large_videos.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

280 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
Test ASR on large video files that may cause issues.
"""
import os
import sys
import time
import json
import subprocess
import signal
import threading
from pathlib import Path
import psutil
def check_audio_stream(video_path):
    """Return whether ffprobe reports at least one audio stream in *video_path*.

    The check fails open: if ffprobe cannot be launched, exits with a
    non-zero status, or its output cannot be decoded, ``True`` is returned so
    the caller still attempts ASR instead of silently skipping the file.

    Args:
        video_path: Path (``str`` or ``Path``) of the video file to inspect.

    Returns:
        ``False`` only when ffprobe ran successfully and listed no audio
        stream; ``True`` otherwise.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "a",
        "-show_entries",
        "stream=codec_type",
        "-of",
        "csv=p=0",
        str(video_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: ffprobe missing / not executable.
        # ValueError: invalid argument or undecodable output.
        # SubprocessError: non-zero exit status (check=True).
        # Deliberately not a bare ``except`` so Ctrl-C still interrupts the run.
        return True  # Assume audio exists if ffprobe fails
    # ffprobe prints one line per audio stream; empty output means none.
    return bool(result.stdout.strip())
def monitor_resources(pid, interval=5, stop_event=None):
    """Sample CPU and memory usage of a process until told to stop.

    Sampling ends when *stop_event* is set, or when psutil reports that the
    process no longer exists or cannot be inspected.

    Args:
        pid: Process ID to monitor.
        interval: Seconds to wait between samples.
        stop_event: Optional ``threading.Event``; when given, setting it ends
            the loop (and interrupts the wait between samples). When omitted,
            the loop only ends once the process can no longer be sampled.

    Returns:
        A dict with ``cpu_avg``, ``cpu_max``, ``mem_avg_mb``, ``mem_max_mb``
        and ``samples``, or an empty dict if no sample was collected.
    """
    cpu_readings = []
    mem_readings = []
    proc = None

    while stop_event is None or not stop_event.is_set():
        try:
            if proc is None:
                # Created on first use and then reused, instead of building a
                # new handle for every sample.
                proc = psutil.Process(pid)
            cpu = proc.cpu_percent(interval=0.1)
            rss_mb = proc.memory_info().rss / 1024 / 1024
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            break

        # Record both values only once both were read, so the two series
        # always have the same length and "samples" describes both.
        cpu_readings.append(cpu)
        mem_readings.append(rss_mb)

        if stop_event is None:
            time.sleep(interval)
        else:
            # Event.wait returns early when the event is set.
            stop_event.wait(interval)

    if not cpu_readings:
        return {}

    sample_count = len(cpu_readings)
    return {
        "cpu_avg": sum(cpu_readings) / sample_count,
        "cpu_max": max(cpu_readings),
        "mem_avg_mb": sum(mem_readings) / sample_count,
        "mem_max_mb": max(mem_readings),
        "samples": sample_count,
    }
def test_large_video(video_path, output_path, timeout_sec=1800):
    """Test ASR on a large video file with detailed monitoring.

    Runs ``scripts/asr_processor.py`` (located next to this file) in its own
    session on *video_path*, streams the child's stderr to the console,
    samples its resource usage in the background and, on success, reads the
    JSON written to *output_path*.

    Args:
        video_path: ``Path`` of the video to process.
        output_path: ``Path`` where the ASR processor writes its JSON result.
        timeout_sec: Seconds to wait before killing the ASR process group.

    Returns:
        A dict with ``video``, ``size_gb``, ``has_audio``, ``success``,
        ``duration``, ``segments`` and ``error``. When the file has audio the
        dict also contains ``resources`` (statistics from
        ``monitor_resources``, possibly empty) and, on success, ``language``.
    """
    size_gb = video_path.stat().st_size / 1024 / 1024 / 1024

    print(f"\n{'=' * 70}")
    print(f"Testing large video: {video_path.name}")
    print(f"Size: {size_gb:.2f} GB")
    print(f"{'=' * 70}")

    # Check audio first
    print(" Checking audio stream...")
    has_audio = check_audio_stream(video_path)
    print(f" Has audio: {has_audio}")
    if not has_audio:
        print(" No audio stream - ASR will skip transcription")
        return {
            "video": video_path.name,
            "size_gb": size_gb,
            "has_audio": False,
            "success": True,
            "duration": 0,
            "segments": 0,
            "error": "No audio stream",
        }

    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]
    print(f" Command: {' '.join(cmd[:3])} ...")
    print(f" Timeout: {timeout_sec}s ({timeout_sec / 60:.1f} minutes)")

    result = {
        "video": video_path.name,
        "size_gb": size_gb,
        "has_audio": True,
        "success": False,
        "duration": 0,
        "segments": 0,
        "error": None,
        "resources": {},
    }

    proc = None
    monitor_thread = None
    stderr_thread = None
    stop_monitor = threading.Event()
    resource_stats = {}  # filled in by the monitor thread when it finishes
    start_time = time.time()

    try:
        proc = subprocess.Popen(
            cmd,
            # The child's stdout is never used. Leaving it on an unread pipe
            # could block the child once the pipe buffer fills, so discard it.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            # New session => new process group, so the timeout path can kill
            # the whole group. Preferred over preexec_fn=os.setsid, which is
            # not safe to use in a program that has threads.
            start_new_session=True,
            bufsize=1,
        )
        print(f" Process started with PID: {proc.pid}")

        # Start resource monitoring in background, keeping its return value.
        def run_monitor():
            resource_stats.update(
                monitor_resources(proc.pid, interval=10, stop_event=stop_monitor)
            )

        monitor_thread = threading.Thread(target=run_monitor, daemon=True)
        monitor_thread.start()

        # Read stderr in real-time
        def read_stderr():
            for line in iter(proc.stderr.readline, ""):
                line = line.rstrip()
                if line:
                    print(f" [ASR] {line}")

        stderr_thread = threading.Thread(target=read_stderr, daemon=True)
        stderr_thread.start()

        # Wait for completion
        try:
            returncode = proc.wait(timeout=timeout_sec)
        except subprocess.TimeoutExpired:
            duration = time.time() - start_time
            result["duration"] = duration
            result["error"] = f"Timeout after {duration:.1f}s"
            print(f" ERROR: Timeout after {duration:.1f}s ({duration / 60:.1f} min)")
            # Kill process group
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                print(" Sent SIGKILL to process group")
            except (ProcessLookupError, PermissionError) as kill_err:
                print(f" Could not kill process group: {kill_err}")
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Keep the original timeout error; just report the leftover.
                print(" Process did not exit within 5s after SIGKILL")
        else:
            duration = time.time() - start_time
            result["duration"] = duration
            # Let the reader flush the remaining stderr lines before the
            # exit message is printed.
            stderr_thread.join(timeout=5)
            print(
                f" Process exited with code {returncode} after {duration:.1f}s ({duration / 60:.1f} min)"
            )
            if returncode != 0:
                result["error"] = f"Process failed with exit code {returncode}"
                print(f" Error: Process failed with exit code {returncode}")
            elif not output_path.exists():
                result["error"] = "Output file not created"
                print(" Error: Output file not created")
            else:
                with open(output_path, "r", encoding="utf-8") as f:
                    asr_result = json.load(f)
                segments = len(asr_result.get("segments", []))
                language = asr_result.get("language", "unknown")
                result["segments"] = segments
                result["language"] = language
                result["success"] = True
                print(f" Success: {segments} segments, language: {language}")
    except Exception as e:
        result["error"] = str(e)
        print(f" Exception: {e}")
        import traceback

        traceback.print_exc()
        result["duration"] = time.time() - start_time
    finally:
        # Always stop the monitor and collect whatever it measured, on the
        # success, timeout and exception paths alike.
        stop_monitor.set()
        if monitor_thread is not None:
            monitor_thread.join(timeout=2)
        result["resources"] = dict(resource_stats)

        if stderr_thread is not None:
            stderr_thread.join(timeout=5)
        # Close the pipe only once nothing is reading from it any more.
        reader_done = stderr_thread is None or not stderr_thread.is_alive()
        if proc is not None and proc.stderr is not None and reader_done:
            proc.stderr.close()

    return result
def main():
    """Run the large-video ASR test on the two biggest videos and report.

    Scans ``../test_video`` (relative to the current working directory) for
    video files larger than 1 GB, runs ``test_large_video`` on the two
    largest with a 40 minute timeout each, and writes the collected results
    to ``test_large_output/large_video_results.json`` (after every video) and
    ``test_large_output/final_report.json`` (at the end).

    Exits with an error message if the video directory does not exist.
    """
    video_dir = Path("../test_video")
    test_dir = Path("test_large_output")
    test_dir.mkdir(exist_ok=True)

    if not video_dir.is_dir():
        sys.exit(f"Video directory not found: {video_dir.resolve()}")

    # Identify large video files (> 1GB)
    video_suffixes = {".mov", ".m4v", ".mp4", ".avi", ".mkv"}
    large_videos = []
    for f in video_dir.iterdir():
        # is_file() also skips directories and broken symlinks, whose stat()
        # would be meaningless or fail.
        if f.suffix.lower() not in video_suffixes or not f.is_file():
            continue
        size_gb = f.stat().st_size / 1024 / 1024 / 1024
        if size_gb > 1.0:  # Larger than 1GB
            large_videos.append((f, size_gb))

    if not large_videos:
        print("No large video files (>1GB) found.")
        return

    # Sort once, largest first; both the listing and the selection use it.
    large_videos.sort(key=lambda item: item[1], reverse=True)

    print(f"Found {len(large_videos)} large video files (>1GB):")
    for f, size in large_videos:
        print(f" - {f.name}: {size:.2f} GB")

    # Test the largest 2 files
    selected = [f for f, _ in large_videos[:2]]
    print(f"\nWill test {len(selected)} largest files:")
    for f in selected:
        print(f" - {f.name}")

    results = []
    last_index = len(selected) - 1
    for index, video in enumerate(selected):
        output_file = test_dir / f"{video.stem}.asr.json"
        result = test_large_video(
            video, output_file, timeout_sec=2400
        )  # 40 minutes timeout
        results.append(result)

        # Save intermediate results so a later crash does not lose them
        with open(test_dir / "large_video_results.json", "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)

        # Wait between tests if there are more
        if index < last_index:
            print("\n Waiting 30 seconds before next test...")
            time.sleep(30)

    # Summary
    print(f"\n{'=' * 70}")
    print("LARGE VIDEO TEST SUMMARY")
    print(f"{'=' * 70}")
    for r in results:
        # The original marker was an empty string for both outcomes, so the
        # summary could not distinguish passes from failures.
        status = "[OK]" if r["success"] else "[FAIL]"
        error_msg = f" - {r['error']}" if r["error"] else ""
        print(
            f"{status} {r['video']}: {r['duration']:.1f}s, {r.get('segments', 0)} segments{error_msg}"
        )

    success_count = sum(1 for r in results if r["success"])
    print(f"\nSuccess rate: {success_count}/{len(results)}")

    # Save final report
    report_path = test_dir / "final_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nDetailed results saved to: {report_path}")
if __name__ == "__main__":
main()