Files
momentry_core/test_limited_chunks.py
Warren b54c2def30 feat: add migrations, test scripts, and utility tools
- Add database migrations (006-028) for face recognition, identity, file_uuid
- Add test scripts for ASR, face, search, processing
- Add portal frontend (Tauri)
- Add config, benchmark, and monitoring utilities
- Add model checkpoints and pretrained model references
2026-04-30 15:11:53 +08:00

229 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/opt/homebrew/bin/python3.11
"""Test limited number of chunks to verify fix works end-to-end."""
import subprocess
import tempfile
import os
import time
import sys
import json
def _monitor_stderr(proc, timeout, on_line):
    """Stream the child's stderr line by line until it exits or times out.

    The pipe is read with ``os.read`` on the raw file descriptor so that
    ``select`` and the reads agree on what is pending; mixing ``select`` with a
    buffered ``readline`` can leave complete lines stuck in Python's buffer.

    Args:
        proc: A ``subprocess.Popen`` started with ``stderr=subprocess.PIPE`` in
            binary mode.
        timeout: Maximum wall-clock seconds to let the process run.
        on_line: Callable invoked with each decoded stderr line (no newline).

    Returns:
        True if the process was killed because ``timeout`` elapsed, else False.
    """
    from select import select

    fd = proc.stderr.fileno()
    deadline = time.monotonic() + timeout
    pending = b""
    killed = False

    def kill_for_timeout():
        print(f"\n⏱️ TOTAL TIMEOUT after {timeout}s - killing process")
        proc.kill()

    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                kill_for_timeout()
                killed = True
                break
            readable, _, _ = select([fd], [], [], min(0.1, remaining))
            if readable:
                data = os.read(fd, 65536)
                if not data:
                    # EOF: every writer has closed its end of the pipe.
                    break
                pending += data
                # Keep the trailing partial line for the next read.
                *complete, pending = pending.split(b"\n")
                for raw in complete:
                    on_line(raw.decode("utf-8", errors="replace"))
            elif proc.poll() is not None:
                # Process has exited and nothing more is waiting in the pipe.
                break

        if pending:
            on_line(pending.decode("utf-8", errors="replace"))

        if not killed:
            # stderr may reach EOF before the process itself has exited;
            # keep honouring the overall deadline while waiting for it.
            try:
                proc.wait(timeout=max(0.0, deadline - time.monotonic()))
            except subprocess.TimeoutExpired:
                kill_for_timeout()
                killed = True
        if killed:
            proc.wait()
    except BaseException:
        # Do not leave the child running if we are interrupted or fail.
        proc.kill()
        raise
    return killed


def _print_output_report(output_path, stderr_lines):
    """Print an analysis of the JSON result, or a stderr tail if it is unusable.

    Args:
        output_path: Path of the JSON file the processor was asked to write.
        stderr_lines: Captured stderr lines, used for the debugging tail.
    """
    data = None
    # The temp file is pre-created by the caller, so existence alone proves
    # nothing; an empty file means the processor never wrote a result.
    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        try:
            with open(output_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            print(f"\n ❌ Output file is not valid JSON: {e}")
    else:
        print(f"\n ❌ Output file not created")

    if data is None:
        # Print last 20 lines of stderr for debugging
        print(f"\n Last 20 lines of stderr:")
        for line in stderr_lines[-20:]:
            if line.strip():
                print(f" {line}")
        return

    segments = data.get("segments", [])
    processing_mode = data.get("processing_mode", "unknown")
    chunk_count = data.get("chunk_count", 0)
    print(f"\nOutput analysis:")
    print(f" Processing mode: {processing_mode}")
    print(f" Chunk count: {chunk_count}")
    print(f" Total segments: {len(segments)}")
    if not segments:
        return

    # Calculate audio coverage from the first segment start to the last end.
    first_start = segments[0].get("start", 0)
    last_end = segments[-1].get("end", 0)
    total_duration = last_end - first_start
    print(f" First segment: {first_start:.1f}s")
    print(f" Last segment: {last_end:.1f}s")
    print(
        f" Total transcribed duration: {total_duration:.1f}s ({total_duration / 60:.1f} minutes)"
    )
    # Expected: ~1800 seconds for 3 chunks (30 minutes)
    expected_duration = 1800
    coverage = (total_duration / expected_duration) * 100
    print(f" Coverage of 30-minute target: {coverage:.1f}%")
    if coverage >= 90:
        print(f" ✅ Good coverage of target audio")
    elif coverage >= 50:
        print(f" ⚠️ Partial coverage")
    else:
        print(f" ❌ Low coverage")

    # Check segment quality
    empty_segments = [s for s in segments if not s.get("text", "").strip()]
    print(f" Empty segments: {len(empty_segments)}")

    # Sample first few segments
    print(f"\n Sample segments:")
    for i, seg in enumerate(segments[:5]):
        text = seg.get("text", "")
        if len(text) > 100:
            text = text[:97] + "..."
        print(
            f" {i + 1}. [{seg.get('start', 0):.1f}-{seg.get('end', 0):.1f}s]: {text}"
        )


def test_limited_chunks():
    """Run the ASR processor in chunked mode on a large video and report results.

    Launches ``scripts/asr_processor.py`` with a 600 s chunk duration and a
    300 s max direct duration, watches its stderr for per-chunk progress, and
    prints a summary of the chunk counts and the JSON output. The command line
    itself does not limit the number of chunks; the run is bounded by a
    300 s overall timeout after which the process is killed.

    Returns None without running anything when the test video is missing.
    Results are printed only; nothing is asserted.
    """
    test_video = "/Users/accusys/test_video/1636719d-c31f-78ac-f1dd-8ab0b0b36c66.mov"
    if not os.path.exists(test_video):
        print(f"Test video not found: {test_video}")
        return

    print(f"Testing first 3 chunks (30 minutes) of large video:")
    print(f" Video: {os.path.basename(test_video)}")
    print(f" Expected: 3 chunks × 10 minutes = 30 minutes audio")
    print("-" * 60)

    # Reserve a path for the processor's JSON output; removed in `finally`.
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
        output_path = f.name

    try:
        # A max direct duration smaller than the chunk duration is passed to
        # force chunked mode.
        cmd = [
            "/opt/homebrew/bin/python3.11",
            "scripts/asr_processor.py",
            test_video,
            output_path,
            "--uuid",
            "test_limited",
            "--chunk-duration",
            "600",  # 10 minutes
            "--max-direct-duration",
            "300",  # Force chunked mode
        ]
        env = os.environ.copy()
        env["MOMENTRY_DISABLE_REDIS"] = "1"
        env["ASR_DEBUG"] = "1"
        env["MOMENTRY_ASR_CHUNK_TIMEOUT"] = "60"  # 1 minute per chunk
        print(f"Command: {' '.join(cmd)}")
        print(f"Environment: ASR_DEBUG=1, MOMENTRY_ASR_CHUNK_TIMEOUT=60")
        print("-" * 60)

        stderr_lines = []
        chunk_success_count = 0
        chunk_error_count = 0

        def on_line(line):
            """Record one stderr line and update the live chunk counters."""
            nonlocal chunk_success_count, chunk_error_count
            stderr_lines.append(line)
            if "transcribe_chunk succeeded" in line:
                chunk_success_count += 1
                print(f" ✓ Chunk {chunk_success_count} succeeded")
            # NOTE(review): the "debug" exclusion is case-sensitive while the
            # "error" match is not - confirm against the processor's log format.
            elif "error" in line.lower() and "debug" not in line:
                chunk_error_count += 1
                print(f" ✗ Error: {line}")
            elif "Chunk" in line and "extracting audio" in line:
                # Show progress
                print(f" Processing chunk...")

        timeout = 300  # 5 minutes max for 3 chunks
        start = time.monotonic()
        # stdout is discarded: it is never inspected, and an unread PIPE could
        # fill up and block the child. The context manager closes stderr.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            env=env,
        ) as proc:
            killed = _monitor_stderr(proc, timeout, on_line)
        elapsed = time.monotonic() - start

        print(f"\n" + "=" * 60)
        print(f"Results:")
        print(f" Elapsed time: {elapsed:.1f}s")
        print(f" Killed: {killed}")
        print(f" Return code: {proc.returncode}")
        print(f" Chunks succeeded: {chunk_success_count}")
        print(f" Chunks with errors: {chunk_error_count}")

        # Analyze stderr for detailed results
        print(f"\nDetailed analysis:")
        extract_success = [l for l in stderr_lines if "extract_chunk succeeded" in l]
        transcribe_success = [
            l for l in stderr_lines if "transcribe_chunk succeeded" in l
        ]
        timeout_warnings = [l for l in stderr_lines if "timeout" in l.lower()]
        print(f" Audio extractions: {len(extract_success)}")
        print(f" Transcriptions: {len(transcribe_success)}")
        print(f" Timeout warnings: {len(timeout_warnings)}")
        if timeout_warnings:
            print(f" ⚠️ Timeout warnings detected:")
            for warning in timeout_warnings[:3]:
                print(f" {warning}")

        _print_output_report(output_path, stderr_lines)

        print(f"\n" + "=" * 60)
        # Overall assessment
        if chunk_success_count >= 3 and not killed:
            print(f"✅ SUCCESS: Processed {chunk_success_count} chunks successfully")
            print(f" The fix appears to work correctly")
        elif chunk_success_count > 0:
            print(f"⚠️ PARTIAL: Processed {chunk_success_count} chunks")
            print(f" Some chunks succeeded, but not all")
        else:
            print(f"❌ FAILED: No chunks processed successfully")
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and keep going so
        # the temp file is still cleaned up below.
        print(f"✗ Error: {e}")
        import traceback

        traceback.print_exc()
    finally:
        if os.path.exists(output_path):
            os.unlink(output_path)
            print(f"✓ Cleaned up output file")
# Script entry point: run the chunked-ASR check when executed directly,
# but not when this module is imported.
if __name__ == "__main__":
    test_limited_chunks()