#!/opt/homebrew/bin/python3.11 """ Lip Processor - 嘴部動作檢測 使用 MediaPipe Tasks API (v0.10+) """ import sys import json import argparse import os import signal import cv2 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from redis_publisher import RedisPublisher def signal_handler(signum, frame): print(f"LIP: Received signal {signum}, exiting...") sys.exit(1) # 嘴部關鍵點索引 UPPER_LIP_BOTTOM = 78 LOWER_LIP_TOP = 308 LEFT_MOUTH = 61 RIGHT_MOUTH = 291 def process_lip( video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30 ): """Process video for lip movement detection using MediaPipe Tasks API""" signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) publisher = RedisPublisher(uuid) if uuid else None if publisher: publisher.info("lip", "LIP_START") if publisher: publisher.info("lip", "LIP_LOADING_MEDIAPIPE") try: from mediapipe.tasks import python from mediapipe.tasks.python import vision # 創建 Face Landmarker base_options = python.BaseOptions( model_asset_path="face_landmarker.task", delegate=python.BaseOptions.Delegate.CPU, ) options = vision.FaceLandmarkerOptions( base_options=base_options, running_mode=vision.RunningMode.VIDEO, num_faces=1, min_face_detection_confidence=0.5, min_tracking_confidence=0.5, ) detector = vision.FaceLandmarker.create_from_options(options) except Exception as e: if publisher: publisher.error("lip", f"Failed to load MediaPipe: {e}") result = {"error": str(e), "frames": []} with open(output_path, "w") as f: json.dump(result, f, indent=2) sys.exit(1) if publisher: publisher.info("lip", "LIP_OPENING_VIDEO") cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if publisher: publisher.info("lip", f"fps={fps}, frames={total_frames}") publisher.progress("lip", 0, total_frames, "Starting") frames = [] frame_count = 0 processed = 0 speaking_frames = 0 total_openness = 0.0 timestamp_ms = 0 if publisher: publisher.info("lip", f"LIP_PROCESSING (sample={sample_interval})") while True: ret, frame = cap.read() if not ret: break frame_count += 1 timestamp_ms = int(((frame_count - 1) / fps) * 1000) if frame_count % sample_interval != 0: continue processed += 1 timestamp = (frame_count - 1) / fps # 轉換為 MediaPipe Image rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) mp_image = vision.Image(image_format=vision.ImageFormat.SRGB, data=rgb) # 檢測 result = detector.detect_for_video(mp_image, timestamp_ms) if result.face_landmarks and len(result.face_landmarks) > 0: lm = result.face_landmarks[0] # 計算嘴部開合度 openness = abs(lm[UPPER_LIP_BOTTOM].y - lm[LOWER_LIP_TOP].y) width = abs(lm[LEFT_MOUTH].x - lm[RIGHT_MOUTH].x) if width > 0: normalized = openness / width else: normalized = 0.0 speaking = normalized > 0.1 if speaking: speaking_frames += 1 total_openness += normalized frames.append( { "frame": frame_count - 1, "timestamp": round(timestamp, 3), "face_detected": True, "lip_openness": round(normalized, 4), "is_speaking": speaking, } ) if publisher and processed % 50 == 0: publisher.progress( "lip", processed, total_frames // sample_interval, f"openness={normalized:.3f}", ) cap.release() detector.close() avg_openness = total_openness / processed if processed > 0 else 0.0 speaking_rate = speaking_frames / processed if processed > 0 else 0.0 result = { "frame_count": total_frames, "fps": fps, "processed_frames": processed, "sample_interval": sample_interval, "frames": frames, "stats": { "speaking_frames": speaking_frames, "speaking_rate": round(speaking_rate, 4), "avg_openness": round(avg_openness, 4), }, } if publisher: publisher.complete("lip", f"{len(frames)} frames") with open(output_path, "w") as f: json.dump(result, f, indent=2) sys.stderr.write(f"LIP: Done, {len(frames)} frames\n") sys.exit(0) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("video_path") parser.add_argument("output_path") parser.add_argument("--uuid", "-u", default="") parser.add_argument("--sample-interval", "-s", type=int, default=30) args = parser.parse_args() process_lip(args.video_path, args.output_path, args.uuid, args.sample_interval)