- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
435 lines
14 KiB
Python
435 lines
14 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Face Identity Matching with 1-to-many Reference Vectors

Purpose:
    1. Implement 1-to-many matching algorithms
    2. Support multiple strategies (Best Match, Voting, Weighted, Combined)
    3. Match a detected face to an Identity in the database

Usage:
    python3 scripts/match_face_identity.py --identity-name "Preview Test Person" --face-json output/preview.face_new.json
|
|
"""
|
|
|
|
import json
|
|
import argparse
|
|
import numpy as np
|
|
from datetime import datetime
|
|
import psycopg2
|
|
import os
|
|
|
|
# Postgres connection string. The DATABASE_URL environment variable takes
# precedence; otherwise the built-in local default below is used.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev",
)
|
|
|
|
|
|
def cosine_similarity(a, b):
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Both inputs are converted to float64 NumPy arrays. If either vector has a
    norm of zero the similarity is reported as 0.0 rather than dividing by
    zero.
    """
    vec_a = np.array(a, dtype=np.float64)
    vec_b = np.array(b, dtype=np.float64)

    len_a, len_b = np.linalg.norm(vec_a), np.linalg.norm(vec_b)

    # A zero-length vector has no direction, so no similarity is defined.
    either_is_zero = len_a == 0 or len_b == 0
    if either_is_zero:
        return 0.0

    return np.dot(vec_a, vec_b) / (len_a * len_b)
|
|
|
|
|
|
def strategy_best_match(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 1: Best Match

    Take the highest similarity among all reference vectors.

    Pros: Fast, simple
    Cons: May miss if detected face is from different angle

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: sequence of dicts, each holding the reference
            vector under the "embedding" key.
        threshold: minimum similarity for the best reference to count as a
            match.

    Returns:
        dict with "strategy", "best_similarity", "best_reference_idx",
        "is_match" and "threshold". With no reference vectors the result is
        a non-match with similarity 0.0 and index None. Values are plain
        Python scalars so the dict can be serialized with ``json``.
    """
    similarities = [
        float(cosine_similarity(detected_embedding, ref["embedding"]))
        for ref in reference_embeddings
    ]

    # Nothing to compare against: report "no match" instead of letting
    # max() raise ValueError on an empty list.
    if not similarities:
        return {
            "strategy": "best_match",
            "best_similarity": 0.0,
            "best_reference_idx": None,
            "is_match": False,
            "threshold": threshold,
        }

    best_sim = max(similarities)
    # np.argmax returns np.int64; convert so the result stays JSON-friendly.
    best_idx = int(np.argmax(similarities))

    return {
        "strategy": "best_match",
        "best_similarity": best_sim,
        "best_reference_idx": best_idx,
        "is_match": bool(best_sim >= threshold),
        "threshold": threshold,
    }
|
|
|
|
|
|
def strategy_voting(detected_embedding, reference_embeddings, threshold=0.85, min_vote_ratio=0.5):
    """
    Strategy 2: Voting Mechanism

    Count how many reference vectors exceed threshold.

    Pros: More robust
    Cons: Requires more reference vectors

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: sequence of dicts, each holding the reference
            vector under the "embedding" key.
        threshold: similarity a reference must reach to cast a vote.
        min_vote_ratio: fraction of references that must vote for the result
            to count as a match (default 0.5, i.e. at least half).

    Returns:
        dict with "strategy", "votes", "total_references", "vote_ratio",
        "is_match", "threshold" and the per-reference "similarities". With no
        reference vectors the ratio is 0.0 and the result is a non-match.
    """
    similarities = [
        float(cosine_similarity(detected_embedding, ref["embedding"]))
        for ref in reference_embeddings
    ]

    total = len(similarities)
    votes = sum(1 for sim in similarities if sim >= threshold)
    # Guard the division: an empty reference list used to raise
    # ZeroDivisionError here.
    vote_ratio = votes / total if total else 0.0

    return {
        "strategy": "voting",
        "votes": votes,
        "total_references": total,
        "vote_ratio": vote_ratio,
        # Without references there is nothing that could have matched.
        "is_match": total > 0 and vote_ratio >= min_vote_ratio,
        "threshold": threshold,
        "similarities": similarities,
    }
|
|
|
|
|
|
def strategy_weighted(detected_embedding, reference_embeddings, threshold=0.85):
    """
    Strategy 3: Weighted Average

    Weight similarity by quality score.

    Pros: Accounts for reference vector quality
    Cons: Requires quality scores

    Args:
        detected_embedding: embedding vector of the detected face.
        reference_embeddings: sequence of dicts holding the reference vector
            under "embedding" and, optionally, a "quality_score" used as the
            weight. A missing or null score counts as 1.0.
        threshold: minimum weighted similarity to count as a match.

    Returns:
        dict with "strategy", "weighted_similarity", "is_match", "threshold"
        and the "weights" that were applied. When the weights sum to zero
        (no references, or all scores are 0) the similarity is 0.0 and the
        result is a non-match.
    """
    similarities = [
        float(cosine_similarity(detected_embedding, ref["embedding"]))
        for ref in reference_embeddings
    ]

    weights = []
    for ref in reference_embeddings:
        quality = ref.get("quality_score")
        # A key that is present but null would otherwise break the
        # multiplication below with a TypeError.
        weights.append(1.0 if quality is None else quality)

    total_weight = sum(weights)
    if total_weight:
        weighted_sim = sum(sim * w for sim, w in zip(similarities, weights)) / total_weight
        is_match = bool(weighted_sim >= threshold)
    else:
        # Avoid ZeroDivisionError; with no usable weight there is no evidence
        # for a match.
        weighted_sim = 0.0
        is_match = False

    return {
        "strategy": "weighted",
        "weighted_similarity": weighted_sim,
        "is_match": is_match,
        "threshold": threshold,
        "weights": weights,
    }
|
|
|
|
|
|
def strategy_combined(detected_embedding, reference_embeddings, threshold=0.85, weights=None):
    """
    Strategy 4: Combined Scoring

    Runs the Best Match, Voting and Weighted strategies and blends their
    scores into one value.

    Formula (optimized):
        final_score = best_match * 0.7 + vote_ratio * 0.2 + weighted_sim * 0.1

    Pros: Most robust, prioritizes best_match
    Cons: More computation

    Args:
        weights: dict with keys 'best_match', 'vote_ratio', 'weighted_sim'
            default: {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    Returns:
        dict with the three component scores, the blended "final_score",
        "is_match" (final_score >= threshold), the weights used and the full
        result of each sub-strategy under "details".
    """
    if weights is None:
        weights = {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    # Each sub-strategy is evaluated on the same inputs and threshold.
    call_args = (detected_embedding, reference_embeddings, threshold)
    best_result = strategy_best_match(*call_args)
    voting_result = strategy_voting(*call_args)
    weighted_result = strategy_weighted(*call_args)

    best_sim = best_result["best_similarity"]
    vote_ratio = voting_result["vote_ratio"]
    weighted_sim = weighted_result["weighted_similarity"]

    final_score = (
        best_sim * weights['best_match']
        + vote_ratio * weights['vote_ratio']
        + weighted_sim * weights['weighted_sim']
    )

    details = {
        "best_match": best_result,
        "voting": voting_result,
        "weighted": weighted_result,
    }

    return {
        "strategy": "combined",
        "best_match": best_sim,
        "vote_ratio": vote_ratio,
        "weighted_sim": weighted_sim,
        "final_score": final_score,
        "is_match": final_score >= threshold,
        "threshold": threshold,
        "weights": weights,
        "details": details,
    }
|
|
|
|
|
|
def match_face_to_identity(
    detected_embedding: list,
    identity_uuid: str,
    strategy: str = "combined",
    threshold: float = 0.85,
    schema: str = "dev",
    weights: dict = None,
):
    """Match detected face embedding to Identity in database.

    Loads the identity row from ``<schema>.identities``, reads the reference
    vectors stored under ``reference_data["face_embeddings"]``, L2-normalizes
    the detected embedding and scores it with the selected strategy.

    Args:
        detected_embedding: embedding vector of the detected face.
        identity_uuid: value matched against the ``uuid`` column.
        strategy: "best_match", "voting" or "weighted"; any other value
            selects the combined strategy.
        threshold: similarity threshold forwarded to the strategy.
        schema: database schema that holds the ``identities`` table. It is
            quoted as an SQL identifier, so it is matched case-sensitively.
        weights: dict for combined strategy, e.g.,
            {'best_match': 0.7, 'vote_ratio': 0.2, 'weighted_sim': 0.1}

    Returns:
        The strategy's result dict extended with "identity_name",
        "identity_uuid", "identity_type" and "reference_count", or None when
        the identity does not exist, has no reference embeddings, or an
        error occurs during matching (a message is printed in each case).
    """
    # Local import: the SQL composition helpers are only needed here.
    from psycopg2 import sql

    # Compose the schema as a quoted identifier instead of formatting it into
    # the SQL text, so a crafted schema name cannot inject SQL.
    query = sql.SQL(
        "SELECT name, identity_type, reference_data, face_embedding "
        "FROM {}.identities WHERE uuid = %s;"
    ).format(sql.Identifier(schema))

    conn = psycopg2.connect(DATABASE_URL)

    try:
        # Get Identity reference_data; the cursor is released as soon as the
        # row has been fetched.
        with conn.cursor() as cur:
            cur.execute(query, (identity_uuid,))
            row = cur.fetchone()

        if not row:
            print(f"❌ Identity not found: {identity_uuid}")
            return None

        name, identity_type, reference_data_json, _centroid_embedding = row

        # Parse reference_data (the driver may hand back text or an already
        # decoded object).
        if isinstance(reference_data_json, str):
            reference_data = json.loads(reference_data_json)
        else:
            reference_data = reference_data_json

        # A NULL column arrives as None; treat it like "no references".
        face_embeddings = (reference_data or {}).get("face_embeddings") or []

        if not face_embeddings:
            print(f"⚠️ No reference embeddings for Identity: {name}")
            return None

        # Normalize detected embedding (left untouched when its norm is 0).
        detected_norm = np.linalg.norm(detected_embedding)
        if detected_norm > 0:
            detected_normalized = (np.array(detected_embedding) / detected_norm).tolist()
        else:
            detected_normalized = detected_embedding

        # Choose matching strategy; unknown names fall back to "combined".
        simple_strategies = {
            "best_match": strategy_best_match,
            "voting": strategy_voting,
            "weighted": strategy_weighted,
        }
        if strategy in simple_strategies:
            match_result = simple_strategies[strategy](
                detected_normalized, face_embeddings, threshold
            )
        else:
            match_result = strategy_combined(
                detected_normalized, face_embeddings, threshold, weights
            )

        match_result["identity_name"] = name
        match_result["identity_uuid"] = identity_uuid
        match_result["identity_type"] = identity_type
        match_result["reference_count"] = len(face_embeddings)

        return match_result

    except Exception as e:
        # Best-effort boundary: report the failure and let callers skip this
        # face instead of aborting a whole batch.
        print(f"❌ Matching error: {e}")
        return None
    finally:
        conn.close()
|
|
|
|
|
|
def batch_match_faces(face_json_path, identity_uuid, strategy="combined", threshold=0.85, schema="dev", weights=None):
    """Match every face listed in a face.json file against one Identity.

    The file is expected to hold a "frames" mapping whose values carry a
    "faces" list. Faces without an embedding are skipped, as are faces for
    which the match returns nothing.

    Args:
        face_json_path: path of the face.json file to read.
        identity_uuid: UUID of the identity to match against.
        strategy: matching strategy name forwarded to the matcher.
        threshold: match threshold forwarded to the matcher.
        schema: database schema forwarded to the matcher.
        weights: dict for combined strategy

    Returns:
        list of match result dicts, each annotated with "frame",
        "face_index" and "detected_confidence" (0.9 when the face carries no
        "confidence" value).
    """
    with open(face_json_path) as handle:
        payload = json.load(handle)

    matched = []

    for frame_key, frame_data in payload.get("frames", {}).items():
        for face_index, face in enumerate(frame_data.get("faces", [])):
            embedding = face.get("embedding")
            if not embedding:
                continue

            # NOTE: every call performs its own database lookup.
            outcome = match_face_to_identity(
                detected_embedding=embedding,
                identity_uuid=identity_uuid,
                strategy=strategy,
                threshold=threshold,
                schema=schema,
                weights=weights,
            )
            if not outcome:
                continue

            outcome["frame"] = frame_key
            outcome["face_index"] = face_index
            outcome["detected_confidence"] = face.get("confidence", 0.9)
            matched.append(outcome)

    return matched
|
|
|
|
|
|
def analyze_match_results(results):
    """Print a summary of batch match results.

    Reports the number of results, the share flagged as matches, the score
    range, and details for the five highest-scoring results. Scores are read
    the same way everywhere (see ``_match_score``), so results from the
    "weighted" and "voting" strategies are ranked and displayed by their own
    score instead of as 0.

    Args:
        results: list of match result dicts as produced by the batch matcher
            (each needs "is_match", "strategy", "frame", "face_index" and
            "identity_name").
    """
    print("\n=== Match Results Analysis ===")
    print(f"Total faces matched: {len(results)}")

    # Strategy comparison
    if results:
        is_match_count = sum(1 for r in results if r["is_match"])
        match_ratio = is_match_count / len(results)

        print(f"Match ratio: {match_ratio:.2%} ({is_match_count}/{len(results)})")

        # Score distribution
        final_scores = [_match_score(r) for r in results]

        print(f"Scores: min={min(final_scores):.2f}, max={max(final_scores):.2f}, avg={np.mean(final_scores):.2f}")

    # Print detailed results (first 5)
    print("\n=== Top 5 Match Details ===")
    sorted_results = sorted(results, key=_match_score, reverse=True)

    for i, r in enumerate(sorted_results[:5]):
        print(f"\nMatch {i+1}: Frame {r['frame']}, Face {r['face_index']}")
        print(f" Strategy: {r['strategy']}")
        print(f" Identity: {r['identity_name']}")
        print(f" Final Score: {_match_score(r):.4f}")
        print(f" Is Match: {r['is_match']}")

        if r['strategy'] == 'combined':
            print(" Details:")
            print(f" Best Match: {r['best_match']:.4f}")
            print(f" Vote Ratio: {r['vote_ratio']:.2%}")
            print(f" Weighted Sim: {r['weighted_sim']:.4f}")


def _match_score(result):
    """Return the headline score of a match result for any strategy.

    The first key present wins: "final_score" (combined), "best_similarity"
    (best_match), "weighted_similarity" (weighted), "vote_ratio" (voting).
    Falls back to 0 when none of them is present.
    """
    for key in ("final_score", "best_similarity", "weighted_similarity", "vote_ratio"):
        if key in result:
            return result[key]
    return 0
|
|
|
|
|
|
def main():
    """Command-line entry point: match faces from a face.json to an Identity.

    The identity is given by ``--identity-uuid`` or resolved from
    ``--identity-name`` through the database. With ``--batch`` every face in
    the file is matched and summarized; otherwise only the first face found
    in the file is matched and its result printed. Problems with the input
    (unknown identity, no faces, missing embedding) are reported on stdout
    and end the run without raising.
    """
    parser = argparse.ArgumentParser(description="Match Face to Identity")
    parser.add_argument("--identity-uuid", help="Identity UUID to match against")
    parser.add_argument("--identity-name", help="Identity name (will query UUID)")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--strategy", default="combined", choices=["best_match", "voting", "weighted", "combined"])
    parser.add_argument("--threshold", type=float, default=0.85, help="Match threshold")
    parser.add_argument("--schema", default="dev", help="Database schema")
    parser.add_argument("--batch", action="store_true", help="Batch match all faces")
    parser.add_argument("--weights", type=str, default="0.7,0.2,0.1", help="Weights for combined strategy (best_match,vote_ratio,weighted_sim)")
    args = parser.parse_args()

    # Parse weights (only meaningful for the combined strategy)
    weights = None
    if args.strategy == "combined":
        weights = _parse_weights(args.weights, parser)

    print("=" * 60)
    print("Face Identity Matching (1-to-many)")
    print("=" * 60)

    # Get Identity UUID
    identity_uuid = args.identity_uuid

    if not identity_uuid and args.identity_name:
        identity_uuid = _lookup_identity_uuid(args.identity_name, args.schema)
        if not identity_uuid:
            print(f"❌ Identity not found: {args.identity_name}")
            return
        print(f"✅ Found Identity: {args.identity_name} (UUID: {identity_uuid})")

    if not identity_uuid:
        print("❌ Please provide --identity-uuid or --identity-name")
        return

    print(f"\nStrategy: {args.strategy}")
    print(f"Threshold: {args.threshold}")

    if weights:
        print(f"Weights: best_match={weights['best_match']}, vote_ratio={weights['vote_ratio']}, weighted_sim={weights['weighted_sim']}")

    # Batch match
    if args.batch:
        print(f"\n🔧 Batch matching from: {args.face_json}")
        results = batch_match_faces(
            face_json_path=args.face_json,
            identity_uuid=identity_uuid,
            strategy=args.strategy,
            threshold=args.threshold,
            schema=args.schema,
            weights=weights,
        )

        analyze_match_results(results)
        return

    # Single match (first face in face.json)
    with open(args.face_json, encoding="utf-8") as f:
        data = json.load(f)

    first_face = _first_face(data.get("frames", {}))
    if first_face is None:
        # Previously an empty file crashed with IndexError/KeyError.
        print("❌ No faces found in face.json")
        return

    embedding = first_face.get("embedding")
    if not embedding:
        print("❌ No embedding in first face")
        return

    print(f"\n🔧 Matching first face...")
    match_result = match_face_to_identity(
        detected_embedding=embedding,
        identity_uuid=identity_uuid,
        strategy=args.strategy,
        threshold=args.threshold,
        schema=args.schema,
        weights=weights,
    )

    if match_result:
        print(f"\n✅ Match Result:")
        print(f" Identity: {match_result['identity_name']}")
        print(f" Strategy: {match_result['strategy']}")
        print(f" Is Match: {match_result['is_match']}")

        if match_result['strategy'] == 'combined':
            print(f" Final Score: {match_result['final_score']:.4f}")
            print(f" Best Match: {match_result['best_match']:.4f}")
            print(f" Vote Ratio: {match_result['vote_ratio']:.2%}")
            print(f" Weighted Sim: {match_result['weighted_sim']:.4f}")


def _parse_weights(raw, parser):
    """Turn the ``--weights`` string into the combined-strategy weight dict.

    Returns None (so the strategy defaults apply) when the value does not
    contain exactly three comma-separated parts, after printing a warning.
    Non-numeric parts end the program through ``parser.error``.
    """
    parts = raw.split(",")
    if len(parts) != 3:
        print(f"⚠️ Ignoring --weights {raw!r}: expected 3 comma-separated values, using defaults")
        return None
    try:
        best_match, vote_ratio, weighted_sim = (float(part) for part in parts)
    except ValueError:
        parser.error(f"--weights must be three numbers, got {raw!r}")
    return {
        'best_match': best_match,
        'vote_ratio': vote_ratio,
        'weighted_sim': weighted_sim,
    }


def _lookup_identity_uuid(identity_name, schema):
    """Return the uuid of the identity called ``identity_name``, or None."""
    # Local import: the SQL composition helpers are only needed here.
    from psycopg2 import sql

    # Quote the schema as an identifier rather than formatting it into the
    # SQL text, so a crafted --schema value cannot inject SQL.
    query = sql.SQL("SELECT uuid FROM {}.identities WHERE name = %s;").format(
        sql.Identifier(schema)
    )

    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor() as cur:
            cur.execute(query, (identity_name,))
            row = cur.fetchone()
    finally:
        conn.close()

    return row[0] if row else None


def _first_face(frames):
    """Return the first face entry found walking ``frames`` in order, or None."""
    for frame_data in frames.values():
        faces = frame_data.get("faces") or []
        if faces:
            return faces[0]
    return None
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()