## v0.9.20260325_144654 ### Features - API Key Authentication System - Job Worker System - V2 Backup Versioning ### Bug Fixes - get_processor_results_by_job column mapping Co-authored-by: OpenCode
123 lines
3.1 KiB
Python
123 lines
3.1 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Sync chunks from PostgreSQL to MongoDB
|
|
"""
|
|
|
|
import os

import psycopg2
from pymongo import MongoClient
|
|
|
|
|
|
# UUID of the video whose sentence chunks this script copies and searches.
VIDEO_UUID = "39567a0eb16f39fd"

# PostgreSQL connection settings, passed as keyword arguments to
# psycopg2.connect(). Every value can be overridden with a MOMENTRY_PG_*
# environment variable so real credentials do not have to live in source
# control. The fallbacks are the previously hard-coded values, so behavior is
# unchanged when no variable is set.
POSTGRES_CONFIG = {
    "host": os.environ.get("MOMENTRY_PG_HOST", "localhost"),
    # The environment only carries strings; keep the port an int as before.
    "port": int(os.environ.get("MOMENTRY_PG_PORT", "5432")),
    "user": os.environ.get("MOMENTRY_PG_USER", "accusys"),
    "password": os.environ.get("MOMENTRY_PG_PASSWORD", "Test3200"),
    "database": os.environ.get("MOMENTRY_PG_DATABASE", "momentry"),
}

# MongoDB target: server URI (overridable via MOMENTRY_MONGO_URI), database
# name and collection name.
MONGO_URI = os.environ.get("MOMENTRY_MONGO_URI", "mongodb://localhost:27017")
MONGO_DB = "momentry"
MONGO_COLLECTION = "chunks"
|
|
|
|
|
|
def sync_to_mongodb(video_uuid: str | None = None) -> None:
    """Copy one video's sentence chunks from PostgreSQL into MongoDB.

    Reads every ``chunk_type = 'sentence'`` row for the video from the
    PostgreSQL ``chunks`` table, replaces the matching documents in the
    MongoDB collection (delete, then bulk insert), and ensures the text
    index used by ``$text`` searches exists. Progress is printed to stdout.

    Args:
        video_uuid: Value matched against the ``uuid`` column / field.
            ``None`` (the default) uses the module-level ``VIDEO_UUID``.

    Raises:
        psycopg2.Error: If connecting to or querying PostgreSQL fails.
        pymongo.errors.PyMongoError: If a MongoDB operation fails.
    """
    if video_uuid is None:
        video_uuid = VIDEO_UUID

    # Must stay in the same order as the SELECT list below; the names double
    # as the keys of the MongoDB documents.
    columns = (
        "uuid",
        "chunk_id",
        "chunk_index",
        "chunk_type",
        "start_time",
        "end_time",
        "fps",
        "start_frame",
        "end_frame",
        "content",
        "metadata",
        "vector_id",
    )

    # Read phase. try/finally guarantees the connection is released even when
    # the query raises (psycopg2's connection context manager only ends the
    # transaction, it does not close the connection).
    pg_conn = psycopg2.connect(**POSTGRES_CONFIG)
    try:
        with pg_conn.cursor() as pg_cur:
            pg_cur.execute(
                """
                SELECT uuid, chunk_id, chunk_index, chunk_type,
                       start_time, end_time, fps, start_frame, end_frame,
                       content, metadata, vector_id
                FROM chunks
                WHERE uuid = %s AND chunk_type = 'sentence'
                ORDER BY chunk_index
                """,
                (video_uuid,),
            )
            rows = pg_cur.fetchall()
    finally:
        pg_conn.close()
    print(f"Found {len(rows)} chunks in PostgreSQL")

    # NOTE(review): if any selected column is NUMERIC, psycopg2 yields
    # decimal.Decimal, which pymongo cannot encode -- confirm column types.
    documents = [dict(zip(columns, row)) for row in rows]

    # Write phase.
    mongo_client = MongoClient(MONGO_URI)
    try:
        mongo_collection = mongo_client[MONGO_DB][MONGO_COLLECTION]

        if documents:
            # Replace rather than upsert: drop this video's existing sentence
            # chunks, then bulk-insert the fresh copies. When PostgreSQL
            # returned nothing, the existing documents are left untouched.
            mongo_collection.delete_many(
                {"uuid": video_uuid, "chunk_type": "sentence"}
            )
            result = mongo_collection.insert_many(documents)
            print(f"Inserted {len(result.inserted_ids)} chunks into MongoDB")

        # create_index is a no-op when an identical index already exists.
        # It is ensured even if nothing was inserted, because a $text query
        # fails on a collection that has no text index.
        mongo_collection.create_index([("content", "text"), ("chunk_type", 1)])
        print("Created text index")
    finally:
        mongo_client.close()

    print("Done!")
|
|
|
|
|
|
def test_mongodb_text_search(
    video_uuid: str | None = None,
) -> dict[str, dict[str, float]]:
    """Time a few MongoDB ``$text`` queries against one video's sentence chunks.

    Runs each fixed sample query with a limit of 10 documents, prints the
    elapsed time and hit count, and collects the same figures.

    Args:
        video_uuid: Value matched against the ``uuid`` field. ``None`` (the
            default) uses the module-level ``VIDEO_UUID``.

    Returns:
        Mapping of query string to ``{"ms": ..., "rows": ...}`` where ``ms``
        is the elapsed time in milliseconds rounded to two decimals and
        ``rows`` is the number of documents returned (at most 10).

    Raises:
        pymongo.errors.PyMongoError: If a MongoDB operation fails, e.g. when
            the collection has no text index.
    """
    # Kept function-local as in the original; MongoClient comes from the
    # file-level import, the same one sync_to_mongodb relies on.
    import time

    if video_uuid is None:
        video_uuid = VIDEO_UUID

    test_queries = ["Paris", "Audrey Hepburn", "Cary Grant"]
    results = {}

    mongo_client = MongoClient(MONGO_URI)
    try:
        mongo_collection = mongo_client[MONGO_DB][MONGO_COLLECTION]

        for query in test_queries:
            search_filter = {
                "uuid": video_uuid,
                "chunk_type": "sentence",
                "$text": {"$search": query},
            }

            # perf_counter is monotonic and high-resolution, unlike the
            # wall clock, so short intervals are measured reliably.
            start = time.perf_counter()
            # list() drains the cursor so the measurement includes fetching
            # the documents, not just building the lazy cursor.
            rows = list(mongo_collection.find(search_filter).limit(10))
            elapsed = (time.perf_counter() - start) * 1000

            results[query] = {"ms": round(elapsed, 2), "rows": len(rows)}
            print(f"MongoDB text '{query}': {elapsed:.2f}ms, {len(rows)} rows")
    finally:
        # Release the client even if a query raises.
        mongo_client.close()

    return results
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the PostgreSQL -> MongoDB copy first, then the
    # text-search timing, so the queries run after the sync has finished.
    sync_to_mongodb()
    print("\nTesting MongoDB text search:")
    test_mongodb_text_search()
|