# v0.9.20260325_144654
#
# Features:
#   - API Key Authentication System
#   - Job Worker System
#   - V2 Backup Versioning
#
# Bug Fixes:
#   - get_processor_results_by_job column mapping
#
# Co-authored-by: OpenCode
#
# File info: 166 lines, 4.6 KiB, Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Object search using YOLO metadata
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import psycopg2
|
|
|
|
|
|
# Path of the YOLO detection dump that load_yolo_data() reads.
YOLO_FILE = "/Users/accusys/test_video/Old_Time_Movie_Show_-_Charade_1963.HD.yolo.json"

# Value matched against the ``uuid`` column of the ``chunks`` table.
VIDEO_UUID = "39567a0eb16f39fd"

# Keyword arguments handed verbatim to psycopg2.connect().
# NOTE(review): credentials are hard-coded here; consider sourcing them
# from the environment before this script leaves a local test setup.
POSTGRES_CONFIG = dict(
    host="localhost",
    port=5432,
    user="accusys",
    password="Test3200",
    database="momentry",
)
|
|
|
|
|
|
def load_yolo_data(path=None):
    """Load the YOLO detection JSON and return the decoded document.

    Args:
        path: Optional path of the JSON file to read. When omitted, the
            module-level ``YOLO_FILE`` is used, so existing zero-argument
            callers behave exactly as before.

    Returns:
        The decoded JSON document. It must expose a ``"frames"`` entry,
        because the number of frames is reported on stdout.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If the decoded mapping has no ``"frames"`` key.
    """
    source = YOLO_FILE if path is None else path
    print(f"Loading YOLO data from {source}...")
    # Decode explicitly as UTF-8 instead of relying on the locale default,
    # which differs between platforms.
    with open(source, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Loaded {len(data['frames'])} frames")
    return data
|
|
|
|
|
|
def get_object_time_ranges(
    yolo_data, object_name, min_confidence=0.3, *, frame_duration=0.5
):
    """Return one time range per frame in which ``object_name`` is detected.

    Args:
        yolo_data: Decoded YOLO document. ``yolo_data["frames"]`` must be a
            mapping whose values carry ``"time_seconds"`` and, optionally,
            a ``"detections"`` list of dicts with ``"class_name"`` and
            ``"confidence"`` entries.
        object_name: Class name to look for; compared case-insensitively.
        min_confidence: Detections below this confidence are ignored.
        frame_duration: Seconds added to a frame's timestamp to form the
            end of its range. Defaults to 0.5, the value that was
            previously hard-coded.

    Returns:
        A list of ``{"start", "end", "confidence"}`` dicts in frame
        iteration order. Each frame contributes at most one entry: the
        first detection that matches the name and the confidence floor.
    """
    # Lower-case the target once instead of once per detection.
    wanted = object_name.lower()
    time_ranges = []

    for frame_data in yolo_data["frames"].values():
        for det in frame_data.get("detections", []):
            if det["class_name"].lower() == wanted and det["confidence"] >= min_confidence:
                start = frame_data["time_seconds"]
                time_ranges.append(
                    {
                        "start": start,
                        "end": start + frame_duration,
                        "confidence": det["confidence"],
                    }
                )
                break  # One detection per frame is enough

    return time_ranges
|
|
|
|
|
|
def search_chunks_by_object_postgres(object_name):
    """Return the sentence chunks whose YOLO metadata lists ``object_name``.

    Rows for ``VIDEO_UUID`` that carry a ``yolo`` entry in ``metadata`` are
    fetched from the ``chunks`` table, then filtered in Python with a
    case-insensitive comparison against ``metadata["yolo"]["objects"]``.

    Args:
        object_name: Object class to look for; compared case-insensitively.

    Returns:
        A list of ``{"chunk_id", "start_time", "end_time"}`` dicts, one per
        matching row, in the order the database returned them.

    Raises:
        psycopg2.Error: If connecting or querying fails. The connection is
            closed before the error propagates.
    """
    # Query chunks that have YOLO metadata containing the object
    query = """
        SELECT chunk_id, start_time, end_time, metadata
        FROM chunks
        WHERE uuid = %s
          AND chunk_type = 'sentence'
          AND metadata IS NOT NULL
          AND metadata->'yolo' IS NOT NULL
    """

    conn = psycopg2.connect(**POSTGRES_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute(query, (VIDEO_UUID,))
            rows = cur.fetchall()
    finally:
        # Close explicitly: psycopg2's ``with conn`` only ends the
        # transaction, it does not release the connection.
        conn.close()

    wanted = object_name.lower()
    matching_chunks = []
    for chunk_id, start_time, end_time, metadata in rows:
        # Assumes the driver hands ``metadata`` back as a dict (json/jsonb
        # column) -- TODO confirm. ``or`` guards against explicit nulls
        # stored under "yolo" / "objects".
        objects = (metadata.get("yolo") or {}).get("objects") or []
        if any(obj.lower() == wanted for obj in objects):
            matching_chunks.append(
                {
                    "chunk_id": chunk_id,
                    "start_time": start_time,
                    "end_time": end_time,
                }
            )

    return matching_chunks
|
|
|
|
|
|
def _count_overlapping_chunks(time_ranges, max_samples=100):
    """Count distinct sentence chunks overlapping a random sample of ranges."""
    import random

    # Find chunks that have any overlap with a YOLO detection range:
    # chunk.start <= range.end AND chunk.end >= range.start.
    query = """
        SELECT DISTINCT c.chunk_id
        FROM chunks c
        WHERE c.uuid = %s
          AND c.chunk_type = 'sentence'
          AND c.start_time <= %s
          AND c.end_time >= %s
    """

    # Sample time ranges to avoid too many queries
    sample_ranges = random.sample(time_ranges, min(max_samples, len(time_ranges)))

    # Collect ids in a set so a chunk that overlaps several sampled frames
    # is counted once rather than once per frame.
    chunk_ids = set()
    conn = psycopg2.connect(**POSTGRES_CONFIG)
    try:
        with conn.cursor() as cur:
            for tr in sample_ranges:
                cur.execute(query, (VIDEO_UUID, tr["end"], tr["start"]))
                chunk_ids.update(row[0] for row in cur.fetchall())
    finally:
        # Release the connection even when a query fails.
        conn.close()

    return len(chunk_ids)


def test_object_search_by_time():
    """Time an object search that matches YOLO detections to chunk timestamps.

    For each object in a fixed list, the frames in which it is detected are
    collected from the YOLO data, and the sentence chunks overlapping a
    random sample of up to 100 of those frames are counted in PostgreSQL.
    A line per object with detections is printed to stdout.

    Returns:
        A dict mapping each object name to
        ``{"ms": elapsed milliseconds, "chunks": distinct overlapping
        chunks, "frames": number of frames with a detection}``. Objects
        with no detections get all-zero entries and skip the database.
    """
    yolo_data = load_yolo_data()

    test_objects = ["person", "car", "clock", "tie", "chair"]
    results = {}

    for obj in test_objects:
        # perf_counter is monotonic, so the elapsed time cannot be skewed
        # by wall-clock adjustments.
        start = time.perf_counter()

        # Get time ranges from YOLO
        time_ranges = get_object_time_ranges(yolo_data, obj)

        if not time_ranges:
            results[obj] = {"ms": 0, "chunks": 0, "frames": 0}
            continue

        # Get chunks from PostgreSQL that overlap with these time ranges.
        # The connection is opened per object, so connect time stays part
        # of the measured interval as before.
        total_matches = _count_overlapping_chunks(time_ranges)

        elapsed = (time.perf_counter() - start) * 1000
        results[obj] = {
            "ms": round(elapsed, 2),
            "chunks": total_matches,
            "frames": len(time_ranges),
        }
        print(
            f"Object '{obj}': {elapsed:.2f}ms, {len(time_ranges)} frames, {total_matches} chunks"
        )

    return results
|
|
|
|
|
|
def main():
    """Run the object search test and print a per-object summary table."""
    rule = "=" * 60

    # Banner
    print(rule)
    print("Object Search Test (Priority c)")
    print(rule)

    results = test_object_search_by_time()

    # Summary table: one row per object, in the order results were recorded.
    print("\n" + rule)
    print("Summary")
    print(rule)
    print(f"\n{'Object':<20} | {'Time (ms)':<12} | {'Frames':<10} | {'Chunks'}")
    print("-" * 60)
    for name, stats in results.items():
        row = f"{name:<20} | {stats['ms']:<12.1f} | {stats['frames']:<10} | {stats['chunks']}"
        print(row)


if __name__ == "__main__":
    main()
|