- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
138 lines
3.8 KiB
Python
138 lines
3.8 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""Test transcription of a chunk from large video."""
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import subprocess
|
|
import time
|
|
|
|
# Put this script's own directory first on the import path so modules that
# live next to it can be imported regardless of the current working directory.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)
|
|
|
|
|
|
def extract_chunk(audio_path, start, duration, chunk_path):
    """Extract a single chunk of audio with ffmpeg.

    The chunk is written as 16-bit PCM, 16 kHz, mono, and any existing file
    at ``chunk_path`` is overwritten (``-y``).

    Args:
        audio_path: Path of the source audio file passed to ffmpeg as input.
        start: Offset into the source, passed to ffmpeg's ``-ss``.
        duration: Length of the chunk, passed to ffmpeg's ``-t``.
        chunk_path: Destination path for the extracted chunk.

    Returns:
        True if ffmpeg exited with status 0 and ``chunk_path`` exists and is
        non-empty. False otherwise, including when ffmpeg cannot be launched
        or does not finish within 30 seconds.
    """
    cmd = [
        "ffmpeg",
        "-i",
        audio_path,
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        chunk_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, timeout=30)
    except (OSError, subprocess.TimeoutExpired):
        # A missing ffmpeg binary or a hung process is just another failed
        # extraction; callers rely on the boolean result, not on exceptions.
        return False
    return (
        result.returncode == 0
        and os.path.exists(chunk_path)
        and os.path.getsize(chunk_path) > 0
    )
|
|
|
|
|
|
def main(video_path=None):
    """Smoke-test Whisper transcription on the first 60 seconds of a video.

    Steps: extract the video's audio track to a temporary WAV file with
    ffmpeg, cut the first 60 seconds into a second temporary WAV via
    ``extract_chunk``, load the ``tiny`` faster-whisper model on CPU, and
    transcribe the chunk, printing timings and the first five segments.

    Every failure is reported with ``print`` and ends the run early; nothing
    is raised to the caller. Temporary files are removed on every exit path
    once they have been created.

    Args:
        video_path: Video file to test. Defaults to the hard-coded local
            test video when omitted.
    """
    if video_path is None:
        video_path = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return

    # Track both temp files from the start so the single ``finally`` below
    # can clean up no matter which step fails.
    audio_path = None
    chunk_path = None
    try:
        # delete=False: we only need a unique name; ffmpeg writes the file.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            audio_path = f.name

        # Extract audio
        print("Extracting audio from video...")
        cmd = [
            "ffmpeg",
            "-i",
            video_path,
            "-vn",
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            audio_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=60)
        except subprocess.TimeoutExpired:
            print("Audio extraction failed: ffmpeg timed out after 60s")
            return
        except OSError as e:
            # Typically ffmpeg is not installed or not on PATH.
            print(f"Audio extraction failed: {e}")
            return
        if result.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be UTF-8.
            stderr_text = result.stderr.decode(errors="replace")
            print(f"Audio extraction failed: {stderr_text[:200]}")
            return

        print(f"Audio extracted: {os.path.getsize(audio_path)} bytes")

        # Extract first chunk (60 seconds)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            chunk_path = f.name

        if not extract_chunk(audio_path, 0, 60, chunk_path):
            print("Failed to extract chunk")
            return

        print(f"Chunk extracted: {os.path.getsize(chunk_path)} bytes")

        # Load Whisper model
        print("Loading Whisper model...")
        try:
            # Imported lazily so a missing dependency is reported cleanly
            # instead of breaking the import of this script.
            from faster_whisper import WhisperModel

            model = WhisperModel("tiny", device="cpu", compute_type="int8")
            print("Model loaded")
        except ImportError as e:
            print(f"Failed to import faster_whisper: {e}")
            return
        except Exception as e:
            print(f"Failed to load model: {e}")
            return

        # Try transcription
        print("Transcribing chunk...")
        start_time = time.time()
        try:
            # Use beam_size=5 like in ASR processor
            segments, info = model.transcribe(chunk_path, beam_size=5)
            elapsed = time.time() - start_time
            print(f"Transcription initiated in {elapsed:.2f}s")

            # Convert generator to list (actual transcription happens here)
            print("Converting segments to list...")
            segments_list = list(segments)
            total_elapsed = time.time() - start_time
            print(f"Transcription completed in {total_elapsed:.2f}s")
            print(f"Segments: {len(segments_list)}")
            print(
                f"Language: {info.language}, Probability: {info.language_probability}"
            )

            for i, segment in enumerate(segments_list[:5]):
                print(
                    f"Segment {i}: {segment.start:.2f}s - {segment.end:.2f}s: {segment.text}"
                )

        except Exception as e:
            print(f"Transcription failed: {e}")
            import traceback

            traceback.print_exc()

    finally:
        for temp_path in (chunk_path, audio_path):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)
        print("Cleaned up temporary files")
|
|
|
|
|
|
# Run the transcription smoke test only when executed as a script, so that
# importing this module has no side effects beyond the definitions above.
if __name__ == "__main__":
    main()
|