#!/usr/bin/env python3 """ Processor Compliance Verification Tool Verifies that all AI processor modules comply with the AI-Driven Processor Contract. """ import os import sys import json import subprocess import argparse from pathlib import Path from typing import Dict, List, Any, Tuple import re from datetime import datetime # Contract requirements from AI_DRIVEN_PROCESSOR_CONTRACT.md CONTRACT_REQUIREMENTS = { "cli_interface": { "required_args": ["video_path", "output_path"], "optional_args": ["--uuid", "-u", "--check-health"], "description": "Command-line interface compliant with specification", }, "redis_reporting": {"description": "Redis progress reporting implemented"}, "signal_handling": { "signals": ["SIGTERM", "SIGINT"], "description": "Signal handlers for SIGTERM and SIGINT", }, "health_check": { "flag": "--check-health", "description": "Health check mode implemented", }, "resource_monitoring": { "optional": True, "description": "Resource monitoring (optional but recommended)", }, "json_output": { "required_fields": ["processor_name", "processor_version", "contract_version"], "description": "Output JSON includes required base fields", }, "error_handling": {"description": "Error handling with graceful cleanup"}, "performance_overhead": { "limit": 5, # percent "description": "Performance overhead within 5% limit", }, "documentation": {"description": "Documentation of processor-specific features"}, } PROCESSORS = { "asr": { "script": "scripts/asr_processor_contract_v2.py", "version": "2.1.0", "contract_version": "1.0", }, "ocr": { "script": "scripts/ocr_processor_contract_v1.py", "version": "1.0.0", "contract_version": "1.0", }, "yolo": { "script": "scripts/yolo_processor_contract_v1.py", "version": "1.0.0", "contract_version": "1.0", }, "face": { "script": "scripts/face_processor_contract_v1.py", "version": "1.0.0", "contract_version": "1.0", }, "pose": { "script": "scripts/pose_processor_contract_v1.py", "version": "1.0.0", "contract_version": "1.0", }, } def check_file_exists(script_path: str) -> Tuple[bool, str]: """Check if processor script exists.""" if os.path.exists(script_path): return True, f"✅ Script exists: {script_path}" return False, f"❌ Script not found: {script_path}" def check_cli_interface(script_path: str) -> Tuple[bool, List[str]]: """Check command-line interface compliance.""" results = [] try: # Run with --help to check CLI cmd = [sys.executable, script_path, "--help"] result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) if result.returncode != 0: results.append(f"❌ CLI --help failed: {result.stderr[:100]}") return False, results help_text = result.stdout.lower() # Check for required arguments if "video_path" not in help_text: results.append("❌ Missing 'video_path' argument in help") else: results.append("✅ Found 'video_path' argument") if "output_path" not in help_text: results.append("❌ Missing 'output_path' argument in help") else: results.append("✅ Found 'output_path' argument") # Check for optional arguments if "--uuid" not in help_text and "-u" not in help_text: results.append("❌ Missing '--uuid' or '-u' argument") else: results.append("✅ Found UUID argument") if "--check-health" not in help_text: results.append("❌ Missing '--check-health' argument") else: results.append("✅ Found '--check-health' argument") # Check for hidden configuration arguments if "suppress" in help_text or "hidden" in help_text: results.append("✅ Hidden arguments properly suppressed") else: results.append("⚠️ No hidden arguments found (may be using env vars)") return all("❌" not in r for r in results), results except subprocess.TimeoutExpired: results.append("❌ CLI check timed out") return False, results except Exception as e: results.append(f"❌ CLI check failed: {str(e)}") return False, results def check_health_check(script_path: str) -> Tuple[bool, List[str]]: """Check health check functionality.""" results = [] try: # First try with dummy arguments (some processors might require them) dummy_args = ["dummy.mp4", "dummy.json"] cmd = [sys.executable, script_path] + dummy_args + ["--check-health"] result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.returncode == 0: # Try to parse JSON output try: health_data = json.loads(result.stdout) if "status" in health_data: results.append(f"✅ Health check passed: {health_data['status']}") # Check for dependencies if "dependencies" in health_data: results.append("✅ Dependencies reported") else: results.append("⚠️ No dependencies reported") # Check for timestamp if "timestamp" in health_data: results.append("✅ Timestamp included") else: results.append("⚠️ No timestamp in health check") return True, results else: results.append("❌ Health check missing 'status' field") return False, results except json.JSONDecodeError: # Try without dummy args (contract-compliant version) cmd = [sys.executable, script_path, "--check-health"] result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.returncode == 0: try: health_data = json.loads(result.stdout) if "status" in health_data: results.append( f"✅ Health check passed (contract-compliant): {health_data['status']}" ) results.append( "✅ Contract-compliant (no video_path/output_path required)" ) return True, results except json.JSONDecodeError: results.append("❌ Health check output is not valid JSON") return False, results else: results.append( f"❌ Health check failed with exit code {result.returncode}" ) if result.stderr: results.append(f" Error: {result.stderr[:200]}") return False, results else: # Try without dummy args cmd = [sys.executable, script_path, "--check-health"] result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.returncode == 0: try: health_data = json.loads(result.stdout) if "status" in health_data: results.append( f"✅ Health check passed (contract-compliant): {health_data['status']}" ) results.append( "✅ Contract-compliant (no video_path/output_path required)" ) return True, results except json.JSONDecodeError: results.append("❌ Health check output is not valid JSON") return False, results else: results.append( f"❌ Health check failed with exit code {result.returncode}" ) if result.stderr: results.append(f" Error: {result.stderr[:200]}") return False, results except subprocess.TimeoutExpired: results.append("❌ Health check timed out") return False, results except Exception as e: results.append(f"❌ Health check failed: {str(e)}") return False, results def check_signal_handling(script_path: str) -> Tuple[bool, List[str]]: """Check for signal handling code.""" results = [] try: with open(script_path, "r") as f: content = f.read() # Check for signal imports if "import signal" in content or "from signal import" in content: results.append("✅ Signal module imported") else: results.append("❌ Signal module not imported") # Check for signal handlers if ( "signal.signal" in content or "signal.SIGTERM" in content or "signal.SIGINT" in content ): results.append("✅ Signal handling code found") else: results.append("❌ No signal handling code found") # Check for graceful shutdown patterns graceful_patterns = [ "shutdown_requested", "graceful.*shutdown", "cleanup", "atexit", ] found_patterns = [] for pattern in graceful_patterns: if re.search(pattern, content, re.IGNORECASE): found_patterns.append(pattern) if found_patterns: results.append( f"✅ Graceful shutdown patterns found: {', '.join(found_patterns)}" ) else: results.append("⚠️ No graceful shutdown patterns found") return all("❌" not in r for r in results), results except Exception as e: results.append(f"❌ Signal check failed: {str(e)}") return False, results def check_redis_reporting(script_path: str) -> Tuple[bool, List[str]]: """Check for Redis progress reporting.""" results = [] try: with open(script_path, "r") as f: content = f.read() # Check for RedisPublisher import if "RedisPublisher" in content or "redis_publisher" in content: results.append("✅ RedisPublisher import found") else: results.append("⚠️ RedisPublisher not imported (may be optional)") # Check for progress reporting patterns progress_patterns = [ "publish.*progress", "progress.*report", "redis.*publish", "message.*type", ] found_patterns = [] for pattern in progress_patterns: if re.search(pattern, content, re.IGNORECASE): found_patterns.append(pattern) if found_patterns: results.append( f"✅ Progress reporting patterns found: {', '.join(found_patterns[:3])}" ) else: results.append("⚠️ No progress reporting patterns found") # Check for message types from contract message_types = ["info", "progress", "warning", "error", "complete"] found_types = [] for msg_type in message_types: if re.search(f'"{msg_type}"|type.*{msg_type}', content, re.IGNORECASE): found_types.append(msg_type) if found_types: results.append(f"✅ Message types found: {', '.join(found_types)}") else: results.append("⚠️ No contract message types found") return True, results # Redis is optional, so don't fail except Exception as e: results.append(f"❌ Redis check failed: {str(e)}") return False, results def check_json_output_structure(script_path: str) -> Tuple[bool, List[str]]: """Check JSON output structure compliance.""" results = [] try: with open(script_path, "r") as f: content = f.read() # Check for required fields in code required_fields = ["processor_name", "processor_version", "contract_version"] for field in required_fields: if re.search(f'"{field}"|{field}.*:', content): results.append(f"✅ Found required field: {field}") else: results.append(f"❌ Missing required field: {field}") # Check for JSON output patterns json_patterns = ["json\\.dumps", "write.*json", "output.*json"] found_patterns = [] for pattern in json_patterns: if re.search(pattern, content, re.IGNORECASE): found_patterns.append(pattern) if found_patterns: results.append( f"✅ JSON output patterns found: {', '.join(found_patterns)}" ) else: results.append("❌ No JSON output patterns found") return all( "❌" not in r for r in results[:3] ), results # Only fail on required fields except Exception as e: results.append(f"❌ JSON structure check failed: {str(e)}") return False, results def check_error_handling(script_path: str) -> Tuple[bool, List[str]]: """Check error handling patterns.""" results = [] try: with open(script_path, "r") as f: content = f.read() # Check for error handling patterns error_patterns = [ "try.*except", "except.*Exception", "traceback", "sys\\.stderr", "graceful.*failure", "cleanup", "finally", ] found_patterns = [] for pattern in error_patterns: if re.search(pattern, content, re.IGNORECASE): found_patterns.append(pattern) if len(found_patterns) >= 3: # At least 3 error handling patterns results.append( f"✅ Error handling patterns found: {', '.join(found_patterns[:5])}" ) else: results.append( f"⚠️ Limited error handling patterns: {', '.join(found_patterns)}" ) # Check for exit codes if "sys.exit" in content or "exit(" in content: results.append("✅ Exit codes used") else: results.append("⚠️ No exit code patterns found") return True, results # Error handling is important but don't fail except Exception as e: results.append(f"❌ Error handling check failed: {str(e)}") return False, results def check_unified_configuration(script_path: str) -> Tuple[bool, List[str]]: """Check for unified configuration patterns.""" results = [] try: with open(script_path, "r") as f: content = f.read() # Check for environment variable patterns env_patterns = ["os\\.getenv", "MOMENTRY_", "DEFAULT_", "config.*timeout"] found_patterns = [] for pattern in env_patterns: if re.search(pattern, content): found_patterns.append(pattern) if found_patterns: results.append( f"✅ Configuration patterns found: {', '.join(found_patterns)}" ) else: results.append("⚠️ No configuration patterns found") # Check for timeout handling if "timeout" in content.lower(): results.append("✅ Timeout handling found") else: results.append("⚠️ No timeout handling found") return True, results except Exception as e: results.append(f"❌ Configuration check failed: {str(e)}") return False, results def verify_processor(processor_name: str, processor_info: Dict) -> Dict[str, Any]: """Verify a single processor's compliance.""" print(f"\n{'=' * 60}") print(f"Verifying {processor_name.upper()} Processor") print(f"{'=' * 60}") script_path = processor_info["script"] results = { "processor": processor_name, "script": script_path, "version": processor_info["version"], "contract_version": processor_info["contract_version"], "checks": {}, "overall_compliance": 0.0, "passed_checks": 0, "total_checks": 0, } # Check 1: File exists passed, message = check_file_exists(script_path) results["checks"]["file_exists"] = {"passed": passed, "message": message} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 if not passed: print(f" {message}") return results # Check 2: CLI Interface passed, messages = check_cli_interface(script_path) results["checks"]["cli_interface"] = {"passed": passed, "messages": messages} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 3: Health Check passed, messages = check_health_check(script_path) results["checks"]["health_check"] = {"passed": passed, "messages": messages} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 4: Signal Handling passed, messages = check_signal_handling(script_path) results["checks"]["signal_handling"] = {"passed": passed, "messages": messages} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 5: Redis Reporting (optional) passed, messages = check_redis_reporting(script_path) results["checks"]["redis_reporting"] = { "passed": passed, "messages": messages, "optional": True, } if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 6: JSON Output Structure passed, messages = check_json_output_structure(script_path) results["checks"]["json_output"] = {"passed": passed, "messages": messages} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 7: Error Handling passed, messages = check_error_handling(script_path) results["checks"]["error_handling"] = {"passed": passed, "messages": messages} if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Check 8: Unified Configuration passed, messages = check_unified_configuration(script_path) results["checks"]["unified_configuration"] = { "passed": passed, "messages": messages, } if passed: results["passed_checks"] += 1 results["total_checks"] += 1 # Calculate overall compliance if results["total_checks"] > 0: # Don't count optional Redis check against score if it fails effective_total = results["total_checks"] effective_passed = results["passed_checks"] if not results["checks"]["redis_reporting"]["passed"]: effective_total -= 1 effective_passed -= 1 if effective_total > 0: results["overall_compliance"] = (effective_passed / effective_total) * 100 # Print summary print(f" Version: {processor_info['version']}") print(f" Contract: v{processor_info['contract_version']}") print(f" Compliance: {results['overall_compliance']:.1f}%") for check_name, check_result in results["checks"].items(): status = "✅" if check_result["passed"] else "❌" if check_result.get("optional", False): status = "⚠️" if not check_result["passed"] else "✅" print(f" {status} {check_name.replace('_', ' ').title()}") if "messages" in check_result: for msg in check_result["messages"][:3]: # Show first 3 messages print(f" {msg}") return results def generate_compliance_report(all_results: Dict[str, Dict]) -> str: """Generate a comprehensive compliance report.""" report = [] report.append("=" * 80) report.append("AI PROCESSOR COMPLIANCE REPORT") report.append("=" * 80) report.append(f"Generated: {datetime.now().isoformat()}") report.append(f"Contract Version: 1.0") report.append("") # Summary table report.append("SUMMARY") report.append("-" * 80) report.append( f"{'Processor':<10} {'Version':<10} {'Compliance':<12} {'Status':<10}" ) report.append("-" * 80) for processor_name, results in all_results.items(): compliance = results["overall_compliance"] status = ( "✅ COMPLIANT" if compliance >= 90 else "⚠️ PARTIAL" if compliance >= 70 else "❌ NON-COMPLIANT" ) report.append( f"{processor_name:<10} {results['version']:<10} {compliance:>10.1f}% {status:<10}" ) report.append("") # Detailed findings report.append("DETAILED FINDINGS") report.append("=" * 80) for processor_name, results in all_results.items(): report.append(f"\n{processor_name.upper()} PROCESSOR") report.append("-" * 40) for check_name, check_result in results["checks"].items(): status = "PASS" if check_result["passed"] else "FAIL" if check_result.get("optional", False) and not check_result["passed"]: status = "OPTIONAL" report.append(f" {check_name.replace('_', ' ').title():<25} [{status}]") if "messages" in check_result: for msg in check_result["messages"]: report.append(f" {msg}") # Recommendations report.append("\n" + "=" * 80) report.append("RECOMMENDATIONS") report.append("=" * 80) # Identify common issues common_issues = [] for processor_name, results in all_results.items(): for check_name, check_result in results["checks"].items(): if not check_result["passed"] and not check_result.get("optional", False): issue = f"{processor_name}: {check_name}" if issue not in common_issues: common_issues.append(issue) if common_issues: report.append("\nCritical Issues to Address:") for issue in common_issues: report.append(f" • {issue}") else: report.append("\n✅ All processors are compliant with the contract!") # Next steps report.append("\nNext Steps:") report.append(" 1. Address any critical issues identified above") report.append(" 2. Run performance benchmarks to verify <5% overhead") report.append(" 3. Update documentation with compliance status") report.append(" 4. Integrate with monitoring system") return "\n".join(report) def main(): parser = argparse.ArgumentParser( description="Verify AI processor compliance with contract" ) parser.add_argument( "--processor", help="Verify specific processor (asr, ocr, yolo, face, pose)" ) parser.add_argument("--output", help="Output report to file") args = parser.parse_args() print("AI Processor Compliance Verification") print("=" * 60) # Determine which processors to verify if args.processor: if args.processor in PROCESSORS: processors_to_check = {args.processor: PROCESSORS[args.processor]} else: print(f"Error: Unknown processor '{args.processor}'") print(f"Available processors: {', '.join(PROCESSORS.keys())}") return 1 else: processors_to_check = PROCESSORS # Verify all selected processors all_results = {} for processor_name, processor_info in processors_to_check.items(): results = verify_processor(processor_name, processor_info) all_results[processor_name] = results # Generate report report = generate_compliance_report(all_results) # Output report if args.output: with open(args.output, "w") as f: f.write(report) print(f"\nReport saved to: {args.output}") else: print("\n" + report) # Determine overall status all_compliant = all(r["overall_compliance"] >= 90 for r in all_results.values()) if all_compliant: print("\n✅ ALL PROCESSORS ARE CONTRACT-COMPLIANT!") return 0 else: print("\n⚠️ Some processors require attention") return 1 if __name__ == "__main__": sys.exit(main())