- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
792 lines
26 KiB
Python
792 lines
26 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Momentry Core Visual Demo Dashboard
職責:提供處理器模組的視覺化預覽,支持時間軸檢查與多模組疊加顯示。
"""

import sys
|
||
import os
|
||
import json
|
||
import cv2
|
||
import numpy as np
|
||
import streamlit as st
|
||
import pandas as pd
|
||
import altair as alt
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
import time
|
||
|
||
# ==========================================
# Configuration and helper functions
# ==========================================

# Root of the processor output tree; override with MOMENTRY_OUTPUT_DIR.
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "./output")
# Points at the preview directory.
VIDEO_BASE_DIR = os.path.join(OUTPUT_DIR, "quick_preview")

# Overlay colours, expressed as OpenCV BGR tuples.
COLORS = dict(
    YOLO=(0, 255, 0),  # green
    FACE=(255, 0, 0),  # blue
    POSE=(0, 0, 255),  # red
    OCR=(0, 255, 255),  # yellow
    SCENE=(255, 255, 255),  # white (text)
)

# Skeleton edges drawn by the pose overlay, as pairs of MediaPipe Pose
# landmark indices: 11/12 shoulders, 13/14 elbows, 15/16 wrists,
# 23/24 hips, 25/26 knees, 27/28 ankles (odd = left, even = right).
POSE_CONNECTIONS = [
    # Upper body
    (11, 12),  # shoulder line
    (11, 13),
    (13, 15),  # left arm
    (12, 14),
    (14, 16),  # right arm
    # Torso
    (11, 23),  # left shoulder -> left hip
    (12, 24),  # right shoulder -> right hip (was (12, 23), a diagonal)
    (23, 24),  # hip line
    # Lower body, left
    (23, 25),
    (25, 27),
    # Lower body, right
    (24, 26),
    (26, 28),
]

def load_json_safe(uuid, module):
    """Load the quick-preview JSON document of one processor module.

    Args:
        uuid: Accepted for interface compatibility but not consulted; the
            file is always looked up in ``<OUTPUT_DIR>/quick_preview``.
        module: Module suffix, used to build ``preview.<module>.json``.

    Returns:
        The parsed JSON document, or ``None`` when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = os.path.join(OUTPUT_DIR, "quick_preview", f"preview.{module}.json")
    try:
        # Decode explicitly as UTF-8: the dashboard writes its JSON with
        # encoding="utf-8" and ensure_ascii=False, so relying on the platform
        # default encoding can corrupt or reject non-ASCII labels.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Opening directly (instead of exists() + open()) avoids a race with
        # a file that disappears between the check and the read.
        return None

def get_video_path(uuid):
    """Return the path of the quick-preview video.

    The ``uuid`` argument is not consulted: the result is always
    ``<OUTPUT_DIR>/quick_preview/preview.mp4``.
    """
    preview_dir = os.path.join(OUTPUT_DIR, "quick_preview")
    video_file = os.path.join(preview_dir, "preview.mp4")
    return video_file

# ==========================================
# Rendering logic (Renderers)
# ==========================================


def draw_yolo_overlay(frame, yolo_data, timestamp):
    """Draw YOLO detection boxes for the detection frame nearest ``timestamp``.

    Args:
        frame: BGR image to draw on (modified in place by OpenCV).
        yolo_data: Parsed YOLO JSON, or a falsy value to skip drawing. Its
            ``"frames"`` entry may be a list or a dict whose values are the
            frame records. Each record carries its time under
            ``"time_seconds"`` (preferred) or ``"timestamp"`` and a
            ``"detections"`` list with ``x1``/``y1``/``x2``/``y2``,
            ``class_name`` and ``confidence``.
        timestamp: Playback position in seconds.

    Returns:
        The frame. Nothing is drawn unless the nearest record lies within
        0.1 s of ``timestamp``.
    """
    if not yolo_data:
        return frame

    frames = yolo_data.get("frames", {})
    if isinstance(frames, dict):
        frames = list(frames.values())

    def time_gap(entry):
        ts = entry.get("time_seconds") or entry.get("timestamp", 0)
        return abs(ts - timestamp)

    # min() keeps the first of several equally close records.
    best_frame = min(frames, key=time_gap, default=None)
    if not (best_frame and time_gap(best_frame) < 0.1):
        return frame

    label_height = 15  # height in pixels of the filled strip behind the label
    for obj in best_frame.get("detections", []):
        # YOLO output stores the box corners directly.
        x1 = int(obj.get("x1", 0))
        y1 = int(obj.get("y1", 0))
        x2 = int(obj.get("x2", 0))
        y2 = int(obj.get("y2", 0))
        label = f"{obj.get('class_name', '?')} {obj.get('confidence', 0):.2f}"

        cv2.rectangle(frame, (x1, y1), (x2, y2), COLORS["YOLO"], 2)

        # Keep the label strip inside the frame: for boxes closer than
        # label_height to the top edge it is drawn just below the edge
        # instead of at negative coordinates.
        label_bottom = max(y1, label_height)
        (text_w, _), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        cv2.rectangle(
            frame,
            (x1, label_bottom - label_height),
            (x1 + text_w, label_bottom),
            COLORS["YOLO"],
            -1,
        )
        cv2.putText(
            frame,
            label,
            (x1, label_bottom - 3),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 0, 0),
            1,
        )

    return frame

def draw_pose_overlay(frame, pose_data, timestamp):
    """Draw pose skeleton lines for the pose frame nearest ``timestamp``.

    Args:
        frame: BGR image to draw on (modified in place by OpenCV).
        pose_data: Parsed pose JSON, or a falsy value to skip drawing. Its
            ``"frames"`` entry may be a list or a dict whose values are the
            frame records; each record has a ``"timestamp"`` and a
            ``"persons"`` list whose items hold ``"keypoints"``.
        timestamp: Playback position in seconds.

    Returns:
        The frame. Nothing is drawn unless the nearest record lies within
        0.5 s of ``timestamp``.
    """
    if not pose_data:
        return frame

    frames = pose_data.get("frames", [])
    # Accept dict-shaped frames as the YOLO/OCR/face overlays do; iterating
    # the dict directly would yield its keys and crash on ``.get``.
    if isinstance(frames, dict):
        frames = list(frames.values())

    def time_gap(entry):
        return abs(entry.get("timestamp", 0) - timestamp)

    best_frame = min(frames, key=time_gap, default=None)
    if not (best_frame and time_gap(best_frame) < 0.5):
        return frame

    h, w = frame.shape[:2]
    for person in best_frame.get("persons", []):
        kps = person.get("keypoints", [])
        if not kps:
            continue

        for start_idx, end_idx in POSE_CONNECTIONS:
            # Skip edges whose landmarks are absent from this keypoint list.
            if start_idx >= len(kps) or end_idx >= len(kps):
                continue
            p1, p2 = kps[start_idx], kps[end_idx]
            if not p1 or not p2:
                continue
            # Only connect landmarks that were both detected confidently.
            if p1.get("confidence", 0) > 0.5 and p2.get("confidence", 0) > 0.5:
                # Keypoint x/y are scaled by the frame size, i.e. they are
                # treated as normalised coordinates.
                pt1 = (int(p1["x"] * w), int(p1["y"] * h))
                pt2 = (int(p2["x"] * w), int(p2["y"] * h))
                cv2.line(frame, pt1, pt2, COLORS["POSE"], 2)
    return frame

def draw_ocr_overlay(frame, ocr_data, timestamp):
    """Outline OCR text regions for the OCR frame nearest ``timestamp``.

    Args:
        frame: BGR image to draw on (modified in place by OpenCV).
        ocr_data: Parsed OCR JSON, or a falsy value to skip drawing. Its
            ``"frames"`` entry may be a list or a dict whose values are the
            frame records; each record has a ``"timestamp"`` and a
            ``"texts"`` list.
        timestamp: Playback position in seconds.

    Returns:
        The frame. Nothing is drawn unless the nearest record lies within
        0.5 s of ``timestamp``.

    Each text item is located by one of:
      * ``bbox`` as four ``[x, y]`` corner points, drawn as a closed polygon;
      * ``bbox`` as four plain numbers, read as ``x, y, width, height``
        (previously this shape raised ``TypeError``);
      * separate ``x`` / ``y`` / ``width`` / ``height`` keys.

    NOTE: the caption is rendered with an OpenCV Hershey font, which only
    covers ASCII; other characters come out as question marks.
    """
    if not ocr_data:
        return frame

    frames = ocr_data.get("frames", [])
    if isinstance(frames, dict):
        frames = list(frames.values())

    def time_gap(entry):
        return abs(entry.get("timestamp", 0) - timestamp)

    best_frame = min(frames, key=time_gap, default=None)
    if not (best_frame and time_gap(best_frame) < 0.5):
        return frame

    h, w = frame.shape[:2]
    for text in best_frame.get("texts", []):
        box = text.get("bbox", [])
        caption = text.get("text", "")

        is_quad = (
            isinstance(box, list)
            and len(box) == 4
            and all(isinstance(p, (list, tuple)) and len(p) >= 2 for p in box)
        )
        if is_quad:
            # Format: [[x1, y1], [x2, y2], [x3, y3], [x4, y4]]
            pts = np.array([[int(p[0]), int(p[1])] for p in box], np.int32)
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(frame, [pts], True, COLORS["OCR"], 2)
            anchor = (int(box[0][0]), int(box[0][1]) - 5)
        else:
            is_flat_rect = (
                isinstance(box, (list, tuple))
                and len(box) == 4
                and all(isinstance(v, (int, float)) for v in box)
            )
            if is_flat_rect:
                # Flat list, taken as x, y, width, height per the original
                # comment -- TODO confirm against the OCR processor output.
                x, y, width, height = box
            else:
                # Separate keys (EasyOCR style).
                x = text.get("x", 0)
                y = text.get("y", 0)
                width = text.get("width", 0)
                height = text.get("height", 0)

            # Values <= 1 are treated as fractions of the frame size.
            if x <= 1:
                x *= w
            if y <= 1:
                y *= h
            if width <= 1:
                width *= w
            if height <= 1:
                height *= h

            x, y, width, height = int(x), int(y), int(width), int(height)
            cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["OCR"], 2)
            anchor = (x, y - 5)

        cv2.putText(
            frame,
            caption,
            anchor,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.4,
            COLORS["OCR"],
            1,
        )
    return frame

def draw_scene_label(frame, scene_data, timestamp):
    """Stamp the label of the scene containing ``timestamp`` at the top left.

    The first entry of ``scene_data["scenes"]`` whose ``start_time`` and
    ``end_time`` bracket ``timestamp`` (inclusive) is used; its
    ``scene_type_zh`` is preferred over ``scene_type``. The text is drawn
    twice, a thick black pass as a shadow and a thinner pass in the scene
    colour. The frame is returned unchanged when no scene matches.
    """
    if not scene_data:
        return frame

    active_scene = next(
        (
            candidate
            for candidate in scene_data.get("scenes", [])
            if candidate.get("start_time", 0)
            <= timestamp
            <= candidate.get("end_time", 0)
        ),
        None,
    )
    if active_scene is None:
        return frame

    label = f"📍 {active_scene.get('scene_type_zh') or active_scene.get('scene_type')}"
    origin = (10, 30)
    # Shadow pass.
    cv2.putText(frame, label, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 4)
    # Foreground pass.
    cv2.putText(
        frame, label, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.8, COLORS["SCENE"], 2
    )
    return frame

def _load_face_label_font(size=24):
    """Return a CJK-capable TrueType font, or PIL's default as a last resort."""
    candidates = (
        # STHeiti is tried first; the original note says PingFang.ttc, being
        # a collection file, sometimes cannot be read.
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/Library/Fonts/Arial Unicode.ttf",
    )
    for font_path in candidates:
        try:
            return ImageFont.truetype(font_path, size)
        except OSError:
            # Font file missing or unreadable on this machine; try the next.
            continue
    return ImageFont.load_default()


def draw_face_overlay(frame, face_data, timestamp):
    """Draw face boxes and identity labels for the face frame nearest ``timestamp``.

    Args:
        frame: BGR image; boxes are drawn on it in place by OpenCV.
        face_data: Parsed face JSON, or a falsy value to skip drawing. Its
            ``"frames"`` entry may be a list or a dict whose values are the
            frame records; each record has a ``"timestamp"`` and a
            ``"faces"`` list with pixel ``x``/``y``/``width``/``height`` and
            optionally ``person_id`` and ``confidence``.
        timestamp: Playback position in seconds.

    Returns:
        The input frame when nothing is drawn; otherwise a new BGR array
        with boxes and labels. Labels are rendered through PIL so that
        non-ASCII names display correctly.
    """
    if not face_data:
        return frame

    frames = face_data.get("frames", [])
    if isinstance(frames, dict):
        frames = list(frames.values())

    def time_gap(entry):
        return abs(entry.get("timestamp", 0) - timestamp)

    best_frame = min(frames, key=time_gap, default=None)
    # Tolerance is relaxed to 1.5 s to match sparse key frames.
    if not (best_frame and time_gap(best_frame) < 1.5):
        return frame

    faces = best_frame.get("faces", [])
    if not faces:
        return frame

    # Pass 1: draw every box with OpenCV and collect the label to paint.
    labels = []
    for face in faces:
        # cv2.rectangle needs integer pixel coordinates; JSON may hold floats.
        x = int(face.get("x", 0))
        y = int(face.get("y", 0))
        width = int(face.get("width", 0))
        height = int(face.get("height", 0))
        cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["FACE"], 2)

        # Prefer the clustered person id when one is present.
        person_id = face.get("person_id")
        if person_id:
            label = f"ID: {person_id}"
            color_rgb = (255, 255, 0)  # yellow
        else:
            label = f"Face {face.get('confidence', 0):.2f}"
            color_rgb = tuple(COLORS["FACE"][::-1])  # BGR -> RGB for PIL
        labels.append((x, y, label, color_rgb))

    # Pass 2: one BGR -> RGB -> BGR round trip and one font load for all
    # labels, rather than repeating both for every face.
    img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    font = _load_face_label_font()

    for px, y, label, color_rgb in labels:
        left, top, right, bottom = draw.textbbox((0, 0), label, font=font)
        tw = right - left
        th = bottom - top

        # Place the label above the box, but never above the top of the frame.
        py = max(th + 5, y)
        draw.rectangle([px, py - th - 4, px + tw + 4, py], fill=(0, 0, 0))
        draw.text((px + 2, py - th - 2), label, font=font, fill=color_rgb)

    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

def draw_speaker_overlay(frame, asrx_data, timestamp):
    """Draw the active speaker's id as a tag in the top-right corner.

    Args:
        frame: BGR image to draw on (modified in place by OpenCV).
        asrx_data: Parsed speaker JSON, or a falsy value to skip drawing.
            ``"segments"`` items provide ``start``, ``end`` and
            ``speaker_id``.
        timestamp: Playback position in seconds.

    Returns:
        The frame. Nothing is drawn when no segment contains ``timestamp``
        or when the first matching segment has no speaker id.
    """
    if not asrx_data:
        return frame

    # Only the first segment covering the timestamp is considered.
    current_speaker = next(
        (
            seg.get("speaker_id")
            for seg in asrx_data.get("segments", [])
            if seg.get("start", 0) <= timestamp <= seg.get("end", 0)
        ),
        None,
    )
    if not current_speaker:
        return frame

    # The raw speaker id is shown for now; identity lookup could be added later.
    # NOTE(review): Hershey fonts are ASCII-only, so the emoji prefix is
    # rendered as question marks by cv2.putText.
    label = f"🎤 {current_speaker}"

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.0
    thickness = 2
    # Orange in BGR order. The previous (255, 165, 0) is orange in RGB but
    # shows as light blue on a BGR frame.
    color = (0, 165, 255)

    (tw, th), _ = cv2.getTextSize(label, font, font_scale, thickness)
    margin = 10
    x, y = frame.shape[1] - tw - margin, th + margin

    # Filled tag background, then black text on top.
    cv2.rectangle(frame, (x - 5, y - th - 5), (x + tw + 5, y + 5), color, -1)
    cv2.putText(frame, label, (x, y), font, font_scale, (0, 0, 0), thickness)

    return frame

def _load_subtitle_font(size=24):
    """Return a CJK-capable TrueType font, or PIL's default as a last resort."""
    candidates = (
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/PingFang.ttc",
    )
    for font_path in candidates:
        try:
            return ImageFont.truetype(font_path, size)
        except OSError:
            # Font file missing or unreadable on this machine; try the next.
            continue
    return ImageFont.load_default()


def draw_asr_subtitle(frame, asr_data, timestamp):
    """Draw the subtitle active at ``timestamp``, centred near the bottom.

    Args:
        frame: BGR image.
        asr_data: Parsed ASR JSON, or a falsy value to skip drawing.
            ``"segments"`` items provide ``start``, ``end`` and ``text``.
        timestamp: Playback position in seconds.

    Returns:
        The input frame when there is nothing to show; otherwise a new BGR
        array with white text on a black box. The text is rendered through
        PIL so that Chinese subtitles display correctly.
    """
    if not asr_data:
        return frame

    # Text of the first segment covering the timestamp, if any.
    text = next(
        (
            seg.get("text", "")
            for seg in asr_data.get("segments", [])
            if seg.get("start", 0) <= timestamp <= seg.get("end", 0)
        ),
        "",
    )
    if not text:
        return frame

    h, w = frame.shape[:2]

    # PIL works in RGB; OpenCV frames are BGR.
    img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    font = _load_subtitle_font()

    # Measure the text so the background box can be sized and centred.
    left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    text_w = right - left
    text_h = bottom - top

    bg_x = (w - text_w) // 2
    bg_y = h - text_h - 20

    draw.rectangle(
        [bg_x - 10, bg_y - 10, bg_x + text_w + 10, bg_y + text_h + 10],
        fill=(0, 0, 0),
    )
    draw.text((bg_x, bg_y), text, font=font, fill=(255, 255, 255))

    # Convert back to BGR. The old OpenCV-only renderer that used to follow
    # this return was unreachable and has been removed.
    return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

# ==========================================
# Main application logic
# ==========================================


def _render_overlays(frame, timestamp, show, data):
    """Apply every enabled overlay to ``frame`` and return the result.

    ``show`` maps layer names to booleans and ``data`` maps them to the
    loaded JSON documents. The speaker tag is drawn together with the ASR
    subtitle layer.
    """
    if show["asr"]:
        frame = draw_asr_subtitle(frame, data["asr"], timestamp)
        frame = draw_speaker_overlay(frame, data["asrx"], timestamp)
    if show["scene"]:
        frame = draw_scene_label(frame, data["scene"], timestamp)
    if show["yolo"]:
        frame = draw_yolo_overlay(frame, data["yolo"], timestamp)
    if show["face"]:
        frame = draw_face_overlay(frame, data["face"], timestamp)
    if show["pose"]:
        frame = draw_pose_overlay(frame, data["pose"], timestamp)
    if show["ocr"]:
        frame = draw_ocr_overlay(frame, data["ocr"], timestamp)
    return frame


def _playback_section(cap, show, data):
    """Render the play/reset controls and the frame inspector.

    Returns the video duration in seconds (0 when the frame rate is unknown).
    """
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps if fps else 0
    # Time advanced per displayed frame; 0.04 s when the fps is unusable.
    frame_interval = 1.0 / fps if fps > 0 else 0.04

    # Initialise session state.
    if "playing" not in st.session_state:
        st.session_state.playing = False
    if "current_time" not in st.session_state:
        st.session_state.current_time = 0.0

    # Playback controls.
    col_play, col_reset, col_info = st.columns([1, 1, 4])
    with col_play:
        if st.button("▶ 播放"):
            st.session_state.playing = True
    with col_reset:
        if st.button("⏹ 重置"):
            st.session_state.playing = False
            st.session_state.current_time = 0.0
    with col_info:
        st.write(f"時間: {st.session_state.current_time:.2f} / {duration:.1f} s")

    placeholder = st.empty()
    progress_bar = st.progress(0.0)

    # Auto-play loop: runs inside this script run until the end of the video.
    while st.session_state.playing:
        if st.session_state.current_time >= duration:
            st.session_state.playing = False
            st.session_state.current_time = 0.0
            break

        current_time = st.session_state.current_time
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(current_time * fps))
        ret, frame = cap.read()
        if not ret:
            st.session_state.playing = False
            break

        frame = _render_overlays(frame, current_time, show, data)
        with placeholder.container():
            st.image(frame, channels="BGR", use_container_width=True)
        # duration > 0 here, otherwise the loop would have exited above.
        progress_bar.progress(
            min(current_time / duration, 1.0), text=f"播放中: {current_time:.1f}s"
        )

        time.sleep(frame_interval)
        st.session_state.current_time += frame_interval

    if not st.session_state.playing:
        if duration > 0:
            # Manual scrubbing is only offered while paused. The value is
            # clamped because the slider rejects values outside its range.
            st.session_state.current_time = st.slider(
                "⏯ 手動調整時間",
                0.0,
                duration,
                min(max(st.session_state.current_time, 0.0), duration),
                step=0.1,
                key="manual_slider",
            )
            progress_bar.progress(
                min(st.session_state.current_time / duration, 1.0),
                text=f"已暫停: {st.session_state.current_time:.1f}s",
            )
        else:
            # A zero duration would divide by zero and give the slider an
            # empty range, so the timeline controls are skipped.
            st.warning("Video duration is unavailable; timeline controls are disabled.")

        # Show the frame at the paused position.
        current_time = st.session_state.current_time
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(current_time * fps))
        ret, frame = cap.read()
        if ret:
            frame = _render_overlays(frame, current_time, show, data)
            with placeholder.container():
                st.image(frame, channels="BGR", use_container_width=True)

    return duration


def _relabel_faces(face_json_path, selected_ids, new_name):
    """Rewrite ``person_id`` of the selected identities in the clustered face JSON.

    Returns the number of faces relabelled, or ``None`` when the file is missing.
    """
    try:
        with open(face_json_path, "r", encoding="utf-8") as f:
            face_data = json.load(f)
    except FileNotFoundError:
        return None

    frames = face_data.get("frames", [])
    # Frames may be keyed by index, as the face overlay also allows.
    if isinstance(frames, dict):
        frames = list(frames.values())

    count = 0
    for frame_entry in frames:
        for face in frame_entry.get("faces", []):
            if face.get("person_id") in selected_ids:
                face["person_id"] = new_name
                count += 1

    with open(face_json_path, "w", encoding="utf-8") as f:
        json.dump(face_data, f, indent=2, ensure_ascii=False)
    return count


def _ensure_talent(new_name):
    """Return the id of the ``talents`` row named ``new_name``, creating it if needed."""
    import psycopg2

    conn = psycopg2.connect("postgresql://accusys@localhost:5432/momentry")
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT id FROM talents WHERE real_name = %s", (new_name,))
            row = cur.fetchone()
            if row:
                talent_id = row[0]
            else:
                cur.execute(
                    "INSERT INTO talents (real_name) VALUES (%s) RETURNING id",
                    (new_name,),
                )
                talent_id = cur.fetchone()[0]

            # Face/speaker binding is not implemented here. The original
            # notes say the Person IDs are simply treated as the Talent for
            # now, and that speaker binding is left to the user or to a
            # later scan of the ASRX data.
            conn.commit()
        finally:
            cur.close()
    finally:
        # Always release the connection, including when a query fails.
        conn.close()
    return talent_id


def _identity_manager_section():
    """Render the thumbnail picker and the merge form for clustered identities."""
    st.header("👥 身份管理與合併 (Identity Manager)")

    # Collect every Person_* thumbnail in the preview directory.
    thumbnail_dir = os.path.join(OUTPUT_DIR, "quick_preview")
    person_thumbnails = sorted(
        f
        for f in os.listdir(thumbnail_dir)
        if f.startswith("Person_") and f.endswith(".jpg")
    )
    if not person_thumbnails:
        st.info("未發現聚類截圖。請先執行 `face_clustering_processor.py`。")
        return

    # Show the faces in a grid of at most four columns.
    cols = st.columns(min(len(person_thumbnails), 4))
    selected_ids = []
    for i, fname in enumerate(person_thumbnails):
        person_id = os.path.splitext(fname)[0]
        img_path = os.path.join(thumbnail_dir, fname)
        with cols[i % 4]:
            st.image(img_path, caption=person_id, use_container_width=True)
            if st.checkbox(f"選擇 {person_id}", key=f"chk_{person_id}"):
                selected_ids.append(person_id)

    if not selected_ids:
        return

    # Merge controls.
    st.markdown("---")
    st.write(f"已選擇: **{', '.join(selected_ids)}**")

    with st.form(key="merge_form"):
        new_name = st.text_input(
            "合併後的身份名稱 (e.g., 主角, 張三)", value="Speaker_A"
        )
        submitted = st.form_submit_button("✅ 確認合併與綁定")

    if not submitted:
        return

    # 1. Update the clustered face JSON.
    face_json_path = os.path.join(
        OUTPUT_DIR, "quick_preview", "preview.face_clustered.json"
    )
    count = _relabel_faces(face_json_path, selected_ids, new_name)
    if count is not None:
        st.success(f"✅ 已更新 {count} 個臉部標籤為 '{new_name}'")

    # 2. Update the database (bind a Talent).
    try:
        talent_id = _ensure_talent(new_name)
    except Exception as e:
        st.error(f"資料庫錯誤: {e}")
    else:
        st.success(f"✅ 資料庫已建立 Talent '{new_name}' (ID: {talent_id})")
        # Reload the page to reflect the change. This is kept outside the
        # try block so the rerun signal can never be reported as a database
        # error.
        st.rerun()


def main():
    """Build the Streamlit dashboard.

    The page consists of the native video player, usage notes, layer
    toggles, the frame inspector with overlays, the identity manager and
    the processor timeline.
    """
    st.set_page_config(layout="wide", page_title="Momentry Visual Demo")
    st.title("🎬 Momentry Processor Visual Demo")

    uuid = "quick_preview"
    video_path = get_video_path(uuid)
    if not video_path or not os.path.exists(video_path):
        st.error(f"Video file not found at {video_path}")
        return

    # 1. Native audio/video player so the user can hear the speakers.
    st.subheader("🔊 原始聲音播放器 (可聽 Speaker 聲音)")
    st.video(video_path, start_time=0)
    st.markdown("---")

    # 2. Usage notes.
    with st.expander("📖 如何使用本工具?(點擊展開說明)"):
        st.markdown(
            """
            1. **時間軸控制**: 拖動下方的滑動條 (Slider) 來移動影片時間點。
            2. **開啟/關閉功能**: 在右側的 **Layers** 面板中,勾選您想看到的效果。
               - **✅ YOLO**: 綠色框標記物體 (如人、桌子)。
               - **✅ ASR**: 底部顯示白色字幕。
               - **✅ Scene**: 左上角顯示場景名稱。
            3. **查看統計**: 底部圖表顯示各模組在哪些時間段有數據。
            """
        )

    # 3. Layer toggles.
    col1, col2 = st.columns([3, 1])
    with col1:
        st.header("Frame Inspector (幀檢查器)")
    with col2:
        st.subheader("顯示層控制 (Layers)")
        show_yolo = st.checkbox("YOLO (Object)", value=True)
        show_face = st.checkbox("Face (Person)", value=True)
        show_pose = st.checkbox("Pose (Skeleton)", value=False)
        show_ocr = st.checkbox("OCR (Text)", value=False)
        show_scene = st.checkbox("Scene (Label)", value=True)
        show_asr = st.checkbox("ASR (Subtitle)", value=True)

    # 4. Data loading. Clustered face data is always tried first.
    face_data = load_json_safe(uuid, "face_clustered")
    if face_data:
        st.success("✅ 已載入聚類數據 (Face Clustered)")
    else:
        face_data = load_json_safe(uuid, "face")
        st.warning("⚠️ 未找到聚類數據,使用原始數據")

    show = {
        "yolo": show_yolo,
        "face": show_face,
        "pose": show_pose,
        "ocr": show_ocr,
        "scene": show_scene,
        "asr": show_asr,
    }
    data = {
        "yolo": load_json_safe(uuid, "yolo") if show_yolo else None,
        "face": face_data,
        "pose": load_json_safe(uuid, "pose") if show_pose else None,
        "ocr": load_json_safe(uuid, "ocr") if show_ocr else None,
        "scene": load_json_safe(uuid, "scene") if show_scene else None,
        "asr": load_json_safe(uuid, "asr") if show_asr else None,
        # Speaker (ASRX) data is always loaded.
        "asrx": load_json_safe(uuid, "asrx"),
    }

    # 5. Frame inspector and playback. The capture is released even when a
    # rerun or an error interrupts the script.
    cap = cv2.VideoCapture(video_path)
    try:
        duration = _playback_section(cap, show, data)
    finally:
        cap.release()

    # 6. Interactive identity merging.
    _identity_manager_section()

    # 7. Timeline visualisation.
    st.header("📅 Processor Timeline (處理器活動軸)")
    plot_timeline(uuid, duration)

def plot_timeline(uuid, duration):
    """Plot when each processor module has data, as an Altair bar chart.

    Args:
        uuid: Passed through to ``load_json_safe``.
        duration: Video length in seconds. Accepted for interface
            compatibility; the chart does not use it at present.

    ASR contributes one bar per speech segment. YOLO contributes a 0.5 s
    bar for each of the first 50 frames that contain detections, a cap
    that keeps the chart fast. An info message is shown instead of a chart
    when neither module yields data.
    """
    max_yolo_samples = 50
    data = []

    # ASR activity.
    asr = load_json_safe(uuid, "asr")
    if asr:
        for seg in asr.get("segments", []):
            # Segments without both bounds cannot be drawn; skip them
            # rather than raising KeyError.
            if "start" not in seg or "end" not in seg:
                continue
            data.append(
                {
                    "Module": "ASR Speech",
                    "Start": seg["start"],
                    "End": seg["end"],
                    "Task": "Speech",
                }
            )

    # YOLO activity (first frames with detections, not a random sample).
    yolo = load_json_safe(uuid, "yolo")
    if yolo:
        # "frames" may be a dict keyed by frame index, or a list.
        frames_data = yolo.get("frames", {})
        if isinstance(frames_data, dict):
            frames_list = list(frames_data.values())
        else:
            frames_list = frames_data

        sample_count = 0
        for f in frames_list:
            # ">=" so that exactly max_yolo_samples bars are emitted; the
            # previous "> 50" check let a 51st through.
            if sample_count >= max_yolo_samples:
                break
            detections = f.get("detections", []) or f.get("objects", [])
            if detections:
                ts = f.get("time_seconds") or f.get("timestamp", 0)
                data.append(
                    {
                        "Module": "YOLO Detect",
                        "Start": ts,
                        "End": ts + 0.5,
                        "Task": "Obj",
                    }
                )
                sample_count += 1

    if not data:
        st.info("No timeline data available.")
        return

    df = pd.DataFrame(data)

    chart = (
        alt.Chart(df)
        .mark_bar()
        .encode(
            x=alt.X("Start:Q", title="Time (sec)"),
            x2="End:Q",
            y=alt.Y("Module:N", title=""),
            color=alt.Color("Module:N", scale=alt.Scale(scheme="category10")),
        )
        .properties(height=200)
    )

    st.altair_chart(chart, use_container_width=True)

# Script entry point: build the dashboard when this file is executed directly.
if __name__ == "__main__":
    main()