#!/opt/homebrew/bin/python3.11
"""
ASR Processor - faster-whisper small model (Production)

Version: 2.1
Model: small (int8 quantization, CPU)
Reason: the small model gives the best balance between accuracy and speed.
        Experiments showed that at least `small` is needed to handle
        multilingual audio and Taiwanese-accented Mandarin reasonably well.

Configuration:
- Model: faster-whisper/small
- Device: CPU (MPS not supported by faster_whisper)
- Compute: int8
- Beam size: 5
- VAD filter: enabled (min_silence=500ms, speech_pad=200ms)
- Audio fallback: ffmpeg extraction for PyAV-incompatible streams (v2.1)
"""

import sys
import json
import os
import time
import argparse
import bisect
import signal
import subprocess
import tempfile

from faster_whisper import WhisperModel

PROCESSOR_VERSION = "2.1"
MODEL_SIZE = "small"
DEVICE = "cpu"
COMPUTE_TYPE = "int8"

# Decoding options shared by every model.transcribe() call in this file.
BEAM_SIZE = 5
VAD_PARAMETERS = {"min_silence_duration_ms": 500, "speech_pad_ms": 200}

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def _log(message):
    """Write one line to stderr and flush it immediately."""
    sys.stderr.write(message + "\n")
    sys.stderr.flush()


def _remove_quietly(path):
    """Best-effort file removal; a missing or locked file is not an error."""
    try:
        os.remove(path)
    except OSError:
        pass


def _transcribe(model, audio_path):
    """Call ``model.transcribe`` with the production decoding options.

    Returns whatever ``model.transcribe`` returns; callers in this file
    unpack it as ``(segments, info)``.
    """
    return model.transcribe(
        audio_path,
        beam_size=BEAM_SIZE,
        vad_filter=True,
        vad_parameters=dict(VAD_PARAMETERS),
    )


def _segment_record(start, end, text, fps):
    """Build the JSON record for one transcribed segment.

    Frame numbers are the segment times multiplied by ``fps`` and rounded
    to the nearest integer.
    """
    return {
        "start_time": start,
        "end_time": end,
        "start_frame": int(round(start * fps)),
        "end_frame": int(round(end * fps)),
        "text": text.strip(),
    }


def signal_handler(signum, frame):
    """Exit with status 1 when SIGTERM/SIGINT is received."""
    print(f"ASR: Received signal {signum}, exiting...")
    sys.exit(1)


def has_audio_stream(video_path):
    """Check if video file has audio stream using ffprobe.

    Returns False when ffprobe exits with an error, and True (optimistic
    default) when the ffprobe binary itself cannot be found.
    """
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "a",
            "-show_entries", "stream=codec_type",
            "-of", "csv=p=0",
            video_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return bool(result.stdout.strip())
    except subprocess.CalledProcessError:
        return False
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists")
        return True


def extract_audio_with_ffmpeg(video_path):
    """Extract audio from video to a 16 kHz mono PCM WAV using ffmpeg.

    Returns the path to a temporary WAV file, or None when ffmpeg exits
    with a non-zero status. Caller is responsible for removing the file
    that is returned; on failure the temporary file is removed here.
    """
    # mkstemp creates the file atomically, unlike the race-prone mktemp.
    # ffmpeg is run with -y, so it overwrites the empty placeholder.
    fd, wav_path = tempfile.mkstemp(suffix=".wav", prefix="asr_audio_")
    os.close(fd)

    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        wav_path,
    ]
    succeeded = False
    try:
        # errors="replace": ffmpeg diagnostics are not guaranteed to be UTF-8.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace"
        )
        succeeded = result.returncode == 0
    finally:
        if not succeeded:
            _remove_quietly(wav_path)

    if not succeeded:
        _log(f"ASR: ffmpeg extraction failed: {result.stderr}")
        return None
    return wav_path


def transcribe_with_fallback(model, video_path, publisher=None):
    """Transcribe video with fallback to ffmpeg-extracted WAV.

    First tries direct transcription (PyAV). If that raises an error whose
    message looks like a PyAV decode failure, the audio is extracted with
    ffmpeg and transcription is retried on the WAV. Any other exception is
    re-raised unchanged.

    Returns the ``(segments, info)`` pair from ``model.transcribe``.
    Raises RuntimeError when the ffmpeg extraction fails.
    """
    try:
        if publisher:
            publisher.info("asr", "Direct transcription attempt...")
        return _transcribe(model, video_path)
    except Exception as e:
        # Heuristic: classify the failure by keywords in the message.
        error_text = str(e).lower()
        pyav_keywords = ("av.error", "avcodec", "decode", "packet")
        if not any(keyword in error_text for keyword in pyav_keywords):
            raise  # Re-raise non-PyAV errors

    if publisher:
        publisher.info("asr", "PyAV decode failed, falling back to ffmpeg extraction...")
    _log("ASR: PyAV decode error detected, falling back to ffmpeg extraction")

    wav_path = extract_audio_with_ffmpeg(video_path)
    if wav_path is None:
        raise RuntimeError("Failed to extract audio with ffmpeg")

    try:
        if publisher:
            publisher.info("asr", "Transcribing extracted WAV audio...")
        segments, info = _transcribe(model, wav_path)
        return segments, info
    finally:
        # NOTE(review): the WAV is removed before the caller iterates
        # `segments`; this assumes transcribe() has finished reading the
        # audio by the time it returns - confirm against faster-whisper.
        _remove_quietly(wav_path)


def get_fps_from_cut(cut_path):
    """Read the FPS from the CUT JSON file.

    Returns the positive ``fps`` value stored in the file, or None when the
    file is missing, unreadable, or holds no positive ``fps``.
    """
    if os.path.exists(cut_path):
        try:
            with open(cut_path, encoding="utf-8") as f:
                cut_data = json.load(f)
            fps = cut_data.get("fps")
            if fps and fps > 0:
                return fps
        except Exception as e:
            print(f"[ASR] Failed to load CUT FPS: {e}", file=sys.stderr)
    return None


def get_fps_from_ffprobe(video_path):
    """Read the FPS of the first video stream using ffprobe.

    Returns the frame rate as a float, or None when ffprobe fails, the
    output cannot be parsed, or the parsed rate is not positive.
    """
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=r_frame_rate",
            "-of", "csv=p=0",
            video_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # Use only the first token and tolerate a trailing CSV separator.
        fps_str = result.stdout.split()[0].rstrip(",")
        if "/" in fps_str:
            num, den = fps_str.split("/")
            fps = float(num) / float(den)
        else:
            fps = float(fps_str)
        # A zero or negative rate would make every frame number meaningless.
        return fps if fps > 0 else None
    except Exception:
        return None


def _load_cut_scenes(cut_path):
    """Load scene boundaries from the CUT JSON file.

    Returns a list of ``(start_time, end_time)`` tuples, or an empty list
    when the file is missing, unreadable, or has no scenes.
    """
    if not os.path.exists(cut_path):
        return []
    cut_scenes = []
    try:
        with open(cut_path, encoding="utf-8") as f:
            cut_data = json.load(f)
        scenes = cut_data.get("scenes", [])
        if scenes:
            cut_scenes = [(s["start_time"], s["end_time"]) for s in scenes]
            print(
                f"[ASR] Loaded {len(cut_scenes)} cut scenes for segmented transcription",
                file=sys.stderr,
            )
    except Exception as e:
        print(f"[ASR] Failed to load cut scenes: {e}", file=sys.stderr)
    return cut_scenes


def _load_resume_state(tmp_path, err_path, cut_scenes):
    """Load partial results left behind by an interrupted segmented run.

    Returns ``(segments, resume_from_scene, language)``. With nothing to
    resume this is ``([], 0, None)``.
    """
    # Resume: if the .tmp file was renamed to .err (done by the executor,
    # per the original note), move it back so it can be reused.
    if (
        not os.path.exists(tmp_path)
        and os.path.exists(err_path)
        and os.path.getsize(err_path) > 10
    ):
        try:
            os.rename(err_path, tmp_path)
            _log(
                f"[ASR] Recovered .err → .tmp for resume "
                f"({os.path.getsize(tmp_path)} bytes)"
            )
        except Exception as e:
            _log(f"[ASR] Failed to recover .err: {e}")

    if not (os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 10):
        return [], 0, None

    try:
        with open(tmp_path, encoding="utf-8") as f:
            existing = json.load(f)
        segments = existing.get("segments", [])
        if not segments:
            return [], 0, None

        # Segments are stored with "end_time"; "end" is accepted only as a
        # fallback. Reading the wrong key made last_end always 0, so every
        # scene was redone and old segments were duplicated.
        last_end = max(s.get("end_time", s.get("end", 0)) for s in segments)

        # Resume at the first scene that ends after the last stored segment;
        # if there is none, every scene is already done.
        resume_from_scene = len(cut_scenes)
        for i, (_scene_start, scene_end) in enumerate(cut_scenes):
            if scene_end > last_end:
                resume_from_scene = i
                break

        # Inherit the language recorded by the previous run, if any.
        language = existing.get("language") or None

        _log(
            f"[ASR] Resume from scene {resume_from_scene}/{len(cut_scenes)} "
            f"(last segment end={last_end:.1f}s, {len(segments)} existing segments)"
        )
        return segments, resume_from_scene, language
    except Exception as e:
        _log(f"[ASR] Failed to load tmp for resume: {e}, starting fresh")
        return [], 0, None


def _transcribe_scenes(model, video_path, output_path, cut_scenes, fps, publisher):
    """Transcribe the video scene by scene, checkpointing after each scene.

    Each scene's audio is cut out with ffmpeg and transcribed separately
    (the original note says this lowers memory use on long videos). After
    every successful scene all segments so far are written to
    ``output_path + ".tmp"`` so an interrupted run can resume.

    Returns ``(all_segments, language)``; ``language`` is None when no
    scene produced a segment and nothing was inherited from a resumed run.
    """
    _log(f"[ASR] Starting segmented transcription for {len(cut_scenes)} scenes")

    tmp_path = output_path + ".tmp"
    err_path = output_path + ".err"

    # Scene lookup: map a point in time to the scene that contains it.
    # NOTE(review): bisect assumes scenes are sorted by start time - confirm.
    scene_starts = [scene[0] for scene in cut_scenes]

    def find_scene_idx(t):
        return max(0, bisect.bisect_right(scene_starts, t) - 1)

    def starts_before(record, t):
        start = record.get("start_time")
        return start is None or start < t

    all_segments, resume_from_scene, transcript_language = _load_resume_state(
        tmp_path, err_path, cut_scenes
    )

    # TemporaryDirectory removes leftover WAVs on every exit path,
    # including the SystemExit raised by signal_handler.
    with tempfile.TemporaryDirectory(prefix="asr_cut_") as temp_dir:
        _log(f"[ASR] Temp dir: {temp_dir}")

        for idx, (start_t, end_t) in enumerate(cut_scenes):
            if idx < resume_from_scene:
                continue  # scene already handled by a previous run

            seg_wav = os.path.join(temp_dir, f"seg_{idx:04d}.wav")
            _log(f"[ASR] Scene {idx}: {start_t:.1f}-{end_t:.1f}s")

            # Cut this scene's audio out with ffmpeg.
            t0 = time.time()
            cmd = [
                "ffmpeg", "-y", "-v", "quiet",
                "-i", video_path,
                "-ss", str(start_t),
                "-to", str(end_t),
                "-ar", "16000",
                "-ac", "1",
                seg_wav,
            ]
            subprocess.run(cmd, check=False, capture_output=True)
            _log(f"[ASR] Scene {idx}: ffmpeg took {time.time()-t0:.1f}s")

            if not os.path.exists(seg_wav) or os.path.getsize(seg_wav) < 100:
                _log(f"[ASR] Scene {idx}: empty audio, skipping")
                continue

            try:
                t1 = time.time()
                seg_result, seg_info = _transcribe(model, seg_wav)
                seg_language = seg_info.language if seg_info else transcript_language
                _log(
                    f"[ASR] Scene {idx}: transcribe took {time.time()-t1:.1f}s, "
                    f"language={seg_language}"
                )

                # The resumed scene may already have segments from the
                # interrupted run; drop them so it is not stored twice.
                # They are dropped only if this attempt succeeds.
                if idx == resume_from_scene:
                    kept = [s for s in all_segments if starts_before(s, start_t)]
                else:
                    kept = all_segments

                scene_segments = []
                for segment in seg_result:
                    # Segment times are relative to the scene WAV; shift
                    # them back onto the full video's timeline.
                    seg_start = start_t + segment.start
                    seg_end = start_t + segment.end
                    record = _segment_record(seg_start, seg_end, segment.text, fps)
                    scene_idx = find_scene_idx((seg_start + seg_end) / 2)
                    record["scene_number"] = scene_idx + 1
                    record["language"] = seg_language
                    scene_segments.append(record)

                    done = len(kept) + len(scene_segments)
                    if publisher and done % 100 == 0:
                        publisher.progress("asr", done, 0, f"Segment {done}")

                # The transcript language is the language of the first scene
                # that yields speech. Previously it was never set, so the
                # output language was always empty.
                if transcript_language is None and scene_segments and seg_language:
                    transcript_language = seg_language

                # Checkpoint: write everything so far to the .tmp file.
                all_segments = kept + scene_segments
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(
                        {"language": transcript_language or "", "segments": all_segments},
                        f,
                    )
            except Exception as e:
                # Best effort: a failing scene is logged and skipped.
                print(f"[ASR] Segment {idx} failed: {e}", file=sys.stderr)
            finally:
                _remove_quietly(seg_wav)

    print(
        f"[ASR] Segmented transcription complete: {len(all_segments)} segments",
        file=sys.stderr,
    )
    return all_segments, transcript_language


def _transcribe_whole(model, video_path, output_path, fps, publisher):
    """Transcribe the whole file in one pass and write it to the .tmp file.

    Returns ``(all_segments, language)``.
    """
    segments, info = transcribe_with_fallback(model, video_path, publisher)
    info_language = info.language

    all_segments = []
    for segment in segments:
        all_segments.append(
            _segment_record(segment.start, segment.end, segment.text, fps)
        )
        done = len(all_segments)
        if publisher and done % 100 == 0:
            publisher.progress("asr", done, 0, f"Segment {done}")

    with open(output_path + ".tmp", "w", encoding="utf-8") as f:
        json.dump({"language": info_language, "segments": all_segments}, f)
    return all_segments, info_language


def run_asr(video_path, output_path, uuid: str = "", fps: float = None):
    """Transcribe ``video_path`` and write the result JSON to ``output_path``.

    Args:
        video_path: Path to the input video file.
        output_path: Destination JSON path. The CUT file is looked up by
            replacing ".asr.json" with ".cut.json" in this path.
        uuid: When non-empty, progress is published through RedisPublisher.
        fps: Frame rate override. When None it is taken from the CUT file,
            then from ffprobe.

    This function always ends the process via ``sys.exit``: status 0 on
    success (including "no audio stream"), status 1 when the FPS cannot be
    determined or segmented transcription produces no output file.
    """
    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    cut_path = output_path.replace(".asr.json", ".cut.json")

    # FPS detection chain: CLI → CUT → ffprobe → FAIL
    if fps is not None:
        print(f"[ASR] Using CLI-provided FPS: {fps}", file=sys.stderr)
    else:
        fps = get_fps_from_cut(cut_path)
        if fps:
            print(f"[ASR] FPS from CUT: {fps}", file=sys.stderr)
    if fps is None:
        fps = get_fps_from_ffprobe(video_path)
        if fps:
            print(f"[ASR] FPS from ffprobe: {fps}", file=sys.stderr)
    if fps is None:
        print(
            "[ASR] ERROR: Cannot determine FPS (no CUT data, ffprobe failed). Aborting.",
            file=sys.stderr,
        )
        sys.exit(1)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("asr", "ASR_START")

    # Check for audio stream
    if not has_audio_stream(video_path):
        if publisher:
            publisher.info("asr", "No audio stream detected, skipping transcription")
        output = {"language": "", "language_probability": 0.0, "segments": []}
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2)
        if publisher:
            publisher.complete("asr", "0 segments (no audio)")
        _log("ASR: No audio stream, skipping transcription")
        sys.exit(0)

    # Prefer scene-by-scene processing when CUT data is available.
    cut_scenes = _load_cut_scenes(cut_path)

    if publisher:
        publisher.info("asr", "Loading Whisper model...")
    _log(f"[ASR] Loading Whisper model {MODEL_SIZE}...")
    model = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)
    _log("[ASR] Model loaded")

    if publisher:
        publisher.info("asr", f"Transcribing: {video_path}")

    tmp_path = output_path + ".tmp"
    if cut_scenes:
        all_segments, transcript_language = _transcribe_scenes(
            model, video_path, output_path, cut_scenes, fps, publisher
        )
        info_language = transcript_language or "unknown"
        if not os.path.exists(tmp_path):
            # No scene could be extracted or transcribed. Fail explicitly
            # rather than with a bare FileNotFoundError from the rename.
            _log("[ASR] ERROR: Segmented transcription produced no output. Aborting.")
            sys.exit(1)
    else:
        # No CUT data: transcribe the whole file directly.
        all_segments, info_language = _transcribe_whole(
            model, video_path, output_path, fps, publisher
        )

    if publisher:
        publisher.info("asr", f"ASR_LANGUAGE:{info_language}")

    # rename .tmp → .json
    os.rename(tmp_path, output_path)

    # Report the real number of segments; this used to be len() of a list
    # that was never filled, so the count was always 0.
    if publisher:
        publisher.complete("asr", f"{len(all_segments)} segments")
    _log(
        f"ASR: Transcription complete, {len(all_segments)} segments "
        f"written to {output_path}"
    )
    sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="ASR Transcription")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument("--fps", type=float, help="Override FPS (default: auto-detect)")
    args = parser.parse_args()
    run_asr(args.video_path, args.output_path, args.uuid, fps=args.fps)