feat: add search_by_appearance agent tool for clothing color search

- New Python script: clothing_color_search.py
- New agent tool: search_by_appearance (red, blue, green, etc.)
- Uses appearance.json person bboxes + HSV color analysis
- Returns matched frames with confidence scores
This commit is contained in:
Accusys
2026-07-02 22:22:07 +08:00
parent 78364afc51
commit e4d6fbac50
3 changed files with 291 additions and 0 deletions
+216
View File
@@ -0,0 +1,216 @@
#!/opt/homebrew/bin/python3.11
"""
Clothing Color Search - Find people wearing specific colors
Usage:
python3 clothing_color_search.py --file-uuid UUID --color red --output output.json
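Color matching uses HSV hue ranges (OpenCV scale, hue 0-180):
red: 0-15, 165-180
orange: 15-35
yellow: 35-50
green: 50-85
cyan: 85-105
blue: 105-140
purple: 140-165
white and black are matched on saturation/value instead of hue:
white: saturation < 40 and value > 200
black: value < 50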
"""
import sys
import os
import json
import argparse
import cv2
import numpy as np
# Supported target colors. The keys double as the accepted values of the
# --color command-line option.
# Hue-based colors map to a list of (h_min, h_max) bands on the 0-180 hue scale;
# red wraps around the hue circle and therefore needs two bands.
# The bands must not overlap: red previously claimed 0-40, which swallowed the
# orange and yellow bands and disagreed with hsv_to_color_name().
COLOR_RANGES = {
    "red": [(0, 15), (165, 180)],
    "orange": [(15, 35)],
    "yellow": [(35, 50)],
    "green": [(50, 85)],
    "cyan": [(85, 105)],
    "blue": [(105, 140)],
    "purple": [(140, 165)],
    # Achromatic colors are described by saturation/value instead of hue:
    # (h_min, h_max, s_min, s_max, v_min, v_max)
    "white": [(0, 180, 0, 40, 200, 255)],
    "black": [(0, 180, 0, 255, 0, 50)],
}
def hsv_to_color_name(h, s, v):
    """Map one HSV triple to a coarse color name.

    Args:
        h: Hue on the 0-180 scale used throughout this script.
        s: Saturation (0-255).
        v: Value / brightness (0-255).

    Returns:
        One of "black", "white", "red", "orange", "yellow", "green", "cyan",
        "blue", "purple", or "unknown" when the pixel is grey or the hue lies
        outside 0-180.
    """
    # Very dark pixels carry no reliable hue.
    if v < 50:
        return "black"
    # Low saturation means the pixel is achromatic. Its hue is meaningless
    # (pure grey is reported with hue 0), so it must not fall through to the
    # hue bands below, where it would be mislabelled as "red".
    if s < 40:
        return "white" if v > 200 else "unknown"
    # Red wraps around both ends of the hue circle; 165 itself counts as red.
    if 0 <= h <= 15 or 165 <= h <= 180:
        return "red"
    if 15 < h <= 35:
        return "orange"
    if 35 < h <= 50:
        return "yellow"
    if 50 < h <= 85:
        return "green"
    if 85 < h <= 105:
        return "cyan"
    if 105 < h <= 140:
        return "blue"
    if 140 < h <= 165:
        return "purple"
    return "unknown"
def check_color_match(dominant_colors, target_color):
    """Decide whether a set of dominant HSV colors matches the target color.

    Args:
        dominant_colors: Sequence of HSV triples (indexable as [h, s, v]).
        target_color: Color name to look for; compared case-insensitively.

    Returns:
        A (matched, ratio) tuple. `ratio` is the share of the given colors
        whose name equals the target; `matched` is True when that share is
        strictly above 0.3. Empty input yields (False, 0.0).
    """
    if not dominant_colors:
        return False, 0.0

    wanted = target_color.lower()
    hits = sum(
        1
        for entry in dominant_colors
        if hsv_to_color_name(entry[0], entry[1], entry[2]) == wanted
    )
    share = hits / len(dominant_colors)
    # More than 30% of the dominant colors must carry the target name.
    return share > 0.3, share
def _upper_body_roi(bbox, frame_w, frame_h):
    """Return the clamped integer (x0, y0, x1, y1) of the upper 60% of a person bbox, or None if it is smaller than 10x10."""
    left = int(bbox.get("x", 0))
    top = int(bbox.get("y", 0))
    # Cast to int: bbox values may be floats, and slice bounds must be integers.
    width = int(bbox.get("width", 0))
    upper_height = int(bbox.get("height", 0) * 0.6)
    # Clamp both edges so a box starting outside the frame is shrunk rather
    # than shifted onto pixels that do not belong to the person.
    x0, y0 = max(0, left), max(0, top)
    x1 = min(frame_w, left + width)
    y1 = min(frame_h, top + upper_height)
    if x1 - x0 < 10 or y1 - y0 < 10:
        return None
    return x0, y0, x1, y1


def search_by_color(appearance_path, video_path, target_color, output_path, max_frames=500):
    """Search a video for people whose upper-body clothing matches a color.

    For every listed frame that contains persons, the frame is decoded, the
    upper 60% of each person bbox is converted to HSV, clustered into five
    dominant colors with k-means, and tested with check_color_match().

    Args:
        appearance_path: Path to the appearance JSON; read for its "frames"
            list, whose entries provide "frame", "timestamp" and "persons"
            (each person carrying a "bbox" with x/y/width/height).
        video_path: Path of the video the frame numbers refer to.
        target_color: Color name to search for.
        output_path: Where the JSON summary is written; parent directories
            are created when missing.
        max_frames: Only the first `max_frames` frame entries are examined.

    Returns:
        None. On success the summary is written to `output_path` and echoed to
        stdout with "success": true. If the appearance file is missing or the
        video cannot be opened, an {"error": ...} object is printed instead
        and nothing is written.
    """
    if not os.path.exists(appearance_path):
        print(json.dumps({"error": f"appearance.json not found: {appearance_path}"}))
        return
    with open(appearance_path) as f:
        appearance = json.load(f)
    frames = appearance.get("frames", [])

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(json.dumps({"error": f"Cannot open video: {video_path}"}))
        return

    results = []
    # k-means stops after 10 iterations or once centers move by less than 1.0.
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    try:
        for frame_data in frames[:max_frames]:
            persons = frame_data.get("persons", [])
            if not persons:
                continue
            frame_num = frame_data.get("frame", 0)
            timestamp = frame_data.get("timestamp", 0)

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            frame_h, frame_w = frame.shape[:2]

            for person in persons:
                bbox = person.get("bbox", {})
                if not bbox:
                    continue
                roi_box = _upper_body_roi(bbox, frame_w, frame_h)
                if roi_box is None:
                    continue
                x0, y0, x1, y1 = roi_box

                # Cluster the clothing pixels into five dominant HSV colors.
                hsv = cv2.cvtColor(frame[y0:y1, x0:x1], cv2.COLOR_BGR2HSV)
                pixels = hsv.reshape(-1, 3).astype(np.float32)
                _, labels, centers = cv2.kmeans(
                    pixels, 5, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS
                )
                counts = np.bincount(labels.flatten())
                # Largest clusters first.
                dominant = centers[np.argsort(-counts)[:5]].tolist()

                match, confidence = check_color_match(dominant, target_color)
                if match:
                    results.append({
                        "frame": frame_num,
                        "timestamp": round(timestamp, 2),
                        "bbox": bbox,
                        "confidence": round(confidence, 3),
                        "dominant_colors": [[round(c, 1) for c in dc] for dc in dominant[:3]]
                    })
    finally:
        # Always give the decoder back, even if a frame fails to process.
        cap.release()

    # Summary: names of every dominant color reported in the matches.
    color_names = set()
    for r in results:
        for dc in r.get("dominant_colors", []):
            if len(dc) >= 3:
                color_names.add(hsv_to_color_name(dc[0], dc[1], dc[2]))

    output = {
        "file_uuid": os.path.basename(appearance_path).split(".")[0],
        "target_color": target_color,
        "total_matches": len(results),
        # Sorted so the output is stable between runs.
        "matched_frames": sorted(set(r["frame"] for r in results)),
        "results": results[:50],  # Limit to 50 results
        "color_names_found": sorted(color_names)
    }
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(output, f, indent=2)
    print(json.dumps({"success": True, **output}))
if __name__ == "__main__":
    # Command-line entry point: resolve the input/output paths, locate the
    # video when it was not given explicitly, then run the color search.
    parser = argparse.ArgumentParser(description="Search for people by clothing color")
    parser.add_argument("--file-uuid", required=True)
    parser.add_argument("--color", required=True, choices=list(COLOR_RANGES.keys()))
    parser.add_argument("--video-path", default="")
    parser.add_argument("--appearance-path", default="")
    parser.add_argument("--output", default="")
    args = parser.parse_args()

    output_dir = os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output")
    appearance_path = args.appearance_path or f"{output_dir}/{args.file_uuid}.appearance.json"
    video_path = args.video_path
    output_path = args.output or f"{output_dir}/{args.file_uuid}.color_search_{args.color}.json"

    if not video_path:
        # Try to find video in common locations
        for ext in ["mp4", "mov", "avi"]:
            candidate = f"/Users/accusys/momentry/var/sftpgo/data/demo/{args.file_uuid}.{ext}"
            if os.path.exists(candidate):
                video_path = candidate
                break

    if not video_path:
        # Fall back to a recursive search of the sftpgo data tree.
        import glob
        video_suffixes = (".mp4", ".mov", ".avi", ".mkv", ".m4v", ".webm")
        # Escape the UUID so glob metacharacters in it are taken literally.
        pattern = f"/Users/accusys/momentry/var/sftpgo/**/*{glob.escape(args.file_uuid)}*"
        # Keep only regular video files: the pattern also matches directories
        # and sidecar files that merely contain the UUID in their name.
        matches = sorted(
            m for m in glob.glob(pattern, recursive=True)
            if os.path.isfile(m) and m.lower().endswith(video_suffixes)
        )
        if matches:
            video_path = matches[0]

    if not video_path:
        print(json.dumps({"error": "video_path not found, please provide --video-path"}))
        sys.exit(1)

    search_by_color(appearance_path, video_path, args.color, output_path)
+10
View File
@@ -276,6 +276,15 @@ fn make_tools(pool: &sqlx::PgPool) -> Vec<ToolDef> {
}),
vec!["file_uuid", "node_id"],
),
function_calling::make_tool(
"search_by_appearance",
"根據衣服顏色搜尋影片中的人物。支援顏色:red, orange, yellow, green, cyan, blue, purple, white, black。",
serde_json::json!({
"file_uuid": {"type": "string", "description": "影片 UUID"},
"color": {"type": "string", "description": "目標顏色: red, orange, yellow, green, cyan, blue, purple, white, black"}
}),
vec!["file_uuid", "color"],
),
]
}
@@ -310,6 +319,7 @@ async fn execute_tool(pool: &sqlx::PgPool, tool_call: &ToolCall) -> (String, Str
"get_file_info" => tools::exec_get_file_info(pool, &args).await,
"get_representative_frame" => tools::exec_get_representative_frame(pool, &args).await,
"analyze_frame" => tools::exec_analyze_frame(pool, &args).await,
"search_by_appearance" => tools::exec_search_by_appearance(pool, &args).await,
_ => Err(format!("Unknown tool: {}", name)),
};
let content = match result {
+65
View File
@@ -1092,3 +1092,68 @@ pub async fn exec_tkg_node_detail(
None => Err("Node not found".to_string()),
}
}
/// Search a video for people wearing a given clothing color.
///
/// Runs the `clothing_color_search.py` script against the video's
/// `<file_uuid>.appearance.json` and returns the JSON the script wrote.
///
/// # Arguments
///
/// * `pool` - database pool used to look up the video's `file_path`.
/// * `args` - tool-call arguments; must contain string fields `file_uuid` and
///   `color`. The color is trimmed and lower-cased before use.
///
/// # Errors
///
/// Returns `Err` with a message when an argument is missing or invalid, the
/// appearance file does not exist, the video path cannot be resolved, the
/// script fails, or the script produced no output file.
pub async fn exec_search_by_appearance(
    pool: &sqlx::PgPool,
    args: &serde_json::Value,
) -> Result<String, String> {
    // Must stay in sync with the colors accepted by clothing_color_search.py.
    const SUPPORTED_COLORS: [&str; 9] = [
        "red", "orange", "yellow", "green", "cyan", "blue", "purple", "white", "black",
    ];

    let file_uuid = args
        .get("file_uuid")
        .and_then(|v| v.as_str())
        .ok_or_else(|| "file_uuid is required".to_string())?;
    // The value comes from a tool call and is interpolated into filesystem
    // paths below, so refuse anything that could escape the output directory.
    if file_uuid.is_empty()
        || file_uuid.contains(|c: char| c == '/' || c == '\\')
        || file_uuid.contains("..")
    {
        return Err(format!("Invalid file_uuid: {}", file_uuid));
    }

    let color = args
        .get("color")
        .and_then(|v| v.as_str())
        .ok_or_else(|| {
            "color is required (red, blue, green, yellow, orange, cyan, purple, white, black)"
                .to_string()
        })?
        .trim()
        .to_ascii_lowercase();
    // Allow-list check: `color` also ends up in the output file name.
    if !SUPPORTED_COLORS.contains(&color.as_str()) {
        return Err(format!(
            "Unsupported color '{}'; expected one of: {}",
            color,
            SUPPORTED_COLORS.join(", ")
        ));
    }

    let output_dir = std::env::var("MOMENTRY_OUTPUT_DIR")
        .unwrap_or_else(|_| "/Users/accusys/momentry/output".to_string());
    let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
        .unwrap_or_else(|_| "/Users/accusys/momentry_core/scripts".to_string());
    let script_path = format!("{}/clothing_color_search.py", scripts_dir);
    let appearance_path = format!("{}/{}.appearance.json", output_dir, file_uuid);
    let output_path = format!("{}/{}.color_search_{}.json", output_dir, file_uuid, color);

    if !std::path::Path::new(&appearance_path).exists() {
        return Err(format!("appearance.json not found for file {}", file_uuid));
    }

    // Get video path from videos table
    let videos_table = schema::table_name("videos");
    let video_path: Option<String> = sqlx::query_scalar(&format!(
        "SELECT file_path FROM {} WHERE file_uuid = $1",
        videos_table
    ))
    .bind(file_uuid)
    .fetch_optional(pool)
    .await
    .map_err(|e| e.to_string())?;
    let video_path = video_path
        .filter(|p| !p.is_empty())
        .ok_or_else(|| "Video path not found".to_string())?;

    // Remove any result left over from an earlier run so that a script which
    // exits without writing cannot cause stale data to be returned as fresh.
    match std::fs::remove_file(&output_path) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => {
            return Err(format!(
                "Cannot remove stale output {}: {}",
                output_path, e
            ))
        }
    }

    let executor = crate::core::processor::PythonExecutor::new().map_err(|e| e.to_string())?;
    executor
        .run(
            &script_path,
            &[
                "--file-uuid",
                file_uuid,
                "--color",
                color.as_str(),
                "--video-path",
                video_path.as_str(),
                "--appearance-path",
                appearance_path.as_str(),
                "--output",
                output_path.as_str(),
            ],
            None,
            "CLOTHING_COLOR_SEARCH",
            Some(std::time::Duration::from_secs(300)),
        )
        .await
        .map_err(|e| e.to_string())?;

    // Read results directly instead of exists()-then-read, which is racy.
    // NOTE(review): this is blocking file I/O inside an async fn; consider the
    // runtime's async fs API if the result files can become large.
    std::fs::read_to_string(&output_path).map_err(|e| {
        if e.kind() == std::io::ErrorKind::NotFound {
            "Color search output not found".to_string()
        } else {
            e.to_string()
        }
    })
}