Files
momentry_core/scripts/hybrid_stamp_search.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

214 lines
7.4 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Hybrid Stamp Search: OpenCV + OWL-ViT
Stage 1: OpenCV finds frames with containers (hands/paper) - FAST
Stage 2: OWL-ViT validates those frames for actual stamps - ACCURATE
"""
import os
import cv2
import json
import time
import numpy as np
from PIL import Image
import torch
from transformers import OwlViTProcessor, OwlViTForObjectDetection
# ── Configuration ──────────────────────────────────────────
# Hard-coded job identifier; the input video and every output of this script
# live under output/<UUID>/.
UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/hybrid_stamp_search"
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")
# makedirs creates intermediate directories, so this also creates OUTPUT_DIR.
os.makedirs(CROPS_DIR, exist_ok=True)
FRAME_INTERVAL = 5  # spacing, in seconds, between sampled frames

print("=" * 60)
print("🔬 Hybrid Stamp Search: OpenCV + OWL-ViT")
print("=" * 60)

cap = cv2.VideoCapture(VIDEO_PATH)
# A missing or unreadable file does not raise; the capture is simply "not
# opened" and every property reads back as 0, which would otherwise surface
# as a confusing ZeroDivisionError in the duration computation below.
if not cap.isOpened():
    raise SystemExit(f"❌ Cannot open video: {VIDEO_PATH}")
fps = cap.get(cv2.CAP_PROP_FPS)
if not (fps > 0):  # also rejects NaN
    cap.release()
    raise SystemExit(f"❌ Video reports an invalid FPS ({fps}): {VIDEO_PATH}")
# Duration in whole seconds, derived from the container's frame count.
total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min)")
# ═══════════════════════════════════════════
# Stage 1: OpenCV - Find container frames
# ═══════════════════════════════════════════
# Cheap colour/brightness heuristics flag frames that may show a hand or a
# sheet of paper ("containers"); only those frames are handed to Stage 2.
print("\n⚡ Stage 1: OpenCV container scanning...")

# Structuring elements never change between frames, so build them once.
_SKIN_KERNEL = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
_PAPER_KERNEL = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))


def _has_skin_region(frame):
    """Return True if the BGR *frame* has a skin-toned blob of plausible size.

    A blob qualifies when its contour area is above 1500 px and below 35 % of
    the frame area.
    """
    height, width = frame.shape[:2]
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Two hue bands (0-25 and 160-179) are thresholded and merged. The bands
    # do not overlap, so OR-ing the masks equals adding them, without relying
    # on uint8 arithmetic.
    low_band = cv2.inRange(hsv, np.array([0, 20, 60]), np.array([25, 180, 255]))
    high_band = cv2.inRange(hsv, np.array([160, 20, 60]), np.array([179, 180, 255]))
    skin = cv2.bitwise_or(low_band, high_band)
    # Close small holes first, then remove isolated specks.
    skin = cv2.morphologyEx(skin, cv2.MORPH_CLOSE, _SKIN_KERNEL)
    skin = cv2.morphologyEx(skin, cv2.MORPH_OPEN, _SKIN_KERNEL)
    contours, _ = cv2.findContours(skin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    max_area = height * width * 0.35
    return any(1500 < cv2.contourArea(cnt) < max_area for cnt in contours)


def _has_bright_rect_region(frame):
    """Return True if the BGR *frame* has a bright, roughly rectangular blob.

    A blob qualifies when its contour area is above 3000 px and below 50 % of
    the frame area, and its bounding-box aspect ratio (w / h) lies strictly
    between 0.2 and 4.0.
    """
    height, width = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright = cv2.threshold(gray, 175, 255, cv2.THRESH_BINARY)
    bright = cv2.morphologyEx(bright, cv2.MORPH_CLOSE, _PAPER_KERNEL)
    contours, _ = cv2.findContours(bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    max_area = height * width * 0.5
    for cnt in contours:
        if not 3000 < cv2.contourArea(cnt) < max_area:
            continue
        _, _, box_w, box_h = cv2.boundingRect(cnt)
        aspect = box_w / box_h if box_h > 0 else 0
        if 0.2 < aspect < 4.0:
            return True
    return False


# NOTE: full frames are kept in memory for Stage 2; for long or high-resolution
# videos this list can become large.
candidate_frames = []  # (sec, frame_array)
sample_seconds = range(0, total_sec, FRAME_INTERVAL)
start = time.time()
for sec in sample_seconds:
    cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
    ret, frame = cap.read()
    if not ret:
        # Seek/decode failed at this position; skip it rather than abort.
        continue
    # The brightness test only runs when the skin test found nothing.
    if _has_skin_region(frame) or _has_bright_rect_region(frame):
        candidate_frames.append((sec, frame))
cap.release()
t1 = time.time() - start
print(f" ✅ Stage 1 done in {t1:.1f}s")
# len(sample_seconds) is the real number of sampled positions; integer
# division under-counts when total_sec is not a multiple of FRAME_INTERVAL.
print(
    f" 📊 {len(candidate_frames)} candidate frames out of {len(sample_seconds)} total"
)
if not candidate_frames:
    print(" ❌ No containers found. Exiting.")
    # exit() is injected by the `site` module and may be absent (e.g. python -S);
    # SystemExit is always available. Exit status stays 0 as before.
    raise SystemExit(0)
# ═══════════════════════════════════════════
# Stage 2: OWL-ViT - Precise stamp detection
# ═══════════════════════════════════════════
# Each candidate frame is queried once per text prompt; detections of
# stamp-like size are cropped to disk and drawn onto an annotated copy.
print("\n🔬 Stage 2: OWL-ViT stamp validation...")
print(" Loading model...")
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
model.eval()
STAMP_TERMS = ["postage stamp", "stamp", "small stamp", "stamp on paper"]
SCORE_THRESHOLD = 0.06  # deliberately low: recall matters more than precision here
all_results = []
n_candidates = len(candidate_frames)
start2 = time.time()
for idx, (sec, frame) in enumerate(candidate_frames):
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    h, w = frame.shape[:2]
    # Post-processing needs one (height, width) row per image in the batch,
    # i.e. shape (1, 2). A flat [h, w] tensor has length 2, fails the
    # batch-size check and raises, which previously was swallowed silently.
    target_sizes = torch.tensor([[h, w]], dtype=torch.float32)
    # Drawn-on copy, created lazily; `frame` itself stays clean so that later
    # crops from the same frame do not contain earlier boxes or labels.
    annotated = None
    for term in STAMP_TERMS:
        try:
            inputs = processor(text=[[term]], images=image, return_tensors="pt")
            with torch.no_grad():
                outputs = model(**inputs)
            # NOTE(review): newer transformers releases steer callers towards
            # post_process_grounded_object_detection - confirm against the
            # pinned version before upgrading.
            detections = processor.post_process_object_detection(
                outputs=outputs, target_sizes=target_sizes, threshold=SCORE_THRESHOLD
            )[0]
        except Exception as exc:
            # Best effort per prompt: report the failure and keep going
            # instead of hiding it.
            print(f" ⚠️ {sec}s | {term} | detection failed: {exc}")
            continue
        # Scores are already filtered by SCORE_THRESHOLD in post-processing.
        for score, box in zip(detections["scores"], detections["boxes"]):
            s = float(score)
            x1, y1, x2, y2 = map(int, box.tolist())
            # Predicted boxes may extend past the image; clamp so slicing
            # never sees negative (wrap-around) indices.
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, w), min(y2, h)
            bw, bh = x2 - x1, y2 - y1
            # Filter: stamps are small (15-150px)
            if not (15 < bw < 150 and 15 < bh < 150):
                continue
            crop = frame[y1:y2, x1:x2]
            all_results.append(
                {
                    "timestamp": sec,
                    "term": term,
                    "score": s,
                    "bbox": [x1, y1, x2, y2],
                    "size": [bw, bh],
                }
            )
            # Save
            crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
            cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)
            # Annotate
            if annotated is None:
                annotated = frame.copy()
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(
                annotated,
                f"{term[:10]} {s:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )
            print(f" 🎯 {sec}s | {term} | {s:.2f} | {bw}x{bh}px")
    if annotated is not None:
        ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
        cv2.imwrite(ann_path, annotated)
    if idx % 10 == 0 or idx == n_candidates - 1:
        # Estimate from frames actually finished, measured after this one.
        done = idx + 1
        eta = (time.time() - start2) / done * (n_candidates - done)
        print(f" Progress: {done}/{n_candidates} | ETA: {eta:.0f}s")
t2 = time.time() - start2
total_time = t1 + t2
# ═══════════════════════════════════════════
# Stage 3: Deduplicate & rank
# ═══════════════════════════════════════════
# Rank every detection by confidence, then keep only the best one for each
# timestamp. The sort is stable and dicts preserve insertion order, so the
# first entry stored per timestamp is its highest-scoring detection and the
# resulting list stays ordered by descending score.
all_results.sort(key=lambda det: det["score"], reverse=True)
best_per_timestamp = {}
for det in all_results:
    best_per_timestamp.setdefault(det["timestamp"], det)
unique = list(best_per_timestamp.values())

rule = "=" * 60
print(f"\n{rule}")
print(f"⏱️ Total time: {total_time:.1f}s (OpenCV: {t1:.1f}s + OWL-ViT: {t2:.1f}s)")
print(f"📊 Found {len(unique)} unique stamp candidates")
print(f"{rule}")
for det in unique:
    box_w, box_h = det["size"]
    print(
        f" 🎯 {det['timestamp']}s | {det['term']} | {det['score']:.2f} | {box_w}x{box_h}px"
    )

# Persist the de-duplicated, ranked detections next to the crops.
results_path = os.path.join(OUTPUT_DIR, "results.json")
with open(results_path, "w") as out_file:
    out_file.write(json.dumps(unique, indent=2))
print(f"\n🏁 Done. Crops: {CROPS_DIR}")