Initial import
This commit is contained in:
498
ware_detect_diagnostic.py
Normal file
498
ware_detect_diagnostic.py
Normal file
@@ -0,0 +1,498 @@
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pytesseract
|
||||
|
||||
|
||||
def log(msg):
    """Print ``msg`` to stdout prefixed with the local time as ``[HH:MM:SS]``.

    The output is flushed immediately so messages appear in order even when
    stdout is buffered.
    """
    timestamp = time.strftime("%H:%M:%S")
    line = "[{}] {}".format(timestamp, msg)
    print(line, flush=True)
|
||||
|
||||
|
||||
def _positive_int(value):
    """argparse ``type=`` callable: parse ``value`` as an int and reject values below 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {number}")
    return number


def parse_args(argv=None):
    """Build the command-line parser and return the parsed options.

    Args:
        argv: optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The ``argparse.Namespace`` with all options.

    ``--detect-every`` and ``--ocr-every-detect`` are used as modulo divisors
    by the main loop, so they must be >= 1; invalid values are rejected here
    with a normal argparse error instead of a ZeroDivisionError later.
    """
    ap = argparse.ArgumentParser()

    ap.add_argument("-v", "--video", default=None,
                    help="Percorso video. Se omesso usa webcam 0")
    ap.add_argument("--weights", default="yolov2.weights",
                    help="File pesi YOLOv2")
    ap.add_argument("--config", default="yolov2.cfg",
                    help="File config YOLOv2")
    ap.add_argument("--labels", default="labels.txt",
                    help="File labels classi")
    ap.add_argument("--tesseract-cmd", default=None,
                    help="Percorso esplicito a tesseract.exe")

    # Performance / debugging
    ap.add_argument("--input-size", type=int, default=320,
                    help="Dimensione input YOLO")
    ap.add_argument("--detect-every", type=_positive_int, default=4,
                    help="Esegue YOLO ogni N frame")
    ap.add_argument("--ocr-every-detect", type=_positive_int, default=2,
                    help="Esegue OCR ogni N cicli di detection")
    ap.add_argument("--preview-width", type=int, default=1280,
                    help="Larghezza massima finestra preview")
    ap.add_argument("--max-ocr-boxes", type=int, default=1,
                    help="Numero massimo di etichette da leggere per detection")
    ap.add_argument("--opencv-threads", type=int, default=1,
                    help="Numero thread OpenCV")
    ap.add_argument("--warmup-frames", type=int, default=3,
                    help="Numero frame iniziali da leggere senza detection")
    ap.add_argument("--log-every-frame", action="store_true",
                    help="Logga ogni frame")
    ap.add_argument("--no-ocr", action="store_true",
                    help="Disabilita OCR per test")
    ap.add_argument("--no-detection", action="store_true",
                    help="Disabilita detection YOLO per test")
    ap.add_argument("--save-log-summary-every", type=int, default=10,
                    help="Ogni quanti frame stampare un riepilogo")

    # Detection / OCR quality
    ap.add_argument("--min-confidence", type=float, default=0.30,
                    help="Soglia minima confidenza")
    ap.add_argument("--label-class", default="etichetta",
                    help="Nome classe etichetta")
    ap.add_argument("--min-label-width", type=int, default=50,
                    help="Larghezza minima bbox etichetta")
    ap.add_argument("--min-label-height", type=int, default=20,
                    help="Altezza minima bbox etichetta")
    ap.add_argument("--ocr-min-digits", type=int, default=2,
                    help="Numero minimo di cifre per considerare valida una lettura")
    ap.add_argument("--show-roi", action="store_true",
                    help="Mostra ROI preprocessata per OCR")
    ap.add_argument("--print-all", action="store_true",
                    help="Stampa anche OCR grezzi non validi")

    return ap.parse_args(argv)
|
||||
|
||||
|
||||
def require_file(path_str, description):
    """Return ``path_str`` as a ``Path`` if it names an existing regular file.

    Args:
        path_str: path to check.
        description: human-readable name of the file, used in the error log.

    Returns:
        The ``Path`` built from ``path_str``.

    If the path is missing or is not a regular file (for example a directory),
    an error is logged and the process exits with status 1.
    """
    path = Path(path_str)
    # is_file() rather than exists(): a directory would otherwise pass this
    # check and only fail later, inside the loader that expects a file.
    if not path.is_file():
        log(f"ERRORE: {description} non trovato: {path}")
        sys.exit(1)
    return path
|
||||
|
||||
|
||||
def load_classes(labels_path):
    """Read class names from ``labels_path``, one per line.

    Lines are stripped of surrounding whitespace and blank lines are skipped.

    Args:
        labels_path: path of the labels text file.

    Returns:
        The list of class names, in file order.

    If the file yields no class names, an error is logged and the process
    exits with status 1.
    """
    # "utf-8-sig" decodes plain UTF-8 too, but drops a leading BOM if present;
    # with plain "utf-8" the BOM would stay glued to the first class name and
    # that name would never match in later string comparisons.
    with open(labels_path, "rt", encoding="utf-8-sig") as f:
        classes = [line.strip() for line in f if line.strip()]

    if not classes:
        log("ERRORE: labels.txt vuoto")
        sys.exit(1)

    return classes
|
||||
|
||||
|
||||
def open_capture(video_arg):
    """Create a ``cv2.VideoCapture`` for ``video_arg``.

    * ``None`` selects camera index 0.
    * A value whose string form is all digits is treated as a camera index.
    * Anything else is passed to ``cv2.VideoCapture`` unchanged.

    Camera indices are first opened with the ``cv2.CAP_DSHOW`` backend; if
    that capture does not report as opened, the index is reopened with the
    default backend. The returned capture may still be unopened, so the caller
    must check ``isOpened()``.
    """
    if video_arg is None:
        device_index = 0
    elif str(video_arg).isdigit():
        device_index = int(video_arg)
    else:
        return cv2.VideoCapture(video_arg)

    capture = cv2.VideoCapture(device_index, cv2.CAP_DSHOW)
    if capture.isOpened():
        return capture
    return cv2.VideoCapture(device_index)
|
||||
|
||||
|
||||
def resize_preview(frame, max_width):
    """Shrink ``frame`` so its width does not exceed ``max_width``.

    The same scale factor is applied to both dimensions. The frame is returned
    unchanged (same object) when ``max_width`` is not positive or the frame is
    already narrow enough.
    """
    height, width = frame.shape[:2]
    needs_shrink = max_width > 0 and width > max_width
    if not needs_shrink:
        return frame

    factor = max_width / float(width)
    target_size = (int(width * factor), int(height * factor))
    return cv2.resize(frame, target_size, interpolation=cv2.INTER_LINEAR)
|
||||
|
||||
|
||||
def clip_box(x1, y1, x2, y2, w, h):
    """Clamp box corners to the valid pixel index range of a ``w`` x ``h`` image.

    x coordinates are limited to ``[0, w - 1]`` and y coordinates to
    ``[0, h - 1]``. Returns the clamped ``(x1, y1, x2, y2)`` tuple.
    """
    max_x = w - 1
    max_y = h - 1

    def _clamp(value, upper):
        return max(0, min(value, upper))

    return (
        _clamp(x1, max_x),
        _clamp(y1, max_y),
        _clamp(x2, max_x),
        _clamp(y2, max_y),
    )
|
||||
|
||||
|
||||
def expand_box(x1, y1, x2, y2, frame_w, frame_h, pad_ratio=0.08):
    """Grow a box by ``pad_ratio`` of its width/height on every side.

    The horizontal margin is ``int(width * pad_ratio)`` and the vertical
    margin ``int(height * pad_ratio)``. The enlarged box is then clamped to
    the frame with ``clip_box`` and returned as ``(x1, y1, x2, y2)``.
    """
    margin_x = int((x2 - x1) * pad_ratio)
    margin_y = int((y2 - y1) * pad_ratio)
    return clip_box(
        x1 - margin_x,
        y1 - margin_y,
        x2 + margin_x,
        y2 + margin_y,
        frame_w,
        frame_h,
    )
|
||||
|
||||
|
||||
def preprocess_for_ocr(roi):
    """Turn a BGR crop into a padded black/white image for OCR.

    Steps, in order: convert to grayscale, upscale by 1.7x with cubic
    interpolation, apply a 3x3 Gaussian blur, binarise with Otsu's threshold,
    and add an 8-pixel white (255) border on every side.
    """
    grayscale = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    upscaled = cv2.resize(grayscale, None, fx=1.7, fy=1.7, interpolation=cv2.INTER_CUBIC)
    smoothed = cv2.GaussianBlur(upscaled, (3, 3), 0)
    # With THRESH_OTSU the threshold argument (0) is ignored and computed
    # automatically; only the binarised image is kept.
    _, binary = cv2.threshold(smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return cv2.copyMakeBorder(binary, 8, 8, 8, 8, borderType=cv2.BORDER_CONSTANT, value=255)
|
||||
|
||||
|
||||
def ocr_digits_only(roi):
    """Run Tesseract on ``roi`` and keep only the digits it reports.

    The crop is first passed through ``preprocess_for_ocr``. Tesseract is
    invoked with ``--oem 3 --psm 7`` and a character whitelist of ``0-9``.

    Returns:
        ``(digits, processed, raw_text)``: the OCR output with every non-digit
        character removed, the preprocessed image handed to Tesseract, and the
        unmodified OCR output.
    """
    processed = preprocess_for_ocr(roi)
    tesseract_config = r'--oem 3 --psm 7 -c tessedit_char_whitelist=0123456789'
    raw_text = pytesseract.image_to_string(processed, config=tesseract_config)
    # Collecting every digit is equivalent to deleting every non-digit run.
    digits = "".join(re.findall(r"\d", raw_text))
    return digits, processed, raw_text
|
||||
|
||||
|
||||
def detect_yolov2_original_style(net, frame, classes, min_confidence, input_size):
    """
    Run one forward pass of ``net`` on ``frame`` and parse the output rows.

    The parsing stays close to the user's original script: each output row is
    read as ``[x_center, y_center, width, height, <unused>, class scores...]``
    with the box values multiplied by the frame width/height, the best class
    score is used as the confidence, and rows whose confidence is not strictly
    greater than ``min_confidence`` are dropped.

    Args:
        net: network object exposing ``setInput`` and ``forward``.
        frame: input image; ``frame.shape[:2]`` is read as (height, width).
        classes: list of class names indexed by class id.
        min_confidence: exclusive lower bound for the best class score.
        input_size: side length of the square network input.

    Returns:
        ``(detections, info)``. ``detections`` is a list of dicts with keys
        ``class_id``, ``label``, ``confidence`` and ``box`` (a clipped
        ``(x1, y1, x2, y2)`` tuple). ``info`` holds the output ``shape`` and
        the timings ``blob_ms``, ``forward_ms``, ``parse_ms``, ``total_ms``.
    """
    t0 = time.perf_counter()
    h, w = frame.shape[:2]

    # NOTE(review): swapRB=False is kept to match the original script. Frames
    # read by OpenCV are BGR, while Darknet models are usually fed RGB;
    # confirm this is intended for the weights in use.
    blob = cv2.dnn.blobFromImage(
        frame,
        scalefactor=1.0 / 255.0,
        size=(input_size, input_size),
        mean=(0, 0, 0),
        swapRB=False,
        crop=False
    )
    t_blob = time.perf_counter()

    net.setInput(blob)
    predictions = net.forward()
    t_forward = time.perf_counter()

    predictions = np.array(predictions)

    # Normalise the output to a 2-D (rows, values) array.
    if predictions.ndim == 4:
        predictions = predictions.reshape(predictions.shape[1], predictions.shape[-1])
    elif predictions.ndim == 3:
        predictions = predictions[0]

    detections = []

    if predictions.ndim != 2:
        # Unexpected layout: report the shape and timings, no detections.
        return detections, {
            "shape": tuple(predictions.shape),
            "blob_ms": (t_blob - t0) * 1000.0,
            "forward_ms": (t_forward - t_blob) * 1000.0,
            "parse_ms": 0.0,
            "total_ms": (time.perf_counter() - t0) * 1000.0,
        }

    # Rows need at least one class score after the 5 leading values; the check
    # does not depend on the row, so it is done once instead of per iteration.
    if predictions.shape[0] > 0 and predictions.shape[1] > 5:
        scores = predictions[:, 5:]
        class_ids = scores.argmax(axis=1)
        # float64 so the threshold comparison matches comparing Python floats.
        confidences = scores[np.arange(scores.shape[0]), class_ids].astype(np.float64)

        # Only rows above the threshold are visited; NaN scores compare False
        # and are skipped rather than reaching int() below.
        for i in np.flatnonzero(confidences > min_confidence):
            row = predictions[i]
            class_index = int(class_ids[i])
            confidence = float(confidences[i])

            x_center = float(row[0]) * w
            y_center = float(row[1]) * h
            width_box = float(row[2]) * w
            height_box = float(row[3]) * h

            x1 = int(x_center - width_box * 0.5)
            y1 = int(y_center - height_box * 0.5)
            x2 = int(x_center + width_box * 0.5)
            y2 = int(y_center + height_box * 0.5)

            x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, w, h)

            # The labels file may list fewer classes than the network emits;
            # use a placeholder name instead of raising IndexError.
            if class_index < len(classes):
                label = classes[class_index]
            else:
                label = f"class_{class_index}"

            detections.append({
                "class_id": class_index,
                "label": label,
                "confidence": confidence,
                "box": (x1, y1, x2, y2),
            })

    t_parse = time.perf_counter()

    return detections, {
        "shape": tuple(predictions.shape),
        "blob_ms": (t_blob - t0) * 1000.0,
        "forward_ms": (t_forward - t_blob) * 1000.0,
        "parse_ms": (t_parse - t_forward) * 1000.0,
        "total_ms": (t_parse - t0) * 1000.0,
    }
|
||||
|
||||
|
||||
def draw_detection(frame, det, label_class, extra_text=""):
    """Draw one detection (rectangle plus caption) onto ``frame`` in place.

    The colour is (0, 255, 255) when the detection label equals
    ``label_class`` (case-insensitive), (0, 255, 0) when it equals "gaylord",
    and white otherwise. The caption is ``"<label> <confidence>"`` with two
    decimals, followed by ``" | <extra_text>"`` when ``extra_text`` is
    non-empty, drawn just above the box but never higher than y = 20.
    """
    left, top, right, bottom = det["box"]
    name = det["label"]
    score = det["confidence"]

    lowered = name.lower()
    if lowered == label_class.lower():
        color = (0, 255, 255)
    elif lowered == "gaylord":
        color = (0, 255, 0)
    else:
        color = (255, 255, 255)

    cv2.rectangle(frame, (left, top), (right, bottom), color, 2)

    caption = f"{name} {score:.2f}"
    if extra_text:
        caption = f"{caption} | {extra_text}"

    baseline_y = max(20, top - 8)
    cv2.putText(frame, caption, (left, baseline_y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA)
|
||||
|
||||
|
||||
def best_label_detections(detections, label_name, max_boxes):
    """Pick the top ``max_boxes`` detections whose label matches ``label_name``.

    Labels are compared case-insensitively after stripping whitespace from the
    detection label. Matches are ordered by confidence, then by box area, both
    descending, and the list is truncated with ``[:max_boxes]``.
    """
    def _rank(det):
        box = det["box"]
        area = (box[2] - box[0]) * (box[3] - box[1])
        return det["confidence"], area

    matching = (
        det for det in detections
        if det["label"].strip().lower() == label_name.lower()
    )
    ranked = sorted(matching, key=_rank, reverse=True)
    return ranked[:max_boxes]
|
||||
|
||||
|
||||
def main():
    """Run the diagnostic loop: read frames, detect, OCR labels, preview, log timings.

    Flow:
      1. Parse options, log library versions, configure OpenCV threads and
         the optional Tesseract path.
      2. Check that the weights/config/labels files exist, load class names
         and the Darknet network (OpenCV backend, CPU target).
      3. Open the video source; exit with status 1 if it cannot be opened.
      4. Read ``--warmup-frames`` frames without detection.
      5. For each frame: run detection on frame 1 and then every
         ``--detect-every`` frames; on every ``--ocr-every-detect``-th
         detection cycle run OCR on the best label boxes; draw the latest
         detections and a status overlay; print a periodic timing summary.
         Pressing ``q`` in the preview window ends the loop.

    The capture and the windows are released in a ``finally`` block, so they
    are cleaned up when the loop raises as well.
    """
    args = parse_args()

    log("=== AVVIO SCRIPT DIAGNOSTICO ===")
    log(f"Python executable: {sys.executable}")
    log(f"OpenCV version: {cv2.__version__}")
    log(f"Numpy version: {np.__version__}")

    cv2.setNumThreads(args.opencv_threads)
    log(f"OpenCV threads impostati a: {args.opencv_threads}")

    if args.tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = args.tesseract_cmd
        log(f"Tesseract cmd esplicito: {args.tesseract_cmd}")

    require_file(args.weights, "File pesi YOLOv2")
    require_file(args.config, "File config YOLOv2")
    require_file(args.labels, "File labels")

    # These options are used as modulo divisors below; clamp them to >= 1 so a
    # value of 0 cannot raise ZeroDivisionError in the middle of the stream.
    detect_every = max(1, args.detect_every)
    ocr_every_detect = max(1, args.ocr_every_detect)
    summary_every = max(1, args.save_log_summary_every)

    t0 = time.perf_counter()
    classes = load_classes(args.labels)
    log(f"Labels caricate in {(time.perf_counter() - t0) * 1000.0:.1f} ms: {classes}")

    log("Caricamento rete YOLOv2...")
    t0 = time.perf_counter()
    net = cv2.dnn.readNetFromDarknet(args.config, args.weights)
    net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
    net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
    log(f"Rete caricata in {(time.perf_counter() - t0) * 1000.0:.1f} ms")

    log("Apertura sorgente video...")
    t0 = time.perf_counter()
    cap = open_capture(args.video)
    log(f"VideoCapture creato in {(time.perf_counter() - t0) * 1000.0:.1f} ms")

    if not cap.isOpened():
        log("ERRORE: impossibile aprire la sorgente video")
        cap.release()
        sys.exit(1)

    try:
        win_name = "YOLOv2 diagnostico"
        cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)
        log("Finestra preview creata")

        frame_idx = 0
        detect_cycle = 0

        last_detections = []
        # OCR text keyed by the exact (x1, y1, x2, y2) of the box it was read
        # from, so it is only drawn for detections with identical coordinates.
        last_text_by_box = {}

        accum_read_ms = 0.0
        accum_detect_ms = 0.0
        accum_ocr_ms = 0.0
        accum_draw_ms = 0.0

        # Initial warmup reads, without detection
        log(f"Lettura warmup frame iniziali: {args.warmup_frames}")
        for i in range(args.warmup_frames):
            t_read0 = time.perf_counter()
            grabbed, frame = cap.read()
            t_read1 = time.perf_counter()
            if not grabbed or frame is None:
                log(f"ERRORE durante warmup frame {i + 1}")
                break
            log(f"Warmup frame {i + 1}/{args.warmup_frames}: read {(t_read1 - t_read0) * 1000.0:.1f} ms, shape={frame.shape}")

        log("Entrata nel loop principale")

        while True:
            t_frame0 = time.perf_counter()
            grabbed, frame = cap.read()
            t_frame1 = time.perf_counter()

            if not grabbed or frame is None:
                log("Fine stream o impossibile leggere il frame")
                break

            frame_idx += 1
            read_ms = (t_frame1 - t_frame0) * 1000.0
            accum_read_ms += read_ms

            if args.log_every_frame:
                log(f"Frame {frame_idx}: read {read_ms:.1f} ms, shape={frame.shape}")

            ocr_this_cycle_ms = 0.0

            # Detection does NOT run on every frame
            if not args.no_detection and (frame_idx == 1 or (frame_idx % detect_every == 0)):
                detect_cycle += 1
                log(f"Frame {frame_idx}: INIZIO DETECTION ciclo {detect_cycle}")

                t_det0 = time.perf_counter()
                last_detections, detect_info = detect_yolov2_original_style(
                    net=net,
                    frame=frame,
                    classes=classes,
                    min_confidence=args.min_confidence,
                    input_size=args.input_size
                )
                t_det1 = time.perf_counter()
                detect_ms = (t_det1 - t_det0) * 1000.0
                accum_detect_ms += detect_ms

                log(
                    f"Frame {frame_idx}: FINE DETECTION | "
                    f"totale={detect_ms:.1f} ms | "
                    f"blob={detect_info['blob_ms']:.1f} ms | "
                    f"forward={detect_info['forward_ms']:.1f} ms | "
                    f"parse={detect_info['parse_ms']:.1f} ms | "
                    f"shape_out={detect_info['shape']} | "
                    f"num_det={len(last_detections)}"
                )

                # OCR only every N detection cycles
                if not args.no_ocr and detect_cycle % ocr_every_detect == 0:
                    label_dets = best_label_detections(
                        last_detections,
                        args.label_class,
                        args.max_ocr_boxes
                    )
                    log(f"Frame {frame_idx}: INIZIO OCR su {len(label_dets)} box")

                    new_text_by_box = {}

                    for idx, det in enumerate(label_dets, start=1):
                        x1, y1, x2, y2 = det["box"]
                        bw = x2 - x1
                        bh = y2 - y1

                        if bw < args.min_label_width or bh < args.min_label_height:
                            log(f"Frame {frame_idx}: OCR box {idx} scartato per dimensioni {bw}x{bh}")
                            continue

                        rx1, ry1, rx2, ry2 = expand_box(x1, y1, x2, y2, frame.shape[1], frame.shape[0])
                        roi = frame[ry1:ry2, rx1:rx2]

                        if roi.size == 0:
                            log(f"Frame {frame_idx}: OCR box {idx} scartato per ROI vuota")
                            continue

                        t_ocr0 = time.perf_counter()
                        digits, processed, raw_text = ocr_digits_only(roi)
                        t_ocr1 = time.perf_counter()

                        one_ocr_ms = (t_ocr1 - t_ocr0) * 1000.0
                        ocr_this_cycle_ms += one_ocr_ms

                        log(
                            f"Frame {frame_idx}: OCR box {idx} "
                            f"{bw}x{bh} -> {one_ocr_ms:.1f} ms | "
                            f"raw='{raw_text.strip()}' | digits='{digits}'"
                        )

                        if len(digits) >= args.ocr_min_digits:
                            box_key = (x1, y1, x2, y2)
                            new_text_by_box[box_key] = digits

                        if args.show_roi:
                            cv2.imshow("ROI OCR", processed)

                    # Keep the previous readings when this cycle produced none.
                    if new_text_by_box:
                        last_text_by_box = new_text_by_box

                    accum_ocr_ms += ocr_this_cycle_ms
                    log(f"Frame {frame_idx}: FINE OCR totale {ocr_this_cycle_ms:.1f} ms")

            t_draw0 = time.perf_counter()
            # Draw on a copy so the captured frame itself is left untouched.
            display = frame.copy()

            for det in last_detections:
                x1, y1, x2, y2 = det["box"]
                box_key = (x1, y1, x2, y2)
                extra = ""

                if box_key in last_text_by_box:
                    extra = f"NUM: {last_text_by_box[box_key]}"

                draw_detection(display, det, args.label_class, extra_text=extra)

            status_lines = [
                f"frame={frame_idx}",
                f"detect_every={args.detect_every}",
                f"ocr_every_detect={args.ocr_every_detect}",
                f"last_det={len(last_detections)}",
            ]

            y = 25
            for line in status_lines:
                cv2.putText(display, line, (10, y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
                y += 28

            display = resize_preview(display, args.preview_width)
            cv2.imshow(win_name, display)
            key = cv2.waitKey(1) & 0xFF
            t_draw1 = time.perf_counter()

            draw_ms = (t_draw1 - t_draw0) * 1000.0
            accum_draw_ms += draw_ms

            if args.log_every_frame:
                log(f"Frame {frame_idx}: draw+imshow {draw_ms:.1f} ms")

            if frame_idx % summary_every == 0:
                # Both detect and OCR averages are divided by the number of
                # detection cycles (OCR does not run on every cycle).
                log(
                    f"RIEPILOGO fino a frame {frame_idx}: "
                    f"avg_read={accum_read_ms / frame_idx:.1f} ms | "
                    f"avg_detect={(accum_detect_ms / max(1, detect_cycle)):.1f} ms per ciclo | "
                    f"avg_ocr={(accum_ocr_ms / max(1, detect_cycle)):.1f} ms per ciclo | "
                    f"avg_draw={accum_draw_ms / frame_idx:.1f} ms"
                )

            if key == ord("q"):
                log("Premuto q, uscita")
                break
    finally:
        # Always release the device and close the windows, also on errors.
        cap.release()
        cv2.destroyAllWindows()

    log("=== FINE SCRIPT ===")
|
||||
|
||||
|
||||
# Run the diagnostic only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user