Files
momentry_core/scripts/object_search.py
accusys 383201cacd feat: Initial v0.9 release with API Key authentication
## v0.9.20260325_144654

### Features
- API Key Authentication System
- Job Worker System
- V2 Backup Versioning

### Bug Fixes
- get_processor_results_by_job column mapping

Co-authored-by: OpenCode
2026-03-25 14:53:41 +08:00

166 lines
4.6 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Object search using YOLO metadata
"""
import json
import time
import psycopg2
# YOLO detection JSON that load_yolo_data() reads.
YOLO_FILE = "/Users/accusys/test_video/Old_Time_Movie_Show_-_Charade_1963.HD.yolo.json"

# Value bound to the `uuid` column in every chunk query in this script.
VIDEO_UUID = "39567a0eb16f39fd"

# Keyword arguments handed straight to psycopg2.connect().
# NOTE(review): credentials are hard-coded in source control; consider
# loading them from the environment or a secrets store instead.
POSTGRES_CONFIG = dict(
    host="localhost",
    port=5432,
    user="accusys",
    password="Test3200",
    database="momentry",
)
def load_yolo_data(path=None):
    """Load YOLO detection results from a JSON file.

    Args:
        path: Location of the YOLO JSON file. When omitted, the module-level
            ``YOLO_FILE`` is used, which keeps existing zero-argument calls
            working unchanged.

    Returns:
        The parsed JSON document. It must contain a ``"frames"`` entry,
        whose length is reported on stdout.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the document has no ``"frames"`` entry.
    """
    if path is None:
        path = YOLO_FILE
    print(f"Loading YOLO data from {path}...")
    # Be explicit about the encoding so the result does not depend on the
    # platform's locale default.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Loaded {len(data['frames'])} frames")
    return data
def get_object_time_ranges(
    yolo_data, object_name, min_confidence=0.3, frame_duration=0.5
):
    """Collect the time ranges of frames in which an object is detected.

    Args:
        yolo_data: Parsed YOLO document. ``yolo_data["frames"]`` maps a frame
            key to a dict with ``"time_seconds"`` and an optional
            ``"detections"`` list; each detection carries ``"class_name"``
            and ``"confidence"``.
        object_name: Class name to look for, compared case-insensitively.
        min_confidence: Detections below this confidence are ignored.
        frame_duration: Seconds each frame is assumed to cover; added to the
            frame's ``"time_seconds"`` to form the range end. Defaults to the
            previously hard-coded 0.5.

    Returns:
        A list of ``{"start", "end", "confidence"}`` dicts, at most one per
        frame, in the iteration order of ``yolo_data["frames"]``.
    """
    # Normalise the query once instead of once per detection.
    wanted = object_name.lower()
    time_ranges = []
    for frame_data in yolo_data["frames"].values():
        for det in frame_data.get("detections", []):
            if (
                det["class_name"].lower() == wanted
                and det["confidence"] >= min_confidence
            ):
                frame_start = frame_data["time_seconds"]
                time_ranges.append(
                    {
                        "start": frame_start,
                        "end": frame_start + frame_duration,
                        "confidence": det["confidence"],
                    }
                )
                # One qualifying detection per frame is enough.
                break
    return time_ranges
def search_chunks_by_object_postgres(object_name):
    """Find sentence chunks whose stored YOLO metadata lists an object.

    Fetches every ``sentence`` chunk of ``VIDEO_UUID`` that has a ``yolo``
    entry in its ``metadata`` column, then filters in Python on the
    ``metadata["yolo"]["objects"]`` list.

    Args:
        object_name: Object class to look for, compared case-insensitively.

    Returns:
        A list of ``{"chunk_id", "start_time", "end_time"}`` dicts, one per
        matching chunk, in the order the rows were returned.

    Raises:
        psycopg2.Error: If connecting or querying fails. The connection is
            closed before the error propagates.
    """
    wanted = object_name.lower()
    # Query chunks that have YOLO metadata attached.
    query = """
        SELECT chunk_id, start_time, end_time, metadata
        FROM chunks
        WHERE uuid = %s
        AND chunk_type = 'sentence'
        AND metadata IS NOT NULL
        AND metadata->'yolo' IS NOT NULL
    """
    conn = psycopg2.connect(**POSTGRES_CONFIG)
    try:
        # The cursor context manager closes the cursor even if the query
        # raises; the connection itself is closed in the finally clause.
        with conn.cursor() as cur:
            cur.execute(query, (VIDEO_UUID,))
            rows = cur.fetchall()
    finally:
        conn.close()

    matching_chunks = []
    for chunk_id, start_time, end_time, metadata in rows:
        # A JSON null under "yolo" or "objects" is not SQL NULL, so it passes
        # the IS NOT NULL filter above; treat it as "no objects" here.
        # Assumes the driver decodes the metadata column into a dict.
        yolo_data = (metadata or {}).get("yolo") or {}
        objects = yolo_data.get("objects") or []
        if any(obj.lower() == wanted for obj in objects):
            matching_chunks.append(
                {
                    "chunk_id": chunk_id,
                    "start_time": start_time,
                    "end_time": end_time,
                }
            )
    return matching_chunks
def test_object_search_by_time():
    """Time object search by matching YOLO detections to chunk timestamps.

    For each object in a fixed test list, the frames where YOLO detected it
    are turned into time ranges. Up to 100 randomly sampled ranges are then
    checked against the ``chunks`` table for overlapping ``sentence`` chunks
    of ``VIDEO_UUID``.

    Returns:
        A dict mapping each object name to ``{"ms", "chunks", "frames"}``:
        elapsed milliseconds (including connection setup), the number of
        distinct chunks overlapping the sampled ranges, and the number of
        frames with a detection. Objects with no detections get zeros.

    Raises:
        psycopg2.Error: If connecting or querying fails. The connection is
            closed before the error propagates.
    """
    import random  # imported once here rather than on every loop iteration

    yolo_data = load_yolo_data()
    test_objects = ["person", "car", "clock", "tie", "chair"]

    # Chunks overlapping a [start, end] detection range. Ids are returned
    # rather than a per-range COUNT, so a chunk overlapping several sampled
    # ranges is counted only once.
    query = """
        SELECT DISTINCT c.chunk_id
        FROM chunks c
        WHERE c.uuid = %s
        AND c.chunk_type = 'sentence'
        AND c.start_time <= %s
        AND c.end_time >= %s
    """

    results = {}
    for obj in test_objects:
        # perf_counter is monotonic, so the interval cannot be skewed by
        # system clock adjustments.
        start = time.perf_counter()

        # Get time ranges from YOLO.
        time_ranges = get_object_time_ranges(yolo_data, obj)
        if not time_ranges:
            results[obj] = {"ms": 0, "chunks": 0, "frames": 0}
            continue

        # Sample time ranges to avoid too many queries.
        sample_ranges = random.sample(time_ranges, min(100, len(time_ranges)))

        matched_chunk_ids = set()
        conn = psycopg2.connect(**POSTGRES_CONFIG)
        try:
            with conn.cursor() as cur:
                for tr in sample_ranges:
                    cur.execute(query, (VIDEO_UUID, tr["end"], tr["start"]))
                    matched_chunk_ids.update(row[0] for row in cur.fetchall())
        finally:
            conn.close()
        total_matches = len(matched_chunk_ids)

        elapsed = (time.perf_counter() - start) * 1000
        results[obj] = {
            "ms": round(elapsed, 2),
            "chunks": total_matches,
            "frames": len(time_ranges),
        }
        print(
            f"Object '{obj}': {elapsed:.2f}ms, {len(time_ranges)} frames, {total_matches} chunks"
        )
    return results
def main():
    """Run the object search timing test and print a summary table."""
    banner = "=" * 60

    print(banner)
    print("Object Search Test (Priority c)")
    print(banner)

    results = test_object_search_by_time()

    print("\n" + banner)
    print("Summary")
    print(banner)

    # Column header, then one row per object in result order.
    print(f"\n{'Object':<20} | {'Time (ms)':<12} | {'Frames':<10} | {'Chunks'}")
    print("-" * 60)
    for obj in results:
        data = results[obj]
        print(
            f"{obj:<20} | {data['ms']:<12.1f} | {data['frames']:<10} | {data['chunks']}"
        )


if __name__ == "__main__":
    main()