#!/opt/homebrew/bin/python3.11
"""
Self-implemented ASRX - a self-implemented speaker diarization system
built on speaker (voiceprint) embeddings + spectral clustering.

Architecture:
1. VAD (Silero VAD) - voice activity detection
2. Speaker Encoder (ECAPA-TDNN) - speaker embedding extraction
3. Spectral Clustering
4. Post-processing

Pipeline:
audio → VAD → speech segments → speaker embeddings → similarity matrix
→ spectral clustering → speaker IDs
"""

import sys
import json
import time
import numpy as np
from pathlib import Path

# Project-local modules
from vad import load_vad_model, extract_speech_segments
from speaker_encoder import (
    load_speaker_encoder,
    extract_speaker_embeddings_batch,
    compute_similarity_matrix,
    normalize_embeddings,
)
from speaker_cluster import spectral_clustering_speaker, smooth_speaker_labels


class SelfASRX:
    """Speaker diarization pipeline: VAD → embeddings → spectral clustering."""

    def __init__(self):
        """Load the VAD model and the speaker encoder, logging progress."""
        print("[SelfASRX] Initializing models...")

        print("[SelfASRX] Loading VAD model (Silero)...")
        self.vad_model, self.vad_utils = load_vad_model()

        print("[SelfASRX] Loading speaker encoder (ECAPA-TDNN)...")
        self.speaker_encoder = load_speaker_encoder()

        print("[SelfASRX] Models loaded successfully")

    @staticmethod
    def _build_segments(speech_segments, speaker_labels):
        """Pair each (start, end) speech segment with its label as a record."""
        return [
            {
                "index": i,
                "start": round(start, 3),
                "end": round(end, 3),
                "duration": round(end - start, 3),
                "speaker": f"SPEAKER_{int(label)}",
            }
            for i, ((start, end), label) in enumerate(
                zip(speech_segments, speaker_labels)
            )
        ]

    @staticmethod
    def _summarize_speakers(segments):
        """Return per-speaker segment count and total duration."""
        stats = {}
        for seg in segments:
            entry = stats.setdefault(seg["speaker"], {"count": 0, "duration": 0})
            entry["count"] += 1
            entry["duration"] += seg["duration"]
        # Summing rounded floats re-introduces representation noise
        # (e.g. 1.2000000000000002); round once more so the totals carry the
        # same 3-decimal precision as the individual segments.
        for entry in stats.values():
            entry["duration"] = round(entry["duration"], 3)
        return stats

    def process(
        self,
        audio_path,
        output_path=None,
        min_speech_duration_ms=500,
        n_speakers=None,
        smooth_window=5,
    ):
        """
        Run speaker diarization on an audio file.

        Args:
            audio_path: Path to the audio file.
            output_path: Optional path of a JSON file to write the result to;
                missing parent directories are created.
            min_speech_duration_ms: Minimum speech duration, forwarded to the
                VAD step.
            n_speakers: Number of speakers (None = estimate automatically).
            smooth_window: Label smoothing window size; smoothing is skipped
                when this is 1 or less.

        Returns:
            A dict with the keys "audio_path", "total_duration",
            "n_speech_segments", "n_speakers", "segments", "speaker_stats",
            "processing_time" and "realtime_factor". When no speech is
            detected, returns {"error": "No speech detected", "segments": []}
            instead and writes no output file.
        """
        # perf_counter() is monotonic, so elapsed times cannot come out zero
        # or negative when the wall clock is adjusted mid-run.
        start_time = time.perf_counter()

        print(f"\n[SelfASRX] Processing: {audio_path}")
        print("=" * 60)

        # Step 1: VAD - voice activity detection
        print("\n[Step 1] Voice Activity Detection...")
        step1_start = time.perf_counter()

        speech_segments, wav, sample_rate = extract_speech_segments(
            audio_path,
            self.vad_model,
            self.vad_utils,
            min_speech_duration_ms=min_speech_duration_ms,
        )

        step1_time = time.perf_counter() - step1_start
        total_duration = len(wav) / sample_rate
        print(f" Speech segments: {len(speech_segments)}")
        print(f" Total duration: {total_duration:.2f}s")
        print(f" VAD time: {step1_time:.2f}s")

        # Explicit length check: the container type returned by the VAD helper
        # is not visible here, and truthiness is ambiguous for array types.
        if len(speech_segments) == 0:
            print("[SelfASRX] No speech detected!")
            return {"error": "No speech detected", "segments": []}

        # Step 2: speaker embedding extraction
        print("\n[Step 2] Speaker embedding extraction...")
        step2_start = time.perf_counter()

        # Cut the waveform into one chunk per speech segment (seconds → samples).
        audio_segments = [
            wav[int(start_sec * sample_rate):int(end_sec * sample_rate)]
            for start_sec, end_sec in speech_segments
        ]

        # Extract embeddings in one batch, then normalize them.
        embeddings = extract_speaker_embeddings_batch(
            self.speaker_encoder, audio_segments, sample_rate
        )
        embeddings = normalize_embeddings(embeddings)

        step2_time = time.perf_counter() - step2_start
        print(f" Embedding shape: {embeddings.shape}")
        print(f" Embedding time: {step2_time:.2f}s")

        # Step 3: similarity matrix
        print("\n[Step 3] Computing similarity matrix...")
        step3_start = time.perf_counter()

        similarity_matrix = compute_similarity_matrix(embeddings, method="cosine")

        step3_time = time.perf_counter() - step3_start
        print(f" Similarity matrix shape: {similarity_matrix.shape}")
        print(f" Similarity time: {step3_time:.2f}s")

        # Step 4: spectral clustering
        print("\n[Step 4] Spectral clustering...")
        step4_start = time.perf_counter()

        speaker_labels, estimated_n_speakers = spectral_clustering_speaker(
            similarity_matrix,
            n_speakers=n_speakers,
            auto_estimate=(n_speakers is None),
        )

        # Smooth the label sequence (a window of 1 or less disables it).
        if smooth_window > 1:
            speaker_labels = smooth_speaker_labels(
                speaker_labels, window_size=smooth_window
            )

        step4_time = time.perf_counter() - step4_start
        print(f" Estimated speakers: {estimated_n_speakers}")
        print(f" Clustering time: {step4_time:.2f}s")

        # Step 5: assemble the output
        print("\n[Step 5] Building output...")

        result = {
            "audio_path": str(audio_path),
            "total_duration": total_duration,
            "n_speech_segments": len(speech_segments),
            # NOTE(review): this is the clustering estimate taken before label
            # smoothing; the labels present in "segments" may differ — confirm.
            "n_speakers": int(estimated_n_speakers),
            "segments": self._build_segments(speech_segments, speaker_labels),
        }

        # Per-speaker segment count and total speaking time
        result["speaker_stats"] = self._summarize_speakers(result["segments"])

        total_time = time.perf_counter() - start_time
        result["processing_time"] = round(total_time, 2)
        # Audio seconds processed per second of compute.
        result["realtime_factor"] = round(result["total_duration"] / total_time, 2)

        print("\n[SelfASRX] Processing completed!")
        print(f" Total time: {total_time:.2f}s")
        print(f" Realtime factor: {result['realtime_factor']:.2f}x")
        print(f" Detected speakers: {estimated_n_speakers}")

        # Save the result
        if output_path:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)

            print(f" Results saved to: {output_path}")

        print("=" * 60)

        return result


def main():
    """Command-line entry point: parse arguments, run diarization, print a summary."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Self-implemented ASRX - Speaker Diarization"
    )
    parser.add_argument("audio_path", help="Path to audio file")
    parser.add_argument("-o", "--output", help="Output JSON path")
    parser.add_argument(
        "--min-speech-duration",
        type=int,
        default=500,
        help="Minimum speech duration in ms (default: 500)",
    )
    parser.add_argument(
        "--n-speakers",
        type=int,
        default=None,
        help="Number of speakers (default: auto-estimate)",
    )
    parser.add_argument(
        "--smooth-window",
        type=int,
        default=5,
        help="Smoothing window size (default: 5)",
    )

    args = parser.parse_args()

    # Fail fast, before the models are loaded, if the input is missing.
    # Diagnostics go to stderr so stdout carries only the regular report.
    if not Path(args.audio_path).exists():
        print(f"Error: Audio file not found: {args.audio_path}", file=sys.stderr)
        sys.exit(1)

    # Create the ASRX instance and process the file
    asrx = SelfASRX()
    result = asrx.process(
        args.audio_path,
        args.output,
        min_speech_duration_ms=args.min_speech_duration,
        n_speakers=args.n_speakers,
        smooth_window=args.smooth_window,
    )

    # Print the result summary (skipped when process() reported an error)
    if "error" not in result:
        print("\n[Summary]")
        print(f" Audio duration: {result['total_duration']:.2f}s")
        print(f" Speech segments: {result['n_speech_segments']}")
        print(f" Detected speakers: {result['n_speakers']}")
        print(f" Processing time: {result['processing_time']:.2f}s")
        print(f" Realtime factor: {result['realtime_factor']:.2f}x")

        print("\n[Speaker Statistics]")
        for speaker, stats in result["speaker_stats"].items():
            # Share of the whole recording (not just speech) held by this speaker.
            pct = stats["duration"] / result["total_duration"] * 100
            print(
                f" {speaker}: {stats['count']} segments, "
                f"{stats['duration']:.2f}s ({pct:.1f}%)"
            )


if __name__ == "__main__":
    main()