Files
flywms/flywms_navigation.py
2026-05-15 18:40:07 +02:00

992 lines
38 KiB
Python

import argparse
import configparser
import json
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
import cv2
import numpy as np
# INI file consulted when --config is not given on the command line (see parse_args).
DEFAULT_CONFIG_PATH = "flywms_navigation.ini"
@dataclass(frozen=True)
class Detection:
    """Immutable record describing one object reported by the detector.

    The bounding box is stored as four integer pixel coordinates in
    ``(x1, y1, x2, y2)`` order.
    """

    # Numeric class index as reported by the model.
    class_id: int
    # Human-readable label resolved for ``class_id``.
    class_name: str
    # Detector score for this box.
    confidence: float
    # Box corners: (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
@dataclass(frozen=True)
class CandidateSnapshot:
    """Immutable record of one frame that qualified as a possible snapshot.

    Besides the frame and the box it carries the combined ``score`` and the
    three partial scores it was built from.
    """

    # Index of the frame the candidate was taken from.
    frame_id: int
    # Timestamp associated with that frame by the caller.
    timestamp: float
    # Image data kept for this candidate.
    frame: np.ndarray
    # Track box at capture time: (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
    # Combined ranking score; the highest-scoring candidate is the one saved.
    score: float
    # Partial scores that make up ``score``.
    center_score: float
    size_score: float
    cut_score: float
@dataclass
class Track:
    """Mutable state of one tracked object across frames.

    Keeps the latest box and confidence, hit/miss counters, a textual state,
    bounded histories of box, centre and area, and the snapshot candidates
    collected so far.
    """

    id: int
    bbox: tuple[int, int, int, int]
    confidence: float
    first_seen_frame: int
    last_seen_frame: int
    hits: int = 1
    missed: int = 0
    state: str = "entering"
    last_candidate_reason: str = ""
    pending_remote_response: str = "none"
    already_snapshotted: bool = False
    bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list)
    center_history: list[tuple[float, float]] = field(default_factory=list)
    area_history: list[float] = field(default_factory=list)
    candidates: list[CandidateSnapshot] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Seed the histories with the initial bounding box."""
        self._append_history(self.bbox)

    def update(self, bbox: tuple[int, int, int, int], confidence: float, frame_id: int) -> None:
        """Record a successful match: refresh box/score, count the hit, reset misses."""
        self.bbox, self.confidence = bbox, confidence
        self.last_seen_frame = frame_id
        self.hits += 1
        self.missed = 0
        self._append_history(bbox)

    def mark_missed(self) -> None:
        """Count one frame without a match and flag the track as exiting.

        A track already in the "snapshotted" state keeps that state.
        """
        self.missed += 1
        if self.state == "snapshotted" or self.missed <= 0:
            return
        self.state = "exiting"

    def _append_history(self, bbox: tuple[int, int, int, int]) -> None:
        """Append box, centre and area to the histories, keeping the last 20 entries."""
        history_limit = 20
        self.bbox_history.append(bbox)
        self.center_history.append(bbox_center(bbox))
        self.area_history.append(float(bbox_area(bbox)))
        # Each history is rebound to a fresh, trimmed list.
        for attr in ("bbox_history", "center_history", "area_history"):
            setattr(self, attr, getattr(self, attr)[-history_limit:])

    def area_trend(self) -> float:
        """Return the relative area change between the 4th-latest and latest samples.

        Returns 0.0 while fewer than four samples exist. The denominator is
        floored at 1.0 so a zero-area sample cannot divide by zero.
        """
        recent = self.area_history[-4:]
        if len(recent) < 4:
            return 0.0
        oldest, newest = recent[0], recent[-1]
        return (newest - oldest) / max(oldest, 1.0)
@dataclass(frozen=True)
class NavigationSnapshot:
    """Immutable description of one snapshot that was finalised and saved."""

    # Sequential snapshot number.
    snapshot_id: int
    # Frame index and timestamp of the candidate that was chosen.
    frame_id: int
    timestamp: float
    # Position label attached to the snapshot.
    simulated_position: str
    # Identifier of the track the snapshot belongs to.
    track_id: int
    # Box of the chosen candidate: (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
    # Score of the chosen candidate.
    score: float
    # Paths (as strings) of the full frame and of the cropped OCR payload.
    debug_frame_path: str
    ocr_payload_path: str
class UltralyticsDetector:
    """Wrap an Ultralytics YOLO model and convert its output into Detection records."""

    def __init__(self, model_path: str, device: str):
        """Load the model and cache its class names.

        Args:
            model_path: Path of the model weights handed to ``YOLO``.
            device: Device string forwarded to every ``predict`` call.
        """
        # Imported lazily so the module can be loaded without ultralytics installed.
        from ultralytics import YOLO

        self.model = YOLO(model_path)
        self.device = device
        names = self.model.names
        # Keep an explicit id -> name mapping: indexing a list built from the
        # sorted keys is only correct when the ids are exactly 0..n-1.
        if isinstance(names, dict):
            self._names_by_id: dict[int, str] = {int(key): str(value) for key, value in names.items()}
        else:
            self._names_by_id = {index: str(name) for index, name in enumerate(names)}
        # Public list of names ordered by class id (kept for callers that log it).
        self.classes = [self._names_by_id[key] for key in sorted(self._names_by_id)]

    def _class_name(self, cls_id: int) -> str:
        """Return the label for ``cls_id``, or the id as text when it is unknown."""
        return self._names_by_id.get(int(cls_id), str(cls_id))

    def detect(
        self,
        frame: np.ndarray,
        min_confidence: float,
        input_size: int,
    ) -> tuple[list[Detection], float]:
        """Run inference on one frame.

        Args:
            frame: Image passed to the model as ``source``.
            min_confidence: Confidence threshold forwarded as ``conf``.
            input_size: Inference size forwarded as ``imgsz``.

        Returns:
            ``(detections, elapsed_ms)`` where ``elapsed_ms`` is the wall time
            of the ``predict`` call only. Boxes are rounded to integers,
            clipped to the frame and dropped when they collapse to zero
            width or height.
        """
        t0 = time.perf_counter()
        results = self.model.predict(
            source=frame,
            imgsz=input_size,
            conf=min_confidence,
            device=self.device,
            verbose=False,
        )
        elapsed_ms = (time.perf_counter() - t0) * 1000.0

        detections: list[Detection] = []
        if not results:
            return detections, elapsed_ms
        boxes = results[0].boxes
        if boxes is None:
            return detections, elapsed_ms

        xyxy = boxes.xyxy.cpu().numpy()
        confs = boxes.conf.cpu().numpy()
        clss = boxes.cls.cpu().numpy().astype(int)
        frame_h, frame_w = frame.shape[0], frame.shape[1]
        for box, conf, cls_id in zip(xyxy, confs, clss):
            x1, y1, x2, y2 = (int(round(v)) for v in box)
            x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame_w, frame_h)
            if x2 <= x1 or y2 <= y1:
                # Degenerate after rounding/clipping: nothing usable to track.
                continue
            detections.append(Detection(
                class_id=int(cls_id),
                class_name=self._class_name(cls_id),
                confidence=float(conf),
                bbox=(x1, y1, x2, y2),
            ))
        return detections, elapsed_ms
class LightweightTracker:
    """Greedy bbox tracker: enough to explain and test navigation decisions."""

    def __init__(
        self,
        max_missed: int,
        min_match_score: float,
        max_center_distance_ratio: float,
    ):
        """Store the matching thresholds and start with no tracks.

        Args:
            max_missed: A track is dropped once its miss counter exceeds this.
            min_match_score: Minimum association score for a track/detection pair.
            max_center_distance_ratio: Fraction of the frame width used as the
                centre-distance scale passed to the association score.
        """
        self.max_missed = max_missed
        self.min_match_score = min_match_score
        self.max_center_distance_ratio = max_center_distance_ratio
        self._next_id = 1
        self.tracks: dict[int, Track] = {}

    def update(
        self,
        detections: list[Detection],
        frame_id: int,
        frame_width: int,
    ) -> list[Track]:
        """Associate detections with tracks for one frame and return all live tracks.

        Pairs are scored, sorted best-first and assigned greedily. Tracks left
        without a detection are marked missed (and removed past ``max_missed``);
        detections left without a track start new tracks.
        """
        distance_scale = max(1.0, frame_width * self.max_center_distance_ratio)

        # Score every track/detection combination that clears the threshold.
        scored: list[tuple[float, int, int]] = []
        for track_id, track in self.tracks.items():
            for det_idx, det in enumerate(detections):
                affinity = association_score(track.bbox, det.bbox, distance_scale)
                if affinity >= self.min_match_score:
                    scored.append((affinity, track_id, det_idx))
        scored.sort(reverse=True, key=lambda item: item[0])

        free_tracks = set(self.tracks.keys())
        free_detections = set(range(len(detections)))

        # Greedy assignment: best score first, each side used at most once.
        for _, track_id, det_idx in scored:
            if track_id in free_tracks and det_idx in free_detections:
                matched = detections[det_idx]
                self.tracks[track_id].update(matched.bbox, matched.confidence, frame_id)
                free_tracks.remove(track_id)
                free_detections.remove(det_idx)

        # Age the tracks that found no detection and retire the stale ones.
        for track_id in list(free_tracks):
            lost = self.tracks[track_id]
            lost.mark_missed()
            if lost.missed > self.max_missed:
                del self.tracks[track_id]

        # Every leftover detection opens a new track.
        for det_idx in free_detections:
            newcomer = detections[det_idx]
            new_id = self._next_id
            self._next_id += 1
            self.tracks[new_id] = Track(
                id=new_id,
                bbox=newcomer.bbox,
                confidence=newcomer.confidence,
                first_seen_frame=frame_id,
                last_seen_frame=frame_id,
            )

        return list(self.tracks.values())
class NavigationController:
    """Decide when a track should be photographed and emit the simulated commands.

    For each track it evaluates the snapshot criteria, collects candidate
    frames, saves the best one (full frame plus cropped OCR payload), appends a
    metadata record to ``snapshots.jsonl`` and logs the command sequence.
    """

    def __init__(self, args):
        """Create the output directory and reset counters and display state.

        Args:
            args: Parsed options object; attributes are read lazily by the
                methods (tolerances, thresholds, output directory, ...).
        """
        self.args = args
        self.output_dir = Path(args.snapshot_output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_path = self.output_dir / "snapshots.jsonl"
        self.snapshot_counter = 0
        self.position_counter = 0
        self.last_command_text = ""
        self.last_command_lines: list[str] = []
        self.last_snapshot_frame: np.ndarray | None = None
        self.last_ocr_payload_frame: np.ndarray | None = None
        self.last_remote_result_text = ""
        self.motion_text = "MOTO: n/d"

    def _tolerances(self, frame_w: int) -> tuple[float, float]:
        """Return ``(center_tolerance, snapshot_tolerance)`` in pixels, each at least 1.0.

        Shared by the eligibility check and the state update so both always
        judge the track against the same band widths.
        """
        center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio)
        snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio)
        return center_tolerance, snapshot_tolerance

    def process_track(
        self,
        track: Track,
        frame: np.ndarray,
        frame_id: int,
        timestamp: float,
    ) -> NavigationSnapshot | None:
        """Evaluate one track on the current frame.

        Returns:
            The snapshot that was finalised on this call, or ``None`` when
            nothing was saved (track not ready, still collecting candidates,
            or already snapshotted).
        """
        frame_h, frame_w = frame.shape[:2]
        eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h)
        self._update_track_state(track, eligible, frame_w)
        if track.already_snapshotted:
            return None

        if not eligible:
            # The track stopped qualifying: flush whatever was collected so far.
            return self._finalize_snapshot(track) if track.candidates else None

        # A window below 1 would make the negative slice misbehave (0 keeps
        # everything, negatives drop the newest candidates), so clamp it.
        window = max(1, int(self.args.snapshot_window_frames))
        track.candidates.append(CandidateSnapshot(
            frame_id=frame_id,
            timestamp=timestamp,
            frame=frame.copy(),
            bbox=track.bbox,
            score=score_parts["score"],
            center_score=score_parts["center_score"],
            size_score=score_parts["size_score"],
            cut_score=score_parts["cut_score"],
        ))
        track.candidates = track.candidates[-window:]
        if len(track.candidates) >= window:
            return self._finalize_snapshot(track)
        return None

    def _is_snapshot_candidate(
        self,
        track: Track,
        frame_w: int,
        frame_h: int,
    ) -> tuple[bool, dict[str, float]]:
        """Check every snapshot criterion for the track's current box.

        Side effect: stores ``"ok"`` or a comma-separated list of failed
        criteria in ``track.last_candidate_reason``.

        Returns:
            ``(eligible, scores)`` where ``scores`` has the keys ``score``,
            ``center_score``, ``size_score`` and ``cut_score``.
        """
        x1, y1, x2, y2 = track.bbox
        cx, cy = bbox_center(track.bbox)
        center_tolerance, snapshot_tolerance = self._tolerances(frame_w)
        center_delta = abs(cx - frame_w * 0.5)
        center_score = max(0.0, 1.0 - center_delta / center_tolerance)

        # Guard the denominator so an empty frame cannot divide by zero.
        area_ratio = bbox_area(track.bbox) / max(float(frame_w * frame_h), 1.0)
        size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001))

        if self.args.edge_margin_ratio <= 0:
            # Edge check disabled.
            cut = False
        else:
            edge_margin_x = frame_w * self.args.edge_margin_ratio
            edge_margin_y = frame_h * self.args.edge_margin_ratio
            cut = (
                x1 <= edge_margin_x
                or y1 <= edge_margin_y
                or x2 >= frame_w - edge_margin_x
                or y2 >= frame_h - edge_margin_y
            )
        cut_score = 0.0 if cut else 1.0
        score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score

        in_center_band = center_delta <= center_tolerance
        on_snapshot_line = center_delta <= snapshot_tolerance
        in_y_band = (
            frame_h * self.args.usable_y_min_ratio
            <= cy
            <= frame_h * self.args.usable_y_max_ratio
        )
        enough_hits = track.hits >= self.args.min_track_hits
        large_enough = area_ratio >= self.args.min_gaylord_area_ratio
        trend = track.area_trend()
        trend_ok = trend >= self.args.min_area_trend
        eligible = (
            enough_hits
            and on_snapshot_line
            and in_y_band
            and large_enough
            and not cut
            and trend_ok
            and track.missed == 0
        )

        # Human-readable explanation of why the track is not (yet) eligible.
        failed: list[str] = []
        if not enough_hits:
            failed.append(f"hits<{self.args.min_track_hits}")
        if not in_center_band:
            failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}")
        elif not on_snapshot_line:
            failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}")
        if not in_y_band:
            failed.append("y_band")
        if not large_enough:
            failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}")
        if cut:
            failed.append("edge_cut")
        if not trend_ok:
            failed.append(f"trend={trend:+.2f}<{self.args.min_area_trend:+.2f}")
        if track.missed != 0:
            failed.append(f"missed={track.missed}")
        track.last_candidate_reason = "ok" if eligible else ",".join(failed)

        return eligible, {
            "score": score,
            "center_score": center_score,
            "size_score": size_score,
            "cut_score": cut_score,
        }

    def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None:
        """Set ``track.state`` from the snapshot flag, miss counter and horizontal position."""
        if track.already_snapshotted:
            track.state = "snapshotted"
            return
        if track.missed > 0:
            track.state = "exiting"
            return

        cx, _ = bbox_center(track.bbox)
        center_delta = abs(cx - frame_w * 0.5)
        # Same floored tolerances as the eligibility check.
        center_tolerance, snapshot_tolerance = self._tolerances(frame_w)
        if eligible:
            track.state = "centered"
        elif track.hits < self.args.min_track_hits:
            track.state = "entering"
        elif center_delta <= snapshot_tolerance:
            track.state = "centered"
        elif center_delta <= center_tolerance:
            track.state = "candidate"
        else:
            track.state = "entering"

    def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None:
        """Save the best candidate of the track and emit metadata and commands.

        Returns ``None`` when the track has no candidates. Otherwise the
        candidates are cleared, the track is flagged as snapshotted, both
        images are written and the resulting snapshot record is returned.
        """
        if not track.candidates:
            return None
        best = max(track.candidates, key=lambda item: item.score)
        track.candidates.clear()
        track.already_snapshotted = True
        track.state = "snapshotted"

        self.snapshot_counter += 1
        self.position_counter += 1
        simulated_position = f"gaylord {self.position_counter}"
        stem = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}"
        debug_path = self.output_dir / f"{stem}_frame.jpg"
        payload_path = self.output_dir / f"{stem}_ocr_payload.jpg"

        ocr_payload = crop_with_padding(
            best.frame,
            best.bbox,
            self.args.ocr_payload_pad_ratio,
        )
        # cv2.imwrite reports failure through its return value instead of
        # raising, so surface it; otherwise the metadata silently points at
        # files that do not exist.
        for image_path, image in ((debug_path, best.frame), (payload_path, ocr_payload)):
            if not cv2.imwrite(str(image_path), image):
                log(f"ERRORE: impossibile salvare immagine: {image_path}")

        self.last_snapshot_frame = best.frame.copy()
        self.last_ocr_payload_frame = ocr_payload.copy()
        snapshot = NavigationSnapshot(
            snapshot_id=self.snapshot_counter,
            frame_id=best.frame_id,
            timestamp=best.timestamp,
            simulated_position=simulated_position,
            track_id=track.id,
            bbox=best.bbox,
            score=best.score,
            debug_frame_path=str(debug_path),
            ocr_payload_path=str(payload_path),
        )
        self._write_metadata(snapshot)
        self._print_commands(snapshot)
        return snapshot

    def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str:
        """Produce the simulated remote answer for a snapshot and log the follow-up commands.

        The answer depends on ``args.remote_ack_mode``: always ACK, always
        NACK, or (any other value) ACK for odd snapshot ids and NACK for even
        ones.

        Returns:
            ``"ACK"`` or ``"NACK"``.
        """
        mode = self.args.remote_ack_mode
        if mode == "always-ack":
            result = "ACK"
        elif mode == "always-nack":
            result = "NACK"
        else:
            result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK"

        if result == "ACK":
            self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS"
            resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                resume_command,
            ])
            log("[REMOTE] ACK_RICEVUTO codice valido su WMS")
            log(f"[CMD] {resume_command}")
        else:
            self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                "MICRO_MOVE_CORRETTIVO",
                "SCATTA_FOTO_RETRY",
            ])
            log("[REMOTE] NACK_RICEVUTO codice assente/non valido")
            log("[CMD] MICRO_MOVE_CORRETTIVO")
            log("[CMD] SCATTA_FOTO_RETRY")
        return result

    def set_motion_text(self, text: str) -> None:
        """Store the motion summary shown in the commands window and stats log."""
        self.motion_text = text

    def _write_metadata(self, snapshot: NavigationSnapshot) -> None:
        """Append one JSON line describing the snapshot to the metadata file."""
        record = {
            "snapshot_id": snapshot.snapshot_id,
            "frame_id": snapshot.frame_id,
            "timestamp": snapshot.timestamp,
            "simulated_position": snapshot.simulated_position,
            "drone_pose_simulated": {
                "mode": "linear_shelf_scan",
                "position_label": snapshot.simulated_position,
            },
            "track_id": snapshot.track_id,
            "gaylord_bbox": list(snapshot.bbox),
            "score": snapshot.score,
            "debug_frame_path": snapshot.debug_frame_path,
            "ocr_payload_path": snapshot.ocr_payload_path,
        }
        with self.metadata_path.open("at", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=True) + "\n")

    def _print_commands(self, snapshot: NavigationSnapshot) -> None:
        """Rebuild the command list for the snapshot and log every entry.

        The first entry is the snapshot summary (logged with the ``[NAV]``
        prefix); the remaining entries are commands (``[CMD]`` prefix).
        """
        self.last_command_text = (
            f"SNAPSHOT {snapshot.snapshot_id:04d} "
            f"track={snapshot.track_id} frame={snapshot.frame_id} "
            f"pos={snapshot.simulated_position} score={snapshot.score:.2f}"
        )
        commands = [
            "STOP",
            f"SCATTA_FOTO {Path(snapshot.debug_frame_path).name}",
            f"ESTRAI_BBOX_CENTRALE track={snapshot.track_id}",
            f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
            f"INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}",
            f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s",
        ]
        self.last_command_lines = [self.last_command_text, *commands]
        log(f"[NAV] {self.last_command_text}")
        # Log exactly what is displayed so the two can never drift apart.
        for command in commands:
            log(f"[CMD] {command}")
def parse_args():
    """Parse the command line, using the INI configuration for the defaults.

    ``--config`` is parsed first so the chosen file can supply the default of
    every other option; explicit command-line values still win.

    Boolean options can be switched both ways from the command line
    (``--realtime-playback`` / ``--no-realtime-playback``,
    ``--debug-tracks`` / ``--no-debug-tracks``, ``--no-display`` / ``--display``).
    With plain ``store_true`` a default of ``True`` coming from the
    configuration could never be turned off.

    Returns:
        The ``argparse.Namespace`` with all options.
    """
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
    pre_args, _ = pre.parse_known_args()
    defaults = load_navigation_config(pre_args.config)

    ap = argparse.ArgumentParser(parents=[pre])
    ap.add_argument("-v", "--video", default=defaults["video"], help="Percorso video. Se omesso usa webcam 0")
    ap.add_argument(
        "--weights",
        default=defaults["weights"],
        help="Modello Ultralytics .pt",
    )
    ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"], help="Device Ultralytics: cpu oppure 0")
    ap.add_argument("--input-size", type=int, default=defaults["input_size"], help="Dimensione input YOLO")
    ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"], help="Confidenza minima")
    ap.add_argument("--target-class", default=defaults["target_class"], help="Classe da tracciare")
    ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"], help="Frame persi prima di rimuovere una track")
    ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"], help="Soglia associazione detection-track")
    ap.add_argument("--max-center-distance-ratio", type=float, default=defaults["max_center_distance_ratio"], help="Distanza max centri per matching")
    ap.add_argument("--center-tolerance-ratio", type=float, default=defaults["center_tolerance_ratio"], help="Mezza ampiezza zona centrale")
    ap.add_argument("--snapshot-line-tolerance-ratio", type=float, default=defaults["snapshot_line_tolerance_ratio"],
                    help="Tolleranza stretta dalla linea centrale per scattare")
    ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"], help="Limite alto fascia utile Y")
    ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"], help="Limite basso fascia utile Y")
    ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"], help="Detection consecutive minime")
    ap.add_argument("--min-gaylord-area-ratio", type=float, default=defaults["min_gaylord_area_ratio"], help="Area bbox minima sul frame")
    ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"], help="Margine per considerare bbox tagliato")
    ap.add_argument("--ocr-payload-pad-ratio", type=float, default=defaults["ocr_payload_pad_ratio"],
                    help="Padding intorno al bbox centrale inviato all'OCR remoto")
    ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"], help="Trend area minimo ammesso")
    ap.add_argument("--snapshot-window-frames", type=int, default=defaults["snapshot_window_frames"], help="Candidati da valutare prima dello snapshot")
    ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"], help="Directory snapshot e JSONL")
    ap.add_argument("--remote-ack-timeout-sec", type=float, default=defaults["remote_ack_timeout_sec"],
                    help="Tempo simulato di attesa OCR remoto/WMS")
    ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
                    default=defaults["remote_ack_mode"], help="Risposta remota simulata")
    ap.add_argument("--scan-direction", choices=["destra", "sinistra"], default=defaults["scan_direction"],
                    help="Direzione simulata di ripartenza dopo ACK")
    ap.add_argument("--preview-width", type=int, default=defaults["preview_width"], help="Larghezza preview")
    # BooleanOptionalAction also registers the --no-... form, so a True default can be overridden.
    ap.add_argument("--realtime-playback", action=argparse.BooleanOptionalAction,
                    default=defaults["realtime_playback"], help="Rispetta FPS video")
    ap.add_argument("--max-frames", type=int, default=defaults["max_frames"], help="Numero massimo frame; 0 = tutto")
    ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"], help="Intervallo log prestazioni")
    ap.add_argument("--motion-report-interval", type=int, default=defaults["motion_report_interval"],
                    help="Ogni quanti frame aggiornare la direzione moto stimata")
    ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
                    help="Spostamento medio minimo per dichiarare una direzione")
    ap.add_argument("--debug-tracks", action=argparse.BooleanOptionalAction,
                    default=defaults["debug_tracks"], help="Logga stato e criteri delle track")
    ap.add_argument("--flash-alpha", type=float, default=defaults["flash_alpha"], help="Intensita' flash 0..1 al momento dello scatto")
    ap.add_argument("--no-display", action="store_true", default=defaults["no_display"], help="Disabilita finestra video")
    # Counterpart of --no-display; it writes to the same destination and
    # contributes no default of its own (SUPPRESS), so the config value stands
    # unless one of the two flags is given.
    ap.add_argument("--display", dest="no_display", action="store_false", default=argparse.SUPPRESS,
                    help="Abilita finestra video anche se la configurazione la disabilita")
    return ap.parse_args()
def load_navigation_config(path_str: str) -> dict[str, object]:
    """Return the navigation settings: built-in defaults overlaid with the INI file.

    Only the ``[navigation]`` section is read and only keys that exist in the
    built-in table are considered. Each value is converted to the type of its
    default (bool, int, float); anything else is kept as a stripped string,
    with ``""``, ``"none"`` and ``"null"`` (case-insensitive) mapped to
    ``None``. A missing file or section yields the plain defaults.
    """
    settings: dict[str, object] = {
        "video": "testhd.mp4",
        "weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt",
        "ultralytics_device": "cpu",
        "input_size": 640,
        "min_confidence": 0.25,
        "target_class": "gaylord",
        "max_track_missed": 8,
        "min_match_score": 0.25,
        "max_center_distance_ratio": 0.18,
        "center_tolerance_ratio": 0.18,
        "snapshot_line_tolerance_ratio": 0.035,
        "usable_y_min_ratio": 0.15,
        "usable_y_max_ratio": 0.85,
        "min_track_hits": 3,
        "min_gaylord_area_ratio": 0.02,
        "edge_margin_ratio": 0.0,
        "ocr_payload_pad_ratio": 0.03,
        "min_area_trend": -0.35,
        "snapshot_window_frames": 1,
        "snapshot_output_dir": "navigate_snapshots",
        "remote_ack_timeout_sec": 2.0,
        "remote_ack_mode": "always-ack",
        "scan_direction": "destra",
        "preview_width": 1280,
        "realtime_playback": True,
        "max_frames": 0,
        "stats_interval": 2.0,
        "motion_report_interval": 5,
        "motion_min_pixels": 1.5,
        "debug_tracks": True,
        "flash_alpha": 0.70,
        "no_display": False,
    }

    config_file = Path(path_str)
    if not config_file.exists():
        return settings

    ini = configparser.ConfigParser()
    ini.read(config_file, encoding="utf-8")
    if not ini.has_section("navigation"):
        return settings
    section = ini["navigation"]

    # Typed readers keyed by the exact type of the built-in default.
    typed_readers = {
        bool: ini.getboolean,
        int: ini.getint,
        float: ini.getfloat,
    }
    null_words = ("", "none", "null")

    for key, builtin in list(settings.items()):
        if key not in section:
            continue
        reader = typed_readers.get(type(builtin))
        if reader is not None:
            settings[key] = reader("navigation", key, fallback=builtin)
            continue
        text = section.get(key, str(builtin)).strip()
        settings[key] = None if text.lower() in null_words else text
    return settings
def log(msg: str) -> None:
    """Print ``msg`` prefixed with the local time as ``[HH:MM:SS]``, flushing immediately."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
def require_file(path_str: str, description: str) -> Path:
    """Return ``path_str`` as a ``Path``, or log an error and exit with status 1.

    Args:
        path_str: Location that must exist.
        description: Label used in the error message.
    """
    candidate = Path(path_str)
    if candidate.exists():
        return candidate
    log(f"ERRORE: {description} non trovato: {candidate}")
    sys.exit(1)
def open_capture(video_arg: str | None):
    """Open the video source and return ``(capture, source_name)``.

    ``None`` selects camera 0 and an all-digit string selects that camera
    index; cameras are tried with the DirectShow backend first and with the
    default backend if that does not open. Any other value is opened as given.
    The caller is expected to check ``isOpened()`` on the returned capture.
    """
    if video_arg is None:
        camera_index = 0
    elif str(video_arg).isdigit():
        camera_index = int(video_arg)
    else:
        # Not a camera: hand the value to OpenCV unchanged.
        return cv2.VideoCapture(video_arg), str(video_arg)

    capture = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW)
    if not capture.isOpened():
        capture = cv2.VideoCapture(camera_index)
    return capture, f"camera:{camera_index}"
def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
    """Clamp both corners of a box to ``[0, w - 1]`` horizontally and ``[0, h - 1]`` vertically."""
    last_col, last_row = w - 1, h - 1
    left, right = (max(0, min(x, last_col)) for x in (x1, x2))
    top, bottom = (max(0, min(y, last_row)) for y in (y1, y2))
    return left, top, right, bottom
def crop_with_padding(
    frame: np.ndarray,
    bbox: tuple[int, int, int, int],
    pad_ratio: float,
) -> np.ndarray:
    """Return a copy of the region of ``frame`` covered by ``bbox`` plus padding.

    Args:
        frame: Source image; only its first two dimensions (rows, columns) are used.
        bbox: ``(x1, y1, x2, y2)`` with ``x2``/``y2`` used as exclusive slice ends.
        pad_ratio: Padding on each side as a fraction of the box width/height;
            negative values are treated as 0.

    Returns:
        A new array (not a view) with the padded region, limited to the frame.
    """
    x1, y1, x2, y2 = bbox
    frame_h, frame_w = frame.shape[0], frame.shape[1]
    ratio = max(0.0, pad_ratio)
    pad_x = int(ratio * (x2 - x1))
    pad_y = int(ratio * (y2 - y1))

    # The far edges are slice ends, i.e. exclusive, so they may reach the full
    # frame size. Clamping them to size - 1 would always drop the last row and
    # column of the frame from the crop.
    left = min(max(x1 - pad_x, 0), frame_w)
    top = min(max(y1 - pad_y, 0), frame_h)
    right = min(max(x2 + pad_x, 0), frame_w)
    bottom = min(max(y2 + pad_y, 0), frame_h)
    return frame[top:bottom, left:right].copy()
def bbox_area(bbox: tuple[int, int, int, int]) -> int:
    """Return width times height of ``(x1, y1, x2, y2)``; inverted sides count as 0."""
    left, top, right, bottom = bbox
    width = max(0, right - left)
    height = max(0, bottom - top)
    return width * height
def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
    """Return the midpoint ``(cx, cy)`` of ``(x1, y1, x2, y2)`` as floats."""
    left, top, right, bottom = bbox
    mid_x = (left + right) * 0.5
    mid_y = (top + bottom) * 0.5
    return mid_x, mid_y
def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    """Return intersection-over-union of two ``(x1, y1, x2, y2)`` boxes (0.0 if the union is empty)."""
    # Corners of the overlap rectangle; bbox_area yields 0 when the boxes do not overlap.
    overlap_box = (
        max(a[0], b[0]),
        max(a[1], b[1]),
        min(a[2], b[2]),
        min(a[3], b[3]),
    )
    overlap = bbox_area(overlap_box)
    combined = bbox_area(a) + bbox_area(b) - overlap
    return overlap / float(combined) if combined > 0 else 0.0
def association_score(
    track_bbox: tuple[int, int, int, int],
    det_bbox: tuple[int, int, int, int],
    max_center_distance: float,
) -> float:
    """Score how well a detection matches a track.

    The result is 70% IoU plus 30% centre similarity, where the similarity
    falls linearly from 1 at zero centre distance to 0 at
    ``max_center_distance`` and beyond.
    """
    overlap = bbox_iou(track_bbox, det_bbox)
    track_cx, track_cy = bbox_center(track_bbox)
    det_cx, det_cy = bbox_center(det_bbox)
    separation = float(np.hypot(track_cx - det_cx, track_cy - det_cy))
    closeness = max(0.0, 1.0 - separation / max_center_distance)
    return 0.70 * overlap + 0.30 * closeness
def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
    """Shrink ``frame`` to ``max_width`` columns, keeping the aspect ratio.

    The input object itself is returned when ``max_width`` is not positive or
    the frame is already narrow enough.
    """
    height, width = frame.shape[:2]
    must_shrink = max_width > 0 and width > max_width
    if not must_shrink:
        return frame
    factor = max_width / float(width)
    target_size = (int(width * factor), int(height * factor))
    return cv2.resize(frame, target_size, interpolation=cv2.INTER_LINEAR)
def draw_navigation_debug(
    frame: np.ndarray,
    tracks: list[Track],
    args,
    last_command_text: str,
    fps_text: str,
) -> np.ndarray:
    """Render the debug overlay on a copy of ``frame`` and return the preview image.

    Draws the central band, the centre line, the two horizontal limits of the
    usable band, one annotated box per track, the ``fps_text`` and (when not
    empty) ``last_command_text``. The result is resized to ``args.preview_width``.
    """
    canvas = frame.copy()
    height, width = canvas.shape[:2]
    mid_x = int(width * 0.5)
    half_band = int(width * args.center_tolerance_ratio)
    band_top = int(height * args.usable_y_min_ratio)
    band_bottom = int(height * args.usable_y_max_ratio)

    # Static guides: central band, centre line, usable-band limits.
    cv2.rectangle(canvas, (mid_x - half_band, band_top), (mid_x + half_band, band_bottom), (255, 255, 0), 4)
    cv2.line(canvas, (mid_x, 0), (mid_x, height), (255, 255, 0), 3)
    for guide_y in (band_top, band_bottom):
        cv2.line(canvas, (0, guide_y), (width, guide_y), (100, 100, 100), 2)

    for track in tracks:
        left, top, right, bottom = track.bbox
        hue = state_color(track.state)
        # Centered tracks get a heavier outline.
        outline = 8 if track.state == "centered" else 5
        cv2.rectangle(canvas, (left, top), (right, bottom), hue, outline)

        center_x, center_y = bbox_center(track.bbox)
        marker = (int(center_x), int(center_y))
        cv2.circle(canvas, marker, 12, hue, -1)
        cv2.circle(canvas, marker, 18, (0, 0, 0), 3)

        label = (
            f"id={track.id} {track.state} conf={track.confidence:.2f} "
            f"hits={track.hits} trend={track.area_trend():+.2f}"
        )
        # Keep the label inside the image when the box touches the top edge.
        label_origin = (left, max(24, top - 8))
        cv2.putText(canvas, label, label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.78, hue, 3, cv2.LINE_AA)

    cv2.putText(canvas, fps_text, (20, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
    if last_command_text:
        cv2.putText(canvas, last_command_text, (20, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 255), 2)
    return resize_preview(canvas, args.preview_width)
def commands_canvas_height(
    line_count: int,
    first_baseline: int = 122,
    line_step: int = 36,
    bottom_margin: int = 24,
    min_height: int = 340,
) -> int:
    """Return the canvas height needed to show ``line_count`` command lines.

    The height is derived from the same baseline and step used for drawing, so
    the last line always ends ``bottom_margin`` pixels above the bottom edge;
    it never drops below ``min_height``.
    """
    if line_count <= 0:
        return min_height
    last_baseline = first_baseline + (line_count - 1) * line_step
    return max(min_height, last_baseline + bottom_margin)


def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
    """Render the commands panel as a light-grey image 980 pixels wide.

    Shows a title, ``motion_text`` and up to the first ten entries of
    ``command_lines`` (a placeholder line when the list is empty). The first
    entry is drawn in a different colour from the rest.
    """
    first_baseline = 122
    line_step = 36
    visible = (command_lines if command_lines else ["Nessun comando generato"])[:10]

    # Size the canvas from the actual text layout; a height computed from a
    # different offset/step leaves the last lines below the bottom edge.
    canvas_h = commands_canvas_height(len(visible), first_baseline, line_step)
    canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)

    cv2.putText(
        canvas,
        "COMANDI NAVIGAZIONE",
        (24, 42),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (0, 0, 0),
        2,
        cv2.LINE_AA,
    )
    cv2.putText(
        canvas,
        motion_text,
        (24, 76),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.80,
        (120, 0, 120),
        2,
        cv2.LINE_AA,
    )

    for idx, line in enumerate(visible):
        color = (0, 0, 180) if idx == 0 else (0, 90, 0)
        cv2.putText(
            canvas,
            line,
            (24, first_baseline + idx * line_step),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.82,
            color,
            2,
            cv2.LINE_AA,
        )
    return canvas
def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
    """Blend ``frame`` towards white; ``alpha`` is limited to the range 0..1."""
    strength = min(max(alpha, 0.0), 1.0)
    white = np.full_like(frame, 255)
    return cv2.addWeighted(frame, 1.0 - strength, white, strength, 0.0)
def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
    """Summarise the average centre displacement of the currently matched tracks.

    Only tracks with no misses and at least two centre samples contribute; the
    displacement is taken between their last two samples. Returns
    ``"MOTO: n/d"`` when no track qualifies, otherwise a text with the dominant
    direction (or "stabile" when both averages are below ``min_pixels``), the
    averages and the number of tracks used.
    """
    shifts_x: list[float] = []
    shifts_y: list[float] = []
    for track in tracks:
        if track.missed != 0 or len(track.center_history) < 2:
            continue
        (prev_x, prev_y), (cur_x, cur_y) = track.center_history[-2], track.center_history[-1]
        shifts_x.append(cur_x - prev_x)
        shifts_y.append(cur_y - prev_y)

    used = len(shifts_x)
    if used == 0:
        return "MOTO: n/d"

    dx = sum(shifts_x) / used
    dy = sum(shifts_y) / used
    magnitude_x, magnitude_y = abs(dx), abs(dy)

    if magnitude_x < min_pixels and magnitude_y < min_pixels:
        direction = "stabile"
    elif magnitude_x >= magnitude_y:
        # Horizontal movement dominates (ties count as horizontal).
        direction = "destra" if dx > 0 else "sinistra"
    else:
        direction = "giu" if dy > 0 else "su"
    return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={used}"
def state_color(state: str) -> tuple[int, int, int]:
    """Return the drawing colour for a track state; unknown states map to white."""
    palette = {
        "centered": (0, 255, 255),
        "snapshotted": (255, 0, 255),
        "candidate": (0, 255, 0),
        "exiting": (0, 140, 255),
    }
    return palette.get(state, (255, 255, 255))
def main() -> int:
    """Run the detect / track / snapshot loop on the configured video source.

    Per frame: run the detector, keep only detections of the target class,
    update the tracker, let the navigation controller decide on snapshots,
    simulate the remote answer for every new snapshot, and periodically log
    statistics. Unless ``--no-display`` is set, three windows show the
    annotated frame, the last OCR payload and the command list.

    Returns:
        0 on a normal end (stream finished, frame limit reached or keyboard
        interrupt via Esc/q), 1 when the configuration is unusable or the
        source cannot be opened.
    """
    args = parse_args()

    # The configuration loader maps "", "none" and "null" to None for string
    # options; without these guards that ends in a TypeError (weights) or an
    # AttributeError on the first detection (target class).
    if not args.weights:
        log("ERRORE: percorso del modello (--weights) non impostato")
        return 1
    target_class = (args.target_class or "").strip().lower()
    if not target_class:
        log("ERRORE: classe target (--target-class) non impostata")
        return 1

    require_file(args.weights, "modello Ultralytics")
    detector = UltralyticsDetector(args.weights, args.ultralytics_device)
    log(f"Classi modello: {detector.classes}")
    log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.")

    cap, source_name = open_capture(args.video)
    if not cap.isOpened():
        log(f"ERRORE: impossibile aprire sorgente video: {source_name}")
        return 1

    video_fps = cap.get(cv2.CAP_PROP_FPS)
    # Pace the loop only when requested and when the source reports a usable FPS.
    frame_delay = 1.0 / video_fps if args.realtime_playback and video_fps and video_fps > 1 else 0.0

    tracker = LightweightTracker(
        max_missed=args.max_track_missed,
        min_match_score=args.min_match_score,
        max_center_distance_ratio=args.max_center_distance_ratio,
    )
    navigator = NavigationController(args)

    if not args.no_display:
        cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL)
        cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL)
        cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL)

    def show_commands() -> None:
        """Redraw the commands window from the navigator's current state."""
        cv2.imshow(
            "flywms comandi",
            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
        )

    frame_id = 0
    start_time = time.perf_counter()
    last_stats = start_time
    last_loop_end = start_time
    yolo_total_ms = 0.0
    yolo_cycles = 0

    try:
        while True:
            if frame_delay > 0:
                now = time.perf_counter()
                sleep_for = frame_delay - (now - last_loop_end)
                if sleep_for > 0:
                    time.sleep(sleep_for)
                last_loop_end = time.perf_counter()

            ok, frame = cap.read()
            if not ok:
                log("Fine stream")
                break
            frame_id += 1
            # Monotonic clock reading, not wall-clock time.
            timestamp = time.perf_counter()
            if args.max_frames > 0 and frame_id > args.max_frames:
                log(f"Raggiunto --max-frames={args.max_frames}")
                break

            detections, yolo_ms = detector.detect(frame, args.min_confidence, args.input_size)
            yolo_total_ms += yolo_ms
            yolo_cycles += 1

            gaylords = [
                det for det in detections
                if det.class_name.strip().lower() == target_class
            ]
            tracks = tracker.update(gaylords, frame_id, frame.shape[1])

            if args.motion_report_interval > 0 and frame_id % args.motion_report_interval == 0:
                navigator.set_motion_text(
                    estimate_motion_from_tracks(tracks, args.motion_min_pixels)
                )

            new_snapshots: list[NavigationSnapshot] = []
            for track in tracks:
                # Only tracks matched on this frame are evaluated.
                if track.missed == 0:
                    snapshot = navigator.process_track(track, frame, frame_id, timestamp)
                    if snapshot is not None:
                        new_snapshots.append(snapshot)

            if args.no_display and new_snapshots:
                # Headless mode: wait out the simulated remote round trip here;
                # with a display the wait happens in cv2.waitKey below.
                if args.remote_ack_timeout_sec > 0:
                    time.sleep(args.remote_ack_timeout_sec)
                for snapshot in new_snapshots:
                    navigator.simulate_remote_response(snapshot)

            now = time.perf_counter()
            if now - last_stats >= args.stats_interval:
                elapsed = max(now - start_time, 0.001)
                avg_yolo = yolo_total_ms / max(yolo_cycles, 1)
                active = sum(1 for t in tracks if t.missed == 0)
                log(
                    f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} "
                    f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} tracks={len(tracks)} active={active} "
                    f"snapshots={navigator.snapshot_counter} {navigator.motion_text}"
                )
                if args.debug_tracks:
                    for track in tracks:
                        cx, cy = bbox_center(track.bbox)
                        area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1])
                        log(
                            f"  track={track.id} state={track.state} hits={track.hits} "
                            f"missed={track.missed} center=({cx:.0f},{cy:.0f}) "
                            f"area={area_ratio:.3f} trend={track.area_trend():+.2f} "
                            f"reason={track.last_candidate_reason}"
                        )
                last_stats = now

            if not args.no_display:
                elapsed = max(time.perf_counter() - start_time, 0.001)
                fps_text = (
                    f"frame={frame_id} fps={frame_id / elapsed:.1f} "
                    f"det={len(gaylords)} tracks={len(tracks)} snap={navigator.snapshot_counter}"
                )
                display = draw_navigation_debug(
                    frame,
                    tracks,
                    args,
                    navigator.last_command_text,
                    fps_text,
                )
                cv2.imshow("flywms navigate", display)
                if navigator.last_ocr_payload_frame is not None:
                    snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width)
                    cv2.imshow("flywms snapshot", snapshot_display)
                show_commands()

                if new_snapshots:
                    # Flash both image windows, then hold them for the
                    # simulated remote wait before showing the answer.
                    flash_display = apply_flash(display, args.flash_alpha)
                    cv2.imshow("flywms navigate", flash_display)
                    if navigator.last_ocr_payload_frame is not None:
                        flash_snapshot = apply_flash(
                            resize_preview(navigator.last_ocr_payload_frame, args.preview_width),
                            args.flash_alpha,
                        )
                        cv2.imshow("flywms snapshot", flash_snapshot)
                    show_commands()
                    pause_ms = max(1, int(args.remote_ack_timeout_sec * 1000))
                    key = cv2.waitKey(pause_ms) & 0xFF
                    if key in (27, ord("q")):
                        log("Interrotto da tastiera")
                        break
                    for snapshot in new_snapshots:
                        navigator.simulate_remote_response(snapshot)
                    show_commands()

                key = cv2.waitKey(1) & 0xFF
                if key in (27, ord("q")):
                    log("Interrotto da tastiera")
                    break
    finally:
        # Always free the capture and the windows, even on an exception.
        cap.release()
        if not args.no_display:
            cv2.destroyAllWindows()

    log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}")
    return 0
# Script entry point: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())