Files
momentry_core/scripts/fast_stamp_search.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

255 lines
8.3 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Fast Multi-Stage Stamp Search
Stage 1: OpenCV fast container detection (skin/hands, rectangles/paper)
Stage 2: OWL-ViT only on container crops for stamp detection
"""
import os
import cv2
import json
import time
import numpy as np
from PIL import Image
import torch
from transformers import OwlViTProcessor, OwlViTForObjectDetection
# --- Configuration -----------------------------------------------------------
# Identifier of the video to scan; it names both the input file and the
# output directory below.
UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/fast_stamp_search"
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")
# CROPS_DIR is nested inside OUTPUT_DIR, so a single call creates both.
os.makedirs(CROPS_DIR, exist_ok=True)

FRAME_INTERVAL = 5  # seconds between sampled frames
MIN_STAMP_SCORE = 0.06  # minimum OWL-ViT score for a detection to be kept

print("=" * 60)
print("⚡ Fast Multi-Stage Stamp Search")
print("=" * 60)

# --- Video -------------------------------------------------------------------
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Fail with a clear message instead of a ZeroDivisionError further down.
    raise SystemExit(f"❌ Cannot open video: {VIDEO_PATH}")

fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:  # also rejects NaN
    cap.release()
    raise SystemExit(f"❌ Invalid frame rate ({fps}) reported for: {VIDEO_PATH}")

total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min), {fps:.1f} fps")

# --- Model -------------------------------------------------------------------
# Load OWL-ViT once; it is reused for every container crop.
print("🔬 Loading OWL-ViT stamp detector...")
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
model.eval()

# Text prompts queried one at a time against each container crop.
STAMP_TERMS = ["postage stamp", "stamp on paper", "small stamp", "stamp"]
def find_containers_fast(frame):
    """Find candidate regions of a BGR frame that may hold a stamp.

    Two cheap OpenCV heuristics are applied:

    1. Skin-coloured blobs (HSV range test plus morphological cleanup) are
       reported as ``"hand"`` containers.
    2. Bright blobs (grayscale threshold at 180) whose bounding box has an
       aspect ratio strictly between 0.3 and 3.0 are reported as ``"paper"``
       containers.

    Args:
        frame: image array in BGR channel order.

    Returns:
        A list of dicts ``{"type": "hand" | "paper", "bbox": [x1, y1, x2, y2]}``.
        Each bbox is the contour's bounding rectangle grown by a margin
        (40 px for hands, 30 px for paper) and clipped to the frame. Hand
        containers come first, then paper containers.
    """
    frame_h, frame_w = frame.shape[:2]
    frame_area = frame_h * frame_w

    def padded_box(rect, pad):
        # Grow an (x, y, w, h) rectangle by `pad` on every side, clipped to
        # the frame, and return it as [x1, y1, x2, y2].
        rx, ry, rw, rh = rect
        return [
            max(0, rx - pad),
            max(0, ry - pad),
            min(frame_w, rx + rw + pad),
            min(frame_h, ry + rh + pad),
        ]

    # --- 1. Skin-coloured regions (hands) ---
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Two hue bands are tested: 0-20 and 160-179. The bands are disjoint,
    # so adding the two 0/255 masks can never overflow.
    low_hue = cv2.inRange(hsv, np.array([0, 20, 70]), np.array([20, 150, 255]))
    high_hue = cv2.inRange(hsv, np.array([160, 20, 70]), np.array([179, 150, 255]))
    skin_mask = low_hue + high_hue

    # Close small holes first, then remove small specks.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))
    for morph_op in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        skin_mask = cv2.morphologyEx(skin_mask, morph_op, kernel)

    hand_contours, _ = cv2.findContours(
        skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    max_hand_area = frame_area * 0.4
    hands = [
        {"type": "hand", "bbox": padded_box(cv2.boundingRect(contour), 40)}
        for contour in hand_contours
        if 2000 < cv2.contourArea(contour) < max_hand_area
    ]

    # --- 2. Bright, roughly rectangular regions (envelopes / paper) ---
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright_mask = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
    paper_contours, _ = cv2.findContours(
        bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    max_paper_area = frame_area * 0.5
    papers = []
    for contour in paper_contours:
        if not (5000 < cv2.contourArea(contour) < max_paper_area):
            continue
        rect = cv2.boundingRect(contour)
        rect_w, rect_h = rect[2], rect[3]
        aspect = rect_w / rect_h if rect_h > 0 else 0
        if not (0.3 < aspect < 3.0):
            continue
        papers.append({"type": "paper", "bbox": padded_box(rect, 30)})

    return hands + papers
# --- Main scan ---------------------------------------------------------------
# Sample one frame every FRAME_INTERVAL seconds. Stage 1 proposes containers
# with cheap OpenCV heuristics; stage 2 runs OWL-ViT only on those crops.
all_results = []
start_time = time.time()
total_steps = total_sec / FRAME_INTERVAL

try:
    for sec in range(0, total_sec, FRAME_INTERVAL):
        cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
        ret, frame = cap.read()
        if not ret:
            continue

        # Linear ETA extrapolation; clamped so the last step never prints
        # a negative value.
        steps_done = sec / FRAME_INTERVAL + 1
        elapsed = time.time() - start_time
        eta = max(0.0, (elapsed / steps_done) * (total_steps - steps_done))

        # Stage 1: fast container detection
        containers = find_containers_fast(frame)
        if not containers:
            if sec % 60 == 0:
                print(
                    f" [{sec // 60}min/{total_sec // 60}min] No containers | ETA: {eta:.0f}s"
                )
            continue
        print(
            f" [{sec}s] Found {len(containers)} containers ({[c['type'] for c in containers]})"
        )

        frame_h, frame_w = frame.shape[:2]
        # Draw on a copy: `frame` is still the pixel source for later
        # container crops and for the saved stamp crops, so it must stay
        # free of rectangles and labels.
        annotated = frame.copy()
        found_in_frame = False

        # Stage 2: OWL-ViT stamp detection on each container
        for container in containers:
            cx1, cy1, cx2, cy2 = container["bbox"]
            container_img = frame[cy1:cy2, cx1:cx2]
            if container_img.size == 0:
                continue
            ch, cw = container_img.shape[:2]

            # Upscale for better detection of small stamps (always >= 2x).
            scale = max(2, 500 // max(ch, cw))
            scaled = cv2.resize(
                container_img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC
            )
            scaled_pil = Image.fromarray(cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB))
            sh, sw = scaled.shape[:2]
            # post_process_object_detection expects one (height, width) row
            # per image, i.e. shape (batch_size, 2) -- not a flat vector.
            target_sizes = torch.tensor([[sh, sw]])

            for term in STAMP_TERMS:
                try:
                    inputs = processor(
                        text=[[term]], images=scaled_pil, return_tensors="pt"
                    )
                    with torch.no_grad():
                        outputs = model(**inputs)
                    detections = processor.post_process_object_detection(
                        outputs=outputs,
                        target_sizes=target_sizes,
                        threshold=MIN_STAMP_SCORE,
                    )[0]
                except Exception as e:
                    # Best effort: report the failure and try the next term
                    # instead of hiding it.
                    print(f" ⚠️ {sec}s | {term} | detection failed: {e}")
                    continue

                for score, box in zip(detections["scores"], detections["boxes"]):
                    s = float(score)
                    if s <= MIN_STAMP_SCORE:
                        continue

                    # Box is in upscaled-crop pixels; convert back to
                    # original-frame pixels.
                    sx1, sy1, sx2, sy2 = box.tolist()
                    orig_w = (sx2 - sx1) / scale
                    orig_h = (sy2 - sy1) / scale
                    if not (15 < orig_w < 200 and 15 < orig_h < 200):
                        continue

                    # Clamp to the frame: predicted boxes may extend past
                    # the crop, and a negative index would wrap around in
                    # numpy slicing.
                    ox1 = min(max(cx1 + int(sx1 / scale), 0), frame_w)
                    oy1 = min(max(cy1 + int(sy1 / scale), 0), frame_h)
                    ox2 = min(max(cx1 + int(sx2 / scale), 0), frame_w)
                    oy2 = min(max(cy1 + int(sy2 / scale), 0), frame_h)
                    crop = frame[oy1:oy2, ox1:ox2]
                    if crop.size == 0:
                        continue

                    all_results.append(
                        {
                            "timestamp": sec,
                            "container": container["type"],
                            "stamp_term": term,
                            "score": s,
                            "bbox": [ox1, oy1, ox2, oy2],
                            "size": [int(orig_w), int(orig_h)],
                        }
                    )
                    found_in_frame = True

                    # Save the clean crop
                    crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
                    cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)

                    # Annotate the copy of the full frame
                    cv2.rectangle(annotated, (ox1, oy1), (ox2, oy2), (0, 255, 0), 3)
                    cv2.putText(
                        annotated,
                        f"{term[:8]} {s:.2f}",
                        (ox1, oy1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                    )
                    print(
                        f" 🎯 {sec}s | {term} | {s:.2f} | {int(orig_w)}x{int(orig_h)}px"
                    )

        # Save the annotated frame if any stamp was found in it
        if found_in_frame:
            ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
            cv2.imwrite(ann_path, annotated)
finally:
    # Release the capture even if the scan aborts part-way.
    cap.release()
# --- Summarize ---------------------------------------------------------------
# Deduplicate by timestamp, keeping the highest-scoring detection for each
# sampled second (the first one wins on ties). Keeping the first detection
# seen would let a weak early hit hide a stronger one from the same frame.
best_by_timestamp = {}
for r in all_results:
    ts = r["timestamp"]
    if ts not in best_by_timestamp or r["score"] > best_by_timestamp[ts]["score"]:
        best_by_timestamp[ts] = r

# Strongest candidates first.
unique = sorted(best_by_timestamp.values(), key=lambda r: r["score"], reverse=True)

print(f"\n{'=' * 60}")
print(f"📊 Found {len(unique)} unique stamp candidates")
for r in unique:
    print(
        f" 🎯 {r['timestamp']}s | {r['stamp_term']} | {r['score']:.2f} | via: {r['container']}"
    )

# Persist the ranked candidates next to the crops.
with open(os.path.join(OUTPUT_DIR, "results.json"), "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2)
print(f"\n🏁 Done. Crops: {CROPS_DIR}")