Files
flywms/flywms.py
2026-05-15 09:54:10 +02:00

1645 lines
58 KiB
Python

import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import cv2
import numpy as np
@dataclass(frozen=True)
class FramePacket:
    """Immutable record describing a single captured frame."""

    frame_id: int  # identifier of the frame
    timestamp: float  # capture time; clock source is not visible here — TODO confirm
    frame: np.ndarray  # image data of the frame
    width: int  # frame width (presumably pixels — verify against producer)
    height: int  # frame height (presumably pixels — verify against producer)
    source: str  # presumably the source label returned by open_capture — verify
@dataclass(frozen=True)
class Detection:
    """Immutable record for one detected (or estimated) object."""

    class_id: int  # index into the class list; -1 is used for estimated boxes
    class_name: str  # class label, e.g. "gaylord" or "gaylord_stimato"
    confidence: float  # class score of the detection
    bbox: tuple[int, int, int, int]  # (x1, y1, x2, y2) corner coordinates
@dataclass(frozen=True)
class DetectionResult:
    """Immutable record with the detections and timings for one frame."""

    frame_id: int  # identifier of the frame the detections belong to
    timestamp: float  # clock source is not visible here — TODO confirm
    detections: list[Detection]  # detections found on the frame
    inference_ms: float  # accumulated into RuntimeStats.yolo_total_ms
    blob_ms: float  # accumulated into RuntimeStats.yolo_blob_ms
    forward_ms: float  # accumulated into RuntimeStats.yolo_forward_ms
    parse_ms: float  # accumulated into RuntimeStats.yolo_parse_ms
    source_width: int  # presumably width of the analysed frame — verify
    source_height: int  # presumably height of the analysed frame — verify
@dataclass(frozen=True)
class RoiPacket:
    """Immutable record for one region of interest handed to OCR."""

    roi_id: int  # identifier of this ROI (used in debug file names)
    source_frame_id: int  # frame the ROI was taken from (used in debug file names)
    timestamp: float  # clock source is not visible here — TODO confirm
    class_name: str  # class of the detection that produced the ROI
    confidence: float  # score of the detection that produced the ROI
    bbox: tuple[int, int, int, int]  # presumably (x1, y1, x2, y2) in the source frame — verify
    roi_image: np.ndarray  # cropped image; written as "*_raw.png" by save_ocr_debug_images
    width: int  # presumably ROI width — verify against producer
    height: int  # presumably ROI height — verify against producer
@dataclass(frozen=True)
class OcrResult:
    """Immutable record with the outcome of one OCR pass on a ROI."""

    roi_id: int  # identifier of the ROI that was read
    source_frame_id: int  # stored as RuntimeStats.last_ocr_frame_id
    timestamp: float  # compared against time.perf_counter() by draw_ocr_results
    text: str  # text drawn on the overlay; empty results are not drawn
    raw_text: str  # presumably the unfiltered engine output — verify
    bbox: tuple[int, int, int, int]  # (x1, y1, x2, y2); the overlay is placed below y2
    ocr_ms: float  # accumulated into RuntimeStats.ocr_total_ms
@dataclass(frozen=True)
class OcrEngineResult:
    """Immutable record returned by the OCR engines' ``read_digits``."""

    digits: str  # raw_text with every non-digit character removed
    processed: np.ndarray  # image produced by preprocess_for_ocr
    raw_text: str  # text as returned by the OCR engine
    code_roi: np.ndarray  # ROI after the code-isolation crop
@dataclass
class BufferStats:
    """Mutable counters describing the activity of a LatestBuffer."""

    pushed: int = 0  # items handed to put()
    popped: int = 0  # items returned by get_latest_blocking()
    dropped_on_put: int = 0  # put() calls made while the buffer was already full
    skipped_on_pop: int = 0  # older items discarded when the newest one was taken
    waits: int = 0  # wait iterations spent by consumers on an empty buffer
    max_depth_seen: int = 0  # largest number of buffered items observed after a put()
@dataclass
class RuntimeStats:
    """Mutable pipeline counters and accumulated timings kept by SharedState."""

    # --- counters -------------------------------------------------------
    capture_frames: int = 0  # incremented by SharedState.add_capture_read
    display_frames: int = 0  # incremented by SharedState.add_display
    yolo_submitted_frames: int = 0  # quality results flagged as submitted to YOLO
    quality_rejected_frames: int = 0  # quality results not submitted to YOLO
    quality_cycles: int = 0  # number of quality evaluations recorded
    yolo_cycles: int = 0  # number of detection results recorded
    ocr_cycles: int = 0  # number of OCR results recorded
    # --- accumulated totals (divide by the matching counter for averages) ---
    quality_total_ms: float = 0.0
    quality_score_total: float = 0.0
    yolo_total_ms: float = 0.0
    yolo_blob_ms: float = 0.0
    yolo_forward_ms: float = 0.0
    yolo_parse_ms: float = 0.0
    ocr_total_ms: float = 0.0
    display_total_ms: float = 0.0
    capture_read_total_ms: float = 0.0
    # --- most recent values ---------------------------------------------
    last_capture_frame_id: int = 0
    last_quality_score: float = 0.0
    last_quality_passed: bool = True
    last_yolo_frame_id: int = 0
    last_ocr_frame_id: int = 0
    last_detection_count: int = 0
    last_detection_summary: str = ""  # "name:count" pairs joined by commas, sorted by name
class LatestBuffer:
    """Bounded buffer guarded by a condition variable.

    Producers append with :meth:`put`; a consumer calling
    :meth:`get_latest_blocking` receives only the newest item and every
    older buffered item is discarded. Activity is tracked in a
    ``BufferStats`` instance.
    """

    def __init__(self, max_size: int, name: str):
        """Create an empty buffer holding at most *max_size* items.

        Raises:
            ValueError: if *max_size* is smaller than 1.
        """
        if max_size < 1:
            raise ValueError("max_size must be >= 1")
        self.max_size = max_size
        self.name = name
        self._items: deque[Any] = deque(maxlen=max_size)
        self._condition = threading.Condition()
        self._stats = BufferStats()

    def put(self, item: Any) -> None:
        """Append *item* and wake every waiting consumer."""
        with self._condition:
            counters = self._stats
            # A full deque(maxlen=...) evicts its oldest entry on append.
            if len(self._items) == self.max_size:
                counters.dropped_on_put += 1
            self._items.append(item)
            counters.pushed += 1
            depth_now = len(self._items)
            counters.max_depth_seen = max(counters.max_depth_seen, depth_now)
            self._condition.notify_all()

    def get_latest_blocking(self, stop_event: threading.Event) -> Any | None:
        """Wait for an item and return the newest one, dropping the rest.

        Returns ``None`` as soon as *stop_event* is set, even if items are
        still buffered (they are left in place in that case).
        """
        with self._condition:
            while not (self._items or stop_event.is_set()):
                self._stats.waits += 1
                self._condition.wait(timeout=0.1)
            if stop_event.is_set():
                return None
            newest = self._items.pop()
            # Whatever is still buffered is older than the item just taken.
            self._stats.skipped_on_pop += len(self._items)
            self._items.clear()
            self._stats.popped += 1
            return newest

    def wake_all(self) -> None:
        """Wake every thread waiting on the buffer without adding an item."""
        with self._condition:
            self._condition.notify_all()

    def stats(self) -> BufferStats:
        """Return a copy of the current counters."""
        with self._condition:
            return BufferStats(**vars(self._stats))

    def depth(self) -> int:
        """Return the number of items currently buffered."""
        with self._condition:
            return len(self._items)
class SharedState:
    """Lock-protected state shared between the pipeline threads.

    Holds the latest detection result, a bounded history of OCR results,
    the most recent debug images and the ``RuntimeStats`` counters. Every
    public method takes the internal lock for its whole duration.
    """

    def __init__(self, ocr_history_size: int):
        """Create empty state keeping at most *ocr_history_size* OCR results."""
        self._lock = threading.Lock()
        self.latest_detection: DetectionResult | None = None
        self.ocr_results: deque[OcrResult] = deque(maxlen=ocr_history_size)
        self.debug_yolo_frame: np.ndarray | None = None
        self.debug_ocr_frame: np.ndarray | None = None
        self.debug_ocr_text: str = ""
        self.stats = RuntimeStats()

    def set_latest_detection(self, result: DetectionResult) -> None:
        """Store *result* and fold its timings and class counts into the stats."""
        with self._lock:
            stats = self.stats
            self.latest_detection = result
            stats.yolo_cycles += 1
            stats.yolo_total_ms += result.inference_ms
            stats.yolo_blob_ms += result.blob_ms
            stats.yolo_forward_ms += result.forward_ms
            stats.yolo_parse_ms += result.parse_ms
            stats.last_yolo_frame_id = result.frame_id
            stats.last_detection_count = len(result.detections)
            # Tally detections per class name for the one-line summary.
            per_class: dict[str, int] = {}
            for detection in result.detections:
                per_class.setdefault(detection.class_name, 0)
                per_class[detection.class_name] += 1
            summary_parts = [
                f"{name}:{count}" for name, count in sorted(per_class.items())
            ]
            stats.last_detection_summary = ",".join(summary_parts)

    def get_latest_detection(self) -> DetectionResult | None:
        """Return the most recently stored detection result, if any."""
        with self._lock:
            return self.latest_detection

    def add_ocr_result(self, result: OcrResult) -> None:
        """Append *result* to the bounded history and update the OCR stats."""
        with self._lock:
            self.ocr_results.append(result)
            stats = self.stats
            stats.ocr_cycles += 1
            stats.ocr_total_ms += result.ocr_ms
            stats.last_ocr_frame_id = result.source_frame_id

    def get_recent_ocr_results(self) -> list[OcrResult]:
        """Return the retained OCR results, oldest first, as a new list."""
        with self._lock:
            return list(self.ocr_results)

    def set_debug_yolo_frame(self, frame: np.ndarray | None) -> None:
        """Store a private copy of the YOLO debug frame (or clear it)."""
        with self._lock:
            self.debug_yolo_frame = frame.copy() if frame is not None else None

    def set_debug_ocr_frame(self, frame: np.ndarray | None, text: str = "") -> None:
        """Store a private copy of the OCR debug frame and its text."""
        with self._lock:
            self.debug_ocr_frame = frame.copy() if frame is not None else None
            self.debug_ocr_text = text

    def get_debug_frames(self) -> tuple[np.ndarray | None, np.ndarray | None, str]:
        """Return copies of the YOLO and OCR debug frames plus the OCR text."""
        with self._lock:
            yolo_frame = self.debug_yolo_frame
            ocr_frame = self.debug_ocr_frame
            yolo_copy = yolo_frame.copy() if yolo_frame is not None else None
            ocr_copy = ocr_frame.copy() if ocr_frame is not None else None
            return yolo_copy, ocr_copy, self.debug_ocr_text

    def add_capture_read(self, frame_id: int, read_ms: float) -> None:
        """Record one captured frame and the time spent reading it."""
        with self._lock:
            stats = self.stats
            stats.capture_frames += 1
            stats.capture_read_total_ms += read_ms
            stats.last_capture_frame_id = frame_id

    def add_quality_result(
        self,
        score: float,
        passed: bool,
        elapsed_ms: float,
        submitted_to_yolo: bool,
    ) -> None:
        """Record one quality evaluation and whether the frame went to YOLO."""
        with self._lock:
            stats = self.stats
            stats.quality_cycles += 1
            stats.quality_score_total += score
            stats.quality_total_ms += elapsed_ms
            stats.last_quality_score = score
            stats.last_quality_passed = passed
            if not submitted_to_yolo:
                stats.quality_rejected_frames += 1
            else:
                stats.yolo_submitted_frames += 1

    def add_display(self, display_ms: float) -> None:
        """Record one displayed frame and the time spent displaying it."""
        with self._lock:
            self.stats.display_frames += 1
            self.stats.display_total_ms += display_ms

    def snapshot_stats(self) -> RuntimeStats:
        """Return a copy of the current runtime statistics."""
        with self._lock:
            return RuntimeStats(**vars(self.stats))
class IdGenerator:
    """Thread-safe source of consecutive integer identifiers."""

    def __init__(self, start: int = 1):
        """Start counting at *start* (the first value handed out)."""
        self._lock = threading.Lock()
        self._value = start

    def next(self) -> int:
        """Return the current identifier and advance the counter by one."""
        with self._lock:
            self._value += 1
            return self._value - 1
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Returns:
        The ``argparse.Namespace`` produced by parsing ``sys.argv``. Help
        texts are user-facing runtime strings and are kept in Italian.
    """
    ap = argparse.ArgumentParser()
    # --- input source and model files -----------------------------------
    ap.add_argument("-v", "--video", default=None,
                    help="Percorso video. Se omesso usa webcam 0")
    ap.add_argument("--weights", default="yolov2.weights",
                    help="File pesi YOLOv2")
    ap.add_argument("--config", default="yolov2.cfg",
                    help="File config YOLOv2")
    ap.add_argument("--labels", default="labels.txt",
                    help="File labels classi")
    ap.add_argument("--tesseract-cmd", default=None,
                    help="Percorso esplicito a tesseract.exe")
    # --- DNN backend and YOLO input -------------------------------------
    ap.add_argument("--backend", choices=["cpu", "cuda", "cuda-fp16"],
                    default="cpu", help="Backend OpenCV DNN")
    ap.add_argument("--input-size", type=int, default=416,
                    help="Dimensione input YOLO")
    ap.add_argument("--swap-rb", action="store_true",
                    help="Scambia canali R/B nella blob YOLO")
    # --- buffers, preview and run control -------------------------------
    ap.add_argument("--frame-buffer-size", type=int, default=10,
                    help="Dimensione latest buffer frame")
    ap.add_argument("--roi-buffer-size", type=int, default=20,
                    help="Dimensione latest buffer ROI")
    ap.add_argument("--ocr-history-size", type=int, default=100,
                    help="Numero risultati OCR recenti mantenuti in memoria")
    ap.add_argument("--preview-width", type=int, default=1280,
                    help="Larghezza massima preview")
    ap.add_argument("--stats-interval", type=float, default=2.0,
                    help="Secondi tra riepiloghi prestazioni")
    ap.add_argument("--max-frames", type=int, default=0,
                    help="Numero massimo frame da leggere; 0 = fino a fine stream")
    ap.add_argument("--drain-seconds", type=float, default=0.0,
                    help="Secondi di attesa dopo max/fine stream per benchmark headless")
    ap.add_argument("--realtime-playback", action="store_true",
                    help="Per file video, limita il loop al framerate del video")
    ap.add_argument("--opencv-threads", type=int, default=1,
                    help="Numero thread OpenCV")
    # --- frame quality (blur) filter ------------------------------------
    ap.add_argument("--quality-filter", action="store_true",
                    help="Filtra i frame troppo sfocati/mossi prima del buffer YOLO")
    ap.add_argument("--blur-metric", choices=["laplacian", "tenengrad"],
                    default="laplacian", help="Metrica nitidezza usata dal filtro qualita'")
    ap.add_argument("--min-sharpness", type=float, default=80.0,
                    help="Soglia minima nitidezza per inviare il frame a YOLO")
    ap.add_argument("--blur-resize-width", type=int, default=320,
                    help="Larghezza usata per ridurre il frame prima della metrica blur")
    ap.add_argument("--debug-quality-log", action="store_true",
                    help="Logga lo score qualita' di ogni frame")
    ap.add_argument("--debug-rejected-window", action="store_true",
                    help="Mostra una finestra con i frame scartati dal filtro qualita'")
    # --- detection thresholds and label selection -----------------------
    ap.add_argument("--min-confidence", type=float, default=0.30,
                    help="Soglia minima confidenza")
    ap.add_argument("--nms-threshold", type=float, default=0.40,
                    help="Soglia NMS")
    ap.add_argument("--use-nms", action="store_true",
                    help="Applica NMS alle detection; default off per compatibilita' YOLOv2")
    ap.add_argument("--label-class", default="etichetta",
                    help="Nome classe etichetta su cui fare OCR")
    ap.add_argument("--min-label-width", type=int, default=50,
                    help="Larghezza minima bbox etichetta")
    ap.add_argument("--min-label-height", type=int, default=20,
                    help="Altezza minima bbox etichetta")
    ap.add_argument("--max-roi-per-frame", type=int, default=2,
                    help="Numero massimo ROI etichetta inviate a OCR per detection")
    # --- estimated gaylord boxes derived from labels --------------------
    ap.add_argument("--infer-gaylord-from-label", action="store_true",
                    help="Disegna un box gaylord stimato partendo dalle etichette se YOLO non trova gaylord")
    ap.add_argument("--inferred-gaylord-width-factor", type=float, default=3.6,
                    help="Larghezza box gaylord stimato rispetto alla label")
    ap.add_argument("--inferred-gaylord-height-factor", type=float, default=4.2,
                    help="Altezza box gaylord stimato rispetto alla label")
    ap.add_argument("--inferred-gaylord-y-shift", type=float, default=1.35,
                    help="Spostamento verticale verso il basso, in multipli dell'altezza label")
    # --- OCR scheduling and engine selection ----------------------------
    ap.add_argument("--slot-size", type=int, default=120,
                    help="Dimensione griglia per cooldown OCR")
    ap.add_argument("--ocr-cooldown-sec", type=float, default=1.0,
                    help="Secondi minimi prima di reinviare OCR sulla stessa zona")
    ap.add_argument("--ocr-min-digits", type=int, default=2,
                    help="Numero minimo cifre per lettura valida")
    ap.add_argument("--ocr-backend", choices=["tesseract", "paddle", "easyocr"],
                    default="paddle", help="Motore OCR da usare")
    ap.add_argument("--ocr-lang", default="en",
                    help="Lingua OCR")
    ap.add_argument("--easyocr-gpu", choices=["auto", "on", "off"],
                    default="auto", help="Uso GPU EasyOCR")
    ap.add_argument("--easyocr-mode", choices=["subprocess", "inprocess"],
                    default="subprocess", help="Modalita' EasyOCR")
    # Hidden flag: passed by EasyOcrProcessEngine when it spawns this script
    # as its worker process.
    ap.add_argument("--easyocr-worker", action="store_true",
                    help=argparse.SUPPRESS)
    # --- OCR preprocessing and code isolation ---------------------------
    ap.add_argument("--ocr-input", choices=["roi", "processed"],
                    default="roi", help="Immagine passata al motore OCR")
    ap.add_argument("--ocr-code-mode", choices=["full", "fixed-band", "large-components"],
                    default="fixed-band", help="Prefiltro per isolare il codice grande")
    ap.add_argument("--ocr-scale", type=float, default=1.5,
                    help="Fattore di ingrandimento preprocess OCR")
    ap.add_argument("--ocr-max-width", type=int, default=900,
                    help="Larghezza massima immagine inviata all'OCR")
    ap.add_argument("--ocr-band-x1", type=float, default=0.0,
                    help="Crop fisso OCR: x iniziale percentuale 0..1")
    ap.add_argument("--ocr-band-y1", type=float, default=0.0,
                    help="Crop fisso OCR: y iniziale percentuale 0..1")
    ap.add_argument("--ocr-band-x2", type=float, default=1.0,
                    help="Crop fisso OCR: x finale percentuale 0..1")
    ap.add_argument("--ocr-band-y2", type=float, default=1.0,
                    help="Crop fisso OCR: y finale percentuale 0..1")
    ap.add_argument("--ocr-component-min-height-ratio", type=float, default=0.22,
                    help="Altezza minima componente grande rispetto alla ROI")
    ap.add_argument("--ocr-component-min-area-ratio", type=float, default=0.002,
                    help="Area minima componente grande rispetto alla ROI")
    ap.add_argument("--ocr-component-pad-ratio", type=float, default=0.08,
                    help="Padding crop finale componenti grandi")
    ap.add_argument("--ocr-pad-ratio", type=float, default=0.20,
                    help="Padding bbox etichetta prima dell'OCR")
    ap.add_argument("--ocr-submit-min-interval", type=float, default=2.0,
                    help="Secondi minimi globali tra due ROI inviate all'OCR")
    ap.add_argument("--ocr-max-pending", type=int, default=1,
                    help="Numero massimo ROI pendenti prima di saltare nuovi invii OCR")
    ap.add_argument("--paddle-text-det-limit-side-len", type=int, default=320,
                    help="Parametro PaddleOCR text_det_limit_side_len")
    ap.add_argument("--paddle-text-rec-score-thresh", type=float, default=0.0,
                    help="Soglia riconoscimento PaddleOCR")
    # --- debugging and display ------------------------------------------
    ap.add_argument("--print-all-ocr", action="store_true",
                    help="Stampa anche OCR grezzi non validi")
    ap.add_argument("--save-ocr-roi-dir", default=None,
                    help="Directory dove salvare ROI OCR raw/code/processed per debug")
    ap.add_argument("--no-ocr", action="store_true",
                    help="Disabilita OCR; utile per benchmark YOLO/capture")
    ap.add_argument("--debug-yolo-window", action="store_true",
                    help="Mostra una finestra debug con l'ultimo frame YOLO")
    ap.add_argument("--debug-ocr-window", action="store_true",
                    help="Mostra una finestra debug con l'ultima ROI preprocessata")
    ap.add_argument("--debug-yolo-output", action="store_true",
                    help="Logga shape e confidenze grezze dell'output YOLO")
    ap.add_argument("--debug-yolo-top", type=int, default=0,
                    help="Logga le top N righe raw YOLO per confidenza classe")
    ap.add_argument("--debug-inferred-gaylord", action="store_true",
                    help="Logga quanti box gaylord stimati vengono generati")
    ap.add_argument("--no-display", action="store_true",
                    help="Disabilita finestre video, utile per benchmark")
    return ap.parse_args()
def log(msg: str) -> None:
    """Print *msg* to stdout, prefixed with the local time as ``[HH:MM:SS]``.

    The output is flushed immediately.
    """
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
def require_file(path_str: str, description: str) -> Path:
    """Return *path_str* as a ``Path``, or exit if nothing exists there.

    Args:
        path_str: Location to check.
        description: Human-readable name used in the error message.

    When the path does not exist an error is logged and the process exits
    with status 1.
    """
    candidate = Path(path_str)
    if candidate.exists():
        return candidate
    log(f"ERRORE: {description} non trovato: {candidate}")
    sys.exit(1)
def load_classes(labels_path: str) -> list[str]:
    """Read class names from *labels_path*, one per non-blank line.

    Surrounding whitespace is stripped from each name. If the file yields
    no names an error is logged and the process exits with status 1.
    """
    names: list[str] = []
    with open(labels_path, "rt", encoding="utf-8") as handle:
        for raw_line in handle:
            name = raw_line.strip()
            if name:
                names.append(name)
    if names:
        return names
    log("ERRORE: labels.txt vuoto")
    sys.exit(1)
def open_capture(video_arg: str | None):
    """Open the video source described by *video_arg*.

    ``None`` selects camera 0, an all-digit string selects the camera with
    that index, and anything else is handed to ``cv2.VideoCapture`` as-is.
    Cameras are first tried with the DirectShow backend and, if that does
    not open, with OpenCV's default backend.

    Returns:
        ``(capture, label)`` where *label* is ``"camera:<index>"`` for
        cameras or the string form of *video_arg* otherwise.
    """
    is_camera = video_arg is None or str(video_arg).isdigit()
    if not is_camera:
        return cv2.VideoCapture(video_arg), str(video_arg)

    index = 0 if video_arg is None else int(video_arg)
    capture = cv2.VideoCapture(index, cv2.CAP_DSHOW)
    if not capture.isOpened():
        capture = cv2.VideoCapture(index)
    return capture, f"camera:{index}"
def configure_net_backend(net, backend: str) -> None:
    """Select the OpenCV DNN backend/target pair for *net*.

    ``"cpu"`` uses the OpenCV backend on the CPU target; ``"cuda-fp16"``
    uses the CUDA backend with the FP16 target; any other value uses the
    CUDA backend with the regular CUDA target.
    """
    if backend == "cpu":
        backend_id = cv2.dnn.DNN_BACKEND_OPENCV
        target_id = cv2.dnn.DNN_TARGET_CPU
    elif backend == "cuda-fp16":
        backend_id = cv2.dnn.DNN_BACKEND_CUDA
        target_id = cv2.dnn.DNN_TARGET_CUDA_FP16
    else:
        backend_id = cv2.dnn.DNN_BACKEND_CUDA
        target_id = cv2.dnn.DNN_TARGET_CUDA
    net.setPreferableBackend(backend_id)
    net.setPreferableTarget(target_id)
def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
    """Shrink *frame* so it is at most *max_width* pixels wide.

    Args:
        frame: Image whose first two dimensions are (height, width).
        max_width: Maximum allowed width; ``<= 0`` disables resizing.

    Returns:
        *frame* itself when no resize is needed, otherwise a resized copy
        that is exactly *max_width* wide with the aspect ratio preserved.
    """
    h, w = frame.shape[:2]
    if max_width <= 0 or w <= max_width:
        return frame
    scale = max_width / float(w)
    # Use max_width directly: int(w * scale) can round down to max_width - 1
    # (or even 0) because of floating-point error. The height is floored at
    # 1 so extremely wide frames never produce a zero-sized target, which
    # cv2.resize rejects. This matches limit_width / resize_for_quality.
    new_h = max(1, int(h * scale))
    return cv2.resize(frame, (max_width, new_h), interpolation=cv2.INTER_LINEAR)
def clip_box(x1: int, y1: int, x2: int, y2: int,
             w: int, h: int) -> tuple[int, int, int, int]:
    """Clamp box corners to valid pixel indices of a ``w`` x ``h`` image.

    X coordinates are limited to ``[0, w - 1]`` and Y coordinates to
    ``[0, h - 1]``; the lower bound of 0 wins if the image is empty.
    """
    def clamp(value: int, size: int) -> int:
        return max(0, min(value, size - 1))

    return clamp(x1, w), clamp(y1, h), clamp(x2, w), clamp(y2, h)
def expand_box(x1: int, y1: int, x2: int, y2: int,
               frame_w: int, frame_h: int,
               pad_ratio: float = 0.08) -> tuple[int, int, int, int]:
    """Grow a box by *pad_ratio* of its size on every side, then clip it.

    The horizontal padding is a fraction of the box width and the vertical
    padding a fraction of the box height (both truncated to integers). The
    result is clamped to the frame with ``clip_box``.
    """
    margin_x = int((x2 - x1) * pad_ratio)
    margin_y = int((y2 - y1) * pad_ratio)
    left, top = x1 - margin_x, y1 - margin_y
    right, bottom = x2 + margin_x, y2 + margin_y
    return clip_box(left, top, right, bottom, frame_w, frame_h)
def quantized_slot_key(bbox: tuple[int, int, int, int], slot_size: int) -> tuple[int, int]:
    """Map the centre of *bbox* to a ``(column, row)`` cell of a square grid.

    The centre is computed with integer division and each coordinate is
    then floor-divided by *slot_size*.
    """
    left, top, right, bottom = bbox
    center = ((left + right) // 2, (top + bottom) // 2)
    column, row = (coordinate // slot_size for coordinate in center)
    return column, row
def limit_width(image: np.ndarray, max_width: int) -> np.ndarray:
    """Downscale *image* to *max_width* columns if it is wider than that.

    A non-positive *max_width* disables the limit. The aspect ratio is
    preserved and the new height is never smaller than 1. Images already
    within the limit are returned unchanged (same object).
    """
    if max_width > 0:
        rows, cols = image.shape[:2]
        if cols > max_width:
            ratio = max_width / float(cols)
            target = (max_width, max(1, int(rows * ratio)))
            return cv2.resize(image, target, interpolation=cv2.INTER_AREA)
    return image
def preprocess_for_ocr(roi: np.ndarray, scale: float = 1.5, max_width: int = 900) -> np.ndarray:
    """Turn a BGR ROI into a binarised, bordered image for OCR.

    Steps: grayscale conversion, optional cubic rescale by *scale*, 3x3
    Gaussian blur, Otsu binarisation, an 8-pixel white border on every
    side, and finally a width cap of *max_width* via ``limit_width``.
    """
    image = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    if scale != 1.0:
        image = cv2.resize(image, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
    blurred = cv2.GaussianBlur(image, (3, 3), 0)
    _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    border = 8
    framed = cv2.copyMakeBorder(
        binary, border, border, border, border,
        borderType=cv2.BORDER_CONSTANT,
        value=255,
    )
    return limit_width(framed, max_width)
def crop_fixed_band(
    roi: np.ndarray,
    x1_ratio: float,
    y1_ratio: float,
    x2_ratio: float,
    y2_ratio: float,
) -> np.ndarray:
    """Crop a fixed rectangular band out of *roi*.

    Args:
        roi: Image whose first two dimensions are (height, width).
        x1_ratio, y1_ratio: Top-left corner as fractions of width/height.
        x2_ratio, y2_ratio: Bottom-right corner as fractions of
            width/height (exclusive end of the crop).

    Ratios are clamped to ``[0, 1]``.

    Returns:
        A copy of the selected band, or *roi* itself when the band is
        empty or inverted.
    """
    h, w = roi.shape[:2]

    def to_pixel(ratio: float, size: int) -> int:
        return int(max(0.0, min(1.0, ratio)) * size)

    # x2/y2 are exclusive slice ends, so they may legitimately equal w/h.
    # Clamping them to w - 1 / h - 1 (as pixel indices) would silently drop
    # the last column and row, e.g. for the default band 0..1.
    x1, x2 = to_pixel(x1_ratio, w), to_pixel(x2_ratio, w)
    y1, y2 = to_pixel(y1_ratio, h), to_pixel(y2_ratio, h)
    if x2 <= x1 or y2 <= y1:
        return roi
    return roi[y1:y2, x1:x2].copy()
def crop_large_components(
    roi: np.ndarray,
    min_height_ratio: float,
    min_area_ratio: float,
    pad_ratio: float,
) -> np.ndarray:
    """Crop *roi* to the bounding box of its large, character-like blobs.

    The ROI is binarised (inverted Otsu after a 3x3 blur), cleaned with a
    3x3 morphological opening, and its external contours are filtered by
    minimum height, minimum bounding-box area and an aspect ratio between
    0.12 and 1.25. The union of the surviving boxes is padded by
    *pad_ratio* of its size and cropped.

    Args:
        roi: BGR image.
        min_height_ratio: Minimum component height as a fraction of the
            ROI height (at least 3 pixels).
        min_area_ratio: Minimum bounding-box area as a fraction of the
            ROI area (at least 4 pixels).
        pad_ratio: Padding added around the union box, as a fraction of
            its width/height.

    Returns:
        A copy of the cropped region, or *roi* itself when the ROI is
        empty, no component qualifies, or the crop would be empty.
    """
    h, w = roi.shape[:2]
    if h <= 0 or w <= 0:
        return roi
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (3, 3), 0)
    binary = cv2.threshold(
        gray,
        0,
        255,
        cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU,
    )[1]
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel, iterations=1)
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    min_h = max(3, int(h * min_height_ratio))
    min_area = max(4, int(w * h * min_area_ratio))
    boxes: list[tuple[int, int, int, int]] = []
    for contour in contours:
        x, y, bw, bh = cv2.boundingRect(contour)
        if bh < min_h or bw * bh < min_area:
            continue
        aspect = bw / float(max(1, bh))
        if aspect < 0.12 or aspect > 1.25:
            continue
        # Stored with exclusive right/bottom edges.
        boxes.append((x, y, x + bw, y + bh))
    if not boxes:
        return roi
    x1 = min(box[0] for box in boxes)
    y1 = min(box[1] for box in boxes)
    x2 = max(box[2] for box in boxes)
    y2 = max(box[3] for box in boxes)
    pad_x = int((x2 - x1) * pad_ratio)
    pad_y = int((y2 - y1) * pad_ratio)
    # x2/y2 are exclusive slice ends, so clamp them to w/h rather than to
    # the last pixel index; clamping to w - 1 / h - 1 would cut off the
    # final column/row of components touching the right or bottom edge.
    x1 = max(0, x1 - pad_x)
    y1 = max(0, y1 - pad_y)
    x2 = min(w, x2 + pad_x)
    y2 = min(h, y2 + pad_y)
    if x2 <= x1 or y2 <= y1:
        return roi
    return roi[y1:y2, x1:x2].copy()
def extract_code_roi(roi: np.ndarray, args) -> np.ndarray:
    """Isolate the code area of *roi* according to ``args.ocr_code_mode``.

    ``"full"`` returns the ROI untouched, ``"fixed-band"`` crops the band
    given by the ``ocr_band_*`` options, and any other mode crops around
    the large connected components using the ``ocr_component_*`` options.
    """
    mode = args.ocr_code_mode
    if mode == "full":
        return roi
    if mode != "fixed-band":
        return crop_large_components(
            roi,
            args.ocr_component_min_height_ratio,
            args.ocr_component_min_area_ratio,
            args.ocr_component_pad_ratio,
        )
    band = (
        args.ocr_band_x1,
        args.ocr_band_y1,
        args.ocr_band_x2,
        args.ocr_band_y2,
    )
    return crop_fixed_band(roi, *band)
def ocr_digits_only(roi: np.ndarray, pytesseract_module: Any) -> OcrEngineResult:
    """Run Tesseract on *roi* as a single text line restricted to digits.

    The ROI is preprocessed with ``preprocess_for_ocr`` (default settings)
    and read via ``pytesseract_module.image_to_string``.

    Returns:
        An ``OcrEngineResult`` holding the digits-only text, the
        preprocessed image, the raw engine text and the input ROI.
    """
    tess_config = r"--oem 3 --psm 7 -c tessedit_char_whitelist=0123456789"
    binarized = preprocess_for_ocr(roi)
    engine_text = pytesseract_module.image_to_string(binarized, config=tess_config)
    only_digits = re.sub(r"\D+", "", engine_text)
    return OcrEngineResult(
        digits=only_digits,
        processed=binarized,
        raw_text=engine_text,
        code_roi=roi,
    )
class TesseractOcrEngine:
    """OCR engine backed by ``pytesseract`` (imported on construction)."""

    def __init__(self, args):
        """Import pytesseract and apply ``args.tesseract_cmd`` when given."""
        import pytesseract

        self._args = args
        self._pytesseract = pytesseract
        if args.tesseract_cmd:
            pytesseract.pytesseract.tesseract_cmd = args.tesseract_cmd

    def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
        """Isolate the code area of *roi* and read its digits with Tesseract."""
        return ocr_digits_only(extract_code_roi(roi, self._args), self._pytesseract)
class PaddleOcrEngine:
    """OCR engine backed by PaddleOCR (imported on construction)."""

    def __init__(self, args):
        """Create the PaddleOCR pipeline from the command-line options.

        Document orientation classification, unwarping and text-line
        orientation are disabled; language, detection side limit and
        recognition score threshold come from *args*.
        """
        from paddleocr import PaddleOCR
        self._ocr = PaddleOCR(
            lang=args.ocr_lang,
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            text_det_limit_side_len=args.paddle_text_det_limit_side_len,
            text_rec_score_thresh=args.paddle_text_rec_score_thresh,
        )
        self._input_mode = args.ocr_input
        self._args = args

    @staticmethod
    def _nested_texts(item: Any) -> list[str]:
        """Collect the strings found anywhere inside a nested list/tuple.

        Numbers (box coordinates, scores) are ignored, so they can never
        leak into the digit extraction.
        """
        found: list[str] = []
        for entry in item:
            if isinstance(entry, str):
                found.append(entry)
            elif isinstance(entry, (list, tuple)):
                found.extend(PaddleOcrEngine._nested_texts(entry))
        return found

    @staticmethod
    def _collect_texts(result: Any) -> list[str]:
        """Extract the recognised strings from a PaddleOCR result.

        Dict-like items are read through their ``"rec_texts"`` (or
        ``"texts"``) entry. List/tuple items are searched for strings only.
        NOTE(review): list-style items presumably come from older
        PaddleOCR result layouts such as ``[box, (text, score)]`` — confirm
        against the installed version.
        """
        texts: list[str] = []
        if result is None:
            return texts
        for item in result:
            if isinstance(item, dict):
                rec_texts = item.get("rec_texts") or item.get("texts") or []
                if isinstance(rec_texts, str):
                    texts.append(rec_texts)
                else:
                    texts.extend(str(text) for text in rec_texts)
            elif isinstance(item, (list, tuple)):
                # Stringifying the whole container would mix coordinates
                # and confidences into the digits extracted afterwards.
                texts.extend(PaddleOcrEngine._nested_texts(item))
        return texts

    def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
        """Read the digits contained in *roi*.

        The code area is isolated and width-limited, a binarised version
        is produced, and either that or the colour crop is passed to
        PaddleOCR depending on the ``--ocr-input`` option.

        Returns:
            An ``OcrEngineResult`` with the digits-only text, the
            preprocessed image, the joined raw text and the code crop.
        """
        code_roi = extract_code_roi(roi, self._args)
        code_roi = limit_width(code_roi, self._args.ocr_max_width)
        processed = preprocess_for_ocr(
            code_roi,
            scale=self._args.ocr_scale,
            max_width=self._args.ocr_max_width,
        )
        ocr_input = processed if self._input_mode == "processed" else code_roi
        result = self._ocr.predict(ocr_input)
        raw_text = " ".join(self._collect_texts(result))
        digits = re.sub(r"\D+", "", raw_text)
        return OcrEngineResult(digits, processed, raw_text, code_roi)
class EasyOcrInProcessEngine:
    """OCR engine running EasyOCR inside the current process."""

    def __init__(self, args):
        """Create the EasyOCR reader according to ``--easyocr-gpu``.

        ``on`` builds a GPU reader (errors propagate), ``off`` builds a
        CPU reader, and any other value tries the GPU first and falls back
        to the CPU if construction raises. Languages come from the
        comma-separated ``--ocr-lang`` value, defaulting to ``["en"]``.
        """
        import easyocr

        self._args = args
        self._input_mode = args.ocr_input
        langs = [part.strip() for part in args.ocr_lang.split(",") if part.strip()]
        if not langs:
            langs = ["en"]

        def build_reader(use_gpu: bool):
            return easyocr.Reader(langs, gpu=use_gpu, verbose=False)

        gpu_mode = args.easyocr_gpu
        if gpu_mode == "on":
            self._reader = build_reader(True)
            self._using_gpu = True
        elif gpu_mode == "off":
            self._reader = build_reader(False)
            self._using_gpu = False
        else:
            try:
                self._reader = build_reader(True)
                self._using_gpu = True
            except Exception as exc:
                log(f"EasyOCR GPU non disponibile, fallback CPU: {exc}")
                self._reader = build_reader(False)
                self._using_gpu = False
        log(f"EasyOCR device: {'gpu' if self._using_gpu else 'cpu'}")

    def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
        """Read the digits contained in *roi* with the in-process reader.

        The code area is isolated and width-limited, a binarised version
        is produced, and either that or the colour crop is read depending
        on ``--ocr-input``. Recognition is restricted to the digits 0-9.
        """
        settings = self._args
        code_roi = limit_width(extract_code_roi(roi, settings), settings.ocr_max_width)
        processed = preprocess_for_ocr(
            code_roi,
            scale=settings.ocr_scale,
            max_width=settings.ocr_max_width,
        )
        ocr_input = code_roi if self._input_mode != "processed" else processed
        result = self._reader.readtext(
            ocr_input,
            allowlist="0123456789",
            detail=1,
            paragraph=False,
        )
        # With detail=1 each entry is a sequence whose second element is
        # the recognised text; anything else is stringified as-is.
        texts = [
            str(item[1]) if isinstance(item, (list, tuple)) and len(item) >= 2 else str(item)
            for item in result
        ]
        raw_text = " ".join(texts)
        digits = re.sub(r"\D+", "", raw_text)
        return OcrEngineResult(digits, processed, raw_text, code_roi)
class EasyOcrProcessEngine:
    """OCR engine that delegates to an EasyOCR worker in a child process.

    The worker is this same script started with ``--easyocr-worker``. The
    two sides exchange one JSON object per line: requests on the worker's
    stdin, responses on its stdout. Images are passed as PNG files inside
    a private temporary directory.
    """

    def __init__(self, args):
        """Start the worker and wait for its ``ready`` event.

        Raises:
            RuntimeError: if the worker exits or sends something other
                than the ``ready`` event first. The worker and temporary
                directory are released before the error propagates.
        """
        self._args = args
        self._input_mode = args.ocr_input
        self._tmpdir = tempfile.TemporaryDirectory(prefix="flywms_easyocr_")
        # Most recent worker stderr lines, kept only for error messages.
        self._stderr_tail: deque[str] = deque(maxlen=50)
        self._stderr_thread: threading.Thread | None = None
        cmd = [
            sys.executable,
            str(Path(__file__).resolve()),
            "--easyocr-worker",
            "--ocr-lang", args.ocr_lang,
            "--easyocr-gpu", args.easyocr_gpu,
        ]
        env = os.environ.copy()
        env["PYTHONUTF8"] = "1"
        self._proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=env,
        )
        # stderr must be drained continuously: once the pipe buffer fills
        # up the worker blocks on its next write and the request/response
        # exchange on stdin/stdout deadlocks.
        self._stderr_thread = threading.Thread(
            target=self._drain_stderr,
            name="easyocr-worker-stderr",
            daemon=True,
        )
        self._stderr_thread.start()
        try:
            ready = self._read_json_line()
            if ready.get("event") != "ready":
                raise RuntimeError(f"EasyOCR worker non pronto: {ready}")
        except Exception:
            # Do not leave a half-started worker behind.
            self.close()
            raise
        log(f"EasyOCR worker process device: {ready.get('device', 'unknown')}")

    def _drain_stderr(self) -> None:
        """Consume the worker's stderr, remembering only the last lines."""
        stream = self._proc.stderr
        if stream is None:
            return
        try:
            for line in stream:
                self._stderr_tail.append(line)
        except (OSError, ValueError):
            # The pipe was closed while shutting down.
            pass

    def _collect_stderr(self) -> str:
        """Return the remembered stderr tail, waiting briefly for the drain."""
        thread = getattr(self, "_stderr_thread", None)
        if thread is not None and thread.is_alive():
            thread.join(timeout=1.0)
        return "".join(getattr(self, "_stderr_tail", ())).strip()

    def _read_json_line(self) -> dict[str, Any]:
        """Return the next JSON object printed by the worker.

        Blank lines, lines that are not valid JSON and JSON values that
        are not objects (e.g. a stray number printed by a library) are
        skipped.

        Raises:
            RuntimeError: if the worker closes its stdout; the message
                includes the tail of the worker's stderr.
        """
        assert self._proc.stdout is not None
        while True:
            line = self._proc.stdout.readline()
            if line == "":
                raise RuntimeError(f"EasyOCR worker terminato: {self._collect_stderr()}")
            line = line.strip()
            if not line:
                continue
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(payload, dict):
                return payload

    def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
        """Read the digits contained in *roi* through the worker process.

        The code area is isolated and width-limited, a binarised version
        is produced, and either that or the colour crop is written to a
        temporary PNG whose path is sent to the worker.

        Raises:
            RuntimeError: if the worker is not running, terminates during
                the request, or reports an error.
        """
        if self._proc.poll() is not None:
            raise RuntimeError("EasyOCR worker non attivo")
        code_roi = extract_code_roi(roi, self._args)
        code_roi = limit_width(code_roi, self._args.ocr_max_width)
        processed = preprocess_for_ocr(
            code_roi,
            scale=self._args.ocr_scale,
            max_width=self._args.ocr_max_width,
        )
        ocr_input = processed if self._input_mode == "processed" else code_roi
        image_path = Path(self._tmpdir.name) / f"ocr_{time.perf_counter_ns()}.png"
        cv2.imwrite(str(image_path), ocr_input)
        assert self._proc.stdin is not None
        try:
            self._proc.stdin.write(json.dumps({"path": str(image_path)}) + "\n")
            self._proc.stdin.flush()
            response = self._read_json_line()
        finally:
            # Remove the temporary image even when the exchange fails.
            try:
                image_path.unlink(missing_ok=True)
            except OSError:
                pass
        if response.get("error"):
            raise RuntimeError(str(response["error"]))
        raw_text = str(response.get("raw_text", ""))
        digits = re.sub(r"\D+", "", raw_text)
        return OcrEngineResult(digits, processed, raw_text, code_roi)

    def close(self) -> None:
        """Stop the worker and remove the temporary directory.

        The worker is asked to stop, given three seconds to exit and
        killed otherwise. Safe to call more than once.
        """
        proc = getattr(self, "_proc", None)
        if proc is not None:
            if proc.poll() is None:
                try:
                    if proc.stdin is not None:
                        proc.stdin.write(json.dumps({"cmd": "stop"}) + "\n")
                        proc.stdin.flush()
                except (OSError, ValueError):
                    # Best effort: the pipe may already be broken or closed.
                    pass
                try:
                    proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()  # reap the killed process
            thread = getattr(self, "_stderr_thread", None)
            if thread is not None and thread.is_alive():
                thread.join(timeout=1.0)
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is None:
                    continue
                try:
                    stream.close()
                except (OSError, ValueError):
                    pass
        tmpdir = getattr(self, "_tmpdir", None)
        if tmpdir is not None:
            tmpdir.cleanup()

    def __del__(self):
        # Last-resort cleanup; errors cannot be reported from a finalizer.
        try:
            self.close()
        except Exception:
            pass
def create_ocr_engine(args):
    """Instantiate the OCR engine selected by ``args.ocr_backend``.

    ``"paddle"`` yields a ``PaddleOcrEngine``; ``"easyocr"`` yields the
    in-process engine when ``args.easyocr_mode`` is ``"inprocess"`` and
    the subprocess engine otherwise; any other backend yields a
    ``TesseractOcrEngine``.
    """
    backend = args.ocr_backend
    if backend == "paddle":
        return PaddleOcrEngine(args)
    if backend != "easyocr":
        return TesseractOcrEngine(args)
    if args.easyocr_mode == "inprocess":
        engine_cls = EasyOcrInProcessEngine
    else:
        engine_cls = EasyOcrProcessEngine
    return engine_cls(args)
def run_easyocr_worker(args) -> int:
    """Serve EasyOCR requests read from stdin until told to stop.

    After building the reader (same GPU policy as ``--easyocr-gpu``: ``on``
    forces the GPU, ``off`` forces the CPU, otherwise GPU with CPU
    fallback) a ``{"event": "ready", "device": ...}`` line is printed.
    Each following stdin line is a JSON request: ``{"cmd": "stop"}`` ends
    the loop, otherwise ``request["path"]`` is read with digits only and a
    ``{"raw_text": ...}`` line is printed. Any failure while handling a
    request is reported as ``{"error": ...}``.

    Returns:
        Always 0.
    """
    import easyocr

    langs = [part.strip() for part in args.ocr_lang.split(",") if part.strip()] or ["en"]

    def build_reader(use_gpu: bool):
        return easyocr.Reader(langs, gpu=use_gpu, verbose=False)

    gpu_mode = args.easyocr_gpu
    using_gpu = False
    if gpu_mode == "on":
        reader = build_reader(True)
        using_gpu = True
    elif gpu_mode == "off":
        reader = build_reader(False)
    else:
        try:
            reader = build_reader(True)
            using_gpu = True
        except Exception:
            reader = build_reader(False)

    device = "gpu" if using_gpu else "cpu"
    print(json.dumps({"event": "ready", "device": device}), flush=True)

    for line in sys.stdin:
        try:
            request = json.loads(line)
            if request.get("cmd") == "stop":
                break
            result = reader.readtext(
                request["path"],
                allowlist="0123456789",
                detail=1,
                paragraph=False,
            )
            # With detail=1 each entry is a sequence whose second element
            # is the recognised text; anything else is stringified as-is.
            texts = [
                str(item[1]) if isinstance(item, (list, tuple)) and len(item) >= 2 else str(item)
                for item in result
            ]
            print(json.dumps({"raw_text": " ".join(texts)}), flush=True)
        except Exception as exc:
            print(json.dumps({"error": str(exc)}), flush=True)
    return 0
def save_ocr_debug_images(
    output_dir: Path,
    roi_packet: RoiPacket,
    engine_result: OcrEngineResult,
) -> None:
    """Write the raw, code and processed OCR images of one ROI as PNG files.

    The directory is created if needed. Files are named
    ``frame_<frame id>_roi_<roi id>_{raw,code,processed}.png`` with both
    ids zero-padded to six digits.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    stem = f"frame_{roi_packet.source_frame_id:06d}_roi_{roi_packet.roi_id:06d}"
    base = str(output_dir / stem)
    outputs = (
        ("_raw.png", roi_packet.roi_image),
        ("_code.png", engine_result.code_roi),
        ("_processed.png", engine_result.processed),
    )
    for suffix, image in outputs:
        cv2.imwrite(base + suffix, image)
def detect_yolov2(
    net,
    frame: np.ndarray,
    classes: list[str],
    min_confidence: float,
    nms_threshold: float,
    input_size: int,
    use_nms: bool,
    swap_rb: bool,
) -> tuple[list[Detection], dict[str, float | tuple[int, ...]]]:
    """Run the network on *frame* and convert its output into detections.

    Args:
        net: OpenCV DNN network (used via ``setInput`` / ``forward``).
        frame: Image to analyse.
        classes: Class names; rows whose best class index is outside this
            list are ignored.
        min_confidence: Rows must score strictly above this value.
        nms_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.
        input_size: Side of the square network input blob.
        use_nms: Apply non-maximum suppression when true; otherwise every
            candidate box is kept.
        swap_rb: Forwarded to ``blobFromImage`` as ``swapRB``.

    Returns:
        ``(detections, info)`` where *info* holds the prediction shape,
        the blob/forward/parse/total timings in milliseconds, the raw and
        class-score maxima, and the raw prediction array itself under
        ``"raw_predictions"``.
    """
    t0 = time.perf_counter()
    h, w = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(
        frame,
        scalefactor=1.0 / 255.0,
        size=(input_size, input_size),
        mean=(0, 0, 0),
        swapRB=swap_rb,
        crop=False,
    )
    t_blob = time.perf_counter()
    net.setInput(blob)
    predictions = net.forward()
    t_forward = time.perf_counter()
    predictions = np.array(predictions)
    # Bring the output to a 2-D (rows, values) layout.
    if predictions.ndim == 4:
        predictions = predictions.reshape(predictions.shape[1], predictions.shape[-1])
    elif predictions.ndim == 3:
        predictions = predictions[0]
    boxes: list[list[int]] = []
    confidences: list[float] = []
    class_ids: list[int] = []
    if predictions.ndim == 2 and predictions.shape[1] > 5:
        for i in range(predictions.shape[0]):
            # Values from index 5 onwards are the per-class scores; index 4
            # is not used here (presumably objectness — verify).
            prob_arr = predictions[i][5:]
            if prob_arr.size == 0:
                continue
            class_index = int(prob_arr.argmax(axis=0))
            if class_index >= len(classes):
                continue
            confidence = float(prob_arr[class_index])
            if confidence <= min_confidence:
                continue
            # The first four values are centre x/y and width/height,
            # relative to the frame size.
            x_center = float(predictions[i][0]) * w
            y_center = float(predictions[i][1]) * h
            width_box = float(predictions[i][2]) * w
            height_box = float(predictions[i][3]) * h
            x1 = int(x_center - width_box * 0.5)
            y1 = int(y_center - height_box * 0.5)
            x2 = int(x_center + width_box * 0.5)
            y2 = int(y_center + height_box * 0.5)
            x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, w, h)
            bw = max(0, x2 - x1)
            bh = max(0, y2 - y1)
            # Boxes that collapse to zero size after clipping are dropped.
            if bw == 0 or bh == 0:
                continue
            # Stored as (x, y, width, height), the layout NMSBoxes expects.
            boxes.append([x1, y1, bw, bh])
            confidences.append(confidence)
            class_ids.append(class_index)
    detections: list[Detection] = []
    if boxes:
        if use_nms:
            indices = cv2.dnn.NMSBoxes(boxes, confidences, min_confidence, nms_threshold)
            # Flatten so both nested and flat index results are handled.
            selected_indices = np.array(indices).flatten() if len(indices) else []
        else:
            selected_indices = range(len(boxes))
        for idx in selected_indices:
            x, y, bw, bh = boxes[int(idx)]
            class_id = class_ids[int(idx)]
            detections.append(Detection(
                class_id=class_id,
                class_name=classes[class_id],
                confidence=confidences[int(idx)],
                bbox=(x, y, x + bw, y + bh),  # back to corner coordinates
            ))
    t_parse = time.perf_counter()
    info = {
        "shape": tuple(predictions.shape),
        "blob_ms": (t_blob - t0) * 1000.0,
        "forward_ms": (t_forward - t_blob) * 1000.0,
        "parse_ms": (t_parse - t_forward) * 1000.0,
        "total_ms": (t_parse - t0) * 1000.0,
        "raw_max": float(np.max(predictions)) if predictions.size else 0.0,
        "class_max": float(np.max(predictions[:, 5:])) if predictions.ndim == 2 and predictions.shape[1] > 5 else 0.0,
        "raw_predictions": predictions,
    }
    return detections, info
def best_label_detections(
    detections: list[Detection],
    label_name: str,
    max_boxes: int,
) -> list[Detection]:
    """Return the best *max_boxes* detections of class *label_name*.

    Class names are compared case-insensitively after stripping
    whitespace. Matches are ordered by confidence, then by bounding-box
    area, both descending.
    """
    def is_label(det: Detection) -> bool:
        return det.class_name.strip().lower() == label_name.strip().lower()

    def rank(det: Detection) -> tuple[float, int]:
        left, top, right, bottom = det.bbox[0], det.bbox[1], det.bbox[2], det.bbox[3]
        return det.confidence, (right - left) * (bottom - top)

    ranked = sorted(filter(is_label, detections), key=rank, reverse=True)
    return ranked[:max_boxes]
def draw_detection(frame: np.ndarray, det: Detection, label_class: str) -> None:
    """Draw the box and a "<class> <confidence>" caption of *det* on *frame*.

    Colours (BGR): magenta for ``gaylord_stimato``, yellow for the label
    class, green for ``gaylord`` and white for everything else. Gaylord
    boxes (real or estimated) use a thicker outline.
    """
    left, top, right, bottom = det.bbox
    kind = det.class_name.lower()
    if kind == "gaylord_stimato":
        color = (255, 0, 255)
    elif kind == label_class.lower():
        color = (0, 255, 255)
    elif kind == "gaylord":
        color = (0, 255, 0)
    else:
        color = (255, 255, 255)
    is_gaylord_like = kind in ("gaylord", "gaylord_stimato")
    cv2.rectangle(frame, (left, top), (right, bottom), color, 4 if is_gaylord_like else 2)
    caption = f"{det.class_name} {det.confidence:.2f}"
    # Keep the caption inside the frame when the box touches the top edge.
    anchor = (left, max(20, top - 8))
    cv2.putText(
        frame,
        caption,
        anchor,
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,
        color,
        2,
        cv2.LINE_AA,
    )
def infer_gaylords_from_labels(
    detections: list[Detection],
    frame_w: int,
    frame_h: int,
    label_class: str,
    width_factor: float,
    height_factor: float,
    y_shift: float,
) -> list[Detection]:
    """Estimate gaylord boxes from label detections.

    Nothing is estimated when any detection is already a ``gaylord``.
    Otherwise, for every detection of class *label_class* with a
    non-empty box, a box *width_factor* x *height_factor* times the label
    size is built, centred on the label centre shifted down by *y_shift*
    label heights, and clipped to the frame.

    Returns:
        Detections named ``"gaylord_stimato"`` with ``class_id`` -1 and
        the confidence of the originating label.
    """
    if any(d.class_name.lower() == "gaylord" for d in detections):
        return []

    estimated: list[Detection] = []
    for label in detections:
        if label.class_name.lower() != label_class.lower():
            continue
        x1, y1, x2, y2 = label.bbox
        label_w = x2 - x1
        label_h = y2 - y1
        if label_w <= 0 or label_h <= 0:
            continue
        center_x = (x1 + x2) / 2.0
        center_y = (y1 + y2) / 2.0 + label_h * y_shift
        est_w = label_w * width_factor
        est_h = label_h * height_factor
        corners = clip_box(
            int(center_x - est_w / 2.0),
            int(center_y - est_h / 2.0),
            int(center_x + est_w / 2.0),
            int(center_y + est_h / 2.0),
            frame_w,
            frame_h,
        )
        gx1, gy1, gx2, gy2 = corners
        estimated.append(Detection(
            class_id=-1,
            class_name="gaylord_stimato",
            confidence=label.confidence,
            bbox=(gx1, gy1, gx2, gy2),
        ))
    return estimated
def draw_ocr_results(
    frame: np.ndarray,
    ocr_results: list[OcrResult],
    max_age_sec: float = 5.0,
) -> None:
    """Overlay recent, non-empty OCR readings on *frame*.

    Results older than *max_age_sec* (measured against
    ``time.perf_counter()``) or without text are skipped. Each remaining
    reading is drawn as ``NUM: <text>`` just below its bounding box, kept
    inside the frame vertically.
    """
    drawn_at = time.perf_counter()
    lowest_baseline = frame.shape[0] - 5
    for reading in ocr_results:
        too_old = drawn_at - reading.timestamp > max_age_sec
        if too_old or not reading.text:
            continue
        left, _top, _right, bottom = reading.bbox
        cv2.putText(
            frame,
            f"NUM: {reading.text}",
            (left, min(lowest_baseline, bottom + 24)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 255, 255),
            2,
            cv2.LINE_AA,
        )
def draw_status(frame: np.ndarray, stats_text: list[str]) -> None:
    """Draw *stats_text* as red lines in the top-left corner of *frame*.

    The first baseline is at y=25 and each following line is 24 pixels
    lower.
    """
    first_baseline = 25
    line_step = 24
    for row, text in enumerate(stats_text):
        cv2.putText(
            frame,
            text,
            (10, first_baseline + row * line_step),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 0, 255),
            2,
            cv2.LINE_AA,
        )
def draw_ocr_debug(ocr_frame: np.ndarray, text: str, preview_width: int) -> np.ndarray:
    """Build a debug image: the OCR input on top and the read text below it.

    A 2-D (single-channel) ``ocr_frame`` is converted to BGR; any other input
    is copied. The image is passed through ``resize_preview`` and pasted in
    the top-left corner of a white canvas that is 70 px taller than the image
    and at least 500 px wide. The caption ``OCR: <text>`` is written in the
    extra strip, with a placeholder when ``text`` is empty.

    Returns:
        The newly allocated ``uint8`` BGR canvas.
    """
    if ocr_frame.ndim == 2:
        preview = cv2.cvtColor(ocr_frame, cv2.COLOR_GRAY2BGR)
    else:
        preview = ocr_frame.copy()
    preview = resize_preview(preview, preview_width)

    img_h, img_w = preview.shape[0], preview.shape[1]
    canvas = np.full((img_h + 70, max(img_w, 500), 3), 255, dtype=np.uint8)
    canvas[:img_h, :img_w] = preview

    caption = text or "(nessun codice)"
    cv2.putText(
        canvas,
        f"OCR: {caption}",
        (10, img_h + 45),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )
    return canvas
def resize_for_quality(frame: np.ndarray, target_width: int) -> np.ndarray:
    """Shrink ``frame`` to ``target_width`` pixels wide, keeping aspect ratio.

    The input object itself is returned (no copy) when ``target_width`` is
    not positive or the frame is already no wider than ``target_width``; the
    function never enlarges. The resized height is at least 1 pixel.
    """
    if target_width > 0 and frame.shape[1] > target_width:
        ratio = target_width / frame.shape[1]
        new_height = max(1, int(frame.shape[0] * ratio))
        return cv2.resize(
            frame,
            (target_width, new_height),
            interpolation=cv2.INTER_AREA,
        )
    return frame
def estimate_sharpness(frame: np.ndarray, metric: str, resize_width: int) -> float:
    """Return a focus score for ``frame``; a larger value means more edge energy.

    The frame is first reduced with ``resize_for_quality`` and converted to a
    single channel. The conversion is chosen from the channel count so that
    grayscale (2-D or single-channel) and 4-channel frames are accepted as
    well as the usual 3-channel ones, instead of failing inside
    ``cv2.cvtColor``.

    Args:
        frame: Image to score. 3-channel input is treated as BGR and
            4-channel input as BGRA.
        metric: ``"tenengrad"`` selects the mean squared Sobel gradient
            magnitude; any other value selects the variance of the Laplacian.
        resize_width: Maximum width used for the computation, forwarded to
            ``resize_for_quality``.

    Returns:
        The score as a Python ``float``.
    """
    small = resize_for_quality(frame, resize_width)

    if small.ndim == 2:
        # Already single-channel: BGR2GRAY would raise on this input.
        gray = small
    elif small.shape[2] == 1:
        gray = np.ascontiguousarray(small[:, :, 0])
    elif small.shape[2] == 4:
        gray = cv2.cvtColor(small, cv2.COLOR_BGRA2GRAY)
    else:
        gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)

    if metric == "tenengrad":
        gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
        gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
        return float(np.mean(gx * gx + gy * gy))

    # Default metric: variance of the Laplacian response.
    lap = cv2.Laplacian(gray, cv2.CV_64F)
    return float(lap.var())
def format_stats(
    shared: SharedState,
    frame_buffer: LatestBuffer,
    roi_buffer: LatestBuffer,
    start_time: float,
) -> list[str]:
    """Render the runtime counters as a list of human-readable status lines.

    Rates are computed over the time elapsed since ``start_time`` (a
    ``time.perf_counter()`` value, floored at 1 ms to avoid division by
    zero); averages divide each accumulated total by its cycle count, using
    1 when the count is still zero.

    Returns:
        Ten lines: capture fps, display fps, quality filter, YOLO, OCR,
        display average, last frame ids, last detections, and the two
        buffer counter lines.
    """

    def _mean(total: float, count: int) -> float:
        return total / max(1, count)

    stats = shared.snapshot_stats()
    elapsed = max(0.001, time.perf_counter() - start_time)
    fb = frame_buffer.stats()
    rb = roi_buffer.stats()

    mean_yolo_ms = _mean(stats.yolo_total_ms, stats.yolo_cycles)
    mean_forward_ms = _mean(stats.yolo_forward_ms, stats.yolo_cycles)
    mean_ocr_ms = _mean(stats.ocr_total_ms, stats.ocr_cycles)
    mean_display_ms = _mean(stats.display_total_ms, stats.display_frames)
    mean_quality_ms = _mean(stats.quality_total_ms, stats.quality_cycles)
    mean_sharpness = _mean(stats.quality_score_total, stats.quality_cycles)

    capture_fps = stats.capture_frames / elapsed
    display_fps = stats.display_frames / elapsed
    yolo_fps = stats.yolo_cycles / elapsed
    ocr_fps = stats.ocr_cycles / elapsed

    lines: list[str] = []
    lines.append(f"cap_fps={capture_fps:.1f}")
    lines.append(f"disp_fps={display_fps:.1f}")
    lines.append(
        f"quality yolo_in/reject={stats.yolo_submitted_frames}/{stats.quality_rejected_frames} "
        f"last={stats.last_quality_score:.1f} avg={mean_sharpness:.1f} ms={mean_quality_ms:.2f}"
    )
    lines.append(
        f"yolo_fps={yolo_fps:.1f} avg={mean_yolo_ms:.1f}ms fwd={mean_forward_ms:.1f}ms"
    )
    lines.append(f"ocr_fps={ocr_fps:.1f} avg={mean_ocr_ms:.1f}ms")
    lines.append(f"display_avg={mean_display_ms:.1f}ms")
    lines.append(
        f"frames id cap/yolo/ocr={stats.last_capture_frame_id}/{stats.last_yolo_frame_id}/{stats.last_ocr_frame_id}"
    )
    lines.append(f"last_det={stats.last_detection_count} {stats.last_detection_summary}")
    lines.append(
        f"frame_buf push/pop/drop/skip={fb.pushed}/{fb.popped}/{fb.dropped_on_put}/{fb.skipped_on_pop}"
    )
    lines.append(
        f"roi_buf push/pop/drop/skip={rb.pushed}/{rb.popped}/{rb.dropped_on_put}/{rb.skipped_on_pop}"
    )
    return lines
def yolo_worker(
    stop_event: threading.Event,
    frame_buffer: LatestBuffer,
    roi_buffer: LatestBuffer,
    shared: SharedState,
    net,
    classes: list[str],
    args,
    roi_id_gen: IdGenerator,
) -> None:
    """Run YOLO detection on incoming frames and queue label crops for OCR.

    Loops until ``stop_event`` is set. For every packet taken from
    ``frame_buffer`` it runs ``detect_yolov2``, publishes a
    ``DetectionResult`` and an annotated debug frame to ``shared``, and then
    crops the selected label detections out of the frame and puts them into
    ``roi_buffer`` as ``RoiPacket`` objects, subject to several throttles
    (global submit interval, pending-queue depth, minimum label size and a
    per-slot cooldown).

    Args:
        stop_event: Set by the caller to make the loop exit.
        frame_buffer: Source of frame packets.
        roi_buffer: Destination of the ROI packets handed to the OCR worker.
        shared: Shared state receiving detection results and debug frames.
        net: Network object forwarded to ``detect_yolov2``.
        classes: Class names, forwarded to ``detect_yolov2`` and used to
            label the raw scores in the debug log.
        args: Parsed command-line options (thresholds, debug flags, ...).
        roi_id_gen: Generator of ROI identifiers.
    """
    # Time of the last OCR submission per quantized box position.
    slot_last_ocr: dict[tuple[int, int], float] = {}
    # Very negative start value so the first submission is never throttled.
    last_ocr_submit = -999999.0
    label_class_lower = args.label_class.strip().lower()
    log("YOLO worker avviato")
    while not stop_event.is_set():
        packet = frame_buffer.get_latest_blocking(stop_event)
        if packet is None:
            # No packet was returned; re-check stop_event and try again.
            continue
        # Work on a private copy of the frame carried by the packet.
        local_frame = packet.frame.copy()
        detections, info = detect_yolov2(
            net=net,
            frame=local_frame,
            classes=classes,
            min_confidence=args.min_confidence,
            nms_threshold=args.nms_threshold,
            input_size=args.input_size,
            use_nms=args.use_nms,
            swap_rb=args.swap_rb,
        )
        result = DetectionResult(
            frame_id=packet.frame_id,
            timestamp=time.perf_counter(),
            detections=detections,
            inference_ms=float(info["total_ms"]),
            blob_ms=float(info["blob_ms"]),
            forward_ms=float(info["forward_ms"]),
            parse_ms=float(info["parse_ms"]),
            source_width=packet.width,
            source_height=packet.height,
        )
        shared.set_latest_detection(result)
        if args.debug_yolo_output:
            log(
                f"YOLO frame={packet.frame_id} shape={info['shape']} "
                f"raw_max={info['raw_max']:.4f} class_max={info['class_max']:.4f} "
                f"det={len(detections)}"
            )
            if args.debug_yolo_top > 0:
                raw_predictions = info["raw_predictions"]
                # Columns 0-3 are logged as the box, column 4 as "obj" and
                # columns 5+ as the per-class scores.
                if (
                    isinstance(raw_predictions, np.ndarray)
                    and raw_predictions.ndim == 2
                    and raw_predictions.shape[1] > 5
                ):
                    scores = raw_predictions[:, 5:]
                    row_best = scores.max(axis=1)
                    # Rows with the highest best-class score, best first.
                    top_indices = np.argsort(row_best)[-args.debug_yolo_top:][::-1]
                    for idx in top_indices:
                        cls_scores = scores[idx]
                        cls_parts = " ".join(
                            f"{classes[i]}={float(cls_scores[i]):.4f}"
                            for i in range(min(len(classes), cls_scores.shape[0]))
                        )
                        log(
                            f"  raw[{int(idx)}] obj={float(raw_predictions[idx, 4]):.4f} "
                            f"{cls_parts} box={tuple(float(v) for v in raw_predictions[idx, :4])}"
                        )
        # Annotated copy for the debug view; local_frame stays clean because
        # the OCR crops below are taken from it.
        debug = local_frame.copy()
        debug_detections = list(detections)
        if args.infer_gaylord_from_label:
            inferred_gaylords = infer_gaylords_from_labels(
                detections,
                packet.width,
                packet.height,
                args.label_class,
                args.inferred_gaylord_width_factor,
                args.inferred_gaylord_height_factor,
                args.inferred_gaylord_y_shift,
            )
            # Inferred boxes are only drawn; they are not part of `result`.
            debug_detections.extend(inferred_gaylords)
            if args.debug_inferred_gaylord and inferred_gaylords:
                log(
                    f"gaylord stimati frame={packet.frame_id}: "
                    f"{[det.bbox for det in inferred_gaylords]}"
                )
        for det in debug_detections:
            draw_detection(debug, det, args.label_class)
        shared.set_debug_yolo_frame(resize_preview(debug, args.preview_width))
        label_dets = best_label_detections(
            detections,
            args.label_class,
            args.max_roi_per_frame,
        )
        # `now` is sampled once per frame and shared by all throttles below.
        now = time.perf_counter()
        for det in label_dets:
            # Global throttle. Since last_ocr_submit is set to `now` after a
            # put, a positive ocr_submit_min_interval lets at most one ROI
            # per frame through.
            if now - last_ocr_submit < args.ocr_submit_min_interval:
                continue
            # Do not queue more work while enough ROIs are already pending.
            if roi_buffer.depth() >= args.ocr_max_pending:
                continue
            x1, y1, x2, y2 = det.bbox
            bw = x2 - x1
            bh = y2 - y1
            if bw < args.min_label_width or bh < args.min_label_height:
                continue
            # Per-position cooldown, keyed by the quantized box.
            slot_key = quantized_slot_key(det.bbox, args.slot_size)
            if now - slot_last_ocr.get(slot_key, -999999.0) < args.ocr_cooldown_sec:
                continue
            rx1, ry1, rx2, ry2 = expand_box(
                x1, y1, x2, y2,
                packet.width,
                packet.height,
                pad_ratio=args.ocr_pad_ratio,
            )
            roi = local_frame[ry1:ry2, rx1:rx2]
            if roi.size == 0:
                continue
            # Copy so the ROI does not stay a view into local_frame.
            roi_copy = roi.copy()
            roi_buffer.put(RoiPacket(
                roi_id=roi_id_gen.next(),
                source_frame_id=packet.frame_id,
                timestamp=now,
                class_name=label_class_lower,
                confidence=det.confidence,
                # The packet keeps the original detection box, not the
                # padded crop rectangle.
                bbox=det.bbox,
                roi_image=roi_copy,
                width=roi_copy.shape[1],
                height=roi_copy.shape[0],
            ))
            slot_last_ocr[slot_key] = now
            last_ocr_submit = now
    log("YOLO worker terminato")
def ocr_worker(
    stop_event: threading.Event,
    roi_buffer: LatestBuffer,
    shared: SharedState,
    args,
) -> None:
    """Read digits from queued label ROIs until ``stop_event`` is set.

    The OCR engine is created with ``create_ocr_engine``; if that raises, the
    failure is logged and the worker returns without processing anything.
    For every ROI packet the engine's ``read_digits`` is timed, optional
    debug images are saved, and an ``OcrResult`` is published to ``shared``
    together with the processed debug image. The published text is empty
    when fewer than ``args.ocr_min_digits`` digits were read. On exit the
    engine's ``close`` method is called if it has one.
    """
    try:
        engine = create_ocr_engine(args)
    except Exception as exc:
        log(f"OCR worker disabilitato: impossibile inizializzare {args.ocr_backend}: {exc}")
        return

    debug_dir = None
    if args.save_ocr_roi_dir:
        debug_dir = Path(args.save_ocr_roi_dir)

    log(f"OCR worker avviato con backend {args.ocr_backend}")
    try:
        while True:
            if stop_event.is_set():
                break
            roi_packet = roi_buffer.get_latest_blocking(stop_event)
            if roi_packet is None:
                continue

            started = time.perf_counter()
            engine_result = engine.read_digits(roi_packet.roi_image)
            ocr_ms = (time.perf_counter() - started) * 1000.0

            digits = engine_result.digits
            raw_text = engine_result.raw_text

            if debug_dir is not None:
                save_ocr_debug_images(debug_dir, roi_packet, engine_result)
            if args.print_all_ocr:
                log(
                    f"OCR frame={roi_packet.source_frame_id} "
                    f"raw='{raw_text.strip()}' digits='{digits}' ms={ocr_ms:.1f}"
                )

            # Too-short readings are published with an empty text.
            accepted_text = digits if len(digits) >= args.ocr_min_digits else ""
            result = OcrResult(
                roi_id=roi_packet.roi_id,
                source_frame_id=roi_packet.source_frame_id,
                timestamp=time.perf_counter(),
                text=accepted_text,
                raw_text=raw_text,
                bbox=roi_packet.bbox,
                ocr_ms=ocr_ms,
            )
            shared.add_ocr_result(result)
            # The debug caption falls back to the raw text when nothing
            # was accepted.
            shared.set_debug_ocr_frame(
                engine_result.processed,
                result.text or raw_text.strip(),
            )
            if result.text:
                log(
                    f"Etichetta letta frame={result.source_frame_id} "
                    f"roi={result.roi_id}: {result.text} ({ocr_ms:.1f} ms)"
                )
    finally:
        # Engines are not required to expose close(); call it only if present.
        closer = getattr(engine, "close", None)
        if closer is not None:
            closer()
    log("OCR worker terminato")
def main() -> int:
    """Program entry point: set up the pipeline and run the capture loop.

    Parses the arguments, loads the YOLO network, opens the video source,
    starts the YOLO worker thread (and the OCR worker unless ``args.no_ocr``
    is set), then reads frames in the calling thread, optionally filters them
    by sharpness, hands them to the YOLO worker and shows the preview
    windows. On exit the workers are stopped, the capture is released and a
    final statistics summary is logged.

    Returns:
        The result of ``run_easyocr_worker`` when ``args.easyocr_worker`` is
        set, ``1`` if the video source cannot be opened, otherwise ``0``.
    """
    args = parse_args()
    if args.easyocr_worker:
        # Alternative mode: act as the EasyOCR worker instead of the pipeline.
        return run_easyocr_worker(args)
    require_file(args.weights, "File pesi YOLOv2")
    require_file(args.config, "File config YOLOv2")
    require_file(args.labels, "File labels")
    classes = load_classes(args.labels)
    cv2.setNumThreads(args.opencv_threads)
    log(f"OpenCV version: {cv2.__version__}")
    log(f"Classi: {classes}")
    log(f"Backend richiesto: {args.backend}")
    net = cv2.dnn.readNetFromDarknet(args.config, args.weights)
    configure_net_backend(net, args.backend)
    cap, source_name = open_capture(args.video)
    if not cap.isOpened():
        log("ERRORE: impossibile aprire la sorgente video")
        return 1
    frame_buffer = LatestBuffer(args.frame_buffer_size, "frames")
    roi_buffer = LatestBuffer(args.roi_buffer_size, "roi")
    shared = SharedState(args.ocr_history_size)
    stop_event = threading.Event()
    roi_id_gen = IdGenerator()
    start_time = time.perf_counter()
    yolo_thread = threading.Thread(
        target=yolo_worker,
        name="yolo-worker",
        args=(stop_event, frame_buffer, roi_buffer, shared, net, classes, args, roi_id_gen),
        daemon=True,
    )
    yolo_thread.start()
    ocr_thread = None
    if not args.no_ocr:
        ocr_thread = threading.Thread(
            target=ocr_worker,
            name="ocr-worker",
            args=(stop_event, roi_buffer, shared, args),
            daemon=True,
        )
        ocr_thread.start()
    else:
        log("OCR disabilitato da --no-ocr")
    if not args.no_display:
        cv2.namedWindow("flywms capture", cv2.WINDOW_NORMAL)
        if args.debug_yolo_window:
            cv2.namedWindow("flywms yolo", cv2.WINDOW_NORMAL)
        if args.debug_ocr_window or not args.no_ocr:
            cv2.namedWindow("flywms ocr", cv2.WINDOW_NORMAL)
        if args.quality_filter and args.debug_rejected_window:
            cv2.namedWindow("flywms scartati", cv2.WINDOW_NORMAL)
    frame_id = 0
    last_stats_log = time.perf_counter()
    video_fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
    # Pace the loop only when realtime playback is requested and the source
    # reports a positive frame rate; 0.0 disables pacing.
    frame_period = 1.0 / video_fps if args.realtime_playback and video_fps > 0 else 0.0
    try:
        while not stop_event.is_set():
            loop_start = time.perf_counter()
            t_read0 = time.perf_counter()
            grabbed, frame = cap.read()
            read_ms = (time.perf_counter() - t_read0) * 1000.0
            if not grabbed or frame is None:
                log("Fine stream o impossibile leggere il frame")
                break
            frame_id += 1
            # The frame that exceeds the limit has already been read here;
            # it is discarded without being processed.
            if args.max_frames > 0 and frame_id > args.max_frames:
                log(f"Raggiunto --max-frames={args.max_frames}")
                break
            height, width = frame.shape[:2]
            # Defaults used when the sharpness is not measured.
            sharpness = 0.0
            quality_ms = 0.0
            quality_passed = True
            if args.quality_filter or args.debug_quality_log:
                t_quality0 = time.perf_counter()
                sharpness = estimate_sharpness(
                    frame,
                    args.blur_metric,
                    args.blur_resize_width,
                )
                quality_ms = (time.perf_counter() - t_quality0) * 1000.0
                # NOTE(review): this also runs when only debug_quality_log is
                # set, so enabling the log alone can keep blurry frames away
                # from YOLO even though quality_filter is off - confirm that
                # this is intended.
                quality_passed = sharpness >= args.min_sharpness
                if args.debug_quality_log:
                    log(
                        f"quality frame={frame_id} sharpness={sharpness:.1f} "
                        f"passed={quality_passed} ms={quality_ms:.2f}"
                    )
            packet = FramePacket(
                frame_id=frame_id,
                timestamp=time.perf_counter(),
                frame=frame,
                width=width,
                height=height,
                source=source_name,
            )
            shared.add_capture_read(frame_id, read_ms)
            # Only frames that passed the sharpness check reach the YOLO worker.
            if quality_passed:
                frame_buffer.put(packet)
            shared.add_quality_result(
                score=sharpness,
                passed=quality_passed,
                elapsed_ms=quality_ms,
                submitted_to_yolo=quality_passed,
            )
            t_display0 = time.perf_counter()
            if not args.no_display:
                # Draw on a copy: `frame` itself is referenced by `packet`.
                display = frame.copy()
                if args.quality_filter:
                    quality_color = (0, 180, 0) if quality_passed else (0, 0, 255)
                    cv2.putText(
                        display,
                        f"sharp={sharpness:.1f} {'OK' if quality_passed else 'BLUR'}",
                        (20, 38),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0,
                        quality_color,
                        2,
                        cv2.LINE_AA,
                    )
                # The latest detection may come from an earlier frame than
                # the one being displayed.
                latest_detection = shared.get_latest_detection()
                if latest_detection is not None:
                    display_detections = list(latest_detection.detections)
                    if args.infer_gaylord_from_label:
                        inferred_gaylords = infer_gaylords_from_labels(
                            latest_detection.detections,
                            latest_detection.source_width,
                            latest_detection.source_height,
                            args.label_class,
                            args.inferred_gaylord_width_factor,
                            args.inferred_gaylord_height_factor,
                            args.inferred_gaylord_y_shift,
                        )
                        display_detections.extend(inferred_gaylords)
                    for det in display_detections:
                        draw_detection(display, det, args.label_class)
                draw_ocr_results(display, shared.get_recent_ocr_results())
                # Only the first five status lines are shown on screen.
                draw_status(display, format_stats(shared, frame_buffer, roi_buffer, start_time)[:5])
                display = resize_preview(display, args.preview_width)
                cv2.imshow("flywms capture", display)
                debug_yolo, debug_ocr, debug_ocr_text = shared.get_debug_frames()
                if args.debug_yolo_window and debug_yolo is not None:
                    cv2.imshow("flywms yolo", debug_yolo)
                if (args.debug_ocr_window or not args.no_ocr) and debug_ocr is not None:
                    cv2.imshow(
                        "flywms ocr",
                        draw_ocr_debug(debug_ocr, debug_ocr_text, args.preview_width),
                    )
                if (
                    args.quality_filter
                    and args.debug_rejected_window
                    and not quality_passed
                ):
                    rejected = frame.copy()
                    cv2.putText(
                        rejected,
                        f"SCARTATO sharp={sharpness:.1f} < {args.min_sharpness:.1f}",
                        (20, 38),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0,
                        (0, 0, 255),
                        2,
                        cv2.LINE_AA,
                    )
                    rejected = resize_preview(rejected, args.preview_width)
                    cv2.imshow("flywms scartati", rejected)
                # Poll the keyboard after the imshow calls.
                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    log("Premuto q, uscita")
                    break
            display_ms = (time.perf_counter() - t_display0) * 1000.0
            shared.add_display(display_ms)
            now = time.perf_counter()
            if now - last_stats_log >= args.stats_interval:
                for line in format_stats(shared, frame_buffer, roi_buffer, start_time):
                    log(line)
                last_stats_log = now
            if frame_period > 0:
                # Sleep for the rest of the frame period.
                elapsed = time.perf_counter() - loop_start
                if elapsed < frame_period:
                    time.sleep(frame_period - elapsed)
    except KeyboardInterrupt:
        log("Interrotto da tastiera")
    finally:
        if args.drain_seconds > 0:
            # Give the workers time to finish queued work before stopping.
            log(f"Drain worker per {args.drain_seconds:.1f}s")
            time.sleep(args.drain_seconds)
        stop_event.set()
        # Wake the buffers after setting the stop flag, then wait for the
        # workers with a bounded timeout.
        frame_buffer.wake_all()
        roi_buffer.wake_all()
        yolo_thread.join(timeout=3.0)
        if ocr_thread is not None:
            ocr_thread.join(timeout=3.0)
        cap.release()
        if not args.no_display:
            cv2.destroyAllWindows()
    log("=== riepilogo finale ===")
    for line in format_stats(shared, frame_buffer, roi_buffer, start_time):
        log(line)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())