#!/opt/homebrew/bin/python3.11
"""
Shutdown/Reboot Testing Script

Tests the system's ability to handle service interruptions and recover gracefully.
This simulates various shutdown scenarios and verifies that:
1. Services can be stopped gracefully
2. Data is preserved during shutdown
3. Services can be restarted successfully
4. System returns to healthy state after restart
"""

import subprocess
import time
import json
import os
import sys
from datetime import datetime
from typing import Dict, List, Tuple


class ShutdownTester:
    """Drive shutdown/restart scenarios against local services and record results.

    Each test appends a dict (``test``, ``status``, ``timestamp`` and, for
    crashed tests, ``error``) to ``self.test_results``; ``generate_report``
    summarizes them and writes a JSON report under ``/tmp``.
    """

    def __init__(self):
        self.test_results = []
        self.start_time = datetime.now()

    def log(self, message: str, level: str = "info"):
        """Print ``message`` to stdout prefixed with a timestamp and upper-cased level."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] [{level.upper()}] {message}")

    def run_command(self, cmd: str, timeout: int = 30) -> Tuple[bool, str]:
        """Run a shell command and return success status and output.

        Args:
            cmd: Command line, executed through the shell (pipes and ``&&`` work).
            timeout: Seconds to wait before giving up.

        Returns:
            ``(success, output)``. ``success`` is True only for exit status 0.
            On success ``output`` is the stripped stdout. On failure it is the
            stripped stdout followed by the stripped stderr (whichever are
            non-empty), so callers can log the actual error message. On
            timeout or launch failure ``output`` describes the problem.
        """
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return False, f"Command timed out after {timeout}s"
        except Exception as e:
            # Deliberate catch-all: a command that cannot even be launched is
            # reported as a failed command instead of aborting the test run.
            return False, str(e)

        stdout = result.stdout.strip()
        if result.returncode == 0:
            return True, stdout

        # Most CLI tools report errors on stderr; without it the failure
        # logs written by callers would be empty.
        stderr = result.stderr.strip()
        return False, "\n".join(part for part in (stdout, stderr) if part)

    def check_service_health(self) -> Dict[str, bool]:
        """Check health of all services.

        Runs one probe command per service (10s timeout each) and logs a
        per-service line.

        Returns:
            Mapping of service name to True when its probe exited with status 0.
        """
        self.log("Checking service health...")

        # NOTE(review): ``curl -s`` exits 0 on HTTP 4xx/5xx responses, so these
        # probes only detect "not listening"; consider ``curl -sf`` once every
        # endpoint is confirmed to return 2xx when healthy.
        services = {
            "postgresql": "pg_isready -h localhost -p 5432",
            "redis": "redis-cli ping",
            "mongodb": "mongosh --eval 'db.adminCommand(\"ping\")' --quiet",
            "qdrant": "curl -s http://localhost:6333/health",
            "gitea": "curl -s http://localhost:3000/api/v1/version",
            "sftpgo": "curl -s http://localhost:8080/healthz",
            "caddy": "curl -s http://localhost:2019/config/",
            # pgrep exits non-zero when nothing matches. The previous
            # ``ps | grep | head -1`` pipeline took its exit status from
            # ``head`` and therefore always reported php-fpm as healthy.
            "php_fpm": "pgrep php-fpm",
            "momentry": "curl -s http://localhost:3002/health",
        }

        results = {}
        for name, cmd in services.items():
            success, _ = self.run_command(cmd, timeout=10)
            results[name] = success
            status = "✓" if success else "✗"
            self.log(f" {status} {name}: {'healthy' if success else 'unhealthy'}")

        return results

    def test_graceful_shutdown(self, service: str) -> bool:
        """Test graceful shutdown of a specific service.

        Args:
            service: Service name; must be a key of the shutdown command table
                to be exercised.

        Returns:
            True when the service was stopped, or when the test was skipped
            (no shutdown command known, or the service was not running).
            False when the shutdown command failed or the service still
            answers its health probe 5 seconds later.
        """
        self.log(f"Testing graceful shutdown of {service}...")

        shutdown_commands = {
            "redis": "redis-cli shutdown",
            "postgresql": "pg_ctl -D /Users/accusys/momentry/var/postgresql stop -m smart",
            # NOTE(review): ``mongod --shutdown`` may not be supported on
            # macOS; confirm before relying on this entry.
            "mongodb": "mongod --dbpath /opt/homebrew/var/mongodb --shutdown",
            "momentry": "pkill -TERM momentry",
        }

        if service not in shutdown_commands:
            self.log(f"No shutdown command for {service}, skipping", "warning")
            return True

        # Check if service is running
        health_before = self.check_service_health()
        if not health_before.get(service, False):
            self.log(f"Service {service} not running, skipping", "warning")
            return True

        # Perform graceful shutdown
        success, output = self.run_command(shutdown_commands[service], timeout=30)
        if not success:
            self.log(f"Failed to shutdown {service}: {output}", "error")
            return False

        # Wait for shutdown to complete
        time.sleep(5)

        # Check if service is stopped; a missing entry counts as still running
        health_after = self.check_service_health()
        if health_after.get(service, True):
            self.log(f"Service {service} still running after shutdown", "error")
            return False

        self.log(f"Service {service} shutdown successfully")
        return True

    def test_service_restart(self, service: str) -> bool:
        """Test restarting a specific service.

        Args:
            service: Service name; must be a key of the restart command table
                to be exercised.

        Returns:
            True when the service is healthy 10 seconds after the restart
            command, or when no restart command is known (skipped). False when
            the command fails or the service is not healthy afterwards.
        """
        self.log(f"Testing restart of {service}...")

        restart_commands = {
            "redis": "brew services restart redis",
            "postgresql": "brew services restart postgresql@18",
            "mongodb": "brew services restart mongodb-community",
            # The backgrounded group must not inherit the captured
            # stdout/stderr pipes; otherwise run_command blocks until its
            # timeout waiting for the pipes to close and reports a failure.
            "momentry": (
                "(cd /Users/accusys/momentry_core_0.1 && "
                "cargo run --bin momentry -- server --port 3002) "
                "> /dev/null 2>&1 &"
            ),
        }

        if service not in restart_commands:
            self.log(f"No restart command for {service}, skipping", "warning")
            return True

        # Start the service
        success, output = self.run_command(restart_commands[service], timeout=60)
        if not success:
            self.log(f"Failed to restart {service}: {output}", "error")
            return False

        # Wait for service to start
        time.sleep(10)

        # Check if service is healthy
        health = self.check_service_health()
        if not health.get(service, False):
            self.log(f"Service {service} not healthy after restart", "error")
            return False

        self.log(f"Service {service} restarted successfully")
        return True

    def test_ai_processor_recovery(self) -> bool:
        """Test AI processor recovery after service interruption.

        Runs each processor script with ``--check-health`` and expects JSON on
        stdout whose ``status`` field equals ``"healthy"``. Scripts that do not
        exist on disk are skipped with a warning and do not fail the test.

        Returns:
            True when every processor that was checked reported healthy.
        """
        self.log("Testing AI processor recovery...")

        # Test each processor's health check
        processors = [
            ("asr", "scripts/asr_processor_contract_v2.py"),
            ("ocr", "scripts/ocr_processor_contract_v1.py"),
            ("yolo", "scripts/yolo_processor_contract_v1.py"),
            ("face", "scripts/face_processor_contract_v1.py"),
            ("pose", "scripts/pose_processor_contract_v1.py"),
        ]

        all_healthy = True
        for name, script in processors:
            if not os.path.exists(script):
                self.log(f"Processor script {script} not found", "warning")
                continue

            cmd = f"python3 {script} test_clip.mp4 /tmp/test_output.json --check-health"
            success, output = self.run_command(cmd, timeout=30)
            if not success:
                self.log(f" ✗ {name}: health check failed - {output}", "error")
                all_healthy = False
                continue

            try:
                result = json.loads(output)
            except json.JSONDecodeError:
                self.log(f" ✗ {name}: invalid JSON output", "error")
                all_healthy = False
                continue

            # Valid JSON that is not an object (e.g. a list) has no "status".
            if isinstance(result, dict) and result.get("status") == "healthy":
                self.log(f" ✓ {name}: healthy")
            else:
                self.log(f" ✗ {name}: unhealthy - {result}", "error")
                all_healthy = False

        return all_healthy

    def test_data_persistence(self) -> bool:
        """Test that data persists across service restarts.

        Writes a unique value to Redis and to a PostgreSQL test table, stops
        and restarts each service, and verifies the value can still be read.
        The test data is removed again (best effort) after each successful
        verification.

        Returns:
            True when both Redis and PostgreSQL returned the value after their
            restart; False on the first step that fails.
        """
        self.log("Testing data persistence...")

        # Test Redis data persistence
        test_key = "shutdown_test_key"
        test_value = f"test_value_{int(time.time())}"

        # Set a value in Redis
        set_cmd = f"redis-cli set {test_key} {test_value}"
        success, _ = self.run_command(set_cmd)
        if not success:
            self.log("Failed to set Redis test key", "error")
            return False

        # Restart Redis
        if not self.test_graceful_shutdown("redis"):
            return False
        if not self.test_service_restart("redis"):
            return False

        # Check if value still exists
        get_cmd = f"redis-cli get {test_key}"
        success, output = self.run_command(get_cmd)
        if not success or output != test_value:
            self.log(
                f"Redis data not persisted. Expected {test_value}, got {output}",
                "error",
            )
            return False

        self.log(" ✓ Redis data persistence: passed")

        # Best-effort cleanup so the key does not linger after the run.
        cleaned, _ = self.run_command(f"redis-cli del {test_key}")
        if not cleaned:
            self.log("Could not remove Redis test key", "warning")

        # Test PostgreSQL data persistence
        test_table = "shutdown_test"
        create_cmd = f'psql -U accusys -d momentry -c "CREATE TABLE IF NOT EXISTS {test_table} (id SERIAL, value TEXT);"'
        insert_cmd = f"psql -U accusys -d momentry -c \"INSERT INTO {test_table} (value) VALUES ('{test_value}');\""
        select_cmd = f"psql -U accusys -d momentry -c \"SELECT value FROM {test_table} WHERE value = '{test_value}';\""
        delete_cmd = f"psql -U accusys -d momentry -c \"DELETE FROM {test_table} WHERE value = '{test_value}';\""

        success, _ = self.run_command(create_cmd)
        if not success:
            self.log("Failed to create PostgreSQL test table", "error")
            return False

        success, _ = self.run_command(insert_cmd)
        if not success:
            self.log("Failed to insert PostgreSQL test data", "error")
            return False

        # Restart PostgreSQL
        if not self.test_graceful_shutdown("postgresql"):
            return False
        if not self.test_service_restart("postgresql"):
            return False

        # Check if data still exists
        success, output = self.run_command(select_cmd)
        if not success or test_value not in output:
            self.log(
                f"PostgreSQL data not persisted. Expected {test_value}, got {output}",
                "error",
            )
            return False

        self.log(" ✓ PostgreSQL data persistence: passed")

        # Best-effort cleanup so test rows do not accumulate across runs.
        cleaned, _ = self.run_command(delete_cmd)
        if not cleaned:
            self.log("Could not remove PostgreSQL test row", "warning")

        return True

    def run_all_tests(self) -> bool:
        """Run all shutdown/reboot tests.

        Executes the fixed test sequence, records one entry per test in
        ``self.test_results`` (status ``PASS``, ``FAIL`` or ``ERROR``), then
        calls ``generate_report``.

        Returns:
            True only when every test passed. A health check passes only when
            all services are healthy; a test that raises counts as failed.
        """
        self.log("=" * 60)
        self.log("Starting Shutdown/Reboot Testing")
        self.log("=" * 60)

        tests = [
            ("Initial Health Check", self.check_service_health, {}),
            ("AI Processor Recovery", self.test_ai_processor_recovery, {}),
            (
                "Redis Graceful Shutdown",
                self.test_graceful_shutdown,
                {"service": "redis"},
            ),
            ("Redis Restart", self.test_service_restart, {"service": "redis"}),
            (
                "PostgreSQL Graceful Shutdown",
                self.test_graceful_shutdown,
                {"service": "postgresql"},
            ),
            (
                "PostgreSQL Restart",
                self.test_service_restart,
                {"service": "postgresql"},
            ),
            ("Data Persistence", self.test_data_persistence, {}),
            ("Final Health Check", self.check_service_health, {}),
        ]

        all_passed = True
        for test_name, test_func, kwargs in tests:
            self.log(f"\nTest: {test_name}")
            self.log("-" * 40)

            try:
                result = test_func(**kwargs)
            except Exception as e:
                # One crashing test must not prevent the remaining tests
                # (or the report) from running.
                self.log(f"Test failed with exception: {e}", "error")
                self.test_results.append(
                    {
                        "test": test_name,
                        "status": "ERROR",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat(),
                    }
                )
                all_passed = False
                continue

            if isinstance(result, dict):
                # Health check returns dict
                healthy_count = sum(1 for v in result.values() if v)
                total_count = len(result)
                passed = healthy_count == total_count
                status = "PASS" if passed else "FAIL"
                self.log(
                    f"Result: {status} ({healthy_count}/{total_count} services healthy)"
                )
            else:
                # Other tests return bool
                passed = bool(result)
                status = "PASS" if passed else "FAIL"
                self.log(f"Result: {status}")

            self.test_results.append(
                {
                    "test": test_name,
                    "status": status,
                    "timestamp": datetime.now().isoformat(),
                }
            )

            # Use ``passed`` rather than the raw result: a non-empty health
            # dict is always truthy, even when services are unhealthy.
            if not passed:
                all_passed = False

        # Generate report
        self.generate_report(all_passed)

        return all_passed

    def generate_report(self, all_passed: bool):
        """Generate test report.

        Logs a summary of ``self.test_results`` and writes a detailed JSON
        report to ``/tmp/shutdown_test_report_<unix time>.json``.

        Args:
            all_passed: Overall verdict to record in the report.
        """
        self.log("\n" + "=" * 60)
        self.log("SHUTDOWN/REBOOT TESTING REPORT")
        self.log("=" * 60)

        total_tests = len(self.test_results)
        passed_tests = sum(1 for r in self.test_results if r["status"] == "PASS")
        failed_tests = sum(1 for r in self.test_results if r["status"] == "FAIL")
        error_tests = sum(1 for r in self.test_results if r["status"] == "ERROR")

        self.log(f"Total Tests: {total_tests}")
        self.log(f"Passed: {passed_tests}")
        self.log(f"Failed: {failed_tests}")
        self.log(f"Errors: {error_tests}")

        if all_passed:
            self.log(
                "\n✅ ALL TESTS PASSED - System handles shutdown/reboot gracefully"
            )
        else:
            self.log("\n❌ SOME TESTS FAILED - Review issues above")

        # Take the end time once so end_time and duration_seconds agree.
        end_time = datetime.now()

        # Save detailed report
        report = {
            "summary": {
                "total_tests": total_tests,
                "passed": passed_tests,
                "failed": failed_tests,
                "errors": error_tests,
                "overall_status": "PASS" if all_passed else "FAIL",
                "start_time": self.start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "duration_seconds": (end_time - self.start_time).total_seconds(),
            },
            "test_results": self.test_results,
            "system_info": {
                "python_version": sys.version,
                "platform": sys.platform,
                "processor_count": os.cpu_count(),
            },
        }

        report_file = f"/tmp/shutdown_test_report_{int(time.time())}.json"
        with open(report_file, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        self.log(f"\nDetailed report saved to: {report_file}")


def main():
    """Main entry point.

    Exits with status 0 when all tests pass, 1 on failure or fatal error,
    and 130 when interrupted with Ctrl-C.
    """
    tester = ShutdownTester()

    try:
        success = tester.run_all_tests()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        tester.log("Testing interrupted by user", "warning")
        sys.exit(130)
    except Exception as e:
        tester.log(f"Fatal error: {e}", "error")
        sys.exit(1)


if __name__ == "__main__":
    main()