- Add database migrations (006-028) for face recognition, identity, file_uuid - Add test scripts for ASR, face, search, processing - Add portal frontend (Tauri) - Add config, benchmark, and monitoring utilities - Add model checkpoints and pretrained model references
119 lines
3.1 KiB
Python
119 lines
3.1 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""Test chunk extraction from large video."""
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import subprocess
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
|
|
def extract_audio(video_path, audio_path):
    """Extract the audio track of a video into a mono 16 kHz 16-bit PCM WAV.

    Runs ``ffmpeg`` with the video stream disabled (``-vn``) and overwrites
    ``audio_path`` if it already exists (``-y``). The command prefix, elapsed
    time and return code are printed to stdout.

    Args:
        video_path: Path of the input video file.
        audio_path: Path the WAV output is written to.

    Returns:
        True if ffmpeg exited with status 0 and ``audio_path`` exists and is
        non-empty afterwards. False otherwise, including when ffmpeg could
        not be launched at all.
    """
    cmd = [
        "ffmpeg",
        "-i",
        video_path,
        "-vn",
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        audio_path,
    ]
    print(f"Extracting audio: {' '.join(cmd[:5])} ...")
    start = time.time()
    try:
        # Detach stdin so ffmpeg's interactive key handling cannot consume or
        # block on the caller's standard input.
        result = subprocess.run(
            cmd, capture_output=True, stdin=subprocess.DEVNULL
        )
    except OSError as exc:
        # Typically FileNotFoundError when ffmpeg is not on PATH; report it
        # through the boolean result instead of an uncaught traceback.
        print(f"Failed to launch ffmpeg: {exc}")
        return False
    elapsed = time.time() - start
    print(f"Audio extraction took {elapsed:.1f}s, return code: {result.returncode}")
    if result.returncode != 0:
        # ffmpeg may echo file names or metadata that are not valid UTF-8.
        print(f"stderr: {result.stderr.decode(errors='replace')[:200]}")
        return False
    # Callers may pre-create audio_path as an empty placeholder, so existence
    # alone does not prove that ffmpeg wrote anything.
    return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0
|
|
|
|
|
|
def test_extract_chunk(audio_path, start, duration):
    """Extract one chunk of an audio file with ffmpeg and report success.

    The chunk is written as mono 16 kHz 16-bit PCM WAV to a temporary file,
    which is always deleted before this function returns. The command prefix,
    elapsed time, return code and (on success) chunk size are printed.

    Args:
        audio_path: Path of the source audio file.
        start: Chunk start offset in seconds (passed to ``-ss``).
        duration: Chunk length in seconds (passed to ``-t``).

    Returns:
        True if ffmpeg exited with status 0 and produced a non-empty file.
        False otherwise, including when ffmpeg could not be launched or did
        not finish within the 30-second timeout.
    """
    # Reserve a unique output path; ffmpeg overwrites the empty placeholder
    # because of "-y".
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        chunk_path = f.name

    cmd = [
        "ffmpeg",
        "-i",
        audio_path,
        "-ss",
        str(start),
        "-t",
        str(duration),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        "-ac",
        "1",
        "-y",
        chunk_path,
    ]
    print(f"Extracting chunk {start}-{start + duration}s: {' '.join(cmd[:5])} ...")
    try:
        start_time = time.time()
        try:
            # Detach stdin so ffmpeg's interactive key handling cannot consume
            # or block on the caller's standard input.
            result = subprocess.run(
                cmd,
                capture_output=True,
                stdin=subprocess.DEVNULL,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            print("Chunk extraction timed out after 30s")
            return False
        except OSError as exc:
            # Typically FileNotFoundError when ffmpeg is not on PATH.
            print(f"Failed to launch ffmpeg: {exc}")
            return False
        elapsed = time.time() - start_time
        print(f"Chunk extraction took {elapsed:.1f}s, return code: {result.returncode}")
        if result.returncode != 0:
            # ffmpeg may echo file names or metadata that are not valid UTF-8.
            print(f"stderr: {result.stderr.decode(errors='replace')[:500]}")
            return False

        chunk_size = os.path.getsize(chunk_path) if os.path.exists(chunk_path) else 0
        if chunk_size > 0:
            print(f"Chunk size: {chunk_size} bytes")
        return chunk_size > 0
    finally:
        # Clean up on every exit path so a timeout or launch failure does not
        # leave the temporary chunk file behind.
        if os.path.exists(chunk_path):
            os.unlink(chunk_path)


# This helper takes required arguments; without this marker pytest would
# collect it by its "test_" prefix and fail looking for fixtures.
test_extract_chunk.__test__ = False
|
|
|
|
|
|
def main():
    """Run the chunk-extraction check against a fixed local video file.

    Extracts the video's audio into a temporary WAV file, then tries to cut
    the first three consecutive 60-second chunks from it, stopping at the
    first failure. The temporary WAV file is removed before returning.
    Returns None in every case; results are only printed.
    """
    source_video = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
    if not os.path.exists(source_video):
        print(f"Video not found: {source_video}")
        return

    # Only the unique file name is needed here; the file itself is left on
    # disk (delete=False) for the audio extraction step to write into.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
        wav_path = handle.name

    try:
        extracted = extract_audio(source_video, wav_path)
        if not extracted:
            print("Failed to extract audio")
            return

        print(f"Audio file size: {os.path.getsize(wav_path)} bytes")

        # Offsets 0, 60 and 120 seconds, one 60-second chunk each.
        chunk_seconds = 60
        for index, offset in enumerate(range(0, 3 * chunk_seconds, chunk_seconds)):
            if test_extract_chunk(wav_path, offset, chunk_seconds):
                print(f"Chunk {index} extraction successful\n")
                continue
            print(f"Chunk extraction failed at start={offset}")
            break
    finally:
        if os.path.exists(wav_path):
            os.unlink(wav_path)
            print("Cleaned up audio file")
|
|
|
|
|
|
# Run the check only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|