Files
momentry_core/test_chunk_extraction.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

198 lines
5.4 KiB
Python

#!/usr/bin/env python3
"""Test chunk extraction and transcription separately."""
import sys
import os
import tempfile
import subprocess
import time
# Test video: use the UUID-named sample if present, otherwise the fallback clip.
# Both paths are relative, so they resolve against the current working directory.
test_video = "../test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
if not os.path.exists(test_video):
    test_video = "../test_video/Old_Time_Movie_Show_-_Charade_1963.HD.mov"
print(f"Testing: {test_video}")

# Create temp directory (this script never deletes it)
temp_dir = tempfile.mkdtemp(prefix="asr_chunk_test_")
print(f"Temp dir: {temp_dir}")

# Extract audio first: 16-bit PCM, 16 kHz, mono, overwriting any existing file
audio_path = os.path.join(temp_dir, "audio.wav")
extract_cmd = [
    "ffmpeg",
    "-i",
    test_video,
    "-acodec",
    "pcm_s16le",
    "-ar",
    "16000",
    "-ac",
    "1",
    "-y",
    audio_path,
]
print("\n1. Extracting audio...")
start = time.time()
result = subprocess.run(
    extract_cmd, capture_output=True, text=True, errors="replace"
)
if result.returncode != 0:
    # ffmpeg prints its banner first and the failure reason last, so show
    # the end of stderr rather than the beginning.
    print(f"Error: {result.stderr[-500:]}")
    sys.exit(1)
print(f"Audio extracted: {time.time() - start:.1f}s")

# Get duration (ffprobe prints only the bare value with these options)
duration_cmd = [
    "ffprobe",
    "-v",
    "error",
    "-show_entries",
    "format=duration",
    "-of",
    "default=noprint_wrappers=1:nokey=1",
    audio_path,
]
result = subprocess.run(
    duration_cmd, capture_output=True, text=True, errors="replace"
)
if result.returncode != 0:
    print(f"Error: ffprobe failed: {result.stderr[-500:]}")
    sys.exit(1)
try:
    duration = float(result.stdout.strip())
except ValueError:
    # Empty output or a non-numeric value such as "N/A".
    print(f"Error: could not parse duration from ffprobe output: {result.stdout!r}")
    sys.exit(1)
print(f"Audio duration: {duration:.1f}s")

# Simulate the ASR processor chunk calculation: consecutive, non-overlapping
# windows of at most chunk_duration seconds; the last one may be shorter.
chunk_duration = 600  # 10 minutes
chunks = []
start_time = 0.0
chunk_idx = 0
while start_time < duration:
    chunk_end = min(start_time + chunk_duration, duration)
    chunks.append(
        {
            "start": start_time,
            "end": chunk_end,
            "duration": chunk_end - start_time,
            "idx": chunk_idx,
        }
    )
    start_time = chunk_end
    chunk_idx += 1
print(f"\n2. Calculated {len(chunks)} chunks")
if not chunks:
    # A zero (or NaN) duration yields no chunks; stop here instead of
    # failing with IndexError on chunks[0] below.
    print(f"Error: no chunks to process (audio duration is {duration})")
    sys.exit(1)

# Create chunk directory
chunk_temp_dir = os.path.join(temp_dir, "chunks")
os.makedirs(chunk_temp_dir, exist_ok=True)
print(f"Chunk directory: {chunk_temp_dir}")

# Test first chunk
print("\n3. Testing first chunk extraction and transcription...")
chunk = chunks[0]
chunk_path = os.path.join(chunk_temp_dir, f"chunk_{chunk['idx']:04d}.wav")
# Extract chunk: same ffmpeg command line as the original copy of this function
# (noted as taken from asr_processor.py), with clearer failure reporting.
def extract_chunk(audio_path, start, duration, output_path):
    """Extract a chunk of audio using ffmpeg.

    The chunk is re-encoded as 16-bit PCM, 16 kHz, mono.

    Args:
        audio_path: Source audio file given to ``ffmpeg -i``.
        start: Chunk offset given to ``-ss`` (converted with ``str``).
        duration: Chunk length given to ``-t`` (converted with ``str``).
        output_path: Destination file; overwritten if it exists (``-y``).

    Returns:
        True if ffmpeg exited with code 0 and ``output_path`` exists and is
        non-empty. False otherwise, including when ffmpeg cannot be launched.
    """
    cmd = [
        "ffmpeg",
        "-i",
        audio_path,
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        output_path,
    ]
    print(f" Running: {' '.join(cmd)}")
    try:
        # Capture stderr as text (undecodable bytes replaced) so the
        # diagnostic below is readable rather than a bytes repr.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace"
        )
    except OSError as exc:
        # ffmpeg is missing or not executable: report it as a failed
        # extraction, consistent with every other failure path.
        print(f" Error: could not run ffmpeg: {exc}")
        return False
    success = (
        result.returncode == 0
        and os.path.exists(output_path)
        and os.path.getsize(output_path) > 0
    )
    if not success:
        # ffmpeg prints its banner first and the failure reason last, so
        # show the tail of stderr.
        print(f" Error: returncode={result.returncode}, stderr={result.stderr[-200:]}")
    return success
# Cut the first chunk out of the full audio file, timing the extraction.
print(
    "Extracting chunk 0: "
    f"start={chunk['start']:.1f}, duration={chunk['duration']:.1f}"
)
extract_began = time.time()
if not extract_chunk(audio_path, chunk["start"], chunk["duration"], chunk_path):
    print("Chunk extraction failed!")
    sys.exit(1)
print(f"Chunk extracted: {time.time() - extract_began:.1f}s")
chunk_size_mb = os.path.getsize(chunk_path) / (1024**2)
print(f"Chunk file size: {chunk_size_mb:.1f} MB")

# Load Whisper model ("tiny", CPU, int8), timing the load.
print("\n4. Loading Whisper model...")
# Put this script's own directory at the front of the import path first.
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_dir)
from faster_whisper import WhisperModel

load_began = time.time()
model = WhisperModel("tiny", device="cpu", compute_type="int8")
print(f"Model loaded: {time.time() - load_began:.1f}s")

# Transcribe chunk
print("\n5. Transcribing chunk...")
def transcribe_chunk(model, chunk_path, chunk_start, chunk_idx, total_chunks):
    """Transcribe one chunk file and shift its segments by the chunk offset.

    Args:
        model: Object whose ``transcribe(path, beam_size=...)`` returns a
            ``(segments, info)`` pair; each segment exposes ``start``,
            ``end`` and ``text``.
        chunk_path: Audio file handed to ``model.transcribe``.
        chunk_start: Offset added to every segment's ``start`` and ``end``.
        chunk_idx: Zero-based chunk index, used only in progress output.
        total_chunks: Total chunk count, used only in progress output.

    Returns:
        ``(results, info)``: ``results`` is a list of dicts with keys
        ``"start"``, ``"end"`` and ``"text"`` (whitespace-stripped), and
        ``info`` is passed through unchanged from the model.
    """
    progress = f"{chunk_idx + 1}/{total_chunks}"
    print(f" Starting transcription of chunk {progress}")
    began = time.time()
    raw_segments, info = model.transcribe(chunk_path, beam_size=5)
    # The segments are consumed inside the timed region, so the reported
    # time includes iterating over them.
    results = [
        {
            "start": seg.start + chunk_start,
            "end": seg.end + chunk_start,
            "text": seg.text.strip(),
        }
        for seg in raw_segments
    ]
    elapsed = time.time() - began
    print(f" Chunk {progress}: {len(results)} segments in {elapsed:.1f}s")
    return results, info
# Transcribe the first chunk and report the wall-clock time and detected language.
first_began = time.time()
segments, info = transcribe_chunk(model, chunk_path, chunk["start"], 0, len(chunks))
print(f"Total time for chunk transcription: {time.time() - first_began:.1f}s")
print(f"Language: {info.language} (prob {info.language_probability:.2f})")

# Test second chunk to see if it also works
if len(chunks) > 1:
    print("\n6. Testing second chunk...")
    chunk = chunks[1]
    chunk_path2 = os.path.join(chunk_temp_dir, f"chunk_{chunk['idx']:04d}.wav")
    print(
        "Extracting chunk 1: "
        f"start={chunk['start']:.1f}, duration={chunk['duration']:.1f}"
    )
    second_began = time.time()
    extracted = extract_chunk(
        audio_path, chunk["start"], chunk["duration"], chunk_path2
    )
    if not extracted:
        # A failure here is only reported; the script keeps going.
        print("Second chunk extraction failed")
    else:
        print(f"Chunk extracted: {time.time() - second_began:.1f}s")
        second_began = time.time()
        segments2, info2 = transcribe_chunk(
            model, chunk_path2, chunk["start"], 1, len(chunks)
        )
        print(f"Total time: {time.time() - second_began:.1f}s")

print(f"\nTemp directory preserved: {temp_dir}")