- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
233 lines
7.4 KiB
Python
Executable File
233 lines
7.4 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Face + ASRX 整合處理器
|
|
將人臉檢測與說話人識別整合,識別「誰在說話」
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import os
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
|
|
def load_json(path):
    """Read a JSON document from disk and return the parsed value.

    Args:
        path: Filesystem path of the JSON file to read.

    Returns:
        The deserialized JSON value, exactly as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII transcript text loads the same way everywhere.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
def match_face_with_speaker(face_data, asrx_data, time_threshold=1.0):
    """
    Pair each ASRX segment with the face detection closest to its midpoint.

    Args:
        face_data: Face detection results; read as ``{"frames": [{"timestamp":
            ..., "faces": [{"x", "y", "width", "height", "confidence"}]}]}``.
        asrx_data: ASRX (speaker diarization) results; read as
            ``{"segments": [{"start", "end", "text", "speaker_id"}]}``.
        time_threshold: Largest allowed gap, in seconds, between a face's frame
            timestamp and the segment midpoint for the two to be paired.

    Returns:
        One dict per segment, in input order, with the keys ``start``, ``end``,
        ``text``, ``speaker_id``, ``face_detected``, ``face`` and ``time_diff``.
        ``face`` and ``time_diff`` are ``None`` when nothing is close enough.
    """
    # Flatten frames -> faces into a single list, stamping every face with the
    # timestamp of the frame it was found in.
    detections = [
        {
            "timestamp": frame.get("timestamp", 0),
            "x": det.get("x"),
            "y": det.get("y"),
            "width": det.get("width"),
            "height": det.get("height"),
            "confidence": det.get("confidence", 0),
        }
        for frame in face_data.get("frames", [])
        for det in frame.get("faces", [])
    ]

    results = []
    for seg in asrx_data.get("segments", []):
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        midpoint = (seg_start + seg_end) / 2

        # Linear scan for the nearest detection; the strict "<" keeps the
        # earliest detection when several are equally close.
        best_face = None
        best_gap = float("inf")
        for det in detections:
            gap = abs(det["timestamp"] - midpoint)
            if gap <= time_threshold and gap < best_gap:
                best_face, best_gap = det, gap

        found = best_face is not None
        results.append(
            {
                "start": seg_start,
                "end": seg_end,
                "text": seg.get("text", ""),
                "speaker_id": seg.get("speaker_id"),
                "face_detected": found,
                "face": best_face,
                "time_diff": best_gap if found else None,
            }
        )

    return results
|
|
|
|
|
|
def generate_statistics(integrated_segments, face_data):
    """Summarize how well speech segments were paired with faces.

    Args:
        integrated_segments: Segment dicts carrying ``start``, ``end``,
            ``speaker_id`` and ``face_detected``.
        face_data: Face detection results; its ``frames`` list is read, and each
            frame's ``faces`` list is counted.

    Returns:
        A dict with ``total_segments``, ``segments_with_face``,
        ``segments_without_face``, ``face_match_rate`` (0 when there are no
        segments), ``speakers`` (one summary dict per speaker, in order of first
        appearance) and ``total_faces_detected``.
    """
    total_segments = len(integrated_segments)
    segments_with_face = sum(1 for seg in integrated_segments if seg["face_detected"])

    # Per-speaker aggregates; dict insertion order keeps first-appearance order.
    speakers = {}
    for seg in integrated_segments:
        speaker = seg.get("speaker_id")
        # Only a missing or empty id means "unknown speaker". A plain
        # truthiness test would also discard the valid numeric id 0.
        if speaker is None or speaker == "":
            continue
        entry = speakers.setdefault(
            speaker,
            {
                "speaker_id": speaker,
                "segment_count": 0,
                "total_duration": 0,
                "with_face": 0,
            },
        )
        entry["segment_count"] += 1
        entry["total_duration"] += seg["end"] - seg["start"]
        if seg["face_detected"]:
            entry["with_face"] += 1

    # Count individual faces, not frames: one frame may hold several faces or
    # none at all.
    total_faces = sum(
        len(frame.get("faces", [])) for frame in face_data.get("frames", [])
    )

    return {
        "total_segments": total_segments,
        "segments_with_face": segments_with_face,
        "segments_without_face": total_segments - segments_with_face,
        "face_match_rate": segments_with_face / total_segments
        if total_segments > 0
        else 0,
        "speakers": list(speakers.values()),
        "total_faces_detected": total_faces,
    }
|
|
|
|
|
|
def integrate_face_asrx(face_path, asrx_path, output_path, time_threshold=1.0):
    """
    Integrate face detection and ASRX results and save them as one JSON file.

    Loads both inputs, pairs every ASRX segment with the nearest face, computes
    summary statistics, writes the combined document to ``output_path`` and
    prints a human-readable summary.

    When the ASRX document has no segments, a document with the same top-level
    keys is still written (empty ``integrated_segments``, zeroed stats plus a
    ``note``) and the function returns without printing the summary.

    Args:
        face_path: Path to face detection JSON
        asrx_path: Path to ASRX JSON
        output_path: Path to save integrated results
        time_threshold: Time threshold for matching (seconds)

    Raises:
        OSError: If an input cannot be read or the output cannot be written.
        json.JSONDecodeError: If an input file is not valid JSON.
    """
    print(f"[Face-ASRX] Loading face data: {face_path}")
    face_data = load_json(face_path)

    print(f"[Face-ASRX] Loading ASRX data: {asrx_path}")
    asrx_data = load_json(asrx_path)

    # Provenance fields shared by both output variants. str() keeps the
    # document serializable when callers pass pathlib.Path objects.
    output = {
        "integration_time": datetime.now().isoformat(),
        "face_source": str(face_path),
        "asrx_source": str(asrx_path),
        "time_threshold": time_threshold,
        "face_data": face_data,
        "asrx_data": asrx_data,
    }

    # Nothing to match: still emit a well-formed document so downstream
    # consumers see the same schema as in the normal case.
    if not asrx_data.get("segments"):
        print("[Face-ASRX] Warning: ASRX has no segments, creating empty output")
        empty_stats = generate_statistics([], face_data)
        empty_stats["note"] = "ASRX has no segments"
        output["integrated_segments"] = []
        output["stats"] = empty_stats
        _write_integration_json(output_path, output)
        return

    print(f"[Face-ASRX] Matching faces with speakers (threshold: {time_threshold}s)")
    integrated_segments = match_face_with_speaker(face_data, asrx_data, time_threshold)

    print("[Face-ASRX] Generating statistics")
    stats = generate_statistics(integrated_segments, face_data)

    output["integrated_segments"] = integrated_segments
    output["stats"] = stats

    print(f"[Face-ASRX] Saving results to: {output_path}")
    _write_integration_json(output_path, output)

    # Console summary
    print("\n=== Face-ASRX Integration Summary ===")
    print(f"Total segments: {stats['total_segments']}")
    print(f"Segments with face: {stats['segments_with_face']}")
    print(f"Segments without face: {stats['segments_without_face']}")
    print(f"Face match rate: {stats['face_match_rate'] * 100:.1f}%")
    print(f"Total speakers: {len(stats['speakers'])}")

    for speaker in stats["speakers"]:
        # segment_count is at least 1 for every listed speaker, so the
        # percentage below cannot divide by zero.
        print(f"\n  Speaker {speaker['speaker_id']}:")
        print(f"    Segments: {speaker['segment_count']}")
        print(f"    Duration: {speaker['total_duration']:.1f}s")
        print(
            f"    With face: {speaker['with_face']} ({speaker['with_face'] / speaker['segment_count'] * 100:.0f}%)"
        )

    print("\n[Face-ASRX] Integration complete!")


def _write_integration_json(output_path, payload):
    """Write *payload* to *output_path* as indented, human-readable UTF-8 JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file must be
    # opened as UTF-8 explicitly rather than with the locale default encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: parse arguments, validate inputs, integrate.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse reads the process arguments from ``sys.argv[1:]``.

    Exits with status 1, after printing a message to stderr, when either input
    JSON file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Integrate Face Detection with ASRX Speaker Diarization"
    )
    parser.add_argument("face_json", help="Path to face detection JSON")
    parser.add_argument("asrx_json", help="Path to ASRX JSON")
    parser.add_argument("output_path", help="Path to save integrated results")
    parser.add_argument(
        "--threshold",
        "-t",
        type=float,
        default=1.0,
        help="Time threshold for matching face with speaker (seconds, default: 1.0)",
    )

    args = parser.parse_args(argv)

    # Fail fast on missing inputs. Diagnostics go to stderr so they never mix
    # with the progress and summary output on stdout.
    for label, candidate in (("Face", args.face_json), ("ASRX", args.asrx_json)):
        if not Path(candidate).exists():
            print(f"Error: {label} JSON not found: {candidate}", file=sys.stderr)
            sys.exit(1)

    integrate_face_asrx(
        args.face_json, args.asrx_json, args.output_path, args.threshold
    )


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|