#!/opt/homebrew/bin/python3.11
"""Test chunk extraction from large video."""

import sys
import os
import tempfile
import subprocess
import time

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Video used when no path is supplied to main() or on the command line.
DEFAULT_VIDEO_PATH = (
    "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
)


def _run_ffmpeg(cmd, label, stderr_limit, timeout=None):
    """Run one ffmpeg command and report how it went.

    Args:
        cmd: Full argument list passed to ``subprocess.run``.
        label: Prefix used in the progress messages (e.g. "Chunk extraction").
        stderr_limit: Maximum number of stderr characters printed on failure.
        timeout: Optional limit in seconds; ``None`` waits indefinitely.

    Returns:
        True when the process ran and exited with status 0, False when it
        exited non-zero, could not be started, or exceeded ``timeout``.
    """
    started = time.time()
    try:
        result = subprocess.run(cmd, capture_output=True, timeout=timeout)
    except OSError as err:
        # Typically FileNotFoundError: the ffmpeg binary is not on PATH.
        print(f"{label} could not be started: {err}")
        return False
    except subprocess.TimeoutExpired:
        print(f"{label} timed out after {timeout}s")
        return False

    elapsed = time.time() - started
    print(f"{label} took {elapsed:.1f}s, return code: {result.returncode}")
    if result.returncode != 0:
        # errors="replace" keeps arbitrary bytes from raising while decoding.
        # The tail is shown because ffmpeg starts its stderr with a
        # version/build banner; the actual error message comes last.
        stderr_text = result.stderr.decode(errors="replace")
        print(f"stderr: {stderr_text[-stderr_limit:]}")
    return result.returncode == 0


def extract_audio(video_path, audio_path):
    """Extract audio from video to WAV.

    The audio track is written as 16-bit PCM, 16 kHz, mono, overwriting
    ``audio_path`` if it already exists.

    Args:
        video_path: Path of the input video file.
        audio_path: Path of the WAV file to create.

    Returns:
        True when ffmpeg succeeded and ``audio_path`` exists, else False.
    """
    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        audio_path,
    ]
    print(f"Extracting audio: {' '.join(cmd[:5])} ...")
    succeeded = _run_ffmpeg(cmd, "Audio extraction", stderr_limit=200)
    return succeeded and os.path.exists(audio_path)


def test_extract_chunk(audio_path, start, duration):
    """Extract a single chunk.

    Cuts ``duration`` seconds starting at ``start`` out of ``audio_path`` into
    a temporary WAV file, checks that the result is non-empty, and removes the
    temporary file again before returning.

    Args:
        audio_path: Path of the source audio file.
        start: Chunk start offset in seconds.
        duration: Chunk length in seconds.

    Returns:
        True when ffmpeg succeeded within 30 seconds and produced a non-empty
        file, else False.
    """
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        chunk_path = f.name

    try:
        cmd = [
            "ffmpeg",
            "-i", audio_path,
            "-ss", str(start),
            "-t", str(duration),
            "-acodec", "pcm_s16le",
            "-ar", "16000",
            "-ac", "1",
            "-y",
            chunk_path,
        ]
        print(
            f"Extracting chunk {start}-{start + duration}s: "
            f"{' '.join(cmd[:5])} ..."
        )
        succeeded = _run_ffmpeg(
            cmd, "Chunk extraction", stderr_limit=500, timeout=30
        )
        success = (
            succeeded
            and os.path.exists(chunk_path)
            and os.path.getsize(chunk_path) > 0
        )
        if success:
            print(f"Chunk size: {os.path.getsize(chunk_path)} bytes")
        return success
    finally:
        # Clean up on every exit path so a failed run never leaves the
        # temporary chunk behind.
        if os.path.exists(chunk_path):
            os.unlink(chunk_path)


# The name matches pytest's ``test_*`` pattern, but this is a helper that needs
# explicit arguments; opt it out of collection so pytest does not treat the
# parameters as fixtures.
test_extract_chunk.__test__ = False


def main(video_path=None):
    """Extract the audio of a video and try cutting the first three chunks.

    Args:
        video_path: Video to process; defaults to ``DEFAULT_VIDEO_PATH``.
    """
    if video_path is None:
        video_path = DEFAULT_VIDEO_PATH

    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return

    # First extract audio
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        audio_path = f.name

    try:
        if not extract_audio(video_path, audio_path):
            print("Failed to extract audio")
            return

        print(f"Audio file size: {os.path.getsize(audio_path)} bytes")

        # Test extracting first few chunks
        for i in range(3):
            start = i * 60  # 0, 60, 120 seconds
            if not test_extract_chunk(audio_path, start, 60):
                print(f"Chunk extraction failed at start={start}")
                break
            print(f"Chunk {i} extraction successful\n")
    finally:
        if os.path.exists(audio_path):
            os.unlink(audio_path)
            print("Cleaned up audio file")


if __name__ == "__main__":
    # An optional first command-line argument overrides the default video.
    main(sys.argv[1] if len(sys.argv) > 1 else None)