From 8a8bea121168fa3b6657179164c5b76656785a6c Mon Sep 17 00:00:00 2001 From: administrator Date: Fri, 15 May 2026 18:40:07 +0200 Subject: [PATCH] Milestone navigation simulator with config --- .gitignore | 1 + flywms_navigation.ini | 166 +++++++ flywms_navigation.py | 991 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1158 insertions(+) create mode 100644 flywms_navigation.ini create mode 100644 flywms_navigation.py diff --git a/.gitignore b/.gitignore index cbbda1f..f117592 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ dataset_yolo/labels/*.cache dataset_yolo/labels/*_backup_before_remap_*/ runs/flywms_dataset_check/ runs/flywms_dataset_check_1epoch/ +navigate_snapshots*/ diff --git a/flywms_navigation.ini b/flywms_navigation.ini new file mode 100644 index 0000000..4a8d94f --- /dev/null +++ b/flywms_navigation.ini @@ -0,0 +1,166 @@ +[navigation] +; OBBLIGATORIO: no. +; Ruolo: sorgente video usata per simulare la camera del drone. +; Se vuoto o "none", usa webcam 0. +; Default se non indicato: testhd.mp4 +video = testhd.mp4 + +; OBBLIGATORIO: si. +; Ruolo: modello Ultralytics/YOLO moderno usato per rilevare gaylord ed etichette. +; Default se non indicato: C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt +weights = C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt + +; OBBLIGATORIO: no. +; Ruolo: device usato da Ultralytics. Usa "cpu" ora; con GPU compatibile usare "0". +; Default se non indicato: cpu +ultralytics_device = cpu + +; OBBLIGATORIO: no. +; Ruolo: dimensione input YOLO. 640 e' il valore usato nel training rapido. +; Default se non indicato: 640 +input_size = 640 + +; OBBLIGATORIO: no. +; Ruolo: confidenza minima delle detection accettate dal detector. +; Default se non indicato: 0.25 +min_confidence = 0.25 + +; OBBLIGATORIO: no. +; Ruolo: classe tracciata dalla navigazione. Le altre detection non entrano nel tracker. +; Default se non indicato: gaylord +target_class = gaylord + +; OBBLIGATORIO: no. 
+; Ruolo: numero massimo di frame in cui una track puo' non essere vista prima di essere rimossa. +; Default se non indicato: 8 +max_track_missed = 8 + +; OBBLIGATORIO: no. +; Ruolo: soglia minima dello score che associa una detection a una track esistente. +; Default se non indicato: 0.25 +min_match_score = 0.25 + +; OBBLIGATORIO: no. +; Ruolo: distanza massima ammessa tra centri bbox, espressa come frazione della larghezza frame. +; Default se non indicato: 0.18 +max_center_distance_ratio = 0.18 + +; OBBLIGATORIO: no. +; Ruolo: mezza ampiezza della fascia azzurra di avvicinamento al centro. +; Non fa scattare la foto: indica solo che la track e' candidata. +; Default se non indicato: 0.18 +center_tolerance_ratio = 0.18 + +; OBBLIGATORIO: no. +; Ruolo: tolleranza stretta dalla linea verticale centrale per scattare la foto. +; La foto parte quando il centro bbox e' entro questa soglia. +; Default se non indicato: 0.035 +snapshot_line_tolerance_ratio = 0.035 + +; OBBLIGATORIO: no. +; Ruolo: limite verticale superiore della fascia utile della scaffalatura. +; Default se non indicato: 0.15 +usable_y_min_ratio = 0.15 + +; OBBLIGATORIO: no. +; Ruolo: limite verticale inferiore della fascia utile della scaffalatura. +; Default se non indicato: 0.85 +usable_y_max_ratio = 0.85 + +; OBBLIGATORIO: no. +; Ruolo: numero minimo di detection confermate prima di considerare affidabile una track. +; Default se non indicato: 3 +min_track_hits = 3 + +; OBBLIGATORIO: no. +; Ruolo: area minima del bbox gaylord rispetto all'intero frame. +; Serve a ignorare oggetti troppo lontani/piccoli. +; Default se non indicato: 0.02 +min_gaylord_area_ratio = 0.02 + +; OBBLIGATORIO: no. +; Ruolo: margine da bordo immagine per considerare un bbox tagliato. +; 0 disabilita questo filtro, utile con il video manuale di test. +; Default se non indicato: 0.0 +edge_margin_ratio = 0.0 + +; OBBLIGATORIO: no. +; Ruolo: padding aggiunto al bbox centrale prima di salvare il crop inviato all'OCR remoto. 
+; Default se non indicato: 0.03 +ocr_payload_pad_ratio = 0.03 + +; OBBLIGATORIO: no. +; Ruolo: trend minimo dell'area bbox negli ultimi frame. Valori negativi tollerano leggera uscita. +; Default se non indicato: -0.35 +min_area_trend = -0.35 + +; OBBLIGATORIO: no. +; Ruolo: numero di candidati da valutare prima dello snapshot. +; 1 significa: scatta subito quando il centro tocca la linea. +; Default se non indicato: 1 +snapshot_window_frames = 1 + +; OBBLIGATORIO: no. +; Ruolo: directory dove salvare frame debug, crop OCR e snapshots.jsonl. +; Default se non indicato: navigate_snapshots +snapshot_output_dir = navigate_snapshots + +; OBBLIGATORIO: no. +; Ruolo: tempo simulato con cui il drone attende OCR remoto + verifica WMS. +; Default se non indicato: 2.0 +remote_ack_timeout_sec = 2.0 + +; OBBLIGATORIO: no. +; Ruolo: risposta remota simulata. Valori: always-ack, always-nack, alternate. +; Default se non indicato: always-ack +remote_ack_mode = always-ack + +; OBBLIGATORIO: no. +; Ruolo: direzione simulata di ripartenza dopo ACK. Valori: destra, sinistra. +; Default se non indicato: destra +scan_direction = destra + +; OBBLIGATORIO: no. +; Ruolo: larghezza massima delle finestre video di debug. +; Default se non indicato: 1280 +preview_width = 1280 + +; OBBLIGATORIO: no. +; Ruolo: se true, il video di test viene riprodotto rispettando il framerate originale. +; Default se non indicato: true +realtime_playback = true + +; OBBLIGATORIO: no. +; Ruolo: massimo numero di frame da processare. 0 significa tutto il video. +; Default se non indicato: 0 +max_frames = 0 + +; OBBLIGATORIO: no. +; Ruolo: ogni quanti secondi stampare statistiche nel terminale. +; Default se non indicato: 2.0 +stats_interval = 2.0 + +; OBBLIGATORIO: no. +; Ruolo: ogni quanti frame aggiornare il moto apparente stimato dalle track. +; Default se non indicato: 5 +motion_report_interval = 5 + +; OBBLIGATORIO: no. +; Ruolo: movimento medio minimo in pixel per dichiarare destra/sinistra/su/giu. 
+; Default se non indicato: 1.5 +motion_min_pixels = 1.5 + +; OBBLIGATORIO: no. +; Ruolo: se true, logga nel terminale lo stato delle track e i motivi di non scatto. +; Default se non indicato: true +debug_tracks = true + +; OBBLIGATORIO: no. +; Ruolo: intensita' del flash visuale simulato al momento dello scatto, da 0 a 1. +; Default se non indicato: 0.70 +flash_alpha = 0.70 + +; OBBLIGATORIO: no. +; Ruolo: se true, disabilita tutte le finestre video. Usarlo solo per test headless. +; Default se non indicato: false +no_display = false diff --git a/flywms_navigation.py b/flywms_navigation.py new file mode 100644 index 0000000..5d5f238 --- /dev/null +++ b/flywms_navigation.py @@ -0,0 +1,991 @@ +import argparse +import configparser +import json +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path + +import cv2 +import numpy as np + + +DEFAULT_CONFIG_PATH = "flywms_navigation.ini" + + +@dataclass(frozen=True) +class Detection: + class_id: int + class_name: str + confidence: float + bbox: tuple[int, int, int, int] + + +@dataclass(frozen=True) +class CandidateSnapshot: + frame_id: int + timestamp: float + frame: np.ndarray + bbox: tuple[int, int, int, int] + score: float + center_score: float + size_score: float + cut_score: float + + +@dataclass +class Track: + id: int + bbox: tuple[int, int, int, int] + confidence: float + first_seen_frame: int + last_seen_frame: int + hits: int = 1 + missed: int = 0 + state: str = "entering" + last_candidate_reason: str = "" + pending_remote_response: str = "none" + already_snapshotted: bool = False + bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list) + center_history: list[tuple[float, float]] = field(default_factory=list) + area_history: list[float] = field(default_factory=list) + candidates: list[CandidateSnapshot] = field(default_factory=list) + + def __post_init__(self) -> None: + self._append_history(self.bbox) + + def update(self, bbox: tuple[int, int, int, int], 
confidence: float, frame_id: int) -> None: + self.bbox = bbox + self.confidence = confidence + self.last_seen_frame = frame_id + self.hits += 1 + self.missed = 0 + self._append_history(bbox) + + def mark_missed(self) -> None: + self.missed += 1 + if self.missed > 0 and self.state != "snapshotted": + self.state = "exiting" + + def _append_history(self, bbox: tuple[int, int, int, int]) -> None: + self.bbox_history.append(bbox) + self.center_history.append(bbox_center(bbox)) + self.area_history.append(float(bbox_area(bbox))) + keep = 20 + self.bbox_history = self.bbox_history[-keep:] + self.center_history = self.center_history[-keep:] + self.area_history = self.area_history[-keep:] + + def area_trend(self) -> float: + if len(self.area_history) < 4: + return 0.0 + old = self.area_history[-4] + new = self.area_history[-1] + return (new - old) / max(old, 1.0) + + +@dataclass(frozen=True) +class NavigationSnapshot: + snapshot_id: int + frame_id: int + timestamp: float + simulated_position: str + track_id: int + bbox: tuple[int, int, int, int] + score: float + debug_frame_path: str + ocr_payload_path: str + + +class UltralyticsDetector: + def __init__(self, model_path: str, device: str): + from ultralytics import YOLO + + self.model = YOLO(model_path) + self.device = device + names = self.model.names + if isinstance(names, dict): + self.classes = [str(names[i]) for i in sorted(names)] + else: + self.classes = [str(name) for name in names] + + def detect( + self, + frame: np.ndarray, + min_confidence: float, + input_size: int, + ) -> tuple[list[Detection], float]: + t0 = time.perf_counter() + results = self.model.predict( + source=frame, + imgsz=input_size, + conf=min_confidence, + device=self.device, + verbose=False, + ) + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + + detections: list[Detection] = [] + if not results: + return detections, elapsed_ms + + boxes = results[0].boxes + if boxes is None: + return detections, elapsed_ms + + xyxy = boxes.xyxy.cpu().numpy() + 
confs = boxes.conf.cpu().numpy() + clss = boxes.cls.cpu().numpy().astype(int) + for box, conf, cls_id in zip(xyxy, confs, clss): + x1, y1, x2, y2 = [int(round(v)) for v in box] + x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame.shape[1], frame.shape[0]) + if x2 <= x1 or y2 <= y1: + continue + class_name = self.classes[cls_id] if 0 <= cls_id < len(self.classes) else str(cls_id) + detections.append(Detection( + class_id=int(cls_id), + class_name=class_name, + confidence=float(conf), + bbox=(x1, y1, x2, y2), + )) + return detections, elapsed_ms + + +class LightweightTracker: + """Greedy bbox tracker: enough to explain and test navigation decisions.""" + + def __init__( + self, + max_missed: int, + min_match_score: float, + max_center_distance_ratio: float, + ): + self.max_missed = max_missed + self.min_match_score = min_match_score + self.max_center_distance_ratio = max_center_distance_ratio + self._next_id = 1 + self.tracks: dict[int, Track] = {} + + def update( + self, + detections: list[Detection], + frame_id: int, + frame_width: int, + ) -> list[Track]: + unmatched_tracks = set(self.tracks.keys()) + unmatched_detections = set(range(len(detections))) + pairs: list[tuple[float, int, int]] = [] + + max_center_distance = max(1.0, frame_width * self.max_center_distance_ratio) + for track_id, track in self.tracks.items(): + for det_idx, det in enumerate(detections): + score = association_score(track.bbox, det.bbox, max_center_distance) + if score >= self.min_match_score: + pairs.append((score, track_id, det_idx)) + + pairs.sort(reverse=True, key=lambda item: item[0]) + for _, track_id, det_idx in pairs: + if track_id not in unmatched_tracks or det_idx not in unmatched_detections: + continue + det = detections[det_idx] + self.tracks[track_id].update(det.bbox, det.confidence, frame_id) + unmatched_tracks.remove(track_id) + unmatched_detections.remove(det_idx) + + for track_id in list(unmatched_tracks): + self.tracks[track_id].mark_missed() + if self.tracks[track_id].missed 
> self.max_missed: + del self.tracks[track_id] + + for det_idx in unmatched_detections: + det = detections[det_idx] + track_id = self._next_id + self._next_id += 1 + self.tracks[track_id] = Track( + id=track_id, + bbox=det.bbox, + confidence=det.confidence, + first_seen_frame=frame_id, + last_seen_frame=frame_id, + ) + + return list(self.tracks.values()) + + +class NavigationController: + def __init__(self, args): + self.args = args + self.output_dir = Path(args.snapshot_output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.metadata_path = self.output_dir / "snapshots.jsonl" + self.snapshot_counter = 0 + self.position_counter = 0 + self.last_command_text = "" + self.last_command_lines: list[str] = [] + self.last_snapshot_frame: np.ndarray | None = None + self.last_ocr_payload_frame: np.ndarray | None = None + self.last_remote_result_text = "" + self.motion_text = "MOTO: n/d" + + def process_track( + self, + track: Track, + frame: np.ndarray, + frame_id: int, + timestamp: float, + ) -> NavigationSnapshot | None: + frame_h, frame_w = frame.shape[:2] + eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h) + self._update_track_state(track, eligible, frame_w) + + if track.already_snapshotted: + return None + + if eligible: + candidate = CandidateSnapshot( + frame_id=frame_id, + timestamp=timestamp, + frame=frame.copy(), + bbox=track.bbox, + score=score_parts["score"], + center_score=score_parts["center_score"], + size_score=score_parts["size_score"], + cut_score=score_parts["cut_score"], + ) + track.candidates.append(candidate) + track.candidates = track.candidates[-self.args.snapshot_window_frames:] + + if len(track.candidates) >= self.args.snapshot_window_frames: + return self._finalize_snapshot(track) + elif track.candidates: + return self._finalize_snapshot(track) + + return None + + def _is_snapshot_candidate( + self, + track: Track, + frame_w: int, + frame_h: int, + ) -> tuple[bool, dict[str, float]]: + x1, y1, x2, y2 = 
track.bbox + cx, cy = bbox_center(track.bbox) + center_x = frame_w * 0.5 + center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio) + snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio) + center_delta = abs(cx - center_x) + center_score = max(0.0, 1.0 - center_delta / center_tolerance) + + area_ratio = bbox_area(track.bbox) / float(frame_w * frame_h) + size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001)) + + if self.args.edge_margin_ratio <= 0: + cut = False + else: + edge_margin_x = frame_w * self.args.edge_margin_ratio + edge_margin_y = frame_h * self.args.edge_margin_ratio + cut = ( + x1 <= edge_margin_x + or y1 <= edge_margin_y + or x2 >= frame_w - edge_margin_x + or y2 >= frame_h - edge_margin_y + ) + cut_score = 0.0 if cut else 1.0 + score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score + + in_center_band = center_delta <= center_tolerance + on_snapshot_line = center_delta <= snapshot_tolerance + in_y_band = ( + frame_h * self.args.usable_y_min_ratio + <= cy + <= frame_h * self.args.usable_y_max_ratio + ) + enough_hits = track.hits >= self.args.min_track_hits + large_enough = area_ratio >= self.args.min_gaylord_area_ratio + trend_ok = track.area_trend() >= self.args.min_area_trend + eligible = ( + enough_hits + and on_snapshot_line + and in_y_band + and large_enough + and not cut + and trend_ok + and track.missed == 0 + ) + failed: list[str] = [] + if not enough_hits: + failed.append(f"hits<{self.args.min_track_hits}") + if not in_center_band: + failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}") + elif not on_snapshot_line: + failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}") + if not in_y_band: + failed.append("y_band") + if not large_enough: + failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}") + if cut: + failed.append("edge_cut") + if not trend_ok: + 
failed.append(f"trend={track.area_trend():+.2f}<{self.args.min_area_trend:+.2f}") + if track.missed != 0: + failed.append(f"missed={track.missed}") + track.last_candidate_reason = "ok" if eligible else ",".join(failed) + return eligible, { + "score": score, + "center_score": center_score, + "size_score": size_score, + "cut_score": cut_score, + } + + def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None: + if track.already_snapshotted: + track.state = "snapshotted" + return + if track.missed > 0: + track.state = "exiting" + return + cx, _ = bbox_center(track.bbox) + center_delta = abs(cx - frame_w * 0.5) + snapshot_tolerance = frame_w * self.args.snapshot_line_tolerance_ratio + if eligible: + track.state = "centered" + elif track.hits < self.args.min_track_hits: + track.state = "entering" + elif center_delta <= snapshot_tolerance: + track.state = "centered" + elif center_delta <= frame_w * self.args.center_tolerance_ratio: + track.state = "candidate" + else: + track.state = "entering" + + def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None: + if not track.candidates: + return None + best = max(track.candidates, key=lambda item: item.score) + track.candidates.clear() + track.already_snapshotted = True + track.state = "snapshotted" + + self.snapshot_counter += 1 + self.position_counter += 1 + simulated_position = f"gaylord {self.position_counter}" + debug_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_frame.jpg" + payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_ocr_payload.jpg" + debug_path = self.output_dir / debug_name + payload_path = self.output_dir / payload_name + cv2.imwrite(str(debug_path), best.frame) + ocr_payload = crop_with_padding( + best.frame, + best.bbox, + self.args.ocr_payload_pad_ratio, + ) + cv2.imwrite(str(payload_path), ocr_payload) + self.last_snapshot_frame = best.frame.copy() + self.last_ocr_payload_frame = ocr_payload.copy() + + snapshot = 
NavigationSnapshot( + snapshot_id=self.snapshot_counter, + frame_id=best.frame_id, + timestamp=best.timestamp, + simulated_position=simulated_position, + track_id=track.id, + bbox=best.bbox, + score=best.score, + debug_frame_path=str(debug_path), + ocr_payload_path=str(payload_path), + ) + self._write_metadata(snapshot) + self._print_commands(snapshot) + return snapshot + + def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str: + mode = self.args.remote_ack_mode + if mode == "always-ack": + result = "ACK" + elif mode == "always-nack": + result = "NACK" + else: + result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK" + + if result == "ACK": + self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS" + resume_command = f"RIPARTI_{self.args.scan_direction.upper()}" + self.last_command_lines.extend([ + self.last_remote_result_text, + resume_command, + ]) + log("[REMOTE] ACK_RICEVUTO codice valido su WMS") + log(f"[CMD] {resume_command}") + else: + self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto" + self.last_command_lines.extend([ + self.last_remote_result_text, + "MICRO_MOVE_CORRETTIVO", + "SCATTA_FOTO_RETRY", + ]) + log("[REMOTE] NACK_RICEVUTO codice assente/non valido") + log("[CMD] MICRO_MOVE_CORRETTIVO") + log("[CMD] SCATTA_FOTO_RETRY") + return result + + def set_motion_text(self, text: str) -> None: + self.motion_text = text + + def _write_metadata(self, snapshot: NavigationSnapshot) -> None: + record = { + "snapshot_id": snapshot.snapshot_id, + "frame_id": snapshot.frame_id, + "timestamp": snapshot.timestamp, + "simulated_position": snapshot.simulated_position, + "drone_pose_simulated": { + "mode": "linear_shelf_scan", + "position_label": snapshot.simulated_position, + }, + "track_id": snapshot.track_id, + "gaylord_bbox": list(snapshot.bbox), + "score": snapshot.score, + "debug_frame_path": snapshot.debug_frame_path, + "ocr_payload_path": snapshot.ocr_payload_path, + } + with self.metadata_path.open("at", 
encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=True) + "\n") + + def _print_commands(self, snapshot: NavigationSnapshot) -> None: + self.last_command_text = ( + f"SNAPSHOT {snapshot.snapshot_id:04d} " + f"track={snapshot.track_id} frame={snapshot.frame_id} " + f"pos={snapshot.simulated_position} score={snapshot.score:.2f}" + ) + self.last_command_lines = [ + self.last_command_text, + "STOP", + f"SCATTA_FOTO {Path(snapshot.debug_frame_path).name}", + f"ESTRAI_BBOX_CENTRALE track={snapshot.track_id}", + f"ASSOCIA_POSIZIONE {snapshot.simulated_position}", + f"INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}", + f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s", + ] + log(f"[NAV] {self.last_command_text}") + log("[CMD] STOP") + log(f"[CMD] SCATTA_FOTO {Path(snapshot.debug_frame_path).name}") + log(f"[CMD] ESTRAI_BBOX_CENTRALE track={snapshot.track_id}") + log(f"[CMD] ASSOCIA_POSIZIONE {snapshot.simulated_position}") + log(f"[CMD] INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}") + log(f"[CMD] ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s") + + +def parse_args(): + pre = argparse.ArgumentParser(add_help=False) + pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI") + pre_args, _ = pre.parse_known_args() + defaults = load_navigation_config(pre_args.config) + + ap = argparse.ArgumentParser(parents=[pre]) + ap.add_argument("-v", "--video", default=defaults["video"], help="Percorso video. 
Se omesso usa webcam 0") + ap.add_argument( + "--weights", + default=defaults["weights"], + help="Modello Ultralytics .pt", + ) + ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"], help="Device Ultralytics: cpu oppure 0") + ap.add_argument("--input-size", type=int, default=defaults["input_size"], help="Dimensione input YOLO") + ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"], help="Confidenza minima") + ap.add_argument("--target-class", default=defaults["target_class"], help="Classe da tracciare") + + ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"], help="Frame persi prima di rimuovere una track") + ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"], help="Soglia associazione detection-track") + ap.add_argument("--max-center-distance-ratio", type=float, default=defaults["max_center_distance_ratio"], help="Distanza max centri per matching") + + ap.add_argument("--center-tolerance-ratio", type=float, default=defaults["center_tolerance_ratio"], help="Mezza ampiezza zona centrale") + ap.add_argument("--snapshot-line-tolerance-ratio", type=float, default=defaults["snapshot_line_tolerance_ratio"], + help="Tolleranza stretta dalla linea centrale per scattare") + ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"], help="Limite alto fascia utile Y") + ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"], help="Limite basso fascia utile Y") + ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"], help="Detection consecutive minime") + ap.add_argument("--min-gaylord-area-ratio", type=float, default=defaults["min_gaylord_area_ratio"], help="Area bbox minima sul frame") + ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"], help="Margine per considerare bbox tagliato") + 
ap.add_argument("--ocr-payload-pad-ratio", type=float, default=defaults["ocr_payload_pad_ratio"],
+                    help="Padding intorno al bbox centrale inviato all'OCR remoto")
+    ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"], help="Trend area minimo ammesso")
+    ap.add_argument("--snapshot-window-frames", type=int, default=defaults["snapshot_window_frames"], help="Candidati da valutare prima dello snapshot")
+    ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"], help="Directory snapshot e JSONL")
+    ap.add_argument("--remote-ack-timeout-sec", type=float, default=defaults["remote_ack_timeout_sec"],
+                    help="Tempo simulato di attesa OCR remoto/WMS")
+    ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
+                    default=defaults["remote_ack_mode"], help="Risposta remota simulata")
+    ap.add_argument("--scan-direction", choices=["destra", "sinistra"], default=defaults["scan_direction"],
+                    help="Direzione simulata di ripartenza dopo ACK")
+
+    ap.add_argument("--preview-width", type=int, default=defaults["preview_width"], help="Larghezza preview")
+    # The configured default of this flag is True, so a plain store_true action could never
+    # turn it off from the command line. BooleanOptionalAction keeps --realtime-playback
+    # working and adds the matching --no-realtime-playback switch.
+    ap.add_argument("--realtime-playback", action=argparse.BooleanOptionalAction, default=defaults["realtime_playback"], help="Rispetta FPS video")
+    ap.add_argument("--max-frames", type=int, default=defaults["max_frames"], help="Numero massimo frame; 0 = tutto")
+    ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"], help="Intervallo log prestazioni")
+    ap.add_argument("--motion-report-interval", type=int, default=defaults["motion_report_interval"],
+                    help="Ogni quanti frame aggiornare la direzione moto stimata")
+    ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
+                    help="Spostamento medio minimo per dichiarare una direzione")
+    # Same reason as --realtime-playback: the default is True, so --no-debug-tracks must exist.
+    ap.add_argument("--debug-tracks", action=argparse.BooleanOptionalAction, default=defaults["debug_tracks"], help="Logga stato e criteri delle track")
+    ap.add_argument("--flash-alpha",
type=float, default=defaults["flash_alpha"], help="Intensita' flash 0..1 al momento dello scatto") + ap.add_argument("--no-display", action="store_true", default=defaults["no_display"], help="Disabilita finestra video") + return ap.parse_args() + + +def load_navigation_config(path_str: str) -> dict[str, object]: + defaults: dict[str, object] = { + "video": "testhd.mp4", + "weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt", + "ultralytics_device": "cpu", + "input_size": 640, + "min_confidence": 0.25, + "target_class": "gaylord", + "max_track_missed": 8, + "min_match_score": 0.25, + "max_center_distance_ratio": 0.18, + "center_tolerance_ratio": 0.18, + "snapshot_line_tolerance_ratio": 0.035, + "usable_y_min_ratio": 0.15, + "usable_y_max_ratio": 0.85, + "min_track_hits": 3, + "min_gaylord_area_ratio": 0.02, + "edge_margin_ratio": 0.0, + "ocr_payload_pad_ratio": 0.03, + "min_area_trend": -0.35, + "snapshot_window_frames": 1, + "snapshot_output_dir": "navigate_snapshots", + "remote_ack_timeout_sec": 2.0, + "remote_ack_mode": "always-ack", + "scan_direction": "destra", + "preview_width": 1280, + "realtime_playback": True, + "max_frames": 0, + "stats_interval": 2.0, + "motion_report_interval": 5, + "motion_min_pixels": 1.5, + "debug_tracks": True, + "flash_alpha": 0.70, + "no_display": False, + } + + path = Path(path_str) + if not path.exists(): + return defaults + + parser = configparser.ConfigParser() + parser.read(path, encoding="utf-8") + section = parser["navigation"] if parser.has_section("navigation") else {} + + types = {key: type(value) for key, value in defaults.items()} + for key, default_value in defaults.items(): + if key not in section: + continue + if types[key] is bool: + defaults[key] = parser.getboolean("navigation", key, fallback=bool(default_value)) + elif types[key] is int: + defaults[key] = parser.getint("navigation", key, fallback=int(default_value)) + elif types[key] is float: + defaults[key] = parser.getfloat("navigation", 
key, fallback=float(default_value))
+        else:
+            value = section.get(key, str(default_value)).strip()
+            if value.lower() not in ("", "none", "null"):
+                defaults[key] = value
+            elif key == "video":
+                # Only "video" accepts an empty/"none" value: open_capture() maps None to camera 0.
+                defaults[key] = None
+            # Any other blank string option keeps its built-in default. Turning it into None
+            # would crash later in Path(...) or in str methods such as .strip() / .upper().
+    return defaults
+
+
+def log(msg: str) -> None:
+    """Print ``msg`` prefixed with the current HH:MM:SS time, flushing stdout."""
+    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)
+
+
+def require_file(path_str: str, description: str) -> Path:
+    """Return ``path_str`` as a Path, or log an error and exit with status 1 if it does not exist."""
+    path = Path(path_str)
+    if not path.exists():
+        log(f"ERRORE: {description} non trovato: {path}")
+        sys.exit(1)
+    return path
+
+
+def open_capture(video_arg: str | None):
+    """Open the video source and return ``(capture, label)``.
+
+    ``None`` selects camera 0 and an all-digit string selects that camera index; any
+    other value is handed to ``cv2.VideoCapture`` unchanged. Cameras are first opened
+    with the ``cv2.CAP_DSHOW`` backend and retried with the default backend when that
+    capture does not open. The caller is expected to check ``capture.isOpened()``.
+    """
+    if video_arg is None:
+        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
+        if not cap.isOpened():
+            cap = cv2.VideoCapture(0)
+        return cap, "camera:0"
+
+    if str(video_arg).isdigit():
+        idx = int(video_arg)
+        cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
+        if not cap.isOpened():
+            cap = cv2.VideoCapture(idx)
+        return cap, f"camera:{idx}"
+
+    return cv2.VideoCapture(video_arg), str(video_arg)
+
+
+def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
+    """Clamp x coordinates to ``[0, w - 1]`` and y coordinates to ``[0, h - 1]``."""
+    x1 = max(0, min(x1, w - 1))
+    y1 = max(0, min(y1, h - 1))
+    x2 = max(0, min(x2, w - 1))
+    y2 = max(0, min(y2, h - 1))
+    return x1, y1, x2, y2
+
+
+def crop_with_padding(
+    frame: np.ndarray,
+    bbox: tuple[int, int, int, int],
+    pad_ratio: float,
+) -> np.ndarray:
+    """Return a copy of ``frame`` cropped to ``bbox`` grown by ``pad_ratio`` on every side.
+
+    The padding is ``pad_ratio`` times the bbox width (horizontally) and height
+    (vertically); negative ratios are treated as 0. The padded box is limited to the
+    frame bounds before slicing.
+    """
+    x1, y1, x2, y2 = bbox
+    frame_h, frame_w = frame.shape[:2]
+    pad_x = int(max(0.0, pad_ratio) * (x2 - x1))
+    pad_y = int(max(0.0, pad_ratio) * (y2 - y1))
+    # Slice ends are exclusive, so they are clamped to the frame size itself. Clamping them
+    # to size - 1 (as clip_box does) always dropped the last row and column from the crop.
+    cx1 = min(max(x1 - pad_x, 0), frame_w)
+    cy1 = min(max(y1 - pad_y, 0), frame_h)
+    cx2 = min(max(x2 + pad_x, 0), frame_w)
+    cy2 = min(max(y2 + pad_y, 0), frame_h)
+    return frame[cy1:cy2, cx1:cx2].copy()
+
+
+def bbox_area(bbox: tuple[int, int, int, int]) -> int:
+    """Return the area of an ``(x1, y1, x2, y2)`` box, or 0 when it is inverted or empty."""
+    x1, y1, x2, y2 = bbox
+    return max(0, x2 - x1) * max(0, y2 - y1)
+
+
+def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
+    """Return the ``(cx, cy)`` midpoint of an ``(x1, y1, x2, y2)`` box."""
+    x1, y1, x2, y2 = bbox
+    return (x1 + x2) * 0.5, (y1 + y2) * 0.5
+
+
+def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
+    ax1, ay1, ax2, ay2 = a
+    bx1, by1, bx2, by2 = b
+    ix1 =
max(ax1, bx1) + iy1 = max(ay1, by1) + ix2 = min(ax2, bx2) + iy2 = min(ay2, by2) + inter = bbox_area((ix1, iy1, ix2, iy2)) + union = bbox_area(a) + bbox_area(b) - inter + if union <= 0: + return 0.0 + return inter / float(union) + + +def association_score( + track_bbox: tuple[int, int, int, int], + det_bbox: tuple[int, int, int, int], + max_center_distance: float, +) -> float: + iou = bbox_iou(track_bbox, det_bbox) + tx, ty = bbox_center(track_bbox) + dx, dy = bbox_center(det_bbox) + center_dist = float(np.hypot(tx - dx, ty - dy)) + center_similarity = max(0.0, 1.0 - center_dist / max_center_distance) + return 0.70 * iou + 0.30 * center_similarity + + +def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray: + h, w = frame.shape[:2] + if max_width <= 0 or w <= max_width: + return frame + scale = max_width / float(w) + return cv2.resize(frame, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LINEAR) + + +def draw_navigation_debug( + frame: np.ndarray, + tracks: list[Track], + args, + last_command_text: str, + fps_text: str, +) -> np.ndarray: + display = frame.copy() + h, w = display.shape[:2] + center_x = int(w * 0.5) + tol = int(w * args.center_tolerance_ratio) + y_min = int(h * args.usable_y_min_ratio) + y_max = int(h * args.usable_y_max_ratio) + + cv2.rectangle(display, (center_x - tol, y_min), (center_x + tol, y_max), (255, 255, 0), 4) + cv2.line(display, (center_x, 0), (center_x, h), (255, 255, 0), 3) + cv2.line(display, (0, y_min), (w, y_min), (100, 100, 100), 2) + cv2.line(display, (0, y_max), (w, y_max), (100, 100, 100), 2) + + for track in tracks: + x1, y1, x2, y2 = track.bbox + color = state_color(track.state) + thickness = 8 if track.state == "centered" else 5 + cv2.rectangle(display, (x1, y1), (x2, y2), color, thickness) + cx, cy = bbox_center(track.bbox) + cv2.circle(display, (int(cx), int(cy)), 12, color, -1) + cv2.circle(display, (int(cx), int(cy)), 18, (0, 0, 0), 3) + text = ( + f"id={track.id} {track.state} 
conf={track.confidence:.2f} "
+            f"hits={track.hits} trend={track.area_trend():+.2f}"
+        )
+        cv2.putText(
+            display,
+            text,
+            (x1, max(24, y1 - 8)),
+            cv2.FONT_HERSHEY_SIMPLEX,
+            0.78,
+            color,
+            3,
+            cv2.LINE_AA,
+        )
+
+    cv2.putText(display, fps_text, (20, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
+    if last_command_text:
+        cv2.putText(display, last_command_text, (20, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 255), 2)
+    return resize_preview(display, args.preview_width)
+
+
+def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
+    """Render the command log on a light-gray canvas.
+
+    Draws a fixed title, ``motion_text`` and at most the first ten entries of
+    ``command_lines`` (a single placeholder row when the list is empty). The first
+    row uses a different color from the following ones.
+
+    Returns a 980-pixel-wide, 3-channel ``uint8`` image that is at least 340 pixels high.
+    """
+    first_row_y = 122  # text baseline of the first command row
+    row_step = 36  # vertical distance between consecutive rows
+    lines = command_lines if command_lines else ["Nessun comando generato"]
+    shown = lines[:10]
+    # Size the canvas from the same geometry used to place the rows. The previous formula
+    # (84 + rows * 34) was shorter than the last baseline from eight rows on, e.g. nine
+    # rows end at y=410 on a 390 px canvas, so the final commands were drawn off-image.
+    canvas_h = max(340, first_row_y + len(shown) * row_step)
+    canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)
+    cv2.putText(
+        canvas,
+        "COMANDI NAVIGAZIONE",
+        (24, 42),
+        cv2.FONT_HERSHEY_SIMPLEX,
+        1.0,
+        (0, 0, 0),
+        2,
+        cv2.LINE_AA,
+    )
+    cv2.putText(
+        canvas,
+        motion_text,
+        (24, 76),
+        cv2.FONT_HERSHEY_SIMPLEX,
+        0.80,
+        (120, 0, 120),
+        2,
+        cv2.LINE_AA,
+    )
+    y = first_row_y
+    for idx, line in enumerate(shown):
+        color = (0, 0, 180) if idx == 0 else (0, 90, 0)
+        cv2.putText(
+            canvas,
+            line,
+            (24, y),
+            cv2.FONT_HERSHEY_SIMPLEX,
+            0.82,
+            color,
+            2,
+            cv2.LINE_AA,
+        )
+        y += row_step
+    return canvas
+
+
+def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
+    """Blend ``frame`` toward white by ``alpha``, which is clamped to ``[0, 1]``."""
+    flash = np.full_like(frame, 255)
+    alpha = min(max(alpha, 0.0), 1.0)
+    return cv2.addWeighted(frame, 1.0 - alpha, flash, alpha, 0.0)
+
+
+def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
+    deltas: list[tuple[float, float]] = []
+    for track in tracks:
+        if track.missed != 0 or len(track.center_history) < 2:
+            continue
+        x0, y0 = track.center_history[-2]
+        x1, y1 = track.center_history[-1]
+        deltas.append((x1 - x0, y1 - y0))
+
+    if not deltas:
+        return "MOTO: n/d"
+
+    dx = sum(delta[0] for delta in deltas) / len(deltas)
+    dy = sum(delta[1] for delta in deltas) / len(deltas)
+    abs_dx = abs(dx)
+    abs_dy = abs(dy)
+
+    if abs_dx <
min_pixels and abs_dy < min_pixels: + direction = "stabile" + elif abs_dx >= abs_dy: + direction = "destra" if dx > 0 else "sinistra" + else: + direction = "giu" if dy > 0 else "su" + + return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={len(deltas)}" + + +def state_color(state: str) -> tuple[int, int, int]: + if state == "centered": + return (0, 255, 255) + if state == "snapshotted": + return (255, 0, 255) + if state == "candidate": + return (0, 255, 0) + if state == "exiting": + return (0, 140, 255) + return (255, 255, 255) + + +def main() -> int: + args = parse_args() + require_file(args.weights, "modello Ultralytics") + + detector = UltralyticsDetector(args.weights, args.ultralytics_device) + log(f"Classi modello: {detector.classes}") + log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.") + + cap, source_name = open_capture(args.video) + if not cap.isOpened(): + log(f"ERRORE: impossibile aprire sorgente video: {source_name}") + return 1 + + video_fps = cap.get(cv2.CAP_PROP_FPS) + frame_delay = 1.0 / video_fps if args.realtime_playback and video_fps and video_fps > 1 else 0.0 + tracker = LightweightTracker( + max_missed=args.max_track_missed, + min_match_score=args.min_match_score, + max_center_distance_ratio=args.max_center_distance_ratio, + ) + navigator = NavigationController(args) + + if not args.no_display: + cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL) + cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL) + cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL) + + frame_id = 0 + start_time = time.perf_counter() + last_stats = start_time + last_loop_end = start_time + yolo_total_ms = 0.0 + yolo_cycles = 0 + + try: + while True: + if frame_delay > 0: + now = time.perf_counter() + sleep_for = frame_delay - (now - last_loop_end) + if sleep_for > 0: + time.sleep(sleep_for) + last_loop_end = time.perf_counter() + + ok, frame = cap.read() + if not ok: + 
log("Fine stream") + break + frame_id += 1 + timestamp = time.perf_counter() + if args.max_frames > 0 and frame_id > args.max_frames: + log(f"Raggiunto --max-frames={args.max_frames}") + break + + detections, yolo_ms = detector.detect(frame, args.min_confidence, args.input_size) + yolo_total_ms += yolo_ms + yolo_cycles += 1 + gaylords = [ + det for det in detections + if det.class_name.strip().lower() == args.target_class.strip().lower() + ] + + tracks = tracker.update(gaylords, frame_id, frame.shape[1]) + if args.motion_report_interval > 0 and frame_id % args.motion_report_interval == 0: + navigator.set_motion_text( + estimate_motion_from_tracks(tracks, args.motion_min_pixels) + ) + new_snapshots: list[NavigationSnapshot] = [] + for track in tracks: + if track.missed == 0: + snapshot = navigator.process_track(track, frame, frame_id, timestamp) + if snapshot is not None: + new_snapshots.append(snapshot) + if args.no_display and new_snapshots: + if args.remote_ack_timeout_sec > 0: + time.sleep(args.remote_ack_timeout_sec) + for snapshot in new_snapshots: + navigator.simulate_remote_response(snapshot) + + now = time.perf_counter() + if now - last_stats >= args.stats_interval: + elapsed = max(now - start_time, 0.001) + avg_yolo = yolo_total_ms / max(yolo_cycles, 1) + active = sum(1 for t in tracks if t.missed == 0) + log( + f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} " + f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} tracks={len(tracks)} active={active} " + f"snapshots={navigator.snapshot_counter} {navigator.motion_text}" + ) + if args.debug_tracks: + for track in tracks: + cx, cy = bbox_center(track.bbox) + area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1]) + log( + f" track={track.id} state={track.state} hits={track.hits} " + f"missed={track.missed} center=({cx:.0f},{cy:.0f}) " + f"area={area_ratio:.3f} trend={track.area_trend():+.2f} " + f"reason={track.last_candidate_reason}" + ) + last_stats = now + + if not 
args.no_display: + elapsed = max(time.perf_counter() - start_time, 0.001) + fps_text = ( + f"frame={frame_id} fps={frame_id / elapsed:.1f} " + f"det={len(gaylords)} tracks={len(tracks)} snap={navigator.snapshot_counter}" + ) + display = draw_navigation_debug( + frame, + tracks, + args, + navigator.last_command_text, + fps_text, + ) + cv2.imshow("flywms navigate", display) + if navigator.last_ocr_payload_frame is not None: + snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width) + cv2.imshow("flywms snapshot", snapshot_display) + cv2.imshow( + "flywms comandi", + draw_commands_window(navigator.last_command_lines, navigator.motion_text), + ) + + if new_snapshots: + flash_display = apply_flash(display, args.flash_alpha) + cv2.imshow("flywms navigate", flash_display) + if navigator.last_ocr_payload_frame is not None: + flash_snapshot = apply_flash( + resize_preview(navigator.last_ocr_payload_frame, args.preview_width), + args.flash_alpha, + ) + cv2.imshow("flywms snapshot", flash_snapshot) + cv2.imshow( + "flywms comandi", + draw_commands_window(navigator.last_command_lines, navigator.motion_text), + ) + pause_ms = max(1, int(args.remote_ack_timeout_sec * 1000)) + key = cv2.waitKey(pause_ms) & 0xFF + if key in (27, ord("q")): + log("Interrotto da tastiera") + break + for snapshot in new_snapshots: + navigator.simulate_remote_response(snapshot) + cv2.imshow( + "flywms comandi", + draw_commands_window(navigator.last_command_lines, navigator.motion_text), + ) + + key = cv2.waitKey(1) & 0xFF + if key in (27, ord("q")): + log("Interrotto da tastiera") + break + finally: + cap.release() + if not args.no_display: + cv2.destroyAllWindows() + + log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())