8f05a7c188
- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
340 lines
11 KiB
Python
340 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
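ASR Processor - simplified, standardized version

Purpose: run automatic speech recognition (ASR) on a media file.
Input: path to the video file and path to the output file.
Output: speech recognition results in JSON format.

Standardization features:
1. Unnecessary monitoring logic removed
2. Simplified architecture (<300 lines)
3. Unified error handling
4. Standardized output format
5. Parameterized configuration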
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
import argparse
|
|
import signal
|
|
import tempfile
|
|
import time
|
|
import subprocess
|
|
from typing import Dict, Any, Tuple
|
|
import traceback
|
|
|
|
|
|
# 環境檢查
|
|
def check_environment() -> Tuple[bool, str]:
    """Check that Whisper and the ffmpeg/ffprobe command-line tools are usable.

    The ``whisper`` package is imported first, then ``ffprobe -version`` and
    ``ffmpeg -version`` are executed (both tools are invoked later by the
    processor).

    Returns:
        ``(True, "Environment OK")`` when every check passes, otherwise
        ``(False, reason)`` where ``reason`` is a human-readable message.
    """
    try:
        # Imported only to prove the package is installed and importable.
        import whisper  # noqa: F401
    except ImportError as e:
        return False, f"Missing dependency: {e}"
    except Exception as e:
        return False, f"Environment check failed: {e}"

    for tool in ("ffprobe", "ffmpeg"):
        try:
            result = subprocess.run([tool, "-version"], capture_output=True, text=True)
        except OSError:
            # A missing or non-executable binary raises (e.g. FileNotFoundError)
            # instead of returning a non-zero exit status.
            return False, f"{tool} not found or not working"
        except Exception as e:
            return False, f"Environment check failed: {e}"

        if result.returncode != 0:
            return False, f"{tool} not found or not working"

    return True, "Environment OK"
|
|
|
|
|
|
# 信號處理
|
|
def signal_handler(signum, frame):
    """Report the received signal on stderr and terminate with exit status 1.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    notice = f"[ASR] Received signal {signum}, cleaning up..."
    print(notice, file=sys.stderr)
    raise SystemExit(1)
|
|
|
|
|
|
# Whisper 模型緩存
|
|
# Loaded Whisper models, keyed by model name.
_whisper_model_cache = {}


def get_whisper_model(model_name: str = "base"):
    """Return the Whisper model called ``model_name``, loading it on first use.

    Loaded models are kept in the module-level ``_whisper_model_cache`` so a
    second request for the same name returns the already-loaded object.

    Args:
        model_name: Name passed to ``whisper.load_model``.

    Returns:
        The object returned by ``whisper.load_model(model_name)``.
    """
    try:
        return _whisper_model_cache[model_name]
    except KeyError:
        pass

    # Imported lazily so the module can be imported without Whisper installed.
    import whisper

    print(f"[ASR] Loading Whisper model: {model_name}", file=sys.stderr)
    loaded = whisper.load_model(model_name)
    _whisper_model_cache[model_name] = loaded
    return loaded
|
|
|
|
|
|
# 主要處理類
|
|
class ASRProcessor:
    """Transcribe the audio track of one media file with Whisper and save JSON.

    Typical use: construct, call :meth:`validate_input`, then :meth:`process`
    and pass its result to :meth:`save_result`.
    """

    def __init__(
        self,
        video_path: str,
        output_path: str,
        model_name: str = "base",
        chunk_size: int = 300,
    ):
        """Store the job parameters and start the processing timer.

        Args:
            video_path: Path of the input media file.
            output_path: Path of the JSON file written by :meth:`save_result`.
            model_name: Whisper model name handed to ``get_whisper_model``.
            chunk_size: Chunk length in seconds; ``<= 0`` disables chunking.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.model_name = model_name
        self.chunk_size = chunk_size  # chunk length in seconds
        # The reported processing time is measured from construction.
        self.start_time = time.time()

    def validate_input(self) -> Tuple[bool, str]:
        """Check that the input file exists and contains an audio stream.

        Returns:
            ``(True, "Input validation passed")`` or ``(False, reason)``.
        """
        if not os.path.exists(self.video_path):
            return False, f"Video file not found: {self.video_path}"

        if not self._has_audio_stream():
            return False, f"No audio stream found in: {self.video_path}"

        return True, "Input validation passed"

    def _has_audio_stream(self) -> bool:
        """Return True when ffprobe reports at least one audio stream."""
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a",
                "-show_entries",
                "stream=codec_type",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return "audio" in result.stdout
        except Exception:
            # Best effort: any failure to run ffprobe is treated as "no audio".
            return False

    def _get_media_duration(self) -> float:
        """Return the media duration in seconds, or 0.0 when it cannot be read."""
        try:
            cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "csv=p=0",
                self.video_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            return float(result.stdout.strip())
        except Exception as e:
            print(f"[ASR] Warning: Failed to get duration: {e}", file=sys.stderr)
            return 0.0

    def _extract_audio(self, audio_path: str) -> bool:
        """Extract the audio track to ``audio_path`` as 16 kHz mono 16-bit PCM.

        Returns:
            True when ffmpeg succeeded and produced a non-empty file.
        """
        try:
            cmd = [
                "ffmpeg",
                "-i",
                self.video_path,
                "-vn",  # drop the video stream
                "-acodec",
                "pcm_s16le",  # 16-bit little-endian PCM
                "-ar",
                "16000",  # 16 kHz sample rate
                "-ac",
                "1",  # mono
                "-y",  # overwrite the output file
                audio_path,
            ]

            print(f"[ASR] Extracting audio to: {audio_path}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                print(
                    f"[ASR] Audio extraction failed: {result.stderr}", file=sys.stderr
                )
                return False

            return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0

        except Exception as e:
            print(f"[ASR] Audio extraction error: {e}", file=sys.stderr)
            return False

    def process(self) -> Dict[str, Any]:
        """Run the full ASR pipeline and return the result dictionary.

        A temporary working directory is created for intermediate files and is
        removed again whether processing succeeds or fails.

        Returns:
            The transcription result with an added ``"metadata"`` entry.

        Raises:
            Exception: Any processing error is logged to stderr and re-raised.
        """
        import shutil

        work_dir = None
        try:
            # 1. Prepare the working directory.
            work_dir = tempfile.mkdtemp(prefix="asr_")
            print(f"[ASR] Working directory: {work_dir}", file=sys.stderr)

            # 2. Read the media duration.
            duration = self._get_media_duration()
            print(f"[ASR] Media duration: {duration:.2f} seconds", file=sys.stderr)

            # 3. Pick the processing strategy from the duration.
            if duration <= self.chunk_size or self.chunk_size <= 0:
                # Short file, or chunking disabled: process in one go.
                result = self._process_single_file(work_dir)
            else:
                # Long file: chunked processing.
                result = self._process_chunked(work_dir, duration)

            # 4. Attach metadata.
            processing_time = time.time() - self.start_time
            result["metadata"] = {
                "processing_time": processing_time,
                "video_path": self.video_path,
                "duration": duration,
                "model": self.model_name,
                "chunk_size": self.chunk_size,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "module_version": "1.0.0",
            }

            return result

        except Exception as e:
            print(f"[ASR] Processing failed: {e}", file=sys.stderr)
            print(f"[ASR] Traceback: {traceback.format_exc()}", file=sys.stderr)
            raise

        finally:
            # 5. Clean up the working directory. Doing this in ``finally``
            # removes the extracted audio on failures and on SystemExit too,
            # instead of only on the success path.
            if work_dir is not None:
                try:
                    shutil.rmtree(work_dir)
                    print("[ASR] Cleaned up working directory", file=sys.stderr)
                except Exception as cleanup_err:
                    print(
                        f"[ASR] Warning: Failed to clean up: {cleanup_err}",
                        file=sys.stderr,
                    )

    def _process_single_file(self, work_dir: str) -> Dict[str, Any]:
        """Transcribe the whole file in one pass (no chunking).

        Args:
            work_dir: Directory in which the temporary ``audio.wav`` is written.

        Returns:
            Dict with ``language``, ``language_probability``, ``segments`` and
            ``summary`` keys.

        Raises:
            RuntimeError: If the audio track cannot be extracted.
        """
        # 1. Extract the audio.
        audio_path = os.path.join(work_dir, "audio.wav")
        if not self._extract_audio(audio_path):
            raise RuntimeError("Failed to extract audio")

        # 2. Load the model.
        model = get_whisper_model(self.model_name)

        # 3. Transcribe.
        print("[ASR] Transcribing audio...", file=sys.stderr)
        result = model.transcribe(audio_path)

        # 4. Format the result.
        # NOTE(review): openai-whisper segments appear to expose
        # "avg_logprob"/"no_speech_prob" rather than "confidence", so this
        # field presumably stays at the 0.0 default -- confirm.
        segments = [
            {
                "start": segment.get("start", 0.0),
                "end": segment.get("end", 0.0),
                "text": segment.get("text", "").strip(),
                "confidence": segment.get("confidence", 0.0),
            }
            for segment in result.get("segments", [])
        ]

        # The transcription result may not carry a "duration" key; fall back to
        # the end of the last segment so the summary is not stuck at 0.0.
        total_duration = result.get("duration")
        if total_duration is None:
            total_duration = segments[-1]["end"] if segments else 0.0

        return {
            "language": result.get("language"),
            "language_probability": result.get("language_probability"),
            "segments": segments,
            "summary": {
                "segment_count": len(segments),
                "total_duration": total_duration,
            },
        }

    def _process_chunked(self, work_dir: str, duration: float) -> Dict[str, Any]:
        """Handle a long file.

        Simplified version: chunked processing is not implemented yet, so this
        logs a notice and delegates to :meth:`_process_single_file`.
        """
        print(
            f"[ASR] Large file detected ({duration:.2f}s), using single file mode",
            file=sys.stderr,
        )
        return self._process_single_file(work_dir)

    def save_result(self, result: Dict[str, Any]):
        """Write ``result`` to ``self.output_path`` as indented UTF-8 JSON.

        Args:
            result: Result dictionary; must contain
                ``result["metadata"]["processing_time"]`` for the final log line.
        """
        # Make sure the output directory exists; exist_ok makes a separate
        # existence check unnecessary.
        output_dir = os.path.dirname(self.output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        print(f"[ASR] Result saved to: {self.output_path}", file=sys.stderr)
        print(
            f"[ASR] Processing completed in {result['metadata']['processing_time']:.2f} seconds",
            file=sys.stderr,
        )
|
|
|
|
|
|
# 命令行接口
|
|
def main():
    """Command-line entry point.

    Parses the arguments, installs the signal handlers, checks the
    environment, validates the input and then runs the processor. Exits with
    status 1 on any error and 130 on ``KeyboardInterrupt``.
    """
    cli = argparse.ArgumentParser(description="ASR 處理器 - 簡化標準化版本")
    cli.add_argument("video_path", help="輸入視頻文件路徑")
    cli.add_argument("output_path", help="輸出 JSON 文件路徑")
    cli.add_argument(
        "--model",
        default="base",
        help="Whisper 模型名稱 (tiny, base, small, medium, large)",
    )
    cli.add_argument(
        "--chunk-size", type=int, default=300, help="分塊大小(秒),0 表示不分塊"
    )
    opts = cli.parse_args()

    # Install the signal handlers.
    # NOTE(review): with a custom SIGINT handler installed, Ctrl-C presumably
    # surfaces as SystemExit(1) rather than KeyboardInterrupt -- confirm
    # whether the 130 exit path below is still wanted.
    for signo in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signo, signal_handler)

    # Environment check.
    env_ok, env_msg = check_environment()
    if not env_ok:
        print(f"ERROR: {env_msg}", file=sys.stderr)
        sys.exit(1)

    startup_lines = (
        "[ASR] Starting ASR processing",
        f"[ASR] Video: {opts.video_path}",
        f"[ASR] Output: {opts.output_path}",
        f"[ASR] Model: {opts.model}, Chunk size: {opts.chunk_size}s",
    )
    for line in startup_lines:
        print(line, file=sys.stderr)

    # Build the processor.
    processor = ASRProcessor(
        video_path=opts.video_path,
        output_path=opts.output_path,
        model_name=opts.model,
        chunk_size=opts.chunk_size,
    )

    # Validate the input.
    is_valid, reason = processor.validate_input()
    if not is_valid:
        print(f"ERROR: {reason}", file=sys.stderr)
        sys.exit(1)

    try:
        processor.save_result(processor.process())
        print("[ASR] Processing completed successfully", file=sys.stderr)
    except KeyboardInterrupt:
        print("[ASR] Processing interrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as err:
        print(f"ERROR: {err}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
|