Files
momentry_core/test_shutdown_recovery.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

392 lines
14 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Shutdown/Reboot Testing Script
Tests the system's ability to handle service interruptions and recover gracefully.
This simulates various shutdown scenarios and verifies that:
1. Services can be stopped gracefully
2. Data is preserved during shutdown
3. Services can be restarted successfully
4. System returns to healthy state after restart
"""
import subprocess
import time
import json
import os
import sys
from datetime import datetime
from typing import Dict, List, Tuple
class ShutdownTester:
    """Drive shutdown/restart scenarios against local services and record results.

    Every scenario shells out to a command-line tool and is judged by that
    command's exit status.  Outcomes accumulate in ``self.test_results`` and
    are summarised (and written to a JSON file) by :meth:`generate_report`.
    """

    def __init__(self):
        # One dict per executed test: "test", "status", "timestamp" and,
        # for tests that raised, "error".
        self.test_results = []
        # Start of the run; used to compute the report's duration.
        self.start_time = datetime.now()

    def log(self, message: str, level: str = "info"):
        """Print ``message`` prefixed with a timestamp and the upper-cased level."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] [{level.upper()}] {message}")

    def run_command(self, cmd: str, timeout: int = 30) -> Tuple[bool, str]:
        """Run ``cmd`` through the shell and report whether it succeeded.

        Args:
            cmd: Shell command line (executed with ``shell=True``).
            timeout: Seconds to wait before giving up.

        Returns:
            ``(success, output)``.  ``success`` is True when the exit status
            is 0.  ``output`` is the stripped stdout; when the command failed
            without writing to stdout, the stripped stderr is returned instead
            so callers have something meaningful to log.  On timeout or on a
            failure to launch, ``success`` is False and ``output`` describes
            the problem.
        """
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return False, f"Command timed out after {timeout}s"
        except Exception as e:
            return False, str(e)

        succeeded = result.returncode == 0
        output = result.stdout.strip()
        if not succeeded and not output:
            # Most CLIs explain failures on stderr; surface that text rather
            # than returning an empty string.
            output = result.stderr.strip()
        return succeeded, output

    def check_service_health(self) -> Dict[str, bool]:
        """Probe every known service once.

        Returns:
            Mapping of service name to True (probe exited 0) or False.
        """
        self.log("Checking service health...")
        # curl needs -f: without it curl exits 0 on HTTP 4xx/5xx responses,
        # so an erroring endpoint would be counted as healthy.
        services = {
            "postgresql": "pg_isready -h localhost -p 5432",
            "redis": "redis-cli ping",
            "mongodb": "mongosh --eval 'db.adminCommand(\"ping\")' --quiet",
            # NOTE(review): Qdrant documents /healthz as its health endpoint;
            # confirm against the deployed version.
            "qdrant": "curl -sf http://localhost:6333/healthz",
            "gitea": "curl -sf http://localhost:3000/api/v1/version",
            "sftpgo": "curl -sf http://localhost:8080/healthz",
            "caddy": "curl -sf http://localhost:2019/config/",
            # pgrep exits non-zero when nothing matches.  The previous
            # "ps | grep | head" pipeline took its status from head, which
            # is 0 even with no match.
            "php_fpm": "pgrep php-fpm",
            "momentry": "curl -sf http://localhost:3002/health",
        }
        results = {}
        for name, cmd in services.items():
            success, _ = self.run_command(cmd, timeout=10)
            results[name] = success
            status = "✓" if success else "✗"
            self.log(f" {status} {name}: {'healthy' if success else 'unhealthy'}")
        return results

    def test_graceful_shutdown(self, service: str) -> bool:
        """Stop ``service`` with its graceful-shutdown command and verify it is down.

        Returns:
            True when the service was stopped, or when the test was skipped
            (no shutdown command is known, or the service was not running).
            False when the command failed or the service still answers its
            health probe afterwards.
        """
        self.log(f"Testing graceful shutdown of {service}...")
        shutdown_commands = {
            "redis": "redis-cli shutdown",
            "postgresql": "pg_ctl -D /Users/accusys/momentry/var/postgresql stop -m smart",
            "mongodb": "mongod --dbpath /opt/homebrew/var/mongodb --shutdown",
            "momentry": "pkill -TERM momentry",
        }
        if service not in shutdown_commands:
            self.log(f"No shutdown command for {service}, skipping", "warning")
            return True

        # Nothing to shut down if the service is not up to begin with.
        health_before = self.check_service_health()
        if not health_before.get(service, False):
            self.log(f"Service {service} not running, skipping", "warning")
            return True

        success, output = self.run_command(shutdown_commands[service], timeout=30)
        if not success:
            self.log(f"Failed to shutdown {service}: {output}", "error")
            return False

        # Give the service time to finish shutting down.
        time.sleep(5)

        health_after = self.check_service_health()
        # A missing entry is treated as "still running" so it cannot pass silently.
        if health_after.get(service, True):
            self.log(f"Service {service} still running after shutdown", "error")
            return False

        self.log(f"Service {service} shutdown successfully")
        return True

    def test_service_restart(self, service: str) -> bool:
        """Start ``service`` again and verify it becomes healthy.

        Returns:
            True when the service answers its health probe after the restart,
            or when no restart command is known (skipped).  False otherwise.
        """
        self.log(f"Testing restart of {service}...")
        restart_commands = {
            "redis": "brew services restart redis",
            "postgresql": "brew services restart postgresql@18",
            "mongodb": "brew services restart mongodb-community",
            # The server is backgrounded, so its stdout/stderr must be
            # redirected away from the capture pipes.  Otherwise
            # subprocess.run keeps reading until the server exits and the
            # call ends in a timeout.  The subshell group ensures the
            # backgrounded shell itself holds no pipe either.
            "momentry": (
                "(cd /Users/accusys/momentry_core_0.1 && "
                "exec cargo run --bin momentry -- server --port 3002) "
                "> /tmp/momentry_restart.log 2>&1 &"
            ),
        }
        if service not in restart_commands:
            self.log(f"No restart command for {service}, skipping", "warning")
            return True

        success, output = self.run_command(restart_commands[service], timeout=60)
        if not success:
            self.log(f"Failed to restart {service}: {output}", "error")
            return False

        # Give the service time to come up before probing it.
        time.sleep(10)

        health = self.check_service_health()
        if not health.get(service, False):
            self.log(f"Service {service} not healthy after restart", "error")
            return False

        self.log(f"Service {service} restarted successfully")
        return True

    def test_ai_processor_recovery(self) -> bool:
        """Run each AI processor script's ``--check-health`` mode.

        Scripts that do not exist at their relative path are skipped with a
        warning.  A processor is healthy when its command exits 0 and prints
        a JSON object whose ``"status"`` is ``"healthy"``.

        Returns:
            True when no checked processor reported a problem.
        """
        self.log("Testing AI processor recovery...")
        processors = [
            ("asr", "scripts/asr_processor_contract_v2.py"),
            ("ocr", "scripts/ocr_processor_contract_v1.py"),
            ("yolo", "scripts/yolo_processor_contract_v1.py"),
            ("face", "scripts/face_processor_contract_v1.py"),
            ("pose", "scripts/pose_processor_contract_v1.py"),
        ]
        all_healthy = True
        for name, script in processors:
            if not os.path.exists(script):
                self.log(f"Processor script {script} not found", "warning")
                continue

            cmd = f"python3 {script} test_clip.mp4 /tmp/test_output.json --check-health"
            success, output = self.run_command(cmd, timeout=30)
            if not success:
                self.log(f"{name}: health check failed - {output}", "error")
                all_healthy = False
                continue

            try:
                result = json.loads(output)
            except json.JSONDecodeError:
                self.log(f"{name}: invalid JSON output", "error")
                all_healthy = False
                continue

            # Valid JSON that is not an object (e.g. a list) has no status.
            if isinstance(result, dict) and result.get("status") == "healthy":
                self.log(f"{name}: healthy")
            else:
                self.log(f"{name}: unhealthy - {result}", "error")
                all_healthy = False
        return all_healthy

    def test_data_persistence(self) -> bool:
        """Write a marker to Redis and PostgreSQL and check it survives a restart.

        Each store is written to, shut down, restarted and read back.

        Returns:
            True when both stores still hold the marker; False at the first
            step that fails.
        """
        self.log("Testing data persistence...")

        # --- Redis ---
        test_key = "shutdown_test_key"
        # Timestamped so a value left by an earlier run cannot satisfy the check.
        test_value = f"test_value_{int(time.time())}"

        set_cmd = f"redis-cli set {test_key} {test_value}"
        success, _ = self.run_command(set_cmd)
        if not success:
            self.log("Failed to set Redis test key", "error")
            return False

        if not self.test_graceful_shutdown("redis"):
            return False
        if not self.test_service_restart("redis"):
            return False

        get_cmd = f"redis-cli get {test_key}"
        success, output = self.run_command(get_cmd)
        if not success or output != test_value:
            self.log(
                f"Redis data not persisted. Expected {test_value}, got {output}",
                "error",
            )
            return False
        self.log(" ✓ Redis data persistence: passed")

        # --- PostgreSQL ---
        test_table = "shutdown_test"
        create_cmd = f'psql -U accusys -d momentry -c "CREATE TABLE IF NOT EXISTS {test_table} (id SERIAL, value TEXT);"'
        insert_cmd = f"psql -U accusys -d momentry -c \"INSERT INTO {test_table} (value) VALUES ('{test_value}');\""
        select_cmd = f"psql -U accusys -d momentry -c \"SELECT value FROM {test_table} WHERE value = '{test_value}';\""

        success, _ = self.run_command(create_cmd)
        if not success:
            self.log("Failed to create PostgreSQL test table", "error")
            return False
        success, _ = self.run_command(insert_cmd)
        if not success:
            self.log("Failed to insert PostgreSQL test data", "error")
            return False

        if not self.test_graceful_shutdown("postgresql"):
            return False
        if not self.test_service_restart("postgresql"):
            return False

        success, output = self.run_command(select_cmd)
        if not success or test_value not in output:
            self.log(
                f"PostgreSQL data not persisted. Expected {test_value}, got {output}",
                "error",
            )
            return False
        self.log(" ✓ PostgreSQL data persistence: passed")
        return True

    def run_all_tests(self) -> bool:
        """Run the full scenario list in order and produce the report.

        A test that raises is recorded with status ``"ERROR"`` and does not
        stop the remaining tests.

        Returns:
            True only when every test passed.  A health check passes only
            when every probed service is healthy.
        """
        self.log("=" * 60)
        self.log("Starting Shutdown/Reboot Testing")
        self.log("=" * 60)
        tests = [
            ("Initial Health Check", self.check_service_health, {}),
            ("AI Processor Recovery", self.test_ai_processor_recovery, {}),
            (
                "Redis Graceful Shutdown",
                self.test_graceful_shutdown,
                {"service": "redis"},
            ),
            ("Redis Restart", self.test_service_restart, {"service": "redis"}),
            (
                "PostgreSQL Graceful Shutdown",
                self.test_graceful_shutdown,
                {"service": "postgresql"},
            ),
            (
                "PostgreSQL Restart",
                self.test_service_restart,
                {"service": "postgresql"},
            ),
            ("Data Persistence", self.test_data_persistence, {}),
            ("Final Health Check", self.check_service_health, {}),
        ]
        all_passed = True
        for test_name, test_func, kwargs in tests:
            self.log(f"\nTest: {test_name}")
            self.log("-" * 40)
            try:
                result = test_func(**kwargs)
                if isinstance(result, dict):
                    # Health checks return {service: healthy}.  Truthiness of
                    # the dict says nothing about its values, so the verdict
                    # is derived from the values explicitly.
                    healthy_count = sum(1 for v in result.values() if v)
                    total_count = len(result)
                    passed = total_count > 0 and healthy_count == total_count
                    status = "PASS" if passed else "FAIL"
                    self.log(
                        f"Result: {status} ({healthy_count}/{total_count} services healthy)"
                    )
                else:
                    # Every other test returns a bool.
                    passed = bool(result)
                    status = "PASS" if passed else "FAIL"
                    self.log(f"Result: {status}")
                self.test_results.append(
                    {
                        "test": test_name,
                        "status": status,
                        "timestamp": datetime.now().isoformat(),
                    }
                )
                if not passed:
                    all_passed = False
            except Exception as e:
                self.log(f"Test failed with exception: {e}", "error")
                self.test_results.append(
                    {
                        "test": test_name,
                        "status": "ERROR",
                        "error": str(e),
                        "timestamp": datetime.now().isoformat(),
                    }
                )
                all_passed = False

        self.generate_report(all_passed)
        return all_passed

    def generate_report(self, all_passed: bool):
        """Log a summary of ``self.test_results`` and save it as JSON under /tmp.

        Args:
            all_passed: Overall verdict to record in the report.

        A failure to write the report file is logged rather than raised, so
        it cannot mask the test verdict.
        """
        self.log("\n" + "=" * 60)
        self.log("SHUTDOWN/REBOOT TESTING REPORT")
        self.log("=" * 60)
        total_tests = len(self.test_results)
        passed_tests = sum(1 for r in self.test_results if r["status"] == "PASS")
        failed_tests = sum(1 for r in self.test_results if r["status"] == "FAIL")
        error_tests = sum(1 for r in self.test_results if r["status"] == "ERROR")
        self.log(f"Total Tests: {total_tests}")
        self.log(f"Passed: {passed_tests}")
        self.log(f"Failed: {failed_tests}")
        self.log(f"Errors: {error_tests}")
        if all_passed:
            self.log(
                "\n✅ ALL TESTS PASSED - System handles shutdown/reboot gracefully"
            )
        else:
            self.log("\n❌ SOME TESTS FAILED - Review issues above")

        # Sample the clock once so end_time and duration agree.
        end_time = datetime.now()
        report = {
            "summary": {
                "total_tests": total_tests,
                "passed": passed_tests,
                "failed": failed_tests,
                "errors": error_tests,
                "overall_status": "PASS" if all_passed else "FAIL",
                "start_time": self.start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "duration_seconds": (end_time - self.start_time).total_seconds(),
            },
            "test_results": self.test_results,
            "system_info": {
                "python_version": sys.version,
                "platform": sys.platform,
                "processor_count": os.cpu_count(),
            },
        }
        report_file = f"/tmp/shutdown_test_report_{int(time.time())}.json"
        try:
            with open(report_file, "w", encoding="utf-8") as f:
                json.dump(report, f, indent=2)
        except OSError as e:
            self.log(f"Could not write report to {report_file}: {e}", "error")
            return
        self.log(f"\nDetailed report saved to: {report_file}")
def main():
    """Run the whole suite and exit with a status that reflects the outcome.

    Exit status: 0 when every test passed, 1 when any test failed or an
    unexpected error escaped, 130 when the run was interrupted with Ctrl-C.
    """
    tester = ShutdownTester()
    try:
        exit_code = 0 if tester.run_all_tests() else 1
    except KeyboardInterrupt:
        tester.log("Testing interrupted by user", "warning")
        exit_code = 130
    except Exception as exc:
        tester.log(f"Fatal error: {exc}", "error")
        exit_code = 1
    sys.exit(exit_code)
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()