#!/opt/homebrew/bin/python3.11
"""
Lip Processor - OpenCV (simplified version)

Samples frames from a video, detects the largest frontal face in each sampled
frame with OpenCV's Haar cascade, and derives a rough "lip openness" estimate
from the face bounding box. The per-frame records and aggregate statistics are
written to a JSON file; progress is optionally reported through
``RedisPublisher``.

``calculate_lip_metrics`` works on Face Mesh style landmark lists (468+
normalized points) and is kept for callers that have such landmarks; the
video pipeline in ``process_lip`` does not use it.
"""

import sys
import json
import argparse
import os
import signal
import cv2

# Make the sibling ``redis_publisher`` module importable when run as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def signal_handler(signum, frame):
    """Log the received signal and terminate the process with exit code 1."""
    print(f"LIP: Received signal {signum}, exiting...")
    sys.exit(1)


# Mouth landmark indices
UPPER_LIP_BOTTOM = 78
LOWER_LIP_TOP = 308
LEFT_MOUTH = 61
RIGHT_MOUTH = 291

# Face width (in pixels) that the bounding-box heuristic maps to openness 1.0.
MAX_OPEN_FACE_WIDTH_PX = 200.0
# Openness strictly above this value marks a frame as "speaking".
SPEAKING_THRESHOLD = 0.3


def calculate_lip_metrics(landmarks, img_width, img_height) -> tuple[float, float, float]:
    """Compute mouth metrics from normalized face landmarks.

    Args:
        landmarks: Indexable sequence of landmarks; each landmark is indexable
            with x at position 0 and y at position 1, both scaled by the image
            size here (i.e. treated as normalized coordinates).
        img_width: Image width in pixels.
        img_height: Image height in pixels.

    Returns:
        ``(openness, width, vertical_openness)`` where ``width`` and
        ``vertical_openness`` are pixel distances and ``openness`` is their
        ratio clamped to ``[0.0, 1.0]``. Returns ``(0.0, 0.0, 0.0)`` when
        fewer than 468 landmarks are supplied.
    """
    if len(landmarks) < 468:
        return 0.0, 0.0, 0.0

    upper_bottom = landmarks[UPPER_LIP_BOTTOM]
    lower_top = landmarks[LOWER_LIP_TOP]
    left_corner = landmarks[LEFT_MOUTH]
    right_corner = landmarks[RIGHT_MOUTH]

    # Vertical opening, in whole pixels.
    y1 = int(upper_bottom[1] * img_height)
    y2 = int(lower_top[1] * img_height)
    vertical_openness = abs(y1 - y2)

    # Horizontal mouth width, in whole pixels.
    x1 = int(left_corner[0] * img_width)
    x2 = int(right_corner[0] * img_width)
    width = abs(x1 - x2)

    # Normalize by the mouth width so the value is scale independent.
    openness = vertical_openness / width if width > 0 else 0.0
    openness = min(1.0, max(0.0, openness))

    return openness, width, vertical_openness


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Process a video for lip movement detection and write a JSON report.

    Every ``sample_interval``-th frame is converted to grayscale and searched
    for frontal faces. The largest face is assumed to be the speaker and its
    width is mapped to an openness value (a coarse approximation, not a real
    lip measurement).

    Args:
        video_path: Path of the video to analyze.
        output_path: Path of the JSON file to write.
        uuid: Identifier passed to ``RedisPublisher``; when empty, no progress
            is published.
        sample_interval: Process every N-th frame; must be at least 1.

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        SystemExit: Always. Exit code 0 after the report has been written,
            exit code 1 when the video cannot be opened.
    """
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    max_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released even if a frame fails.
    try:
        if not cap.isOpened():
            # Fail loudly instead of silently emitting an empty report.
            if publisher:
                publisher.info("lip", f"LIP_ERROR cannot open video: {video_path}")
            sys.stderr.write(f"LIP: Cannot open video: {video_path}\n")
            sys.exit(1)

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        expected_samples = total_frames // sample_interval

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})")

        # Simple OpenCV Haar-cascade face detector.
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        )

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            # Some containers report an FPS of 0; fall back to 0.0 rather than
            # dividing by zero.
            timestamp = (frame_count - 1) / fps if fps > 0 else 0.0

            # Detect faces on the grayscale frame.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.3, 5)

            if len(faces) > 0:
                # Assume the largest face belongs to the speaker.
                x, y, w, h = max(faces, key=lambda f: f[2] * f[3])

                # Rough mouth height: a fixed fraction of the face height.
                mouth_h = int(h * 0.1)

                # Simplified approximation: the wider the face, the more open
                # the mouth is assumed to be. Converted to a plain float so no
                # numpy scalar leaks into the accumulators or the JSON output.
                openness = min(1.0, float(w) / MAX_OPEN_FACE_WIDTH_PX)
                speaking = openness > SPEAKING_THRESHOLD

                if speaking:
                    speaking_frames += 1

                total_openness += openness
                max_openness = max(max_openness, openness)

                frames.append(
                    {
                        "frame": int(frame_count - 1),
                        "timestamp": round(float(timestamp), 3),
                        "face_detected": True,
                        "lip_openness": round(openness, 4),
                        "lip_width": round(float(w), 2),
                        "lip_height": round(float(mouth_h), 2),
                        "is_speaking": bool(speaking),
                        "face_bbox": {
                            "x": int(x),
                            "y": int(y),
                            "width": int(w),
                            "height": int(h),
                        },
                    }
                )

                if publisher and processed % 50 == 0:
                    publisher.progress(
                        "lip",
                        processed,
                        expected_samples,
                        f"openness={openness:.3f}",
                    )
            elif frame_count % 10 == 0:
                # Faceless samples are only recorded on every 10th frame
                # number, which thins them out for intervals that are not a
                # multiple of 10.
                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "face_detected": False,
                        "lip_openness": 0.0,
                        "lip_width": 0.0,
                        "lip_height": 0.0,
                        "is_speaking": False,
                    }
                )
    finally:
        cap.release()

    # Averages are taken over all sampled frames, including faceless ones.
    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0
    frames_with_face = sum(1 for f in frames if f.get("face_detected", False))

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
            "max_openness": round(max_openness, 4),
            "frames_with_face": frames_with_face,
        },
    }

    # Write the report before announcing completion so a consumer reacting to
    # the completion event never reads a missing or partial file.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames, {speaking_frames} speaking")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Lip Movement Detection (OpenCV)")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--sample-interval",
        "-s",
        type=int,
        default=30,
        help="Process every N frames (default: 30)",
    )

    args = parser.parse_args()
    process_lip(args.video_path, args.output_path, args.uuid, args.sample_interval)