#!/opt/homebrew/bin/python3.11
"""
Story Processor - Generate parent-child chunk hierarchy for RAG

Uses video analysis outputs (ASR, CUT, YOLO, OCR JSON files) to create parent
chunks that summarize groups of child chunks.

Parent-Child Chunk Strategy:
- Parent chunks: Summarize multiple scenes/segments with narrative description
- Child chunks: Individual ASR segments and CUT scenes
- When embedding: Parent description + Child content for better retrieval

NOTE: YOLO detections are only used to enrich the CUT parent narratives, and
OCR data is loaded but not yet turned into chunks.
"""

import argparse
import json
import os
import subprocess
import sys
from typing import Any, Dict, Iterable, Iterator, List, Optional

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Progress reporting is only needed when a UUID is supplied, so a missing
# redis_publisher module must not prevent plain chunk generation.
try:
    from redis_publisher import RedisPublisher
except ImportError as _import_exc:
    RedisPublisher = None
    _REDIS_IMPORT_ERROR: Optional[ImportError] = _import_exc
else:
    _REDIS_IMPORT_ERROR = None

# Only the first N YOLO frames are inspected when collecting objects for a
# scene group.
# NOTE(review): this means scene groups that start after the first N frames
# never receive detected objects - confirm whether that is intended sampling.
_YOLO_FRAME_SAMPLE_LIMIT = 100


def extract_video_metadata(video_path: str) -> Dict[str, Any]:
    """Extract basic video metadata using ffprobe.

    Args:
        video_path: Path of the video file handed to ``ffprobe``.

    Returns:
        The parsed JSON printed by ``ffprobe`` (format and stream info), or an
        empty dict when ffprobe cannot be run, exits non-zero, or prints
        something that is not valid JSON. Metadata is best-effort and never
        raises for those cases.
    """
    cmd = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # ffprobe missing/not executable, bad path bytes, or undecodable output.
        return {}
    if result.returncode != 0:
        return {}
    try:
        return json.loads(result.stdout)
    except ValueError:
        return {}


def _dedupe(items: Iterable[str]) -> List[str]:
    """Return the unique items in first-seen order (deterministic, unlike set)."""
    return list(dict.fromkeys(items))


def _batches(
    items: List[Dict[str, Any]], size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of ``items`` holding at most ``size`` entries."""
    for offset in range(0, len(items), size):
        yield items[offset : offset + size]


def _attach_parent(parent: Dict[str, Any], children: List[Dict[str, Any]]) -> None:
    """Record ``parent``'s chunk id on every child chunk in ``children``."""
    for child in children:
        child["parent_chunk_id"] = parent["chunk_id"]


def generate_parent_child_chunks(
    asr_data: Dict,
    cut_data: Dict,
    yolo_data: Dict,
    ocr_data: Dict,
    parent_chunk_size: int = 5,
) -> Dict[str, Any]:
    """
    Generate parent-child chunk hierarchy.

    Parent chunks summarize multiple child chunks for better RAG retrieval.
    Child chunks are individual segments from ASR and scenes from CUT.

    Args:
        asr_data: Dict whose ``"segments"`` list holds dicts read via the
            ``"start"``, ``"end"`` and ``"text"`` keys.
        cut_data: Dict whose ``"scenes"`` list holds dicts read via
            ``"start_time"``/``"end_time"`` (falling back to ``"start"``/``"end"``).
        yolo_data: Dict whose ``"frames"`` list holds dicts with a
            ``"timestamp"`` and an ``"objects"`` list of ``{"class_name": ...}``.
        ocr_data: Accepted for interface compatibility; currently unused.
        parent_chunk_size: Maximum number of child chunks grouped per parent.

    Returns:
        Dict with ``"child_chunks"`` (ASR children followed by CUT children),
        ``"parent_chunks"`` (ASR parents followed by CUT parents) and
        ``"stats"`` counters.

    Raises:
        ValueError: If ``parent_chunk_size`` is smaller than 1.
    """
    if parent_chunk_size < 1:
        raise ValueError("parent_chunk_size must be >= 1")

    # Child chunks from ASR segments.
    asr_children: List[Dict[str, Any]] = [
        {
            "chunk_id": f"asr_{i:04d}",
            "chunk_type": "sentence",
            "source": "asr",
            "start_time": seg.get("start", 0),
            "end_time": seg.get("end", 0),
            "text_content": seg.get("text", ""),
            "content": seg,
            "child_chunk_ids": [],
            "parent_chunk_id": None,
        }
        for i, seg in enumerate(asr_data.get("segments", []))
    ]

    # Child chunks from CUT scenes.
    cut_children: List[Dict[str, Any]] = [
        {
            "chunk_id": f"cut_{i:04d}",
            "chunk_type": "cut",
            "source": "cut",
            "start_time": scene.get("start_time", scene.get("start", 0)),
            "end_time": scene.get("end_time", scene.get("end", 0)),
            "text_content": None,
            "content": scene,
            "child_chunk_ids": [],
            "parent_chunk_id": None,
        }
        for i, scene in enumerate(cut_data.get("scenes", []))
    ]

    sampled_frames = yolo_data.get("frames", [])[:_YOLO_FRAME_SAMPLE_LIMIT]
    parent_chunks: List[Dict[str, Any]] = []

    # Group ASR segments into parent chunks.
    for index, batch in enumerate(_batches(asr_children, parent_chunk_size)):
        batch_ids = [child["chunk_id"] for child in batch]
        batch_texts = [
            child["text_content"] for child in batch if child["text_content"]
        ]
        # The parent spans from the first child's start to the last child's end.
        start_time = batch[0]["start_time"]
        end_time = batch[-1]["end_time"]

        narrative = generate_narrative(batch_texts, [], start_time, end_time)
        parent_chunk = {
            "chunk_id": f"story_asr_{index:04d}",
            "chunk_type": "story",
            "source": "story_asr",
            "start_time": start_time,
            "end_time": end_time,
            "text_content": narrative,
            "content": {
                "description": narrative,
                "child_count": len(batch),
                "speech_preview": " ".join(batch_texts[:3]) if batch_texts else None,
            },
            "child_chunk_ids": batch_ids,
            "parent_chunk_id": None,
        }
        parent_chunks.append(parent_chunk)
        _attach_parent(parent_chunk, batch)

    # Group CUT scenes into parent chunks.
    for index, batch in enumerate(_batches(cut_children, parent_chunk_size)):
        batch_ids = [child["chunk_id"] for child in batch]
        start_time = batch[0]["start_time"]
        end_time = batch[-1]["end_time"]

        # Objects detected by YOLO inside this group's time range.
        batch_objects = [
            obj.get("class_name", "unknown")
            for frame in sampled_frames
            if start_time <= frame.get("timestamp", 0) <= end_time
            for obj in frame.get("objects", [])
        ]

        narrative = generate_scene_narrative(
            batch_objects, start_time, end_time, len(batch)
        )
        parent_chunk = {
            "chunk_id": f"story_cut_{index:04d}",
            "chunk_type": "story",
            "source": "story_cut",
            "start_time": start_time,
            "end_time": end_time,
            "text_content": narrative,
            "content": {
                "description": narrative,
                "child_count": len(batch),
                "scenes": batch_ids,
                # First-seen order keeps the output stable between runs.
                "detected_objects": _dedupe(batch_objects)[:10],
            },
            "child_chunk_ids": batch_ids,
            "parent_chunk_id": None,
        }
        parent_chunks.append(parent_chunk)
        _attach_parent(parent_chunk, batch)

    child_chunks = asr_children + cut_children
    return {
        "child_chunks": child_chunks,
        "parent_chunks": parent_chunks,
        "stats": {
            "total_child_chunks": len(child_chunks),
            "total_parent_chunks": len(parent_chunks),
            "asr_children": len(asr_children),
            "cut_children": len(cut_children),
        },
    }


def generate_narrative(
    texts: List[str], objects: List[str], start: float, end: float
) -> str:
    """Generate narrative description from text snippets.

    Args:
        texts: Text snippets to join with single spaces.
        objects: Accepted for interface compatibility; currently unused.
        start: Start time in seconds.
        end: End time in seconds.

    Returns:
        A time-range placeholder when ``texts`` is empty, otherwise the joined
        text (truncated to 200 characters plus ``"..."``) prefixed with the
        rounded time range.
    """
    if not texts:
        return f"Video segment from {start:.1f}s to {end:.1f}s"

    combined = " ".join(texts)
    if len(combined) > 200:
        combined = combined[:200] + "..."

    return f"[{start:.0f}s-{end:.0f}s] {combined}"


def generate_scene_narrative(
    objects: List[str], start: float, end: float, scene_count: int
) -> str:
    """Generate scene narrative from detected objects.

    Args:
        objects: Detected object class names; duplicates are allowed.
        start: Start time in seconds.
        end: End time in seconds.
        scene_count: Number of scenes covered by the narrative.

    Returns:
        A sentence listing up to five distinct objects in first-seen order, or
        a plain scene-count sentence when no objects were detected.
    """
    unique_objects = _dedupe(objects)[:5]
    if not unique_objects:
        return f"[{start:.0f}s-{end:.0f}s] {scene_count} video scenes."

    obj_str = ", ".join(unique_objects)
    return f"[{start:.0f}s-{end:.0f}s] Scenes {scene_count} segments. Visual: {obj_str}."


def _load_json_if_present(path: str) -> Optional[Any]:
    """Return the parsed UTF-8 JSON document at ``path``, or None if it is missing."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None


def _make_publisher(uuid: str) -> Optional[Any]:
    """Build the progress publisher for ``uuid``; None when no UUID is given."""
    if not uuid:
        return None
    if RedisPublisher is None:
        raise ImportError(
            "redis_publisher is required for progress tracking when a uuid is given"
        ) from _REDIS_IMPORT_ERROR
    return RedisPublisher(uuid)


def run_story(
    video_path: str, output_path: str, uuid: str = "", parent_chunk_size: int = 5
) -> Dict[str, Any]:
    """Build the parent-child chunk hierarchy for a video and write it as JSON.

    Analysis inputs are looked up next to ``output_path`` as
    ``<stem>.asr.json``, ``<stem>.cut.json``, ``<stem>.yolo.json`` and
    ``<stem>.ocr.json``, where ``<stem>`` is the output file name up to its
    first dot. Missing files are treated as empty analyses.

    Args:
        video_path: Video file passed to ffprobe for metadata.
        output_path: Destination of the resulting JSON document.
        uuid: When non-empty, progress is reported through ``RedisPublisher``.
        parent_chunk_size: Number of child chunks per parent chunk.

    Returns:
        The chunk hierarchy, extended with ``"metadata"`` and
        ``"parent_chunk_size"``.

    Raises:
        ImportError: If ``uuid`` is given but ``redis_publisher`` is unavailable.
        ValueError: If ``parent_chunk_size`` is smaller than 1.
    """
    publisher = _make_publisher(uuid)

    if publisher:
        publisher.info("story", "STORY_START")

    base_path = os.path.dirname(output_path)
    uuid_name = os.path.basename(output_path).split(".")[0]

    def analysis_path(kind: str) -> str:
        return os.path.join(base_path, f"{uuid_name}.{kind}.json")

    # Defaults used when an analysis file is absent.
    asr_data = {"segments": []}
    cut_data = {"scenes": []}
    yolo_data = {"frames": []}
    ocr_data = {"frames": []}

    loaded = _load_json_if_present(analysis_path("asr"))
    if loaded is not None:
        asr_data = loaded
        if publisher:
            publisher.info(
                "story", f"Loaded ASR: {len(asr_data.get('segments', []))} segments"
            )

    loaded = _load_json_if_present(analysis_path("cut"))
    if loaded is not None:
        cut_data = loaded
        if publisher:
            publisher.info(
                "story", f"Loaded CUT: {len(cut_data.get('scenes', []))} scenes"
            )

    loaded = _load_json_if_present(analysis_path("yolo"))
    if loaded is not None:
        yolo_data = loaded

    loaded = _load_json_if_present(analysis_path("ocr"))
    if loaded is not None:
        ocr_data = loaded

    metadata = extract_video_metadata(video_path)

    if publisher:
        publisher.info("story", "Generating parent-child chunks...")

    result = generate_parent_child_chunks(
        asr_data, cut_data, yolo_data, ocr_data, parent_chunk_size
    )
    result["metadata"] = metadata
    result["parent_chunk_size"] = parent_chunk_size

    # ensure_ascii=False emits raw non-ASCII text, so the file must be UTF-8
    # regardless of the platform's default encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    if publisher:
        stats = result["stats"]
        publisher.complete(
            "story",
            f"{stats['total_parent_chunks']} parents, {stats['total_child_chunks']} children",
        )

    return result


def main() -> None:
    """Parse command-line arguments, run the story generator, print a summary."""
    parser = argparse.ArgumentParser(
        description="Video Story Generator - Parent-Child Chunks"
    )
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", help="UUID for progress tracking", default="")
    parser.add_argument(
        "--parent-chunk-size",
        type=int,
        default=5,
        help="Number of child chunks per parent chunk",
    )

    args = parser.parse_args()
    result = run_story(
        args.video_path, args.output_path, args.uuid, args.parent_chunk_size
    )
    print(
        f"Story generated: {result['stats']['total_parent_chunks']} parent chunks, "
        f"{result['stats']['total_child_chunks']} child chunks"
    )


if __name__ == "__main__":
    main()