- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
202 lines
5.6 KiB
Python
Executable File
202 lines
5.6 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Generate vectors for chunk summaries and store in Qdrant.
|
|
|
|
Process:
|
|
1. Fetch chunks with summary_text from PostgreSQL
|
|
2. Generate embeddings using nomic-embed-text
|
|
3. Store vectors in the Qdrant collection momentry_<schema>_chunk_summaries (schema from DATABASE_SCHEMA, default: dev)
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import ollama
|
|
import requests
|
|
import os
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# PostgreSQL connection parameters passed straight to psycopg2.connect().
DB_CONFIG = {
    "host": "localhost",
    "user": "accusys",
    "dbname": "momentry",
}

# Schema that holds the `chunks` table; it also selects the Qdrant collection.
SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")

# Qdrant endpoint and API key. Both can be overridden from the environment so
# the credential does not have to live in source control; the defaults keep
# the previous hard-coded values so existing invocations behave the same.
# NOTE(review): the fallback key is a committed secret — consider dropping the
# default once every environment sets QDRANT_API_KEY.
QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "Test3200Test3200Test3200")
QDRANT_COLLECTION = f"momentry_{SCHEMA}_chunk_summaries"

# Ollama model used to embed summary text.
EMBED_MODEL = "nomic-embed-text"

# Default value of the --batch-size CLI option.
BATCH_SIZE = 100

# Number of worker threads that embed and upsert chunks concurrently.
MAX_WORKERS = 4
|
|
|
|
|
|
def get_chunks_with_summaries(uuid=None, limit=None):
    """Fetch chunk rows whose ``summary_text`` is not NULL.

    Args:
        uuid: When truthy, only chunks with this ``uuid`` are returned.
        limit: When not None, at most this many rows are returned
            (``0`` returns no rows rather than being ignored).

    Returns:
        A list of ``RealDictCursor`` rows with the columns chunk_id, uuid,
        summary_text, chunk_type, start_frame, end_frame, fps and
        parent_chunk_id, ordered by chunk_id.
    """
    from psycopg2 import sql

    # SCHEMA comes from the environment, so it is composed as a quoted
    # identifier instead of being f-string-interpolated into the SQL text.
    # Being quoted, it must match the schema's exact (case-sensitive) name.
    query = sql.SQL(
        """
        SELECT chunk_id, uuid, summary_text, chunk_type,
               start_frame, end_frame, fps, parent_chunk_id
        FROM {}.chunks
        WHERE summary_text IS NOT NULL
        """
    ).format(sql.Identifier(SCHEMA))
    params = []

    if uuid:
        query += sql.SQL(" AND uuid = %s")
        params.append(uuid)

    query += sql.SQL(" ORDER BY chunk_id")

    # `is not None` so an explicit limit of 0 is honoured instead of
    # silently fetching every row.
    if limit is not None:
        query += sql.SQL(" LIMIT %s")
        params.append(limit)

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # The cursor context manager closes the cursor, and `finally` closes
        # the connection even if the query raises.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
    finally:
        conn.close()
|
|
|
|
|
|
def get_embedding(text):
    """Embed ``text`` with the Ollama model named by EMBED_MODEL.

    Returns the first vector of the response's ``"embeddings"`` list. Any
    exception raised while calling Ollama or reading its response is printed
    and swallowed, and None is returned instead.
    """
    vector = None
    try:
        response = ollama.embed(model=EMBED_MODEL, input=text)
        vector = response["embeddings"][0]
    except Exception as exc:
        print(f" ⚠️ Embedding error: {exc}")
    return vector
|
|
|
|
|
|
def chunk_point_id(chunk_id):
    """Map a chunk_id to a stable 64-bit integer Qdrant point ID.

    Qdrant point IDs must be integers or UUIDs, so the chunk_id is hashed
    with MD5 and the first 16 hex digits (64 bits) are used. The mapping must
    not change: a different mapping would make re-runs insert new points
    instead of overwriting the existing ones.
    """
    import hashlib

    # str() lets non-string IDs through; for str input this is byte-identical
    # to the previous ``chunk_id.encode()`` (UTF-8 is the default encoding).
    digest = hashlib.md5(str(chunk_id).encode("utf-8")).hexdigest()
    return int(digest[:16], 16)


def upsert_to_qdrant(chunk_id, vector, payload):
    """Upsert one point (vector + payload) into QDRANT_COLLECTION.

    Args:
        chunk_id: Chunk identifier; converted with chunk_point_id().
        vector: Embedding vector for the point.
        payload: JSON-serialisable metadata stored with the vector.

    Returns:
        True if Qdrant answered HTTP 200, otherwise False. Errors are printed,
        not raised, so one failing chunk does not stop the caller.
    """
    try:
        url = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points"
        headers = {
            "api-key": QDRANT_API_KEY,
            "Content-Type": "application/json",
        }
        data = {
            "points": [
                {
                    "id": chunk_point_id(chunk_id),
                    "vector": vector,
                    "payload": payload,
                }
            ]
        }

        # wait=true makes Qdrant apply the write before responding, so a 200
        # means the point is actually stored rather than merely queued.
        resp = requests.put(
            url,
            headers=headers,
            params={"wait": "true"},
            json=data,
            timeout=30,
        )
        if resp.status_code == 200:
            return True
        print(f" ⚠️ Qdrant error: {resp.status_code} - {resp.text[:100]}")
        return False
    except Exception as e:
        print(f" ⚠️ Qdrant upsert error: {e}")
        return False
|
|
|
|
|
|
def process_chunk(chunk):
    """Embed one chunk's summary and upsert the result into Qdrant.

    Returns ``{"chunk_id": <id>, "success": True}`` when the upsert succeeds.
    Returns None when the summary is empty, the embedding is empty or failed,
    or the upsert failed.
    """
    chunk_id = chunk["chunk_id"]
    text = chunk["summary_text"]
    if not text:
        return None

    embedding = get_embedding(text)
    if not embedding:
        return None

    metadata = dict(
        chunk_id=chunk_id,
        uuid=chunk["uuid"],
        chunk_type=chunk["chunk_type"],
        parent_chunk_id=chunk.get("parent_chunk_id"),
        summary_text=text[:500],  # only the first 500 chars go into the payload
    )

    stored = upsert_to_qdrant(chunk_id, embedding, metadata)
    return {"chunk_id": chunk_id, "success": True} if stored else None
|
|
|
|
|
|
def main():
    """Command-line entry point: vectorize chunk summaries into Qdrant.

    Options:
        --uuid        only process chunks of this video UUID
        --limit       maximum number of chunks to fetch
        --batch-size  accepted for compatibility; not currently used
    """
    import argparse

    parser = argparse.ArgumentParser(description="Vectorize chunk summaries")
    parser.add_argument("--uuid", help="Process specific video UUID")
    parser.add_argument("--limit", type=int, help="Limit number of chunks")
    # NOTE(review): --batch-size is parsed but nothing reads it; points are
    # upserted one per request. Kept so existing command lines keep working.
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
    args = parser.parse_args()

    print(
        f"🚀 Vectorizing chunk summaries (schema={SCHEMA}, collection={QDRANT_COLLECTION})"
    )

    # Fetch chunks
    print("📂 Fetching chunks with summaries...")
    chunks = get_chunks_with_summaries(uuid=args.uuid, limit=args.limit)
    print(f" Found {len(chunks)} chunks with summaries")

    if not chunks:
        print("❌ No chunks to process")
        return

    # Process in parallel
    print(f"🧠 Generating embeddings ({MAX_WORKERS} workers)...")
    start_time = time.time()
    success_count = 0
    failed_count = 0

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_chunk, c): c["chunk_id"] for c in chunks}

        for future in as_completed(futures):
            chunk_id = futures[future]
            try:
                result = future.result()
            except Exception as e:
                # An unexpected error in one chunk should be counted as a
                # failure, not abort the run and lose the summary below.
                print(f" ⚠️ {chunk_id} failed: {e}")
                result = None

            if result:
                success_count += 1
                elapsed = time.time() - start_time
                rate = success_count / elapsed if elapsed > 0 else 0
                print(
                    f" ✓ {chunk_id} ({success_count}/{len(chunks)}, {rate:.1f} chunks/s)"
                )
            else:
                failed_count += 1

    # Summary — same zero-elapsed guard as the per-chunk rate above.
    elapsed = time.time() - start_time
    rate = success_count / elapsed if elapsed > 0 else 0
    print(f"\n{'=' * 50}")
    print(f"✅ Done! Success: {success_count}, Failed: {failed_count}")
    print(f" Time: {elapsed:.1f}s, Rate: {rate:.1f} chunks/s")

    # Verify collection. This is best-effort: a network failure here is
    # reported instead of crashing after all the work is done.
    try:
        resp = requests.get(
            f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}",
            headers={"api-key": QDRANT_API_KEY},
            timeout=30,
        )
    except requests.RequestException as e:
        print(f" ⚠️ Could not read collection info: {e}")
        return

    if resp.status_code == 200:
        info = resp.json()["result"]
        print(f" Collection vectors count: {info['points_count']}")


if __name__ == "__main__":
    main()
|