#!/usr/bin/env python3
"""
Monitor ASR processor resource usage during transcription.

Runs ``scripts/asr_processor.py`` (relative to this file) on a video as a child
process, samples system-wide and child-process CPU/memory usage on a background
thread, and writes a JSON report next to the ASR output.
"""

import argparse
import json
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
from pathlib import Path

import numpy as np
import psutil


class ResourceMonitor:
    """Periodically sample system-wide and (optionally) per-process resource usage.

    Each sample appended to ``self.samples`` is a dict::

        {"timestamp": <time.time() seconds>,
         "system": {"cpu_percent", "memory_percent", "memory_available_gb"},
         "process": {"cpu_percent", "memory_rss_mb", "memory_vms_mb",
                     "num_threads", "status"}}

    Either sub-dict may instead hold an ``"error"`` message when the metrics
    could not be read; ``"process"`` stays empty when no ``pid`` was given.
    """

    def __init__(self, pid=None):
        """
        Args:
            pid: PID of the process to track in addition to system metrics,
                or None to collect system metrics only.
        """
        self.pid = pid
        self.samples = []
        self.running = False
        self.monitor_thread = None
        # Set by stop() so the sampling/display loops wake immediately instead
        # of sleeping out the rest of their interval.
        self._stop_event = threading.Event()
        # psutil's Process.cpu_percent() reports usage since the previous call
        # on the *same* Process object, so the handle must be reused across
        # samples (a fresh handle per sample always reports 0.0).
        self._proc = None

    def start(self, interval=5):
        """Start sampling every ``interval`` seconds on a daemon thread.

        Does nothing if monitoring is already running.
        """
        if self.running:
            return

        # Clear before flagging as running so no loop observes running=True
        # together with a still-set stop event.
        self._stop_event.clear()
        self.running = True

        if self.pid:
            try:
                # Establish the CPU-time baseline now so the first sample
                # already carries a meaningful percentage.
                self._get_process()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Deliberately ignored here: _collect_sample records the
                # error in every sample taken while the process is unreadable.
                pass

        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, args=(interval,), daemon=True
        )
        self.monitor_thread.start()
        print(f"Resource monitoring started (interval: {interval}s)")

    def stop(self):
        """Stop sampling and wait up to 2 s for the sampler thread to exit.

        Safe to call repeatedly; calls made while not running are no-ops.
        """
        if not self.running:
            return
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)
        print("Resource monitoring stopped")

    def _monitor_loop(self, interval):
        """Collect a sample, then wait ``interval`` seconds, until stopped."""
        while self.running:
            sample = self._collect_sample()
            if sample:
                self.samples.append(sample)
            # Event.wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(interval):
                break

    def _get_process(self):
        """Return the cached psutil.Process for ``self.pid``, creating it on first use.

        A newly created handle is primed with one discarded cpu_percent() call,
        because psutil documents the first non-blocking call as a baseline.

        Raises:
            psutil.NoSuchProcess, psutil.AccessDenied: propagated from psutil.
        """
        if self._proc is None or self._proc.pid != self.pid:
            proc = psutil.Process(self.pid)
            proc.cpu_percent()
            self._proc = proc
        return self._proc

    def _collect_sample(self):
        """Collect one sample of system and target-process metrics.

        Read failures are stored in the sample under an ``"error"`` key rather
        than raised, so a transient failure cannot kill the sampler thread.
        """
        sample = {"timestamp": time.time(), "system": {}, "process": {}}

        # System metrics
        try:
            # Blocks for 0.1 s to measure system-wide CPU over that window.
            sample["system"]["cpu_percent"] = psutil.cpu_percent(interval=0.1)
            vm = psutil.virtual_memory()
            sample["system"]["memory_percent"] = vm.percent
            sample["system"]["memory_available_gb"] = vm.available / 1024**3
        except Exception as e:  # best-effort: keep sampling, but record why
            sample["system"]["error"] = str(e)

        # Process metrics (if PID provided)
        if self.pid:
            try:
                proc = self._get_process()
                # oneshot() lets psutil reuse fetched data across these calls.
                with proc.oneshot():
                    mem = proc.memory_info()
                    sample["process"]["cpu_percent"] = proc.cpu_percent()
                    sample["process"]["memory_rss_mb"] = mem.rss / 1024 / 1024
                    sample["process"]["memory_vms_mb"] = mem.vms / 1024 / 1024
                    sample["process"]["num_threads"] = proc.num_threads()
                    sample["process"]["status"] = proc.status()
            except psutil.NoSuchProcess:
                # Drop partial readings so get_summary() ignores this sample.
                sample["process"] = {"error": "Process not found"}
            except psutil.AccessDenied:
                sample["process"] = {"error": "Access denied"}

        return sample

    def get_summary(self):
        """Return aggregate statistics over the collected samples.

        Returns:
            ``{}`` if nothing was sampled; otherwise a dict with
            ``sample_count``, ``duration_sec`` and, when data exists,
            ``process`` (RSS avg/max/min, CPU avg/max) and ``system``
            (CPU and memory avg/max) sub-dicts. Averages are converted to plain
            Python floats so the result is JSON-serialisable.
        """
        # Snapshot: the sampler thread may still be appending.
        samples = list(self.samples)
        if not samples:
            return {}

        summary = {
            "sample_count": len(samples),
            "duration_sec": (
                samples[-1]["timestamp"] - samples[0]["timestamp"]
                if len(samples) > 1
                else 0
            ),
        }

        # Process metrics summary: only samples where the process was readable.
        process_metrics = [
            s["process"] for s in samples if "memory_rss_mb" in s.get("process", {})
        ]
        if process_metrics:
            rss_values = [m["memory_rss_mb"] for m in process_metrics]
            cpu_values = [
                m["cpu_percent"] for m in process_metrics if "cpu_percent" in m
            ]
            summary["process"] = {
                "rss_mb_avg": float(np.mean(rss_values)),
                "rss_mb_max": max(rss_values),
                "rss_mb_min": min(rss_values),
                "cpu_percent_avg": float(np.mean(cpu_values)) if cpu_values else 0,
                "cpu_percent_max": max(cpu_values) if cpu_values else 0,
            }

        # System metrics summary
        system_metrics = [
            s["system"] for s in samples if "cpu_percent" in s.get("system", {})
        ]
        if system_metrics:
            sys_cpu = [m["cpu_percent"] for m in system_metrics]
            sys_mem = [
                m["memory_percent"] for m in system_metrics if "memory_percent" in m
            ]
            summary["system"] = {
                "cpu_percent_avg": float(np.mean(sys_cpu)),
                "cpu_percent_max": max(sys_cpu),
                "memory_percent_avg": float(np.mean(sys_mem)) if sys_mem else 0,
                "memory_percent_max": max(sys_mem) if sys_mem else 0,
            }

        return summary

    def print_realtime(self, interval=10):
        """Print the latest process sample every ``interval`` seconds.

        Loops until the monitor is stopped, so run it on its own thread.
        Columns: seconds since the first sample, process CPU %, RSS and VMS in
        MB, and thread count (0 when a metric is missing from the sample).
        """
        print(f"\n{'Time':>6} {'CPU%':>6} {'RSS(MB)':>8} {'VMS(MB)':>8} {'Threads':>8}")
        print("-" * 50)

        last_print = 0
        while self.running:
            if self.samples and time.time() - last_print >= interval:
                sample = self.samples[-1]
                p = sample.get("process", {})
                cpu = p.get("cpu_percent", 0)
                rss = p.get("memory_rss_mb", 0)
                vms = p.get("memory_vms_mb", 0)
                threads = p.get("num_threads", 0)
                elapsed = sample["timestamp"] - self.samples[0]["timestamp"]
                print(f"{elapsed:6.0f} {cpu:6.1f} {rss:8.1f} {vms:8.1f} {threads:8}")
                last_print = time.time()
            # Returns early once stop() sets the event.
            self._stop_event.wait(1)


def _pump_lines(stream, on_line=None):
    """Read ``stream`` until EOF, passing each non-empty stripped line to ``on_line``.

    Draining a pipe continuously keeps the child from blocking once the OS
    pipe buffer is full. With ``on_line=None`` the output is discarded.
    """
    try:
        for line in iter(stream.readline, ""):
            line = line.strip()
            if line and on_line is not None:
                on_line(line)
    except (OSError, ValueError):
        # The pipe was closed underneath us; there is nothing left to read.
        return


def _kill_process_group(proc):
    """SIGKILL the process group led by ``proc`` and reap it (best-effort).

    The child is launched with ``start_new_session=True``, so its PID is also
    its process-group ID; killing the group also covers any descendants that
    stayed in that group.
    """
    try:
        os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        print("Sent SIGKILL to process group")
    except OSError as e:
        # e.g. ProcessLookupError if the group is already gone.
        print(f"Could not kill process group: {e}")
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        print(f"Warning: process {proc.pid} still running after SIGKILL")


def _print_resource_summary(summary):
    """Pretty-print a ResourceMonitor.get_summary() dict to stdout."""
    print(f"\n{'=' * 60}")
    print("RESOURCE USAGE SUMMARY")
    print(f"{'=' * 60}")

    print(f"Monitoring duration: {summary.get('duration_sec', 0):.1f}s")
    print(f"Samples collected: {summary.get('sample_count', 0)}")

    if "process" in summary:
        p = summary["process"]
        print("\nProcess metrics:")
        print(f" Peak RSS memory: {p.get('rss_mb_max', 0):.1f} MB")
        print(f" Average RSS memory: {p.get('rss_mb_avg', 0):.1f} MB")
        print(f" Peak CPU usage: {p.get('cpu_percent_max', 0):.1f}%")
        print(f" Average CPU usage: {p.get('cpu_percent_avg', 0):.1f}%")

    if "system" in summary:
        s = summary["system"]
        print("\nSystem metrics:")
        print(f" Peak CPU usage: {s.get('cpu_percent_max', 0):.1f}%")
        print(f" Average CPU usage: {s.get('cpu_percent_avg', 0):.1f}%")
        print(f" Peak memory usage: {s.get('memory_percent_max', 0):.1f}%")
        print(f" Average memory usage: {s.get('memory_percent_avg', 0):.1f}%")


def run_asr_with_monitoring(video_path, output_path, timeout_sec=600):
    """Run the ASR processor on ``video_path`` while monitoring its resources.

    Args:
        video_path: Input video, passed to ``scripts/asr_processor.py``.
        output_path: Path passed to the ASR script as its JSON output file.
        timeout_sec: Wall-clock limit; on expiry the child's process group
            is SIGKILLed.

    Returns:
        dict with keys ``success`` (bool), ``duration`` (seconds),
        ``exit_code`` (None on timeout), ``error`` (str or None),
        ``resources`` (ResourceMonitor.get_summary()) and ``output``
        (``{"segments", "language"}`` on success, else None).
    """
    script_path = Path(__file__).parent / "scripts" / "asr_processor.py"
    cmd = [sys.executable, str(script_path), str(video_path), str(output_path)]

    print(f"Running ASR on: {video_path}")
    print(f"Output: {output_path}")
    print(f"Command: {' '.join(cmd)}")
    print(f"Timeout: {timeout_sec}s\n")

    start_time = time.time()

    # start_new_session=True runs setsid() in the child (as the old
    # preexec_fn=os.setsid did) without preexec_fn's thread-safety caveats.
    # errors="replace" keeps one undecodable byte from killing a reader thread.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
        start_new_session=True,
        bufsize=1,
    )

    print(f"ASR process PID: {proc.pid}")

    # Start resource monitoring
    monitor = ResourceMonitor(pid=proc.pid)
    monitor.start(interval=5)

    # Start real-time display in background
    display_thread = threading.Thread(
        target=monitor.print_realtime, args=(10,), daemon=True
    )
    display_thread.start()

    # Drain both pipes concurrently: leaving stdout unread until exit let a
    # chatty child fill the pipe and stall until the timeout. stderr is echoed;
    # stdout is discarded, as before.
    readers = [
        threading.Thread(
            target=_pump_lines,
            args=(proc.stderr, lambda line: print(f"[ASR] {line}")),
            daemon=True,
        ),
        threading.Thread(target=_pump_lines, args=(proc.stdout,), daemon=True),
    ]
    for reader in readers:
        reader.start()

    result = {
        "success": False,
        "duration": 0,
        "exit_code": None,
        "error": None,
        "resources": {},
        "output": None,
    }

    try:
        returncode = proc.wait(timeout=timeout_sec)
        duration = time.time() - start_time
        result["duration"] = duration
        result["exit_code"] = returncode

        monitor.stop()
        # Let the readers flush any output still buffered in the pipes.
        for reader in readers:
            reader.join(timeout=5)

        print(f"\nProcess completed after {duration:.1f}s")
        print(f"Exit code: {returncode}")

        if returncode != 0:
            result["error"] = f"Process failed with exit code {returncode}"
            print(f"Error: Process failed with exit code {returncode}")
        elif not os.path.exists(output_path):
            result["error"] = "Output file not created"
            print("Error: Output file not created")
        else:
            with open(output_path, "r", encoding="utf-8") as f:
                asr_result = json.load(f)
            segments = len(asr_result.get("segments", []))
            language = asr_result.get("language", "unknown")
            result["output"] = {"segments": segments, "language": language}
            result["success"] = True
            print(f"Success: {segments} segments, language: {language}")

    except subprocess.TimeoutExpired:
        duration = time.time() - start_time
        result["duration"] = duration
        result["error"] = f"Timeout after {duration:.1f}s"
        print(f"\nERROR: Timeout after {duration:.1f}s")
        _kill_process_group(proc)

    except Exception as e:
        result["error"] = str(e)
        print(f"\nException: {e}")
        traceback.print_exc()

    finally:
        # The child lives in its own session, so e.g. Ctrl-C in this process
        # never reaches it; make sure it does not outlive us.
        if proc.poll() is None:
            _kill_process_group(proc)
        monitor.stop()

    result["resources"] = monitor.get_summary()
    if result["resources"]:
        _print_resource_summary(result["resources"])

    return result


def main():
    """Command-line entry point: run ASR on one video with resource monitoring.

    Exits with status 0 if the ASR run succeeded and 1 otherwise (including a
    missing input file).
    """
    parser = argparse.ArgumentParser(description="Test ASR with resource monitoring")
    parser.add_argument("video", help="Video file path")
    parser.add_argument("-o", "--output", help="Output JSON path", default=None)
    parser.add_argument(
        "-t", "--timeout", type=int, default=600, help="Timeout in seconds"
    )
    args = parser.parse_args()

    video_path = Path(args.video)
    if not video_path.exists():
        print(f"Error: Video file not found: {video_path}")
        sys.exit(1)

    if args.output:
        output_path = Path(args.output)
    else:
        output_path = Path("test_output") / f"{video_path.stem}_monitored.asr.json"

    output_path.parent.mkdir(exist_ok=True, parents=True)

    print("ASR Resource Monitoring Test")
    print("=" * 60)

    result = run_asr_with_monitoring(video_path, output_path, timeout_sec=args.timeout)

    # Save detailed results next to the ASR output.
    result_path = output_path.parent / f"{video_path.stem}_results.json"
    with open(result_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(f"\nDetailed results saved to: {result_path}")

    # Return exit code based on success
    sys.exit(0 if result["success"] else 1)


if __name__ == "__main__":
    main()