1 Commits

Author SHA1 Message Date
administrator
8a8bea1211 Milestone navigation simulator with config 2026-05-15 18:40:07 +02:00
3 changed files with 1158 additions and 0 deletions

1
.gitignore vendored
View File

@@ -11,3 +11,4 @@ dataset_yolo/labels/*.cache
dataset_yolo/labels/*_backup_before_remap_*/ dataset_yolo/labels/*_backup_before_remap_*/
runs/flywms_dataset_check/ runs/flywms_dataset_check/
runs/flywms_dataset_check_1epoch/ runs/flywms_dataset_check_1epoch/
navigate_snapshots*/

166
flywms_navigation.ini Normal file
View File

@@ -0,0 +1,166 @@
[navigation]
; OBBLIGATORIO: no.
; Ruolo: sorgente video usata per simulare la camera del drone.
; Se vuoto o "none", usa webcam 0.
; Default se non indicato: testhd.mp4
video = testhd.mp4
; OBBLIGATORIO: si.
; Ruolo: modello Ultralytics/YOLO moderno usato per rilevare gaylord ed etichette.
; Default se non indicato: C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt
weights = C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt
; OBBLIGATORIO: no.
; Ruolo: device usato da Ultralytics. Usa "cpu" ora; con GPU compatibile usare "0".
; Default se non indicato: cpu
ultralytics_device = cpu
; OBBLIGATORIO: no.
; Ruolo: dimensione input YOLO. 640 e' il valore usato nel training rapido.
; Default se non indicato: 640
input_size = 640
; OBBLIGATORIO: no.
; Ruolo: confidenza minima delle detection accettate dal detector.
; Default se non indicato: 0.25
min_confidence = 0.25
; OBBLIGATORIO: no.
; Ruolo: classe tracciata dalla navigazione. Le altre detection non entrano nel tracker.
; Default se non indicato: gaylord
target_class = gaylord
; OBBLIGATORIO: no.
; Ruolo: numero massimo di frame in cui una track puo' non essere vista prima di essere rimossa.
; Default se non indicato: 8
max_track_missed = 8
; OBBLIGATORIO: no.
; Ruolo: soglia minima dello score che associa una detection a una track esistente.
; Default se non indicato: 0.25
min_match_score = 0.25
; OBBLIGATORIO: no.
; Ruolo: distanza massima ammessa tra centri bbox, espressa come frazione della larghezza frame.
; Default se non indicato: 0.18
max_center_distance_ratio = 0.18
; OBBLIGATORIO: no.
; Ruolo: mezza ampiezza della fascia azzurra di avvicinamento al centro.
; Non fa scattare la foto: indica solo che la track e' candidata.
; Default se non indicato: 0.18
center_tolerance_ratio = 0.18
; OBBLIGATORIO: no.
; Ruolo: tolleranza stretta dalla linea verticale centrale per scattare la foto.
; La foto parte quando il centro bbox e' entro questa soglia.
; Default se non indicato: 0.035
snapshot_line_tolerance_ratio = 0.035
; OBBLIGATORIO: no.
; Ruolo: limite verticale superiore della fascia utile della scaffalatura.
; Default se non indicato: 0.15
usable_y_min_ratio = 0.15
; OBBLIGATORIO: no.
; Ruolo: limite verticale inferiore della fascia utile della scaffalatura.
; Default se non indicato: 0.85
usable_y_max_ratio = 0.85
; OBBLIGATORIO: no.
; Ruolo: numero minimo di detection confermate prima di considerare affidabile una track.
; Default se non indicato: 3
min_track_hits = 3
; OBBLIGATORIO: no.
; Ruolo: area minima del bbox gaylord rispetto all'intero frame.
; Serve a ignorare oggetti troppo lontani/piccoli.
; Default se non indicato: 0.02
min_gaylord_area_ratio = 0.02
; OBBLIGATORIO: no.
; Ruolo: margine da bordo immagine per considerare un bbox tagliato.
; 0 disabilita questo filtro, utile con il video manuale di test.
; Default se non indicato: 0.0
edge_margin_ratio = 0.0
; OBBLIGATORIO: no.
; Ruolo: padding aggiunto al bbox centrale prima di salvare il crop inviato all'OCR remoto.
; Default se non indicato: 0.03
ocr_payload_pad_ratio = 0.03
; OBBLIGATORIO: no.
; Ruolo: trend minimo dell'area bbox negli ultimi frame. Valori negativi tollerano leggera uscita.
; Default se non indicato: -0.35
min_area_trend = -0.35
; OBBLIGATORIO: no.
; Ruolo: numero di candidati da valutare prima dello snapshot.
; 1 significa: scatta subito quando il centro tocca la linea.
; Default se non indicato: 1
snapshot_window_frames = 1
; OBBLIGATORIO: no.
; Ruolo: directory dove salvare frame debug, crop OCR e snapshots.jsonl.
; Default se non indicato: navigate_snapshots
snapshot_output_dir = navigate_snapshots
; OBBLIGATORIO: no.
; Ruolo: tempo simulato con cui il drone attende OCR remoto + verifica WMS.
; Default se non indicato: 2.0
remote_ack_timeout_sec = 2.0
; OBBLIGATORIO: no.
; Ruolo: risposta remota simulata. Valori: always-ack, always-nack, alternate.
; Default se non indicato: always-ack
remote_ack_mode = always-ack
; OBBLIGATORIO: no.
; Ruolo: direzione simulata di ripartenza dopo ACK. Valori: destra, sinistra.
; Default se non indicato: destra
scan_direction = destra
; OBBLIGATORIO: no.
; Ruolo: larghezza massima delle finestre video di debug.
; Default se non indicato: 1280
preview_width = 1280
; OBBLIGATORIO: no.
; Ruolo: se true, il video di test viene riprodotto rispettando il framerate originale.
; Default se non indicato: true
realtime_playback = true
; OBBLIGATORIO: no.
; Ruolo: massimo numero di frame da processare. 0 significa tutto il video.
; Default se non indicato: 0
max_frames = 0
; OBBLIGATORIO: no.
; Ruolo: ogni quanti secondi stampare statistiche nel terminale.
; Default se non indicato: 2.0
stats_interval = 2.0
; OBBLIGATORIO: no.
; Ruolo: ogni quanti frame aggiornare il moto apparente stimato dalle track.
; Default se non indicato: 5
motion_report_interval = 5
; OBBLIGATORIO: no.
; Ruolo: movimento medio minimo in pixel per dichiarare destra/sinistra/su/giu.
; Default se non indicato: 1.5
motion_min_pixels = 1.5
; OBBLIGATORIO: no.
; Ruolo: se true, logga nel terminale lo stato delle track e i motivi di non scatto.
; Default se non indicato: true
debug_tracks = true
; OBBLIGATORIO: no.
; Ruolo: intensita' del flash visuale simulato al momento dello scatto, da 0 a 1.
; Default se non indicato: 0.70
flash_alpha = 0.70
; OBBLIGATORIO: no.
; Ruolo: se true, disabilita tutte le finestre video. Usarlo solo per test headless.
; Default se non indicato: false
no_display = false

991
flywms_navigation.py Normal file
View File

@@ -0,0 +1,991 @@
import argparse
import configparser
import json
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
import cv2
import numpy as np
DEFAULT_CONFIG_PATH = "flywms_navigation.ini"
@dataclass(frozen=True)
class Detection:
class_id: int
class_name: str
confidence: float
bbox: tuple[int, int, int, int]
@dataclass(frozen=True)
class CandidateSnapshot:
frame_id: int
timestamp: float
frame: np.ndarray
bbox: tuple[int, int, int, int]
score: float
center_score: float
size_score: float
cut_score: float
@dataclass
class Track:
id: int
bbox: tuple[int, int, int, int]
confidence: float
first_seen_frame: int
last_seen_frame: int
hits: int = 1
missed: int = 0
state: str = "entering"
last_candidate_reason: str = ""
pending_remote_response: str = "none"
already_snapshotted: bool = False
bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list)
center_history: list[tuple[float, float]] = field(default_factory=list)
area_history: list[float] = field(default_factory=list)
candidates: list[CandidateSnapshot] = field(default_factory=list)
def __post_init__(self) -> None:
self._append_history(self.bbox)
def update(self, bbox: tuple[int, int, int, int], confidence: float, frame_id: int) -> None:
self.bbox = bbox
self.confidence = confidence
self.last_seen_frame = frame_id
self.hits += 1
self.missed = 0
self._append_history(bbox)
def mark_missed(self) -> None:
self.missed += 1
if self.missed > 0 and self.state != "snapshotted":
self.state = "exiting"
def _append_history(self, bbox: tuple[int, int, int, int]) -> None:
self.bbox_history.append(bbox)
self.center_history.append(bbox_center(bbox))
self.area_history.append(float(bbox_area(bbox)))
keep = 20
self.bbox_history = self.bbox_history[-keep:]
self.center_history = self.center_history[-keep:]
self.area_history = self.area_history[-keep:]
def area_trend(self) -> float:
if len(self.area_history) < 4:
return 0.0
old = self.area_history[-4]
new = self.area_history[-1]
return (new - old) / max(old, 1.0)
@dataclass(frozen=True)
class NavigationSnapshot:
snapshot_id: int
frame_id: int
timestamp: float
simulated_position: str
track_id: int
bbox: tuple[int, int, int, int]
score: float
debug_frame_path: str
ocr_payload_path: str
class UltralyticsDetector:
def __init__(self, model_path: str, device: str):
from ultralytics import YOLO
self.model = YOLO(model_path)
self.device = device
names = self.model.names
if isinstance(names, dict):
self.classes = [str(names[i]) for i in sorted(names)]
else:
self.classes = [str(name) for name in names]
def detect(
self,
frame: np.ndarray,
min_confidence: float,
input_size: int,
) -> tuple[list[Detection], float]:
t0 = time.perf_counter()
results = self.model.predict(
source=frame,
imgsz=input_size,
conf=min_confidence,
device=self.device,
verbose=False,
)
elapsed_ms = (time.perf_counter() - t0) * 1000.0
detections: list[Detection] = []
if not results:
return detections, elapsed_ms
boxes = results[0].boxes
if boxes is None:
return detections, elapsed_ms
xyxy = boxes.xyxy.cpu().numpy()
confs = boxes.conf.cpu().numpy()
clss = boxes.cls.cpu().numpy().astype(int)
for box, conf, cls_id in zip(xyxy, confs, clss):
x1, y1, x2, y2 = [int(round(v)) for v in box]
x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame.shape[1], frame.shape[0])
if x2 <= x1 or y2 <= y1:
continue
class_name = self.classes[cls_id] if 0 <= cls_id < len(self.classes) else str(cls_id)
detections.append(Detection(
class_id=int(cls_id),
class_name=class_name,
confidence=float(conf),
bbox=(x1, y1, x2, y2),
))
return detections, elapsed_ms
class LightweightTracker:
"""Greedy bbox tracker: enough to explain and test navigation decisions."""
def __init__(
self,
max_missed: int,
min_match_score: float,
max_center_distance_ratio: float,
):
self.max_missed = max_missed
self.min_match_score = min_match_score
self.max_center_distance_ratio = max_center_distance_ratio
self._next_id = 1
self.tracks: dict[int, Track] = {}
def update(
self,
detections: list[Detection],
frame_id: int,
frame_width: int,
) -> list[Track]:
unmatched_tracks = set(self.tracks.keys())
unmatched_detections = set(range(len(detections)))
pairs: list[tuple[float, int, int]] = []
max_center_distance = max(1.0, frame_width * self.max_center_distance_ratio)
for track_id, track in self.tracks.items():
for det_idx, det in enumerate(detections):
score = association_score(track.bbox, det.bbox, max_center_distance)
if score >= self.min_match_score:
pairs.append((score, track_id, det_idx))
pairs.sort(reverse=True, key=lambda item: item[0])
for _, track_id, det_idx in pairs:
if track_id not in unmatched_tracks or det_idx not in unmatched_detections:
continue
det = detections[det_idx]
self.tracks[track_id].update(det.bbox, det.confidence, frame_id)
unmatched_tracks.remove(track_id)
unmatched_detections.remove(det_idx)
for track_id in list(unmatched_tracks):
self.tracks[track_id].mark_missed()
if self.tracks[track_id].missed > self.max_missed:
del self.tracks[track_id]
for det_idx in unmatched_detections:
det = detections[det_idx]
track_id = self._next_id
self._next_id += 1
self.tracks[track_id] = Track(
id=track_id,
bbox=det.bbox,
confidence=det.confidence,
first_seen_frame=frame_id,
last_seen_frame=frame_id,
)
return list(self.tracks.values())
class NavigationController:
def __init__(self, args):
self.args = args
self.output_dir = Path(args.snapshot_output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.metadata_path = self.output_dir / "snapshots.jsonl"
self.snapshot_counter = 0
self.position_counter = 0
self.last_command_text = ""
self.last_command_lines: list[str] = []
self.last_snapshot_frame: np.ndarray | None = None
self.last_ocr_payload_frame: np.ndarray | None = None
self.last_remote_result_text = ""
self.motion_text = "MOTO: n/d"
def process_track(
self,
track: Track,
frame: np.ndarray,
frame_id: int,
timestamp: float,
) -> NavigationSnapshot | None:
frame_h, frame_w = frame.shape[:2]
eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h)
self._update_track_state(track, eligible, frame_w)
if track.already_snapshotted:
return None
if eligible:
candidate = CandidateSnapshot(
frame_id=frame_id,
timestamp=timestamp,
frame=frame.copy(),
bbox=track.bbox,
score=score_parts["score"],
center_score=score_parts["center_score"],
size_score=score_parts["size_score"],
cut_score=score_parts["cut_score"],
)
track.candidates.append(candidate)
track.candidates = track.candidates[-self.args.snapshot_window_frames:]
if len(track.candidates) >= self.args.snapshot_window_frames:
return self._finalize_snapshot(track)
elif track.candidates:
return self._finalize_snapshot(track)
return None
def _is_snapshot_candidate(
self,
track: Track,
frame_w: int,
frame_h: int,
) -> tuple[bool, dict[str, float]]:
x1, y1, x2, y2 = track.bbox
cx, cy = bbox_center(track.bbox)
center_x = frame_w * 0.5
center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio)
snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio)
center_delta = abs(cx - center_x)
center_score = max(0.0, 1.0 - center_delta / center_tolerance)
area_ratio = bbox_area(track.bbox) / float(frame_w * frame_h)
size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001))
if self.args.edge_margin_ratio <= 0:
cut = False
else:
edge_margin_x = frame_w * self.args.edge_margin_ratio
edge_margin_y = frame_h * self.args.edge_margin_ratio
cut = (
x1 <= edge_margin_x
or y1 <= edge_margin_y
or x2 >= frame_w - edge_margin_x
or y2 >= frame_h - edge_margin_y
)
cut_score = 0.0 if cut else 1.0
score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score
in_center_band = center_delta <= center_tolerance
on_snapshot_line = center_delta <= snapshot_tolerance
in_y_band = (
frame_h * self.args.usable_y_min_ratio
<= cy
<= frame_h * self.args.usable_y_max_ratio
)
enough_hits = track.hits >= self.args.min_track_hits
large_enough = area_ratio >= self.args.min_gaylord_area_ratio
trend_ok = track.area_trend() >= self.args.min_area_trend
eligible = (
enough_hits
and on_snapshot_line
and in_y_band
and large_enough
and not cut
and trend_ok
and track.missed == 0
)
failed: list[str] = []
if not enough_hits:
failed.append(f"hits<{self.args.min_track_hits}")
if not in_center_band:
failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}")
elif not on_snapshot_line:
failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}")
if not in_y_band:
failed.append("y_band")
if not large_enough:
failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}")
if cut:
failed.append("edge_cut")
if not trend_ok:
failed.append(f"trend={track.area_trend():+.2f}<{self.args.min_area_trend:+.2f}")
if track.missed != 0:
failed.append(f"missed={track.missed}")
track.last_candidate_reason = "ok" if eligible else ",".join(failed)
return eligible, {
"score": score,
"center_score": center_score,
"size_score": size_score,
"cut_score": cut_score,
}
def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None:
if track.already_snapshotted:
track.state = "snapshotted"
return
if track.missed > 0:
track.state = "exiting"
return
cx, _ = bbox_center(track.bbox)
center_delta = abs(cx - frame_w * 0.5)
snapshot_tolerance = frame_w * self.args.snapshot_line_tolerance_ratio
if eligible:
track.state = "centered"
elif track.hits < self.args.min_track_hits:
track.state = "entering"
elif center_delta <= snapshot_tolerance:
track.state = "centered"
elif center_delta <= frame_w * self.args.center_tolerance_ratio:
track.state = "candidate"
else:
track.state = "entering"
def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None:
if not track.candidates:
return None
best = max(track.candidates, key=lambda item: item.score)
track.candidates.clear()
track.already_snapshotted = True
track.state = "snapshotted"
self.snapshot_counter += 1
self.position_counter += 1
simulated_position = f"gaylord {self.position_counter}"
debug_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_frame.jpg"
payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_ocr_payload.jpg"
debug_path = self.output_dir / debug_name
payload_path = self.output_dir / payload_name
cv2.imwrite(str(debug_path), best.frame)
ocr_payload = crop_with_padding(
best.frame,
best.bbox,
self.args.ocr_payload_pad_ratio,
)
cv2.imwrite(str(payload_path), ocr_payload)
self.last_snapshot_frame = best.frame.copy()
self.last_ocr_payload_frame = ocr_payload.copy()
snapshot = NavigationSnapshot(
snapshot_id=self.snapshot_counter,
frame_id=best.frame_id,
timestamp=best.timestamp,
simulated_position=simulated_position,
track_id=track.id,
bbox=best.bbox,
score=best.score,
debug_frame_path=str(debug_path),
ocr_payload_path=str(payload_path),
)
self._write_metadata(snapshot)
self._print_commands(snapshot)
return snapshot
def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str:
mode = self.args.remote_ack_mode
if mode == "always-ack":
result = "ACK"
elif mode == "always-nack":
result = "NACK"
else:
result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK"
if result == "ACK":
self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS"
resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
self.last_command_lines.extend([
self.last_remote_result_text,
resume_command,
])
log("[REMOTE] ACK_RICEVUTO codice valido su WMS")
log(f"[CMD] {resume_command}")
else:
self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto"
self.last_command_lines.extend([
self.last_remote_result_text,
"MICRO_MOVE_CORRETTIVO",
"SCATTA_FOTO_RETRY",
])
log("[REMOTE] NACK_RICEVUTO codice assente/non valido")
log("[CMD] MICRO_MOVE_CORRETTIVO")
log("[CMD] SCATTA_FOTO_RETRY")
return result
def set_motion_text(self, text: str) -> None:
self.motion_text = text
def _write_metadata(self, snapshot: NavigationSnapshot) -> None:
record = {
"snapshot_id": snapshot.snapshot_id,
"frame_id": snapshot.frame_id,
"timestamp": snapshot.timestamp,
"simulated_position": snapshot.simulated_position,
"drone_pose_simulated": {
"mode": "linear_shelf_scan",
"position_label": snapshot.simulated_position,
},
"track_id": snapshot.track_id,
"gaylord_bbox": list(snapshot.bbox),
"score": snapshot.score,
"debug_frame_path": snapshot.debug_frame_path,
"ocr_payload_path": snapshot.ocr_payload_path,
}
with self.metadata_path.open("at", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=True) + "\n")
def _print_commands(self, snapshot: NavigationSnapshot) -> None:
self.last_command_text = (
f"SNAPSHOT {snapshot.snapshot_id:04d} "
f"track={snapshot.track_id} frame={snapshot.frame_id} "
f"pos={snapshot.simulated_position} score={snapshot.score:.2f}"
)
self.last_command_lines = [
self.last_command_text,
"STOP",
f"SCATTA_FOTO {Path(snapshot.debug_frame_path).name}",
f"ESTRAI_BBOX_CENTRALE track={snapshot.track_id}",
f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
f"INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}",
f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s",
]
log(f"[NAV] {self.last_command_text}")
log("[CMD] STOP")
log(f"[CMD] SCATTA_FOTO {Path(snapshot.debug_frame_path).name}")
log(f"[CMD] ESTRAI_BBOX_CENTRALE track={snapshot.track_id}")
log(f"[CMD] ASSOCIA_POSIZIONE {snapshot.simulated_position}")
log(f"[CMD] INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}")
log(f"[CMD] ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s")
def parse_args():
pre = argparse.ArgumentParser(add_help=False)
pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
pre_args, _ = pre.parse_known_args()
defaults = load_navigation_config(pre_args.config)
ap = argparse.ArgumentParser(parents=[pre])
ap.add_argument("-v", "--video", default=defaults["video"], help="Percorso video. Se omesso usa webcam 0")
ap.add_argument(
"--weights",
default=defaults["weights"],
help="Modello Ultralytics .pt",
)
ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"], help="Device Ultralytics: cpu oppure 0")
ap.add_argument("--input-size", type=int, default=defaults["input_size"], help="Dimensione input YOLO")
ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"], help="Confidenza minima")
ap.add_argument("--target-class", default=defaults["target_class"], help="Classe da tracciare")
ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"], help="Frame persi prima di rimuovere una track")
ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"], help="Soglia associazione detection-track")
ap.add_argument("--max-center-distance-ratio", type=float, default=defaults["max_center_distance_ratio"], help="Distanza max centri per matching")
ap.add_argument("--center-tolerance-ratio", type=float, default=defaults["center_tolerance_ratio"], help="Mezza ampiezza zona centrale")
ap.add_argument("--snapshot-line-tolerance-ratio", type=float, default=defaults["snapshot_line_tolerance_ratio"],
help="Tolleranza stretta dalla linea centrale per scattare")
ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"], help="Limite alto fascia utile Y")
ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"], help="Limite basso fascia utile Y")
ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"], help="Detection consecutive minime")
ap.add_argument("--min-gaylord-area-ratio", type=float, default=defaults["min_gaylord_area_ratio"], help="Area bbox minima sul frame")
ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"], help="Margine per considerare bbox tagliato")
ap.add_argument("--ocr-payload-pad-ratio", type=float, default=defaults["ocr_payload_pad_ratio"],
help="Padding intorno al bbox centrale inviato all'OCR remoto")
ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"], help="Trend area minimo ammesso")
ap.add_argument("--snapshot-window-frames", type=int, default=defaults["snapshot_window_frames"], help="Candidati da valutare prima dello snapshot")
ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"], help="Directory snapshot e JSONL")
ap.add_argument("--remote-ack-timeout-sec", type=float, default=defaults["remote_ack_timeout_sec"],
help="Tempo simulato di attesa OCR remoto/WMS")
ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
default=defaults["remote_ack_mode"], help="Risposta remota simulata")
ap.add_argument("--scan-direction", choices=["destra", "sinistra"], default=defaults["scan_direction"],
help="Direzione simulata di ripartenza dopo ACK")
ap.add_argument("--preview-width", type=int, default=defaults["preview_width"], help="Larghezza preview")
ap.add_argument("--realtime-playback", action="store_true", default=defaults["realtime_playback"], help="Rispetta FPS video")
ap.add_argument("--max-frames", type=int, default=defaults["max_frames"], help="Numero massimo frame; 0 = tutto")
ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"], help="Intervallo log prestazioni")
ap.add_argument("--motion-report-interval", type=int, default=defaults["motion_report_interval"],
help="Ogni quanti frame aggiornare la direzione moto stimata")
ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
help="Spostamento medio minimo per dichiarare una direzione")
ap.add_argument("--debug-tracks", action="store_true", default=defaults["debug_tracks"], help="Logga stato e criteri delle track")
ap.add_argument("--flash-alpha", type=float, default=defaults["flash_alpha"], help="Intensita' flash 0..1 al momento dello scatto")
ap.add_argument("--no-display", action="store_true", default=defaults["no_display"], help="Disabilita finestra video")
return ap.parse_args()
def load_navigation_config(path_str: str) -> dict[str, object]:
defaults: dict[str, object] = {
"video": "testhd.mp4",
"weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt",
"ultralytics_device": "cpu",
"input_size": 640,
"min_confidence": 0.25,
"target_class": "gaylord",
"max_track_missed": 8,
"min_match_score": 0.25,
"max_center_distance_ratio": 0.18,
"center_tolerance_ratio": 0.18,
"snapshot_line_tolerance_ratio": 0.035,
"usable_y_min_ratio": 0.15,
"usable_y_max_ratio": 0.85,
"min_track_hits": 3,
"min_gaylord_area_ratio": 0.02,
"edge_margin_ratio": 0.0,
"ocr_payload_pad_ratio": 0.03,
"min_area_trend": -0.35,
"snapshot_window_frames": 1,
"snapshot_output_dir": "navigate_snapshots",
"remote_ack_timeout_sec": 2.0,
"remote_ack_mode": "always-ack",
"scan_direction": "destra",
"preview_width": 1280,
"realtime_playback": True,
"max_frames": 0,
"stats_interval": 2.0,
"motion_report_interval": 5,
"motion_min_pixels": 1.5,
"debug_tracks": True,
"flash_alpha": 0.70,
"no_display": False,
}
path = Path(path_str)
if not path.exists():
return defaults
parser = configparser.ConfigParser()
parser.read(path, encoding="utf-8")
section = parser["navigation"] if parser.has_section("navigation") else {}
types = {key: type(value) for key, value in defaults.items()}
for key, default_value in defaults.items():
if key not in section:
continue
if types[key] is bool:
defaults[key] = parser.getboolean("navigation", key, fallback=bool(default_value))
elif types[key] is int:
defaults[key] = parser.getint("navigation", key, fallback=int(default_value))
elif types[key] is float:
defaults[key] = parser.getfloat("navigation", key, fallback=float(default_value))
else:
value = section.get(key, str(default_value)).strip()
defaults[key] = None if value.lower() in ("", "none", "null") else value
return defaults
def log(msg: str) -> None:
print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)
def require_file(path_str: str, description: str) -> Path:
path = Path(path_str)
if not path.exists():
log(f"ERRORE: {description} non trovato: {path}")
sys.exit(1)
return path
def open_capture(video_arg: str | None):
if video_arg is None:
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if not cap.isOpened():
cap = cv2.VideoCapture(0)
return cap, "camera:0"
if str(video_arg).isdigit():
idx = int(video_arg)
cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
if not cap.isOpened():
cap = cv2.VideoCapture(idx)
return cap, f"camera:{idx}"
return cv2.VideoCapture(video_arg), str(video_arg)
def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
return x1, y1, x2, y2
def crop_with_padding(
frame: np.ndarray,
bbox: tuple[int, int, int, int],
pad_ratio: float,
) -> np.ndarray:
x1, y1, x2, y2 = bbox
bw = x2 - x1
bh = y2 - y1
pad_x = int(max(0.0, pad_ratio) * bw)
pad_y = int(max(0.0, pad_ratio) * bh)
cx1, cy1, cx2, cy2 = clip_box(
x1 - pad_x,
y1 - pad_y,
x2 + pad_x,
y2 + pad_y,
frame.shape[1],
frame.shape[0],
)
return frame[cy1:cy2, cx1:cx2].copy()
def bbox_area(bbox: tuple[int, int, int, int]) -> int:
x1, y1, x2, y2 = bbox
return max(0, x2 - x1) * max(0, y2 - y1)
def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
x1, y1, x2, y2 = bbox
return (x1 + x2) * 0.5, (y1 + y2) * 0.5
def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
ax1, ay1, ax2, ay2 = a
bx1, by1, bx2, by2 = b
ix1 = max(ax1, bx1)
iy1 = max(ay1, by1)
ix2 = min(ax2, bx2)
iy2 = min(ay2, by2)
inter = bbox_area((ix1, iy1, ix2, iy2))
union = bbox_area(a) + bbox_area(b) - inter
if union <= 0:
return 0.0
return inter / float(union)
def association_score(
track_bbox: tuple[int, int, int, int],
det_bbox: tuple[int, int, int, int],
max_center_distance: float,
) -> float:
iou = bbox_iou(track_bbox, det_bbox)
tx, ty = bbox_center(track_bbox)
dx, dy = bbox_center(det_bbox)
center_dist = float(np.hypot(tx - dx, ty - dy))
center_similarity = max(0.0, 1.0 - center_dist / max_center_distance)
return 0.70 * iou + 0.30 * center_similarity
def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
h, w = frame.shape[:2]
if max_width <= 0 or w <= max_width:
return frame
scale = max_width / float(w)
return cv2.resize(frame, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LINEAR)
def draw_navigation_debug(
frame: np.ndarray,
tracks: list[Track],
args,
last_command_text: str,
fps_text: str,
) -> np.ndarray:
display = frame.copy()
h, w = display.shape[:2]
center_x = int(w * 0.5)
tol = int(w * args.center_tolerance_ratio)
y_min = int(h * args.usable_y_min_ratio)
y_max = int(h * args.usable_y_max_ratio)
cv2.rectangle(display, (center_x - tol, y_min), (center_x + tol, y_max), (255, 255, 0), 4)
cv2.line(display, (center_x, 0), (center_x, h), (255, 255, 0), 3)
cv2.line(display, (0, y_min), (w, y_min), (100, 100, 100), 2)
cv2.line(display, (0, y_max), (w, y_max), (100, 100, 100), 2)
for track in tracks:
x1, y1, x2, y2 = track.bbox
color = state_color(track.state)
thickness = 8 if track.state == "centered" else 5
cv2.rectangle(display, (x1, y1), (x2, y2), color, thickness)
cx, cy = bbox_center(track.bbox)
cv2.circle(display, (int(cx), int(cy)), 12, color, -1)
cv2.circle(display, (int(cx), int(cy)), 18, (0, 0, 0), 3)
text = (
f"id={track.id} {track.state} conf={track.confidence:.2f} "
f"hits={track.hits} trend={track.area_trend():+.2f}"
)
cv2.putText(
display,
text,
(x1, max(24, y1 - 8)),
cv2.FONT_HERSHEY_SIMPLEX,
0.78,
color,
3,
cv2.LINE_AA,
)
cv2.putText(display, fps_text, (20, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
if last_command_text:
cv2.putText(display, last_command_text, (20, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 255), 2)
return resize_preview(display, args.preview_width)
def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
lines = command_lines if command_lines else ["Nessun comando generato"]
canvas_h = max(340, 84 + len(lines[:10]) * 34)
canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)
cv2.putText(
canvas,
"COMANDI NAVIGAZIONE",
(24, 42),
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(0, 0, 0),
2,
cv2.LINE_AA,
)
cv2.putText(
canvas,
motion_text,
(24, 76),
cv2.FONT_HERSHEY_SIMPLEX,
0.80,
(120, 0, 120),
2,
cv2.LINE_AA,
)
y = 122
for idx, line in enumerate(lines[:10]):
color = (0, 0, 180) if idx == 0 else (0, 90, 0)
cv2.putText(
canvas,
line,
(24, y),
cv2.FONT_HERSHEY_SIMPLEX,
0.82,
color,
2,
cv2.LINE_AA,
)
y += 36
return canvas
def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
flash = np.full_like(frame, 255)
alpha = min(max(alpha, 0.0), 1.0)
return cv2.addWeighted(frame, 1.0 - alpha, flash, alpha, 0.0)
def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
deltas: list[tuple[float, float]] = []
for track in tracks:
if track.missed != 0 or len(track.center_history) < 2:
continue
x0, y0 = track.center_history[-2]
x1, y1 = track.center_history[-1]
deltas.append((x1 - x0, y1 - y0))
if not deltas:
return "MOTO: n/d"
dx = sum(delta[0] for delta in deltas) / len(deltas)
dy = sum(delta[1] for delta in deltas) / len(deltas)
abs_dx = abs(dx)
abs_dy = abs(dy)
if abs_dx < min_pixels and abs_dy < min_pixels:
direction = "stabile"
elif abs_dx >= abs_dy:
direction = "destra" if dx > 0 else "sinistra"
else:
direction = "giu" if dy > 0 else "su"
return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={len(deltas)}"
def state_color(state: str) -> tuple[int, int, int]:
if state == "centered":
return (0, 255, 255)
if state == "snapshotted":
return (255, 0, 255)
if state == "candidate":
return (0, 255, 0)
if state == "exiting":
return (0, 140, 255)
return (255, 255, 255)
def main() -> int:
args = parse_args()
require_file(args.weights, "modello Ultralytics")
detector = UltralyticsDetector(args.weights, args.ultralytics_device)
log(f"Classi modello: {detector.classes}")
log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.")
cap, source_name = open_capture(args.video)
if not cap.isOpened():
log(f"ERRORE: impossibile aprire sorgente video: {source_name}")
return 1
video_fps = cap.get(cv2.CAP_PROP_FPS)
frame_delay = 1.0 / video_fps if args.realtime_playback and video_fps and video_fps > 1 else 0.0
tracker = LightweightTracker(
max_missed=args.max_track_missed,
min_match_score=args.min_match_score,
max_center_distance_ratio=args.max_center_distance_ratio,
)
navigator = NavigationController(args)
if not args.no_display:
cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL)
cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL)
cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL)
frame_id = 0
start_time = time.perf_counter()
last_stats = start_time
last_loop_end = start_time
yolo_total_ms = 0.0
yolo_cycles = 0
try:
while True:
if frame_delay > 0:
now = time.perf_counter()
sleep_for = frame_delay - (now - last_loop_end)
if sleep_for > 0:
time.sleep(sleep_for)
last_loop_end = time.perf_counter()
ok, frame = cap.read()
if not ok:
log("Fine stream")
break
frame_id += 1
timestamp = time.perf_counter()
if args.max_frames > 0 and frame_id > args.max_frames:
log(f"Raggiunto --max-frames={args.max_frames}")
break
detections, yolo_ms = detector.detect(frame, args.min_confidence, args.input_size)
yolo_total_ms += yolo_ms
yolo_cycles += 1
gaylords = [
det for det in detections
if det.class_name.strip().lower() == args.target_class.strip().lower()
]
tracks = tracker.update(gaylords, frame_id, frame.shape[1])
if args.motion_report_interval > 0 and frame_id % args.motion_report_interval == 0:
navigator.set_motion_text(
estimate_motion_from_tracks(tracks, args.motion_min_pixels)
)
new_snapshots: list[NavigationSnapshot] = []
for track in tracks:
if track.missed == 0:
snapshot = navigator.process_track(track, frame, frame_id, timestamp)
if snapshot is not None:
new_snapshots.append(snapshot)
if args.no_display and new_snapshots:
if args.remote_ack_timeout_sec > 0:
time.sleep(args.remote_ack_timeout_sec)
for snapshot in new_snapshots:
navigator.simulate_remote_response(snapshot)
now = time.perf_counter()
if now - last_stats >= args.stats_interval:
elapsed = max(now - start_time, 0.001)
avg_yolo = yolo_total_ms / max(yolo_cycles, 1)
active = sum(1 for t in tracks if t.missed == 0)
log(
f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} "
f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} tracks={len(tracks)} active={active} "
f"snapshots={navigator.snapshot_counter} {navigator.motion_text}"
)
if args.debug_tracks:
for track in tracks:
cx, cy = bbox_center(track.bbox)
area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1])
log(
f" track={track.id} state={track.state} hits={track.hits} "
f"missed={track.missed} center=({cx:.0f},{cy:.0f}) "
f"area={area_ratio:.3f} trend={track.area_trend():+.2f} "
f"reason={track.last_candidate_reason}"
)
last_stats = now
if not args.no_display:
elapsed = max(time.perf_counter() - start_time, 0.001)
fps_text = (
f"frame={frame_id} fps={frame_id / elapsed:.1f} "
f"det={len(gaylords)} tracks={len(tracks)} snap={navigator.snapshot_counter}"
)
display = draw_navigation_debug(
frame,
tracks,
args,
navigator.last_command_text,
fps_text,
)
cv2.imshow("flywms navigate", display)
if navigator.last_ocr_payload_frame is not None:
snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width)
cv2.imshow("flywms snapshot", snapshot_display)
cv2.imshow(
"flywms comandi",
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
)
if new_snapshots:
flash_display = apply_flash(display, args.flash_alpha)
cv2.imshow("flywms navigate", flash_display)
if navigator.last_ocr_payload_frame is not None:
flash_snapshot = apply_flash(
resize_preview(navigator.last_ocr_payload_frame, args.preview_width),
args.flash_alpha,
)
cv2.imshow("flywms snapshot", flash_snapshot)
cv2.imshow(
"flywms comandi",
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
)
pause_ms = max(1, int(args.remote_ack_timeout_sec * 1000))
key = cv2.waitKey(pause_ms) & 0xFF
if key in (27, ord("q")):
log("Interrotto da tastiera")
break
for snapshot in new_snapshots:
navigator.simulate_remote_response(snapshot)
cv2.imshow(
"flywms comandi",
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
)
key = cv2.waitKey(1) & 0xFF
if key in (27, ord("q")):
log("Interrotto da tastiera")
break
finally:
cap.release()
if not args.no_display:
cv2.destroyAllWindows()
log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}")
return 0
if __name__ == "__main__":
raise SystemExit(main())