- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
649 lines
22 KiB
Python
649 lines
22 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Face Recognition Processor
|
|
Integrates InsightFace for face detection, recognition, and tracking
|
|
Supports: face detection, face recognition, face tracking, face clustering
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import os
|
|
import time
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
import uuid
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
class FaceRecognitionProcessor:
    """Detect, recognise, track and cluster faces in a video.

    Detection and embeddings come from an InsightFace ``FaceAnalysis`` model
    loaded by :meth:`load_models`. Recognition compares embeddings with an
    in-memory database (:meth:`load_face_database`) by cosine similarity,
    tracking links detections across sampled frames by bounding-box IoU, and
    clustering groups embeddings with scikit-learn's DBSCAN when installed.
    """

    def __init__(
        self,
        enable_recognition: bool = True,
        enable_tracking: bool = True,
        enable_clustering: bool = True,
    ):
        self.enable_recognition = enable_recognition
        self.enable_tracking = enable_tracking
        self.enable_clustering = enable_clustering

        # insightface FaceAnalysis instance; set by load_models().
        self.face_model = None
        # database_id -> {"embedding": [...], "name": str, "metadata": dict}
        # (the keys recognize_faces reads).
        self.face_database: Dict[str, Any] = {}
        # NOTE(review): face_tracker, face_clusters and embedding_dim are not
        # read by this class; kept for external callers that may use them.
        self.face_tracker = None
        self.face_clusters: Dict[str, Any] = {}

        self.embedding_dim = 512  # InsightFace default embedding dimension

    @staticmethod
    def _select_providers(use_mps: bool) -> List[str]:
        """Return ONNX Runtime execution providers in preference order.

        With ``use_mps`` False this is always CPU only. Otherwise CoreML is
        preferred, then CUDA, falling back to CPU when neither is available
        or onnxruntime cannot be imported.
        """
        if not use_mps:
            return ["CPUExecutionProvider"]

        try:
            import onnxruntime as ort
        except ImportError:
            print("[WARNING] ONNX Runtime not available, using CPU")
            return ["CPUExecutionProvider"]

        available = ort.get_available_providers()
        if "CoreMLExecutionProvider" in available:
            print("[INFO] Using CoreMLExecutionProvider for MPS acceleration")
            return ["CoreMLExecutionProvider", "CPUExecutionProvider"]
        if "CUDAExecutionProvider" in available:
            print("[INFO] Using CUDAExecutionProvider")
            return ["CUDAExecutionProvider", "CPUExecutionProvider"]
        print("[INFO] MPS/CUDA not available, using CPU")
        return ["CPUExecutionProvider"]

    def load_models(self, use_mps: bool = False) -> bool:
        """Load the InsightFace ``buffalo_l`` model pack into ``self.face_model``.

        Args:
            use_mps: Prefer a hardware-accelerated ONNX Runtime provider
                (CoreML, then CUDA) when one is available.

        Returns:
            True when the model is ready; False on any import or load
            failure (the reason is printed and ``self.face_model`` is None).
        """
        try:
            from insightface.app import FaceAnalysis
        except ImportError as e:
            print(f"[ERROR] Failed to import InsightFace: {e}")
            print("[INFO] Install with: pip install insightface")
            return False

        try:
            providers = self._select_providers(use_mps)
            print(f"[INFO] Using execution providers: {providers}")

            model = FaceAnalysis(
                name="buffalo_l",  # or 'buffalo_s' for smaller model
                providers=providers,
            )
            # InsightFace's per-model prepare() switches the ONNX session to
            # CPUExecutionProvider when ctx_id < 0, so any accelerated
            # provider (CoreML *or* CUDA) needs a non-negative ctx_id.
            ctx_id = -1 if providers[0] == "CPUExecutionProvider" else 0
            model.prepare(ctx_id=ctx_id, det_size=(640, 640))
        except Exception as e:
            self.face_model = None
            print(f"[ERROR] Failed to load models: {e}")
            return False

        self.face_model = model
        print("[INFO] InsightFace models loaded successfully")
        return True

    def load_face_database(self, database_path: Optional[str] = None) -> None:
        """Load the known-face database from a JSON file.

        The file must contain a JSON object mapping a database id to a record
        with an ``"embedding"`` list and optional ``"name"``/``"metadata"``.
        On any problem the database is left empty and a message is printed.
        """
        self.face_database = {}

        if not database_path:
            print("[INFO] No face database provided, starting with empty database")
            return
        if not os.path.exists(database_path):
            print(
                f"[WARNING] Face database not found: {database_path}, "
                "starting with empty database"
            )
            return

        try:
            with open(database_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode errors
            print(f"[WARNING] Failed to load face database: {e}")
            return

        if not isinstance(data, dict):
            print("[WARNING] Failed to load face database: expected a JSON object")
            return

        self.face_database = data
        print(f"[INFO] Loaded {len(self.face_database)} faces from database")

    @staticmethod
    def _face_to_dict(face: Any) -> Dict[str, Any]:
        """Convert one InsightFace ``Face`` into this module's detection dict."""
        # InsightFace's Face is a dict subclass whose missing attributes read
        # as None (so hasattr() is always True); test values with
        # ``is not None``. getattr(..., None) also covers plain objects.
        x, y, x2, y2 = (int(v) for v in np.asarray(face.bbox).astype(int))
        embedding = getattr(face, "embedding", None)
        det_score = getattr(face, "det_score", None)
        raw_age = getattr(face, "age", None)
        raw_gender = getattr(face, "gender", None)
        raw_pose = getattr(face, "pose", None)

        age = int(raw_age) if raw_age is not None else None
        gender = None
        if raw_gender is not None:
            gender = "female" if raw_gender == 0 else "male"

        pose = None
        if raw_pose is not None:
            # InsightFace's landmark model stores pose as (pitch, yaw, roll).
            pose = {
                "yaw": float(raw_pose[1]),
                "pitch": float(raw_pose[0]),
                "roll": float(raw_pose[2]),
            }

        # `is not None` so that e.g. age == 0 still counts as an attribute.
        has_attributes = any(v is not None for v in (age, gender, pose))
        attributes = (
            {
                "age": age,
                "gender": gender,
                "emotion": None,  # InsightFace doesn't provide emotion
                "glasses": None,
                "mask": None,
                "pose": pose,
            }
            if has_attributes
            else None
        )

        return {
            "x": x,
            "y": y,
            "width": x2 - x,
            "height": y2 - y,
            "confidence": float(det_score) if det_score is not None else 0.8,
            "embedding": (
                np.asarray(embedding).tolist() if embedding is not None else None
            ),
            "attributes": attributes,
            "identity": None,  # Will be filled by recognition step
        }

    def detect_faces(self, image: np.ndarray) -> List[Dict[str, Any]]:
        """Detect faces in one image with the loaded InsightFace model.

        Returns:
            One dict per face with ``x``/``y``/``width``/``height``,
            ``confidence``, ``embedding`` (list or None), ``attributes``
            (dict or None) and ``identity`` (None). An empty list when no
            model is loaded or detection fails (the error is printed).
        """
        if self.face_model is None:
            return []

        try:
            return [self._face_to_dict(face) for face in self.face_model.get(image)]
        except Exception as e:
            print(f"[ERROR] Face detection failed: {e}")
            return []

    def recognize_faces(
        self, faces: List[Dict[str, Any]], threshold: float = 0.6
    ) -> List[Dict[str, Any]]:
        """Match each face's embedding against ``self.face_database``.

        Sets ``face["identity"]`` in place to the best database match whose
        cosine similarity is >= ``threshold`` (or None). Database entries
        without an embedding, or whose embedding shape differs from the
        face's, are skipped.

        Returns:
            The input list unchanged when recognition is disabled or
            ``faces`` is empty; otherwise a new list of the same dicts.
        """
        if not self.enable_recognition or not faces:
            return faces

        # Convert database embeddings once per call, not once per face.
        known = [
            (db_id, db_face, np.asarray(db_face["embedding"], dtype=float))
            for db_id, db_face in self.face_database.items()
            if isinstance(db_face, dict) and db_face.get("embedding") is not None
        ]

        for face in faces:
            face["identity"] = None
            if face.get("embedding") is None:
                continue

            embedding = np.asarray(face["embedding"], dtype=float)
            best_similarity = 0.0
            for db_id, db_face, db_embedding in known:
                if db_embedding.shape != embedding.shape:
                    continue  # different model/dimension; np.dot would raise
                similarity = self.cosine_similarity(embedding, db_embedding)
                if similarity > best_similarity and similarity >= threshold:
                    best_similarity = similarity
                    face["identity"] = {
                        "name": db_face.get("name", "Unknown"),
                        "confidence": float(similarity),
                        "database_id": db_id,
                        "metadata": db_face.get("metadata", {}),
                    }

        return list(faces)

    def track_faces(
        self,
        frames: List[Dict[str, Any]],
        max_gap: int = 10,
        iou_threshold: float = 0.3,
    ) -> List[Dict[str, Any]]:
        """Assign a ``face_id`` to every face by greedy IoU tracking.

        A face continues the existing track with the highest IoU above
        ``iou_threshold`` that was last seen at most ``max_gap`` entries
        earlier in ``frames`` and is not already used in the current frame;
        otherwise it starts a new track. Face dicts are updated in place;
        each frame dict is shallow-copied.
        """
        if not self.enable_tracking or not frames:
            return frames

        tracks: Dict[int, Dict[str, Any]] = {}  # track id -> last bbox/frame
        next_track_id = 1
        tracked_frames = []

        for frame_idx, frame in enumerate(frames):
            claimed = set()  # a track may own at most one face per frame
            tracked_faces = []

            for face in frame.get("faces", []):
                best_track_id = None
                best_iou = iou_threshold
                for track_id, track in tracks.items():
                    if track_id in claimed or frame_idx - track["last_frame"] > max_gap:
                        continue
                    iou = self.calculate_iou(face, track["last_bbox"])
                    if iou > best_iou:
                        best_iou = iou
                        best_track_id = track_id

                if best_track_id is None:
                    best_track_id = next_track_id
                    next_track_id += 1

                tracks[best_track_id] = {
                    "last_bbox": (face["x"], face["y"], face["width"], face["height"]),
                    "last_frame": frame_idx,
                }
                claimed.add(best_track_id)
                face["face_id"] = f"face_{best_track_id}"
                tracked_faces.append(face)

            tracked_frame = frame.copy()
            tracked_frame["faces"] = tracked_faces
            tracked_frames.append(tracked_frame)

        return tracked_frames

    def cluster_faces(
        self,
        frames: List[Dict[str, Any]],
        eps: float = 0.5,
        min_samples: int = 2,
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Cluster tracked face detections by embedding with DBSCAN.

        Distances are cosine distances (1 - cosine similarity), the same
        measure :meth:`recognize_faces` uses, so ``eps`` is the maximum
        cosine distance between neighbours. Raw euclidean distances between
        standardised 512-d embeddings are far larger than any small eps,
        which is why cosine distance is used.

        Returns:
            ``(frames, cluster_info)`` where ``cluster_info`` maps
            ``"cluster_<n>"`` to face_ids, embeddings, size, centroid and the
            representative face_id (closest to the centroid). Noise points are
            dropped. ``frames`` is returned unchanged.
        """
        if not self.enable_clustering:
            return frames, {}

        try:
            from sklearn.cluster import DBSCAN
        except ImportError:
            print("[WARNING] scikit-learn not installed, skipping clustering")
            return frames, {}

        try:
            embeddings: List[Any] = []
            face_ids: List[str] = []
            for frame in frames:
                for face in frame.get("faces", []):
                    if face.get("embedding") and face.get("face_id"):
                        embeddings.append(face["embedding"])
                        face_ids.append(face["face_id"])

            if len(embeddings) < 2:
                return frames, {}

            labels = DBSCAN(
                eps=eps, min_samples=min_samples, metric="cosine"
            ).fit_predict(np.asarray(embeddings, dtype=float))

            cluster_info: Dict[str, Any] = {}
            for face_id, embedding, label in zip(face_ids, embeddings, labels):
                if label == -1:  # DBSCAN noise
                    continue
                info = cluster_info.setdefault(
                    f"cluster_{label}", {"face_ids": [], "embeddings": [], "size": 0}
                )
                info["face_ids"].append(face_id)
                info["embeddings"].append(embedding)
                info["size"] += 1

            for info in cluster_info.values():
                members = np.asarray(info["embeddings"], dtype=float)
                centroid = members.mean(axis=0)
                info["centroid"] = centroid.tolist()
                # Representative face = member closest to the centroid.
                distances = np.linalg.norm(members - centroid, axis=1)
                info["representative_face_id"] = info["face_ids"][int(np.argmin(distances))]

            return frames, cluster_info

        except Exception as e:
            print(f"[ERROR] Clustering failed: {e}")
            return frames, {}

    def _analyse_sampled_frames(
        self, cap: Any, fps: float, total_frames: int, sample_interval: int, publisher: Any
    ) -> List[Dict[str, Any]]:
        """Detect (and optionally recognise) faces on frames 0, N, 2N, ..."""
        # Frames that will be sampled (ceil division). CAP_PROP_FRAME_COUNT can
        # be 0 or inaccurate, hence max() when reporting progress.
        expected = -(-total_frames // sample_interval) if total_frames > 0 else 0
        frames: List[Dict[str, Any]] = []
        frame_idx = -1

        # read() is grab() + retrieve(); only sampled frames pay for retrieve().
        while cap.grab():
            frame_idx += 1
            if frame_idx % sample_interval != 0:
                continue
            ok, image = cap.retrieve()
            if not ok:
                continue

            faces = self.detect_faces(image)
            if self.enable_recognition:
                faces = self.recognize_faces(faces)

            timestamp = frame_idx / fps if fps > 0 else 0
            frames.append(
                {"frame": frame_idx, "timestamp": round(timestamp, 3), "faces": faces}
            )

            if publisher:
                publisher.progress(
                    "face_recognition",
                    len(frames),
                    max(expected, len(frames)),
                    f"Frame {frame_idx + 1}",
                )

        return frames

    def process_video(
        self,
        video_path: str,
        output_path: str,
        uuid: str = "",
        use_mps: bool = False,
        sample_interval: int = 30,
    ) -> Dict[str, Any]:
        """Run the face pipeline on a video and write the result as JSON.

        Args:
            video_path: Video file readable by OpenCV.
            output_path: Destination of the JSON result.
            uuid: When non-empty, progress is published via RedisPublisher.
            use_mps: Forwarded to :meth:`load_models`.
            sample_interval: Analyse every N-th frame, starting at frame 0.

        Returns:
            The result dict (also written to ``output_path``). If OpenCV is
            missing, the models fail to load, or the video cannot be opened,
            :meth:`create_empty_result` is returned and nothing is written.
        """
        publisher = RedisPublisher(uuid) if uuid else None
        if publisher:
            publisher.info("face_recognition", "FACE_RECOGNITION_START")

        try:
            import cv2
        except ImportError:
            if publisher:
                publisher.error("face_recognition", "opencv-python not installed")
            return self.create_empty_result()

        if publisher:
            publisher.info("face_recognition", "LOADING_MODELS")
        if not self.load_models(use_mps=use_mps):
            if publisher:
                publisher.error("face_recognition", "Failed to load InsightFace models")
            return self.create_empty_result()
        if publisher:
            publisher.info("face_recognition", "MODELS_LOADED")

        sample_interval = max(1, int(sample_interval))
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print(f"[ERROR] Cannot open video: {video_path}")
                if publisher:
                    publisher.error("face_recognition", f"Cannot open video: {video_path}")
                return self.create_empty_result()

            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if publisher:
                publisher.info("face_recognition", f"fps={fps}, frames={total_frames}")
                publisher.progress("face_recognition", 0, total_frames, "Starting")

            frames = self._analyse_sampled_frames(
                cap, fps, total_frames, sample_interval, publisher
            )
        finally:
            # Release the capture even if detection/recognition raises.
            cap.release()

        if self.enable_tracking:
            frames = self.track_faces(frames)

        cluster_info: Dict[str, Any] = {}
        if self.enable_clustering:
            frames, cluster_info = self.cluster_faces(frames)

        recognized_faces = self.extract_recognized_faces(frames)

        result = {
            "frame_count": total_frames,
            "fps": fps,
            "frames": frames,
            "recognized_faces": recognized_faces,
            "face_clusters": self.format_clusters(cluster_info),
        }

        # Write before announcing completion so "complete" means the file exists.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)

        if publisher:
            publisher.complete(
                "face_recognition",
                f"{len(frames)} frames, {len(recognized_faces)} recognized faces",
            )

        return result

    def extract_recognized_faces(
        self, frames: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Summarise each tracked face (by ``face_id``) across all frames.

        Returns one entry per face_id in first-seen order with the embedding
        and attributes of its first detection, first/last timestamps, the
        detection count and every distinct identity (by ``database_id``)
        it was matched to. ``cluster_id`` is always None here.
        """
        summaries: Dict[str, Dict[str, Any]] = {}

        for frame in frames:
            for face in frame.get("faces", []):
                face_id = face.get("face_id")
                if not face_id:
                    continue

                summary = summaries.get(face_id)
                if summary is None:
                    summary = summaries[face_id] = {
                        "face_id": face_id,
                        "embedding": face.get("embedding"),
                        "first_seen": frame["timestamp"],
                        "last_seen": frame["timestamp"],
                        "total_appearances": 0,
                        "attributes": face.get("attributes"),
                        "identities": [],
                        "cluster_id": None,
                    }
                summary["last_seen"] = frame["timestamp"]
                summary["total_appearances"] += 1

                identity = face.get("identity")
                if identity and all(
                    known.get("database_id") != identity.get("database_id")
                    for known in summary["identities"]
                ):
                    summary["identities"].append(identity)

        return list(summaries.values())

    def format_clusters(self, cluster_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert ``cluster_faces`` output into the serialisable cluster list."""
        return [
            {
                "cluster_id": cluster_id,
                "face_ids": info.get("face_ids", []),
                "centroid": info.get("centroid", []),
                "size": info.get("size", 0),
                "representative_face_id": info.get("representative_face_id"),
                "metadata": {},
            }
            for cluster_id, info in cluster_info.items()
        ]

    def create_empty_result(self) -> Dict[str, Any]:
        """Return the result structure used when processing cannot run."""
        return {
            "frame_count": 0,
            "fps": 0.0,
            "frames": [],
            "recognized_faces": [],
            "face_clusters": [],
        }

    @staticmethod
    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity of two vectors; 0.0 if either has zero norm."""
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(np.dot(a, b) / (norm_a * norm_b))

    @staticmethod
    def calculate_iou(face1: Dict[str, Any], bbox2: Tuple[int, int, int, int]) -> float:
        """IoU of a face dict's box and an ``(x, y, width, height)`` tuple."""
        x1, y1, w1, h1 = face1["x"], face1["y"], face1["width"], face1["height"]
        x2, y2, w2, h2 = bbox2

        x_left = max(x1, x2)
        y_top = max(y1, y2)
        x_right = min(x1 + w1, x2 + w2)
        y_bottom = min(y1 + h1, y2 + h2)

        if x_right < x_left or y_bottom < y_top:
            return 0.0

        intersection_area = (x_right - x_left) * (y_bottom - y_top)
        union_area = w1 * h1 + w2 * h2 - intersection_area
        return float(intersection_area / union_area) if union_area > 0 else 0.0
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments, process the video, print a summary."""
    parser = argparse.ArgumentParser(
        description="Face Recognition Processor with MPS support"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    # nargs="?" makes these positionals optional so the documented default
    # of "1" actually applies; without it argparse requires them.
    parser.add_argument(
        "enable_recognition",
        nargs="?",
        help="Enable face recognition (0/1)",
        default="1",
    )
    parser.add_argument(
        "enable_tracking",
        nargs="?",
        help="Enable face tracking (0/1)",
        default="1",
    )
    parser.add_argument(
        "enable_clustering",
        nargs="?",
        help="Enable face clustering (0/1)",
        default="1",
    )
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--database", "-d", help="Path to face database JSON file", default=""
    )
    parser.add_argument(
        "--use-mps",
        "-m",
        help="Use MPS acceleration (Apple Silicon)",
        action="store_true",
        default=False,
    )

    args = parser.parse_args()

    # Any value other than the string "1" disables the corresponding stage.
    processor = FaceRecognitionProcessor(
        enable_recognition=args.enable_recognition == "1",
        enable_tracking=args.enable_tracking == "1",
        enable_clustering=args.enable_clustering == "1",
    )

    if args.database:
        processor.load_face_database(args.database)

    result = processor.process_video(
        video_path=args.video_path,
        output_path=args.output_path,
        uuid=args.uuid,
        use_mps=args.use_mps,
    )

    print(f"[INFO] Processing complete: {len(result['frames'])} frames processed")
    print(f"[INFO] Recognized faces: {len(result['recognized_faces'])}")
    print(f"[INFO] Face clusters: {len(result['face_clusters'])}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI only when executed as a script, never on import.
    main()
|