Files
momentry_core/test_extract_chunk.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

119 lines
3.1 KiB
Python

#!/opt/homebrew/bin/python3.11
"""Test chunk extraction from large video."""
import sys
import os
import tempfile
import subprocess
import time
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def extract_audio(video_path: str, audio_path: str) -> bool:
    """Extract the audio track of a video into a WAV file using ffmpeg.

    The output is 16-bit little-endian PCM, resampled to 16 kHz and mixed
    down to one channel. Progress, timing and the ffmpeg return code are
    printed to stdout.

    Args:
        video_path: Path of the input video file.
        audio_path: Path of the WAV file to write. An existing file is
            overwritten (``-y``).

    Returns:
        True if ffmpeg exited with code 0 and ``audio_path`` exists
        afterwards; False otherwise, including when the ffmpeg executable
        cannot be launched.
    """
    cmd = [
        "ffmpeg",
        "-i",
        video_path,
        "-vn",  # no video stream in the output
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        audio_path,
    ]
    # Only the first few arguments are echoed to keep the log line short.
    print(f"Extracting audio: {' '.join(cmd[:5])} ...")
    began = time.perf_counter()
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as exc:
        # Typically FileNotFoundError when ffmpeg is not on PATH; report it
        # as a failed extraction instead of crashing with a traceback.
        print(f"Could not run ffmpeg: {exc}")
        return False
    elapsed = time.perf_counter() - began
    print(f"Audio extraction took {elapsed:.1f}s, return code: {result.returncode}")
    if result.returncode != 0:
        # ffmpeg writes its version/configuration banner first, so the actual
        # error message is at the END of stderr. Decode leniently because
        # stderr may echo file metadata that is not valid UTF-8.
        stderr_text = result.stderr.decode(errors="replace")
        print(f"stderr: {stderr_text[-200:]}")
    return result.returncode == 0 and os.path.exists(audio_path)
def test_extract_chunk(audio_path: str, start: float, duration: float) -> bool:
    """Extract one time slice of an audio file with ffmpeg and verify it.

    The slice is written to a temporary WAV file (16-bit PCM, 16 kHz, mono)
    which is always deleted before returning, whether extraction succeeded,
    failed, timed out, or ffmpeg could not be started.

    Args:
        audio_path: Path of the source audio file.
        start: Offset of the slice from the beginning, in seconds.
        duration: Length of the slice, in seconds.

    Returns:
        True if ffmpeg exited with code 0 within 30 seconds and produced a
        non-empty output file; False otherwise.
    """
    # Reserve a unique output path. The empty placeholder file is overwritten
    # by ffmpeg because of ``-y``.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        chunk_path = f.name
    try:
        cmd = [
            "ffmpeg",
            "-i",
            audio_path,
            "-ss",
            str(start),
            "-t",
            str(duration),
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-y",
            chunk_path,
        ]
        print(f"Extracting chunk {start}-{start + duration}s: {' '.join(cmd[:5])} ...")
        began = time.perf_counter()
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=30)
        except subprocess.TimeoutExpired:
            print("Chunk extraction timed out after 30s")
            return False
        except OSError as exc:
            # Typically FileNotFoundError when ffmpeg is not on PATH.
            print(f"Could not run ffmpeg: {exc}")
            return False
        elapsed = time.perf_counter() - began
        print(f"Chunk extraction took {elapsed:.1f}s, return code: {result.returncode}")
        if result.returncode != 0:
            # The ffmpeg banner comes first on stderr; the real error is at
            # the end. Decode leniently in case stderr is not valid UTF-8.
            stderr_text = result.stderr.decode(errors="replace")
            print(f"stderr: {stderr_text[-500:]}")
        success = (
            result.returncode == 0
            and os.path.exists(chunk_path)
            and os.path.getsize(chunk_path) > 0
        )
        if success:
            print(f"Chunk size: {os.path.getsize(chunk_path)} bytes")
        return success
    finally:
        # Clean up on every exit path so failed runs do not leak temp files.
        if os.path.exists(chunk_path):
            os.unlink(chunk_path)


# This is a helper that takes plain arguments, not a pytest test. Its name
# matches pytest's ``test_*`` pattern, so opt it out of collection; otherwise
# pytest would try to resolve its parameters as fixtures and error out.
test_extract_chunk.__test__ = False
# Default input used when no path is supplied by the caller or command line.
DEFAULT_VIDEO_PATH = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"


def main(video_path=None):
    """Run the manual chunk-extraction check against one video file.

    Extracts the audio track to a temporary WAV file, then extracts the
    first three 60-second chunks from it (starting at 0, 60 and 120 s),
    stopping at the first failure. The temporary audio file is removed
    before returning.

    Args:
        video_path: Path of the video to test. Defaults to
            ``DEFAULT_VIDEO_PATH`` when None.

    Returns:
        Process exit status: 0 if the audio and all three chunks were
        extracted, 1 if the video is missing or any extraction failed.
    """
    if video_path is None:
        video_path = DEFAULT_VIDEO_PATH
    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return 1

    # First extract audio. Only the unique path is needed here; ffmpeg
    # overwrites the empty placeholder file.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        audio_path = f.name
    try:
        if not extract_audio(video_path, audio_path):
            print("Failed to extract audio")
            return 1
        print(f"Audio file size: {os.path.getsize(audio_path)} bytes")

        # Test extracting the first few chunks.
        for i in range(3):
            start = i * 60  # 0, 60, 120 seconds
            if not test_extract_chunk(audio_path, start, 60):
                print(f"Chunk extraction failed at start={start}")
                return 1
            print(f"Chunk {i} extraction successful\n")
        return 0
    finally:
        if os.path.exists(audio_path):
            os.unlink(audio_path)
            print("Cleaned up audio file")


if __name__ == "__main__":
    # Optional first argument overrides the default video path; the exit
    # status reflects whether the check passed.
    sys.exit(main(sys.argv[1] if len(sys.argv) > 1 else None))