Files
momentry_core/test_transcribe_chunk.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

138 lines
3.8 KiB
Python

#!/opt/homebrew/bin/python3.11
"""Test transcription of a chunk from large video."""
import sys
import os
import tempfile
import subprocess
import time
# Put this script's own directory at the front of the module search path.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def extract_chunk(audio_path, start, duration, chunk_path):
    """Cut one chunk out of an audio file with ffmpeg.

    The chunk is written as 16-bit PCM, 16 kHz, mono (the ``-acodec``,
    ``-ar`` and ``-ac`` options below).

    Args:
        audio_path: Input file handed to ``ffmpeg -i``.
        start: Chunk offset, passed to ``-ss`` after ``str()`` conversion.
        duration: Chunk length, passed to ``-t`` after ``str()`` conversion.
        chunk_path: Output file; an existing file is overwritten (``-y``).

    Returns:
        True when ffmpeg exits with status 0 and ``chunk_path`` exists and is
        non-empty. False otherwise, including when ffmpeg cannot be started
        or does not finish within the 30-second timeout.
    """
    cmd = [
        "ffmpeg",
        "-i",
        audio_path,
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        chunk_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            # Do not let ffmpeg read (or wait on) the caller's stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        # ffmpeg is missing/not executable, or it ran past the timeout.
        # The function reports success as a boolean, so signal failure the
        # same way instead of raising.
        return False
    return (
        result.returncode == 0
        and os.path.exists(chunk_path)
        and os.path.getsize(chunk_path) > 0
    )
def main(
    video_path="/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov",
):
    """Transcribe the first 60 seconds of a video with faster-whisper.

    Steps: extract the audio track to a temporary 16 kHz mono WAV file with
    ffmpeg, cut the first 60 seconds into a second temporary file with
    ``extract_chunk``, load the "tiny" Whisper model on CPU (int8), then
    print timings, the detected language and up to five segments.

    Failures are reported with ``print`` and end the run early. Both
    temporary files are removed on every exit path.

    Args:
        video_path: Video file to read. Defaults to the local sample file
            this script was originally hard-coded to use.

    Returns:
        None.
    """
    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return

    # Track both temp files from the start so the ``finally`` below removes
    # whatever was created, even when audio extraction fails or times out.
    audio_path = None
    chunk_path = None
    try:
        # delete=False: only a unique name is needed; ffmpeg writes the file
        # itself and ``-y`` lets it overwrite the empty placeholder.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            audio_path = f.name

        # Extract audio
        print("Extracting audio from video...")
        cmd = [
            "ffmpeg",
            "-i",
            video_path,
            "-vn",
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=60)
        except (OSError, subprocess.TimeoutExpired) as e:
            # ffmpeg could not be started, or ran past the 60-second limit.
            print(f"Audio extraction failed: {e}")
            return
        if result.returncode != 0:
            # ffmpeg writes its version banner first, so the actual error is
            # at the end of stderr. errors="replace" keeps odd bytes from
            # raising while reporting the failure.
            stderr_tail = result.stderr.decode(errors="replace")[-200:]
            print(f"Audio extraction failed: {stderr_tail}")
            return
        print(f"Audio extracted: {os.path.getsize(audio_path)} bytes")

        # Extract first chunk (60 seconds)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            chunk_path = f.name
        if not extract_chunk(audio_path, 0, 60, chunk_path):
            print("Failed to extract chunk")
            return
        print(f"Chunk extracted: {os.path.getsize(chunk_path)} bytes")

        # Load Whisper model
        print("Loading Whisper model...")
        try:
            from faster_whisper import WhisperModel

            model = WhisperModel("tiny", device="cpu", compute_type="int8")
            print("Model loaded")
        except ImportError as e:
            print(f"Failed to import faster_whisper: {e}")
            return
        except Exception as e:
            print(f"Failed to load model: {e}")
            return

        # Try transcription
        print("Transcribing chunk...")
        # perf_counter is monotonic, so the elapsed times cannot be skewed
        # by wall-clock adjustments.
        start_time = time.perf_counter()
        try:
            # Use beam_size=5 like in ASR processor
            segments, info = model.transcribe(chunk_path, beam_size=5)
            elapsed = time.perf_counter() - start_time
            print(f"Transcription initiated in {elapsed:.2f}s")

            # Convert generator to list (actual transcription happens here)
            print("Converting segments to list...")
            segments_list = list(segments)
            total_elapsed = time.perf_counter() - start_time
            print(f"Transcription completed in {total_elapsed:.2f}s")

            print(f"Segments: {len(segments_list)}")
            print(
                f"Language: {info.language}, Probability: {info.language_probability}"
            )
            for i, segment in enumerate(segments_list[:5]):
                print(
                    f"Segment {i}: {segment.start:.2f}s - {segment.end:.2f}s: {segment.text}"
                )
        except Exception as e:
            # Top-level boundary of a diagnostic script: show the full
            # traceback instead of hiding the cause.
            print(f"Transcription failed: {e}")
            import traceback

            traceback.print_exc()
    finally:
        for temp_path in (chunk_path, audio_path):
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)
        print("Cleaned up temporary files")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()