- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
396 lines
11 KiB
Python
396 lines
11 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
ASR Processor with chunked transcription and resource monitoring.
|
|
Supports large audio files by splitting into manageable chunks.
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
import argparse
|
|
import signal
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
# psutil is an optional dependency used only for resource monitoring. When it
# cannot be imported the script keeps working and PSUTIL_AVAILABLE records
# that monitoring must be skipped.
try:
    import psutil
except ImportError:
    PSUTIL_AVAILABLE = False
    print("WARNING: psutil not available, resource monitoring disabled")
else:
    PSUTIL_AVAILABLE = True
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Announce the received signal and end the process with exit status 1.

    Args:
        signum: Number of the signal that was delivered; echoed in the message.
        frame: Stack frame supplied by the signal machinery; not used.

    Raises:
        SystemExit: always, with code 1.
    """
    notice = f"ASR: Received signal {signum}, exiting..."
    print(notice)
    raise SystemExit(1)
|
|
|
|
|
|
def has_audio_stream(video_path: str) -> bool:
    """Report whether ffprobe lists at least one audio stream in ``video_path``.

    Returns:
        True when ffprobe prints any audio-stream entry; False when it prints
        nothing or exits with a non-zero status. If the ffprobe executable
        cannot be found, a warning is printed and True is returned so that
        the caller carries on as if audio were present.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=codec_type",
        "-of", "csv=p=0",
        video_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        # ffprobe ran but rejected the input.
        return False
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists")
        return True

    listed_streams = probe.stdout.strip()
    return bool(listed_streams)
|
|
|
|
|
|
def get_audio_duration(audio_path: str) -> float:
    """Return the duration of ``audio_path`` in seconds, as reported by ffprobe.

    Args:
        audio_path: Path of the media file handed to ffprobe.

    Returns:
        The ``format=duration`` value printed by ffprobe, parsed as a float.

    Raises:
        ValueError: if ffprobe's output cannot be parsed as a number (for
            example because ffprobe failed and printed nothing). The message
            includes ffprobe's exit status and its stderr text.
        FileNotFoundError: if the ffprobe executable cannot be found.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        audio_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError:
        # Keep the exception type float() would raise, but replace its opaque
        # "could not convert string to float" text with ffprobe's diagnostics.
        detail = result.stderr.strip() or f"unexpected output {raw_duration!r}"
        raise ValueError(
            f"ffprobe could not report a duration for {audio_path} "
            f"(exit status {result.returncode}): {detail}"
        ) from None
|
|
|
|
|
|
def extract_audio(video_path: str, audio_path: str) -> bool:
    """Decode the audio of ``video_path`` into a WAV file at ``audio_path``.

    ffmpeg is asked for 16-bit PCM at 16 kHz, mono, and told to overwrite an
    existing output file.

    Returns:
        True only if ffmpeg exited with status 0 and ``audio_path`` exists
        afterwards; False otherwise.
    """
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", video_path,
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y", audio_path,
    ]
    completed = subprocess.run(ffmpeg_cmd, capture_output=True)
    if completed.returncode != 0:
        return False
    return os.path.exists(audio_path)
|
|
|
|
|
|
def extract_chunk(
    audio_path: str, start: float, duration: float, output_path: str
) -> bool:
    """Cut ``duration`` seconds starting at ``start`` out of ``audio_path``.

    The chunk is written to ``output_path`` as 16-bit PCM, 16 kHz, mono,
    overwriting any existing file.

    Args:
        audio_path: Source audio file.
        start: Offset of the chunk in seconds from the beginning of the source.
        duration: Length of the chunk in seconds.
        output_path: Where the chunk is written.

    Returns:
        True only if ffmpeg exited with status 0 and left a non-empty file at
        ``output_path``; False otherwise.
    """
    cmd = [
        "ffmpeg",
        # -ss before -i makes ffmpeg seek in the input instead of decoding
        # and discarding everything that precedes the chunk.
        "-ss", str(start),
        "-i", audio_path,
        "-t", str(duration),
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y", output_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
    # A failed run can still leave a partial, non-empty file behind, so the
    # exit status is checked as well (the same rule extract_audio applies).
    return (
        result.returncode == 0
        and os.path.exists(output_path)
        and os.path.getsize(output_path) > 0
    )
|
|
|
|
|
|
def monitor_resources(pid: int, interval: int = 60) -> Dict[str, Any]:
    """Take a single CPU and memory reading for the process ``pid``.

    Args:
        pid: ID of the process to inspect.
        interval: Accepted but not used by this function; the CPU figure is
            always sampled over a fixed 0.1 second window.

    Returns:
        A dict with ``cpu_percent``, ``memory_mb`` (resident set size divided
        by 1024 * 1024) and ``available``. On success ``available`` is True
        and ``pid`` is included. When psutil is not installed, or the process
        is gone, inaccessible or a zombie, both numbers are 0.0 and
        ``available`` is False.
    """
    no_reading = {"cpu_percent": 0.0, "memory_mb": 0.0, "available": False}
    if not PSUTIL_AVAILABLE:
        return no_reading

    try:
        proc = psutil.Process(pid)
        cpu = proc.cpu_percent(interval=0.1)
        rss_bytes = proc.memory_info().rss
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        return no_reading

    return {
        "cpu_percent": cpu,
        "memory_mb": rss_bytes / (1024 * 1024),
        "available": True,
        "pid": pid,
    }
|
|
|
|
|
|
def transcribe_chunk(
    model,
    chunk_path: str,
    chunk_start: float,
    chunk_idx: int,
    total_chunks: int,
    publisher: Optional[RedisPublisher] = None,
) -> Tuple[List[Dict[str, Any]], Any]:
    """Transcribe one chunk file and shift its segments onto the full timeline.

    Args:
        model: Object whose ``transcribe(path, beam_size=5)`` returns a
            ``(segments, info)`` pair; every segment must expose ``start``,
            ``end`` and ``text``.
        chunk_path: Audio file holding this chunk.
        chunk_start: Offset of the chunk, in seconds, within the full audio;
            added to each segment's ``start`` and ``end``.
        chunk_idx: Zero-based index of the chunk (reported one-based).
        total_chunks: Total number of chunks, used in progress messages only.
        publisher: Optional progress publisher; when given it receives one
            message before and one after the transcription.

    Returns:
        A tuple ``(segments, info)``: ``segments`` is a list of dicts with
        ``start``, ``end`` and whitespace-stripped ``text``; ``info`` is
        whatever the model returned alongside its segments.
    """
    if publisher:
        publisher.info("asr", f"Transcribing chunk {chunk_idx + 1}/{total_chunks}")

    began_at = time.time()
    raw_segments, info = model.transcribe(chunk_path, beam_size=5)

    # NOTE(review): if the model yields segments lazily, the decoding work
    # happens while this comprehension runs - confirm against the library.
    shifted = [
        {
            "start": seg.start + chunk_start,
            "end": seg.end + chunk_start,
            "text": seg.text.strip(),
        }
        for seg in raw_segments
    ]

    # Measured after the segments have been consumed.
    elapsed = time.time() - began_at
    if publisher:
        publisher.info(
            "asr",
            f"Chunk {chunk_idx + 1}/{total_chunks}: {len(shifted)} segments in {elapsed:.1f}s",
        )

    return shifted, info
|
|
|
|
|
|
def _report_asr_error(publisher, message: str) -> None:
    """Send ``message`` to the publisher (if any) and to stderr with the ``ASR:`` prefix."""
    if publisher:
        publisher.error("asr", message)
    sys.stderr.write(f"ASR: {message}\n")
    sys.stderr.flush()


def _plan_chunks(total_duration: float, chunk_duration: int) -> List[Dict[str, Any]]:
    """Split ``[0, total_duration)`` into consecutive windows of at most ``chunk_duration`` seconds."""
    planned: List[Dict[str, Any]] = []
    start = 0.0
    while start < total_duration:
        end = min(start + chunk_duration, total_duration)
        planned.append(
            {
                "start": start,
                "end": end,
                "duration": end - start,
                "idx": len(planned),
            }
        )
        start = end
    return planned


def run_asr_chunked(
    video_path: str,
    output_path: str,
    uuid: str = "",
    chunk_duration: int = 600,  # 10 minutes default
    model_size: str = "tiny",
    compute_type: str = "int8",
) -> None:
    """Transcribe the audio of ``video_path`` chunk by chunk and write JSON.

    Steps: install SIGTERM/SIGINT handlers, check for an audio stream,
    extract the audio to a temporary WAV file, split it into windows of
    ``chunk_duration`` seconds, transcribe each window with a faster-whisper
    model on the CPU, then write the merged, time-sorted segments to
    ``output_path``.

    A chunk that cannot be extracted or transcribed is reported and skipped;
    the run still completes with the remaining chunks.

    Args:
        video_path: Input media file.
        output_path: Destination of the JSON result.
        uuid: When non-empty, progress is published through
            ``RedisPublisher(uuid)``; otherwise nothing is published.
        chunk_duration: Chunk length in seconds; must be positive.
        model_size: Model size passed to ``WhisperModel``.
        compute_type: Compute type passed to ``WhisperModel``.

    Raises:
        ValueError: if ``chunk_duration`` is not positive.
        SystemExit: always, once the arguments are accepted - code 0 after a
            successful run or when the input has no audio stream, code 1 when
            audio extraction, duration probing or model loading fails.
    """
    import shutil

    # Checked before any side effect: a non-positive chunk length would make
    # the chunk planning loop below never terminate.
    if chunk_duration <= 0:
        raise ValueError(
            f"chunk_duration must be a positive number of seconds, got {chunk_duration}"
        )

    # Set up signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None
    if publisher:
        publisher.info("asr", "ASR_START_CHUNKED")

    # Check for audio stream
    if not has_audio_stream(video_path):
        if publisher:
            publisher.info("asr", "No audio stream detected, skipping transcription")
        output = {"language": "", "language_probability": 0.0, "segments": []}
        with open(output_path, "w") as f:
            json.dump(output, f, indent=2)
        if publisher:
            publisher.complete("asr", "0 segments (no audio)")
        sys.stderr.write("ASR: No audio stream, skipping transcription\n")
        sys.stderr.flush()
        sys.exit(0)

    # Create temporary directory for audio extraction. Everything that uses
    # it runs inside try/finally so the directory (which holds the complete
    # extracted WAV) is removed on every exit path, including sys.exit() and
    # the SystemExit raised by the signal handler.
    temp_dir = tempfile.mkdtemp(prefix="asr_")
    try:
        audio_path = os.path.join(temp_dir, "audio.wav")

        if publisher:
            publisher.info("asr", "Extracting audio from video...")

        # Extract audio
        if not extract_audio(video_path, audio_path):
            _report_asr_error(publisher, "Failed to extract audio")
            sys.exit(1)

        # Get audio duration
        try:
            total_duration = get_audio_duration(audio_path)
        except Exception as e:
            _report_asr_error(publisher, f"Failed to get audio duration: {e}")
            sys.exit(1)

        if publisher:
            publisher.info(
                "asr",
                f"Audio duration: {total_duration:.1f}s ({total_duration / 3600:.1f} hrs)",
            )
            publisher.info("asr", f"Chunk duration: {chunk_duration}s")

        # Calculate chunks
        chunks = _plan_chunks(total_duration, chunk_duration)

        if publisher:
            publisher.info("asr", f"Split into {len(chunks)} chunks")

        # Load Whisper model
        if publisher:
            publisher.info(
                "asr", f"Loading Whisper model ({model_size}, {compute_type})..."
            )

        try:
            from faster_whisper import WhisperModel

            model = WhisperModel(model_size, device="cpu", compute_type=compute_type)
        except Exception as e:
            _report_asr_error(publisher, f"Failed to load Whisper model: {e}")
            sys.exit(1)

        if publisher:
            publisher.info("asr", "Whisper model loaded successfully")

        # Process each chunk
        all_segments = []
        language = None
        language_prob = None

        chunk_temp_dir = os.path.join(temp_dir, "chunks")
        os.makedirs(chunk_temp_dir, exist_ok=True)

        for i, chunk in enumerate(chunks):
            chunk_path = os.path.join(chunk_temp_dir, f"chunk_{i:04d}.wav")

            if publisher:
                publisher.progress(
                    "asr", i, len(chunks), f"Processing chunk {i + 1}/{len(chunks)}"
                )

            try:
                # Extract chunk
                if not extract_chunk(
                    audio_path, chunk["start"], chunk["duration"], chunk_path
                ):
                    if publisher:
                        publisher.warning(
                            "asr", f"Failed to extract chunk {i}, skipping"
                        )
                    continue

                # Monitor resources
                if PSUTIL_AVAILABLE and publisher:
                    resources = monitor_resources(os.getpid())
                    if resources["available"]:
                        publisher.info(
                            "asr",
                            f"Resource usage: CPU {resources['cpu_percent']:.1f}%, "
                            f"Memory {resources['memory_mb']:.1f}MB",
                        )

                # Transcribe chunk (no timeout is enforced); a failure here
                # is reported and the loop moves on to the next chunk.
                try:
                    segments, info = transcribe_chunk(
                        model, chunk_path, chunk["start"], i, len(chunks), publisher
                    )
                    all_segments.extend(segments)

                    # The language reported for the first successfully
                    # transcribed chunk is used for the whole output.
                    if language is None:
                        language = info.language
                        language_prob = info.language_probability
                        if publisher:
                            publisher.info(
                                "asr",
                                f"Detected language: {language} (prob {language_prob:.2f})",
                            )
                except Exception as e:
                    _report_asr_error(publisher, f"Error transcribing chunk {i}: {e}")
            finally:
                # Clean up chunk file, whether or not it was usable. Only
                # filesystem errors are ignored so that SystemExit from the
                # signal handler is never swallowed.
                try:
                    os.unlink(chunk_path)
                except OSError:
                    pass
    finally:
        # Clean up temporary directory
        shutil.rmtree(temp_dir, ignore_errors=True)

    # Sort segments by start time
    all_segments.sort(key=lambda x: x["start"])

    # Prepare output
    output = {
        "language": language or "",
        "language_probability": language_prob or 0.0,
        "segments": all_segments,
        "chunk_count": len(chunks),
        "chunk_duration": chunk_duration,
        "total_segments": len(all_segments),
        "processing_mode": "chunked",
    }

    # Write output
    with open(output_path, "w") as f:
        json.dump(output, f, indent=2)

    if publisher:
        publisher.complete(
            "asr", f"{len(all_segments)} segments from {len(chunks)} chunks"
        )

    sys.stderr.write(
        f"ASR: Transcription complete, {len(all_segments)} segments written to {output_path}\n"
    )
    sys.stderr.flush()
    sys.exit(0)
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> None:
    """Parse the command line and run the chunked ASR pipeline.

    Args:
        argv: Arguments to parse; ``None`` lets argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: with code 2 when the arguments are invalid (including a
            non-positive ``--chunk-duration``); otherwise with whatever code
            ``run_asr_chunked`` exits with.
    """
    parser = argparse.ArgumentParser(description="ASR Transcription (Chunked)")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=600,
        help="Chunk duration in seconds (default: 600 = 10 minutes)",
    )
    parser.add_argument("--model-size", default="tiny", help="Whisper model size")
    parser.add_argument("--compute-type", default="int8", help="Compute type")
    args = parser.parse_args(argv)

    # A zero or negative chunk length can never advance through the audio,
    # so reject it here with a normal usage error.
    if args.chunk_duration <= 0:
        parser.error("--chunk-duration must be a positive number of seconds")

    run_asr_chunked(
        args.video_path,
        args.output_path,
        args.uuid,
        args.chunk_duration,
        args.model_size,
        args.compute_type,
    )


if __name__ == "__main__":
    main()
|