- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
184 lines
5.5 KiB
Python
184 lines
5.5 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Phase 3 POC: Parent Chunk Semantic Index Builder (Parallel)
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import re
|
|
import psycopg2
|
|
import ollama
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------

# Identifier of the item being indexed; it names the ASR output folder/file
# and is written to the ``uuid`` column of every inserted row.
UUID = "384b0ff44aaaa1f1"
ASR_PATH = "output/{0}/{0}.asr.json".format(UUID)

# PostgreSQL connection string handed to psycopg2.connect().
DB_URL = "postgresql://accusys@localhost:5432/momentry"

# Ollama models: chat model for the JSON summary, embedding model for the
# summary vector.
MODEL = "gemma4:latest"
EMBED_MODEL = "nomic-embed-text"

# Seconds. The chunker starts a new chunk when a segment begins more than
# this long after the end of the previous segment.
CHUNK_WINDOW = 60

# Size of the thread pool used for the LLM/embedding calls (tuned for M4).
MAX_WORKERS = 4

# Table that receives the generated rows.
TARGET_TABLE = "parent_chunks_poc"
|
|
|
|
# Prompt sent to the chat model for each chunk. It is filled in with
# str.format(context=...), which is why the literal JSON braces are doubled
# ({{ }}): only {context} is substituted.
# NOTE(review): the lone "|" lines and the missing indentation inside this
# literal look like extraction artifacts rather than intended prompt text --
# confirm against the original file before relying on the exact wording.
PROMPT_TEMPLATE = """
|
|
You are an expert film analyst. Analyze the dialogue below and output STRICT JSON only.
|
|
Do NOT output thinking process, markdown, or explanations.
|
|
|
|
JSON Structure:
|
|
{{
|
|
"narrative_summary": "One sentence plot summary.",
|
|
"entities": {{"who": [], "where": "", "objects": []}},
|
|
"emotional_arc": {{"start_mood": "", "end_mood": "", "tension": "low/medium/high"}},
|
|
"plot_sequence": {{"scene_type": "", "key_action": ""}}
|
|
}}
|
|
|
|
Dialogue:
|
|
{context}
|
|
"""
|
|
|
|
|
|
def load_asr_and_chunk():
    """Load the ASR transcript and group its segments into parent chunks.

    Reads the JSON document at ``ASR_PATH`` and walks its ``"segments"``
    list in order. A new chunk is started before a segment when either:

    * the segment starts more than ``CHUNK_WINDOW`` seconds after the end
      of the previous segment in the current chunk, or
    * the text accumulated in the current chunk already exceeds 3000
      characters.

    Returns:
        list[dict]: one dict per chunk with the keys ``"segments"`` (the raw
        segment dicts), ``"start"`` and ``"end"`` (taken from the first and
        last segment of the chunk) and ``"text"`` (each segment text
        prefixed with a single space and concatenated).
    """
    max_chunk_chars = 3000

    print(f"📂 Loading ASR from {ASR_PATH}...")
    # JSON is UTF-8 by specification; do not rely on the platform's locale
    # default, which would corrupt or reject non-ASCII transcripts.
    with open(ASR_PATH, "r", encoding="utf-8") as f:
        data = json.load(f)
    segments = data.get("segments", [])

    chunks = []
    current_chunk = {"segments": [], "start": 0, "end": 0, "text": ""}

    # Anchor the first chunk at the first segment's start time.
    if segments:
        current_chunk["start"] = segments[0].get("start", 0)
        current_chunk["end"] = current_chunk["start"]

    for seg in segments:
        t = seg.get("start", 0)

        # Split when the gap since the previous segment is too large or the
        # chunk text has grown too long.
        gap_exceeded = (
            t - current_chunk["end"] > CHUNK_WINDOW and current_chunk["segments"]
        )
        text_too_long = len(current_chunk["text"]) > max_chunk_chars
        if gap_exceeded or text_too_long:
            chunks.append(current_chunk)
            current_chunk = {"segments": [], "start": t, "end": t, "text": ""}

        current_chunk["segments"].append(seg)
        current_chunk["end"] = seg.get("end", t)
        current_chunk["text"] += " " + seg.get("text", "")

    # Flush the trailing, still-open chunk.
    if current_chunk["segments"]:
        chunks.append(current_chunk)

    print(f"✅ Grouped into {len(chunks)} Parent Chunks.")
    return chunks
|
|
|
|
|
|
def clean_json(raw_text):
    """Extract the JSON payload from a raw LLM response.

    Two strategies are tried in order:

    1. The content of the first fenced block opened with ```json.
    2. The text from the first "{" through the last "}".

    Args:
        raw_text: The model's reply as a string.

    Returns:
        The candidate JSON text (not parsed or validated), or ``None`` when
        neither strategy finds anything.
    """
    # 1. Markdown fenced block
    fenced = re.search(r"```json\s*(.*?)\s*```", raw_text, re.DOTALL)
    if fenced:
        return fenced.group(1)

    # 2. Outermost brace pair
    start = raw_text.find("{")
    end = raw_text.rfind("}")
    # `end > start` also covers the "no closing brace" case (rfind -> -1)
    # and rejects a "}" that only appears before the first "{", which would
    # otherwise yield an empty slice instead of None.
    if start != -1 and end > start:
        return raw_text[start : end + 1]

    return None
|
|
|
|
|
|
def process_chunk(idx, chunk):
    """Summarise one parent chunk with the LLM and embed the summary.

    Args:
        idx: Position of the chunk; used in log output and returned as
            ``"scene_order"``.
        chunk: Dict providing ``"text"``, ``"start"`` and ``"end"``.

    Returns:
        A dict with the keys ``"scene_order"``, ``"start"``, ``"end"``,
        ``"summary"``, ``"vector"`` and ``"metadata"``; or ``None`` when the
        stripped text is shorter than 20 characters or any step fails.
        Failures are printed, never raised, so one bad chunk does not abort
        the whole run.
    """
    print(f"🔄 Processing Chunk {idx}...")
    text = chunk["text"].strip()
    if len(text) < 20:
        return None

    # Kept outside the try so the error handler can show the raw reply.
    res = None
    try:
        # 1. LLM Summary
        prompt = PROMPT_TEMPLATE.format(context=text)
        try:
            res = ollama.chat(model=MODEL, messages=[{"role": "user", "content": prompt}])
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(f"Ollama Chat Failed: {e}") from e

        raw_json = clean_json(res["message"]["content"])
        if not raw_json:
            raise ValueError("No JSON found in response")
        metadata = json.loads(raw_json)

        # The model may return valid JSON that is not an object.
        if not isinstance(metadata, dict):
            raise ValueError(
                f"Expected a JSON object, got {type(metadata).__name__}"
            )
        # Check required key
        if "narrative_summary" not in metadata:
            raise ValueError(f"Missing key in JSON: {list(metadata.keys())}")

        # 2. Embedding
        emb_res = ollama.embed(model=EMBED_MODEL, input=metadata["narrative_summary"])
        vector = emb_res["embeddings"][0]

        return {
            "scene_order": idx,
            "start": chunk["start"],
            "end": chunk["end"],
            "summary": metadata["narrative_summary"],
            "vector": vector,
            "metadata": metadata,
        }
    except Exception as e:
        # Deliberate best-effort boundary: report and skip this chunk.
        print(f"⚠️ Chunk {idx} Failed: {e}")
        # Print raw content for debugging
        if res is not None:
            print(f" RAW RESPONSE START: {res['message']['content'][:200]}")
        return None
|
|
|
|
|
|
def build_index():
    """Build the parent-chunk index: chunk the ASR, summarise/embed, store.

    Steps:
        1. Load and chunk the transcript via ``load_asr_and_chunk()``.
        2. Run ``process_chunk`` for every chunk on a thread pool of
           ``MAX_WORKERS`` workers, keeping only the non-``None`` results.
        3. Insert one row per result into ``TARGET_TABLE`` and commit.

    The database connection is always closed, including when an error
    occurs; nothing is committed unless every insert succeeds.
    """
    print(f"🚀 Starting Parallel Index Build for {UUID} ({MAX_WORKERS} workers)")
    start_time = time.time()

    chunks = load_asr_and_chunk()
    # Connect before the slow LLM phase so an unreachable database fails fast.
    conn = psycopg2.connect(DB_URL)
    try:
        results = []

        # Parallel Execution
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = {
                executor.submit(process_chunk, i, c): i for i, c in enumerate(chunks)
            }
            for future in as_completed(futures):
                idx = futures[future]
                res = future.result()
                if res:
                    results.append(res)
                    elapsed = (time.time() - start_time) / 60
                    print(
                        f"✅ Indexed Chunk {idx + 1}/{len(chunks)} (Time: {elapsed:.1f}m)"
                    )

        # Batch Write to DB
        print("💾 Writing to PostgreSQL...")
        # TARGET_TABLE is a module constant, not user input; all row values
        # are passed as bound parameters.
        insert_sql = f"""
            INSERT INTO {TARGET_TABLE} (uuid, scene_order, start_time, end_time, summary_text, summary_vector, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        with conn.cursor() as cur:
            for r in results:
                cur.execute(
                    insert_sql,
                    (
                        UUID,
                        r["scene_order"],
                        r["start"],
                        r["end"],
                        r["summary"],
                        r["vector"],
                        json.dumps(r["metadata"]),
                    ),
                )
        conn.commit()
    finally:
        # Closing without a commit discards any pending transaction.
        conn.close()

    total_time = (time.time() - start_time) / 60
    print(f"🎉 SUCCESS! Indexed {len(results)} chunks in {total_time:.1f} mins.")
|
|
|
|
|
|
# Script entry point: run the index build only when executed directly,
# never as a side effect of importing this module.
if __name__ == "__main__":
    build_index()
|