#!/usr/bin/env python3 """ Test transcription of full audio file with progress monitoring. """ import sys import time import threading import warnings import psutil from pathlib import Path # Capture warnings (disabled due to urllib3 warning) # warnings.filterwarnings("error") # Convert warnings to exceptions def monitor_memory(pid, interval=1, stop_event=None): """Monitor memory usage of process.""" samples = [] while not stop_event or not stop_event.is_set(): try: proc = psutil.Process(pid) mem = proc.memory_info().rss / 1024 / 1024 samples.append((time.time(), mem)) except: pass time.sleep(interval) return samples def transcribe_full( audio_path, model_size="tiny", compute_type="int8", timeout_per_segment=30 ): """Transcribe full audio with timeout per segment.""" from faster_whisper import WhisperModel print(f"Loading model {model_size} ({compute_type})...") start = time.time() model = WhisperModel(model_size, device="cpu", compute_type=compute_type) print(f"Model loaded in {time.time() - start:.1f}s") print(f"Starting transcription of {audio_path}...") print(f"File size: {Path(audio_path).stat().st_size / 1024 / 1024:.1f} MB") segments, info = model.transcribe(audio_path, beam_size=5) # Start memory monitoring in background import threading stop_event = threading.Event() mem_samples = [] def monitor(): nonlocal mem_samples while not stop_event.is_set(): try: proc = psutil.Process() mem = proc.memory_info().rss / 1024 / 1024 mem_samples.append((time.time(), mem)) except: pass time.sleep(2) monitor_thread = threading.Thread(target=monitor, daemon=True) monitor_thread.start() results = [] segment_times = [] start_time = time.time() last_segment_time = start_time try: for i, segment in enumerate(segments): segment_time = time.time() elapsed = segment_time - last_segment_time last_segment_time = segment_time segment_times.append(elapsed) results.append( { "start": segment.start, "end": segment.end, "text": segment.text.strip(), } ) # Print progress if len(mem_samples) > 0: current_mem = mem_samples[-1][1] else: current_mem = 0 print( f"[{i + 1}] {segment.start:.1f}-{segment.end:.1f} ({elapsed:.1f}s, mem: {current_mem:.1f} MB): {segment.text[:80]}..." ) # Reset timeout for next segment # If segment takes too long, maybe something is wrong if elapsed > timeout_per_segment: print( f"WARNING: Segment {i + 1} took {elapsed:.1f}s > {timeout_per_segment}s timeout" ) # Continue anyway total_time = time.time() - start_time print(f"Transcription completed in {total_time:.1f}s") print(f"Total segments: {len(results)}") print( f"Average time per segment: {total_time / len(results) if results else 0:.2f}s" ) except Exception as e: print(f"Error during transcription: {e}") import traceback traceback.print_exc() finally: stop_event.set() monitor_thread.join(timeout=5) if mem_samples: peak_mem = max(m[1] for m in mem_samples) avg_mem = sum(m[1] for m in mem_samples) / len(mem_samples) print(f"Memory usage: peak {peak_mem:.1f} MB, average {avg_mem:.1f} MB") return results, info def main(): audio_path = "/tmp/test_audio.wav" if not Path(audio_path).exists(): print(f"Audio file not found: {audio_path}") sys.exit(1) print(f"Testing full audio transcription") print(f"Audio duration: 1:54:39 (approx)") # Set a total timeout of 10 minutes start = time.time() results = None info = None def run_transcribe(): nonlocal results, info results, info = transcribe_full(audio_path, timeout_per_segment=60) thread = threading.Thread(target=run_transcribe) thread.start() thread.join(timeout=600) # 10 minutes if thread.is_alive(): print("\nTIMEOUT: Transcription took longer than 10 minutes") # Can't interrupt, but we can exit sys.exit(1) if results is not None: print(f"\nSuccessfully transcribed {len(results)} segments") print(f"Language: {info.language} (prob {info.language_probability:.2f})") # Save results output_path = Path("test_output/full_audio_transcription.json") output_path.parent.mkdir(exist_ok=True) import json with open(output_path, "w") as f: json.dump( { "language": info.language, "language_probability": info.language_probability, "segments": results, }, f, indent=2, ) print(f"Results saved to {output_path}") print(f"Total execution time: {time.time() - start:.1f}s") if __name__ == "__main__": main()