Files
momentry_core/scripts/vectorize_chunk_summaries.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

202 lines
5.6 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
Generate vectors for chunk summaries and store in Qdrant.
Process:
1. Fetch chunks with summary_text from PostgreSQL
2. Generate embeddings using nomic-embed-text
3. Store vectors in Qdrant collection: momentry_dev_chunk_summaries
"""
import json
import time
import psycopg2
import psycopg2.extras
import ollama
import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# Connection settings passed straight to psycopg2.connect().
DB_CONFIG = {
    "host": "localhost",
    "user": "accusys",
    "dbname": "momentry",
}
# Postgres schema to read chunks from; also namespaces the Qdrant collection.
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")
# Qdrant endpoint and API key. Both may be overridden from the environment so
# the credential need not live in source control; the defaults preserve the
# previous local-development behaviour.
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "Test3200Test3200Test3200")
QDRANT_COLLECTION = f"momentry_{SCHEMA}_chunk_summaries"
# Ollama model used to embed every summary.
EMBED_MODEL = "nomic-embed-text"
# Default for the --batch-size CLI option (parsed by main() but not used).
BATCH_SIZE = 100
# Number of threads embedding and upserting chunks concurrently.
MAX_WORKERS = 4
def get_chunks_with_summaries(uuid=None, limit=None):
    """Fetch chunks whose summary_text is populated.

    Args:
        uuid: If truthy, only return chunks with this ``uuid`` value.
        limit: If truthy, cap the number of rows returned.

    Returns:
        A list of dict-like rows (RealDictCursor) with the columns chunk_id,
        uuid, summary_text, chunk_type, start_frame, end_frame, fps and
        parent_chunk_id, ordered by chunk_id.
    """
    # SCHEMA is interpolated because identifiers cannot be bound as query
    # parameters. It comes from the DATABASE_SCHEMA env var and must be
    # trusted configuration. The caller-supplied values (uuid, limit) are
    # always bound as parameters.
    query = f"""
        SELECT chunk_id, uuid, summary_text, chunk_type,
               start_frame, end_frame, fps, parent_chunk_id
        FROM {SCHEMA}.chunks
        WHERE summary_text IS NOT NULL
    """
    params = []
    if uuid:
        query += " AND uuid = %s"
        params.append(uuid)
    query += " ORDER BY chunk_id"
    if limit:
        query += " LIMIT %s"
        params.append(limit)

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # The cursor context manager closes the cursor even if execute() raises.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
    finally:
        # Close the connection explicitly. psycopg2's `with conn:` only ends
        # the transaction; it does not close the connection.
        conn.close()
def get_embedding(text):
    """Embed *text* with the Ollama model named by EMBED_MODEL.

    Returns the first vector from the response's "embeddings" list. On any
    failure the error is printed and None is returned instead of raising.
    """
    try:
        response = ollama.embed(model=EMBED_MODEL, input=text)
        embedding = response["embeddings"][0]
    except Exception as e:
        print(f" ⚠️ Embedding error: {e}")
        embedding = None
    return embedding
def upsert_to_qdrant(chunk_id, vector, payload):
    """Upsert one point into the Qdrant collection via its REST API.

    The point ID is derived deterministically from chunk_id, so re-running
    the script overwrites earlier points for the same chunk instead of
    adding duplicates.

    Args:
        chunk_id: Chunk identifier; hashed into the Qdrant point ID.
        vector: Embedding vector for the chunk.
        payload: JSON-serializable dict stored alongside the vector.

    Returns:
        True if Qdrant answered HTTP 200. Otherwise False; errors are printed
        rather than raised, so one bad chunk does not abort the whole run.
    """
    import hashlib

    try:
        url = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points"
        headers = {
            "api-key": QDRANT_API_KEY,
            "Content-Type": "application/json",
        }
        # Qdrant point IDs must be an unsigned integer or a UUID, so use the
        # first 64 bits of the MD5 digest. str() lets non-string IDs (e.g. an
        # integer column) hash too; for str IDs the result is unchanged.
        # This hash is not for security, so flag it as such so that
        # FIPS-restricted builds still allow MD5.
        digest = hashlib.md5(str(chunk_id).encode(), usedforsecurity=False)
        numeric_id = int(digest.hexdigest()[:16], 16)
        data = {
            "points": [
                {
                    "id": numeric_id,
                    "vector": vector,
                    "payload": payload,
                }
            ]
        }
        resp = requests.put(url, headers=headers, json=data, timeout=30)
        if resp.status_code == 200:
            return True
        print(f" ⚠️ Qdrant error: {resp.status_code} - {resp.text[:100]}")
        return False
    except Exception as e:
        print(f" ⚠️ Qdrant upsert error: {e}")
        return False
def process_chunk(chunk):
    """Embed a single chunk's summary and store it in Qdrant.

    Returns {"chunk_id": ..., "success": True} when the chunk was upserted.
    Returns None when the summary is empty, the embedding failed, or the
    upsert failed.
    """
    cid = chunk["chunk_id"]
    text = chunk["summary_text"]
    if not text:
        return None

    embedding = get_embedding(text)
    if not embedding:
        return None

    # Only the first 500 characters of the summary go into the payload.
    point_payload = dict(
        chunk_id=cid,
        uuid=chunk["uuid"],
        chunk_type=chunk["chunk_type"],
        parent_chunk_id=chunk.get("parent_chunk_id"),
        summary_text=text[:500],
    )
    if not upsert_to_qdrant(cid, embedding, point_payload):
        return None
    return {"chunk_id": cid, "success": True}
def main():
    """CLI entry point: embed every summarized chunk and upsert it to Qdrant.

    Options:
        --uuid        Only process chunks with this video UUID.
        --limit       Cap the number of chunks fetched.
        --batch-size  Accepted but currently unused; chunks are submitted
                      one by one to a thread pool of MAX_WORKERS threads.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Vectorize chunk summaries")
    parser.add_argument("--uuid", help="Process specific video UUID")
    parser.add_argument("--limit", type=int, help="Limit number of chunks")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
    args = parser.parse_args()

    print(
        f"🚀 Vectorizing chunk summaries (schema={SCHEMA}, collection={QDRANT_COLLECTION})"
    )

    # Fetch chunks
    print("📂 Fetching chunks with summaries...")
    chunks = get_chunks_with_summaries(uuid=args.uuid, limit=args.limit)
    total = len(chunks)
    print(f" Found {total} chunks with summaries")
    if not chunks:
        print("❌ No chunks to process")
        return

    # Process in parallel
    print(f"🧠 Generating embeddings ({MAX_WORKERS} workers)...")
    # perf_counter is monotonic, so elapsed time cannot go negative or jump
    # if the wall clock is adjusted mid-run.
    start_time = time.perf_counter()
    success_count = 0
    failed_count = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_chunk, c): c["chunk_id"] for c in chunks}
        for future in as_completed(futures):
            chunk_id = futures[future]
            # Count an unexpected exception as a failure for that chunk, so
            # that it does not abort the whole run.
            try:
                result = future.result()
            except Exception as e:
                print(f" ⚠️ {chunk_id} failed: {e}")
                result = None
            if result:
                success_count += 1
                elapsed = time.perf_counter() - start_time
                rate = success_count / elapsed if elapsed > 0 else 0
                print(f"{chunk_id} ({success_count}/{total}, {rate:.1f} chunks/s)")
            else:
                failed_count += 1

    # Summary (guarded against a zero elapsed time, like the per-chunk rate).
    elapsed = time.perf_counter() - start_time
    overall_rate = success_count / elapsed if elapsed > 0 else 0
    print(f"\n{'=' * 50}")
    print(f"✅ Done! Success: {success_count}, Failed: {failed_count}")
    print(f" Time: {elapsed:.1f}s, Rate: {overall_rate:.1f} chunks/s")

    # Verify collection. This is informational only: a timeout bounds the
    # request, and a connection error is reported instead of raising a
    # traceback after the work has already completed.
    try:
        resp = requests.get(
            f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}",
            headers={"api-key": QDRANT_API_KEY},
            timeout=30,
        )
    except requests.RequestException as e:
        print(f" ⚠️ Could not verify collection: {e}")
        return
    if resp.status_code == 200:
        info = resp.json()["result"]
        print(f" Collection vectors count: {info['points_count']}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()