Files
momentry_core/monitor_asr.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

351 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Monitor ASR processor resource usage during transcription.
"""
import os
import sys
import time
import json
import subprocess
import signal
import threading
from pathlib import Path
import psutil
import numpy as np
class ResourceMonitor:
    """Sample system-wide and per-process resource usage on a background thread.

    Each sample appended to ``self.samples`` is a dict::

        {"timestamp": float (time.time()),
         "system": {"cpu_percent", "memory_percent", "memory_available_gb"},
         "process": {"cpu_percent", "memory_rss_mb", "memory_vms_mb",
                     "num_threads", "status"}}

    ``"system"`` is left empty if system metrics could not be read;
    ``"process"`` is empty when no PID is given, or ``{"error": msg}`` when
    the target process cannot be inspected.
    """

    def __init__(self, pid=None):
        """Create an idle monitor.

        Args:
            pid: PID of the process to watch; falsy to record only system
                metrics.
        """
        self.pid = pid
        self.samples = []
        self.running = False
        self.monitor_thread = None
        # Set by stop() so sleeping loops wake immediately instead of waiting
        # out a full interval.
        self._stop_event = threading.Event()
        # Process.cpu_percent() without an interval reports usage since the
        # previous call *on the same object*; creating a new Process for every
        # sample would always read 0.0, so the object is cached here.
        self._proc = None

    def start(self, interval=5):
        """Start sampling every ``interval`` seconds on a daemon thread.

        Does nothing if monitoring is already running.
        """
        if self.running:
            return
        self.running = True
        self._stop_event.clear()
        if self.pid:
            try:
                # Establish the CPU baseline now so the first sample is meaningful.
                self._get_process()
            except psutil.Error:
                pass  # the failure is recorded per-sample by _collect_sample
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, args=(interval,), daemon=True
        )
        self.monitor_thread.start()
        print(f"Resource monitoring started (interval: {interval}s)")

    def stop(self):
        """Signal the sampling thread to exit and wait up to 2 s for it."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)
        print("Resource monitoring stopped")

    def _monitor_loop(self, interval):
        """Collect a sample, then wait ``interval`` s, until stop() is called."""
        while self.running:
            sample = self._collect_sample()
            if sample:
                self.samples.append(sample)
            # Event.wait returns True as soon as stop() sets the event.
            if self._stop_event.wait(interval):
                break

    def _get_process(self):
        """Return a cached ``psutil.Process`` for ``self.pid``, priming its CPU counter.

        Raises:
            psutil.NoSuchProcess / psutil.AccessDenied if the PID cannot be opened.
        """
        if self._proc is None or self._proc.pid != self.pid:
            proc = psutil.Process(self.pid)
            # The first cpu_percent() call on a Process object always returns
            # 0.0; it only records the baseline for subsequent calls.
            proc.cpu_percent(None)
            self._proc = proc
        return self._proc

    def _collect_sample(self):
        """Collect one resource sample for the system and (if set) the target PID."""
        sample = {"timestamp": time.time(), "system": {}, "process": {}}

        # System metrics (best effort: left empty on failure).
        try:
            sample["system"]["cpu_percent"] = psutil.cpu_percent(interval=0.1)
            vm = psutil.virtual_memory()
            sample["system"]["memory_percent"] = vm.percent
            sample["system"]["memory_available_gb"] = vm.available / 1024 / 1024 / 1024
        except (psutil.Error, OSError):
            pass

        # Process metrics. They are built locally so that a failure partway
        # through never leaves a half-populated entry behind.
        if self.pid:
            try:
                proc = self._get_process()
                with proc.oneshot():
                    mem = proc.memory_info()
                    sample["process"] = {
                        "cpu_percent": proc.cpu_percent(),
                        "memory_rss_mb": mem.rss / 1024 / 1024,
                        "memory_vms_mb": mem.vms / 1024 / 1024,
                        "num_threads": proc.num_threads(),
                        "status": proc.status(),
                    }
            except psutil.NoSuchProcess:
                sample["process"] = {"error": "Process not found"}
            except psutil.AccessDenied:
                sample["process"] = {"error": "Access denied"}
        return sample

    def get_summary(self):
        """Summarize collected samples.

        Returns:
            ``{}`` if nothing was sampled; otherwise a dict with
            ``sample_count``, ``duration_sec`` and, when available, ``process``
            (RSS avg/max/min, CPU avg/max) and ``system`` (CPU and memory
            percent avg/max) sub-dicts. Samples without process metrics
            (e.g. errors) are excluded from the process statistics.
        """
        if not self.samples:
            return {}
        summary = {
            "sample_count": len(self.samples),
            "duration_sec": (
                self.samples[-1]["timestamp"] - self.samples[0]["timestamp"]
                if len(self.samples) > 1
                else 0
            ),
        }

        proc_metrics = [
            s["process"] for s in self.samples if "memory_rss_mb" in s.get("process", {})
        ]
        if proc_metrics:
            rss = [m["memory_rss_mb"] for m in proc_metrics]
            cpu = [m["cpu_percent"] for m in proc_metrics if "cpu_percent" in m]
            summary["process"] = {
                "rss_mb_avg": float(np.mean(rss)),
                "rss_mb_max": max(rss),
                "rss_mb_min": min(rss),
                "cpu_percent_avg": float(np.mean(cpu)) if cpu else 0,
                "cpu_percent_max": max(cpu) if cpu else 0,
            }

        sys_metrics = [
            s["system"] for s in self.samples if "cpu_percent" in s.get("system", {})
        ]
        if sys_metrics:
            sys_cpu = [m["cpu_percent"] for m in sys_metrics]
            sys_mem = [m["memory_percent"] for m in sys_metrics if "memory_percent" in m]
            summary["system"] = {
                "cpu_percent_avg": float(np.mean(sys_cpu)),
                "cpu_percent_max": max(sys_cpu),
                "memory_percent_avg": float(np.mean(sys_mem)) if sys_mem else 0,
                "memory_percent_max": max(sys_mem) if sys_mem else 0,
            }
        return summary

    def print_realtime(self, interval=10):
        """Print the newest process sample as a table row every ``interval`` seconds.

        Loops while the monitor is running, so it is meant to run on its own
        thread. Samples without process metrics (no PID, or the process is
        gone) are skipped rather than printed as rows of zeros.
        """
        print(f"\n{'Time':>6} {'CPU%':>6} {'RSS(MB)':>8} {'VMS(MB)':>8} {'Threads':>8}")
        print("-" * 50)
        last_print = 0
        while self.running:
            if self.samples and time.time() - last_print >= interval:
                sample = self.samples[-1]
                p = sample.get("process", {})
                if "memory_rss_mb" in p:
                    elapsed = sample["timestamp"] - self.samples[0]["timestamp"]
                    cpu = p.get("cpu_percent", 0)
                    rss = p.get("memory_rss_mb", 0)
                    vms = p.get("memory_vms_mb", 0)
                    threads = p.get("num_threads", 0)
                    print(f"{elapsed:6.0f} {cpu:6.1f} {rss:8.1f} {vms:8.1f} {threads:8}")
                    last_print = time.time()
            # Wakes immediately when stop() is called.
            self._stop_event.wait(1)
def _kill_process_group(proc):
    """Best-effort SIGKILL of the process group led by ``proc``.

    ``proc`` must have been started in its own session (``start_new_session``)
    so that its process group is not this script's own group.

    Returns:
        True if the signal was sent, False if the group is already gone or
        could not be signalled.
    """
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
    except OSError:
        # ProcessLookupError (already exited) or PermissionError.
        return False
    return True


def _print_resource_summary(summary):
    """Print the report for a ``ResourceMonitor.get_summary()`` dict (no-op if empty)."""
    if not summary:
        return
    print(f"\n{'=' * 60}")
    print("RESOURCE USAGE SUMMARY")
    print(f"{'=' * 60}")
    print(f"Monitoring duration: {summary.get('duration_sec', 0):.1f}s")
    print(f"Samples collected: {summary.get('sample_count', 0)}")
    if "process" in summary:
        p = summary["process"]
        print("\nProcess metrics:")
        print(f" Peak RSS memory: {p.get('rss_mb_max', 0):.1f} MB")
        print(f" Average RSS memory: {p.get('rss_mb_avg', 0):.1f} MB")
        print(f" Peak CPU usage: {p.get('cpu_percent_max', 0):.1f}%")
        print(f" Average CPU usage: {p.get('cpu_percent_avg', 0):.1f}%")
    if "system" in summary:
        s = summary["system"]
        print("\nSystem metrics:")
        print(f" Peak CPU usage: {s.get('cpu_percent_max', 0):.1f}%")
        print(f" Average CPU usage: {s.get('cpu_percent_avg', 0):.1f}%")
        print(f" Peak memory usage: {s.get('memory_percent_max', 0):.1f}%")
        print(f" Average memory usage: {s.get('memory_percent_avg', 0):.1f}%")


def run_asr_with_monitoring(video_path, output_path, timeout_sec=600):
    """Run ``scripts/asr_processor.py`` on a video while sampling its resource usage.

    The child runs in its own session. Its stderr is echoed with an ``[ASR]``
    prefix and its stdout is discarded. Resources are sampled every 5 s and
    printed every 10 s, and a summary is printed at the end. On timeout the
    child's whole process group is killed.

    Args:
        video_path: Input video passed to the ASR script.
        output_path: JSON file the ASR script is expected to (re)write.
        timeout_sec: Seconds to wait before killing the child.

    Returns:
        dict with ``success``, ``duration``, ``exit_code`` (None on timeout),
        ``error``, ``resources`` (``ResourceMonitor.get_summary()``) and
        ``output`` (``{"segments", "language"}`` on success, else None).
    """
    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]
    print(f"Running ASR on: {video_path}")
    print(f"Output: {output_path}")
    print(f"Command: {' '.join(cmd)}")
    print(f"Timeout: {timeout_sec}s\n")

    output_file = Path(output_path)
    # Remember any pre-existing output so a stale file from an earlier run is
    # not mistaken for this run's result.
    prev_mtime_ns = output_file.stat().st_mtime_ns if output_file.exists() else None

    start_time = time.time()
    proc = subprocess.Popen(
        cmd,
        # stdout was never used; an unread PIPE would block the child once the
        # pipe buffer fills, so discard it instead.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        # Own session/process group so a timeout can kill the whole tree
        # (thread-safe replacement for preexec_fn=os.setsid).
        start_new_session=True,
        bufsize=1,
    )
    print(f"ASR process PID: {proc.pid}")

    monitor = ResourceMonitor(pid=proc.pid)
    monitor.start(interval=5)
    display_thread = threading.Thread(
        target=monitor.print_realtime, args=(10,), daemon=True
    )
    display_thread.start()

    def read_stderr():
        """Echo the child's non-blank stderr lines until EOF."""
        for line in iter(proc.stderr.readline, ""):
            line = line.strip()
            if line:
                print(f"[ASR] {line}")

    stderr_thread = threading.Thread(target=read_stderr, daemon=True)
    stderr_thread.start()

    result = {
        "success": False,
        "duration": 0,
        "exit_code": None,
        "error": None,
        "resources": {},
        "output": None,
    }

    try:
        returncode = proc.wait(timeout=timeout_sec)
        timed_out = False
    except subprocess.TimeoutExpired:
        returncode = None
        timed_out = True
    except BaseException:
        # e.g. KeyboardInterrupt: don't leave the ASR process tree running.
        _kill_process_group(proc)
        monitor.stop()
        raise

    duration = time.time() - start_time
    result["duration"] = duration

    if timed_out:
        result["error"] = f"Timeout after {duration:.1f}s"
        print(f"\nERROR: Timeout after {duration:.1f}s")
        if _kill_process_group(proc):
            print("Sent SIGKILL to process group")
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            print("Warning: ASR process did not exit after SIGKILL")

    monitor.stop()
    # Let the reader flush the child's final stderr lines before reporting.
    stderr_thread.join(timeout=5)

    if not timed_out:
        result["exit_code"] = returncode
        print(f"\nProcess completed after {duration:.1f}s")
        print(f"Exit code: {returncode}")
        if returncode != 0:
            result["error"] = f"Process failed with exit code {returncode}"
            print(f"Error: Process failed with exit code {returncode}")
        elif not output_file.exists():
            result["error"] = "Output file not created"
            print("Error: Output file not created")
        elif output_file.stat().st_mtime_ns == prev_mtime_ns:
            result["error"] = "Output file not updated by this run"
            print("Error: Output file not updated by this run")
        else:
            try:
                with open(output_file, "r", encoding="utf-8") as f:
                    asr_result = json.load(f)
                segments = len(asr_result.get("segments", []))
                language = asr_result.get("language", "unknown")
            except (OSError, ValueError, AttributeError, TypeError) as e:
                # Unreadable / malformed JSON, or an unexpected top-level shape.
                result["error"] = str(e)
                print(f"\nException: {e}")
                import traceback

                traceback.print_exc()
            else:
                result["output"] = {"segments": segments, "language": language}
                result["success"] = True
                print(f"Success: {segments} segments, language: {language}")

    result["resources"] = monitor.get_summary()
    _print_resource_summary(result["resources"])
    return result
def main():
    """Command-line entry point: transcribe one video while monitoring resources.

    The ASR transcript goes to ``--output`` (by default
    ``test_output/<stem>_monitored.asr.json``). A run report is written as
    ``<stem>_results.json`` next to it. The process exits with status 0 when
    the run succeeded and 1 otherwise, including when the video is missing.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Test ASR with resource monitoring")
    cli.add_argument("video", help="Video file path")
    cli.add_argument("-o", "--output", help="Output JSON path", default=None)
    cli.add_argument(
        "-t", "--timeout", type=int, default=600, help="Timeout in seconds"
    )
    opts = cli.parse_args()

    video = Path(opts.video)
    if not video.exists():
        print(f"Error: Video file not found: {video}")
        sys.exit(1)

    out_json = (
        Path(opts.output)
        if opts.output
        else Path(f"test_output/{video.stem}_monitored.asr.json")
    )
    out_json.parent.mkdir(parents=True, exist_ok=True)

    banner = "=" * 60
    print("ASR Resource Monitoring Test")
    print(banner)

    outcome = run_asr_with_monitoring(video, out_json, timeout_sec=opts.timeout)

    # Persist the full run report alongside the transcript.
    report_path = out_json.parent / f"{video.stem}_results.json"
    with open(report_path, "w") as fh:
        json.dump(outcome, fh, indent=2)
    print(f"\nDetailed results saved to: {report_path}")

    sys.exit(0 if outcome["success"] else 1)


if __name__ == "__main__":
    main()