#!/opt/homebrew/bin/python3.11
"""
ASR Processor - simplified, standardized version.

Purpose: run automatic speech recognition (ASR) on a media file.
Input:   video file path, output file path.
Output:  speech recognition result as JSON.

Standardization features:
1. Unnecessary monitoring logic removed
2. Simplified architecture (<300 lines)
3. Unified error handling
4. Standardized output format
5. Parameterized configuration
"""

import sys
import json
import os
import argparse
import shutil
import signal
import tempfile
import time
import subprocess
from typing import Dict, Any, Tuple
import traceback


# Environment check
def check_environment() -> Tuple[bool, str]:
    """Check that the required runtime dependencies are available.

    Verifies that the ``whisper`` package can be imported and that both
    ``ffprobe`` (used for stream/duration inspection) and ``ffmpeg`` (used
    for audio extraction) can be executed.

    Returns:
        ``(True, "Environment OK")`` when everything is usable, otherwise
        ``(False, reason)``.
    """
    try:
        # Whisper is imported only to prove it is installed.
        import whisper  # noqa: F401

        # Both binaries are invoked later, so probe both up front.
        for tool in ("ffprobe", "ffmpeg"):
            try:
                probe = subprocess.run(
                    [tool, "-version"], capture_output=True, text=True
                )
            except OSError:
                # Raised when the executable is missing or not runnable.
                return False, f"{tool} not found or not working"
            if probe.returncode != 0:
                return False, f"{tool} not found or not working"

        return True, "Environment OK"
    except ImportError as e:
        return False, f"Missing dependency: {e}"
    except Exception as e:
        return False, f"Environment check failed: {e}"


# Signal handling
def signal_handler(signum, frame):
    """Handle an interrupt/termination signal by exiting with status 1.

    ``sys.exit`` raises ``SystemExit`` in the main thread, which lets any
    active ``finally`` blocks run before the process terminates.
    """
    print(f"[ASR] Received signal {signum}, cleaning up...", file=sys.stderr)
    sys.exit(1)


# Whisper model cache, keyed by model name
_whisper_model_cache = {}


def get_whisper_model(model_name: str = "base"):
    """Return the Whisper model for *model_name*, loading it at most once."""
    if model_name not in _whisper_model_cache:
        # Imported lazily so the module can be imported without Whisper.
        import whisper

        print(f"[ASR] Loading Whisper model: {model_name}", file=sys.stderr)
        _whisper_model_cache[model_name] = whisper.load_model(model_name)
    return _whisper_model_cache[model_name]


# Main processing class
class ASRProcessor:
    """Extract audio from a media file, transcribe it, and save JSON output."""

    def __init__(
        self,
        video_path: str,
        output_path: str,
        model_name: str = "base",
        chunk_size: int = 300,
    ):
        """Store the job configuration and start the processing timer.

        Args:
            video_path: Path of the input media file.
            output_path: Path of the JSON file to write.
            model_name: Whisper model name passed to ``get_whisper_model``.
            chunk_size: Chunk length in seconds; ``<= 0`` disables chunking.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.model_name = model_name
        self.chunk_size = chunk_size  # chunk length in seconds
        self.start_time = time.time()

    def validate_input(self) -> Tuple[bool, str]:
        """Validate the input file.

        Returns:
            ``(True, message)`` when the file exists and has an audio
            stream, otherwise ``(False, reason)``.
        """
        if not os.path.exists(self.video_path):
            return False, f"Video file not found: {self.video_path}"

        # Transcription is pointless without an audio stream.
        if not self._has_audio_stream():
            return False, f"No audio stream found in: {self.video_path}"

        return True, "Input validation passed"

    def _has_audio_stream(self) -> bool:
        """Return True if ffprobe reports at least one audio stream."""
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a",
                "-show_entries",
                "stream=codec_type",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return "audio" in result.stdout
        except Exception:
            # Any failure to run ffprobe is treated as "no audio".
            return False

    def _get_media_duration(self) -> float:
        """Return the media duration in seconds, or 0.0 if it is unknown."""
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            # Empty or non-numeric output raises ValueError, handled below.
            return float(result.stdout.strip())
        except Exception as e:
            print(f"[ASR] Warning: Failed to get duration: {e}", file=sys.stderr)
            return 0.0

    def _extract_audio(self, audio_path: str) -> bool:
        """Extract the audio track to *audio_path* as a WAV file.

        Returns:
            True when ffmpeg succeeded and produced a non-empty file,
            False otherwise (the reason is logged to stderr).
        """
        try:
            cmd = [
                "ffmpeg",
                "-i",
                self.video_path,
                "-vn",  # disable video
                "-acodec",
                "pcm_s16le",  # PCM 16-bit little-endian
                "-ar",
                "16000",  # 16 kHz sample rate
                "-ac",
                "1",  # mono
                "-y",  # overwrite the output file
                audio_path,
            ]

            print(f"[ASR] Extracting audio to: {audio_path}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                print(
                    f"[ASR] Audio extraction failed: {result.stderr}",
                    file=sys.stderr,
                )
                return False

            return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0
        except Exception as e:
            print(f"[ASR] Audio extraction error: {e}", file=sys.stderr)
            return False

    def process(self) -> Dict[str, Any]:
        """Run the ASR pipeline and return the result dictionary.

        The temporary working directory is removed in a ``finally`` block,
        so it is cleaned up on success, on failure, and when the signal
        handler raises ``SystemExit`` mid-run.

        Returns:
            The transcription result with an added ``"metadata"`` entry.

        Raises:
            Exception: Any processing error is logged and re-raised.
        """
        work_dir = None
        try:
            # 1. Prepare the working directory
            work_dir = tempfile.mkdtemp(prefix="asr_")
            print(f"[ASR] Working directory: {work_dir}", file=sys.stderr)

            # 2. Get the media duration
            duration = self._get_media_duration()
            print(f"[ASR] Media duration: {duration:.2f} seconds", file=sys.stderr)

            # 3. Pick the processing strategy based on the duration
            if duration <= self.chunk_size or self.chunk_size <= 0:
                # Short file, or chunking disabled: process directly
                result = self._process_single_file(work_dir)
            else:
                # Long file: chunked processing
                result = self._process_chunked(work_dir, duration)

            # 4. Attach metadata
            processing_time = time.time() - self.start_time
            result["metadata"] = {
                "processing_time": processing_time,
                "video_path": self.video_path,
                "duration": duration,
                "model": self.model_name,
                "chunk_size": self.chunk_size,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "module_version": "1.0.0",
            }

            return result

        except Exception as e:
            print(f"[ASR] Processing failed: {e}", file=sys.stderr)
            print(f"[ASR] Traceback: {traceback.format_exc()}", file=sys.stderr)
            raise
        finally:
            # 5. Clean up the working directory (best effort, never raises)
            if work_dir is not None:
                try:
                    shutil.rmtree(work_dir)
                    print("[ASR] Cleaned up working directory", file=sys.stderr)
                except OSError as e:
                    print(
                        f"[ASR] Warning: Failed to clean up: {e}", file=sys.stderr
                    )

    @staticmethod
    def _format_result(result: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a raw transcription dict into the standard output shape.

        Args:
            result: Mapping with optional ``"segments"``, ``"language"``,
                ``"language_probability"`` and ``"duration"`` entries.

        Returns:
            A dict with ``language``, ``language_probability``,
            ``segments`` and ``summary`` keys.
        """
        # NOTE(review): openai-whisper segments appear to carry
        # `avg_logprob` rather than `confidence`, so this is likely always
        # 0.0 with that backend - confirm before relying on it.
        segments = [
            {
                "start": segment.get("start", 0.0),
                "end": segment.get("end", 0.0),
                "text": segment.get("text", "").strip(),
                "confidence": segment.get("confidence", 0.0),
            }
            for segment in result.get("segments", [])
        ]

        # Fall back to the latest segment end when the backend reports no
        # overall duration, instead of always emitting 0.0.
        total_duration = result.get("duration")
        if total_duration is None:
            total_duration = max((seg["end"] for seg in segments), default=0.0)

        return {
            "language": result.get("language"),
            "language_probability": result.get("language_probability"),
            "segments": segments,
            "summary": {
                "segment_count": len(segments),
                "total_duration": total_duration,
            },
        }

    def _process_single_file(self, work_dir: str) -> Dict[str, Any]:
        """Transcribe the whole file in one pass (no chunking).

        Args:
            work_dir: Existing directory used for the extracted WAV file.

        Raises:
            RuntimeError: If audio extraction fails.
        """
        # 1. Extract audio
        audio_path = os.path.join(work_dir, "audio.wav")
        if not self._extract_audio(audio_path):
            raise RuntimeError("Failed to extract audio")

        # 2. Load the model
        model = get_whisper_model(self.model_name)

        # 3. Transcribe
        print("[ASR] Transcribing audio...", file=sys.stderr)
        result = model.transcribe(audio_path)

        # 4. Format the result
        return self._format_result(result)

    def _process_chunked(self, work_dir: str, duration: float) -> Dict[str, Any]:
        """Process a long file.

        Simplified version: chunking is not implemented yet, so this
        currently delegates to single-file processing.
        """
        print(
            f"[ASR] Large file detected ({duration:.2f}s), using single file mode",
            file=sys.stderr,
        )
        return self._process_single_file(work_dir)

    def save_result(self, result: Dict[str, Any]):
        """Write *result* as UTF-8 JSON to ``self.output_path``.

        Args:
            result: Result dict; must contain
                ``result["metadata"]["processing_time"]`` for the summary log.
        """
        # Make sure the output directory exists (empty when the path has
        # no directory component).
        output_dir = os.path.dirname(self.output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        print(f"[ASR] Result saved to: {self.output_path}", file=sys.stderr)
        print(
            f"[ASR] Processing completed in {result['metadata']['processing_time']:.2f} seconds",
            file=sys.stderr,
        )


# Command-line interface
def main():
    """Parse CLI arguments, run the ASR pipeline, and save the result.

    Exits with status 1 on environment, validation, or processing errors.
    """
    parser = argparse.ArgumentParser(description="ASR 處理器 - 簡化標準化版本")
    parser.add_argument("video_path", help="輸入視頻文件路徑")
    parser.add_argument("output_path", help="輸出 JSON 文件路徑")
    parser.add_argument(
        "--model",
        default="base",
        help="Whisper 模型名稱 (tiny, base, small, medium, large)",
    )
    parser.add_argument(
        "--chunk-size", type=int, default=300, help="分塊大小(秒),0 表示不分塊"
    )

    args = parser.parse_args()

    # Install signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Environment check
    env_ok, env_msg = check_environment()
    if not env_ok:
        print(f"ERROR: {env_msg}", file=sys.stderr)
        sys.exit(1)

    print("[ASR] Starting ASR processing", file=sys.stderr)
    print(f"[ASR] Video: {args.video_path}", file=sys.stderr)
    print(f"[ASR] Output: {args.output_path}", file=sys.stderr)
    print(
        f"[ASR] Model: {args.model}, Chunk size: {args.chunk_size}s",
        file=sys.stderr,
    )

    # Build the processor
    processor = ASRProcessor(
        video_path=args.video_path,
        output_path=args.output_path,
        model_name=args.model,
        chunk_size=args.chunk_size,
    )

    # Validate input
    valid, msg = processor.validate_input()
    if not valid:
        print(f"ERROR: {msg}", file=sys.stderr)
        sys.exit(1)

    try:
        result = processor.process()
        processor.save_result(result)
        print("[ASR] Processing completed successfully", file=sys.stderr)
    except KeyboardInterrupt:
        # SIGINT is routed to signal_handler above, so Ctrl-C normally
        # exits through SystemExit rather than reaching this branch.
        print("[ASR] Processing interrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()