"""Simulated shelf-scan navigation driven by YOLO detections.

The script reads frames from a video file or camera, detects objects with an
Ultralytics model, tracks the configured target class with a small greedy
bbox tracker, and takes a "snapshot" (full frame + padded crop + JSONL
record) when a track satisfies the centering criteria. The remote OCR/WMS
answer (ACK/NACK) is simulated locally.
"""
import argparse
import configparser
import json
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path

import cv2
import numpy as np

DEFAULT_CONFIG_PATH = "flywms_navigation.ini"

# Number of most recent bbox/center/area samples each track keeps.
TRACK_HISTORY_LENGTH = 20


@dataclass(frozen=True)
class Detection:
    """One detector output; ``bbox`` is ``(x1, y1, x2, y2)`` in pixels."""

    class_id: int
    class_name: str
    confidence: float
    bbox: tuple[int, int, int, int]


@dataclass(frozen=True)
class CandidateSnapshot:
    """A frame that satisfied the snapshot criteria, with its score parts."""

    frame_id: int
    timestamp: float
    frame: np.ndarray
    bbox: tuple[int, int, int, int]
    score: float
    center_score: float
    size_score: float
    cut_score: float


@dataclass
class Track:
    """Mutable state of one tracked object across frames."""

    id: int
    bbox: tuple[int, int, int, int]
    confidence: float
    first_seen_frame: int
    last_seen_frame: int
    hits: int = 1
    missed: int = 0
    # One of "entering", "candidate", "centered", "exiting", "snapshotted".
    state: str = "entering"
    # "ok" or a comma-separated list of the criteria that failed last time.
    last_candidate_reason: str = ""
    pending_remote_response: str = "none"
    already_snapshotted: bool = False
    bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list)
    center_history: list[tuple[float, float]] = field(default_factory=list)
    area_history: list[float] = field(default_factory=list)
    candidates: list[CandidateSnapshot] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Seed the histories with the initial bbox."""
        self._append_history(self.bbox)

    def update(self, bbox: tuple[int, int, int, int], confidence: float, frame_id: int) -> None:
        """Record a matched detection and reset the missed counter."""
        self.bbox = bbox
        self.confidence = confidence
        self.last_seen_frame = frame_id
        self.hits += 1
        self.missed = 0
        self._append_history(bbox)

    def mark_missed(self) -> None:
        """Count one unmatched update; a non-snapshotted track becomes "exiting"."""
        self.missed += 1
        if self.state != "snapshotted":
            self.state = "exiting"

    def _append_history(self, bbox: tuple[int, int, int, int]) -> None:
        """Append bbox, center and area samples, keeping only the latest ones."""
        self.bbox_history.append(bbox)
        self.center_history.append(bbox_center(bbox))
        self.area_history.append(float(bbox_area(bbox)))
        self.bbox_history = self.bbox_history[-TRACK_HISTORY_LENGTH:]
        self.center_history = self.center_history[-TRACK_HISTORY_LENGTH:]
        self.area_history = self.area_history[-TRACK_HISTORY_LENGTH:]

    def area_trend(self) -> float:
        """Return the relative area change over the last four samples.

        Returns 0.0 while fewer than four samples are available.
        """
        if len(self.area_history) < 4:
            return 0.0
        old = self.area_history[-4]
        new = self.area_history[-1]
        return (new - old) / max(old, 1.0)


@dataclass(frozen=True)
class NavigationSnapshot:
    """Result of a finalized snapshot, as written to the JSONL metadata."""

    snapshot_id: int
    frame_id: int
    timestamp: float
    simulated_position: str
    track_id: int
    bbox: tuple[int, int, int, int]
    score: float
    debug_frame_path: str
    ocr_payload_path: str


class UltralyticsDetector:
    """Thin wrapper around an Ultralytics YOLO model."""

    def __init__(self, model_path: str, device: str):
        # Imported lazily so the module can be loaded without ultralytics.
        from ultralytics import YOLO

        self.model = YOLO(model_path)
        self.device = device
        names = self.model.names
        if isinstance(names, dict):
            self.classes = [str(names[i]) for i in sorted(names)]
        else:
            self.classes = [str(name) for name in names]

    def detect(
        self,
        frame: np.ndarray,
        min_confidence: float,
        input_size: int,
    ) -> tuple[list[Detection], float]:
        """Run inference on ``frame``.

        Returns:
            ``(detections, elapsed_ms)`` where ``elapsed_ms`` is the time
            spent in ``model.predict``. Boxes are rounded, clipped to the
            frame, and dropped when they become degenerate.
        """
        t0 = time.perf_counter()
        results = self.model.predict(
            source=frame,
            imgsz=input_size,
            conf=min_confidence,
            device=self.device,
            verbose=False,
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000.0

        detections: list[Detection] = []
        if not results:
            return detections, elapsed_ms
        boxes = results[0].boxes
        if boxes is None:
            return detections, elapsed_ms

        xyxy = boxes.xyxy.cpu().numpy()
        confs = boxes.conf.cpu().numpy()
        clss = boxes.cls.cpu().numpy().astype(int)
        frame_h, frame_w = frame.shape[:2]
        for box, conf, cls_id in zip(xyxy, confs, clss):
            x1, y1, x2, y2 = [int(round(v)) for v in box]
            x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame_w, frame_h)
            if x2 <= x1 or y2 <= y1:
                continue
            # Fall back to the numeric id when the model reports an unknown class.
            class_name = self.classes[cls_id] if 0 <= cls_id < len(self.classes) else str(cls_id)
            detections.append(Detection(
                class_id=int(cls_id),
                class_name=class_name,
                confidence=float(conf),
                bbox=(x1, y1, x2, y2),
            ))
        return detections, elapsed_ms


class LightweightTracker:
    """Greedy bbox tracker: enough to explain and test navigation decisions."""

    def __init__(
        self,
        max_missed: int,
        min_match_score: float,
        max_center_distance_ratio: float,
    ):
        self.max_missed = max_missed
        self.min_match_score = min_match_score
        self.max_center_distance_ratio = max_center_distance_ratio
        self._next_id = 1
        self.tracks: dict[int, Track] = {}

    def update(
        self,
        detections: list[Detection],
        frame_id: int,
        frame_width: int,
    ) -> list[Track]:
        """Associate ``detections`` with existing tracks and return all live tracks.

        Pairs are matched greedily by descending association score. Unmatched
        tracks are marked missed and removed once ``missed`` exceeds
        ``max_missed``; unmatched detections start new tracks.
        """
        unmatched_tracks = set(self.tracks)
        unmatched_detections = set(range(len(detections)))
        max_center_distance = max(1.0, frame_width * self.max_center_distance_ratio)

        pairs: list[tuple[float, int, int]] = []
        for track_id, track in self.tracks.items():
            for det_idx, det in enumerate(detections):
                score = association_score(track.bbox, det.bbox, max_center_distance)
                if score >= self.min_match_score:
                    pairs.append((score, track_id, det_idx))
        pairs.sort(reverse=True, key=lambda item: item[0])

        for _, track_id, det_idx in pairs:
            if track_id not in unmatched_tracks or det_idx not in unmatched_detections:
                continue
            det = detections[det_idx]
            self.tracks[track_id].update(det.bbox, det.confidence, frame_id)
            unmatched_tracks.remove(track_id)
            unmatched_detections.remove(det_idx)

        for track_id in unmatched_tracks:
            track = self.tracks[track_id]
            track.mark_missed()
            if track.missed > self.max_missed:
                del self.tracks[track_id]

        # Sorted so that new track ids follow detection order deterministically.
        for det_idx in sorted(unmatched_detections):
            det = detections[det_idx]
            track_id = self._next_id
            self._next_id += 1
            self.tracks[track_id] = Track(
                id=track_id,
                bbox=det.bbox,
                confidence=det.confidence,
                first_seen_frame=frame_id,
                last_seen_frame=frame_id,
            )
        return list(self.tracks.values())


class NavigationController:
    """Decides when to snapshot a track and emits the simulated commands."""

    def __init__(self, args):
        self.args = args
        self.output_dir = Path(args.snapshot_output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_path = self.output_dir / "snapshots.jsonl"
        self.snapshot_counter = 0
        self.position_counter = 0
        self.last_command_text = ""
        self.last_command_lines: list[str] = []
        self.last_snapshot_frame: np.ndarray | None = None
        self.last_ocr_payload_frame: np.ndarray | None = None
        self.last_remote_result_text = ""
        self.motion_text = "MOTO: n/d"

    def process_track(
        self,
        track: Track,
        frame: np.ndarray,
        frame_id: int,
        timestamp: float,
    ) -> NavigationSnapshot | None:
        """Evaluate ``track`` on ``frame`` and snapshot it when ready.

        Eligible frames are collected as candidates; once the candidate
        window is full, or the track stops being eligible while candidates
        are pending, the best-scoring candidate is saved.

        Returns:
            The new snapshot, or ``None`` when nothing was saved.
        """
        frame_h, frame_w = frame.shape[:2]
        eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h)
        self._update_track_state(track, eligible, frame_w)
        if track.already_snapshotted:
            return None

        if eligible:
            # A window <= 0 would turn the slice below into "keep everything"
            # or "drop the newest entries"; treat it as a window of one frame.
            window = max(1, int(self.args.snapshot_window_frames))
            track.candidates.append(CandidateSnapshot(
                frame_id=frame_id,
                timestamp=timestamp,
                frame=frame.copy(),
                bbox=track.bbox,
                score=score_parts["score"],
                center_score=score_parts["center_score"],
                size_score=score_parts["size_score"],
                cut_score=score_parts["cut_score"],
            ))
            track.candidates = track.candidates[-window:]
            if len(track.candidates) >= window:
                return self._finalize_snapshot(track)
        elif track.candidates:
            return self._finalize_snapshot(track)
        return None

    def _is_snapshot_candidate(
        self,
        track: Track,
        frame_w: int,
        frame_h: int,
    ) -> tuple[bool, dict[str, float]]:
        """Check every snapshot criterion for ``track``.

        Also stores ``"ok"`` or the list of failed criteria in
        ``track.last_candidate_reason``.

        Returns:
            ``(eligible, scores)`` with keys ``score``, ``center_score``,
            ``size_score`` and ``cut_score``.
        """
        x1, y1, x2, y2 = track.bbox
        cx, cy = bbox_center(track.bbox)
        center_x = frame_w * 0.5
        center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio)
        snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio)
        center_delta = abs(cx - center_x)
        center_score = max(0.0, 1.0 - center_delta / center_tolerance)

        area_ratio = bbox_area(track.bbox) / float(frame_w * frame_h)
        size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001))

        if self.args.edge_margin_ratio <= 0:
            # Edge check disabled.
            cut = False
        else:
            edge_margin_x = frame_w * self.args.edge_margin_ratio
            edge_margin_y = frame_h * self.args.edge_margin_ratio
            cut = (
                x1 <= edge_margin_x
                or y1 <= edge_margin_y
                or x2 >= frame_w - edge_margin_x
                or y2 >= frame_h - edge_margin_y
            )
        cut_score = 0.0 if cut else 1.0
        score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score

        in_center_band = center_delta <= center_tolerance
        on_snapshot_line = center_delta <= snapshot_tolerance
        in_y_band = (
            frame_h * self.args.usable_y_min_ratio
            <= cy
            <= frame_h * self.args.usable_y_max_ratio
        )
        enough_hits = track.hits >= self.args.min_track_hits
        large_enough = area_ratio >= self.args.min_gaylord_area_ratio
        trend = track.area_trend()
        trend_ok = trend >= self.args.min_area_trend
        eligible = (
            enough_hits
            and on_snapshot_line
            and in_y_band
            and large_enough
            and not cut
            and trend_ok
            and track.missed == 0
        )

        failed: list[str] = []
        if not enough_hits:
            failed.append(f"hits<{self.args.min_track_hits}")
        if not in_center_band:
            failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}")
        elif not on_snapshot_line:
            failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}")
        if not in_y_band:
            failed.append("y_band")
        if not large_enough:
            failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}")
        if cut:
            failed.append("edge_cut")
        if not trend_ok:
            failed.append(f"trend={trend:+.2f}<{self.args.min_area_trend:+.2f}")
        if track.missed != 0:
            failed.append(f"missed={track.missed}")
        track.last_candidate_reason = "ok" if eligible else ",".join(failed)

        return eligible, {
            "score": score,
            "center_score": center_score,
            "size_score": size_score,
            "cut_score": cut_score,
        }

    def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None:
        """Set ``track.state`` from its snapshot flag, misses and horizontal offset."""
        if track.already_snapshotted:
            track.state = "snapshotted"
            return
        if track.missed > 0:
            track.state = "exiting"
            return

        cx, _ = bbox_center(track.bbox)
        center_delta = abs(cx - frame_w * 0.5)
        snapshot_tolerance = frame_w * self.args.snapshot_line_tolerance_ratio
        if eligible:
            track.state = "centered"
        elif track.hits < self.args.min_track_hits:
            track.state = "entering"
        elif center_delta <= snapshot_tolerance:
            track.state = "centered"
        elif center_delta <= frame_w * self.args.center_tolerance_ratio:
            track.state = "candidate"
        else:
            track.state = "entering"

    def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None:
        """Save the best pending candidate of ``track`` and emit its commands.

        Writes the full frame and the padded crop as JPEG files, appends a
        metadata record, and marks the track as snapshotted. Returns ``None``
        when the track has no candidates.
        """
        if not track.candidates:
            return None
        best = max(track.candidates, key=lambda item: item.score)
        track.candidates.clear()
        track.already_snapshotted = True
        track.state = "snapshotted"

        self.snapshot_counter += 1
        self.position_counter += 1
        simulated_position = f"gaylord {self.position_counter}"
        debug_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_frame.jpg"
        payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_ocr_payload.jpg"
        debug_path = self.output_dir / debug_name
        payload_path = self.output_dir / payload_name

        # cv2.imwrite reports failure through its return value, not an
        # exception, so a failed write would otherwise go unnoticed.
        if not cv2.imwrite(str(debug_path), best.frame):
            log(f"ERRORE: impossibile salvare snapshot: {debug_path}")
        ocr_payload = crop_with_padding(
            best.frame,
            best.bbox,
            self.args.ocr_payload_pad_ratio,
        )
        if not cv2.imwrite(str(payload_path), ocr_payload):
            log(f"ERRORE: impossibile salvare snapshot: {payload_path}")
        self.last_snapshot_frame = best.frame.copy()
        self.last_ocr_payload_frame = ocr_payload.copy()

        snapshot = NavigationSnapshot(
            snapshot_id=self.snapshot_counter,
            frame_id=best.frame_id,
            timestamp=best.timestamp,
            simulated_position=simulated_position,
            track_id=track.id,
            bbox=best.bbox,
            score=best.score,
            debug_frame_path=str(debug_path),
            ocr_payload_path=str(payload_path),
        )
        self._write_metadata(snapshot)
        self._print_commands(snapshot)
        return snapshot

    def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str:
        """Produce the simulated remote answer for ``snapshot``.

        The answer follows ``args.remote_ack_mode``; any mode other than
        "always-ack"/"always-nack" alternates, ACK for odd snapshot ids.
        The resulting commands are appended to ``last_command_lines`` and logged.

        Returns:
            ``"ACK"`` or ``"NACK"``.
        """
        mode = self.args.remote_ack_mode
        if mode == "always-ack":
            result = "ACK"
        elif mode == "always-nack":
            result = "NACK"
        else:
            result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK"

        if result == "ACK":
            self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS"
            resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                resume_command,
            ])
            log("[REMOTE] ACK_RICEVUTO codice valido su WMS")
            log(f"[CMD] {resume_command}")
        else:
            self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                "MICRO_MOVE_CORRETTIVO",
                "SCATTA_FOTO_RETRY",
            ])
            log("[REMOTE] NACK_RICEVUTO codice assente/non valido")
            log("[CMD] MICRO_MOVE_CORRETTIVO")
            log("[CMD] SCATTA_FOTO_RETRY")
        return result

    def set_motion_text(self, text: str) -> None:
        """Store the motion summary shown in the commands window and stats log."""
        self.motion_text = text

    def _write_metadata(self, snapshot: NavigationSnapshot) -> None:
        """Append one JSON line describing ``snapshot`` to the metadata file."""
        record = {
            "snapshot_id": snapshot.snapshot_id,
            "frame_id": snapshot.frame_id,
            "timestamp": snapshot.timestamp,
            "simulated_position": snapshot.simulated_position,
            "drone_pose_simulated": {
                "mode": "linear_shelf_scan",
                "position_label": snapshot.simulated_position,
            },
            "track_id": snapshot.track_id,
            "gaylord_bbox": list(snapshot.bbox),
            "score": snapshot.score,
            "debug_frame_path": snapshot.debug_frame_path,
            "ocr_payload_path": snapshot.ocr_payload_path,
        }
        with self.metadata_path.open("at", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=True) + "\n")

    def _print_commands(self, snapshot: NavigationSnapshot) -> None:
        """Build the command list for ``snapshot``, store it and log it."""
        self.last_command_text = (
            f"SNAPSHOT {snapshot.snapshot_id:04d} "
            f"track={snapshot.track_id} frame={snapshot.frame_id} "
            f"pos={snapshot.simulated_position} score={snapshot.score:.2f}"
        )
        commands = [
            "STOP",
            f"SCATTA_FOTO {Path(snapshot.debug_frame_path).name}",
            f"ESTRAI_BBOX_CENTRALE track={snapshot.track_id}",
            f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
            f"INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}",
            f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s",
        ]
        self.last_command_lines = [self.last_command_text, *commands]
        log(f"[NAV] {self.last_command_text}")
        for command in commands:
            log(f"[CMD] {command}")


def parse_args():
    """Parse the command line, using the INI file for default values.

    ``--config`` is parsed first so that the values it provides become the
    defaults of every other option; explicit command-line options win.
    """
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
    pre_args, _ = pre.parse_known_args()
    defaults = load_navigation_config(pre_args.config)

    ap = argparse.ArgumentParser(parents=[pre])
    ap.add_argument("-v", "--video", default=defaults["video"],
                    help="Percorso video. Se omesso usa webcam 0")
    ap.add_argument(
        "--weights",
        default=defaults["weights"],
        help="Modello Ultralytics .pt",
    )
    ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"],
                    help="Device Ultralytics: cpu oppure 0")
    ap.add_argument("--input-size", type=int, default=defaults["input_size"],
                    help="Dimensione input YOLO")
    ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"],
                    help="Confidenza minima")
    ap.add_argument("--target-class", default=defaults["target_class"],
                    help="Classe da tracciare")
    ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"],
                    help="Frame persi prima di rimuovere una track")
    ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"],
                    help="Soglia associazione detection-track")
    ap.add_argument("--max-center-distance-ratio", type=float,
                    default=defaults["max_center_distance_ratio"],
                    help="Distanza max centri per matching")
    ap.add_argument("--center-tolerance-ratio", type=float,
                    default=defaults["center_tolerance_ratio"],
                    help="Mezza ampiezza zona centrale")
    ap.add_argument("--snapshot-line-tolerance-ratio", type=float,
                    default=defaults["snapshot_line_tolerance_ratio"],
                    help="Tolleranza stretta dalla linea centrale per scattare")
    ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"],
                    help="Limite alto fascia utile Y")
    ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"],
                    help="Limite basso fascia utile Y")
    ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"],
                    help="Detection consecutive minime")
    ap.add_argument("--min-gaylord-area-ratio", type=float,
                    default=defaults["min_gaylord_area_ratio"],
                    help="Area bbox minima sul frame")
    ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"],
                    help="Margine per considerare bbox tagliato")
    ap.add_argument("--ocr-payload-pad-ratio", type=float,
                    default=defaults["ocr_payload_pad_ratio"],
                    help="Padding intorno al bbox centrale inviato all'OCR remoto")
    ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"],
                    help="Trend area minimo ammesso")
    ap.add_argument("--snapshot-window-frames", type=int,
                    default=defaults["snapshot_window_frames"],
                    help="Candidati da valutare prima dello snapshot")
    ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"],
                    help="Directory snapshot e JSONL")
    ap.add_argument("--remote-ack-timeout-sec", type=float,
                    default=defaults["remote_ack_timeout_sec"],
                    help="Tempo simulato di attesa OCR remoto/WMS")
    ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
                    default=defaults["remote_ack_mode"],
                    help="Risposta remota simulata")
    ap.add_argument("--scan-direction", choices=["destra", "sinistra"],
                    default=defaults["scan_direction"],
                    help="Direzione simulata di ripartenza dopo ACK")
    ap.add_argument("--preview-width", type=int, default=defaults["preview_width"],
                    help="Larghezza preview")
    # The default can be True, so a plain store_true flag could never turn
    # the option off. BooleanOptionalAction keeps --realtime-playback and
    # adds --no-realtime-playback.
    ap.add_argument("--realtime-playback", action=argparse.BooleanOptionalAction,
                    default=defaults["realtime_playback"],
                    help="Rispetta FPS video")
    ap.add_argument("--preview-fps", type=float, default=defaults["preview_fps"],
                    help="FPS massimo per lettura/preview realtime. 0 = FPS sorgente")
    ap.add_argument("--yolo-fps", type=float, default=defaults["yolo_fps"],
                    help="FPS massimo per inferenza YOLO. 0 = ogni frame di preview")
    ap.add_argument("--max-frames", type=int, default=defaults["max_frames"],
                    help="Numero massimo frame; 0 = tutto")
    ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"],
                    help="Intervallo log prestazioni")
    ap.add_argument("--motion-report-interval", type=int,
                    default=defaults["motion_report_interval"],
                    help="Ogni quanti frame aggiornare la direzione moto stimata")
    ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
                    help="Spostamento medio minimo per dichiarare una direzione")
    # Same reasoning as --realtime-playback: adds --no-debug-tracks.
    ap.add_argument("--debug-tracks", action=argparse.BooleanOptionalAction,
                    default=defaults["debug_tracks"],
                    help="Logga stato e criteri delle track")
    ap.add_argument("--flash-alpha", type=float, default=defaults["flash_alpha"],
                    help="Intensita' flash 0..1 al momento dello scatto")
    ap.add_argument("--no-display", action="store_true", default=defaults["no_display"],
                    help="Disabilita finestra video")
    return ap.parse_args()


def load_navigation_config(path_str: str) -> dict[str, object]:
    """Return the built-in defaults, overridden by the ``[navigation]`` INI section.

    A missing file or section leaves the defaults untouched. Each value is
    parsed with the type of its built-in default; for string options an
    empty value, "none" or "null" becomes ``None``.
    """
    defaults: dict[str, object] = {
        "video": "testhd.mp4",
        "weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt",
        "ultralytics_device": "cpu",
        "input_size": 640,
        "min_confidence": 0.25,
        "target_class": "gaylord",
        "max_track_missed": 8,
        "min_match_score": 0.25,
        "max_center_distance_ratio": 0.18,
        "center_tolerance_ratio": 0.18,
        "snapshot_line_tolerance_ratio": 0.035,
        "usable_y_min_ratio": 0.15,
        "usable_y_max_ratio": 0.85,
        "min_track_hits": 3,
        "min_gaylord_area_ratio": 0.02,
        "edge_margin_ratio": 0.0,
        "ocr_payload_pad_ratio": 0.03,
        "min_area_trend": -0.35,
        "snapshot_window_frames": 1,
        "snapshot_output_dir": "navigate_snapshots",
        "remote_ack_timeout_sec": 2.0,
        "remote_ack_mode": "always-ack",
        "scan_direction": "destra",
        "preview_width": 1280,
        "realtime_playback": True,
        "preview_fps": 24.0,
        "yolo_fps": 15.0,
        "max_frames": 0,
        "stats_interval": 2.0,
        "motion_report_interval": 5,
        "motion_min_pixels": 1.5,
        "debug_tracks": True,
        "flash_alpha": 0.70,
        "no_display": False,
    }
    path = Path(path_str)
    if not path.exists():
        return defaults

    parser = configparser.ConfigParser()
    parser.read(path, encoding="utf-8")
    section = parser["navigation"] if parser.has_section("navigation") else {}
    # Snapshot the types first: string options may be replaced by None below.
    types = {key: type(value) for key, value in defaults.items()}
    for key, default_value in defaults.items():
        if key not in section:
            continue
        # bool is tested before int because bool is a subclass of int.
        if types[key] is bool:
            defaults[key] = parser.getboolean("navigation", key, fallback=bool(default_value))
        elif types[key] is int:
            defaults[key] = parser.getint("navigation", key, fallback=int(default_value))
        elif types[key] is float:
            defaults[key] = parser.getfloat("navigation", key, fallback=float(default_value))
        else:
            value = section.get(key, str(default_value)).strip()
            defaults[key] = None if value.lower() in ("", "none", "null") else value
    return defaults


def log(msg: str) -> None:
    """Print ``msg`` prefixed with the current wall-clock time."""
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


def require_file(path_str: str, description: str) -> Path:
    """Return ``path_str`` as a ``Path``, or log an error and exit with status 1."""
    path = Path(path_str)
    if not path.exists():
        log(f"ERRORE: {description} non trovato: {path}")
        sys.exit(1)
    return path


def open_capture(video_arg: str | None):
    """Open the video source and return ``(capture, source_name)``.

    ``None`` selects camera 0 and an all-digit string selects that camera
    index; anything else is passed to OpenCV as a path/URL. Cameras are
    tried with the DirectShow backend first, then with the default backend.
    """
    if video_arg is None:
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        if not cap.isOpened():
            cap = cv2.VideoCapture(0)
        return cap, "camera:0"
    if str(video_arg).isdigit():
        idx = int(video_arg)
        cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
        if not cap.isOpened():
            cap = cv2.VideoCapture(idx)
        return cap, f"camera:{idx}"
    return cv2.VideoCapture(video_arg), str(video_arg)


def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
    """Clamp box corners to valid pixel indices: x in [0, w-1], y in [0, h-1]."""
    x1 = max(0, min(x1, w - 1))
    y1 = max(0, min(y1, h - 1))
    x2 = max(0, min(x2, w - 1))
    y2 = max(0, min(y2, h - 1))
    return x1, y1, x2, y2


def crop_with_padding(
    frame: np.ndarray,
    bbox: tuple[int, int, int, int],
    pad_ratio: float,
) -> np.ndarray:
    """Return a copy of ``bbox`` cut out of ``frame`` with proportional padding.

    The padding is ``pad_ratio`` of the box width/height on each side
    (negative ratios are treated as 0) and the result is limited to the frame.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = bbox
    pad_x = int(max(0.0, pad_ratio) * (x2 - x1))
    pad_y = int(max(0.0, pad_ratio) * (y2 - y1))
    # Slice ends are exclusive, so the crop may extend up to w/h (not w-1/h-1);
    # clamping to the last index would drop the final column/row at the edge.
    left = max(0, min(x1 - pad_x, frame_w))
    top = max(0, min(y1 - pad_y, frame_h))
    right = max(0, min(x2 + pad_x, frame_w))
    bottom = max(0, min(y2 + pad_y, frame_h))
    return frame[top:bottom, left:right].copy()


def bbox_area(bbox: tuple[int, int, int, int]) -> int:
    """Return the box area; inverted boxes count as 0."""
    x1, y1, x2, y2 = bbox
    return max(0, x2 - x1) * max(0, y2 - y1)


def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
    """Return the ``(x, y)`` center of the box."""
    x1, y1, x2, y2 = bbox
    return (x1 + x2) * 0.5, (y1 + y2) * 0.5


def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    """Return the intersection-over-union of two boxes (0.0 for an empty union)."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    inter = bbox_area((max(ax1, bx1), max(ay1, by1), min(ax2, bx2), min(ay2, by2)))
    union = bbox_area(a) + bbox_area(b) - inter
    if union <= 0:
        return 0.0
    return inter / float(union)


def association_score(
    track_bbox: tuple[int, int, int, int],
    det_bbox: tuple[int, int, int, int],
    max_center_distance: float,
) -> float:
    """Score a track/detection pair as 70% IoU plus 30% center proximity.

    Center proximity falls linearly from 1 to 0 as the center distance goes
    from 0 to ``max_center_distance``, which must be positive.
    """
    iou = bbox_iou(track_bbox, det_bbox)
    tx, ty = bbox_center(track_bbox)
    dx, dy = bbox_center(det_bbox)
    center_dist = float(np.hypot(tx - dx, ty - dy))
    center_similarity = max(0.0, 1.0 - center_dist / max_center_distance)
    return 0.70 * iou + 0.30 * center_similarity


def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
    """Downscale ``frame`` to ``max_width`` keeping the aspect ratio.

    The frame is returned unchanged when it is already narrow enough or
    ``max_width`` is not positive.
    """
    h, w = frame.shape[:2]
    if max_width <= 0 or w <= max_width:
        return frame
    scale = max_width / float(w)
    return cv2.resize(frame, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LINEAR)


def draw_navigation_debug(
    frame: np.ndarray,
    tracks: list[Track],
    args,
    last_command_text: str,
    fps_text: str,
) -> np.ndarray:
    """Return a preview-sized copy of ``frame`` with overlays.

    Draws the central band, the usable Y limits, every track (box, center,
    label), the fps text and the last command text.
    """
    display = frame.copy()
    h, w = display.shape[:2]
    center_x = int(w * 0.5)
    tol = int(w * args.center_tolerance_ratio)
    y_min = int(h * args.usable_y_min_ratio)
    y_max = int(h * args.usable_y_max_ratio)

    cv2.rectangle(display, (center_x - tol, y_min), (center_x + tol, y_max), (255, 255, 0), 4)
    cv2.line(display, (center_x, 0), (center_x, h), (255, 255, 0), 3)
    cv2.line(display, (0, y_min), (w, y_min), (100, 100, 100), 2)
    cv2.line(display, (0, y_max), (w, y_max), (100, 100, 100), 2)

    for track in tracks:
        x1, y1, x2, y2 = track.bbox
        color = state_color(track.state)
        thickness = 8 if track.state == "centered" else 5
        cv2.rectangle(display, (x1, y1), (x2, y2), color, thickness)
        cx, cy = bbox_center(track.bbox)
        cv2.circle(display, (int(cx), int(cy)), 12, color, -1)
        cv2.circle(display, (int(cx), int(cy)), 18, (0, 0, 0), 3)
        text = (
            f"id={track.id} {track.state} conf={track.confidence:.2f} "
            f"hits={track.hits} trend={track.area_trend():+.2f}"
        )
        cv2.putText(
            display,
            text,
            (x1, max(24, y1 - 8)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.78,
            color,
            3,
            cv2.LINE_AA,
        )

    cv2.putText(display, fps_text, (20, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
    if last_command_text:
        cv2.putText(display, last_command_text, (20, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.75,
                    (0, 255, 255), 2)
    return resize_preview(display, args.preview_width)


def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
    """Render the title, the motion text and up to ten command lines on a canvas."""
    lines = command_lines if command_lines else ["Nessun comando generato"]
    visible = lines[:10]
    canvas_h = max(340, 84 + len(visible) * 34)
    canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)
    cv2.putText(
        canvas,
        "COMANDI NAVIGAZIONE",
        (24, 42),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (0, 0, 0),
        2,
        cv2.LINE_AA,
    )
    cv2.putText(
        canvas,
        motion_text,
        (24, 76),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.80,
        (120, 0, 120),
        2,
        cv2.LINE_AA,
    )
    y = 122
    for idx, line in enumerate(visible):
        # The first line (the snapshot summary) gets its own color.
        color = (0, 0, 180) if idx == 0 else (0, 90, 0)
        cv2.putText(
            canvas,
            line,
            (24, y),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.82,
            color,
            2,
            cv2.LINE_AA,
        )
        y += 36
    return canvas


def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
    """Blend ``frame`` towards white by ``alpha`` (clamped to 0..1)."""
    flash = np.full_like(frame, 255)
    alpha = min(max(alpha, 0.0), 1.0)
    return cv2.addWeighted(frame, 1.0 - alpha, flash, alpha, 0.0)


def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
    """Summarize the mean center displacement of the currently matched tracks.

    Only tracks with ``missed == 0`` and at least two center samples are
    used. Returns ``"MOTO: n/d"`` when no track qualifies.
    """
    deltas: list[tuple[float, float]] = []
    for track in tracks:
        if track.missed != 0 or len(track.center_history) < 2:
            continue
        x0, y0 = track.center_history[-2]
        x1, y1 = track.center_history[-1]
        deltas.append((x1 - x0, y1 - y0))
    if not deltas:
        return "MOTO: n/d"

    dx = sum(delta[0] for delta in deltas) / len(deltas)
    dy = sum(delta[1] for delta in deltas) / len(deltas)
    abs_dx = abs(dx)
    abs_dy = abs(dy)
    if abs_dx < min_pixels and abs_dy < min_pixels:
        direction = "stabile"
    elif abs_dx >= abs_dy:
        direction = "destra" if dx > 0 else "sinistra"
    else:
        direction = "giu" if dy > 0 else "su"
    return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={len(deltas)}"


def state_color(state: str) -> tuple[int, int, int]:
    """Return the overlay color for a track state (white for unknown states)."""
    if state == "centered":
        return (0, 255, 255)
    if state == "snapshotted":
        return (255, 0, 255)
    if state == "candidate":
        return (0, 255, 0)
    if state == "exiting":
        return (0, 140, 255)
    return (255, 255, 255)


def main() -> int:
    """Run the detection/tracking/snapshot loop; return the process exit code."""
    args = parse_args()
    require_file(args.weights, "modello Ultralytics")

    detector = UltralyticsDetector(args.weights, args.ultralytics_device)
    log(f"Classi modello: {detector.classes}")
    log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.")

    cap, source_name = open_capture(args.video)
    if not cap.isOpened():
        log(f"ERRORE: impossibile aprire sorgente video: {source_name}")
        return 1

    # Everything after a successful open runs under try/finally so the
    # capture is released even if setup (output dir, windows) fails.
    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        preview_fps = args.preview_fps if args.preview_fps and args.preview_fps > 0 else video_fps
        if args.preview_fps and args.preview_fps > 0 and (args.video is None or str(args.video).isdigit()):
            # Only cameras are asked to change their capture rate.
            cap.set(cv2.CAP_PROP_FPS, float(args.preview_fps))
        frame_delay = 1.0 / preview_fps if args.realtime_playback and preview_fps and preview_fps > 1 else 0.0
        yolo_interval = 1.0 / args.yolo_fps if args.yolo_fps and args.yolo_fps > 0 else 0.0

        tracker = LightweightTracker(
            max_missed=args.max_track_missed,
            min_match_score=args.min_match_score,
            max_center_distance_ratio=args.max_center_distance_ratio,
        )
        navigator = NavigationController(args)
        target_class = args.target_class.strip().lower()

        if not args.no_display:
            cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL)
            cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL)
            cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL)

        frame_id = 0
        start_time = time.perf_counter()
        last_stats = start_time
        last_loop_end = start_time
        yolo_total_ms = 0.0
        yolo_cycles = 0
        next_yolo_time = start_time
        last_yolo_ms = 0.0
        gaylords: list[Detection] = []
        tracks: list[Track] = []

        while True:
            if frame_delay > 0:
                # Pace the loop to the preview frame rate.
                now = time.perf_counter()
                sleep_for = frame_delay - (now - last_loop_end)
                if sleep_for > 0:
                    time.sleep(sleep_for)
                last_loop_end = time.perf_counter()

            ok, frame = cap.read()
            if not ok:
                log("Fine stream")
                break
            frame_id += 1
            timestamp = time.perf_counter()
            if args.max_frames > 0 and frame_id > args.max_frames:
                log(f"Raggiunto --max-frames={args.max_frames}")
                break

            new_snapshots: list[NavigationSnapshot] = []
            run_yolo = yolo_interval <= 0 or timestamp >= next_yolo_time
            if run_yolo:
                next_yolo_time = timestamp + yolo_interval
                detections, last_yolo_ms = detector.detect(frame, args.min_confidence, args.input_size)
                yolo_total_ms += last_yolo_ms
                yolo_cycles += 1
                gaylords = [
                    det for det in detections
                    if det.class_name.strip().lower() == target_class
                ]
                tracks = tracker.update(gaylords, frame_id, frame.shape[1])
                if args.motion_report_interval > 0 and yolo_cycles % args.motion_report_interval == 0:
                    navigator.set_motion_text(
                        estimate_motion_from_tracks(tracks, args.motion_min_pixels)
                    )

                # Tracks are evaluated only on frames where detection ran, so
                # a saved frame is always paired with the bbox measured on it.
                for track in tracks:
                    if track.missed == 0:
                        snapshot = navigator.process_track(track, frame, frame_id, timestamp)
                        if snapshot is not None:
                            new_snapshots.append(snapshot)

            if args.no_display and new_snapshots:
                # Headless mode: wait the simulated timeout, then answer.
                if args.remote_ack_timeout_sec > 0:
                    time.sleep(args.remote_ack_timeout_sec)
                for snapshot in new_snapshots:
                    navigator.simulate_remote_response(snapshot)

            now = time.perf_counter()
            if now - last_stats >= args.stats_interval:
                elapsed = max(now - start_time, 0.001)
                avg_yolo = yolo_total_ms / max(yolo_cycles, 1)
                active = sum(1 for t in tracks if t.missed == 0)
                log(
                    f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} "
                    f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} tracks={len(tracks)} active={active} "
                    f"snapshots={navigator.snapshot_counter} {navigator.motion_text}"
                )
                if args.debug_tracks:
                    for track in tracks:
                        cx, cy = bbox_center(track.bbox)
                        area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1])
                        log(
                            f" track={track.id} state={track.state} hits={track.hits} "
                            f"missed={track.missed} center=({cx:.0f},{cy:.0f}) "
                            f"area={area_ratio:.3f} trend={track.area_trend():+.2f} "
                            f"reason={track.last_candidate_reason}"
                        )
                last_stats = now

            if not args.no_display:
                elapsed = max(time.perf_counter() - start_time, 0.001)
                fps_text = (
                    f"frame={frame_id} fps={frame_id / elapsed:.1f} "
                    f"yolo_fps={yolo_cycles / elapsed:.1f} yolo={last_yolo_ms:.0f}ms "
                    f"det={len(gaylords)} tracks={len(tracks)} snap={navigator.snapshot_counter}"
                )
                display = draw_navigation_debug(
                    frame,
                    tracks,
                    args,
                    navigator.last_command_text,
                    fps_text,
                )
                cv2.imshow("flywms navigate", display)
                if navigator.last_ocr_payload_frame is not None:
                    snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width)
                    cv2.imshow("flywms snapshot", snapshot_display)
                cv2.imshow(
                    "flywms comandi",
                    draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                )

                if new_snapshots:
                    # Show a flash, hold for the simulated timeout, then
                    # display the simulated remote answer.
                    flash_display = apply_flash(display, args.flash_alpha)
                    cv2.imshow("flywms navigate", flash_display)
                    if navigator.last_ocr_payload_frame is not None:
                        flash_snapshot = apply_flash(
                            resize_preview(navigator.last_ocr_payload_frame, args.preview_width),
                            args.flash_alpha,
                        )
                        cv2.imshow("flywms snapshot", flash_snapshot)
                    cv2.imshow(
                        "flywms comandi",
                        draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                    )
                    pause_ms = max(1, int(args.remote_ack_timeout_sec * 1000))
                    key = cv2.waitKey(pause_ms) & 0xFF
                    if key in (27, ord("q")):
                        log("Interrotto da tastiera")
                        break
                    for snapshot in new_snapshots:
                        navigator.simulate_remote_response(snapshot)
                    cv2.imshow(
                        "flywms comandi",
                        draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                    )

                key = cv2.waitKey(1) & 0xFF
                if key in (27, ord("q")):
                    log("Interrotto da tastiera")
                    break
    finally:
        cap.release()
        if not args.no_display:
            cv2.destroyAllWindows()

    log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())