#!/opt/homebrew/bin/python3.11 """Test limited number of chunks to verify fix works end-to-end.""" import subprocess import tempfile import os import time import sys import json def test_limited_chunks(): """Test processing only first 3 chunks (30 minutes) of large video.""" test_video = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov" if not os.path.exists(test_video): print(f"Test video not found: {test_video}") return print(f"Testing first 3 chunks (30 minutes) of large video:") print(f" Video: {os.path.basename(test_video)}") print(f" Expected: 3 chunks × 10 minutes = 30 minutes audio") print("-" * 60) with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f: output_path = f.name try: # We'll modify the script to only process 3 chunks # First, let's check if we can process with a smaller max_direct_duration # to force chunked mode but limit total processing time cmd = [ "/opt/homebrew/bin/python3.11", "scripts/asr_processor.py", test_video, output_path, "--uuid", "test_limited", "--chunk-duration", "600", # 10 minutes "--max-direct-duration", "300", # Force chunked mode ] env = os.environ.copy() env["MOMENTRY_DISABLE_REDIS"] = "1" env["ASR_DEBUG"] = "1" env["MOMENTRY_ASR_CHUNK_TIMEOUT"] = "60" # 1 minute per chunk print(f"Command: {' '.join(cmd)}") print(f"Environment: ASR_DEBUG=1, MOMENTRY_ASR_CHUNK_TIMEOUT=60") print("-" * 60) start = time.time() proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, env=env, ) timeout = 300 # 5 minutes max for 3 chunks killed = False stderr_lines = [] from select import select chunk_success_count = 0 chunk_error_count = 0 while True: if proc.poll() is not None: remaining_stderr = proc.stderr.read() if remaining_stderr: for line in remaining_stderr.split("\n"): if line: stderr_lines.append(line) break if time.time() - start > timeout: print(f"\n⏱️ TOTAL TIMEOUT after {timeout}s - killing process") proc.kill() killed = True break readable, _, _ = select([proc.stderr], [], [], 0.1) if readable: line = proc.stderr.readline() if line: line = line.rstrip("\n") stderr_lines.append(line) # Count chunk successes in real-time if "transcribe_chunk succeeded" in line: chunk_success_count += 1 print(f" ✓ Chunk {chunk_success_count} succeeded") elif "error" in line.lower() and "debug" not in line: chunk_error_count += 1 print(f" ✗ Error: {line}") elif "Chunk" in line and "extracting audio" in line: # Show progress print(f" Processing chunk...") time.sleep(0.05) if killed: proc.wait() elapsed = time.time() - start print(f"\n" + "=" * 60) print(f"Results:") print(f" Elapsed time: {elapsed:.1f}s") print(f" Killed: {killed}") print(f" Return code: {proc.returncode}") print(f" Chunks succeeded: {chunk_success_count}") print(f" Chunks with errors: {chunk_error_count}") # Analyze stderr for detailed results print(f"\nDetailed analysis:") # Count various events extract_success = [l for l in stderr_lines if "extract_chunk succeeded" in l] transcribe_success = [ l for l in stderr_lines if "transcribe_chunk succeeded" in l ] timeout_warnings = [l for l in stderr_lines if "timeout" in l.lower()] print(f" Audio extractions: {len(extract_success)}") print(f" Transcriptions: {len(transcribe_success)}") print(f" Timeout warnings: {len(timeout_warnings)}") if timeout_warnings: print(f" ⚠️ Timeout warnings detected:") for warning in timeout_warnings[:3]: print(f" {warning}") # Check if output was created if os.path.exists(output_path): with open(output_path, "r") as f: data = json.load(f) segments = data.get("segments", []) processing_mode = data.get("processing_mode", "unknown") chunk_count = data.get("chunk_count", 0) print(f"\nOutput analysis:") print(f" Processing mode: {processing_mode}") print(f" Chunk count: {chunk_count}") print(f" Total segments: {len(segments)}") if segments: # Calculate audio coverage first_start = segments[0].get("start", 0) last_end = segments[-1].get("end", 0) total_duration = last_end - first_start print(f" First segment: {first_start:.1f}s") print(f" Last segment: {last_end:.1f}s") print( f" Total transcribed duration: {total_duration:.1f}s ({total_duration / 60:.1f} minutes)" ) # Expected: ~1800 seconds for 3 chunks (30 minutes) expected_duration = 1800 # 30 minutes coverage = ( (total_duration / expected_duration) * 100 if expected_duration > 0 else 0 ) print(f" Coverage of 30-minute target: {coverage:.1f}%") if coverage >= 90: print(f" ✅ Good coverage of target audio") elif coverage >= 50: print(f" ⚠️ Partial coverage") else: print(f" ❌ Low coverage") # Check segment quality empty_segments = [s for s in segments if not s.get("text", "").strip()] print(f" Empty segments: {len(empty_segments)}") # Sample first few segments print(f"\n Sample segments:") for i, seg in enumerate(segments[:5]): text = seg.get("text", "") if len(text) > 100: text = text[:97] + "..." print( f" {i + 1}. [{seg.get('start', 0):.1f}-{seg.get('end', 0):.1f}s]: {text}" ) else: print(f"\n ❌ Output file not created") # Print last 20 lines of stderr for debugging print(f"\n Last 20 lines of stderr:") for line in stderr_lines[-20:]: if line.strip(): print(f" {line}") print(f"\n" + "=" * 60) # Overall assessment if chunk_success_count >= 3 and not killed: print(f"✅ SUCCESS: Processed {chunk_success_count} chunks successfully") print(f" The fix appears to work correctly") elif chunk_success_count > 0: print(f"⚠️ PARTIAL: Processed {chunk_success_count} chunks") print(f" Some chunks succeeded, but not all") else: print(f"❌ FAILED: No chunks processed successfully") except Exception as e: print(f"✗ Error: {e}") import traceback traceback.print_exc() finally: if os.path.exists(output_path): os.unlink(output_path) print(f"✓ Cleaned up output file") if __name__ == "__main__": test_limited_chunks()