- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
214 lines
7.4 KiB
Python
214 lines
7.4 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Hybrid Stamp Search: OpenCV + OWL-ViT
Stage 1: OpenCV finds frames with containers (hands/paper) - FAST
Stage 2: OWL-ViT validates those frames for actual stamps - ACCURATE
"""

import json
import os
import sys
import time

import cv2
import numpy as np
import torch
from PIL import Image
from transformers import OwlViTProcessor, OwlViTForObjectDetection

# ── Run configuration ──────────────────────────────────────
# Identifier of the video to process; the input file and every output
# location are derived from it.
UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/hybrid_stamp_search"
os.makedirs(OUTPUT_DIR, exist_ok=True)
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")
os.makedirs(CROPS_DIR, exist_ok=True)

# One frame is sampled every FRAME_INTERVAL seconds of video.
FRAME_INTERVAL = 5

print("=" * 60)
print("🔬 Hybrid Stamp Search: OpenCV + OWL-ViT")
print("=" * 60)

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Without this check a missing/unreadable file only surfaces later as a
    # ZeroDivisionError, because the FPS property reads back as 0.
    sys.exit(f"❌ Cannot open video: {VIDEO_PATH}")

fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:  # written this way so a NaN FPS is rejected as well
    cap.release()
    sys.exit(f"❌ Video reports an invalid frame rate ({fps}): {VIDEO_PATH}")

# Duration in whole seconds, derived from the frame count and frame rate.
total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min)")

# ═══════════════════════════════════════════
# Stage 1: OpenCV - Find container frames
# ═══════════════════════════════════════════


def _has_hand_region(frame, kernel):
    """Return True if *frame* (BGR) contains a skin-coloured blob of hand-like size.

    *kernel* is the structuring element used to clean up the skin mask.
    A blob qualifies when its contour area is above 1500 px and below 35 %
    of the frame area.
    """
    h, w = frame.shape[:2]
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Two hue bands (0-25 and 160-179) are combined into one mask.
    # bitwise_or is used instead of "+=" so the mask can never wrap around
    # in uint8 arithmetic.
    skin = cv2.bitwise_or(
        cv2.inRange(hsv, np.array([0, 20, 60]), np.array([25, 180, 255])),
        cv2.inRange(hsv, np.array([160, 20, 60]), np.array([179, 180, 255])),
    )
    skin = cv2.morphologyEx(skin, cv2.MORPH_CLOSE, kernel)
    skin = cv2.morphologyEx(skin, cv2.MORPH_OPEN, kernel)

    contours, _ = cv2.findContours(skin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    max_area = h * w * 0.35
    return any(1500 < cv2.contourArea(cnt) < max_area for cnt in contours)


def _has_paper_region(frame):
    """Return True if *frame* (BGR) contains a bright, roughly rectangular region.

    A region qualifies when its contour area is above 3000 px and below 50 %
    of the frame area, and its bounding box has a width/height ratio
    strictly between 0.2 and 4.0.
    """
    h, w = frame.shape[:2]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright = cv2.threshold(gray, 175, 255, cv2.THRESH_BINARY)
    bright = cv2.morphologyEx(
        bright, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    )
    contours, _ = cv2.findContours(bright, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    max_area = h * w * 0.5
    for cnt in contours:
        if not 3000 < cv2.contourArea(cnt) < max_area:
            continue
        _, _, cw, ch = cv2.boundingRect(cnt)
        aspect = cw / ch if ch > 0 else 0
        if 0.2 < aspect < 4.0:
            return True
    return False


print("\n⚡ Stage 1: OpenCV container scanning...")
# NOTE: every candidate frame is kept in memory for Stage 2, so memory use
# grows with the number of candidates.
candidate_frames = []  # (sec, frame_array)
start = time.time()

# Loop-invariant: build the structuring element once instead of per frame.
skin_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
sample_times = range(0, total_sec, FRAME_INTERVAL)

for sec in sample_times:
    cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
    ret, frame = cap.read()
    if not ret:
        continue

    # The cheaper-to-satisfy hand check runs first; the paper check only
    # runs when no hand-like region was found.
    if _has_hand_region(frame, skin_kernel) or _has_paper_region(frame):
        candidate_frames.append((sec, frame))

cap.release()

t1 = time.time() - start
print(f" ✅ Stage 1 done in {t1:.1f}s")
# len(sample_times) is the real number of sampled positions;
# total_sec // FRAME_INTERVAL undercounts whenever total_sec is not a
# multiple of FRAME_INTERVAL.
print(
    f" 📊 {len(candidate_frames)} candidate frames out of {len(sample_times)} total"
)

if not candidate_frames:
    print(" ❌ No containers found. Exiting.")
    # sys.exit rather than the site-provided exit(), which is not guaranteed
    # to exist in every interpreter configuration.
    sys.exit()

# ═══════════════════════════════════════════
# Stage 2: OWL-ViT - Precise stamp detection
# ═══════════════════════════════════════════
print("\n🔬 Stage 2: OWL-ViT stamp validation...")
print(" Loading model...")
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32")
model.eval()

# Each term is sent to the model as its own text query.
STAMP_TERMS = ["postage stamp", "stamp", "small stamp", "stamp on paper"]
# Minimum detection score for a box to be considered at all.
SCORE_THRESHOLD = 0.06
all_results = []
start2 = time.time()
num_candidates = len(candidate_frames)

for idx, (sec, frame) in enumerate(candidate_frames):
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    h, w = frame.shape[:2]
    # Post-processing expects one (height, width) row per image in the batch,
    # i.e. shape (1, 2) here. A flat [h, w] tensor has length 2, which does
    # not match the batch size of 1 and makes post-processing raise.
    target_sizes = torch.Tensor([[h, w]])

    # Annotations are drawn on a copy (created lazily) so that crops taken
    # from `frame` never contain rectangles from earlier detections.
    annotated = None

    for term in STAMP_TERMS:
        try:
            inputs = processor(text=[[term]], images=image, return_tensors="pt")
            with torch.no_grad():
                outputs = model(**inputs)
            results = processor.post_process_object_detection(
                outputs=outputs, target_sizes=target_sizes, threshold=SCORE_THRESHOLD
            )
        except Exception as exc:
            # Best effort: one failing query must not abort the whole scan,
            # but it must be visible instead of silently producing nothing.
            print(f" ⚠️ {sec}s | {term} | detection failed: {exc}")
            continue

        detections = results[0]
        for score, box in zip(detections["scores"], detections["boxes"]):
            s = float(score)
            if s <= SCORE_THRESHOLD:
                continue

            x1, y1, x2, y2 = map(int, box.tolist())
            # Predicted boxes can extend past the image; clamp them so the
            # slice below cannot wrap around via negative indices.
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, w), min(y2, h)
            bw, bh = x2 - x1, y2 - y1

            # Filter: stamps are small (15-150px)
            if not (15 < bw < 150 and 15 < bh < 150):
                continue

            crop = frame[y1:y2, x1:x2]

            all_results.append(
                {
                    "timestamp": sec,
                    "term": term,
                    "score": s,
                    "bbox": [x1, y1, x2, y2],
                    "size": [bw, bh],
                }
            )

            # Save
            crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
            cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)

            # Annotate
            if annotated is None:
                annotated = frame.copy()
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(
                annotated,
                f"{term[:10]} {s:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 255, 0),
                2,
            )

            print(f" 🎯 {sec}s | {term} | {s:.2f} | {bw}x{bh}px")

    if annotated is not None:
        ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
        cv2.imwrite(ann_path, annotated)

    done = idx + 1
    if idx % 10 == 0 or done == num_candidates:
        # ETA from the average time per frame actually completed so far.
        elapsed = time.time() - start2
        eta = elapsed / done * (num_candidates - done)
        print(f" Progress: {done}/{num_candidates} | ETA: {eta:.0f}s")

t2 = time.time() - start2
total_time = t1 + t2

# ═══════════════════════════════════════════
# Stage 3: Deduplicate & rank
# ═══════════════════════════════════════════
# Rank all detections by score, best first (the list is sorted in place).
all_results.sort(key=lambda det: det["score"], reverse=True)

# Walk the ranked list and keep only the first (highest-scoring) detection
# seen for each timestamp.
seen = set()
unique = []
for det in all_results:
    if det["timestamp"] in seen:
        continue
    seen.add(det["timestamp"])
    unique.append(det)

print(f"\n{'=' * 60}")
print(f"⏱️ Total time: {total_time:.1f}s (OpenCV: {t1:.1f}s + OWL-ViT: {t2:.1f}s)")
print(f"📊 Found {len(unique)} unique stamp candidates")
print(f"{'=' * 60}")

# One summary line per surviving detection, in ranked order.
for det in unique:
    width, height = det["size"]
    print(
        f" 🎯 {det['timestamp']}s | {det['term']} | {det['score']:.2f} | {width}x{height}px"
    )

# Persist the deduplicated detections next to the crops.
results_path = os.path.join(OUTPUT_DIR, "results.json")
with open(results_path, "w") as out_file:
    json.dump(unique, out_file, indent=2)

print(f"\n🏁 Done. Crops: {CROPS_DIR}")