Files
momentry_core/scripts/generate_chunk_visual_stats.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

116 lines
3.5 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Generate pre-computed visual statistics for chunks.
Reads frame yolo_objects, counts them per chunk, and updates chunks.visual_stats.
"""
import json
import psycopg2
import psycopg2.extras
from collections import Counter
# Keyword arguments handed to psycopg2.connect() by main().
# No password is configured here.
DB_CONFIG = dict(
    host="localhost",
    user="accusys",
    dbname="momentry",
)
def get_chunks_to_process(conn, schema="public"):
    """Return the chunks of one schema that still lack visual statistics.

    A chunk is selected when its ``visual_stats`` column is NULL or equal to
    an empty JSON object. Chunks whose statistics were previously stored as
    an empty object therefore match this filter as well.

    Args:
        conn: Open database connection.
        schema: Schema whose ``chunks`` table is queried. Only this one
            schema is read per call. The name is interpolated directly
            into the SQL text, so it must be a trusted identifier.

    Returns:
        List of RealDictCursor rows with the columns ``id``, ``uuid``,
        ``start_time`` and ``end_time``.
    """
    pending_query = (
        "\n"
        "SELECT id, uuid, start_time, end_time\n"
        f"FROM {schema}.chunks\n"
        "WHERE (visual_stats IS NULL OR visual_stats = '{}'::jsonb)\n"
    )
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as dict_cursor:
        dict_cursor.execute(pending_query)
        pending_rows = dict_cursor.fetchall()
        return pending_rows
def _tally_yolo_classes(payloads):
    """Count detections per class name across ``yolo_objects`` payloads."""
    tally = Counter()
    for payload in payloads:
        # A payload is either an already-decoded value or raw JSON text.
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                # Best effort: one unparsable frame must not abort the chunk.
                continue
        if not isinstance(payload, list):
            continue
        for detection in payload:
            # Skip malformed entries instead of failing on ``.get``.
            if not isinstance(detection, dict):
                continue
            class_name = detection.get("class_name")
            if class_name:
                tally[class_name] += 1
    return dict(tally)


def get_yolo_stats_for_range(conn, uuid, start_time, end_time, schema="public"):
    """Aggregate YOLO object counts for the frames of one video time range.

    The video is looked up by ``uuid`` in ``{schema}.videos``; its ``id`` is
    then used as ``file_id`` to select frames whose ``timestamp`` lies in
    ``[start_time, end_time]`` (both bounds inclusive) and whose
    ``yolo_objects`` is not NULL.

    Args:
        conn: Open database connection.
        uuid: UUID of the video the chunk belongs to.
        start_time: Lower timestamp bound (inclusive).
        end_time: Upper timestamp bound (inclusive).
        schema: Schema holding the ``videos`` and ``frames`` tables. The name
            is interpolated directly into the SQL text, so it must be a
            trusted identifier.

    Returns:
        Dict mapping each ``class_name`` to the number of detections found,
        or an empty dict when the video is unknown or nothing was detected.
        Payloads that are not valid JSON, are not lists, or contain non-dict
        entries are ignored.
    """
    with conn.cursor() as cur:
        cur.execute(f"SELECT id FROM {schema}.videos WHERE uuid = %s", (uuid,))
        video_row = cur.fetchone()
        if not video_row:
            return {}
        file_id = video_row[0]

        frames_query = (
            "\n"
            "SELECT yolo_objects\n"
            f"FROM {schema}.frames\n"
            "WHERE file_id = %s\n"
            "AND timestamp >= %s\n"
            "AND timestamp <= %s\n"
            "AND yolo_objects IS NOT NULL\n"
        )
        cur.execute(frames_query, (file_id, start_time, end_time))
        payloads = [payload for (payload,) in cur.fetchall()]

    return _tally_yolo_classes(payloads)
def update_chunk_visual_stats(conn, chunk_id, stats, schema="public"):
    """Store ``stats`` in the ``visual_stats`` column of a single chunk.

    The statistics are serialized with ``json.dumps`` and cast to JSONB by
    the statement. The function does not commit; that is left to the caller.

    Args:
        conn: Open database connection.
        chunk_id: Value of the ``id`` column identifying the chunk.
        stats: JSON-serializable statistics to store.
        schema: Schema holding the ``chunks`` table. The name is interpolated
            directly into the SQL text, so it must be a trusted identifier.
    """
    statement = f"UPDATE {schema}.chunks SET visual_stats = %s::jsonb WHERE id = %s"
    with conn.cursor() as writer:
        bound_values = (json.dumps(stats), chunk_id)
        writer.execute(statement, bound_values)
def main():
    """Compute and store visual statistics for all pending chunks.

    Processes the ``public`` and ``dev`` schemas in turn: fetches the pending
    chunks, aggregates the YOLO object counts of each chunk's time range and
    writes the result back. Work is committed every 100 chunks and once more
    at the end of each schema. The connection is closed even when a query
    fails, so an error cannot leak it.
    """
    print("🚀 Starting visual stats generation...")
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        for schema in ["public", "dev"]:
            print(f"📊 Processing schema: {schema}")
            chunks = get_chunks_to_process(conn, schema)
            total = len(chunks)
            print(f" Found {total} chunks to process.")

            processed_count = 0
            for processed_count, chunk in enumerate(chunks, start=1):
                stats = get_yolo_stats_for_range(
                    conn,
                    chunk["uuid"],
                    chunk["start_time"],
                    chunk["end_time"],
                    schema,
                )
                # The result is written even when it is empty.
                # NOTE(review): get_chunks_to_process also selects rows whose
                # visual_stats equals '{}', so an empty result does not mark
                # the chunk as processed; it is picked up again on the next
                # run. Confirm whether that is intended.
                update_chunk_visual_stats(conn, chunk["id"], stats, schema)

                # Commit in batches to keep transactions small.
                if processed_count % 100 == 0:
                    conn.commit()
                    print(f" ✅ Processed {processed_count}/{total} chunks...")

            conn.commit()
            print(f"🎉 Done with {schema}! Processed {processed_count} chunks.")
    finally:
        # Closing discards any uncommitted work of a failed batch.
        conn.close()
# Run the generation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()