Files
momentry_core/scripts/identity_agent.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

520 lines
17 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Identity Agent - Multi-Evidence Identity Inference
Core Logic:
1. Time Overlap Matching (Speaker vs Person frames)
2. Embedding Similarity Calculation
3. Multi-Evidence Fusion
4. LLM Inference for Ambiguous Cases
5. Identity Assignment
Usage:
python3 scripts/identity_agent.py --video-uuid <uuid> --analyze
python3 scripts/identity_agent.py --video-uuid <uuid> --suggest
"""
import sys
import json
import argparse
import os
import numpy as np
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from sklearn.metrics.pairwise import cosine_similarity
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
class IdentityAgent:
    """
    Identity Agent for Multi-Evidence Identity Inference

    Builds one candidate identity per face cluster ("person"), attaches the
    diarized ASR speakers whose segments overlap that person's frames, scores
    the evidence, and can ask an Ollama-served LLM about ambiguous cases.

    Attributes:
        video_uuid (str): Video UUID
        output_dir (str): Output directory; per-video inputs are read from
            ``<output_dir>/<video_uuid>/``
        fps (float): Video frame rate used to convert frame indices to seconds
            (taken from ``<uuid>.probe.json`` when present and valid)
        auto_merge_threshold (float): Auto merge threshold (default: 0.8)
        llm_threshold (float): LLM inference threshold (default: 0.5)
        face_similarity_threshold (float): Face similarity threshold
            (default: 0.3); stored, but not consulted by any method here
        use_llm (bool): Use LLM for ambiguous cases
        model (str): LLM model name
        persons (list): Person records built by ``_extract_persons``
        speakers (list): Speaker records built by ``_extract_speakers``
        identities (list): Identities produced by the last ``analyze`` call
    """

    # Frame rate assumed when probe data is missing or unusable.
    DEFAULT_FPS = 23.976
    # A speaker is listed in an identity's speaker_ids only above this ratio.
    SPEAKER_OVERLAP_THRESHOLD = 0.3
    # Ollama text-generation endpoint used by call_llm().
    OLLAMA_URL = "http://localhost:11434/api/generate"
    # Seconds to wait for the LLM before falling back to "keep_separate".
    LLM_TIMEOUT = 30

    def __init__(
        self,
        video_uuid: str,
        output_dir: Optional[str] = None,
        auto_merge_threshold: float = 0.8,
        llm_threshold: float = 0.5,
        face_similarity_threshold: float = 0.3,
        use_llm: bool = True,
        model: str = "gemma4",
    ):
        self.video_uuid = video_uuid
        self.output_dir = output_dir or os.getenv(
            "MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev"
        )
        self.auto_merge_threshold = auto_merge_threshold
        self.llm_threshold = llm_threshold
        self.face_similarity_threshold = face_similarity_threshold
        self.use_llm = use_llm
        self.model = model
        self.fps = self.DEFAULT_FPS
        self.face_data = None
        self.asrx_data = None
        self.persons = []
        self.speakers = []
        self.identities = []
        # Progress events are published only when a video UUID is given.
        self.publisher = RedisPublisher(video_uuid) if video_uuid else None

    def load_data(self) -> bool:
        """
        Load face clustered, ASRX and probe data for ``self.video_uuid``.

        Reads ``<uuid>.face_clustered.json`` (required), ``<uuid>.asrx.json``
        (optional) and ``<uuid>.probe.json`` (optional, supplies ``fps``)
        from ``<output_dir>/<uuid>/``, then rebuilds ``self.persons`` and
        ``self.speakers``.

        Returns:
            True on success, False if the face clustered file is missing.
        """
        video_dir = os.path.join(self.output_dir, self.video_uuid)
        face_clustered_path = os.path.join(
            video_dir, f"{self.video_uuid}.face_clustered.json"
        )
        asrx_path = os.path.join(video_dir, f"{self.video_uuid}.asrx.json")
        probe_path = os.path.join(video_dir, f"{self.video_uuid}.probe.json")
        if not os.path.exists(face_clustered_path):
            print(f"Error: Face clustered data not found: {face_clustered_path}")
            return False
        self.face_data = self._load_json(face_clustered_path)
        self.asrx_data = self._load_json(asrx_path) if os.path.exists(asrx_path) else None
        if os.path.exists(probe_path):
            probe_data = self._load_json(probe_path)
            # A zero, negative or non-numeric fps would break every
            # frame -> seconds conversion, so fall back to the default.
            self.fps = self._parse_fps(probe_data.get("fps"), self.DEFAULT_FPS)
        self.persons = self._extract_persons()
        self.speakers = self._extract_speakers()
        print(f"Loaded {len(self.persons)} persons, {len(self.speakers)} speakers")
        return True

    @staticmethod
    def _parse_fps(value: object, default: float) -> float:
        """Return *value* as a positive finite float, or *default* if it is not one."""
        try:
            fps = float(value)
        except (TypeError, ValueError):
            return default
        # The chained comparison also rejects NaN (all NaN comparisons are False).
        return fps if 0 < fps < float("inf") else default

    def _load_json(self, path: str) -> Dict:
        """Load and return the JSON document stored at *path* (UTF-8)."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _extract_persons(self) -> List[Dict]:
        """
        Extract persons from face clustered data.

        Each entry of ``face_data["clusters"]`` becomes a dict with
        ``person_id`` (defaulting to ``Person_<n>``, 1-based), ``frames``,
        ``frame_count``, ``avg_embedding`` and ``timestamps`` (frames / fps).
        Frames are presumably integer frame indices; verify against the
        face clustering output.
        """
        if not self.face_data or "clusters" not in self.face_data:
            return []
        persons = []
        for index, cluster in enumerate(self.face_data["clusters"], start=1):
            # `or []` tolerates an explicit null "frames" value.
            frames = cluster.get("frames") or []
            persons.append({
                "person_id": cluster.get("person_id", f"Person_{index}"),
                "frames": frames,
                "frame_count": len(frames),
                "avg_embedding": cluster.get("avg_embedding", None),
                "timestamps": [f / self.fps for f in frames],
            })
        return persons

    def _extract_speakers(self) -> List[Dict]:
        """
        Extract speakers from ASRX data.

        Groups ``asrx_data["segments"]`` by their ``speaker`` field (missing
        values default to ``SPEAKER_01``), in first-seen order. Each result
        has ``speaker_id``, ``segments`` (list of ``{"start", "end"}``) and
        ``total_duration`` (sum of ``end - start``).
        """
        if not self.asrx_data or "segments" not in self.asrx_data:
            return []
        grouped: Dict[str, List[Dict]] = {}
        for segment in self.asrx_data["segments"]:
            speaker_id = segment.get("speaker", "SPEAKER_01")
            grouped.setdefault(speaker_id, []).append(
                {"start": segment.get("start", 0.0), "end": segment.get("end", 0.0)}
            )
        return [
            {
                "speaker_id": speaker_id,
                "segments": segments,
                "total_duration": sum(s["end"] - s["start"] for s in segments),
            }
            for speaker_id, segments in grouped.items()
        ]

    def calculate_speaker_person_overlap(
        self, person: Dict, speaker: Dict
    ) -> Tuple[int, float]:
        """
        Calculate overlap between Person and Speaker.

        A frame counts as overlapping when its timestamp (``frame / fps``)
        falls inside any of the speaker's ``[start, end]`` segments, both
        ends inclusive.

        Returns:
            Tuple of (overlap_frames, overlap_ratio), where overlap_ratio is
            overlap_frames / person["frame_count"] (0 for a person with no
            frames).
        """
        from bisect import bisect_right
        from itertools import accumulate

        # Sort segments by start and keep a running max of their ends so each
        # frame needs one binary search instead of a scan over all segments.
        # reach[i] is the furthest end among the first i + 1 sorted segments;
        # time t is covered iff some segment with start <= t has end >= t.
        ordered = sorted(speaker["segments"], key=lambda seg: seg["start"])
        starts = [seg["start"] for seg in ordered]
        reach = list(accumulate((seg["end"] for seg in ordered), max))

        overlap_frames = 0
        for frame in person["frames"]:
            frame_time = frame / self.fps
            idx = bisect_right(starts, frame_time) - 1
            if idx >= 0 and reach[idx] >= frame_time:
                overlap_frames += 1
        frame_count = person["frame_count"]
        overlap_ratio = overlap_frames / frame_count if frame_count > 0 else 0
        return overlap_frames, overlap_ratio

    def calculate_person_similarity(
        self, person1: Dict, person2: Dict
    ) -> Optional[float]:
        """
        Calculate cosine similarity between two Person embeddings.

        Returns:
            Cosine similarity in [-1, 1] as a Python float, or None if either
            ``avg_embedding`` is missing or empty.
        """
        raw1 = person1.get("avg_embedding")
        raw2 = person2.get("avg_embedding")
        # Explicit None/len checks instead of truthiness: `not array` raises
        # for multi-element numpy arrays.
        if raw1 is None or raw2 is None or len(raw1) == 0 or len(raw2) == 0:
            return None
        emb1 = np.asarray(raw1, dtype=float).reshape(1, -1)
        emb2 = np.asarray(raw2, dtype=float).reshape(1, -1)
        return float(cosine_similarity(emb1, emb2)[0][0])

    def fuse_evidence(
        self,
        face_similarity: Optional[float],
        speaker_overlap: float,
        time_overlap: float,
        frame_ratio: float,
    ) -> float:
        """
        Fuse multiple evidence sources into a single confidence score.

        Weighted sum: face 0.4, speaker 0.3, time 0.2, frame 0.1. A missing
        face similarity is scored as a neutral 0.5, so without face evidence
        the result is at most ~0.8.

        Args:
            face_similarity: Cosine similarity between face embeddings
                (expected 0-1), or None if unavailable
            speaker_overlap: Speaker-Person overlap ratio (0-1)
            time_overlap: Temporal overlap ratio (0-1)
            frame_ratio: Person's frame count ratio in video (0-1)
        Returns:
            Fused confidence score (0-1 for inputs in the ranges above)
        """
        weights = {
            "face": 0.4,
            "speaker": 0.3,
            "time": 0.2,
            "frame": 0.1,
        }
        face_score = face_similarity if face_similarity is not None else 0.5
        return (
            weights["face"] * face_score
            + weights["speaker"] * speaker_overlap
            + weights["time"] * time_overlap
            + weights["frame"] * frame_ratio
        )

    def analyze(self) -> Dict:
        """
        Analyze video identity.

        Loads data, then builds one identity per person. ``speaker_ids``
        lists speakers whose overlap ratio exceeds
        ``SPEAKER_OVERLAP_THRESHOLD``; the best overlap is used as both the
        speaker and the time evidence. The result list is also stored in
        ``self.identities``.

        Returns:
            Identity analysis result, or ``{"success": False, "error": ...}``
            when the face clustered data cannot be loaded.
        """
        if not self.load_data():
            return {"success": False, "error": "Failed to load data"}
        if self.publisher:
            self.publisher.info("identity", "IDENTITY_ANALYZE_START")

        # Hoisted out of the loop; default=0 plus the guard below avoid
        # dividing by zero when every cluster has no frames.
        max_frame_count = max((p["frame_count"] for p in self.persons), default=0)

        identities = []
        for index, person in enumerate(self.persons, start=1):
            speaker_overlaps = []
            max_overlap = 0.0
            max_speaker_id = None
            for speaker in self.speakers:
                overlap_frames, overlap_ratio = self.calculate_speaker_person_overlap(
                    person, speaker
                )
                if overlap_ratio > self.SPEAKER_OVERLAP_THRESHOLD:
                    speaker_overlaps.append({
                        "speaker_id": speaker["speaker_id"],
                        "overlap_frames": overlap_frames,
                        "overlap_ratio": overlap_ratio,
                    })
                if overlap_ratio > max_overlap:
                    max_overlap = overlap_ratio
                    max_speaker_id = speaker["speaker_id"]
            frame_ratio = (
                person["frame_count"] / max_frame_count if max_frame_count > 0 else 0.0
            )
            # No separate temporal signal exists yet, so the speaker overlap
            # doubles as the time-overlap evidence.
            confidence = self.fuse_evidence(
                face_similarity=None,
                speaker_overlap=max_overlap,
                time_overlap=max_overlap,
                frame_ratio=frame_ratio,
            )
            identities.append({
                "identity_id": f"identity_{index}",
                "person_ids": [person["person_id"]],
                "speaker_ids": [s["speaker_id"] for s in speaker_overlaps],
                "confidence": confidence,
                "evidence": {
                    "face_similarity": None,
                    "speaker_overlap": max_overlap,
                    "time_overlap": max_overlap,
                    "frame_ratio": frame_ratio,
                },
                "reasoning": f"Person {person['person_id']} has {max_overlap:.0%} overlap with {max_speaker_id or 'no speaker'}",
            })

        self.identities = identities
        if self.publisher:
            self.publisher.info("identity", f"IDENTITY_ANALYZE_COMPLETE:{len(identities)}")
        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "identities": identities,
            "processing_status": {
                "status": "completed",
                "persons_analyzed": len(self.persons),
                "identities_created": len(identities),
                "merges_suggested": 0,
            },
        }

    def suggest_merges(self) -> Dict:
        """
        Suggest Identity merges.

        Runs ``analyze()`` and, for every identity with at least one attached
        speaker, emits a suggestion whose action is ``auto_apply`` above
        ``auto_merge_threshold`` or ``review_needed`` above ``llm_threshold``
        (others are dropped). Because ``analyze()`` puts exactly one person in
        each identity, ``source_person_ids`` is currently always empty.

        Returns:
            Merge suggestions, or the failed ``analyze()`` result.
        """
        analyze_result = self.analyze()
        if not analyze_result.get("success"):
            return analyze_result
        merge_suggestions = []
        for identity in analyze_result["identities"]:
            if not identity["person_ids"] or not identity["speaker_ids"]:
                continue
            confidence = identity["confidence"]
            if confidence > self.auto_merge_threshold:
                action = "auto_apply"
            elif confidence > self.llm_threshold:
                action = "review_needed"
            else:
                continue
            merge_suggestions.append({
                "target_person_id": identity["person_ids"][0],
                "source_person_ids": identity["person_ids"][1:],
                "confidence": confidence,
                "reasons": [
                    f"Shared speaker overlap: {identity['evidence']['speaker_overlap']:.0%}",
                    f"Confidence: {confidence:.2f}",
                ],
                "action": action,
            })
        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "merge_suggestions": merge_suggestions,
            "naming_suggestions": [],
        }

    def call_llm(self, prompt: str) -> Dict:
        """
        Call LLM for inference via Ollama's ``/api/generate`` endpoint.

        Requests JSON-mode output (``"format": "json"``) and expects the
        model's ``response`` text to be a JSON object.

        Args:
            prompt: LLM prompt
        Returns:
            The parsed JSON object from the model. On transport/HTTP errors,
            or when the output is not a JSON object, a fallback
            ``{"decision": "keep_separate", "confidence": 0.5, ...}`` dict.
        """
        # Local import: only this method needs requests.
        import requests

        body = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            # Constrain the model to emit valid JSON, matching the prompt.
            "format": "json",
        }
        try:
            response = requests.post(self.OLLAMA_URL, json=body, timeout=self.LLM_TIMEOUT)
            response.raise_for_status()
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"LLM call failed: {e}")
            return {
                "decision": "keep_separate",
                "confidence": 0.5,
                "reasoning": f"LLM call failed: {e}",
            }
        llm_output = result.get("response", "") if isinstance(result, dict) else ""
        try:
            parsed = json.loads(llm_output)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        # Callers index the result like a dict, so anything else falls back.
        if isinstance(parsed, dict):
            return parsed
        return {
            "decision": "keep_separate",
            "confidence": 0.5,
            "reasoning": llm_output,
        }

    def llm_identity_inference(self, evidence: Dict) -> Dict:
        """
        Use LLM to infer identity for ambiguous cases.

        Confidence above ``auto_merge_threshold`` -> "merge"; below
        ``llm_threshold`` -> "keep_separate"; in between -> "review_needed"
        when ``use_llm`` is False, otherwise the LLM's answer.

        Args:
            evidence: Multi-evidence data (reads ``confidence``,
                ``face_similarity``, ``speaker_overlap``, ``time_overlap``,
                ``frame_ratio``, ``person_id``, ``frame_count``,
                ``shared_speaker``)
        Returns:
            LLM inference result
        """
        confidence = evidence.get("confidence", 0.5)
        if confidence > self.auto_merge_threshold:
            return {
                "decision": "merge",
                "confidence": confidence,
                "reasoning": f"High confidence ({confidence:.2f}) - auto merge",
            }
        if confidence < self.llm_threshold:
            return {
                "decision": "keep_separate",
                "confidence": confidence,
                "reasoning": f"Low confidence ({confidence:.2f}) - keep separate",
            }
        if not self.use_llm:
            return {
                "decision": "review_needed",
                "confidence": confidence,
                "reasoning": "Medium confidence - manual review required",
            }
        # `or 0` keeps the :.2f formatting from failing on explicit None values.
        prompt = f"""
You are an identity analyst for a video analysis system.
Given the following evidence:
- Face similarity: {evidence.get('face_similarity', 'N/A')}
- Speaker overlap: {evidence.get('speaker_overlap') or 0:.2f}
- Time overlap: {evidence.get('time_overlap') or 0:.2f}
- Frame ratio: {evidence.get('frame_ratio') or 0:.2f}
- Person: {evidence.get('person_id', 'Unknown')} ({evidence.get('frame_count', 0)} frames)
- Shared speaker: {evidence.get('shared_speaker', 'None')}
Should this person be merged with other persons sharing the same speaker?
Provide:
1. Decision: "merge" or "keep_separate"
2. Confidence: 0.0-1.0
3. Reasoning: 1-2 sentences explaining your decision
Output in JSON format only:
{{
"decision": "merge" or "keep_separate",
"confidence": 0.85,
"reasoning": "..."
}}
"""
        return self.call_llm(prompt)
def main():
    """
    Command-line entry point.

    Parses arguments, builds an IdentityAgent and prints the JSON result of
    ``--analyze`` and/or ``--suggest`` to stdout. When neither action is
    requested, argparse reports a usage error on stderr and exits with
    status 2 before any data is loaded.
    """
    parser = argparse.ArgumentParser(description="Identity Agent - Multi-Evidence Identity Inference")
    parser.add_argument("--video-uuid", "-u", help="Video UUID", required=True)
    parser.add_argument("--output-dir", "-o", help="Output directory", default=None)
    parser.add_argument(
        "--analyze", "-a", help="Analyze video identity", action="store_true"
    )
    parser.add_argument(
        "--suggest", "-s", help="Suggest Identity merges", action="store_true"
    )
    parser.add_argument(
        "--auto-merge-threshold",
        "-t",
        help="Auto merge threshold",
        type=float,
        default=0.8,
    )
    parser.add_argument(
        "--llm-threshold",
        "-l",
        help="LLM inference threshold",
        type=float,
        default=0.5,
    )
    parser.add_argument(
        "--face-similarity-threshold",
        help="Face similarity threshold",
        type=float,
        default=0.3,
    )
    parser.add_argument(
        "--use-llm", help="Use LLM for ambiguous cases", action="store_true"
    )
    parser.add_argument("--model", "-m", help="LLM model", default="gemma4")
    args = parser.parse_args()

    # Validate before constructing the agent (which opens a Redis publisher).
    if not args.analyze and not args.suggest:
        parser.error("Please specify --analyze or --suggest")

    agent = IdentityAgent(
        video_uuid=args.video_uuid,
        output_dir=args.output_dir,
        auto_merge_threshold=args.auto_merge_threshold,
        llm_threshold=args.llm_threshold,
        face_similarity_threshold=args.face_similarity_threshold,
        use_llm=args.use_llm,
        model=args.model,
    )
    if args.analyze:
        print(json.dumps(agent.analyze(), indent=2))
    if args.suggest:
        print(json.dumps(agent.suggest_merges(), indent=2))


if __name__ == "__main__":
    main()