- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
456 lines
14 KiB
Python
Executable File
456 lines
14 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Generate individual chunk summaries combining:
|
|
- chunk.text_content (specific content)
|
|
- parent.structured_summary (5W1H context)
|
|
|
|
Each chunk gets a tailored summary that contextualizes its specific content
|
|
within the broader parent chunk narrative.
|
|
"""
|
|
|
|
import json
|
|
import requests
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import time
|
|
import os
|
|
|
|
# Keyword arguments forwarded verbatim to psycopg2.connect().
DB_CONFIG = dict(host="localhost", user="accusys", dbname="momentry")

# Schema prefix interpolated into every query; overridable through the
# DATABASE_SCHEMA environment variable.
SCHEMA = os.getenv("DATABASE_SCHEMA", "dev")

# Chat-completions endpoint that call_llm() posts prompts to.
LLAMA_URL = "http://127.0.0.1:8081/v1/chat/completions"

# main() prints a progress line and pauses after every BATCH_SIZE chunks.
BATCH_SIZE = 50

# Length of that pause, in seconds (passed to time.sleep).
DELAY_BETWEEN_BATCHES = 1
|
|
|
|
|
|
def get_chunks_with_parents(uuid=None, limit=None):
    """Fetch chunks that still need a summary, joined with their parent chunk.

    Only chunks with no ``summary_text``, a non-null ``text_content`` and a
    non-null ``parent_chunk_id`` are selected. The parent's
    ``metadata->'structured_summary'`` and ``summary_text`` are returned as
    ``structured_summary`` and ``parent_summary``.

    Args:
        uuid: Optional video UUID; when truthy, restricts rows to that video.
        limit: Optional maximum number of rows; a falsy value means no limit.

    Returns:
        A list of dict-like rows (``RealDictCursor``) ordered by ``chunk_id``.

    Raises:
        psycopg2.Error: If connecting or querying fails.
    """
    conditions = [
        "c.summary_text IS NULL",
        "c.text_content IS NOT NULL",
        "c.parent_chunk_id IS NOT NULL",
    ]
    params = []

    if uuid:
        # Bound as a parameter rather than formatted into the SQL text, so a
        # value containing quotes can neither break nor alter the statement.
        conditions.append("c.uuid = %s")
        params.append(uuid)

    where_clause = "WHERE " + " AND ".join(conditions)

    # SCHEMA is an identifier and cannot be bound with %s; it comes from
    # process configuration (environment), not from request input.
    query = f"""
        SELECT c.chunk_id, c.uuid, c.text_content, c.chunk_type,
               c.parent_chunk_id,
               c.speaker_ids,
               c.face_ids,
               c.visual_stats,
               pc.metadata->'structured_summary' as structured_summary,
               pc.summary_text as parent_summary,
               c.start_time,
               c.end_time
        FROM {SCHEMA}.chunks c
        LEFT JOIN {SCHEMA}.parent_chunks pc
            ON c.parent_chunk_id = pc.id::varchar
        {where_clause}
        ORDER BY c.chunk_id
    """
    if limit:
        query += " LIMIT %s"
        params.append(int(limit))

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params or None)
            return cur.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
|
|
|
|
|
|
def get_person_identities(uuid, start_time, end_time):
    """Get the person identities that overlap a chunk's time range.

    Selects rows from ``person_identities`` for the given video that have a
    ``speaker_id`` and whose appearance interval
    (``first_appearance_time`` .. ``last_appearance_time``) overlaps
    ``[start_time, end_time]``.

    Args:
        uuid: Video UUID matched against ``video_uuid``.
        start_time: Start of the chunk's time range.
        end_time: End of the chunk's time range.

    Returns:
        A list of dict-like rows with ``person_id``, ``name`` and
        ``speaker_id``.

    Raises:
        psycopg2.Error: If connecting or querying fails.
    """
    query = f"""
        SELECT person_id, name, speaker_id
        FROM {SCHEMA}.person_identities
        WHERE video_uuid = %s
          AND speaker_id IS NOT NULL
          AND last_appearance_time >= %s
          AND first_appearance_time <= %s
    """

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, (uuid, start_time, end_time))
            return cur.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
|
|
|
|
|
|
def call_llm(prompt, max_tokens=500):
    """Call Gemma4 via llama-server and return the answer text.

    Posts a single-user-message chat completion request to ``LLAMA_URL``.
    The message ``content`` is preferred; when it is missing, null or blank,
    the answer is recovered from ``reasoning_content`` instead.

    Args:
        prompt: Text sent as the user message.
        max_tokens: Upper bound on generated tokens.

    Returns:
        The stripped answer text, or ``""`` when the request fails, the
        server responds with a non-200 status, or no text is available.
    """
    payload = {
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": 0.3,
        "min_p": 0.1,
    }
    try:
        resp = requests.post(LLAMA_URL, json=payload, timeout=60)
        if resp.status_code != 200:
            print(f" ⚠️ LLM error: HTTP {resp.status_code}")
            return ""

        result = resp.json()
        # `or` guards cover both a missing key and an explicit null / empty
        # list, which would otherwise raise and skip the reasoning fallback.
        choices = result.get("choices") or [{}]
        message = choices[0].get("message") or {}

        content = (message.get("content") or "").strip()
        if content:
            return content

        # No final content: try to pull the answer out of the reasoning text.
        reasoning = message.get("reasoning_content") or ""
        if reasoning:
            markers = ["Final:", "**Final**:", "Final answer:", "**Final answer**:"]
            for marker in markers:
                if marker in reasoning:
                    # Keep only the first line after the last marker.
                    answer = reasoning.split(marker)[-1].strip()
                    answer = answer.split("\n")[0].strip()
                    if answer and not answer.startswith("Thinking"):
                        return answer

            # Last resort: return the whole reasoning (will contain the
            # model's thinking process).
            return reasoning.strip()
    except Exception as e:
        # Best-effort boundary: the caller treats "" as a failed generation.
        print(f" ⚠️ LLM error: {e}")
    return ""
|
|
|
|
|
|
def generate_chunk_summary(chunk):
    """Generate the 5W1H analysis and summary text for a single chunk.

    Builds a prompt from the chunk's text, its parent's structured summary,
    speaker/face ids, visual statistics and any person identities found for
    the chunk's time range, then sends it to ``call_llm``.

    Args:
        chunk: Mapping with the columns selected by
            ``get_chunks_with_parents`` (``text_content``,
            ``structured_summary``, ``parent_summary``, ``speaker_ids``,
            ``face_ids``, ``visual_stats``, ``uuid``, ``start_time``,
            ``end_time``, ``chunk_id``).

    Returns:
        The raw LLM response, or ``""`` when the chunk has no text or the
        LLM call yields nothing.
    """
    text_content = chunk.get("text_content", "")
    if not text_content:
        return ""

    parent_5w1h = chunk.get("structured_summary") or {}
    if not isinstance(parent_5w1h, dict):
        # Anything other than a mapping cannot be queried field by field.
        parent_5w1h = {}
    parent_summary = chunk.get("parent_summary", "")
    speaker_ids = chunk.get("speaker_ids") or []
    face_ids = chunk.get("face_ids") or []
    visual_stats = chunk.get("visual_stats")
    if not isinstance(visual_stats, dict):
        visual_stats = {}
    uuid = chunk.get("uuid", "")
    start_time = chunk.get("start_time")
    end_time = chunk.get("end_time")

    # A NULL timestamp must not crash the ":.2f" formatting below.
    start_s = float(start_time) if start_time is not None else 0.0
    end_s = float(end_time) if end_time is not None else 0.0

    # str() keeps the joins working when ids/labels are not plain strings.
    speaker_list = ", ".join(str(s) for s in speaker_ids) if speaker_ids else "None"
    face_list = ", ".join([f"face_{x}" for x in face_ids]) if face_ids else "None"
    visual_objects = visual_stats.get("objects") or []
    visual_places = visual_stats.get("places") or []
    visual_actions = visual_stats.get("actions") or []
    visual_list = (
        ", ".join(str(o) for o in visual_objects[:5]) if visual_objects else "None"
    )
    places_list = (
        ", ".join(str(p) for p in visual_places[:3]) if visual_places else "None"
    )
    actions_list = (
        ", ".join(str(a) for a in visual_actions[:3]) if visual_actions else "None"
    )

    identified_persons = []
    # Compare against None rather than truthiness: a chunk that starts at
    # 0 seconds is valid and must still get its person lookup.
    if uuid and start_time is not None and end_time is not None:
        try:
            identified_persons = get_person_identities(uuid, start_time, end_time)
        except Exception as e:
            # Person data is optional context; continue without it.
            print(f" ⚠️ Person lookup error: {e}")

    person_list = (
        ", ".join(
            [
                f"{p['name'] or p['person_id']}({p['speaker_id']})"
                for p in identified_persons
            ]
        )
        if identified_persons
        else "None"
    )

    prompt = f"""You are analyzing a video chunk. Provide accurate, detailed 5W1H analysis.

CHUNK INFO:
- Chunk ID: {chunk.get("chunk_id")}
- Time range: {start_s:.2f}s - {end_s:.2f}s

BROADER SCENE CONTEXT (parent chunk, high confidence):
- Scene Who: {parent_5w1h.get("who", "N/A")}
- Scene What: {parent_5w1h.get("what", "N/A")}
- Scene When: {parent_5w1h.get("when", "N/A")}
- Scene Where: {parent_5w1h.get("where", "N/A")}
- Scene Why: {parent_5w1h.get("why", "N/A")}
- Scene How: {parent_5w1h.get("how", "N/A")}
- Tone: {parent_5w1h.get("tone", [])}
- Characters: {parent_5w1h.get("characters", [])}
- Key Events: {parent_5w1h.get("key_events", [])}

Parent summary: {parent_summary[:150] if parent_summary else "N/A"}...

CHUNK IDENTITY (from ASRX + Face + Person Recognition):
- Speakers (ASRX): {speaker_list}
- Faces (Face): {face_list}
- Identified Persons (verified): {person_list}

VISUAL CONTEXT (YOLO + Places365):
- Objects: {visual_list}
- Places: {places_list}
- Actions: {actions_list}

THIS CHUNK'S CONTENT:
"{text_content}"

Based on ALL the above information, provide accurate analysis:

1. **Who** (use verified names if available, e.g., "John (SPEAKER_1)"):
- List characters with confidence level

2. **What** (key action in this specific moment)

3. **When** (temporal position: beginning/middle/end of scene)

4. **Where** (location from video or None)

5. **Why** (purpose of this specific action)

6. **How** (manner: tone, emotion, expression)

7. **Emotion/Tone** (specific emotions detected)

8. **Key Actions** (verbs describing what's happening)

Output format:
Who: [names with source]
What: [action]
When: [position]
Where: [location or None]
Why: [purpose]
How: [manner]
Emotion: [emotion]
Actions: [verb1, verb2]
---
Summary: [2-3 sentence detailed summary connecting to scene]"""

    return call_llm(prompt)
|
|
|
|
|
|
def parse_5w1h_summary(result_text):
    """Parse the 5W1H fields and the summary out of an LLM response.

    The expected layout is one ``Label: value`` line per field (``Who``,
    ``What``, ``When``, ``Where``, ``Why``, ``How``, ``Emotion``,
    ``Actions``), a ``---`` separator, then ``Summary: ...``. When the
    separator is missing, the text before the first ``Summary:`` is scanned
    for fields and the text after it is taken as the summary.

    Args:
        result_text: Raw response text from the LLM.

    Returns:
        A dict with keys ``who``, ``what``, ``when``, ``where``, ``why``,
        ``how``, ``emotion``, ``actions`` and ``summary``; any field that was
        not found is an empty string.
    """
    data = {
        "who": "",
        "what": "",
        "when": "",
        "where": "",
        "why": "",
        "how": "",
        "emotion": "",
        "actions": "",
        "summary": "",
    }

    if not result_text or not isinstance(result_text, str):
        return data

    labels = (
        ("Who:", "who"),
        ("What:", "what"),
        ("When:", "when"),
        ("Where:", "where"),
        ("Why:", "why"),
        ("How:", "how"),
        ("Emotion:", "emotion"),
        ("Actions:", "actions"),
    )

    parts = result_text.split("---")
    if len(parts) >= 2:
        field_text = parts[0]
        # Drop only the leading label, not every "Summary:" in the prose.
        data["summary"] = parts[1].replace("Summary:", "", 1).strip()
    else:
        # The model skipped the separator: fall back to the Summary: label.
        field_text, _, summary_text = result_text.partition("Summary:")
        data["summary"] = summary_text.strip()

    for raw_line in field_text.splitlines():
        line = raw_line.strip()
        for label, key in labels:
            if line.startswith(label):
                # Slice off the prefix so a label repeated inside the value
                # is preserved and no other field is touched.
                data[key] = line[len(label):].strip()
                break

    return data
|
|
|
|
|
|
def update_chunk_summary(
    chunk_id,
    summary_text,
    chunk_5w1h=None,
    identity_info=None,
    visual_stats=None,
    uuid=None,
):
    """Update chunk summary, 5W1H, identity, and visual data in the database.

    When any of ``chunk_5w1h``, ``identity_info`` or ``visual_stats`` is
    truthy, they are merged into the row's ``metadata`` JSONB under
    ``chunk_5w1h``, ``chunk_identity`` and ``chunk_visual`` and
    ``metadata_version`` is incremented. Otherwise only ``summary_text`` is
    written and ``content_version`` is incremented.

    Args:
        chunk_id: Value matched against ``chunks.chunk_id``.
        summary_text: Summary to store.
        chunk_5w1h: Optional dict of parsed 5W1H fields.
        identity_info: Optional dict describing speakers and faces.
        visual_stats: Optional dict, or a value whose ``str()`` is JSON.
        uuid: Accepted for caller compatibility; not used by this function.

    Raises:
        psycopg2.Error: If connecting or updating fails; nothing is
            committed in that case.
    """
    metadata_obj = {}
    if chunk_5w1h:
        metadata_obj["chunk_5w1h"] = chunk_5w1h
    if identity_info:
        metadata_obj["chunk_identity"] = identity_info
    if visual_stats:
        if isinstance(visual_stats, dict):
            metadata_obj["chunk_visual"] = visual_stats
        else:
            try:
                metadata_obj["chunk_visual"] = json.loads(str(visual_stats))
            except ValueError:
                # Not valid JSON: store an empty object instead of failing.
                metadata_obj["chunk_visual"] = {}

    if metadata_obj:
        # NOTE(review): this branch bumps metadata_version but not
        # content_version although summary_text also changes — confirm
        # that this is intended.
        statement = f"""
            UPDATE {SCHEMA}.chunks
            SET summary_text = %s,
                metadata = COALESCE(metadata, '{{}}'::jsonb) || %s::jsonb,
                metadata_version = metadata_version + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE chunk_id = %s
        """
        # Serialize before connecting so a serialization error cannot leave
        # a connection open.
        params = (summary_text, json.dumps(metadata_obj), chunk_id)
    else:
        statement = f"""
            UPDATE {SCHEMA}.chunks
            SET summary_text = %s,
                content_version = content_version + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE chunk_id = %s
        """
        params = (summary_text, chunk_id)

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute(statement, params)
        conn.commit()
    finally:
        # Closing without a commit discards the open transaction, so a
        # failed UPDATE leaves nothing half-applied.
        conn.close()
|
|
|
|
|
|
def main():
    """Command-line entry point: generate and store summaries for chunks.

    Options:
        --uuid     Process only the chunks of one video UUID.
        --limit    Maximum number of chunks to fetch.
        --dry-run  Generate and print results without writing to the database.

    Chunks without text or without a parent 5W1H summary are skipped. After
    every ``BATCH_SIZE`` processed chunks the loop pauses for
    ``DELAY_BETWEEN_BATCHES`` seconds.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Generate chunk summaries")
    parser.add_argument("--uuid", help="Process specific video UUID")
    parser.add_argument("--limit", type=int, help="Limit number of chunks")
    parser.add_argument("--dry-run", action="store_true", help="Print without saving")
    args = parser.parse_args()

    print(f"Fetching chunks (schema={SCHEMA})...")
    chunks = get_chunks_with_parents(uuid=args.uuid, limit=args.limit)
    print(f"Found {len(chunks)} chunks to process")

    if not chunks:
        print("No chunks need summary generation")
        return

    success = 0
    failed = 0

    for i, chunk in enumerate(chunks, 1):
        chunk_id = chunk["chunk_id"]
        print(f"\n[{i}/{len(chunks)}] {chunk_id}")

        if not chunk.get("text_content"):
            print(" ⚠️ No text_content, skipping")
            continue

        if not chunk.get("structured_summary"):
            print(" ⚠️ No parent 5W1H, skipping")
            continue

        print(f" Text: {chunk['text_content'][:50]}...")
        result = generate_chunk_summary(chunk)

        if result:
            parsed = parse_5w1h_summary(result)
            # The parser always returns a "summary" key, so a .get() default
            # would never apply; fall back to the raw response explicitly
            # instead of storing an empty summary.
            summary_text = parsed.get("summary") or result
            chunk_5w1h = {k: v for k, v in parsed.items() if k != "summary" and v}

            speaker_ids = chunk.get("speaker_ids") or []
            face_ids = chunk.get("face_ids") or []
            visual_stats = chunk.get("visual_stats", {})

            identity_info = {
                "speakers": speaker_ids,
                "faces": [f"face_{x}" for x in face_ids] if face_ids else [],
            }

            print(f" ✓ Summary: {summary_text[:80]}...")
            if chunk_5w1h:
                print(
                    f" ✓ Chunk 5W1H: Who={chunk_5w1h.get('who', 'N/A')[:30]}, What={chunk_5w1h.get('what', 'N/A')[:30]}"
                )
            if identity_info["speakers"] or identity_info["faces"]:
                print(
                    f" ✓ Identity: speakers={identity_info['speakers']}, faces={identity_info['faces']}"
                )
            if visual_stats:
                print(
                    f" ✓ Visual: {list(visual_stats.keys()) if isinstance(visual_stats, dict) else 'present'}"
                )

            if not args.dry_run:
                update_chunk_summary(
                    chunk_id,
                    summary_text,
                    chunk_5w1h,
                    identity_info,
                    visual_stats,
                    # The chunk's own video UUID; args.uuid is None when
                    # processing every video.
                    chunk.get("uuid"),
                )
            success += 1
        else:
            print(" ✗ Failed to generate summary")
            failed += 1

        if i % BATCH_SIZE == 0:
            print(f"\n Batch complete ({success} success, {failed} failed)")
            # Give the LLM server a short breather between batches.
            time.sleep(DELAY_BETWEEN_BATCHES)

    print(f"\n{'=' * 50}")
    print(f"Done! Success: {success}, Failed: {failed}")
    if args.dry_run:
        print("(Dry run - no updates saved)")
|
|
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|