#!/opt/homebrew/bin/python3.11
"""
Generate vectors for chunk summaries and store in Qdrant.

Process:
1. Fetch chunks with summary_text from PostgreSQL
2. Generate embeddings using nomic-embed-text
3. Store vectors in Qdrant collection: momentry_dev_chunk_summaries
"""

import argparse
import hashlib
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import ollama
import psycopg2
import psycopg2.extras
import requests

DB_CONFIG = {
    "host": "localhost",
    "user": "accusys",
    "dbname": "momentry",
}

SCHEMA = os.environ.get("DATABASE_SCHEMA", "dev")

QDRANT_URL = "http://localhost:6333"
# The key can be overridden through the environment so the credential does
# not have to live in source control; the default keeps existing setups working.
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "Test3200Test3200Test3200")
QDRANT_COLLECTION = f"momentry_{SCHEMA}_chunk_summaries"

EMBED_MODEL = "nomic-embed-text"
BATCH_SIZE = 100
MAX_WORKERS = 4

# Timeout (seconds) applied to every HTTP call made to Qdrant.
QDRANT_TIMEOUT = 30


def get_chunks_with_summaries(uuid=None, limit=None):
    """Return the chunks that have a non-NULL summary_text.

    Args:
        uuid: When truthy, only chunks whose ``uuid`` column equals this
            value are returned.
        limit: When truthy, the maximum number of rows to return
            (``None`` or ``0`` means no limit).

    Returns:
        A list of dict-like rows (``RealDictCursor``) ordered by chunk_id.

    Raises:
        psycopg2.Error: If connecting or querying fails.
    """
    # SCHEMA is interpolated as an identifier; it comes from the operator's
    # environment (DATABASE_SCHEMA). Values are always passed as parameters.
    query = f"""
        SELECT chunk_id, uuid, summary_text, chunk_type,
               start_frame, end_frame, fps, parent_chunk_id
        FROM {SCHEMA}.chunks
        WHERE summary_text IS NOT NULL
    """
    params = []

    if uuid:
        query += " AND uuid = %s"
        params.append(uuid)

    query += " ORDER BY chunk_id"

    if limit:
        query += " LIMIT %s"
        params.append(limit)

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # The cursor context manager closes the cursor; the connection is
        # closed in ``finally`` so it is released even when the query fails.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
    finally:
        conn.close()


def get_embedding(text):
    """Generate an embedding for ``text`` using Ollama.

    Returns:
        The first embedding returned by the model, or ``None`` when the
        call fails (the error is printed, not raised).
    """
    try:
        emb_res = ollama.embed(model=EMBED_MODEL, input=text)
        return emb_res["embeddings"][0]
    except Exception as e:  # best-effort: one bad chunk must not stop the run
        print(f" ⚠️ Embedding error: {e}")
        return None


def _point_id(chunk_id):
    """Map a string chunk_id to a stable 64-bit integer Qdrant point ID.

    Qdrant requires integer or UUID format for point IDs, so the first
    16 hex digits of the MD5 digest are used as an unsigned integer.
    """
    return int(hashlib.md5(chunk_id.encode()).hexdigest()[:16], 16)


def upsert_to_qdrant(chunk_id, vector, payload):
    """Upsert one vector with its payload into the Qdrant collection.

    Args:
        chunk_id: String chunk identifier; hashed into the numeric point ID.
        vector: Embedding vector to store.
        payload: JSON-serialisable metadata stored alongside the vector.

    Returns:
        ``True`` when Qdrant answers with HTTP 200, ``False`` otherwise
        (errors are printed, not raised).
    """
    try:
        url = f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points"
        headers = {
            "api-key": QDRANT_API_KEY,
            "Content-Type": "application/json",
        }
        data = {
            "points": [
                {
                    "id": _point_id(chunk_id),
                    "vector": vector,
                    "payload": payload,
                }
            ]
        }

        resp = requests.put(url, headers=headers, json=data, timeout=QDRANT_TIMEOUT)
        if resp.status_code == 200:
            return True

        print(f" ⚠️ Qdrant error: {resp.status_code} - {resp.text[:100]}")
        return False
    except Exception as e:  # best-effort: report and let the caller count a failure
        print(f" ⚠️ Qdrant upsert error: {e}")
        return False


def process_chunk(chunk):
    """Process a single chunk: embed its summary and upsert it to Qdrant.

    Args:
        chunk: Row mapping with at least ``chunk_id``, ``uuid``,
            ``summary_text`` and ``chunk_type`` keys.

    Returns:
        ``{"chunk_id": ..., "success": True}`` on success, ``None`` when the
        summary is empty, the embedding fails, or the upsert fails.
    """
    chunk_id = chunk["chunk_id"]
    summary_text = chunk["summary_text"]

    if not summary_text:
        return None

    # Generate embedding
    vector = get_embedding(summary_text)
    if not vector:
        return None

    # Build payload
    payload = {
        "chunk_id": chunk_id,
        "uuid": chunk["uuid"],
        "chunk_type": chunk["chunk_type"],
        "parent_chunk_id": chunk.get("parent_chunk_id"),
        "summary_text": summary_text[:500],  # Truncate for payload
    }

    # Upsert to Qdrant
    if upsert_to_qdrant(chunk_id, vector, payload):
        return {"chunk_id": chunk_id, "success": True}
    return None


def _rate(count, elapsed):
    """Return ``count / elapsed``, or 0 when no measurable time has passed."""
    return count / elapsed if elapsed > 0 else 0


def main():
    """CLI entry point: fetch chunks, embed them in parallel, report totals."""
    parser = argparse.ArgumentParser(description="Vectorize chunk summaries")
    parser.add_argument("--uuid", help="Process specific video UUID")
    parser.add_argument("--limit", type=int, help="Limit number of chunks")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=BATCH_SIZE,
        help="Accepted for compatibility; currently unused",
    )
    args = parser.parse_args()

    print(
        f"🚀 Vectorizing chunk summaries (schema={SCHEMA}, collection={QDRANT_COLLECTION})"
    )

    # Fetch chunks
    print("📂 Fetching chunks with summaries...")
    chunks = get_chunks_with_summaries(uuid=args.uuid, limit=args.limit)
    print(f" Found {len(chunks)} chunks with summaries")

    if not chunks:
        print("❌ No chunks to process")
        return

    # Process in parallel
    print(f"🧠 Generating embeddings ({MAX_WORKERS} workers)...")
    start_time = time.time()

    success_count = 0
    failed_count = 0

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_chunk, c): c["chunk_id"] for c in chunks}

        for future in as_completed(futures):
            chunk_id = futures[future]
            try:
                result = future.result()
            except Exception as e:
                # A worker raising must not abort the remaining chunks or
                # lose the final summary; count it as a failure instead.
                print(f" ⚠️ {chunk_id} failed: {e}")
                result = None

            if result:
                success_count += 1
                rate = _rate(success_count, time.time() - start_time)
                print(
                    f" ✓ {chunk_id} ({success_count}/{len(chunks)}, {rate:.1f} chunks/s)"
                )
            else:
                failed_count += 1

    # Summary
    elapsed = time.time() - start_time
    print(f"\n{'=' * 50}")
    print(f"✅ Done! Success: {success_count}, Failed: {failed_count}")
    print(
        f" Time: {elapsed:.1f}s, Rate: {_rate(success_count, elapsed):.1f} chunks/s"
    )

    # Verify collection (informational only: never fail the run at this point)
    try:
        resp = requests.get(
            f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}",
            headers={"api-key": QDRANT_API_KEY},
            timeout=QDRANT_TIMEOUT,
        )
    except requests.RequestException as e:
        print(f" ⚠️ Could not verify collection: {e}")
        return

    if resp.status_code == 200:
        info = resp.json()["result"]
        print(f" Collection vectors count: {info['points_count']}")
    else:
        print(f" ⚠️ Could not verify collection: HTTP {resp.status_code}")


if __name__ == "__main__":
    main()