Files
momentry_core/scripts/appearance_processor.py
T
Accusys 3eabd45882 fix: ASRX duplication, TKG edges, trace ingest, and add pipeline progress publishing
- ASRX handler no longer stores duplicate 'asr' pre_chunks
- Pre_chunks storage made idempotent (delete-before-insert)
- Rule 1 + trace_ingest changed to query 'asrx' not 'asr'
- Trace chunks removed (dynamic from TKG/Qdrant)
- TKG scroll_face_points fixed: trace_id >= 1 (not == 1)
- TKG AsrxSegmentEntry: start/end -> start_time/end_time (match ASRX JSON)
- Unregister error handling: log instead of silent discard
- Add publish_pipeline_progress calls at each pipeline stage
  (processors, rule1, face_trace, identity_agent, TKG, rule2, completion)
2026-07-02 10:43:46 +08:00

321 lines
10 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Appearance Processor - Body part color extraction using pose keypoints.

Input:
  - video_path: source video
  - pose_json: pose.json with keypoints and bbox
  - output_path: output JSON
  - a sibling face.json (the pose_json path with ".pose.json" replaced by
    ".face.json") is also read when it exists; it supplies the face
    bounding box used to size the body regions

Output: appearance.json with per-person per-frame body part colors

Regions: head, neck, front_upper_body, front_lower_body,
         back_upper_body, back_lower_body, left_hand, right_hand,
         left_foot, right_foot
"""
import sys
import os
import json
import argparse
import cv2
import numpy as np
def get_kp(keypoints, name):
    """Look up a pose keypoint by name.

    Args:
        keypoints: iterable of dicts, each carrying "name", "x", "y" and
            optionally "confidence".
        name: keypoint name to search for (e.g. "nose").

    Returns:
        ``(x, y, confidence)`` for the first entry whose "name" equals
        *name* (confidence is 1.0 when the entry has none), or ``None``
        when no entry matches.
    """
    match = next(
        (entry for entry in keypoints if entry.get("name") == name),
        None,
    )
    if match is None:
        return None
    return (match["x"], match["y"], match.get("confidence", 1.0))
def determine_facing(keypoints):
    """Classify which way a person faces from nose/shoulder confidences.

    Returns one of:
        "front"   - the nose is present with confidence above 0.5;
        "back"    - both shoulders exceed 0.5 confidence while the nose is
                    absent or below 0.2;
        "profile" - at least one shoulder exceeds 0.5 confidence;
        "unknown" - none of the above.
    """
    nose_kp = get_kp(keypoints, "nose")
    shoulder_kps = (
        get_kp(keypoints, "left_shoulder"),
        get_kp(keypoints, "right_shoulder"),
    )

    # A confidently detected nose settles the question immediately.
    if nose_kp is not None and nose_kp[2] > 0.5:
        return "front"

    confident_shoulders = 0
    for shoulder in shoulder_kps:
        if shoulder is not None and shoulder[2] > 0.5:
            confident_shoulders += 1

    if confident_shoulders == 0:
        return "unknown"

    nose_hidden = nose_kp is None or nose_kp[2] < 0.2
    if confident_shoulders >= 2 and nose_hidden:
        return "back"
    return "profile"
def extract_color(roi_bgr):
    """Summarize the colors of a BGR image patch in HSV space.

    Returns ``None`` when the patch is missing, empty, or has fewer than two
    rows or two columns. Otherwise returns a dict with:

    - "hsv_histogram": three lists for H (30 bins, range 0-180), S and V
      (32 bins each, range 0-256); each histogram is divided by its own sum.
    - "dominant_colors": up to five k-means cluster centers (HSV), ordered
      from the most to the least populated cluster. Patches with fewer than
      five pixels yield a single entry: the mean HSV color.
    """
    unusable = (
        roi_bgr is None
        or roi_bgr.size == 0
        or roi_bgr.shape[0] < 2
        or roi_bgr.shape[1] < 2
    )
    if unusable:
        return None

    hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)

    # (channel index, bin count, upper bound of the value range)
    channel_specs = ((0, 30, 180), (1, 32, 256), (2, 32, 256))
    histograms = []
    for channel, bins, upper in channel_specs:
        hist = cv2.calcHist([hsv], [channel], None, [bins], [0, upper]).flatten()
        total = hist.sum() or 1  # avoid dividing by zero on an empty histogram
        histograms.append((hist / total).tolist())

    samples = hsv.reshape(-1, 3).astype(np.float32)
    sample_count = len(samples)
    if sample_count >= 5:
        stop_rule = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
        _, assignment, centers = cv2.kmeans(
            samples, 5, None, stop_rule, 10, cv2.KMEANS_RANDOM_CENTERS
        )
        cluster_sizes = np.bincount(assignment.flatten())
        largest_first = np.argsort(-cluster_sizes)[:5]
        dominant_colors = centers[largest_first].tolist()
    elif sample_count > 0:
        dominant_colors = [samples.mean(axis=0).tolist()]
    else:
        dominant_colors = []

    return {
        "hsv_histogram": histograms,
        "dominant_colors": dominant_colors,
    }
def safe_roi(frame, x, y, w, h):
    """Crop the rectangle (x, y, w, h) out of *frame*, clipped to its bounds.

    Coordinates are truncated to integers before clipping. Returns ``None``
    when the requested size is non-positive or when nothing of the rectangle
    remains inside the frame after clipping; otherwise returns the slice
    ``frame[top:bottom, left:right]``.
    """
    if w <= 0 or h <= 0:
        return None

    left = max(0, int(x))
    top = max(0, int(y))
    right = min(frame.shape[1], int(x + w))
    bottom = min(frame.shape[0], int(y + h))

    if right > left and bottom > top:
        return frame[top:bottom, left:right]
    return None
def compute_body_regions(keypoints, face_bbox, frame_shape):
    """Estimate per-body-part bounding boxes for one person.

    The face box supplies the scale (every region size is a multiple of the
    face width or height) and the pose keypoints supply the position. When a
    keypoint is missing, a position derived from the face box is used.

    Args:
        keypoints: list of keypoint dicts as accepted by ``get_kp``.
        face_bbox: dict with "x", "y", "width" and "height".
        frame_shape: kept for interface compatibility; it is not used, and
            the returned boxes are NOT clipped to the frame.

    Returns:
        dict mapping region name to ``{"x", "y", "width", "height"}``, in
        the order head, neck, front/back upper body, front/back lower body,
        hands, feet. Each box is already grown by its per-region margin.
        The front and back torso boxes are identical copies of each other.

    Raises:
        KeyError: if *face_bbox* lacks one of its four keys.
    """

    def box(x, y, width, height):
        return {"x": x, "y": y, "width": width, "height": height}

    fx, fy = face_bbox["x"], face_bbox["y"]
    fw, fh = face_bbox["width"], face_bbox["height"]
    face_cx = fx + fw / 2

    nose = get_kp(keypoints, "nose")
    ls = get_kp(keypoints, "left_shoulder")
    rs = get_kp(keypoints, "right_shoulder")
    lw = get_kp(keypoints, "left_wrist")
    rw = get_kp(keypoints, "right_wrist")
    lh = get_kp(keypoints, "left_hip")
    rh = get_kp(keypoints, "right_hip")
    la = get_kp(keypoints, "left_ankle")
    ra = get_kp(keypoints, "right_ankle")

    # Anchor points: take the keypoint when present, else estimate from the
    # face box.
    # NOTE(review): nose/shoulder/hip keypoints are used regardless of their
    # confidence, whereas wrists and ankles below require > 0.3 - confirm
    # that this asymmetry is intended.
    kp_nose = (nose[0], nose[1]) if nose else (face_cx, fy + fh * 0.5)
    kp_sh_l = ls[0] if ls else (face_cx - fw * 1.5)
    kp_sh_r = rs[0] if rs else (face_cx + fw * 1.5)
    kp_sh_mid_x = (kp_sh_l + kp_sh_r) / 2
    kp_sh_mid_y = ((ls[1] + rs[1]) / 2) if (ls and rs) else (fy + fh + fh * 0.3)
    kp_hip_y = ((lh[1] + rh[1]) / 2) if (lh and rh) else (kp_sh_mid_y + fw * 2.0)
    kp_hip_l = lh[0] if lh else (kp_sh_mid_x - fw * 1.2)
    kp_hip_r = rh[0] if rh else (kp_sh_mid_x + fw * 1.2)

    regions = {}

    # head: centered on the nose, sized relative to the face
    head_w = fw * 1.6
    head_h = fh * 1.5
    regions["head"] = box(
        kp_nose[0] - head_w / 2,
        kp_nose[1] - head_h * 0.5,
        head_w,
        head_h,
    )

    # neck: from just below the nose down to the shoulder line, with a
    # minimum height of 0.3 face heights
    neck_w = fw * 1.5
    regions["neck"] = box(
        kp_sh_mid_x - neck_w / 2,
        kp_nose[1] + fh * 0.4,
        neck_w,
        max(kp_sh_mid_y - kp_nose[1] - fh * 0.4, fh * 0.3),
    )

    # upper body: starts at the shoulder line, centered between the shoulders
    ub_w = max(abs(kp_sh_r - kp_sh_l) * 1.3, fw * 3.0)
    ub_h = fh * 3.0
    regions["front_upper_body"] = box(kp_sh_mid_x - ub_w / 2, kp_sh_mid_y, ub_w, ub_h)
    regions["back_upper_body"] = dict(regions["front_upper_body"])

    # lower body: starts at the hip line; the width comes from the hips.
    # NOTE(review): the box is centered horizontally on the SHOULDER
    # midpoint, not the hip midpoint - confirm this is intended.
    lb_w = max(abs(kp_hip_r - kp_hip_l) * 1.3, fw * 3.5)
    lb_h = fh * 3.0
    regions["front_lower_body"] = box(kp_sh_mid_x - lb_w / 2, kp_hip_y, lb_w, lb_h)
    regions["back_lower_body"] = dict(regions["front_lower_body"])

    # hands: a square centered on the wrist when its confidence exceeds 0.3,
    # otherwise a guess beside the shoulders
    hand = fw * 1.0
    if lw and lw[2] > 0.3:
        regions["left_hand"] = box(lw[0] - hand / 2, lw[1] - hand / 2, hand, hand)
    else:
        regions["left_hand"] = box(kp_sh_l - hand, kp_sh_mid_y + fh * 0.5, hand, hand)
    if rw and rw[2] > 0.3:
        regions["right_hand"] = box(rw[0] - hand / 2, rw[1] - hand / 2, hand, hand)
    else:
        regions["right_hand"] = box(kp_sh_r, kp_sh_mid_y + fh * 0.5, hand, hand)

    # feet: hang below the ankle when its confidence exceeds 0.3, otherwise a
    # guess below the hips
    foot = fw * 1.0
    if la and la[2] > 0.3:
        regions["left_foot"] = box(la[0] - foot / 2, la[1], foot, foot * 0.75)
    else:
        regions["left_foot"] = box(
            kp_sh_mid_x - fw * 1.0, kp_hip_y + fh * 2.5, foot, foot * 0.75
        )
    if ra and ra[2] > 0.3:
        regions["right_foot"] = box(ra[0] - foot / 2, ra[1], foot, foot * 0.75)
    else:
        regions["right_foot"] = box(
            kp_sh_mid_x + fw * 1.0 - foot, kp_hip_y + fh * 2.5, foot, foot * 0.75
        )

    # Grow every box outward by a per-region fraction of its own size. The
    # padding is truncated to whole pixels and applied on all four sides.
    margins = {
        "head": 0.10, "neck": 0.15,
        "front_upper_body": 0.20, "back_upper_body": 0.20,
        "front_lower_body": 0.15, "back_lower_body": 0.15,
        "left_hand": 0.25, "right_hand": 0.25,
        "left_foot": 0.20, "right_foot": 0.20,
    }
    expanded = {}
    for name, rb in regions.items():
        m = margins.get(name, 0.15)
        dx = int(rb["width"] * m)
        dy = int(rb["height"] * m)
        expanded[name] = box(
            rb["x"] - dx,
            rb["y"] - dy,
            rb["width"] + dx * 2,
            rb["height"] + dy * 2,
        )
    return expanded
def filter_by_facing(regions, facing):
    """Drop the torso regions that cannot be seen for the given facing.

    For "front" the two back_* torso entries are removed; for "back" the two
    front_* torso entries are removed; any other facing leaves the dict
    untouched. The dict is modified in place and the same object is returned.
    """
    if facing == "front":
        hidden = ("back_upper_body", "back_lower_body")
    elif facing == "back":
        hidden = ("front_upper_body", "front_lower_body")
    else:
        hidden = ()

    for region_name in hidden:
        regions.pop(region_name, None)
    return regions
def _select_face_bbox(faces, person):
    """Pick the face box from *faces* that most plausibly belongs to *person*.

    Returns ``None`` when there are no faces and the only face when there is
    exactly one. With several faces, returns the one whose center is closest
    to the person's nose keypoint, provided that keypoint has confidence
    above 0.3; otherwise falls back to the first face.
    """
    if not faces:
        return None
    if len(faces) == 1:
        return faces[0]

    nose = get_kp(person.get("keypoints", []), "nose")
    if nose is None or nose[2] <= 0.3:
        return faces[0]

    def squared_distance(face):
        cx = face["x"] + face["width"] / 2
        cy = face["y"] + face["height"] / 2
        return (cx - nose[0]) ** 2 + (cy - nose[1]) ** 2

    return min(faces, key=squared_distance)


def _describe_person(frame, pid, person, faces):
    """Build the output record for one person; None when it has no keypoints."""
    keypoints = person.get("keypoints", [])
    if not keypoints:
        return None
    bbox = person.get("bbox") or {}

    face_bbox = _select_face_bbox(faces, person)
    if face_bbox is None:
        # No face detected in this frame: use this person's OWN box as the
        # size anchor, or an all-zero box (which yields no regions) when the
        # box is missing or incomplete.
        if all(key in bbox for key in ("x", "y", "width", "height")):
            face_bbox = bbox
        else:
            face_bbox = {"x": 0, "y": 0, "width": 0, "height": 0}

    facing = determine_facing(keypoints)
    regions = filter_by_facing(
        compute_body_regions(keypoints, face_bbox, frame.shape), facing
    )

    body_parts = []
    for name, rb in regions.items():
        roi = safe_roi(frame, rb["x"], rb["y"], rb["width"], rb["height"])
        color = extract_color(roi)
        if color is None:
            continue
        body_parts.append({
            "name": name,
            "bbox": rb,
            "hsv_histogram": color["hsv_histogram"],
            "dominant_colors": color["dominant_colors"],
        })

    # Reference colors over the person's whole bounding box.
    full = None
    if bbox.get("width", 0) > 0 and bbox.get("height", 0) > 0:
        full_roi = safe_roi(
            frame, bbox.get("x", 0), bbox.get("y", 0), bbox["width"], bbox["height"]
        )
        full = extract_color(full_roi)

    return {
        "person_id": pid,
        "bbox": bbox,
        "facing": facing,
        "body_parts": body_parts,
        "dominant_colors": full["dominant_colors"] if full else [],
        "hsv_histogram": full["hsv_histogram"] if full else [[], [], []],
    }


def main():
    """Command-line entry point.

    Reads the pose JSON (and the sibling face JSON when present), samples
    the listed frames from the video, extracts per-person body-part colors
    and writes the result to ``output_path``. Frames without persons, frames
    that cannot be read and persons without keypoints are skipped. Exits
    with status 1 when the video cannot be opened.
    """
    parser = argparse.ArgumentParser(description="Appearance Processor")
    parser.add_argument("video_path")
    parser.add_argument("pose_json")
    parser.add_argument("output_path")
    # Parsed for command-line compatibility; not used in this function.
    parser.add_argument("--uuid", "-u", default="")
    args = parser.parse_args()

    with open(args.pose_json, encoding="utf-8") as f:
        pose_data = json.load(f)

    # The face file sits next to the pose file. Skip it when the pose path
    # has no ".pose.json" part, otherwise the pose file itself would be
    # parsed a second time as face data.
    face_path = args.pose_json.replace(".pose.json", ".face.json")
    face_data = {}
    if face_path != args.pose_json and os.path.exists(face_path):
        with open(face_path, encoding="utf-8") as f:
            face_data = json.load(f)

    # frame number -> all face boxes detected in that frame
    faces_by_frame = {}
    for face_frame in face_data.get("frames", []):
        faces = face_frame.get("faces", [])
        if faces:
            faces_by_frame[face_frame.get("frame")] = faces

    # A missing, null or zero fps falls back to 30.0 so that the timestamp
    # computation below can never divide by zero.
    fps = pose_data.get("fps") or 30.0

    frames_out = []
    cap = cv2.VideoCapture(args.video_path)
    try:
        if not cap.isOpened():
            print("[APPEARANCE] Cannot open video", file=sys.stderr)
            sys.exit(1)

        for pose_frame in pose_data.get("frames", []):
            frame_num = pose_frame["frame"]
            persons = pose_frame.get("persons", [])
            if not persons:
                continue

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue

            faces = faces_by_frame.get(frame_num, [])
            frame_persons = []
            for pid, person in enumerate(persons):
                record = _describe_person(frame, pid, person, faces)
                if record is not None:
                    frame_persons.append(record)
            if not frame_persons:
                continue

            # Derive the timestamp from the frame number only when the pose
            # data does not provide one.
            timestamp = pose_frame.get("timestamp")
            if timestamp is None:
                timestamp = frame_num / fps
            frames_out.append({
                "frame": frame_num,
                "timestamp": timestamp,
                "persons": frame_persons,
            })
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    output = {"frame_count": len(frames_out), "fps": fps, "frames": frames_out}
    # ensure_ascii=False can emit non-ASCII text, so pin the file encoding.
    with open(args.output_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)
    print(f"[APPEARANCE] Done: {len(frames_out)} frames")
# Run the processor only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()