Files
momentry_core/scripts/update_all_demographics.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

133 lines
3.6 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Comprehensive Age & Gender Updater.
Scans the DB for persons missing age or gender, grabs the video frame at each
person's earliest recorded appearance time, and updates their demographics
using InsightFace.
"""
import os
import cv2
import psycopg2
import insightface
import numpy as np
# Script-wide settings.
# Root directory under which per-video folders are looked up.
BASE_VIDEO_DIR = "output"
# Keyword arguments handed to psycopg2.connect().
DB_CONFIG = dict(host="localhost", user="accusys", dbname="momentry")
def get_face_app():
    """Create and prepare the InsightFace analysis pipeline.

    Returns:
        A ``FaceAnalysis`` instance built for the "buffalo_l" model pack with
        only the CPU execution provider, already prepared with ``ctx_id=0``
        and a 640x640 detection size.
    """
    print("Loading InsightFace model (buffalo_l)...")
    model_options = {
        "name": "buffalo_l",
        "providers": ["CPUExecutionProvider"],
    }
    analyzer = insightface.app.FaceAnalysis(**model_options)
    analyzer.prepare(ctx_id=0, det_size=(640, 640))
    return analyzer
def get_video_path(video_uuid):
    """Return the path of the video file for *video_uuid*, or None.

    The candidate path is ``<BASE_VIDEO_DIR>/<video_uuid>/<video_uuid>.mp4``;
    it is returned only if it exists on disk.
    """
    candidate = f"{BASE_VIDEO_DIR}/{video_uuid}/{video_uuid}.mp4"
    return candidate if os.path.exists(candidate) else None
def update_db(conn, person_id, age, gender):
    """Store age and gender for one person and commit the change.

    Args:
        conn: Open database connection (psycopg2 in this script).
        person_id: Value matched against ``person_identities.person_id``.
        age: Age value to store.
        gender: Gender label to store.

    Raises:
        Exception: Any error raised by the driver is re-raised after the
            transaction has been rolled back, so the connection remains
            usable for subsequent updates.
    """
    try:
        # The cursor context manager closes the cursor even when execute()
        # raises, instead of leaving one open cursor per processed person.
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE person_identities
                SET age = %s, gender = %s
                WHERE person_id = %s
                """,
                (age, gender, person_id),
            )
        conn.commit()
    except Exception:
        # A failed statement leaves the transaction aborted; roll back so the
        # shared connection can still serve later calls.
        conn.rollback()
        raise
def _face_demographics(face):
    """Return ``(age, gender)`` read from a detected face; either may be None."""
    # getattr with explicit None checks rather than hasattr():
    # NOTE(review): insightface's Face object appears to answer None for
    # attributes that were never set, which makes hasattr() always true and
    # int(None) raise TypeError. Confirm against the installed version.
    raw_age = getattr(face, "age", None)
    raw_gender = getattr(face, "gender", None)
    age = int(raw_age) if raw_age is not None else None
    # Numeric gender codes are mapped to labels; anything else is unknown.
    if raw_gender is None:
        gender = None
    elif raw_gender == 0:
        gender = "female"
    elif raw_gender == 1:
        gender = "male"
    else:
        gender = None
    return age, gender


def process_person(app, conn, person_id, video_uuid, timestamp):
    """Grab one frame for a person, estimate age/gender, and store them.

    Args:
        app: Face analysis object whose ``get(frame)`` returns detected faces.
        conn: Open database connection passed through to ``update_db``.
        person_id: Identifier of the person row to update.
        video_uuid: Identifier used to locate the video via ``get_video_path``.
        timestamp: Position in the video, in seconds, at which to grab a frame.

    Returns:
        None. Progress and failures are reported on stdout; the database is
        updated only when both age and gender were obtained.
    """
    video_path = get_video_path(video_uuid)
    if not video_path:
        return
    if timestamp is None:
        # Without a position there is no frame known to show this person.
        print(f" - No timestamp for {person_id}; skipping")
        return

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return
        # Seek by milliseconds (approximate). float() also accepts Decimal
        # values that a DB driver may return for numeric columns.
        cap.set(cv2.CAP_PROP_POS_MSEC, float(timestamp) * 1000)
        ret, frame = cap.read()
        # If the first read after seeking fails, try up to three more reads.
        attempts = 0
        while not ret and attempts < 3:
            ret, frame = cap.read()
            attempts += 1
    finally:
        # Release the capture handle even if seeking or reading raised.
        cap.release()

    if not ret or frame is None:
        print(f" - Failed to get frame for {person_id}")
        return

    faces = app.get(frame)
    if not faces:
        print(" -> No face detected in frame.")
        return

    # Only the first returned face is used.
    age, gender = _face_demographics(faces[0])
    # Compare against None explicitly: an estimated age of 0 is a valid
    # result and must not be treated as missing.
    if age is not None and gender is not None:
        print(f" -> Detected: Age {age}, Gender {gender}")
        update_db(conn, person_id, age, gender)
    else:
        print(" -> Face found but attributes missing.")
def main():
    """Fill in missing demographics for every person lacking age or gender.

    Connects with ``DB_CONFIG``, selects each (person_id, video_uuid) pair
    whose age or gender is NULL together with its earliest
    ``first_appearance_time``, and runs ``process_person`` on each. The face
    model is loaded only when there is work to do. The connection is closed
    on every exit path.
    """
    print("=== Starting Full Demographics Scan ===")
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # Get all persons missing age/gender, grouped by person and video so
        # a person appearing in several videos yields one row per video.
        with conn.cursor() as cur:
            cur.execute("""
                SELECT person_id, video_uuid, MIN(first_appearance_time) as min_time
                FROM person_identities
                WHERE age IS NULL OR gender IS NULL
                GROUP BY person_id, video_uuid
            """)
            rows = cur.fetchall()

        if not rows:
            print("All persons already have demographics data!")
            return

        total = len(rows)
        print(f"Found {total} persons to process.")
        app = get_face_app()
        for index, (person_id, video_uuid, min_time) in enumerate(rows, start=1):
            if min_time is None:
                # MIN() yields NULL when no appearance time is recorded;
                # formatting or seeking with None would raise.
                print(
                    f"[{index}/{total}] Skipping: {person_id} (Video: {video_uuid}, no appearance time)"
                )
                continue
            print(
                f"[{index}/{total}] Processing: {person_id} (Video: {video_uuid}, Time: {min_time:.1f}s)"
            )
            process_person(app, conn, person_id, video_uuid, min_time)
        print("=== Done ===")
    finally:
        # Runs on the early return above and on errors, not just on success.
        conn.close()


if __name__ == "__main__":
    main()