#!/opt/homebrew/bin/python3.11
"""Momentry Core Visual Demo Dashboard.

Streamlit application that previews processor-module output on top of the
quick-preview video: per-frame overlays (YOLO, face, pose, OCR, scene label,
ASR subtitle, speaker tag), a manual identity-merge tool, and an activity
timeline.
"""

import json
import logging
import os
import time
from contextlib import closing
from functools import lru_cache
from itertools import islice

import altair as alt
import cv2
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image, ImageDraw, ImageFont

logger = logging.getLogger(__name__)

# ==========================================
# Configuration and helpers
# ==========================================
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")
VIDEO_BASE_DIR = os.path.join(OUTPUT_DIR, "quick_preview")  # preview directory

# Connection string used by the identity-merge tool; overridable via env.
DATABASE_URL = os.getenv(
    "MOMENTRY_DATABASE_URL", "postgresql://accusys@localhost:5432/momentry"
)

# Overlay colours (OpenCV BGR order)
COLORS = {
    "YOLO": (0, 255, 0),  # green
    "FACE": (255, 0, 0),  # blue
    "POSE": (0, 0, 255),  # red
    "OCR": (0, 255, 255),  # yellow
    "SCENE": (255, 255, 255),  # white (text)
}

# Skeleton connection pairs (MediaPipe Pose landmark indices).
# The right shoulder (12) connects to the right hip (24); the previous
# (12, 23) pair drew a diagonal across the torso.
POSE_CONNECTIONS = [
    (11, 12), (11, 13), (13, 15), (12, 14), (14, 16),  # upper body
    (11, 23), (12, 24), (23, 24), (23, 25), (25, 27),  # torso + left leg
    (24, 26), (26, 28),  # right leg
]

# Maximum distance (seconds) between the requested time and a data frame for
# that frame's annotations to be drawn.
YOLO_TOLERANCE = 0.1
POSE_TOLERANCE = 0.5
OCR_TOLERANCE = 0.5
FACE_TOLERANCE = 1.5  # relaxed to match sparse key frames

# Number of YOLO frames (with detections) plotted on the timeline.
TIMELINE_MAX_YOLO_SAMPLES = 50

# Fonts able to render CJK text, tried in order (macOS locations).
_FONT_CANDIDATES = (
    "/System/Library/Fonts/STHeiti Medium.ttc",
    "/Library/Fonts/Arial Unicode.ttf",
    "/System/Library/Fonts/PingFang.ttc",
)


def load_json_safe(uuid, module):
    """Load ``preview.<module>.json`` from the preview directory.

    Args:
        uuid: Currently unused; every module file lives in the single
            quick-preview directory.
        module: Module name used in the file name (e.g. ``"yolo"``).

    Returns:
        The parsed JSON document, or ``None`` when the file is missing,
        unreadable, or not valid JSON.
    """
    path = os.path.join(VIDEO_BASE_DIR, f"preview.{module}.json")
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load %s: %s", path, exc)
        return None


def get_video_path(uuid):
    """Return the path of the preview video (``uuid`` is currently unused)."""
    return os.path.join(VIDEO_BASE_DIR, "preview.mp4")


def _frames_as_list(data):
    """Return ``data["frames"]`` as a list, whether stored as a list or a dict."""
    frames = data.get("frames", [])
    if isinstance(frames, dict):
        return list(frames.values())
    return frames


def _frame_time(frame, time_keys):
    """Return the first non-null timestamp found under ``time_keys``, else 0."""
    for key in time_keys:
        value = frame.get(key)
        if value is not None:
            return value
    return 0


def _nearest_frame(data, timestamp, tolerance, time_keys=("timestamp",)):
    """Return the data frame closest to ``timestamp``, or None if too far away."""
    frames = _frames_as_list(data)
    if not frames:
        return None

    def distance(frame):
        return abs(_frame_time(frame, time_keys) - timestamp)

    best = min(frames, key=distance)  # first minimum wins, like a manual scan
    return best if distance(best) < tolerance else None


def _active_segment(segments, timestamp, start_key="start", end_key="end"):
    """Return the first segment whose [start, end] range contains ``timestamp``."""
    for seg in segments:
        if seg.get(start_key, 0) <= timestamp <= seg.get(end_key, 0):
            return seg
    return None


@lru_cache(maxsize=None)
def _load_cjk_font(size=24):
    """Load (once per size) a font that can render CJK text.

    ``MOMENTRY_FONT_PATH`` is tried first when set, then the known macOS font
    files; Pillow's default font is the last resort.
    """
    override = os.getenv("MOMENTRY_FONT_PATH")
    candidates = ((override,) if override else ()) + _FONT_CANDIDATES
    for path in candidates:
        try:
            return ImageFont.truetype(path, size)
        except OSError:
            continue
    return ImageFont.load_default()


def _to_pil(frame):
    """Convert an OpenCV BGR frame to a PIL RGB image."""
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))


def _to_bgr(image):
    """Convert a PIL RGB image back to an OpenCV BGR frame."""
    return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)


def _is_quad(box):
    """Return True when ``box`` is a list of four ``[x, y]`` points."""
    return (
        isinstance(box, list)
        and len(box) == 4
        and all(isinstance(p, (list, tuple)) and len(p) >= 2 for p in box)
    )


# ==========================================
# Renderers
# ==========================================
def draw_yolo_overlay(frame, yolo_data, timestamp):
    """Draw YOLO detection boxes and ``class confidence`` labels.

    Args:
        frame: BGR image, drawn on in place.
        yolo_data: Parsed YOLO JSON (``frames`` as list or dict) or ``None``.
        timestamp: Current playback time in seconds.

    Returns:
        The annotated frame (the input frame when nothing matches).
    """
    if not yolo_data:
        return frame

    best = _nearest_frame(
        yolo_data, timestamp, YOLO_TOLERANCE, ("time_seconds", "timestamp")
    )
    if not best:
        return frame

    for obj in best.get("detections", []):
        # Detections carry absolute corner coordinates x1, y1, x2, y2.
        x1 = int(obj.get("x1", 0))
        y1 = int(obj.get("y1", 0))
        x2 = int(obj.get("x2", 0))
        y2 = int(obj.get("y2", 0))
        label = f"{obj.get('class_name', '?')} {obj.get('confidence', 0):.2f}"

        cv2.rectangle(frame, (x1, y1), (x2, y2), COLORS["YOLO"], 2)
        # Filled strip behind the label so the black text stays readable.
        (tw, _), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        cv2.rectangle(frame, (x1, y1 - 15), (x1 + tw, y1), COLORS["YOLO"], -1)
        cv2.putText(
            frame, label, (x1, y1 - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1
        )
    return frame


def draw_pose_overlay(frame, pose_data, timestamp):
    """Draw pose skeletons for the data frame nearest to ``timestamp``.

    Keypoint ``x``/``y`` are multiplied by the frame width/height, so they are
    treated as normalized coordinates. A bone is drawn only when both of its
    endpoints have confidence above 0.5.
    """
    if not pose_data:
        return frame

    best = _nearest_frame(pose_data, timestamp, POSE_TOLERANCE)
    if not best:
        return frame

    h, w = frame.shape[:2]
    for person in best.get("persons", []):
        kps = person.get("keypoints", [])
        if not kps:
            continue
        for a, b in POSE_CONNECTIONS:
            p1 = kps[a] if a < len(kps) else None
            p2 = kps[b] if b < len(kps) else None
            if not (p1 and p2):
                continue
            if p1.get("confidence", 0) > 0.5 and p2.get("confidence", 0) > 0.5:
                pt1 = (int(p1["x"] * w), int(p1["y"] * h))
                pt2 = (int(p2["x"] * w), int(p2["y"] * h))
                cv2.line(frame, pt1, pt2, COLORS["POSE"], 2)
    return frame


def draw_ocr_overlay(frame, ocr_data, timestamp):
    """Draw OCR text regions and their recognized text.

    Two region formats are supported: a ``bbox`` of four ``[x, y]`` points
    (drawn as a polygon), or separate ``x``/``y``/``width``/``height`` fields
    (values <= 1 are treated as normalized and scaled to pixels).
    """
    if not ocr_data:
        return frame

    best = _nearest_frame(ocr_data, timestamp, OCR_TOLERANCE)
    if not best:
        return frame

    h, w = frame.shape[:2]
    for text in best.get("texts", []):
        box = text.get("bbox", [])
        if _is_quad(box):
            pts = np.array([[int(p[0]), int(p[1])] for p in box], np.int32)
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(frame, [pts], True, COLORS["OCR"], 2)
            # Plain ints: some OpenCV builds reject numpy scalars as `org`.
            origin = (int(pts[0][0][0]), int(pts[0][0][1]) - 5)
        else:
            # Any other bbox shape falls back to the x/y/width/height fields
            # instead of crashing while indexing a flat list.
            x = text.get("x", 0)
            y = text.get("y", 0)
            width = text.get("width", 0)
            height = text.get("height", 0)
            if x <= 1:
                x *= w
            if y <= 1:
                y *= h
            if width <= 1:
                width *= w
            if height <= 1:
                height *= h
            x, y, width, height = int(x), int(y), int(width), int(height)
            cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["OCR"], 2)
            origin = (x, y - 5)

        # NOTE(review): Hershey fonts only cover ASCII; non-ASCII OCR text
        # will show as '?' here.
        cv2.putText(
            frame,
            text.get("text", ""),
            origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.4,
            COLORS["OCR"],
            1,
        )
    return frame


def draw_scene_label(frame, scene_data, timestamp):
    """Draw the label of the scene active at ``timestamp`` in the top-left corner."""
    if not scene_data:
        return frame

    scene = _active_segment(
        scene_data.get("scenes", []), timestamp, "start_time", "end_time"
    )
    if scene is None:
        return frame

    # NOTE(review): cv2.putText cannot render the emoji or CJK scene names
    # (they appear as '?'); left unchanged to keep the current look.
    label = f"📍 {scene.get('scene_type_zh') or scene.get('scene_type')}"
    # Thick black pass first acts as a drop shadow under the white text.
    cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 4)
    cv2.putText(
        frame,
        label,
        (10, 30),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        COLORS["SCENE"],
        2,
    )
    return frame


def draw_face_overlay(frame, face_data, timestamp):
    """Draw face boxes with either the clustered person ID or the confidence.

    Boxes are drawn with OpenCV; labels are drawn with PIL so that CJK
    identity names render correctly.

    Returns:
        The annotated frame. A new array is returned when labels were drawn.
    """
    if not face_data:
        return frame

    best = _nearest_frame(face_data, timestamp, FACE_TOLERANCE)
    faces = best.get("faces", []) if best else []
    if not faces:
        return frame

    labels = []
    for face in faces:
        # Coordinates are pixel values; cast because OpenCV rejects floats.
        x = int(face.get("x", 0))
        y = int(face.get("y", 0))
        width = int(face.get("width", 0))
        height = int(face.get("height", 0))
        cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["FACE"], 2)

        person_id = face.get("person_id")
        if person_id:
            # Prefer the clustered person ID when available (yellow, RGB).
            labels.append((x, y, f"ID: {person_id}", (255, 255, 0)))
        else:
            labels.append(
                (
                    x,
                    y,
                    f"Face {face.get('confidence', 0):.2f}",
                    tuple(COLORS["FACE"][::-1]),  # BGR -> RGB for PIL
                )
            )

    # One BGR->RGB->BGR round trip per frame instead of one per face.
    img_pil = _to_pil(frame)
    draw = ImageDraw.Draw(img_pil)
    font = _load_cjk_font()
    for px, y, label, color_rgb in labels:
        bbox = draw.textbbox((0, 0), label, font=font)
        tw = bbox[2] - bbox[0]
        th = bbox[3] - bbox[1]
        py = max(th + 5, y)  # keep the label inside the top edge of the frame
        draw.rectangle([px, py - th - 4, px + tw + 4, py], fill=(0, 0, 0))
        draw.text((px + 2, py - th - 2), label, font=font, fill=color_rgb)
    return _to_bgr(img_pil)


def draw_speaker_overlay(frame, asrx_data, timestamp):
    """Draw the active speaker's ID as a tag in the top-right corner."""
    if not asrx_data:
        return frame

    seg = _active_segment(asrx_data.get("segments", []), timestamp)
    current_speaker = seg.get("speaker_id") if seg else None
    if not current_speaker:
        return frame

    # The raw speaker ID is shown; identity lookup is not implemented here.
    # NOTE(review): the emoji cannot be rendered by cv2.putText ('?' shown).
    label = f"🎤 {current_speaker}"

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.0
    thickness = 2
    color = (0, 165, 255)  # orange in BGR (was given in RGB order -> blue)

    (tw, th), _ = cv2.getTextSize(label, font, font_scale, thickness)
    margin = 10
    x, y = frame.shape[1] - tw - margin, th + margin

    cv2.rectangle(frame, (x - 5, y - th - 5), (x + tw + 5, y + 5), color, -1)
    cv2.putText(frame, label, (x, y), font, font_scale, (0, 0, 0), thickness)
    return frame


def draw_asr_subtitle(frame, asr_data, timestamp):
    """Draw the ASR segment active at ``timestamp`` as a bottom-centred subtitle.

    Text is rendered with PIL so that Chinese subtitles display correctly.

    Returns:
        The annotated frame (a new array when a subtitle was drawn).
    """
    if not asr_data:
        return frame

    seg = _active_segment(asr_data.get("segments", []), timestamp)
    text = seg.get("text", "") if seg else ""
    if not text:
        return frame

    h, w = frame.shape[:2]
    img_pil = _to_pil(frame)
    draw = ImageDraw.Draw(img_pil)
    font = _load_cjk_font()

    bbox = draw.textbbox((0, 0), text, font=font)
    text_w = bbox[2] - bbox[0]
    text_h = bbox[3] - bbox[1]

    bg_x = (w - text_w) // 2
    bg_y = h - text_h - 20

    # White text on a black box padded by 10 px.
    draw.rectangle(
        [bg_x - 10, bg_y - 10, bg_x + text_w + 10, bg_y + text_h + 10],
        fill=(0, 0, 0),
    )
    draw.text((bg_x, bg_y), text, font=font, fill=(255, 255, 255))
    return _to_bgr(img_pil)


# ==========================================
# Main application logic
# ==========================================
def _render_overlays(frame, layers, timestamp):
    """Apply every enabled overlay to ``frame`` (disabled layers hold None)."""
    frame = draw_asr_subtitle(frame, layers["asr"], timestamp)
    # The speaker tag has no dedicated toggle; it is drawn whenever ASRX
    # data exists.
    frame = draw_speaker_overlay(frame, layers["asrx"], timestamp)
    frame = draw_scene_label(frame, layers["scene"], timestamp)
    frame = draw_yolo_overlay(frame, layers["yolo"], timestamp)
    frame = draw_face_overlay(frame, layers["face"], timestamp)
    frame = draw_pose_overlay(frame, layers["pose"], timestamp)
    frame = draw_ocr_overlay(frame, layers["ocr"], timestamp)
    return frame


def _read_frame(cap, timestamp, fps):
    """Seek to ``timestamp`` and return the decoded frame, or None on failure."""
    cap.set(cv2.CAP_PROP_POS_FRAMES, int(timestamp * fps))
    ok, frame = cap.read()
    return frame if ok else None


def _progress_fraction(current_time, duration):
    """Return ``current_time / duration`` clamped to the [0, 1] range."""
    return min(max(current_time / duration, 0.0), 1.0)


def _run_player(cap, fps, duration, layers):
    """Render play/reset controls, the playback loop and the manual slider.

    Requires ``fps > 0`` and ``duration > 0``.
    """
    frame_interval = 1.0 / fps

    if "playing" not in st.session_state:
        st.session_state.playing = False
    if "current_time" not in st.session_state:
        st.session_state.current_time = 0.0

    col_play, col_reset, col_info = st.columns([1, 1, 4])
    with col_play:
        if st.button("▶ 播放"):
            st.session_state.playing = True
    with col_reset:
        if st.button("⏹ 重置"):
            st.session_state.playing = False
            st.session_state.current_time = 0.0
    with col_info:
        st.write(f"時間: {st.session_state.current_time:.2f} / {duration:.1f} s")

    placeholder = st.empty()
    progress_bar = st.progress(0.0)

    # Auto-play: render frame by frame inside this script run.
    while st.session_state.playing:
        current_time = st.session_state.current_time
        if current_time >= duration:
            st.session_state.playing = False
            st.session_state.current_time = 0.0
            break

        frame = _read_frame(cap, current_time, fps)
        if frame is None:
            st.session_state.playing = False
            break

        frame = _render_overlays(frame, layers, current_time)
        with placeholder.container():
            st.image(frame, channels="BGR", use_container_width=True)
        progress_bar.progress(
            _progress_fraction(current_time, duration),
            text=f"播放中: {current_time:.1f}s",
        )

        # Advance by one frame interval.
        time.sleep(frame_interval)
        st.session_state.current_time += frame_interval

    if st.session_state.playing:
        return

    # Manual scrubbing, available while paused.
    st.session_state.current_time = st.slider(
        "⏯ 手動調整時間",
        0.0,
        duration,
        min(st.session_state.current_time, duration),
        step=0.1,
        key="manual_slider",
    )
    current_time = st.session_state.current_time
    progress_bar.progress(
        _progress_fraction(current_time, duration),
        text=f"已暫停: {current_time:.1f}s",
    )

    # Show the frame at the paused position.
    frame = _read_frame(cap, current_time, fps)
    if frame is not None:
        frame = _render_overlays(frame, layers, current_time)
        with placeholder.container():
            st.image(frame, channels="BGR", use_container_width=True)


def _merge_identities(selected_ids, new_name):
    """Relabel the selected person IDs in the clustered JSON and upsert a Talent."""
    # 1. Update the clustered face JSON.
    face_json_path = os.path.join(VIDEO_BASE_DIR, "preview.face_clustered.json")
    if os.path.exists(face_json_path):
        with open(face_json_path, "r", encoding="utf-8") as f:
            face_data = json.load(f)

        targets = set(selected_ids)
        count = 0
        # `frames` may be a list or a dict keyed by frame index.
        for frame in _frames_as_list(face_data):
            for face in frame.get("faces", []):
                if face.get("person_id") in targets:
                    face["person_id"] = new_name
                    count += 1

        with open(face_json_path, "w", encoding="utf-8") as f:
            json.dump(face_data, f, indent=2, ensure_ascii=False)
        st.success(f"✅ 已更新 {count} 個臉部標籤為 '{new_name}'")

    # 2. Create or look up the Talent row in the database.
    import psycopg2  # local import: only needed when a merge is submitted

    try:
        # closing() guarantees the connection is closed even on errors;
        # `with conn` commits on success and rolls back on exception.
        with closing(psycopg2.connect(DATABASE_URL)) as conn:
            with conn, conn.cursor() as cur:
                cur.execute(
                    "SELECT id FROM talents WHERE real_name = %s", (new_name,)
                )
                row = cur.fetchone()
                if row:
                    talent_id = row[0]
                else:
                    cur.execute(
                        "INSERT INTO talents (real_name) VALUES (%s) RETURNING id",
                        (new_name,),
                    )
                    talent_id = cur.fetchone()[0]
                # Face/speaker binding is not implemented yet: only the
                # Talent row is created here.
    except psycopg2.Error as e:
        st.error(f"資料庫錯誤: {e}")
    else:
        st.success(f"✅ 資料庫已建立 Talent '{new_name}' (ID: {talent_id})")
        # Reload the page so the updated labels are picked up.
        st.rerun()


def _identity_manager():
    """Show clustered person thumbnails and the merge/rename form."""
    st.header("👥 身份管理與合併 (Identity Manager)")

    try:
        entries = os.listdir(VIDEO_BASE_DIR)
    except FileNotFoundError:
        entries = []
    person_thumbnails = sorted(
        f for f in entries if f.startswith("Person_") and f.endswith(".jpg")
    )

    if not person_thumbnails:
        st.info("未發現聚類截圖。請先執行 `face_clustering_processor.py`。")
        return

    # Thumbnails are laid out in rows of up to four columns.
    cols = st.columns(min(len(person_thumbnails), 4))
    selected_ids = []
    for i, fname in enumerate(person_thumbnails):
        person_id = fname.removesuffix(".jpg")
        img_path = os.path.join(VIDEO_BASE_DIR, fname)
        with cols[i % 4]:
            st.image(img_path, caption=person_id, use_container_width=True)
            if st.checkbox(f"選擇 {person_id}", key=f"chk_{person_id}"):
                selected_ids.append(person_id)

    if not selected_ids:
        return

    st.markdown("---")
    st.write(f"已選擇: **{', '.join(selected_ids)}**")

    with st.form(key="merge_form"):
        new_name = st.text_input(
            "合併後的身份名稱 (e.g., 主角, 張三)", value="Speaker_A"
        )
        submitted = st.form_submit_button("✅ 確認合併與綁定")
        if submitted:
            _merge_identities(selected_ids, new_name)


def main():
    """Build the Streamlit page: player, frame inspector, identity tool, timeline."""
    st.set_page_config(layout="wide", page_title="Momentry Visual Demo")
    st.title("🎬 Momentry Processor Visual Demo")

    uuid = "quick_preview"
    video_path = get_video_path(uuid)

    if not video_path or not os.path.exists(video_path):
        st.error(f"Video file not found at {video_path}")
        return

    # 1. Native audio/video player so the user can hear the sound track.
    st.subheader("🔊 原始聲音播放器 (可聽 Speaker 聲音)")
    st.video(video_path, start_time=0)
    st.markdown("---")

    # 2. Usage instructions.
    with st.expander("📖 如何使用本工具?(點擊展開說明)"):
        st.markdown(
            """
            1. **時間軸控制**: 拖動下方的滑動條 (Slider) 來移動影片時間點。
            2. **開啟/關閉功能**: 在右側的 **Layers** 面板中,勾選您想看到的效果。
               - **✅ YOLO**: 綠色框標記物體 (如人、桌子)。
               - **✅ ASR**: 底部顯示白色字幕。
               - **✅ Scene**: 左上角顯示場景名稱。
            3. **查看統計**: 底部圖表顯示各模組在哪些時間段有數據。
            """
        )

    # 3. Layer toggles and data loading.
    col1, col2 = st.columns([3, 1])
    with col1:
        st.header("Frame Inspector (幀檢查器)")
    with col2:
        st.subheader("顯示層控制 (Layers)")
        show_yolo = st.checkbox("YOLO (Object)", value=True)
        show_face = st.checkbox("Face (Person)", value=True)
        show_pose = st.checkbox("Pose (Skeleton)", value=False)
        show_ocr = st.checkbox("OCR (Text)", value=False)
        show_scene = st.checkbox("Scene (Label)", value=True)
        show_asr = st.checkbox("ASR (Subtitle)", value=True)

    # Always try the clustered face data first, then fall back to raw faces.
    face_data = load_json_safe(uuid, "face_clustered")
    if face_data:
        st.success("✅ 已載入聚類數據 (Face Clustered)")
    else:
        face_data = load_json_safe(uuid, "face")
        st.warning("⚠️ 未找到聚類數據,使用原始數據")

    # A disabled layer is stored as None, which every renderer skips.
    layers = {
        "yolo": load_json_safe(uuid, "yolo") if show_yolo else None,
        "face": face_data if show_face else None,
        "pose": load_json_safe(uuid, "pose") if show_pose else None,
        "ocr": load_json_safe(uuid, "ocr") if show_ocr else None,
        "scene": load_json_safe(uuid, "scene") if show_scene else None,
        "asr": load_json_safe(uuid, "asr") if show_asr else None,
        "asrx": load_json_safe(uuid, "asrx"),  # speaker diarization
    }

    # 4. Video playback and frame inspection.
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps if fps > 0 else 0.0
        if cap.isOpened() and duration > 0:
            _run_player(cap, fps, duration, layers)
        else:
            # Avoids division by zero and an invalid slider range below.
            st.error(f"Unable to read video stream from {video_path}")
    finally:
        # Also runs when Streamlit interrupts the script for a rerun.
        cap.release()

    # 5. Manual identity clustering interface.
    _identity_manager()

    # 6. Timeline visualization.
    st.header("📅 Processor Timeline (處理器活動軸)")
    plot_timeline(uuid, duration)


def plot_timeline(uuid, duration):
    """Plot each module's activity over time as an Altair bar chart.

    Args:
        uuid: Passed through to ``load_json_safe``.
        duration: Video duration in seconds (currently not used by the chart).
    """
    data = []

    # ASR speech segments.
    asr = load_json_safe(uuid, "asr")
    if asr:
        for seg in asr.get("segments", []):
            start, end = seg.get("start"), seg.get("end")
            if start is None or end is None:
                continue  # skip incomplete segments instead of raising
            data.append(
                {
                    "Module": "ASR Speech",
                    "Start": start,
                    "End": end,
                    "Task": "Speech",
                }
            )

    # YOLO activity: the first frames that contain detections, capped to keep
    # the chart responsive.
    yolo = load_json_safe(uuid, "yolo")
    if yolo:
        active_frames = (
            f
            for f in _frames_as_list(yolo)
            if f.get("detections", []) or f.get("objects", [])
        )
        for f in islice(active_frames, TIMELINE_MAX_YOLO_SAMPLES):
            ts = _frame_time(f, ("time_seconds", "timestamp"))
            data.append(
                {
                    "Module": "YOLO Detect",
                    "Start": ts,
                    "End": ts + 0.5,
                    "Task": "Obj",
                }
            )

    if not data:
        st.info("No timeline data available.")
        return

    df = pd.DataFrame(data)

    chart = (
        alt.Chart(df)
        .mark_bar()
        .encode(
            x=alt.X("Start:Q", title="Time (sec)"),
            x2="End:Q",
            y=alt.Y("Module:N", title=""),
            color=alt.Color("Module:N", scale=alt.Scale(scheme="category10")),
        )
        .properties(height=200)
    )

    st.altair_chart(chart, use_container_width=True)


if __name__ == "__main__":
    main()