- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
181 lines
4.9 KiB
Python
181 lines
4.9 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Lip Processor - 嘴部動作檢測 (簡化版)
|
|
使用 MediaPipe Face Mesh 檢測嘴部開合度
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import cv2
|
|
import numpy as np
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Report the received signal on stdout and stop the process with status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame supplied by the ``signal`` machinery (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    notice = f"LIP: Received signal {signum}, exiting..."
    print(notice)
    raise SystemExit(1)
|
|
|
|
|
|
# Mouth landmark indices (MediaPipe Face Mesh, 468-point layout), grouped
# as the pairs they are named for.
# NOTE(review): in MediaPipe's lip contour, 78 and 308 look like the inner
# mouth corners rather than lip-edge midpoints -- confirm against the
# landmark map before relying on the UPPER_LIP_BOTTOM / LOWER_LIP_TOP names.
UPPER_LIP_TOP, LOWER_LIP_BOTTOM = 13, 14
UPPER_LIP_BOTTOM, LOWER_LIP_TOP = 78, 308
LEFT_MOUTH, RIGHT_MOUTH = 61, 291
|
|
|
|
|
|
def _write_json(path: str, payload: dict) -> None:
    """Write *payload* to *path* as indented JSON."""
    with open(path, "w") as f:
        json.dump(payload, f, indent=2)


def process_lip(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 30
):
    """Detect lip movement in a video and write the result as JSON.

    Every ``sample_interval``-th frame is run through MediaPipe Face Mesh.
    For frames where a face is found, the vertical lip gap is divided by the
    mouth width; a ratio above 0.1 is counted as "speaking".  Frames without
    a detected face are counted as processed but produce no entry in
    ``frames``; ``avg_openness`` and ``speaking_rate`` are therefore averaged
    over all sampled frames, not only those with a face.

    This function never returns normally: it ends the process with
    ``sys.exit(0)`` on success and ``sys.exit(1)`` when MediaPipe cannot be
    initialised or the video cannot be opened (an ``{"error": ..., "frames":
    []}`` document is written to ``output_path`` in both failure cases).
    It also installs ``signal_handler`` for SIGTERM and SIGINT.

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the JSON file to write.
        uuid: When non-empty, progress events are sent through
            ``RedisPublisher(uuid)``; otherwise nothing is published.
        sample_interval: Analyse one frame out of this many (must be >= 1).

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
        SystemExit: Always, see above.
    """
    # A zero interval would divide by zero in the sampling check below and a
    # negative one would yield a negative progress total.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("lip", "LIP_START")
        publisher.info("lip", "LIP_LOADING_MEDIAPIPE")

    # Use the legacy MediaPipe "solutions" API.  Catch Exception (not a bare
    # except) so a SIGTERM/SIGINT arriving during model loading still exits
    # through signal_handler instead of being reported as a missing API.
    try:
        import mediapipe as mp

        face_mesh = mp.solutions.face_mesh.FaceMesh(
            static_image_mode=False,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
    except Exception:
        if publisher:
            publisher.error("lip", "MediaPipe legacy API not available")
        _write_json(
            output_path, {"error": "MediaPipe API not available", "frames": []}
        )
        sys.exit(1)

    if publisher:
        publisher.info("lip", "LIP_OPENING_VIDEO")

    frames = []
    frame_count = 0
    processed = 0
    speaking_frames = 0
    total_openness = 0.0
    # Most recent ratio, kept for the progress message so that it is defined
    # even when no face has been detected yet.
    last_openness = 0.0

    cap = cv2.VideoCapture(video_path)
    try:
        # Without this check an unreadable video silently produced an empty
        # "successful" result.
        if not cap.isOpened():
            message = f"Cannot open video: {video_path}"
            if publisher:
                publisher.error("lip", message)
            _write_json(output_path, {"error": message, "frames": []})
            sys.exit(1)

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        if publisher:
            publisher.info(
                "lip", f"fps={fps}, frames={total_frames}, sample={sample_interval}"
            )
            publisher.progress("lip", 0, total_frames, "Starting")
            publisher.info("lip", "LIP_PROCESSING")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(rgb)

            # The legacy FaceMesh result exposes `multi_face_landmarks`
            # (None when no face is found); each entry holds the points in
            # its `.landmark` sequence.
            faces = results.multi_face_landmarks
            if faces:
                lm = faces[0].landmark

                # Mouth openness: vertical gap between the lip midpoints
                # (13/14).  The previous 78/308 pair are the inner mouth
                # corners, whose y-difference reflects head tilt rather than
                # how far the mouth is open.
                openness = abs(lm[UPPER_LIP_TOP].y - lm[LOWER_LIP_BOTTOM].y)
                width = abs(lm[LEFT_MOUTH].x - lm[RIGHT_MOUTH].x)
                normalized = openness / width if width > 0 else 0.0
                last_openness = normalized

                speaking = normalized > 0.1
                if speaking:
                    speaking_frames += 1
                total_openness += normalized

                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "face_detected": True,
                        "lip_openness": round(normalized, 4),
                        "is_speaking": speaking,
                    }
                )

            if publisher and processed % 50 == 0:
                publisher.progress(
                    "lip",
                    processed,
                    total_frames // sample_interval,
                    f"openness={last_openness:.3f}",
                )
    finally:
        # Runs on normal completion, on the error exit above and when a
        # signal handler raises SystemExit mid-loop.
        cap.release()
        face_mesh.close()

    avg_openness = total_openness / processed if processed > 0 else 0.0
    speaking_rate = speaking_frames / processed if processed > 0 else 0.0

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "processed_frames": processed,
        "sample_interval": sample_interval,
        "frames": frames,
        "stats": {
            "speaking_frames": speaking_frames,
            "speaking_rate": round(speaking_rate, 4),
            "avg_openness": round(avg_openness, 4),
        },
    }

    # Write the file before announcing completion so a listener reacting to
    # the event never finds the output missing.
    _write_json(output_path, result)

    if publisher:
        publisher.complete("lip", f"{len(frames)} frames")

    sys.stderr.write(f"LIP: Done, {len(frames)} frames\n")
    sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus optional
    # --uuid/-u and --sample-interval/-s, forwarded to process_lip.
    cli = argparse.ArgumentParser()
    for positional in ("video_path", "output_path"):
        cli.add_argument(positional)
    cli.add_argument("--uuid", "-u", default="")
    cli.add_argument("--sample-interval", "-s", type=int, default=30)
    opts = cli.parse_args()

    process_lip(
        video_path=opts.video_path,
        output_path=opts.output_path,
        uuid=opts.uuid,
        sample_interval=opts.sample_interval,
    )
|