Files
momentry_core/scripts/match_face_identity.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

435 lines
14 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Face Identity Matching with 1-to-many Reference Vectors
Purpose:
1. Implement 1-to-many matching algorithms
2. Support multiple strategies (Best Match, Voting, Weighted, Combined)
3. Match detected face to Identity in database
Usage:
python3 scripts/match_face_identity.py --identity-name "Preview Test Person" --face-json output/preview.face_new.json
"""
import json
import argparse
import numpy as np
from datetime import datetime
import psycopg2
import os
# PostgreSQL connection string. The DATABASE_URL environment variable takes
# precedence; the fallback targets a local "momentry" database and passes
# "-c search_path=dev" (URL-encoded) through the `options` parameter.
DATABASE_URL = os.getenv("DATABASE_URL", "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev")
def cosine_similarity(a, b):
    """Return the cosine of the angle between vectors *a* and *b*.

    Both inputs are converted to float64 arrays first. When either vector
    has zero length the similarity is undefined, so 0.0 is returned instead.
    """
    vec_a = np.asarray(a, dtype=np.float64)
    vec_b = np.asarray(b, dtype=np.float64)

    length_a = np.linalg.norm(vec_a)
    length_b = np.linalg.norm(vec_b)

    # Guard against dividing by zero for degenerate (all-zero) vectors.
    if not (length_a and length_b):
        return 0.0

    return np.dot(vec_a, vec_b) / (length_a * length_b)
def strategy_best_match(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 1: Best Match

    Take the highest similarity among all reference vectors.

    Pros: Fast, simple
    Cons: May miss if detected face is from different angle

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: list of dicts, each with an "embedding" entry.
        threshold: minimum similarity for the face to count as a match.

    Returns:
        dict with keys "strategy", "best_similarity", "best_reference_idx",
        "is_match" and "threshold". With no reference vectors the result is
        a non-match with similarity 0.0 and index None.
    """
    if not reference_embeddings:
        # Nothing to compare against: report a clean non-match instead of
        # letting max() raise ValueError on an empty sequence.
        return {
            "strategy": "best_match",
            "best_similarity": 0.0,
            "best_reference_idx": None,
            "is_match": False,
            "threshold": threshold,
        }

    similarities = [
        cosine_similarity(detected_embedding, ref["embedding"])
        for ref in reference_embeddings
    ]

    # Locate the winner once and derive the score from it. Values are cast to
    # built-in types so the result can be passed to json.dumps (np.int64 and
    # np.bool_ are rejected by the json module).
    best_idx = int(np.argmax(similarities))
    best_sim = float(similarities[best_idx])

    return {
        "strategy": "best_match",
        "best_similarity": best_sim,
        "best_reference_idx": best_idx,
        "is_match": best_sim >= threshold,
        "threshold": threshold,
    }
def strategy_voting(detected_embedding, reference_embeddings, threshold=0.85, min_vote_ratio=0.5):
    """
    Strategy 2: Voting Mechanism

    Count how many reference vectors reach the similarity threshold.

    Pros: More robust
    Cons: Requires more reference vectors

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: list of dicts, each with an "embedding" entry.
        threshold: similarity a reference vector must reach to cast a vote.
        min_vote_ratio: fraction of reference vectors that must vote for the
            face to count as a match (default 0.5, i.e. at least half).

    Returns:
        dict with keys "strategy", "votes", "total_references", "vote_ratio",
        "is_match", "threshold" and "similarities". With no reference
        vectors the result is a non-match with a vote ratio of 0.0.
    """
    similarities = [
        float(cosine_similarity(detected_embedding, ref["embedding"]))
        for ref in reference_embeddings
    ]
    total = len(similarities)
    votes = sum(1 for sim in similarities if sim >= threshold)

    # An empty reference set cannot vote; avoid dividing by zero.
    vote_ratio = votes / total if total else 0.0
    is_match = total > 0 and vote_ratio >= min_vote_ratio

    return {
        "strategy": "voting",
        "votes": votes,
        "total_references": total,
        "vote_ratio": vote_ratio,
        "is_match": is_match,
        "threshold": threshold,
        "similarities": similarities,
    }
def strategy_weighted(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 3: Weighted Average

    Average the similarities, weighting each one by the quality score of its
    reference vector.

    Pros: Accounts for reference vector quality
    Cons: Requires quality scores

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: list of dicts with an "embedding" entry and an
            optional "quality_score"; a missing or null score counts as 1.0.
        threshold: minimum weighted similarity for the face to be a match.

    Returns:
        dict with keys "strategy", "weighted_similarity", "is_match",
        "threshold" and "weights". With no reference vectors the result is a
        non-match with similarity 0.0.
    """
    similarities = [
        float(cosine_similarity(detected_embedding, ref["embedding"]))
        for ref in reference_embeddings
    ]

    weights = []
    for ref in reference_embeddings:
        quality = ref.get("quality_score")
        # A stored JSON null arrives as None; treat it like a missing score.
        weights.append(1.0 if quality is None else float(quality))

    total_weight = sum(weights)
    if not similarities:
        weighted_sim = 0.0
    elif total_weight > 0:
        weighted_sim = sum(sim * w for sim, w in zip(similarities, weights)) / total_weight
    else:
        # All quality scores are zero (or cancel out): the weighted mean is
        # undefined, so fall back to the plain mean rather than divide by zero.
        weighted_sim = sum(similarities) / len(similarities)

    return {
        "strategy": "weighted",
        "weighted_similarity": weighted_sim,
        "is_match": bool(similarities) and weighted_sim >= threshold,
        "threshold": threshold,
        "weights": weights,
    }
def strategy_combined(detected_embedding, reference_embeddings, threshold=0.85, weights=None):
    """
    Strategy 4: Combined Scoring

    Run the Best Match, Voting and Weighted strategies and blend their
    headline numbers into a single score:

        final_score = best_match   * weights['best_match']
                    + vote_ratio   * weights['vote_ratio']
                    + weighted_sim * weights['weighted_sim']

    Pros: Most robust, prioritizes best_match
    Cons: More computation

    Args:
        weights: dict with keys 'best_match', 'vote_ratio', 'weighted_sim';
            defaults to {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}.

    Returns:
        dict holding the three component values, the blended "final_score",
        the "is_match" verdict against *threshold*, the weights used and the
        full per-strategy results under "details".
    """
    if weights is None:
        weights = {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    # Evaluate the three sub-strategies in a fixed order with identical inputs.
    best_result, voting_result, weighted_result = (
        run(detected_embedding, reference_embeddings, threshold)
        for run in (strategy_best_match, strategy_voting, strategy_weighted)
    )

    components = {
        "best_match": best_result["best_similarity"],
        "vote_ratio": voting_result["vote_ratio"],
        "weighted_sim": weighted_result["weighted_similarity"],
    }

    best_term = components["best_match"] * weights['best_match']
    vote_term = components["vote_ratio"] * weights['vote_ratio']
    weighted_term = components["weighted_sim"] * weights['weighted_sim']
    final_score = best_term + vote_term + weighted_term

    return {
        "strategy": "combined",
        **components,
        "final_score": final_score,
        "is_match": final_score >= threshold,
        "threshold": threshold,
        "weights": weights,
        "details": {
            "best_match": best_result,
            "voting": voting_result,
            "weighted": weighted_result,
        },
    }
def match_face_to_identity(
    detected_embedding: list,
    identity_uuid: str,
    strategy: str = "combined",
    threshold: float = 0.85,
    schema: str = "dev",
    weights: dict = None,
):
    """Match a detected face embedding against one Identity in the database.

    Loads the identity's reference embeddings from `<schema>.identities`,
    L2-normalizes the detected embedding and runs the chosen strategy.

    Args:
        detected_embedding: embedding vector of the detected face.
        identity_uuid: UUID of the identity row to compare against.
        strategy: "best_match", "voting" or "weighted"; any other value
            selects the combined strategy.
        threshold: match threshold handed to the strategy.
        schema: database schema that holds the `identities` table.
        weights: dict for combined strategy, e.g.,
            {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    Returns:
        The strategy's result dict extended with "identity_name",
        "identity_uuid", "identity_type" and "reference_count"; or None when
        the identity does not exist, has no reference embeddings, or an
        error occurs while matching (the error is printed).
    """
    # Local import: psycopg2 is already a dependency of this file; only the
    # `sql` helper module is new here.
    from psycopg2 import sql

    conn = psycopg2.connect(DATABASE_URL)
    try:
        # Compose the schema as a quoted identifier instead of pasting it into
        # the statement text, so a crafted --schema value cannot inject SQL.
        query = sql.SQL(
            "SELECT name, identity_type, reference_data "
            "FROM {}.identities WHERE uuid = %s;"
        ).format(sql.Identifier(schema))

        with conn.cursor() as cur:
            cur.execute(query, (identity_uuid,))
            row = cur.fetchone()

        if not row:
            print(f"❌ Identity not found: {identity_uuid}")
            return None

        name, identity_type, reference_data_json = row

        # The column may arrive as a JSON string or already decoded; a NULL
        # column (or JSON null) is treated as "no reference data".
        if isinstance(reference_data_json, str):
            reference_data = json.loads(reference_data_json)
        else:
            reference_data = reference_data_json
        face_embeddings = (reference_data or {}).get("face_embeddings") or []

        if not face_embeddings:
            print(f"⚠️ No reference embeddings for Identity: {name}")
            return None

        # Normalize detected embedding (left untouched if it is all zeros).
        detected_norm = np.linalg.norm(detected_embedding)
        if detected_norm > 0:
            detected_normalized = (np.array(detected_embedding) / detected_norm).tolist()
        else:
            detected_normalized = detected_embedding

        single_strategies = {
            "best_match": strategy_best_match,
            "voting": strategy_voting,
            "weighted": strategy_weighted,
        }
        if strategy in single_strategies:
            match_result = single_strategies[strategy](detected_normalized, face_embeddings, threshold)
        else:
            match_result = strategy_combined(detected_normalized, face_embeddings, threshold, weights)

        match_result["identity_name"] = name
        match_result["identity_uuid"] = identity_uuid
        match_result["identity_type"] = identity_type
        match_result["reference_count"] = len(face_embeddings)
        return match_result
    except Exception as e:
        # Deliberately best-effort: report the failure and let callers (e.g.
        # a batch run) carry on with the next face.
        print(f"❌ Matching error: {e}")
        return None
    finally:
        conn.close()
def batch_match_faces(face_json_path, identity_uuid, strategy="combined", threshold=0.85, schema="dev", weights=None):
    """Batch match all faces in face.json to Identity.

    Every face that carries an embedding is passed to
    match_face_to_identity(); faces without one are skipped.

    Args:
        face_json_path: path to a JSON file whose "frames" object maps a
            frame key to a dict with a "faces" list.
        identity_uuid: UUID of the identity to compare against.
        strategy: matching strategy name, forwarded unchanged.
        threshold: match threshold, forwarded unchanged.
        schema: database schema, forwarded unchanged.
        weights: dict for combined strategy

    Returns:
        List of successful match results, each extended with "frame",
        "face_index" and "detected_confidence" (0.9 when the face has no
        "confidence" entry).
    """
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(face_json_path, encoding="utf-8") as f:
        data = json.load(f)

    # `or` also covers keys that are present but null.
    frames = data.get("frames") or {}
    results = []

    for frame_key, frame_data in frames.items():
        faces = frame_data.get("faces") or []
        for face_index, face in enumerate(faces):
            embedding = face.get("embedding")
            if not embedding:
                continue

            match_result = match_face_to_identity(
                detected_embedding=embedding,
                identity_uuid=identity_uuid,
                strategy=strategy,
                threshold=threshold,
                schema=schema,
                weights=weights,
            )
            if not match_result:
                continue

            match_result["frame"] = frame_key
            match_result["face_index"] = face_index
            match_result["detected_confidence"] = face.get("confidence", 0.9)
            results.append(match_result)

    return results
def _match_score(result):
    """Return the headline score of a match result, whichever strategy made it."""
    # combined -> final_score, best_match -> best_similarity,
    # weighted -> weighted_similarity, voting -> vote_ratio.
    for key in ("final_score", "best_similarity", "weighted_similarity", "vote_ratio"):
        if key in result:
            return result[key]
    return 0


def analyze_match_results(results):
    """Print a summary of batch match results to stdout.

    Reports the number of results, the share flagged as a match, the score
    range, and details of the five highest-scoring results. The same score
    lookup is used for the statistics, the ranking and the per-result
    "Final Score" line, so every strategy is ranked by its own score.

    Args:
        results: list of result dicts as produced by batch_match_faces().
    """
    print("\n=== Match Results Analysis ===")
    print(f"Total faces matched: {len(results)}")

    # Strategy comparison
    if results:
        is_match_count = sum(1 for r in results if r["is_match"])
        match_ratio = is_match_count / len(results)
        print(f"Match ratio: {match_ratio:.2%} ({is_match_count}/{len(results)})")

        # Score distribution
        scores = [_match_score(r) for r in results]
        print(f"Scores: min={min(scores):.2f}, max={max(scores):.2f}, avg={np.mean(scores):.2f}")

    # Print detailed results (first 5)
    print("\n=== Top 5 Match Details ===")
    ranked = sorted(results, key=_match_score, reverse=True)
    for position, r in enumerate(ranked[:5], start=1):
        print(f"\nMatch {position}: Frame {r['frame']}, Face {r['face_index']}")
        print(f" Strategy: {r['strategy']}")
        print(f" Identity: {r['identity_name']}")
        print(f" Final Score: {_match_score(r):.4f}")
        print(f" Is Match: {r['is_match']}")
        if r['strategy'] == 'combined':
            print(" Details:")
            print(f" Best Match: {r['best_match']:.4f}")
            print(f" Vote Ratio: {r['vote_ratio']:.2%}")
            print(f" Weighted Sim: {r['weighted_sim']:.4f}")
def _parse_weights(raw, parser):
    """Convert the --weights CSV string into the dict used by the combined strategy."""
    parts = raw.split(",")
    if len(parts) != 3:
        parser.error(
            "--weights expects three comma-separated numbers "
            "(best_match,vote_ratio,weighted_sim)"
        )
    try:
        best_match, vote_ratio, weighted_sim = (float(part) for part in parts)
    except ValueError:
        parser.error(f"--weights contains a non-numeric value: {raw!r}")
    return {
        'best_match': best_match,
        'vote_ratio': vote_ratio,
        'weighted_sim': weighted_sim,
    }


def _lookup_identity_uuid(identity_name, schema):
    """Return the UUID of the identity named *identity_name*, or None if absent."""
    # Local import: psycopg2 is already a dependency of this file.
    from psycopg2 import sql

    # Quote the schema as an identifier rather than formatting it into the SQL.
    query = sql.SQL(
        "SELECT uuid FROM {}.identities WHERE name = %s;"
    ).format(sql.Identifier(schema))

    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor() as cur:
            cur.execute(query, (identity_name,))
            row = cur.fetchone()
    finally:
        conn.close()
    return row[0] if row else None


def main():
    """Command-line entry point.

    Resolves the target identity (by UUID or by name), then either matches
    every face in the face.json file (--batch) or only the first face of the
    first frame, and prints the outcome. Invalid --weights values abort with
    an argparse usage error; other problems are printed and end the run.
    """
    parser = argparse.ArgumentParser(description="Match Face to Identity")
    parser.add_argument("--identity-uuid", help="Identity UUID to match against")
    parser.add_argument("--identity-name", help="Identity name (will query UUID)")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--strategy", default="combined", choices=["best_match", "voting", "weighted", "combined"])
    parser.add_argument("--threshold", type=float, default=0.85, help="Match threshold")
    parser.add_argument("--schema", default="dev", help="Database schema")
    parser.add_argument("--batch", action="store_true", help="Batch match all faces")
    parser.add_argument("--weights", type=str, default="0.7,0.2,0.1", help="Weights for combined strategy (best_match,vote_ratio,weighted_sim)")
    args = parser.parse_args()

    # Weights only matter for the combined strategy; reject malformed input
    # instead of silently falling back to the defaults.
    weights = None
    if args.strategy == "combined":
        weights = _parse_weights(args.weights, parser)

    print("=" * 60)
    print("Face Identity Matching (1-to-many)")
    print("=" * 60)

    # Get Identity UUID
    identity_uuid = args.identity_uuid
    if not identity_uuid and args.identity_name:
        identity_uuid = _lookup_identity_uuid(args.identity_name, args.schema)
        if not identity_uuid:
            print(f"❌ Identity not found: {args.identity_name}")
            return
        print(f"✅ Found Identity: {args.identity_name} (UUID: {identity_uuid})")

    if not identity_uuid:
        print("❌ Please provide --identity-uuid or --identity-name")
        return

    print(f"\nStrategy: {args.strategy}")
    print(f"Threshold: {args.threshold}")
    if weights:
        print(f"Weights: best_match={weights['best_match']}, vote_ratio={weights['vote_ratio']}, weighted_sim={weights['weighted_sim']}")

    # Batch match
    if args.batch:
        print(f"\n🔧 Batch matching from: {args.face_json}")
        results = batch_match_faces(
            face_json_path=args.face_json,
            identity_uuid=identity_uuid,
            strategy=args.strategy,
            threshold=args.threshold,
            schema=args.schema,
            weights=weights,
        )
        analyze_match_results(results)
        return

    # Single match (first face in face.json)
    with open(args.face_json, encoding="utf-8") as f:
        data = json.load(f)

    frames = data.get("frames") or {}
    first_frame = next(iter(frames.values()), None)
    if first_frame is None:
        print(f"❌ No frames in: {args.face_json}")
        return

    faces = first_frame.get("faces") or []
    if not faces:
        print("❌ No faces in first frame")
        return

    embedding = faces[0].get("embedding")
    if not embedding:
        print("❌ No embedding in first face")
        return

    print("\n🔧 Matching first face...")
    match_result = match_face_to_identity(
        detected_embedding=embedding,
        identity_uuid=identity_uuid,
        strategy=args.strategy,
        threshold=args.threshold,
        schema=args.schema,
        weights=weights,
    )
    if match_result:
        print("\n✅ Match Result:")
        print(f" Identity: {match_result['identity_name']}")
        print(f" Strategy: {match_result['strategy']}")
        print(f" Is Match: {match_result['is_match']}")
        if match_result['strategy'] == 'combined':
            print(f" Final Score: {match_result['final_score']:.4f}")
            print(f" Best Match: {match_result['best_match']:.4f}")
            print(f" Vote Ratio: {match_result['vote_ratio']:.2%}")
            print(f" Weighted Sim: {match_result['weighted_sim']:.4f}")


if __name__ == "__main__":
    main()