Files
momentry_core/scripts/scan_keyframes.py
Warren 8f05a7c188 feat: update Python processors and add utility scripts
- Update ASR, face, OCR, pose processors
- Add release pre-flight check script
- Add synonym generation, chunk processing scripts
- Add face recognition, stamp search utilities
2026-04-30 15:07:49 +08:00

148 lines
4.6 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Scan Multiple Frames for Stamps
"""
import os
import cv2
import torch
import types
from PIL import Image
from transformers import AutoProcessor, AutoModelForCausalLM
# Identifier used to build the output folder name
# (presumably the ID of the processed video; confirm against the pipeline).
UUID = "384b0ff44aaaa1f1"

# Folder the frames are read from and every generated image is written to.
OUTPUT_DIR = "output/" + UUID + "/florence2_results"

# Frames to check, listed by frame number.
# 6756 is the frame the author tagged as "Original".
FRAMES = [f"scan_{number}.jpg" for number in (6751, 6755, 6756, 6759)]
# Patch for compatibility
def patch_model(model):
    """Replace ``prepare_inputs_for_generation`` on ``model.language_model``.

    The replacement inspects ``past_key_values`` before delegating:

    * If the cache is a non-empty list/tuple whose first entry is not
      ``None`` and not an empty list/tuple, the call is forwarded unchanged
      to the original method.
    * Otherwise (``None``, empty, a first layer that is ``None``/empty, or
      any object that is not a list/tuple) a minimal input dict with
      ``past_key_values=None`` and ``use_cache=True`` is returned.

    Args:
        model: Object exposing a ``language_model`` attribute whose
            ``prepare_inputs_for_generation`` is to be wrapped.

    Returns:
        None. ``model.language_model`` is modified in place.
    """
    lm = model.language_model
    # Capture the original *bound* method so the wrapper can delegate to it.
    delegate = lm.prepare_inputs_for_generation

    def _cache_is_usable(cache):
        """Return True only for a list/tuple cache with a populated first layer."""
        if cache is None or not isinstance(cache, (list, tuple)):
            return False
        if len(cache) == 0:
            return False
        head = cache[0]
        if head is None:
            return False
        if isinstance(head, (list, tuple)) and len(head) == 0:
            return False
        return True

    def _prepare(
        self,
        input_ids,
        past_key_values=None,
        attention_mask=None,
        inputs_embeds=None,
        **kwargs,
    ):
        if _cache_is_usable(past_key_values):
            return delegate(
                input_ids,
                past_key_values=past_key_values,
                attention_mask=attention_mask,
                inputs_embeds=inputs_embeds,
                **kwargs,
            )
        # NOTE(review): this branch discards inputs_embeds and **kwargs;
        # presumably intentional for the first decoding step - confirm.
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "past_key_values": None,
            "use_cache": True,
        }

    # Bind as a method so the wrapper receives the language model as ``self``.
    lm.prepare_inputs_for_generation = types.MethodType(_prepare, lm)
# Script body: load Florence-2 once, then scan every frame listed in FRAMES.
#
# For each frame that exists in OUTPUT_DIR:
#   1. black out the 200x200 px top-right corner (watermark),
#   2. save the masked frame as "masked_<name>" and feed it to the model,
#   3. run open-vocabulary detection once per search term,
#   4. save every detected box as "crop_<stem>_<term>_<i>.jpg",
#   5. if anything was found, save the annotated frame as "result_<name>".
#
# Any error outside post-processing aborts the run and is printed by the
# outer handler.
print("🧠 Loading Florence-2 model...")
try:
    processor = AutoProcessor.from_pretrained(
        "microsoft/Florence-2-base", trust_remote_code=True
    )
    model = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-base", trust_remote_code=True, attn_implementation="eager"
    )
    patch_model(model)

    # Task token; the phrase to locate must be appended to it per query.
    prompt = "<OPEN_VOCABULARY_DETECTION>"
    search_terms = ["postage stamp", "stamp", "envelope"]

    for img_name in FRAMES:
        img_path = os.path.join(OUTPUT_DIR, img_name)
        if not os.path.exists(img_path):
            continue
        print(f"\n🔍 Scanning {img_name}...")

        img_cv = cv2.imread(img_path)
        if img_cv is None:
            # cv2.imread signals an unreadable/corrupt file by returning None.
            print(f" ⚠️ Could not decode {img_name}, skipping.")
            continue
        h, w = img_cv.shape[:2]

        # Mask Watermark (Top Right)
        cv2.rectangle(img_cv, (w - 200, 0), (w, 200), (0, 0, 0), -1)
        masked_img_path = os.path.join(OUTPUT_DIR, "masked_" + img_name)
        cv2.imwrite(masked_img_path, img_cv)
        with Image.open(masked_img_path) as masked_file:
            masked_image = masked_file.convert("RGB")

        # Crops come from this untouched copy so rectangles drawn for earlier
        # detections never leak into later crops.
        crop_source = img_cv.copy()
        stem = img_name.replace(".jpg", "")

        found = False
        for t in search_terms:
            # The search term has to be part of the text prompt; the bare task
            # token alone would run the identical query for every term.
            inputs = processor(
                text=prompt + t, images=masked_image, return_tensors="pt"
            )
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=1024,
                num_beams=3,
            )
            generated_text = processor.batch_decode(
                generated_ids, skip_special_tokens=False
            )[0]

            try:
                parsed_answer = processor.post_process_generation(
                    generated_text,
                    task=prompt,
                    image_size=(masked_image.width, masked_image.height),
                )
            except Exception as parse_err:
                # Best effort: one unparsable answer must not stop the scan,
                # but it should be visible instead of silently dropped.
                print(f" ⚠️ Could not parse result for '{t}' in {img_name}: {parse_err}")
                continue

            results = parsed_answer.get(prompt, {})
            bboxes = results.get("bboxes", [])
            if not bboxes:
                print(f" ❌ No '{t}' in {img_name}.")
                continue

            print(f" ✅ Found '{t}' in {img_name}! ({len(bboxes)} found)")
            found = True
            for i, box in enumerate(bboxes):
                x1, y1, x2, y2 = map(int, box)
                # Clamp to the image: negative indices would wrap around in
                # NumPy slicing and an empty crop makes cv2.imwrite fail.
                x1, x2 = max(0, min(x1, w)), max(0, min(x2, w))
                y1, y2 = max(0, min(y1, h)), max(0, min(y2, h))
                if x2 <= x1 or y2 <= y1:
                    print(f" ⚠️ Skipping empty box {i} for '{t}' in {img_name}.")
                    continue
                # Crop
                out_crop = os.path.join(OUTPUT_DIR, f"crop_{stem}_{t}_{i}.jpg")
                cv2.imwrite(out_crop, crop_source[y1:y2, x1:x2])
                # Draw
                cv2.rectangle(img_cv, (x1, y1), (x2, y2), (0, 255, 0), 3)

        if found:
            res_path = os.path.join(OUTPUT_DIR, f"result_{img_name}")
            cv2.imwrite(res_path, img_cv)
except Exception as e:
    print(f"❌ Error: {e}")