Files
momentry_core/scripts/save_events_to_db.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

221 lines
6.6 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Save Events to Database

Responsibility: write detected fight, argument, and special sound-effect
(gunshot / explosion) events into Postgres.

Inputs are read from ``<MOMENTRY_OUTPUT_DIR>/<UUID>/``: the ``<UUID>.wav``
audio, the ``<UUID>.asrx.json`` ASR segments and, if present, the
``<UUID>.sound_events.json`` sound detections. Rows are inserted into the
``video_events`` table, which is created first if it does not exist.
Configuration comes from the ``UUID``, ``MOMENTRY_OUTPUT_DIR`` and
``DATABASE_URL`` environment variables.
"""
import psycopg2
import librosa
import numpy as np
import json
import os
# --- Configuration ---
# DB_URL, UUID and OUTPUT_DIR come from environment variables (with defaults);
# the input file paths below are derived from them.
DB_URL = os.getenv("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry")
# Identifier of the video being ingested; names both its output subdirectory
# and the prefix of every per-video input file.
UUID = os.getenv("UUID", "384b0ff44aaaa1f1")
# Root directory holding one subdirectory per processed video.
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")

# Per-video inputs, all located under <OUTPUT_DIR>/<UUID>/.
_VIDEO_DIR = os.path.join(OUTPUT_DIR, UUID)
AUDIO_PATH = os.path.join(_VIDEO_DIR, f"{UUID}.wav")  # audio track (.wav)
ASRX_PATH = os.path.join(_VIDEO_DIR, f"{UUID}.asrx.json")  # ASR segments
SOUND_JSON = os.path.join(_VIDEO_DIR, f"{UUID}.sound_events.json")  # sound events
def connect_db():
    """Open and return a new psycopg2 connection to the database at ``DB_URL``.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = psycopg2.connect(DB_URL)
    return connection
def create_schema(cur):
    """Create the ``video_events`` table and its two lookup indexes if missing.

    Every statement uses ``IF NOT EXISTS``, so running this against an
    existing schema is harmless. It does not commit; the caller does.

    NOTE(review): the table has no uniqueness constraint besides ``id``, so
    re-running the ingestion for the same UUID appends duplicate rows.

    Args:
        cur: An open DB-API cursor used to execute the DDL.
    """
    ddl = """
CREATE TABLE IF NOT EXISTS video_events (
id SERIAL PRIMARY KEY,
uuid TEXT NOT NULL,
start_time FLOAT NOT NULL,
end_time FLOAT NOT NULL,
event_type TEXT NOT NULL,
confidence FLOAT DEFAULT 0.0,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_video_events_uuid ON video_events(uuid);
CREATE INDEX IF NOT EXISTS idx_video_events_type ON video_events(event_type);
"""
    print("🏗️ Creating schema...")
    cur.execute(ddl)
def detect_and_save_fights(cur):
    """Detect fight scenes from audio energy and insert them into ``video_events``.

    Loads ``AUDIO_PATH`` as 22.05 kHz mono, computes per-frame RMS energy and
    masks out frames covered by ASR segments from ``ASRX_PATH``. Loud
    non-speech frames are "impact pulses"; frames whose surrounding ~2 s
    window holds at least ``MIN_PULSES`` pulses form fight zones. Each zone
    of at least ~2 s is inserted as an ``event_type='fight'`` row. The
    ``confidence`` column holds the zone duration in seconds.

    Args:
        cur: An open DB-API cursor. This function does not commit.
    """
    print(f"🥊 Detecting Fights for {UUID}...")
    y, sr = librosa.load(AUDIO_PATH, sr=22050, mono=True)
    hop_length = int(0.05 * sr)  # nominal 50 ms hop; truncates to 1102 samples
    # Convert frame index <-> seconds with the *actual* hop duration. The
    # nominal 0.05 s overstates it (1102 / 22050 ~= 0.049977 s), which made
    # reported times drift late by roughly 1.6 s per hour of audio.
    frame_sec = hop_length / sr
    rms = librosa.feature.rms(y=y, frame_length=2048, hop_length=hop_length)[0]
    n_frames = len(rms)

    # Speech mask: frames overlapping any ASR segment are excluded so that
    # loud dialogue is not counted as an impact.
    speech_mask = np.zeros(n_frames, dtype=bool)
    with open(ASRX_PATH, "r", encoding="utf-8") as f:
        data = json.load(f)
    segments = data if isinstance(data, list) else data.get("segments", [])
    for s in segments:
        start_idx = max(0, min(int(s["start"] / frame_sec), n_frames))
        end_idx = max(0, min(int(s["end"] / frame_sec) + 1, n_frames))
        speech_mask[start_idx:end_idx] = True

    # Detection
    THRESHOLD = 0.10  # RMS level above which a non-speech frame is an impact
    impact_pulses = (rms > THRESHOLD) & ~speech_mask
    WINDOW_SIZE = 40  # frames (~2 s)
    MIN_PULSES = 4
    MIN_DURATION = 2.0  # seconds
    # Compare durations in whole frames; round() keeps the original 40-frame
    # minimum even though frame_sec is slightly below 0.05 s.
    min_frames = round(MIN_DURATION / frame_sec)

    if n_frames < WINDOW_SIZE:
        # Shorter than one density window: np.convolve(mode="same") would
        # return a window-length array that no longer lines up with the
        # frames, and no real zone could reach min_frames anyway.
        starts = ends = np.empty(0, dtype=int)
    else:
        pulse_density = np.convolve(
            impact_pulses.astype(int), np.ones(WINDOW_SIZE), mode="same"
        )
        fight_zones = pulse_density >= MIN_PULSES
        # Half-open [start, end) runs of True frames. Padding with 0 on both
        # sides gives every run exactly one rising (+1) and one falling (-1)
        # edge, located at its true first frame and one past its last frame.
        edges = np.diff(np.concatenate(([0], fight_zones.astype(int), [0])))
        starts = np.flatnonzero(edges == 1)
        ends = np.flatnonzero(edges == -1)

    insert_sql = """
INSERT INTO video_events (uuid, start_time, end_time, event_type, confidence, metadata)
VALUES (%s, %s, %s, %s, %s, %s)
"""
    metadata = json.dumps({"method": "pulse_density", "energy_threshold": THRESHOLD})
    count = 0
    for start, end in zip(starts, ends):
        if end - start < min_frames:
            continue
        # float(): psycopg2 cannot adapt numpy scalar types.
        start_t = float(start * frame_sec)
        end_t = float(end * frame_sec)
        duration = float((end - start) * frame_sec)
        cur.execute(insert_sql, (UUID, start_t, end_t, "fight", duration, metadata))
        count += 1
    print(f" ✅ Saved {count} Fight Scenes.")
def detect_and_save_arguments(cur):
    """Detect rapid speaker turn-taking ("arguments") and insert it into ``video_events``.

    Slides a 10 s window across the ASR timeline read from ``ASRX_PATH`` in
    2 s steps. The ``speaker_id`` values of segments overlapping the window
    are listed in start-time order. A window with at least 4 changes between
    consecutive speakers is stored as an ``event_type='argument'`` row, whose
    ``confidence`` column holds the switch count. After a hit, the window
    jumps a full 10 s so that stored windows do not overlap.

    Args:
        cur: An open DB-API cursor. This function does not commit.
    """
    print(f"🗣️ Detecting Arguments for {UUID}...")
    with open(ASRX_PATH, "r", encoding="utf-8") as f:
        data = json.load(f)
    segments = data if isinstance(data, list) else data.get("segments", [])
    # Turn counting compares neighbouring segments, so it needs chronological
    # order. sorted() is stable: already-ordered ASR output is unchanged.
    segments = sorted(segments, key=lambda s: s["start"])

    window_sec = 10.0
    step_sec = 2.0  # slide distance when a window is not an argument
    turn_threshold = 4  # minimum speaker changes inside one window

    current_time = segments[0]["start"] if segments else 0
    # The segment that starts last is not necessarily the one that ends last.
    end_time = max((s["end"] for s in segments), default=0)

    insert_sql = """
INSERT INTO video_events (uuid, start_time, end_time, event_type, confidence, metadata)
VALUES (%s, %s, %s, %s, %s, %s)
"""
    count = 0
    while current_time < end_time:
        window_start = current_time
        window_end = current_time + window_sec
        speakers_in_window = [
            s["speaker_id"]
            for s in segments
            if s["end"] > window_start and s["start"] < window_end
        ]
        switches = sum(
            1
            for prev, nxt in zip(speakers_in_window, speakers_in_window[1:])
            if prev != nxt
        )
        if switches < turn_threshold:
            current_time += step_sec
            continue
        metadata = {
            "switches": switches,
            # dict.fromkeys de-duplicates while keeping first-appearance order,
            # so the stored list is reproducible (set() order varies per run).
            "speakers": list(dict.fromkeys(speakers_in_window)),
        }
        cur.execute(
            insert_sql,
            (UUID, window_start, window_end, "argument", switches, json.dumps(metadata)),
        )
        count += 1
        current_time += window_sec  # Skip window to avoid overlapping duplicates
    print(f" ✅ Saved {count} Argument Scenes.")
def save_gunshots(cur):
    """Copy gunshot/explosion detections from ``SOUND_JSON`` into ``video_events``.

    Reads the ``sound_events`` list from ``SOUND_JSON``. Every event whose
    ``type`` contains "Gunshot" or "Explosion" is inserted as an
    ``event_type='gunshot'`` row spanning ``timestamp`` to ``timestamp + 0.5``
    seconds. The row's ``confidence`` is the event's ``energy`` and its
    ``metadata`` is the raw event dict. If the file does not exist, a warning
    is printed and nothing is inserted.

    Args:
        cur: An open DB-API cursor. This function does not commit.
    """
    print(f"🔫 Saving Gunshots/Explosions for {UUID}...")
    try:
        # EAFP: avoids the exists()/open() race; JSON is UTF-8 by spec.
        with open(SOUND_JSON, encoding="utf-8") as f:
            events = json.load(f).get("sound_events", [])
    except FileNotFoundError:
        print(" ⚠️ No sound events file found.")
        return

    keywords = ("Gunshot", "Explosion")
    event_span_sec = 0.5  # fixed span given to each detection (only a timestamp is read)
    insert_sql = """
INSERT INTO video_events (uuid, start_time, end_time, event_type, confidence, metadata)
VALUES (%s, %s, %s, %s, %s, %s)
"""
    count = 0
    for ev in events:
        if not any(keyword in ev["type"] for keyword in keywords):
            continue
        cur.execute(
            insert_sql,
            (
                UUID,
                ev["timestamp"],
                ev["timestamp"] + event_span_sec,
                "gunshot",
                ev["energy"],
                json.dumps(ev),
            ),
        )
        count += 1
    print(f" ✅ Saved {count} Gunshot Events.")
if __name__ == "__main__":
    import sys

    print(f"🚀 Starting Event Ingestion for {UUID}")
    conn = connect_db()
    cur = conn.cursor()
    try:
        # Each stage commits separately: if a later stage fails, events saved
        # by earlier stages are kept and only the failing stage is rolled back.
        create_schema(cur)
        conn.commit()
        detect_and_save_fights(cur)
        conn.commit()
        detect_and_save_arguments(cur)
        conn.commit()
        save_gunshots(cur)
        conn.commit()
        print("\n🎉 All events saved successfully!")
    except Exception as e:
        print(f"❌ Error: {e}", file=sys.stderr)
        conn.rollback()
        # Exit non-zero so shell pipelines and schedulers see the failure
        # (previously the script exited 0 after an error). `finally` still runs.
        sys.exit(1)
    finally:
        cur.close()
        conn.close()