- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
400 lines
12 KiB
Python
Executable File
400 lines
12 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
TMDB Identity Integration Script
|
|
|
|
Purpose:
|
|
1. Fetch person images from TMDB /person/:id/images endpoint
|
|
2. Download multiple images (different angles/shots)
|
|
3. Extract ArcFace embeddings using InsightFace
|
|
4. Store embeddings to reference_data JSONB
|
|
5. Register Identity to PostgreSQL database
|
|
|
|
Usage:
|
|
python3 scripts/tmdb_identity_integration.py --tmdb-id 1234 --name "Maggie Cheung"
|
|
python3 scripts/tmdb_identity_integration.py --search "張曼玉"
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import requests
|
|
import psycopg2
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import numpy as np
|
|
|
|
# Put this script's own directory at the front of the import path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# The TMDB API key is mandatory: without it the script prints a hint and
# exits with status 1 before anything else runs.
TMDB_API_KEY = os.environ.get("TMDB_API_KEY")
if not TMDB_API_KEY:
    print(
        "⚠️ TMDB_API_KEY not found.",
        "👉 Please set: export TMDB_API_KEY='your_api_key'",
        sep="\n",
    )
    sys.exit(1)

# Base URLs for the TMDB REST API and for full-size ("original") images.
TMDB_BASE_URL = "https://api.themoviedb.org/3"
TMDB_IMG_BASE_URL = "https://image.tmdb.org/t/p/original"

# PostgreSQL connection string; the environment value wins over the default.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev",
)

# Download directory for TMDB images (relative to the current working
# directory); created eagerly at import time.
TEMP_DIR = Path("data") / "tmdb_images"
TEMP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def search_person(query: str, timeout: float = 30.0) -> dict | None:
    """Search TMDB for a person by name and return the first result.

    Args:
        query: Free-text person name sent to ``/search/person``.
        timeout: Seconds to wait for TMDB before giving up. Without a
            timeout a stalled connection would block the script forever;
            the default matches the 30 s used by ``download_image``.

    Returns:
        The first entry of the response's ``results`` list, or ``None`` when
        the list is empty/missing or the request fails (the error is printed).
    """
    url = f"{TMDB_BASE_URL}/search/person"
    params = {"query": query, "api_key": TMDB_API_KEY, "language": "zh-TW"}

    try:
        resp = requests.get(url, params=params, timeout=timeout)
        resp.raise_for_status()
        results = resp.json().get("results")
        # Only the top hit is used; TMDB's own ordering decides which one.
        return results[0] if results else None
    except Exception as e:
        print(f"❌ Search failed: {e}")
        return None
|
|
|
|
|
|
def get_person_details(tmdb_id: int, timeout: float = 30.0) -> dict:
    """Fetch the TMDB ``/person/{tmdb_id}`` record (language ``zh-TW``).

    Args:
        tmdb_id: TMDB person identifier.
        timeout: Seconds to wait for TMDB before giving up, so a stalled
            connection cannot hang the script indefinitely.

    Returns:
        The decoded JSON response, or an empty dict when the request fails
        (the error is printed).
    """
    url = f"{TMDB_BASE_URL}/person/{tmdb_id}"
    params = {"api_key": TMDB_API_KEY, "language": "zh-TW"}

    try:
        resp = requests.get(url, params=params, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        print(f"❌ Failed to get person details: {e}")
        return {}
|
|
|
|
|
|
def get_person_images(tmdb_id: int, timeout: float = 30.0) -> list[dict]:
    """List the profile images TMDB holds for a person.

    Args:
        tmdb_id: TMDB person identifier.
        timeout: Seconds to wait for TMDB before giving up, so a stalled
            connection cannot hang the script indefinitely.

    Returns:
        The ``profiles`` list from ``/person/{tmdb_id}/images`` (empty when
        the key is absent), or an empty list when the request fails (the
        error is printed).
    """
    url = f"{TMDB_BASE_URL}/person/{tmdb_id}/images"
    params = {"api_key": TMDB_API_KEY}

    try:
        resp = requests.get(url, params=params, timeout=timeout)
        resp.raise_for_status()
        return resp.json().get("profiles", [])
    except Exception as e:
        print(f"❌ Failed to get person images: {e}")
        return []
|
|
|
|
|
|
def download_image(image_url: str, save_path: Path) -> bool:
    """Fetch ``image_url`` and write the response body to ``save_path``.

    The parent directory of ``save_path`` is created if needed. The request
    uses a 30-second timeout.

    Returns:
        ``True`` once the file has been written, ``False`` if any step
        raises (the error is printed).
    """
    try:
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()

        target_dir = save_path.parent
        target_dir.mkdir(parents=True, exist_ok=True)

        with save_path.open("wb") as out_file:
            out_file.write(response.content)
    except Exception as err:
        print(f"❌ Download failed: {err}")
        return False
    return True
|
|
|
|
|
|
def load_insightface():
    """Create and prepare the InsightFace ``buffalo_l`` analysis pipeline.

    The model is requested with the CPU execution provider and a 320x320
    detection size.

    Returns:
        The prepared ``FaceAnalysis`` object, or ``None`` when importing or
        initialising InsightFace raises (the error is printed).
    """
    try:
        # Imported here rather than at module top so that a missing or
        # broken installation is reported through the except branch below.
        import insightface
        from insightface.app import FaceAnalysis

        print("🔧 Loading InsightFace buffalo_l...")
        analyzer = FaceAnalysis(
            name="buffalo_l",
            providers=["CPUExecutionProvider"],
        )
        analyzer.prepare(ctx_id=0, det_size=(320, 320))
        print("✅ InsightFace loaded")
    except Exception as err:
        print(f"❌ Failed to load InsightFace: {err}")
        return None
    return analyzer
|
|
|
|
|
|
def extract_face_embedding(app, image_path: Path) -> dict | None:
|
|
"""Extract ArcFace embedding from image"""
|
|
try:
|
|
import cv2
|
|
|
|
img = cv2.imread(str(image_path))
|
|
if img is None:
|
|
print(f"❌ Cannot read image: {image_path}")
|
|
return None
|
|
|
|
faces = app.get(img)
|
|
|
|
if not faces:
|
|
print(f"⚠️ No face detected in: {image_path.name}")
|
|
return None
|
|
|
|
face = faces[0]
|
|
|
|
embedding = face.embedding.tolist() if hasattr(face, "embedding") else None
|
|
if not embedding:
|
|
print(f"⚠️ No embedding in: {image_path.name}")
|
|
return None
|
|
|
|
bbox = face.bbox.astype(int)
|
|
|
|
det_score = float(face.det_score) if hasattr(face, "det_score") else 0.9
|
|
|
|
angle = detect_face_angle(bbox, img.shape)
|
|
|
|
quality_score = evaluate_face_quality(face, img.shape)
|
|
|
|
return {
|
|
"embedding": embedding,
|
|
"image_path": str(image_path),
|
|
"image_url": f"{TMDB_IMG_BASE_URL}/{image_path.name}",
|
|
"angle": angle,
|
|
"quality_score": quality_score,
|
|
"det_score": det_score,
|
|
}
|
|
except Exception as e:
|
|
print(f"❌ Extraction failed: {e}")
|
|
return None
|
|
|
|
|
|
def detect_face_angle(bbox: np.ndarray, img_shape: tuple) -> str:
    """Label a face by where its bounding box sits horizontally in the image.

    NOTE: this is a position heuristic, not a head-pose estimate. It compares
    the distance from the bbox centre to the left image edge with the
    distance to the right image edge.

    Args:
        bbox: ``[x1, y1, x2, y2]`` box; only ``x1`` and ``x2`` are used.
        img_shape: Image shape whose second entry is the width
            (``(height, width, ...)``).

    Returns:
        ``"profile_right"`` when left/right ratio > 1.5, ``"profile_left"``
        when < 0.67, ``"three_quarter"`` when > 1.2 or < 0.83, otherwise
        ``"frontal"``.
    """
    img_w = float(img_shape[1])
    face_center_x = (float(bbox[0]) + float(bbox[2])) / 2

    # Detector boxes can extend past the image borders. Clamp the centre so
    # the distances below never go negative (a negative ratio used to be
    # misreported as "profile_left" for faces past the right edge).
    face_center_x = min(max(face_center_x, 0.0), img_w)

    left_dist = face_center_x
    right_dist = img_w - face_center_x

    # Centre on the right edge: the ratio would be a division by zero, and
    # the face is as far right as it can be.
    if right_dist <= 0:
        return "profile_right"

    ratio = left_dist / right_dist

    if ratio > 1.5:
        return "profile_right"
    if ratio < 0.67:
        return "profile_left"
    if ratio > 1.2 or ratio < 0.83:
        return "three_quarter"
    return "frontal"
|
|
|
|
|
|
def evaluate_face_quality(face, img_shape: tuple) -> float:
    """Score a detected face between 0.0 and 1.0.

    The score is ``0.7 * det_score + 0.3 * size_score``, where ``size_score``
    is the bbox area as a fraction of the image area, scaled by 20 and capped
    at 1.0 (so a face covering 5% or more of the image gets full size credit).

    Args:
        face: Object with a ``bbox`` array (``[x1, y1, x2, y2]``, supporting
            ``astype``) and optionally a ``det_score``.
        img_shape: Image shape; the first two entries are multiplied to get
            the image area.

    Returns:
        The quality score clamped to ``[0.0, 1.0]``.
    """
    # hasattr() alone is not enough: the attribute may exist but be None, in
    # which case float(None) would raise. Fall back to 0.9 in both cases.
    raw_score = getattr(face, "det_score", None)
    det_score = float(raw_score) if raw_score is not None else 0.9

    bbox = face.bbox.astype(int)
    face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
    img_size = img_shape[0] * img_shape[1]

    size_ratio = face_size / img_size
    size_score = min(1.0, size_ratio * 20)

    quality = det_score * 0.7 + size_score * 0.3

    return float(min(1.0, max(0.0, quality)))
|
|
|
|
|
|
def calculate_centroid(embeddings: list[list[float]]) -> list[float]:
    """Return the element-wise mean of a list of embedding vectors.

    An empty (or otherwise falsy) input yields an empty list.
    """
    if embeddings:
        stacked = np.array(embeddings)
        # Average over the first axis, i.e. across the vectors.
        return stacked.mean(axis=0).tolist()
    return []
|
|
|
|
|
|
def register_identity_to_db(
    name: str,
    tmdb_id: int,
    tmdb_profile: str | None,
    face_embeddings: list[dict],
    centroid: list[float],
    schema: str = "dev",
) -> str | None:
    """Upsert an identity row, keyed on ``name``, into ``<schema>.identities``.

    On a name conflict the embedding, reference data, TMDB id/profile and
    ``updated_at`` of the existing row are overwritten.

    Args:
        name: Identity name (the conflict key of the upsert).
        tmdb_id: TMDB person identifier.
        tmdb_profile: URL of the TMDB profile image, or ``None``.
        face_embeddings: Per-image dicts providing ``embedding``,
            ``image_url``, ``angle`` and ``quality_score``; stored as JSON in
            ``reference_data``.
        centroid: Vector written to ``face_embedding`` as the text
            ``[v1,v2,...]``.
        schema: Database schema holding the ``identities`` table. It is
            quoted as an SQL identifier, so it is matched case-sensitively.

    Returns:
        The ``uuid`` value returned by the statement, or ``None`` on any
        database error, including a failed connection (the error is printed
        and the transaction rolled back).
    """
    # Local import: psycopg2 is already a dependency of this file; the sql
    # helper composes identifiers safely instead of string formatting.
    from psycopg2 import sql

    # One timestamp for the whole batch so every embedding agrees.
    created_at = datetime.now().isoformat()

    reference_data = {
        "face_embeddings": [
            {
                "embedding": emb["embedding"],
                "source": "tmdb_images",
                "image_url": emb["image_url"],
                "angle": emb["angle"],
                "quality_score": emb["quality_score"],
                "created_at": created_at,
            }
            for emb in face_embeddings
        ],
        "image_urls": [emb["image_url"] for emb in face_embeddings],
    }

    # The schema arrives from the command line, so it must never be spliced
    # into the SQL text directly; sql.Identifier quotes it.
    query = sql.SQL(
        """
        INSERT INTO {schema}.identities (
            name, identity_type, source, status,
            face_embedding, reference_data, tmdb_id, tmdb_profile,
            created_at, updated_at
        ) VALUES (
            %s, %s, %s, %s,
            %s, %s, %s, %s,
            NOW(), NOW()
        )
        ON CONFLICT (name) DO UPDATE SET
            face_embedding = EXCLUDED.face_embedding,
            reference_data = EXCLUDED.reference_data,
            tmdb_id = EXCLUDED.tmdb_id,
            tmdb_profile = EXCLUDED.tmdb_profile,
            updated_at = NOW()
        RETURNING uuid;
        """
    ).format(schema=sql.Identifier(schema))

    embedding_str = "[" + ",".join(str(x) for x in centroid) + "]"

    params = (
        name,
        "people",
        "tmdb",
        "confirmed",
        embedding_str,
        json.dumps(reference_data),
        tmdb_id,
        tmdb_profile,
    )

    conn = None
    try:
        # Connecting inside the try keeps the "None on failure" contract for
        # connection errors as well as query errors.
        conn = psycopg2.connect(DATABASE_URL)
        with conn.cursor() as cur:
            cur.execute(query, params)
            identity_uuid = cur.fetchone()[0]
        conn.commit()

        print(f"✅ Identity registered: {name} (UUID: {identity_uuid})")
        return identity_uuid
    except Exception as e:
        print(f"❌ Database error: {e}")
        if conn is not None:
            conn.rollback()
        return None
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def main():
    """CLI entry point: resolve a TMDB person, embed their photos, register them.

    Steps:
        1. Parse arguments; ``--search`` takes precedence over ``--tmdb-id``.
        2. Resolve the person's TMDB id and name.
        3. List the person's images and process at most ``--limit`` of them:
           download into ``TEMP_DIR/<tmdb_id>/`` (files already on disk are
           reused) and extract one face embedding per image.
        4. Average the embeddings into a centroid and upsert the identity.

    Exits with status 1 when the person, images, model or embeddings are
    unavailable, and also when the database registration fails, so callers
    and shell scripts can detect the failure.
    """
    parser = argparse.ArgumentParser(description="TMDB Identity Integration")
    parser.add_argument("--tmdb-id", type=int, help="TMDB Person ID (e.g., 1234)")
    parser.add_argument("--name", help="Person name (for registration)")
    parser.add_argument("--search", help="Search person by name")
    parser.add_argument("--limit", type=int, default=10, help="Max images to process")
    parser.add_argument("--schema", default="dev", help="Database schema (dev/public)")
    args = parser.parse_args()

    if not args.tmdb_id and not args.search:
        print("❌ Please provide --tmdb-id or --search")
        sys.exit(1)

    # A zero or negative limit would slice the image list unexpectedly and
    # print a nonsensical progress total.
    if args.limit < 1:
        parser.error("--limit must be a positive integer")

    # Person details are needed for the profile URL and sometimes for the
    # name; remember them so TMDB is not queried twice.
    details = None

    if args.search:
        print(f"🔍 Searching TMDB for: '{args.search}'")
        person = search_person(args.search)
        if not person:
            print("❌ Person not found")
            sys.exit(1)

        tmdb_id = person["id"]
        name = args.name or person["name"]
        print(f"✅ Found: {name} (TMDB ID: {tmdb_id})")
    else:
        tmdb_id = args.tmdb_id
        name = args.name

    if not name:
        print("🔧 Fetching person details...")
        details = get_person_details(tmdb_id)
        name = details.get("name", f"Person_{tmdb_id}")
        print(f"✅ Name: {name}")

    print(f"\n🔧 Fetching images for: {name} (TMDB ID: {tmdb_id})")
    images = get_person_images(tmdb_id)

    if not images:
        print("❌ No images found")
        sys.exit(1)

    print(f"✅ Found {len(images)} images")

    app = load_insightface()
    if not app:
        sys.exit(1)

    person_dir = TEMP_DIR / str(tmdb_id)
    person_dir.mkdir(parents=True, exist_ok=True)

    face_embeddings = []
    total = min(len(images), args.limit)

    print(f"\n🔧 Processing images (limit: {args.limit})...")
    for i, img_data in enumerate(images[:args.limit]):
        file_path = img_data.get("file_path")
        if not file_path:
            continue

        image_url = f"{TMDB_IMG_BASE_URL}{file_path}"
        local_path = person_dir / Path(file_path).name

        print(f" [{i+1}/{total}] {file_path}")

        # Reuse a previously downloaded file instead of fetching it again.
        if not local_path.exists():
            print(" 🔧 Downloading...")
            if not download_image(image_url, local_path):
                continue

        print(" 🔧 Extracting embedding...")
        result = extract_face_embedding(app, local_path)

        if result:
            face_embeddings.append(result)
            print(f" ✅ Success: angle={result['angle']}, quality={result['quality_score']:.2f}")
        else:
            print(" ⚠️ Failed")

    if not face_embeddings:
        print("❌ No valid face embeddings extracted")
        sys.exit(1)

    print(f"\n✅ Extracted {len(face_embeddings)} embeddings")

    centroid = calculate_centroid([emb["embedding"] for emb in face_embeddings])

    # Fetch details only if they were not already loaded above (an empty
    # dict from a failed earlier attempt is retried).
    if not details:
        details = get_person_details(tmdb_id)
    profile_path = details.get("profile_path")
    tmdb_profile = f"{TMDB_IMG_BASE_URL}{profile_path}" if profile_path else None

    print(f"\n🔧 Registering Identity to database (schema: {args.schema})...")
    identity_uuid = register_identity_to_db(
        name=name,
        tmdb_id=tmdb_id,
        tmdb_profile=tmdb_profile,
        face_embeddings=face_embeddings,
        centroid=centroid,
        schema=args.schema,
    )

    if not identity_uuid:
        print("\n❌ Integration failed")
        # Previously the script still exited with status 0 here.
        sys.exit(1)

    print("\n🎉 Integration completed!")
    print(f" Identity: {name}")
    print(f" UUID: {identity_uuid}")
    print(f" TMDB ID: {tmdb_id}")
    print(f" Embeddings: {len(face_embeddings)}")
    print(f" Centroid dimension: {len(centroid)}")
|
|
|
|
|
|
# Run the CLI only when this file is executed directly, not when imported.
# main() returns None, so SystemExit(None) yields the usual exit status 0.
if __name__ == "__main__":
    raise SystemExit(main())