- Update ASR, face, OCR, pose processors - Add release pre-flight check script - Add synonym generation, chunk processing scripts - Add face recognition, stamp search utilities
255 lines
8.3 KiB
Python
255 lines
8.3 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Fast Multi-Stage Stamp Search

Stage 1: fast OpenCV container detection (skin/hands, rectangles/paper).
Stage 2: OWL-ViT stamp detection, run only on the container crops.
"""
|
|
|
|
import json
import os
import sys
import time

import cv2
import numpy as np
import torch
from PIL import Image
from transformers import OwlViTForObjectDetection, OwlViTProcessor
|
|
|
|
# Identifier of the processed video; every input and output path derives from it.
UUID = "384b0ff44aaaa1f1"
VIDEO_PATH = f"output/{UUID}/{UUID}.mp4"
OUTPUT_DIR = f"output/{UUID}/fast_stamp_search"
CROPS_DIR = os.path.join(OUTPUT_DIR, "crops")

# Make sure the result directory and its crops sub-directory exist up front.
for _directory in (OUTPUT_DIR, CROPS_DIR):
    os.makedirs(_directory, exist_ok=True)

# Step, in seconds of video time, between two sampled frames.
FRAME_INTERVAL = 5
# Detection-score threshold handed to the OWL-ViT post-processing.
MIN_STAMP_SCORE = 0.06
|
|
|
|
print("=" * 60)
print("⚡ Fast Multi-Stage Stamp Search")
print("=" * 60)

# Open the input video and derive its duration in whole seconds.
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Without this check a missing/unreadable file only surfaces later as a
    # ZeroDivisionError, because the FPS property reads back as 0.
    sys.exit(f"❌ Cannot open video: {VIDEO_PATH}")

fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Covers 0, negative and NaN frame rates; the duration cannot be computed.
    cap.release()
    sys.exit(f"❌ Video reports an invalid frame rate ({fps}): {VIDEO_PATH}")

total_sec = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps)
print(f"📹 Video: {total_sec}s ({total_sec // 60} min), {fps:.1f} fps")
|
|
|
|
# The OWL-ViT processor and model are created a single time here and reused
# for every container crop.
print("🔬 Loading OWL-ViT stamp detector...")
_OWLVIT_CHECKPOINT = "google/owlvit-base-patch32"
processor = OwlViTProcessor.from_pretrained(_OWLVIT_CHECKPOINT)
model = OwlViTForObjectDetection.from_pretrained(_OWLVIT_CHECKPOINT)
model.eval()  # evaluation mode: the model is only used for inference

# Text prompts used as detection queries for stamps.
STAMP_TERMS = [
    "postage stamp",
    "stamp on paper",
    "small stamp",
    "stamp",
]
|
|
|
|
|
|
def find_containers_fast(frame):
    """Find candidate regions that may hold a stamp, using OpenCV only.

    Two heuristics are applied to the frame:

    * skin-tone segmentation in HSV (intended to catch hands), and
    * a brightness threshold on the grayscale image (intended to catch
      paper / envelopes), filtered by bounding-box aspect ratio.

    Args:
        frame: Image array that is converted with ``cv2.COLOR_BGR2HSV`` and
            ``cv2.COLOR_BGR2GRAY`` (presumably a BGR frame from
            ``cv2.VideoCapture.read`` - confirm against the caller).

    Returns:
        A list of dicts with keys ``"type"`` (``"hand"`` or ``"paper"``) and
        ``"bbox"`` (``[x1, y1, x2, y2]``, padded by a margin and clamped to
        the frame). All hand regions precede all paper regions.
    """
    frame_h, frame_w = frame.shape[:2]
    frame_area = frame_h * frame_w

    def padded_box(rect, pad):
        # Grow the bounding rect by `pad` pixels per side, clamped to the frame.
        left, top, rect_w, rect_h = rect
        return [
            max(0, left - pad),
            max(0, top - pad),
            min(frame_w, left + rect_w + pad),
            min(frame_h, top + rect_h + pad),
        ]

    found = []

    # --- Heuristic 1: skin-coloured blobs (hands) -------------------------
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    low_hue_mask = cv2.inRange(hsv, np.array([0, 20, 70]), np.array([20, 150, 255]))
    high_hue_mask = cv2.inRange(
        hsv, np.array([160, 20, 70]), np.array([179, 150, 255])
    )
    # The two hue bands are disjoint, so OR-ing the masks equals adding them.
    skin_mask = cv2.bitwise_or(low_hue_mask, high_hue_mask)

    # Close small holes first, then remove small specks.
    ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))
    for morph_op in (cv2.MORPH_CLOSE, cv2.MORPH_OPEN):
        skin_mask = cv2.morphologyEx(skin_mask, morph_op, ellipse)

    skin_contours, _ = cv2.findContours(
        skin_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    max_hand_area = frame_area * 0.4
    for contour in skin_contours:
        if not (2000 < cv2.contourArea(contour) < max_hand_area):
            continue
        found.append(
            {"type": "hand", "bbox": padded_box(cv2.boundingRect(contour), 40)}
        )

    # --- Heuristic 2: bright, roughly rectangular blobs (paper) -----------
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, bright_mask = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY)
    bright_contours, _ = cv2.findContours(
        bright_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    max_paper_area = frame_area * 0.5
    for contour in bright_contours:
        if not (5000 < cv2.contourArea(contour) < max_paper_area):
            continue
        rect = cv2.boundingRect(contour)
        rect_w, rect_h = rect[2], rect[3]
        aspect = rect_w / rect_h if rect_h > 0 else 0
        # Reject very elongated blobs.
        if not (0.3 < aspect < 3.0):
            continue
        found.append({"type": "paper", "bbox": padded_box(rect, 30)})

    return found
|
|
|
|
|
|
# Main scan: sample one frame every FRAME_INTERVAL seconds, find containers
# with OpenCV (stage 1), then query OWL-ViT on each container crop (stage 2).
# Every accepted detection is appended to `all_results`, its crop is written
# to CROPS_DIR, and an annotated copy of the frame is written to OUTPUT_DIR.
all_results = []
start_time = time.time()

try:
    for sec in range(0, total_sec, FRAME_INTERVAL):
        cap.set(cv2.CAP_PROP_POS_MSEC, sec * 1000)
        ret, frame = cap.read()
        if not ret:
            continue

        # Linear ETA: mean time per sampled frame so far times frames left.
        elapsed = time.time() - start_time
        eta = (elapsed / (sec / FRAME_INTERVAL + 1)) * (
            total_sec / FRAME_INTERVAL - sec / FRAME_INTERVAL - 1
        )

        # Stage 1: fast container detection
        containers = find_containers_fast(frame)
        if not containers:
            if sec % 60 == 0:
                print(
                    f" [{sec // 60}min/{total_sec // 60}min] No containers | ETA: {eta:.0f}s"
                )
            continue

        print(
            f" [{sec}s] Found {len(containers)} containers ({[c['type'] for c in containers]})"
        )

        frame_h, frame_w = frame.shape[:2]
        # Draw on a copy: `frame` must stay clean because container images and
        # saved crops are sliced from it after earlier detections were drawn.
        annotated = frame.copy()
        found_in_frame = False

        # Stage 2: OWL-ViT stamp detection on each container
        for container in containers:
            cx1, cy1, cx2, cy2 = container["bbox"]
            container_img = frame[cy1:cy2, cx1:cx2]
            if container_img.size == 0:
                continue

            ch, cw = container_img.shape[:2]

            # Scale up for better detection. The factor is always >= 2, so
            # every crop is enlarged.
            scale = max(2, 500 // max(ch, cw))
            scaled = cv2.resize(
                container_img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC
            )
            scaled_pil = Image.fromarray(cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB))
            sh, sw = scaled.shape[:2]

            # The post-processor expects one (height, width) row per image in
            # the batch, i.e. shape (1, 2) here - a flat (2,) tensor is
            # rejected as a batch-size mismatch.
            target_sizes = torch.tensor([[sh, sw]])

            for term in STAMP_TERMS:
                try:
                    inputs = processor(
                        text=[[term]], images=scaled_pil, return_tensors="pt"
                    )
                    with torch.no_grad():
                        outputs = model(**inputs)
                    results = processor.post_process_object_detection(
                        outputs=outputs,
                        target_sizes=target_sizes,
                        threshold=MIN_STAMP_SCORE,
                    )
                except Exception as e:
                    # Best effort: skip this query but report why, instead of
                    # silently hiding a failure that affects every frame.
                    print(
                        f" ⚠️ {sec}s | {term} | detection failed: {e!r}",
                        file=sys.stderr,
                    )
                    continue

                detections = results[0]
                for score, box in zip(detections["scores"], detections["boxes"]):
                    s = float(score)
                    if s <= MIN_STAMP_SCORE:
                        continue

                    sx1, sy1, sx2, sy2 = box.tolist()

                    # Size in original-frame pixels; keep plausible stamp sizes only.
                    orig_w = (sx2 - sx1) / scale
                    orig_h = (sy2 - sy1) / scale
                    if not (15 < orig_w < 200 and 15 < orig_h < 200):
                        continue

                    # Map back to frame coordinates and clamp: a negative
                    # coordinate would otherwise wrap around in the slice below.
                    ox1 = min(max(cx1 + int(sx1 / scale), 0), frame_w)
                    oy1 = min(max(cy1 + int(sy1 / scale), 0), frame_h)
                    ox2 = min(max(cx1 + int(sx2 / scale), 0), frame_w)
                    oy2 = min(max(cy1 + int(sy2 / scale), 0), frame_h)

                    crop = frame[oy1:oy2, ox1:ox2]
                    if crop.size == 0:
                        continue

                    result = {
                        "timestamp": sec,
                        "container": container["type"],
                        "stamp_term": term,
                        "score": s,
                        "bbox": [ox1, oy1, ox2, oy2],
                        "size": [int(orig_w), int(orig_h)],
                    }
                    all_results.append(result)
                    found_in_frame = True

                    # Save the crop.
                    # NOTE: detections sharing second, term and rounded score
                    # map to the same file name and overwrite each other.
                    crop_name = f"stamp_{sec}s_{term.replace(' ', '_')}_{s:.2f}.jpg"
                    cv2.imwrite(os.path.join(CROPS_DIR, crop_name), crop)

                    # Annotate the copy of the full frame
                    cv2.rectangle(annotated, (ox1, oy1), (ox2, oy2), (0, 255, 0), 3)
                    cv2.putText(
                        annotated,
                        f"{term[:8]} {s:.2f}",
                        (ox1, oy1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                    )

                    print(
                        f" 🎯 {sec}s | {term} | {s:.2f} | {int(orig_w)}x{int(orig_h)}px"
                    )

        # Save annotated frame if stamps found
        if found_in_frame:
            ann_path = os.path.join(OUTPUT_DIR, f"annotated_{sec}s.jpg")
            cv2.imwrite(ann_path, annotated)
finally:
    # Release the capture even if the scan aborts with an exception.
    cap.release()
|
|
|
|
# Deduplicate by timestamp, keeping the highest-scoring candidate for each
# sampled second (on equal scores the earliest detection wins).
best_by_timestamp = {}
for r in all_results:
    ts = r["timestamp"]
    best = best_by_timestamp.get(ts)
    if best is None or r["score"] > best["score"]:
        best_by_timestamp[ts] = r

# Strongest candidates first.
unique = sorted(best_by_timestamp.values(), key=lambda x: x["score"], reverse=True)

print(f"\n{'=' * 60}")
print(f"📊 Found {len(unique)} unique stamp candidates")
for r in unique:
    print(
        f" 🎯 {r['timestamp']}s | {r['stamp_term']} | {r['score']:.2f} | via: {r['container']}"
    )

# Persist the deduplicated candidates next to the crops.
with open(os.path.join(OUTPUT_DIR, "results.json"), "w", encoding="utf-8") as f:
    json.dump(unique, f, indent=2)

print(f"\n🏁 Done. Crops: {CROPS_DIR}")
|