8f05a7c188
- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
283 lines
8.9 KiB
Python
283 lines
8.9 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Face Clustering Processor
|
|
職責:將短暫的 Face ID 聚合為持續的 Person ID,並自動綁定 Speaker。
|
|
"""
|
|
|
|
import cv2
|
|
import json
|
|
import numpy as np
|
|
import os
|
|
import sys
|
|
import psycopg2
|
|
from sklearn.cluster import AgglomerativeClustering
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
try:
|
|
from deepface import DeepFace
|
|
|
|
HAS_DEEPFACE = True
|
|
except ImportError:
|
|
print("❌ DeepFace not found. Run: pip install deepface")
|
|
sys.exit(1)
|
|
|
|
# 設定
|
|
UUID = os.getenv("UUID", "quick_preview")
|
|
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")
|
|
VIDEO_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.mp4")
|
|
FACE_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.face.json")
|
|
OUTPUT_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.face_clustered.json")
|
|
ASRX_JSON_PATH = os.path.join(OUTPUT_DIR, UUID, f"{UUID}.asrx.json")
|
|
DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
|
|
|
|
|
|
def optimized_clustering(embeddings):
|
|
"""
|
|
Optimized Clustering for large datasets (e.g. 25k faces).
|
|
Strategy: Sample -> Agglomerative -> Centroid Assignment
|
|
"""
|
|
import numpy as np
|
|
from sklearn.cluster import AgglomerativeClustering
|
|
from sklearn.metrics.pairwise import cosine_distances
|
|
|
|
n_faces = len(embeddings)
|
|
print(f" 🚀 Starting optimized clustering for {n_faces} faces...")
|
|
|
|
# 1. Sampling
|
|
sample_size = min(5000, n_faces)
|
|
if n_faces > sample_size:
|
|
indices = np.random.choice(n_faces, sample_size, replace=False)
|
|
sample_embeddings = embeddings[indices]
|
|
else:
|
|
sample_embeddings = embeddings
|
|
indices = np.arange(n_faces)
|
|
|
|
print(f" 📊 Sampling {len(sample_embeddings)} faces for clustering structure...")
|
|
|
|
# 2. Agglomerative Clustering on Sample
|
|
clustering = AgglomerativeClustering(
|
|
n_clusters=None, distance_threshold=0.4, metric="cosine", linkage="average"
|
|
)
|
|
sample_labels = clustering.fit_predict(sample_embeddings)
|
|
|
|
unique_labels = set(sample_labels)
|
|
n_clusters = len(unique_labels)
|
|
print(f" 🔍 Found {n_clusters} unique clusters in sample.")
|
|
|
|
# 3. Compute Centroids for each cluster
|
|
centroids = []
|
|
for label in unique_labels:
|
|
cluster_mask = sample_labels == label
|
|
cluster_faces = sample_embeddings[cluster_mask]
|
|
# Mean embedding
|
|
centroid = np.mean(cluster_faces, axis=0)
|
|
centroids.append(centroid)
|
|
|
|
centroids = np.array(centroids) # Shape: (n_clusters, 512)
|
|
|
|
# 4. Assign all faces to nearest centroid
|
|
# Batch processing to save memory
|
|
print(f" 🏃 Assigning {n_faces} faces to {n_clusters} clusters...")
|
|
all_labels = np.zeros(n_faces, dtype=int)
|
|
|
|
batch_size = 5000
|
|
for start in range(0, n_faces, batch_size):
|
|
end = min(start + batch_size, n_faces)
|
|
batch = embeddings[start:end]
|
|
dists = cosine_distances(batch, centroids)
|
|
all_labels[start:end] = np.argmin(dists, axis=1)
|
|
|
|
return all_labels
|
|
|
|
|
|
def main():
|
|
if not os.path.exists(FACE_JSON_PATH):
|
|
print("❌ Face JSON not found.")
|
|
return
|
|
|
|
with open(FACE_JSON_PATH) as f:
|
|
face_data = json.load(f)
|
|
|
|
frames_list = face_data.get("frames", [])
|
|
if not frames_list:
|
|
print("❌ No frames in JSON.")
|
|
return
|
|
|
|
cap = cv2.VideoCapture(VIDEO_PATH)
|
|
embeddings = []
|
|
face_refs = []
|
|
|
|
print(f"🔍 Extracting face embeddings from {UUID}...")
|
|
|
|
for frame_idx, frame_obj in enumerate(frames_list):
|
|
ts = frame_obj.get("timestamp")
|
|
faces = frame_obj.get("faces", [])
|
|
if not faces:
|
|
continue
|
|
|
|
if ts is not None:
|
|
cap.set(cv2.CAP_PROP_POS_MSEC, ts * 1000)
|
|
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
continue
|
|
|
|
for face_idx, face in enumerate(faces):
|
|
x, y, w, h = face["x"], face["y"], face["width"], face["height"]
|
|
margin = 5
|
|
crop = frame[
|
|
max(0, y - margin) : y + h + margin, max(0, x - margin) : x + w + margin
|
|
]
|
|
|
|
if crop is None or crop.size == 0:
|
|
continue
|
|
|
|
try:
|
|
res = DeepFace.represent(
|
|
img_path=crop, model_name="ArcFace", enforce_detection=False
|
|
)
|
|
if res and "embedding" in res[0]:
|
|
embeddings.append(res[0]["embedding"])
|
|
face_refs.append({"frame_idx": frame_idx, "face_idx": face_idx})
|
|
except Exception:
|
|
pass
|
|
|
|
cap.release()
|
|
|
|
if not embeddings:
|
|
print("❌ No embeddings extracted.")
|
|
return
|
|
|
|
embeddings = np.array(embeddings)
|
|
print(f"✅ Extracted {len(embeddings)} face embeddings.")
|
|
|
|
# 2. 聚類
|
|
print(f"🧠 Clustering {len(embeddings)} faces...")
|
|
clustering = AgglomerativeClustering(
|
|
n_clusters=None, distance_threshold=0.4, metric="cosine", linkage="average"
|
|
)
|
|
labels = clustering.fit_predict(embeddings)
|
|
|
|
unique_labels = set(labels)
|
|
label_to_person = {l: f"Person_{i}" for i, l in enumerate(unique_labels)}
|
|
print(
|
|
f"👥 Detected {len(unique_labels)} unique persons: {[label_to_person[l] for l in unique_labels]}"
|
|
)
|
|
|
|
# 3. 更新 JSON
|
|
for ref, label in zip(face_refs, labels):
|
|
f_idx = ref["frame_idx"]
|
|
face_idx = ref["face_idx"]
|
|
person_id = label_to_person[label]
|
|
|
|
if f_idx < len(frames_list):
|
|
faces = frames_list[f_idx].get("faces", [])
|
|
if face_idx < len(faces):
|
|
frames_list[f_idx]["faces"][face_idx]["person_id"] = person_id
|
|
|
|
# 保存
|
|
with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as f:
|
|
json.dump(face_data, f, indent=2, ensure_ascii=False)
|
|
print(f"✅ Saved clustered data to {OUTPUT_JSON_PATH}")
|
|
|
|
# 4. 自動綁定 Speaker
|
|
auto_bind_speakers()
|
|
|
|
|
|
def auto_bind_speakers():
|
|
if not os.path.exists(OUTPUT_JSON_PATH) or not os.path.exists(ASRX_JSON_PATH):
|
|
print("⚠️ Missing data for speaker binding.")
|
|
return
|
|
|
|
with open(OUTPUT_JSON_PATH) as f:
|
|
face_clustered = json.load(f)
|
|
with open(ASRX_JSON_PATH) as f:
|
|
asrx_data = json.load(f)
|
|
|
|
print("🔗 Auto-binding Speakers to Persons...")
|
|
|
|
# 建立 Face 時間列表
|
|
face_spans = []
|
|
for frame_obj in face_clustered.get("frames", []):
|
|
ts = frame_obj.get("timestamp")
|
|
for face in frame_obj.get("faces", []):
|
|
person_id = face.get("person_id")
|
|
if person_id and ts is not None:
|
|
face_spans.append({"ts": ts, "person_id": person_id})
|
|
|
|
speaker_person_counts = {}
|
|
|
|
# 對於每個說話片段,找出畫面中出現的人
|
|
for seg in asrx_data.get("segments", []):
|
|
start = seg.get("start")
|
|
end = seg.get("end")
|
|
speaker = seg.get("speaker_id")
|
|
if not speaker:
|
|
continue
|
|
|
|
# 找時間重疊
|
|
candidates = [f for f in face_spans if start <= f["ts"] <= end]
|
|
if candidates:
|
|
# 投票
|
|
person_counts = {}
|
|
for c in candidates:
|
|
pid = c["person_id"]
|
|
person_counts[pid] = person_counts.get(pid, 0) + 1
|
|
|
|
if speaker not in speaker_person_counts:
|
|
speaker_person_counts[speaker] = {}
|
|
|
|
best_person = max(person_counts, key=person_counts.get)
|
|
speaker_person_counts[speaker][best_person] = (
|
|
speaker_person_counts[speaker].get(best_person, 0) + 1
|
|
)
|
|
|
|
# 寫入資料庫
|
|
try:
|
|
conn = psycopg2.connect(DB_URL)
|
|
cur = conn.cursor()
|
|
|
|
for speaker, persons in speaker_person_counts.items():
|
|
if not persons:
|
|
continue
|
|
best_person = max(persons, key=persons.get)
|
|
print(
|
|
f" 🎤 {speaker} is likely {best_person} ({persons[best_person]} votes)"
|
|
)
|
|
|
|
# 1. 找或建 Talent
|
|
cur.execute("SELECT id FROM talents WHERE real_name = %s", (best_person,))
|
|
row = cur.fetchone()
|
|
|
|
if row:
|
|
talent_id = row[0]
|
|
else:
|
|
cur.execute(
|
|
"INSERT INTO talents (real_name) VALUES (%s) RETURNING id",
|
|
(best_person,),
|
|
)
|
|
talent_id = cur.fetchone()[0]
|
|
print(f" ✨ Created Talent #{talent_id} ({best_person})")
|
|
|
|
# 2. 綁定 Speaker
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO identity_bindings (talent_id, binding_type, binding_value, source, confidence)
|
|
VALUES (%s, 'speaker', %s, 'auto_cluster', 0.8)
|
|
ON CONFLICT (binding_type, binding_value) DO UPDATE SET talent_id = EXCLUDED.talent_id
|
|
""",
|
|
(talent_id, speaker),
|
|
)
|
|
print(f" ✅ Bound {speaker} -> {best_person}")
|
|
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
except Exception as e:
|
|
print(f" ❌ DB Error: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|