Milestone YOLO11 navigation planning baseline

This commit is contained in:
administrator
2026-05-15 16:52:54 +02:00
parent 6a18517743
commit a92dcf2659
212 changed files with 1691 additions and 63 deletions

378
flywms.py
View File

@@ -96,10 +96,15 @@ class RuntimeStats:
yolo_submitted_frames: int = 0
quality_rejected_frames: int = 0
quality_cycles: int = 0
ocr_roi_submitted: int = 0
ocr_roi_rejected: int = 0
ocr_roi_quality_cycles: int = 0
yolo_cycles: int = 0
ocr_cycles: int = 0
quality_total_ms: float = 0.0
quality_score_total: float = 0.0
ocr_roi_quality_total_ms: float = 0.0
ocr_roi_sharpness_total: float = 0.0
yolo_total_ms: float = 0.0
yolo_blob_ms: float = 0.0
yolo_forward_ms: float = 0.0
@@ -110,6 +115,8 @@ class RuntimeStats:
last_capture_frame_id: int = 0
last_quality_score: float = 0.0
last_quality_passed: bool = True
last_ocr_roi_sharpness: float = 0.0
last_ocr_roi_passed: bool = True
last_yolo_frame_id: int = 0
last_ocr_frame_id: int = 0
last_detection_count: int = 0
@@ -174,6 +181,8 @@ class SharedState:
self.ocr_results: deque[OcrResult] = deque(maxlen=ocr_history_size)
self.debug_yolo_frame: np.ndarray | None = None
self.debug_ocr_frame: np.ndarray | None = None
self.debug_rejected_ocr_roi_frame: np.ndarray | None = None
self.debug_rejected_ocr_roi_text: str = ""
self.debug_ocr_text: str = ""
self.stats = RuntimeStats()
@@ -218,11 +227,27 @@ class SharedState:
self.debug_ocr_frame = None if frame is None else frame.copy()
self.debug_ocr_text = text
def get_debug_frames(self) -> tuple[np.ndarray | None, np.ndarray | None, str]:
def set_debug_rejected_ocr_roi_frame(
self,
frame: np.ndarray | None,
text: str = "",
) -> None:
with self._lock:
self.debug_rejected_ocr_roi_frame = None if frame is None else frame.copy()
self.debug_rejected_ocr_roi_text = text
def get_debug_frames(
self,
) -> tuple[np.ndarray | None, np.ndarray | None, str, np.ndarray | None, str]:
with self._lock:
yolo = None if self.debug_yolo_frame is None else self.debug_yolo_frame.copy()
ocr = None if self.debug_ocr_frame is None else self.debug_ocr_frame.copy()
return yolo, ocr, self.debug_ocr_text
rejected_ocr = (
None
if self.debug_rejected_ocr_roi_frame is None
else self.debug_rejected_ocr_roi_frame.copy()
)
return yolo, ocr, self.debug_ocr_text, rejected_ocr, self.debug_rejected_ocr_roi_text
def add_capture_read(self, frame_id: int, read_ms: float) -> None:
with self._lock:
@@ -248,6 +273,23 @@ class SharedState:
else:
self.stats.quality_rejected_frames += 1
def add_ocr_roi_quality_result(
    self,
    score: float,
    passed: bool,
    elapsed_ms: float,
) -> None:
    """Record the outcome of one label-ROI quality check in the runtime stats.

    Args:
        score: Sharpness score of the ROI; added to the running total and
            stored as the last score.
        passed: Whether the ROI passed the check; selects which of the
            submitted/rejected counters is incremented.
        elapsed_ms: Time spent on the check, added to the running total.
    """
    with self._lock:
        stats = self.stats
        # Running totals; format_stats divides them by the cycle count.
        stats.ocr_roi_quality_cycles += 1
        stats.ocr_roi_sharpness_total += score
        stats.ocr_roi_quality_total_ms += elapsed_ms
        # Snapshot of the most recent check.
        stats.last_ocr_roi_sharpness = score
        stats.last_ocr_roi_passed = passed
        if not passed:
            stats.ocr_roi_rejected += 1
        else:
            stats.ocr_roi_submitted += 1
def add_display(self, display_ms: float) -> None:
with self._lock:
self.stats.display_frames += 1
@@ -275,8 +317,10 @@ def parse_args():
ap.add_argument("-v", "--video", default=None,
help="Percorso video. Se omesso usa webcam 0")
ap.add_argument("--detector", choices=["yolov2", "ultralytics"],
default="yolov2", help="Motore detection oggetti")
ap.add_argument("--weights", default="yolov2.weights",
help="File pesi YOLOv2")
help="File pesi YOLOv2 o modello Ultralytics .pt")
ap.add_argument("--config", default="yolov2.cfg",
help="File config YOLOv2")
ap.add_argument("--labels", default="labels.txt",
@@ -286,6 +330,8 @@ def parse_args():
ap.add_argument("--backend", choices=["cpu", "cuda", "cuda-fp16"],
default="cpu", help="Backend OpenCV DNN")
ap.add_argument("--ultralytics-device", default="cpu",
help="Device Ultralytics: cpu oppure 0 per GPU")
ap.add_argument("--input-size", type=int, default=416,
help="Dimensione input YOLO")
ap.add_argument("--swap-rb", action="store_true",
@@ -298,6 +344,8 @@ def parse_args():
help="Numero risultati OCR recenti mantenuti in memoria")
ap.add_argument("--preview-width", type=int, default=1280,
help="Larghezza massima preview")
ap.add_argument("--capture-overlay-max-lag", type=int, default=2,
help="Massimo ritardo in frame per disegnare detection sulla finestra capture")
ap.add_argument("--stats-interval", type=float, default=2.0,
help="Secondi tra riepiloghi prestazioni")
ap.add_argument("--max-frames", type=int, default=0,
@@ -320,6 +368,14 @@ def parse_args():
help="Logga lo score qualita' di ogni frame")
ap.add_argument("--debug-rejected-window", action="store_true",
help="Mostra una finestra con i frame scartati dal filtro qualita'")
ap.add_argument("--ocr-roi-quality-filter", action="store_true",
help="Filtra le ROI etichetta troppo sfocate prima dell'OCR")
ap.add_argument("--min-ocr-roi-sharpness", type=float, default=250.0,
help="Soglia minima nitidezza della ROI etichetta per inviarla all'OCR")
ap.add_argument("--ocr-roi-blur-resize-width", type=int, default=240,
help="Larghezza usata per la metrica blur sulle ROI etichetta")
ap.add_argument("--debug-rejected-ocr-roi-window", action="store_true",
help="Mostra una finestra con le ROI etichetta scartate prima dell'OCR")
ap.add_argument("--min-confidence", type=float, default=0.30,
help="Soglia minima confidenza")
@@ -361,6 +417,8 @@ def parse_args():
help=argparse.SUPPRESS)
ap.add_argument("--ocr-input", choices=["roi", "processed"],
default="roi", help="Immagine passata al motore OCR")
ap.add_argument("--ocr-no-preprocess", action="store_true",
help="Passa all'OCR la ROI etichetta raw uscita da YOLO, senza crop/preprocess")
ap.add_argument("--ocr-code-mode", choices=["full", "fixed-band", "large-components"],
default="fixed-band", help="Prefiltro per isolare il codice grande")
ap.add_argument("--ocr-scale", type=float, default=1.5,
@@ -638,7 +696,7 @@ class TesseractOcrEngine:
self._args = args
def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
    """Run digit-only Tesseract OCR on a label ROI.

    Args:
        roi: Label ROI image.

    Returns:
        The result produced by ``ocr_digits_only`` for the chosen input.
    """
    if self._args.ocr_no_preprocess:
        # Raw mode: hand a copy of the ROI to the OCR step and skip
        # extract_code_roi entirely, so no work is done and then discarded.
        code_roi = roi.copy()
    else:
        code_roi = extract_code_roi(roi, self._args)
    return ocr_digits_only(code_roi, self._pytesseract)
@@ -657,14 +715,19 @@ class PaddleOcrEngine:
self._args = args
def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
if self._args.ocr_no_preprocess:
code_roi = roi.copy()
processed = code_roi.copy()
ocr_input = code_roi
else:
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
result = self._ocr.predict(ocr_input)
texts: list[str] = []
for item in result:
@@ -710,14 +773,19 @@ class EasyOcrInProcessEngine:
log(f"EasyOCR device: {'gpu' if self._using_gpu else 'cpu'}")
def read_digits(self, roi: np.ndarray) -> OcrEngineResult:
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
if self._args.ocr_no_preprocess:
code_roi = roi.copy()
processed = code_roi.copy()
ocr_input = code_roi
else:
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
result = self._reader.readtext(
ocr_input,
allowlist="0123456789",
@@ -786,14 +854,19 @@ class EasyOcrProcessEngine:
if self._proc.poll() is not None:
raise RuntimeError("EasyOCR worker non attivo")
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
if self._args.ocr_no_preprocess:
code_roi = roi.copy()
processed = code_roi.copy()
ocr_input = code_roi
else:
code_roi = extract_code_roi(roi, self._args)
code_roi = limit_width(code_roi, self._args.ocr_max_width)
processed = preprocess_for_ocr(
code_roi,
scale=self._args.ocr_scale,
max_width=self._args.ocr_max_width,
)
ocr_input = processed if self._input_mode == "processed" else code_roi
image_path = Path(self._tmpdir.name) / f"ocr_{time.perf_counter_ns()}.png"
cv2.imwrite(str(image_path), ocr_input)
@@ -1003,6 +1076,79 @@ def detect_yolov2(
return detections, info
class UltralyticsDetector:
    """Object detector backed by an Ultralytics ``YOLO`` model.

    Attributes:
        model: The loaded ``ultralytics.YOLO`` instance.
        device: Device string forwarded to ``model.predict``.
        classes: Class names ordered by ascending class id. List positions
            equal class ids only when the model's ids are contiguous from 0;
            ``detect`` therefore resolves names through the id map instead.
    """

    def __init__(self, model_path: str, device: str):
        # Imported here so the package is only needed when this detector
        # is actually instantiated.
        from ultralytics import YOLO

        self.model = YOLO(model_path)
        self.device = device
        names = self.model.names
        if isinstance(names, dict):
            self._names_by_id = {key: str(value) for key, value in names.items()}
        else:
            self._names_by_id = {idx: str(name) for idx, name in enumerate(names)}
        self.classes = [self._names_by_id[key] for key in sorted(self._names_by_id)]

    def _class_name(self, cls_id: int) -> str:
        """Return the model's name for ``cls_id``, or the id as text if unknown."""
        return self._names_by_id.get(int(cls_id), str(cls_id))

    def detect(
        self,
        frame: np.ndarray,
        min_confidence: float,
        input_size: int,
    ) -> tuple[list[Detection], dict[str, float | tuple[int, ...] | np.ndarray]]:
        """Run the model on one frame and convert its boxes to ``Detection`` objects.

        Args:
            frame: Image to analyse; its ``shape[1]``/``shape[0]`` are used as
                width/height when clipping boxes.
            min_confidence: Passed to ``predict`` as ``conf``.
            input_size: Passed to ``predict`` as ``imgsz``.

        Returns:
            ``(detections, info)``. ``info`` carries the keys ``shape``,
            ``blob_ms``, ``forward_ms``, ``parse_ms``, ``total_ms``,
            ``raw_max``, ``class_max`` and ``raw_predictions``. Only the
            ``predict`` call is timed, so ``forward_ms`` equals ``total_ms``
            and the blob/parse timings are reported as ``0.0``.
        """
        t0 = time.perf_counter()
        results = self.model.predict(
            source=frame,
            imgsz=input_size,
            conf=min_confidence,
            device=self.device,
            verbose=False,
        )
        t1 = time.perf_counter()

        detections: list[Detection] = []
        boxes = results[0].boxes if results else None
        if boxes is not None:
            frame_h, frame_w = frame.shape[0], frame.shape[1]
            xyxy = boxes.xyxy.cpu().numpy()
            confs = boxes.conf.cpu().numpy()
            clss = boxes.cls.cpu().numpy().astype(int)
            for box, conf, cls_id in zip(xyxy, confs, clss):
                x1, y1, x2, y2 = [int(round(v)) for v in box]
                x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame_w, frame_h)
                if x2 <= x1 or y2 <= y1:
                    # Box has no area left after clipping to the frame.
                    continue
                detections.append(Detection(
                    class_id=int(cls_id),
                    class_name=self._class_name(cls_id),
                    confidence=float(conf),
                    bbox=(x1, y1, x2, y2),
                ))

        total_ms = (t1 - t0) * 1000.0
        max_confidence = max((det.confidence for det in detections), default=0.0)
        info: dict[str, float | tuple[int, ...] | np.ndarray] = {
            "shape": (len(detections), 6),
            "blob_ms": 0.0,
            "forward_ms": total_ms,
            "parse_ms": 0.0,
            "total_ms": total_ms,
            "raw_max": max_confidence,
            "class_max": max_confidence,
            "raw_predictions": np.empty((0, 0), dtype=np.float32),
        }
        return detections, info
def best_label_detections(
detections: list[Detection],
label_name: str,
@@ -1160,6 +1306,26 @@ def draw_ocr_debug(ocr_frame: np.ndarray, text: str, preview_width: int) -> np.n
return canvas
def draw_rejected_roi_debug(roi_frame: np.ndarray, text: str, preview_width: int) -> np.ndarray:
    """Compose the debug image for a rejected label ROI.

    The ROI is copied, passed through ``resize_preview`` and pasted into the
    top-left corner of a canvas filled with 255. The canvas is 55 pixels
    taller than the preview and at least 560 pixels wide; ``text`` is drawn
    in the strip below the image.

    Args:
        roi_frame: Rejected ROI image; it is not modified.
        text: Caption drawn under the image.
        preview_width: Width argument forwarded to ``resize_preview``.

    Returns:
        The composed 3-channel ``uint8`` canvas.
    """
    preview = resize_preview(roi_frame.copy(), preview_width)
    preview_h, preview_w = preview.shape[0], preview.shape[1]

    canvas = np.full(
        (preview_h + 55, max(preview_w, 560), 3),
        255,
        dtype=np.uint8,
    )
    canvas[:preview_h, :preview_w] = preview

    caption_origin = (10, preview_h + 35)
    cv2.putText(
        canvas,
        text,
        caption_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        0.85,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )
    return canvas
def resize_for_quality(frame: np.ndarray, target_width: int) -> np.ndarray:
if target_width <= 0 or frame.shape[1] <= target_width:
return frame
@@ -1197,12 +1363,16 @@ def format_stats(
avg_display = stats.display_total_ms / max(1, stats.display_frames)
avg_quality = stats.quality_total_ms / max(1, stats.quality_cycles)
avg_sharpness = stats.quality_score_total / max(1, stats.quality_cycles)
avg_roi_quality = stats.ocr_roi_quality_total_ms / max(1, stats.ocr_roi_quality_cycles)
avg_roi_sharpness = stats.ocr_roi_sharpness_total / max(1, stats.ocr_roi_quality_cycles)
return [
f"cap_fps={stats.capture_frames / elapsed:.1f}",
f"disp_fps={stats.display_frames / elapsed:.1f}",
f"quality yolo_in/reject={stats.yolo_submitted_frames}/{stats.quality_rejected_frames} "
f"last={stats.last_quality_score:.1f} avg={avg_sharpness:.1f} ms={avg_quality:.2f}",
f"roi_quality ocr_in/reject={stats.ocr_roi_submitted}/{stats.ocr_roi_rejected} "
f"last={stats.last_ocr_roi_sharpness:.1f} avg={avg_roi_sharpness:.1f} ms={avg_roi_quality:.2f}",
f"yolo_fps={stats.yolo_cycles / elapsed:.1f} avg={avg_yolo:.1f}ms fwd={avg_forward:.1f}ms",
f"ocr_fps={stats.ocr_cycles / elapsed:.1f} avg={avg_ocr:.1f}ms",
f"display_avg={avg_display:.1f}ms",
@@ -1218,7 +1388,7 @@ def yolo_worker(
frame_buffer: LatestBuffer,
roi_buffer: LatestBuffer,
shared: SharedState,
net,
detector,
classes: list[str],
args,
roi_id_gen: IdGenerator,
@@ -1234,16 +1404,23 @@ def yolo_worker(
continue
local_frame = packet.frame.copy()
detections, info = detect_yolov2(
net=net,
frame=local_frame,
classes=classes,
min_confidence=args.min_confidence,
nms_threshold=args.nms_threshold,
input_size=args.input_size,
use_nms=args.use_nms,
swap_rb=args.swap_rb,
)
if args.detector == "ultralytics":
detections, info = detector.detect(
frame=local_frame,
min_confidence=args.min_confidence,
input_size=args.input_size,
)
else:
detections, info = detect_yolov2(
net=detector,
frame=local_frame,
classes=classes,
min_confidence=args.min_confidence,
nms_threshold=args.nms_threshold,
input_size=args.input_size,
use_nms=args.use_nms,
swap_rb=args.swap_rb,
)
result = DetectionResult(
frame_id=packet.frame_id,
@@ -1340,6 +1517,39 @@ def yolo_worker(
continue
roi_copy = roi.copy()
roi_quality_ms = 0.0
roi_sharpness = 0.0
roi_quality_passed = True
if args.ocr_roi_quality_filter:
t_roi_quality0 = time.perf_counter()
roi_sharpness = estimate_sharpness(
roi_copy,
args.blur_metric,
args.ocr_roi_blur_resize_width,
)
roi_quality_ms = (time.perf_counter() - t_roi_quality0) * 1000.0
roi_quality_passed = roi_sharpness >= args.min_ocr_roi_sharpness
shared.add_ocr_roi_quality_result(
score=roi_sharpness,
passed=roi_quality_passed,
elapsed_ms=roi_quality_ms,
)
if not roi_quality_passed:
shared.set_debug_rejected_ocr_roi_frame(
roi_copy,
(
f"ROI SCARTATA sharp={roi_sharpness:.1f} "
f"< {args.min_ocr_roi_sharpness:.1f}"
),
)
continue
else:
shared.add_ocr_roi_quality_result(
score=0.0,
passed=True,
elapsed_ms=0.0,
)
roi_buffer.put(RoiPacket(
roi_id=roi_id_gen.next(),
source_frame_id=packet.frame_id,
@@ -1424,18 +1634,26 @@ def main() -> int:
if args.easyocr_worker:
return run_easyocr_worker(args)
require_file(args.weights, "File pesi YOLOv2")
require_file(args.config, "File config YOLOv2")
require_file(args.labels, "File labels")
classes = load_classes(args.labels)
require_file(args.weights, "File pesi detector")
if args.detector == "yolov2":
require_file(args.config, "File config YOLOv2")
require_file(args.labels, "File labels")
classes = load_classes(args.labels)
else:
classes = []
cv2.setNumThreads(args.opencv_threads)
log(f"OpenCV version: {cv2.__version__}")
log(f"Classi: {classes}")
log(f"Backend richiesto: {args.backend}")
log(f"Detector richiesto: {args.detector}")
log(f"Backend richiesto: {args.backend if args.detector == 'yolov2' else args.ultralytics_device}")
net = cv2.dnn.readNetFromDarknet(args.config, args.weights)
configure_net_backend(net, args.backend)
if args.detector == "ultralytics":
detector = UltralyticsDetector(args.weights, args.ultralytics_device)
classes = detector.classes
else:
detector = cv2.dnn.readNetFromDarknet(args.config, args.weights)
configure_net_backend(detector, args.backend)
log(f"Classi: {classes}")
cap, source_name = open_capture(args.video)
if not cap.isOpened():
@@ -1451,8 +1669,8 @@ def main() -> int:
yolo_thread = threading.Thread(
target=yolo_worker,
name="yolo-worker",
args=(stop_event, frame_buffer, roi_buffer, shared, net, classes, args, roi_id_gen),
name="detector-worker",
args=(stop_event, frame_buffer, roi_buffer, shared, detector, classes, args, roi_id_gen),
daemon=True,
)
yolo_thread.start()
@@ -1476,6 +1694,8 @@ def main() -> int:
cv2.namedWindow("flywms ocr", cv2.WINDOW_NORMAL)
if args.quality_filter and args.debug_rejected_window:
cv2.namedWindow("flywms scartati", cv2.WINDOW_NORMAL)
if args.ocr_roi_quality_filter and args.debug_rejected_ocr_roi_window:
cv2.namedWindow("flywms etichette scartate", cv2.WINDOW_NORMAL)
frame_id = 0
last_stats_log = time.perf_counter()
@@ -1552,27 +1772,46 @@ def main() -> int:
)
latest_detection = shared.get_latest_detection()
if latest_detection is not None:
display_detections = list(latest_detection.detections)
if args.infer_gaylord_from_label:
inferred_gaylords = infer_gaylords_from_labels(
latest_detection.detections,
latest_detection.source_width,
latest_detection.source_height,
args.label_class,
args.inferred_gaylord_width_factor,
args.inferred_gaylord_height_factor,
args.inferred_gaylord_y_shift,
detection_lag = frame_id - latest_detection.frame_id
if 0 <= detection_lag <= args.capture_overlay_max_lag:
display_detections = list(latest_detection.detections)
if args.infer_gaylord_from_label:
inferred_gaylords = infer_gaylords_from_labels(
latest_detection.detections,
latest_detection.source_width,
latest_detection.source_height,
args.label_class,
args.inferred_gaylord_width_factor,
args.inferred_gaylord_height_factor,
args.inferred_gaylord_y_shift,
)
display_detections.extend(inferred_gaylords)
for det in display_detections:
draw_detection(display, det, args.label_class)
else:
cv2.putText(
display,
f"det lag={detection_lag} frame",
(20, 78),
cv2.FONT_HERSHEY_SIMPLEX,
0.9,
(0, 0, 255),
2,
cv2.LINE_AA,
)
display_detections.extend(inferred_gaylords)
for det in display_detections:
draw_detection(display, det, args.label_class)
draw_ocr_results(display, shared.get_recent_ocr_results())
draw_status(display, format_stats(shared, frame_buffer, roi_buffer, start_time)[:5])
display = resize_preview(display, args.preview_width)
cv2.imshow("flywms capture", display)
debug_yolo, debug_ocr, debug_ocr_text = shared.get_debug_frames()
(
debug_yolo,
debug_ocr,
debug_ocr_text,
debug_rejected_ocr,
debug_rejected_ocr_text,
) = shared.get_debug_frames()
if args.debug_yolo_window and debug_yolo is not None:
cv2.imshow("flywms yolo", debug_yolo)
if (args.debug_ocr_window or not args.no_ocr) and debug_ocr is not None:
@@ -1580,6 +1819,19 @@ def main() -> int:
"flywms ocr",
draw_ocr_debug(debug_ocr, debug_ocr_text, args.preview_width),
)
if (
args.ocr_roi_quality_filter
and args.debug_rejected_ocr_roi_window
and debug_rejected_ocr is not None
):
cv2.imshow(
"flywms etichette scartate",
draw_rejected_roi_debug(
debug_rejected_ocr,
debug_rejected_ocr_text,
args.preview_width,
),
)
if (
args.quality_filter
and args.debug_rejected_window