Files
momentry_core/scripts/face_registration.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

373 lines
12 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Face Registration Script
Register new faces to the face database
"""
import sys
import json
import argparse
import os
import numpy as np
import time
from typing import Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
class FaceRegistration:
    """Register, list and delete faces in a JSON-backed face database.

    The database is a single JSON object mapping ``face_id`` to a face
    record. It is read once by :meth:`load_database` and re-written in full
    by :meth:`save_database` after every registration or deletion.
    """

    def __init__(self):
        # InsightFace ``FaceAnalysis`` app; created lazily by load_models().
        self.face_model = None
        # In-memory database: face_id -> face record dict.
        self.face_database = {}
        # Path of the JSON file backing ``face_database``; set by load_database().
        self.database_path = None

    @staticmethod
    def _failure(
        message: str,
        face_id: Optional[str] = None,
        embedding: Optional[list] = None,
        attributes: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Build the result dict that register_face() returns on failure."""
        return {
            "success": False,
            "message": message,
            "face_id": face_id,
            "embedding": embedding,
            "attributes": attributes,
        }

    @staticmethod
    def _describe_face(face) -> Optional[Dict[str, Any]]:
        """Collect age/gender/pose of a detected face, or None if none is known."""
        raw_age = getattr(face, "age", None)
        raw_gender = getattr(face, "gender", None)
        raw_pose = getattr(face, "pose", None)

        age = int(raw_age) if raw_age is not None else None

        gender = None
        if raw_gender is not None:
            # NOTE(review): assumes the detector encodes female as 0 and
            # male as anything else - confirm against the InsightFace docs.
            gender = "female" if raw_gender == 0 else "male"

        pose = None
        if raw_pose is not None:
            pose = {
                "yaw": float(raw_pose[0]),
                "pitch": float(raw_pose[1]),
                "roll": float(raw_pose[2]),
            }

        # Compare against None explicitly so that a legitimate age of 0 is
        # not mistaken for "no attributes available".
        if age is None and gender is None and pose is None:
            return None

        return {
            "age": age,
            "gender": gender,
            "emotion": None,
            "glasses": None,
            "mask": None,
            "pose": pose,
        }

    def load_models(self, use_mps: bool = False) -> bool:
        """Load the InsightFace ``buffalo_l`` models.

        Args:
            use_mps: When true, prefer a hardware-accelerated ONNX Runtime
                provider (CoreML first, then CUDA) and fall back to CPU when
                neither is available or ONNX Runtime cannot be imported.

        Returns:
            True when the models were loaded and prepared, False otherwise.
            ``self.face_model`` is only set on success.
        """
        try:
            import insightface  # noqa: F401  (kept as an availability check)
            from insightface.app import FaceAnalysis

            providers = ["CPUExecutionProvider"]
            if use_mps:
                try:
                    import onnxruntime as ort

                    available_providers = ort.get_available_providers()
                    if "CoreMLExecutionProvider" in available_providers:
                        print(
                            "[INFO] Using CoreMLExecutionProvider for MPS acceleration"
                        )
                        providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"]
                    elif "CUDAExecutionProvider" in available_providers:
                        print("[INFO] Using CUDAExecutionProvider")
                        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
                    else:
                        print("[INFO] MPS/CUDA not available, using CPU")
                except ImportError:
                    print("[WARNING] ONNX Runtime not available, using CPU")

            print(f"[INFO] Using execution providers: {providers}")

            model = FaceAnalysis(
                name="buffalo_l",  # or 'buffalo_s' for smaller model
                providers=providers,
            )

            # InsightFace resets every model session to the CPU provider when
            # ctx_id is negative, so any accelerated provider (CoreML *or*
            # CUDA) needs device 0 here; only the pure-CPU setup uses -1.
            ctx_id = 0 if providers[0] != "CPUExecutionProvider" else -1
            model.prepare(ctx_id=ctx_id, det_size=(640, 640))

            # Publish the model only once it is fully prepared, so a failed
            # prepare() cannot leave a half-initialised model behind.
            self.face_model = model
            print("[INFO] InsightFace models loaded successfully")
            return True
        except ImportError as e:
            print(f"[ERROR] Failed to import InsightFace: {e}")
            print("[INFO] Install with: pip install insightface")
            return False
        except Exception as e:
            print(f"[ERROR] Failed to load models: {e}")
            return False

    def load_database(self, database_path: str) -> None:
        """Load the face database from ``database_path``.

        The path is remembered for later saves. A missing, unreadable or
        malformed file (including one whose JSON root is not an object)
        results in an empty in-memory database.

        Args:
            database_path: Location of the JSON database file.
        """
        self.database_path = database_path

        if not os.path.exists(database_path):
            print("[INFO] Creating new face database")
            self.face_database = {}
            return

        try:
            with open(database_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            # Every other method treats the database as a dict keyed by
            # face_id; reject anything else up front.
            if not isinstance(loaded, dict):
                raise ValueError("database root must be a JSON object")
        except (OSError, ValueError) as e:
            print(f"[WARNING] Failed to load database: {e}")
            self.face_database = {}
            return

        self.face_database = loaded
        print(f"[INFO] Loaded {len(self.face_database)} faces from database")

    def save_database(self) -> bool:
        """Write the in-memory database to ``self.database_path``.

        Returns:
            True on success; False when no path is set or writing failed.
        """
        if not self.database_path:
            print("[ERROR] No database path specified")
            return False

        try:
            # Serialise first so that an unserialisable record cannot leave a
            # truncated file behind.
            payload = json.dumps(self.face_database, indent=2)

            # A bare filename has an empty dirname, and os.makedirs("")
            # raises; only create the parent when there is one.
            parent_dir = os.path.dirname(self.database_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with open(self.database_path, "w", encoding="utf-8") as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            print(f"[ERROR] Failed to save database: {e}")
            return False

        print(f"[INFO] Saved {len(self.face_database)} faces to database")
        return True

    def register_face(
        self, image_path: str, name: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Detect a face in an image and store it in the database.

        When several faces are detected, the first one returned by the model
        is used. Models are loaded on first use.

        Args:
            image_path: Path of the image to read.
            name: Name stored with the face record.
            metadata: Optional extra data stored with the record.

        Returns:
            A dict with keys ``success``, ``message``, ``face_id``,
            ``embedding`` and ``attributes``. On failure ``success`` is False
            and the remaining values are None, except when only the final
            save failed, in which case the computed values are still returned.
        """
        if not os.path.exists(image_path):
            return self._failure(f"Image not found: {image_path}")

        if self.face_model is None and not self.load_models():
            return self._failure("Failed to load face models")

        try:
            import cv2
        except ImportError:
            return self._failure("OpenCV not installed")

        image = cv2.imread(image_path)
        if image is None:
            return self._failure(f"Failed to load image: {image_path}")

        try:
            faces = self.face_model.get(image)
            if len(faces) == 0:
                return self._failure("No faces detected in image")
            if len(faces) > 1:
                print("[WARNING] Multiple faces detected, using the first one")
            face = faces[0]

            # The attribute may exist but be None, so check the value rather
            # than relying on hasattr() before calling .tolist().
            raw_embedding = getattr(face, "embedding", None)
            if raw_embedding is None:
                return self._failure("Failed to extract face embedding")
            embedding = raw_embedding.tolist()

            import uuid

            face_id = str(uuid.uuid4())
            face_record = {
                "face_id": face_id,
                "name": name,
                "embedding": embedding,
                "attributes": self._describe_face(face),
                "metadata": metadata or {},
                "registration_time": time.time(),
                "image_path": image_path,
            }

            self.face_database[face_id] = face_record
            if not self.save_database():
                # Keep memory consistent with disk, as delete_face() does.
                self.face_database.pop(face_id, None)
                return self._failure(
                    "Failed to save face to database",
                    face_id=face_id,
                    embedding=embedding,
                    attributes=face_record["attributes"],
                )

            return {
                "success": True,
                "message": f"Face registered successfully as '{name}'",
                "face_id": face_id,
                "embedding": embedding,
                "attributes": face_record["attributes"],
            }
        except Exception as e:
            # Boundary handler: the caller expects a result dict, never a
            # traceback, whatever the detector raises.
            return self._failure(f"Face registration failed: {str(e)}")

    def list_faces(self) -> Dict[str, Any]:
        """Return a summary (id, name, time, metadata) of every stored face.

        Returns:
            A dict with ``success``, ``message`` and the ``faces`` list.
        """
        faces = [
            {
                "face_id": face_id,
                "name": face_data.get("name", "Unknown"),
                "registration_time": face_data.get("registration_time"),
                "metadata": face_data.get("metadata", {}),
            }
            for face_id, face_data in self.face_database.items()
        ]
        return {
            "success": True,
            "message": f"Found {len(faces)} registered faces",
            "faces": faces,
        }

    def delete_face(self, face_id: str) -> Dict[str, Any]:
        """Remove a face from the database and persist the change.

        Args:
            face_id: Identifier of the face to remove.

        Returns:
            A dict with ``success`` and ``message``. If saving fails, the
            record is restored in memory and ``success`` is False.
        """
        if face_id not in self.face_database:
            return {"success": False, "message": f"Face ID not found: {face_id}"}

        deleted_face = self.face_database.pop(face_id)

        if not self.save_database():
            # Roll back so memory keeps matching what is on disk.
            self.face_database[face_id] = deleted_face
            return {
                "success": False,
                "message": "Failed to save database after deletion",
            }

        return {
            "success": True,
            "message": f"Face '{deleted_face.get('name', 'Unknown')}' deleted successfully",
        }
def _write_result(output_path, result):
    """Write ``result`` to ``output_path`` as JSON, then print its message."""
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(result["message"])


def main():
    """Command-line entry point: register, list or delete faces.

    Positional arguments are the image path, the output JSON path and the
    name to register. ``--list`` and ``--delete`` take precedence over
    registration (in that order). The result of the chosen action is always
    written to the output path as JSON and its message printed.
    """
    parser = argparse.ArgumentParser(description="Face Registration Tool")
    parser.add_argument("image_path", help="Path to face image")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("name", help="Name for the registered face")
    parser.add_argument(
        "--metadata", "-m", help="Path to metadata JSON file", default=""
    )
    parser.add_argument(
        "--database", "-d", help="Path to face database", default="face_database.json"
    )
    parser.add_argument(
        "--list", "-l", help="List registered faces", action="store_true"
    )
    parser.add_argument("--delete", help="Delete face by ID", default="")
    args = parser.parse_args()

    registration = FaceRegistration()
    registration.load_database(args.database)

    if args.list:
        _write_result(args.output_path, registration.list_faces())
        return

    if args.delete:
        _write_result(args.output_path, registration.delete_face(args.delete))
        return

    # Metadata is optional and best-effort: problems are reported but do not
    # stop the registration.
    metadata = {}
    if args.metadata:
        if os.path.exists(args.metadata):
            try:
                with open(args.metadata, "r", encoding="utf-8") as f:
                    metadata = json.load(f)
            except (OSError, ValueError) as e:
                print(f"[WARNING] Failed to load metadata: {e}")
        else:
            # Previously a mistyped path was ignored without any notice.
            print(f"[WARNING] Metadata file not found: {args.metadata}")

    result = registration.register_face(
        image_path=args.image_path, name=args.name, metadata=metadata
    )
    _write_result(args.output_path, result)


if __name__ == "__main__":
    main()