Files
momentry_core/scripts/voice_embedding_extractor.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

241 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""
Voice Embedding Extractor
Responsibility: extract a 192-dim voiceprint vector for each Speaker ID from a video's audio track and store it in the database.
Dependencies: SpeechBrain, Librosa, Psycopg2
"""
import sys
import os
import json
import torch
import librosa
import numpy as np
import psycopg2
from psycopg2.extras import execute_values
# Put this script's own directory first on the import path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# SpeechBrain is optional at import time: when it is missing we only record
# that fact (and print an install hint) so the module can still be imported.
try:
    from speechbrain.inference.speaker import EncoderClassifier
except ImportError:
    HAS_SPEECHBRAIN = False
    print("[Warning] SpeechBrain not found. Install via: pip install speechbrain")
else:
    HAS_SPEECHBRAIN = True

# Runtime configuration, overridable through environment variables.
DB_URL = os.environ.get(
    "DATABASE_URL", "postgresql://accusys@localhost:5432/momentry"
)
OUTPUT_DIR = os.environ.get("MOMENTRY_OUTPUT_DIR", "./output")
def get_db_connection():
    """Open and return a new psycopg2 connection to the database at ``DB_URL``."""
    connection = psycopg2.connect(DB_URL)
    return connection
def _bandpass_voice(audio, sr):
    """Best-effort 300-3400 Hz band-pass; return the input unchanged on failure."""
    try:
        from scipy import signal

        nyquist = 0.5 * sr
        b, a = signal.butter(
            4, [300.0 / nyquist, 3400.0 / nyquist], btype="band"
        )
        # lfilter upcasts to float64; cast back to float32 so the waveform
        # tensors built later are single precision from the start.
        filtered = signal.lfilter(b, a, audio).astype(np.float32)
        print(" [Filter] ✅ 已套用濾波器:去除背景雜訊 (300Hz-3400Hz)")
        return filtered
    except Exception as e:
        # Deliberately broad: filtering is an optional quality improvement
        # (scipy may be missing), so any failure falls back to the raw audio.
        print(f" [Warning] ⚠️ 濾波失敗 (可能缺少 scipy): {e}")
        return audio


def _group_segments_by_speaker(segments, audio, sr):
    """Slice ``audio`` per ASRX segment and group the slices by ``speaker_id``."""
    total_samples = len(audio)
    grouped = {}
    for seg in segments:
        sid = seg.get("speaker_id")
        if not sid:
            continue
        # ``or 0.0`` also covers an explicit null timestamp in the JSON.
        start_sample = int((seg.get("start") or 0.0) * sr)
        end_sample = int((seg.get("end") or 0.0) * sr)
        # Clamp to the real audio bounds so a segment that overruns the track
        # cannot pass the length check and still yield a short/empty slice.
        start_sample = max(0, start_sample)
        end_sample = min(total_samples, end_sample)
        # Drop clips shorter than one second to keep embedding quality up.
        if (end_sample - start_sample) < sr:
            continue
        grouped.setdefault(sid, []).append(audio[start_sample:end_sample])
    return grouped


def extract_speaker_embeddings(uuid: str, video_path: str):
    """
    Extract one averaged voice embedding per speaker found in a video.

    Segment timing and speaker ids are read from
    ``<OUTPUT_DIR>/<uuid>.asrx.json`` (key ``"segments"``, each entry using
    ``"speaker_id"``, ``"start"`` and ``"end"`` in seconds).

    Args:
        uuid: Video identifier used to locate the ASRX JSON file.
        video_path: Path handed to ``librosa.load`` to decode the audio track.

    Returns:
        A dict mapping speaker id to the mean embedding as a list of floats.
        Returns an empty dict when SpeechBrain is unavailable, the ASRX file
        is missing or has no segments, or the model fails to load.

    Raises:
        Whatever ``json.load`` or ``librosa.load`` raise on unreadable input;
        those errors are not swallowed here.
    """
    if not HAS_SPEECHBRAIN:
        return {}

    # 1. Load the ASRX data to obtain the segment timeline.
    asrx_path = os.path.join(OUTPUT_DIR, f"{uuid}.asrx.json")
    if not os.path.exists(asrx_path):
        print(f" [Skip] No ASRX data for {uuid}")
        return {}
    # Explicit encoding: do not depend on the platform's locale default.
    with open(asrx_path, "r", encoding="utf-8") as f:
        asrx_data = json.load(f)
    segments = asrx_data.get("segments", [])
    if not segments:
        return {}

    # 2. Load the speaker-embedding model (spkrec-ecapa-voxceleb).
    #    Note from the original author: the first run downloads the model.
    print(" [Model] Loading SpeechBrain EncoderClassifier...")
    try:
        classifier = EncoderClassifier.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa-voxceleb",
            run_opts={"device": "cpu"},  # Use CPU to avoid device_type bug
        )
    except Exception as e:
        print(f" [Error] Failed to load model: {e}")
        return {}

    # 3. Decode the audio as 16 kHz mono and suppress out-of-band noise.
    print(f" [Audio] Loading audio for {uuid}...")
    audio, sr = librosa.load(video_path, sr=16000, mono=True)
    audio = _bandpass_voice(audio, sr)

    speaker_samples = _group_segments_by_speaker(segments, audio, sr)

    # 4. Embed every clip and average the embeddings per speaker.
    speaker_embeddings = {}
    # Pure inference: no autograd bookkeeping is needed.
    with torch.no_grad():
        for sid, samples in speaker_samples.items():
            print(f" [Embedding] Processing {sid} ({len(samples)} segments)...")
            embeddings = []
            for sample in samples:
                # The classifier is fed a (1, samples) tensor.
                waveform = (
                    torch.tensor(sample, dtype=torch.float32)
                    .unsqueeze(0)
                    .to(classifier.device)
                )
                embedding = (
                    classifier.encode_batch(waveform)
                    .squeeze(0)
                    .squeeze(0)
                    .cpu()
                    .numpy()
                )
                embeddings.append(embedding)
            # Every speaker in the mapping has at least one clip, so the mean
            # is always defined; convert to a plain list for JSON/DB use.
            speaker_embeddings[sid] = np.mean(embeddings, axis=0).tolist()
    return speaker_embeddings
def save_embeddings_to_db(uuid: str, embeddings: dict):
    """
    Persist extracted speaker embeddings to the database.

    For each speaker id:
      1. Look for a talent already bound to it through ``identity_bindings``
         (``binding_type = 'speaker'``).
      2. If one exists, overwrite that talent's ``voice_embedding``.
      3. Otherwise upsert a talent named ``Speaker_<sid>`` carrying the
         embedding and bind the speaker id to it (source ``auto_extracted``,
         confidence 0.9).

    All writes happen in one transaction: it is committed only if every
    speaker was processed, and rolled back if any statement fails.

    Args:
        uuid: Video identifier. Currently unused; kept for interface
            compatibility with callers.
        embeddings: Mapping of speaker id to embedding (list of floats).
            An empty or ``None`` mapping is a no-op and opens no connection.

    Raises:
        Any database error raised while executing; it is re-raised after the
        transaction has been rolled back and the connection closed.
    """
    if not embeddings:
        return

    conn = get_db_connection()
    try:
        # The cursor context manager closes the cursor even on error.
        with conn.cursor() as cur:
            for sid, vector in embeddings.items():
                # Is this speaker id already bound to a talent?
                cur.execute(
                    """
                    SELECT t.id FROM talents t
                    JOIN identity_bindings b ON t.id = b.talent_id
                    WHERE b.binding_type = 'speaker' AND b.binding_value = %s
                    """,
                    (sid,),
                )
                row = cur.fetchone()
                if row:
                    talent_id = row[0]
                    # Refresh the embedding of the bound talent.
                    cur.execute(
                        """
                        UPDATE talents SET voice_embedding = %s WHERE id = %s
                        """,
                        (vector, talent_id),
                    )
                    print(
                        f" [DB] Updated embedding for bound Speaker {sid} (Talent #{talent_id})"
                    )
                else:
                    # Create the placeholder talent; ON CONFLICT prevents a
                    # duplicate row when the same name already exists.
                    cur.execute(
                        """
                        INSERT INTO talents (real_name, voice_embedding)
                        VALUES (%s, %s)
                        ON CONFLICT (real_name) DO UPDATE SET voice_embedding = EXCLUDED.voice_embedding
                        RETURNING id
                        """,
                        (f"Speaker_{sid}", vector),
                    )
                    talent_id = cur.fetchone()[0]
                    # Bind the speaker id to the talent.
                    cur.execute(
                        """
                        INSERT INTO identity_bindings (talent_id, binding_type, binding_value, source, confidence)
                        VALUES (%s, 'speaker', %s, 'auto_extracted', 0.9)
                        ON CONFLICT (binding_type, binding_value) DO NOTHING
                        """,
                        (talent_id, sid),
                    )
                    print(
                        f" [DB] Created new Talent 'Speaker_{sid}' (#{talent_id}) with embedding"
                    )
        conn.commit()
    except Exception:
        # Undo partial writes, then let the caller see the original error.
        conn.rollback()
        raise
    finally:
        # Always release the connection, on success and on failure alike.
        conn.close()
def main():
    """
    Command-line entry point.

    Parses ``--uuid`` and ``--video-path``, exits with status 1 when the video
    path does not exist, otherwise extracts the speaker embeddings and stores
    them in the database.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Extract Speaker Embeddings")
    cli.add_argument("--uuid", required=True, help="Video UUID")
    cli.add_argument("--video-path", required=True, help="Path to video file")
    options = cli.parse_args()

    video_path = options.video_path
    video_uuid = options.uuid

    # Guard clause: nothing to do without an input file.
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        sys.exit(1)

    print(f"Starting Voice Embedding Extraction for {video_uuid}")

    # Step 1: extract the per-speaker embeddings.
    extracted = extract_speaker_embeddings(video_uuid, video_path)
    # Step 2: write them to the database.
    save_embeddings_to_db(video_uuid, extracted)

    print("Done.")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()