Compare commits
2 Commits
milestone-
...
16458d98e9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16458d98e9 | ||
|
|
8a8bea1211 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -11,3 +11,4 @@ dataset_yolo/labels/*.cache
|
||||
dataset_yolo/labels/*_backup_before_remap_*/
|
||||
runs/flywms_dataset_check/
|
||||
runs/flywms_dataset_check_1epoch/
|
||||
navigate_snapshots*/
|
||||
|
||||
166
flywms_navigation.ini
Normal file
166
flywms_navigation.ini
Normal file
@@ -0,0 +1,166 @@
|
||||
[navigation]
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: sorgente video usata per simulare la camera del drone.
|
||||
; Se vuoto o "none", usa webcam 0.
|
||||
; Default se non indicato: testhd.mp4
|
||||
video = testhd.mp4
|
||||
|
||||
; OBBLIGATORIO: si.
|
||||
; Ruolo: modello Ultralytics/YOLO moderno usato per rilevare gaylord ed etichette.
|
||||
; Default se non indicato: C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt
|
||||
weights = C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: device usato da Ultralytics. Usa "cpu" ora; con GPU compatibile usare "0".
|
||||
; Default se non indicato: cpu
|
||||
ultralytics_device = cpu
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: dimensione input YOLO. 640 e' il valore usato nel training rapido.
|
||||
; Default se non indicato: 640
|
||||
input_size = 640
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: confidenza minima delle detection accettate dal detector.
|
||||
; Default se non indicato: 0.25
|
||||
min_confidence = 0.25
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: classe tracciata dalla navigazione. Le altre detection non entrano nel tracker.
|
||||
; Default se non indicato: gaylord
|
||||
target_class = gaylord
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: numero massimo di frame in cui una track puo' non essere vista prima di essere rimossa.
|
||||
; Default se non indicato: 8
|
||||
max_track_missed = 8
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: soglia minima dello score che associa una detection a una track esistente.
|
||||
; Default se non indicato: 0.25
|
||||
min_match_score = 0.25
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: distanza massima ammessa tra centri bbox, espressa come frazione della larghezza frame.
|
||||
; Default se non indicato: 0.18
|
||||
max_center_distance_ratio = 0.18
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: mezza ampiezza della fascia azzurra di avvicinamento al centro.
|
||||
; Non fa scattare la foto: indica solo che la track e' candidata.
|
||||
; Default se non indicato: 0.18
|
||||
center_tolerance_ratio = 0.18
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: tolleranza stretta dalla linea verticale centrale per scattare la foto.
|
||||
; La foto parte quando il centro bbox e' entro questa soglia.
|
||||
; Default se non indicato: 0.035
|
||||
snapshot_line_tolerance_ratio = 0.035
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: limite verticale superiore della fascia utile della scaffalatura.
|
||||
; Default se non indicato: 0.15
|
||||
usable_y_min_ratio = 0.15
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: limite verticale inferiore della fascia utile della scaffalatura.
|
||||
; Default se non indicato: 0.85
|
||||
usable_y_max_ratio = 0.85
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: numero minimo di detection confermate prima di considerare affidabile una track.
|
||||
; Default se non indicato: 3
|
||||
min_track_hits = 3
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: area minima del bbox gaylord rispetto all'intero frame.
|
||||
; Serve a ignorare oggetti troppo lontani/piccoli.
|
||||
; Default se non indicato: 0.02
|
||||
min_gaylord_area_ratio = 0.02
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: margine da bordo immagine per considerare un bbox tagliato.
|
||||
; 0 disabilita questo filtro, utile con il video manuale di test.
|
||||
; Default se non indicato: 0.0
|
||||
edge_margin_ratio = 0.0
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: padding aggiunto al bbox centrale prima di salvare il crop inviato all'OCR remoto.
|
||||
; Default se non indicato: 0.03
|
||||
ocr_payload_pad_ratio = 0.03
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: trend minimo dell'area bbox negli ultimi frame. Valori negativi tollerano leggera uscita.
|
||||
; Default se non indicato: -0.35
|
||||
min_area_trend = -0.35
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: numero di candidati da valutare prima dello snapshot.
|
||||
; 1 significa: scatta subito quando il centro tocca la linea.
|
||||
; Default se non indicato: 1
|
||||
snapshot_window_frames = 1
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: directory dove salvare frame debug, crop OCR e snapshots.jsonl.
|
||||
; Default se non indicato: navigate_snapshots
|
||||
snapshot_output_dir = navigate_snapshots
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: tempo simulato con cui il drone attende OCR remoto + verifica WMS.
|
||||
; Default se non indicato: 2.0
|
||||
remote_ack_timeout_sec = 2.0
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: risposta remota simulata. Valori: always-ack, always-nack, alternate.
|
||||
; Default se non indicato: always-ack
|
||||
remote_ack_mode = always-ack
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: direzione simulata di ripartenza dopo ACK. Valori: destra, sinistra.
|
||||
; Default se non indicato: destra
|
||||
scan_direction = destra
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: larghezza massima delle finestre video di debug.
|
||||
; Default se non indicato: 1280
|
||||
preview_width = 1280
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: se true, il video di test viene riprodotto rispettando il framerate originale.
|
||||
; Default se non indicato: true
|
||||
realtime_playback = true
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: massimo numero di frame da processare. 0 significa tutto il video.
|
||||
; Default se non indicato: 0
|
||||
max_frames = 0
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: ogni quanti secondi stampare statistiche nel terminale.
|
||||
; Default se non indicato: 2.0
|
||||
stats_interval = 2.0
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: ogni quanti frame aggiornare il moto apparente stimato dalle track.
|
||||
; Default se non indicato: 5
|
||||
motion_report_interval = 5
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: movimento medio minimo in pixel per dichiarare destra/sinistra/su/giu.
|
||||
; Default se non indicato: 1.5
|
||||
motion_min_pixels = 1.5
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: se true, logga nel terminale lo stato delle track e i motivi di non scatto.
|
||||
; Default se non indicato: true
|
||||
debug_tracks = true
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: intensita' del flash visuale simulato al momento dello scatto, da 0 a 1.
|
||||
; Default se non indicato: 0.70
|
||||
flash_alpha = 0.70
|
||||
|
||||
; OBBLIGATORIO: no.
|
||||
; Ruolo: se true, disabilita tutte le finestre video. Usarlo solo per test headless.
|
||||
; Default se non indicato: false
|
||||
no_display = false
|
||||
991
flywms_navigation.py
Normal file
991
flywms_navigation.py
Normal file
@@ -0,0 +1,991 @@
|
||||
import argparse
|
||||
import configparser
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
DEFAULT_CONFIG_PATH = "flywms_navigation.ini"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Detection:
|
||||
class_id: int
|
||||
class_name: str
|
||||
confidence: float
|
||||
bbox: tuple[int, int, int, int]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CandidateSnapshot:
|
||||
frame_id: int
|
||||
timestamp: float
|
||||
frame: np.ndarray
|
||||
bbox: tuple[int, int, int, int]
|
||||
score: float
|
||||
center_score: float
|
||||
size_score: float
|
||||
cut_score: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class Track:
|
||||
id: int
|
||||
bbox: tuple[int, int, int, int]
|
||||
confidence: float
|
||||
first_seen_frame: int
|
||||
last_seen_frame: int
|
||||
hits: int = 1
|
||||
missed: int = 0
|
||||
state: str = "entering"
|
||||
last_candidate_reason: str = ""
|
||||
pending_remote_response: str = "none"
|
||||
already_snapshotted: bool = False
|
||||
bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list)
|
||||
center_history: list[tuple[float, float]] = field(default_factory=list)
|
||||
area_history: list[float] = field(default_factory=list)
|
||||
candidates: list[CandidateSnapshot] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self._append_history(self.bbox)
|
||||
|
||||
def update(self, bbox: tuple[int, int, int, int], confidence: float, frame_id: int) -> None:
|
||||
self.bbox = bbox
|
||||
self.confidence = confidence
|
||||
self.last_seen_frame = frame_id
|
||||
self.hits += 1
|
||||
self.missed = 0
|
||||
self._append_history(bbox)
|
||||
|
||||
def mark_missed(self) -> None:
|
||||
self.missed += 1
|
||||
if self.missed > 0 and self.state != "snapshotted":
|
||||
self.state = "exiting"
|
||||
|
||||
def _append_history(self, bbox: tuple[int, int, int, int]) -> None:
|
||||
self.bbox_history.append(bbox)
|
||||
self.center_history.append(bbox_center(bbox))
|
||||
self.area_history.append(float(bbox_area(bbox)))
|
||||
keep = 20
|
||||
self.bbox_history = self.bbox_history[-keep:]
|
||||
self.center_history = self.center_history[-keep:]
|
||||
self.area_history = self.area_history[-keep:]
|
||||
|
||||
def area_trend(self) -> float:
|
||||
if len(self.area_history) < 4:
|
||||
return 0.0
|
||||
old = self.area_history[-4]
|
||||
new = self.area_history[-1]
|
||||
return (new - old) / max(old, 1.0)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class NavigationSnapshot:
|
||||
snapshot_id: int
|
||||
frame_id: int
|
||||
timestamp: float
|
||||
simulated_position: str
|
||||
track_id: int
|
||||
bbox: tuple[int, int, int, int]
|
||||
score: float
|
||||
debug_frame_path: str
|
||||
ocr_payload_path: str
|
||||
|
||||
|
||||
class UltralyticsDetector:
|
||||
def __init__(self, model_path: str, device: str):
|
||||
from ultralytics import YOLO
|
||||
|
||||
self.model = YOLO(model_path)
|
||||
self.device = device
|
||||
names = self.model.names
|
||||
if isinstance(names, dict):
|
||||
self.classes = [str(names[i]) for i in sorted(names)]
|
||||
else:
|
||||
self.classes = [str(name) for name in names]
|
||||
|
||||
def detect(
|
||||
self,
|
||||
frame: np.ndarray,
|
||||
min_confidence: float,
|
||||
input_size: int,
|
||||
) -> tuple[list[Detection], float]:
|
||||
t0 = time.perf_counter()
|
||||
results = self.model.predict(
|
||||
source=frame,
|
||||
imgsz=input_size,
|
||||
conf=min_confidence,
|
||||
device=self.device,
|
||||
verbose=False,
|
||||
)
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000.0
|
||||
|
||||
detections: list[Detection] = []
|
||||
if not results:
|
||||
return detections, elapsed_ms
|
||||
|
||||
boxes = results[0].boxes
|
||||
if boxes is None:
|
||||
return detections, elapsed_ms
|
||||
|
||||
xyxy = boxes.xyxy.cpu().numpy()
|
||||
confs = boxes.conf.cpu().numpy()
|
||||
clss = boxes.cls.cpu().numpy().astype(int)
|
||||
for box, conf, cls_id in zip(xyxy, confs, clss):
|
||||
x1, y1, x2, y2 = [int(round(v)) for v in box]
|
||||
x1, y1, x2, y2 = clip_box(x1, y1, x2, y2, frame.shape[1], frame.shape[0])
|
||||
if x2 <= x1 or y2 <= y1:
|
||||
continue
|
||||
class_name = self.classes[cls_id] if 0 <= cls_id < len(self.classes) else str(cls_id)
|
||||
detections.append(Detection(
|
||||
class_id=int(cls_id),
|
||||
class_name=class_name,
|
||||
confidence=float(conf),
|
||||
bbox=(x1, y1, x2, y2),
|
||||
))
|
||||
return detections, elapsed_ms
|
||||
|
||||
|
||||
class LightweightTracker:
|
||||
"""Greedy bbox tracker: enough to explain and test navigation decisions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_missed: int,
|
||||
min_match_score: float,
|
||||
max_center_distance_ratio: float,
|
||||
):
|
||||
self.max_missed = max_missed
|
||||
self.min_match_score = min_match_score
|
||||
self.max_center_distance_ratio = max_center_distance_ratio
|
||||
self._next_id = 1
|
||||
self.tracks: dict[int, Track] = {}
|
||||
|
||||
def update(
|
||||
self,
|
||||
detections: list[Detection],
|
||||
frame_id: int,
|
||||
frame_width: int,
|
||||
) -> list[Track]:
|
||||
unmatched_tracks = set(self.tracks.keys())
|
||||
unmatched_detections = set(range(len(detections)))
|
||||
pairs: list[tuple[float, int, int]] = []
|
||||
|
||||
max_center_distance = max(1.0, frame_width * self.max_center_distance_ratio)
|
||||
for track_id, track in self.tracks.items():
|
||||
for det_idx, det in enumerate(detections):
|
||||
score = association_score(track.bbox, det.bbox, max_center_distance)
|
||||
if score >= self.min_match_score:
|
||||
pairs.append((score, track_id, det_idx))
|
||||
|
||||
pairs.sort(reverse=True, key=lambda item: item[0])
|
||||
for _, track_id, det_idx in pairs:
|
||||
if track_id not in unmatched_tracks or det_idx not in unmatched_detections:
|
||||
continue
|
||||
det = detections[det_idx]
|
||||
self.tracks[track_id].update(det.bbox, det.confidence, frame_id)
|
||||
unmatched_tracks.remove(track_id)
|
||||
unmatched_detections.remove(det_idx)
|
||||
|
||||
for track_id in list(unmatched_tracks):
|
||||
self.tracks[track_id].mark_missed()
|
||||
if self.tracks[track_id].missed > self.max_missed:
|
||||
del self.tracks[track_id]
|
||||
|
||||
for det_idx in unmatched_detections:
|
||||
det = detections[det_idx]
|
||||
track_id = self._next_id
|
||||
self._next_id += 1
|
||||
self.tracks[track_id] = Track(
|
||||
id=track_id,
|
||||
bbox=det.bbox,
|
||||
confidence=det.confidence,
|
||||
first_seen_frame=frame_id,
|
||||
last_seen_frame=frame_id,
|
||||
)
|
||||
|
||||
return list(self.tracks.values())
|
||||
|
||||
|
||||
class NavigationController:
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
self.output_dir = Path(args.snapshot_output_dir)
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.metadata_path = self.output_dir / "snapshots.jsonl"
|
||||
self.snapshot_counter = 0
|
||||
self.position_counter = 0
|
||||
self.last_command_text = ""
|
||||
self.last_command_lines: list[str] = []
|
||||
self.last_snapshot_frame: np.ndarray | None = None
|
||||
self.last_ocr_payload_frame: np.ndarray | None = None
|
||||
self.last_remote_result_text = ""
|
||||
self.motion_text = "MOTO: n/d"
|
||||
|
||||
def process_track(
|
||||
self,
|
||||
track: Track,
|
||||
frame: np.ndarray,
|
||||
frame_id: int,
|
||||
timestamp: float,
|
||||
) -> NavigationSnapshot | None:
|
||||
frame_h, frame_w = frame.shape[:2]
|
||||
eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h)
|
||||
self._update_track_state(track, eligible, frame_w)
|
||||
|
||||
if track.already_snapshotted:
|
||||
return None
|
||||
|
||||
if eligible:
|
||||
candidate = CandidateSnapshot(
|
||||
frame_id=frame_id,
|
||||
timestamp=timestamp,
|
||||
frame=frame.copy(),
|
||||
bbox=track.bbox,
|
||||
score=score_parts["score"],
|
||||
center_score=score_parts["center_score"],
|
||||
size_score=score_parts["size_score"],
|
||||
cut_score=score_parts["cut_score"],
|
||||
)
|
||||
track.candidates.append(candidate)
|
||||
track.candidates = track.candidates[-self.args.snapshot_window_frames:]
|
||||
|
||||
if len(track.candidates) >= self.args.snapshot_window_frames:
|
||||
return self._finalize_snapshot(track)
|
||||
elif track.candidates:
|
||||
return self._finalize_snapshot(track)
|
||||
|
||||
return None
|
||||
|
||||
def _is_snapshot_candidate(
|
||||
self,
|
||||
track: Track,
|
||||
frame_w: int,
|
||||
frame_h: int,
|
||||
) -> tuple[bool, dict[str, float]]:
|
||||
x1, y1, x2, y2 = track.bbox
|
||||
cx, cy = bbox_center(track.bbox)
|
||||
center_x = frame_w * 0.5
|
||||
center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio)
|
||||
snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio)
|
||||
center_delta = abs(cx - center_x)
|
||||
center_score = max(0.0, 1.0 - center_delta / center_tolerance)
|
||||
|
||||
area_ratio = bbox_area(track.bbox) / float(frame_w * frame_h)
|
||||
size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001))
|
||||
|
||||
if self.args.edge_margin_ratio <= 0:
|
||||
cut = False
|
||||
else:
|
||||
edge_margin_x = frame_w * self.args.edge_margin_ratio
|
||||
edge_margin_y = frame_h * self.args.edge_margin_ratio
|
||||
cut = (
|
||||
x1 <= edge_margin_x
|
||||
or y1 <= edge_margin_y
|
||||
or x2 >= frame_w - edge_margin_x
|
||||
or y2 >= frame_h - edge_margin_y
|
||||
)
|
||||
cut_score = 0.0 if cut else 1.0
|
||||
score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score
|
||||
|
||||
in_center_band = center_delta <= center_tolerance
|
||||
on_snapshot_line = center_delta <= snapshot_tolerance
|
||||
in_y_band = (
|
||||
frame_h * self.args.usable_y_min_ratio
|
||||
<= cy
|
||||
<= frame_h * self.args.usable_y_max_ratio
|
||||
)
|
||||
enough_hits = track.hits >= self.args.min_track_hits
|
||||
large_enough = area_ratio >= self.args.min_gaylord_area_ratio
|
||||
trend_ok = track.area_trend() >= self.args.min_area_trend
|
||||
eligible = (
|
||||
enough_hits
|
||||
and on_snapshot_line
|
||||
and in_y_band
|
||||
and large_enough
|
||||
and not cut
|
||||
and trend_ok
|
||||
and track.missed == 0
|
||||
)
|
||||
failed: list[str] = []
|
||||
if not enough_hits:
|
||||
failed.append(f"hits<{self.args.min_track_hits}")
|
||||
if not in_center_band:
|
||||
failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}")
|
||||
elif not on_snapshot_line:
|
||||
failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}")
|
||||
if not in_y_band:
|
||||
failed.append("y_band")
|
||||
if not large_enough:
|
||||
failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}")
|
||||
if cut:
|
||||
failed.append("edge_cut")
|
||||
if not trend_ok:
|
||||
failed.append(f"trend={track.area_trend():+.2f}<{self.args.min_area_trend:+.2f}")
|
||||
if track.missed != 0:
|
||||
failed.append(f"missed={track.missed}")
|
||||
track.last_candidate_reason = "ok" if eligible else ",".join(failed)
|
||||
return eligible, {
|
||||
"score": score,
|
||||
"center_score": center_score,
|
||||
"size_score": size_score,
|
||||
"cut_score": cut_score,
|
||||
}
|
||||
|
||||
def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None:
|
||||
if track.already_snapshotted:
|
||||
track.state = "snapshotted"
|
||||
return
|
||||
if track.missed > 0:
|
||||
track.state = "exiting"
|
||||
return
|
||||
cx, _ = bbox_center(track.bbox)
|
||||
center_delta = abs(cx - frame_w * 0.5)
|
||||
snapshot_tolerance = frame_w * self.args.snapshot_line_tolerance_ratio
|
||||
if eligible:
|
||||
track.state = "centered"
|
||||
elif track.hits < self.args.min_track_hits:
|
||||
track.state = "entering"
|
||||
elif center_delta <= snapshot_tolerance:
|
||||
track.state = "centered"
|
||||
elif center_delta <= frame_w * self.args.center_tolerance_ratio:
|
||||
track.state = "candidate"
|
||||
else:
|
||||
track.state = "entering"
|
||||
|
||||
def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None:
|
||||
if not track.candidates:
|
||||
return None
|
||||
best = max(track.candidates, key=lambda item: item.score)
|
||||
track.candidates.clear()
|
||||
track.already_snapshotted = True
|
||||
track.state = "snapshotted"
|
||||
|
||||
self.snapshot_counter += 1
|
||||
self.position_counter += 1
|
||||
simulated_position = f"gaylord {self.position_counter}"
|
||||
debug_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_frame.jpg"
|
||||
payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_ocr_payload.jpg"
|
||||
debug_path = self.output_dir / debug_name
|
||||
payload_path = self.output_dir / payload_name
|
||||
cv2.imwrite(str(debug_path), best.frame)
|
||||
ocr_payload = crop_with_padding(
|
||||
best.frame,
|
||||
best.bbox,
|
||||
self.args.ocr_payload_pad_ratio,
|
||||
)
|
||||
cv2.imwrite(str(payload_path), ocr_payload)
|
||||
self.last_snapshot_frame = best.frame.copy()
|
||||
self.last_ocr_payload_frame = ocr_payload.copy()
|
||||
|
||||
snapshot = NavigationSnapshot(
|
||||
snapshot_id=self.snapshot_counter,
|
||||
frame_id=best.frame_id,
|
||||
timestamp=best.timestamp,
|
||||
simulated_position=simulated_position,
|
||||
track_id=track.id,
|
||||
bbox=best.bbox,
|
||||
score=best.score,
|
||||
debug_frame_path=str(debug_path),
|
||||
ocr_payload_path=str(payload_path),
|
||||
)
|
||||
self._write_metadata(snapshot)
|
||||
self._print_commands(snapshot)
|
||||
return snapshot
|
||||
|
||||
def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str:
|
||||
mode = self.args.remote_ack_mode
|
||||
if mode == "always-ack":
|
||||
result = "ACK"
|
||||
elif mode == "always-nack":
|
||||
result = "NACK"
|
||||
else:
|
||||
result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK"
|
||||
|
||||
if result == "ACK":
|
||||
self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS"
|
||||
resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
|
||||
self.last_command_lines.extend([
|
||||
self.last_remote_result_text,
|
||||
resume_command,
|
||||
])
|
||||
log("[REMOTE] ACK_RICEVUTO codice valido su WMS")
|
||||
log(f"[CMD] {resume_command}")
|
||||
else:
|
||||
self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto"
|
||||
self.last_command_lines.extend([
|
||||
self.last_remote_result_text,
|
||||
"MICRO_MOVE_CORRETTIVO",
|
||||
"SCATTA_FOTO_RETRY",
|
||||
])
|
||||
log("[REMOTE] NACK_RICEVUTO codice assente/non valido")
|
||||
log("[CMD] MICRO_MOVE_CORRETTIVO")
|
||||
log("[CMD] SCATTA_FOTO_RETRY")
|
||||
return result
|
||||
|
||||
def set_motion_text(self, text: str) -> None:
|
||||
self.motion_text = text
|
||||
|
||||
def _write_metadata(self, snapshot: NavigationSnapshot) -> None:
|
||||
record = {
|
||||
"snapshot_id": snapshot.snapshot_id,
|
||||
"frame_id": snapshot.frame_id,
|
||||
"timestamp": snapshot.timestamp,
|
||||
"simulated_position": snapshot.simulated_position,
|
||||
"drone_pose_simulated": {
|
||||
"mode": "linear_shelf_scan",
|
||||
"position_label": snapshot.simulated_position,
|
||||
},
|
||||
"track_id": snapshot.track_id,
|
||||
"gaylord_bbox": list(snapshot.bbox),
|
||||
"score": snapshot.score,
|
||||
"debug_frame_path": snapshot.debug_frame_path,
|
||||
"ocr_payload_path": snapshot.ocr_payload_path,
|
||||
}
|
||||
with self.metadata_path.open("at", encoding="utf-8") as f:
|
||||
f.write(json.dumps(record, ensure_ascii=True) + "\n")
|
||||
|
||||
def _print_commands(self, snapshot: NavigationSnapshot) -> None:
|
||||
self.last_command_text = (
|
||||
f"SNAPSHOT {snapshot.snapshot_id:04d} "
|
||||
f"track={snapshot.track_id} frame={snapshot.frame_id} "
|
||||
f"pos={snapshot.simulated_position} score={snapshot.score:.2f}"
|
||||
)
|
||||
self.last_command_lines = [
|
||||
self.last_command_text,
|
||||
"STOP",
|
||||
f"SCATTA_FOTO {Path(snapshot.debug_frame_path).name}",
|
||||
f"ESTRAI_BBOX_CENTRALE track={snapshot.track_id}",
|
||||
f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
|
||||
f"INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}",
|
||||
f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s",
|
||||
]
|
||||
log(f"[NAV] {self.last_command_text}")
|
||||
log("[CMD] STOP")
|
||||
log(f"[CMD] SCATTA_FOTO {Path(snapshot.debug_frame_path).name}")
|
||||
log(f"[CMD] ESTRAI_BBOX_CENTRALE track={snapshot.track_id}")
|
||||
log(f"[CMD] ASSOCIA_POSIZIONE {snapshot.simulated_position}")
|
||||
log(f"[CMD] INVIA_ROI_REMOTA {Path(snapshot.ocr_payload_path).name}")
|
||||
log(f"[CMD] ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s")
|
||||
|
||||
|
||||
def parse_args():
|
||||
pre = argparse.ArgumentParser(add_help=False)
|
||||
pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
|
||||
pre_args, _ = pre.parse_known_args()
|
||||
defaults = load_navigation_config(pre_args.config)
|
||||
|
||||
ap = argparse.ArgumentParser(parents=[pre])
|
||||
ap.add_argument("-v", "--video", default=defaults["video"], help="Percorso video. Se omesso usa webcam 0")
|
||||
ap.add_argument(
|
||||
"--weights",
|
||||
default=defaults["weights"],
|
||||
help="Modello Ultralytics .pt",
|
||||
)
|
||||
ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"], help="Device Ultralytics: cpu oppure 0")
|
||||
ap.add_argument("--input-size", type=int, default=defaults["input_size"], help="Dimensione input YOLO")
|
||||
ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"], help="Confidenza minima")
|
||||
ap.add_argument("--target-class", default=defaults["target_class"], help="Classe da tracciare")
|
||||
|
||||
ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"], help="Frame persi prima di rimuovere una track")
|
||||
ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"], help="Soglia associazione detection-track")
|
||||
ap.add_argument("--max-center-distance-ratio", type=float, default=defaults["max_center_distance_ratio"], help="Distanza max centri per matching")
|
||||
|
||||
ap.add_argument("--center-tolerance-ratio", type=float, default=defaults["center_tolerance_ratio"], help="Mezza ampiezza zona centrale")
|
||||
ap.add_argument("--snapshot-line-tolerance-ratio", type=float, default=defaults["snapshot_line_tolerance_ratio"],
|
||||
help="Tolleranza stretta dalla linea centrale per scattare")
|
||||
ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"], help="Limite alto fascia utile Y")
|
||||
ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"], help="Limite basso fascia utile Y")
|
||||
ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"], help="Detection consecutive minime")
|
||||
ap.add_argument("--min-gaylord-area-ratio", type=float, default=defaults["min_gaylord_area_ratio"], help="Area bbox minima sul frame")
|
||||
ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"], help="Margine per considerare bbox tagliato")
|
||||
ap.add_argument("--ocr-payload-pad-ratio", type=float, default=defaults["ocr_payload_pad_ratio"],
|
||||
help="Padding intorno al bbox centrale inviato all'OCR remoto")
|
||||
ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"], help="Trend area minimo ammesso")
|
||||
ap.add_argument("--snapshot-window-frames", type=int, default=defaults["snapshot_window_frames"], help="Candidati da valutare prima dello snapshot")
|
||||
ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"], help="Directory snapshot e JSONL")
|
||||
ap.add_argument("--remote-ack-timeout-sec", type=float, default=defaults["remote_ack_timeout_sec"],
|
||||
help="Tempo simulato di attesa OCR remoto/WMS")
|
||||
ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
|
||||
default=defaults["remote_ack_mode"], help="Risposta remota simulata")
|
||||
ap.add_argument("--scan-direction", choices=["destra", "sinistra"], default=defaults["scan_direction"],
|
||||
help="Direzione simulata di ripartenza dopo ACK")
|
||||
|
||||
ap.add_argument("--preview-width", type=int, default=defaults["preview_width"], help="Larghezza preview")
|
||||
ap.add_argument("--realtime-playback", action="store_true", default=defaults["realtime_playback"], help="Rispetta FPS video")
|
||||
ap.add_argument("--max-frames", type=int, default=defaults["max_frames"], help="Numero massimo frame; 0 = tutto")
|
||||
ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"], help="Intervallo log prestazioni")
|
||||
ap.add_argument("--motion-report-interval", type=int, default=defaults["motion_report_interval"],
|
||||
help="Ogni quanti frame aggiornare la direzione moto stimata")
|
||||
ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
|
||||
help="Spostamento medio minimo per dichiarare una direzione")
|
||||
ap.add_argument("--debug-tracks", action="store_true", default=defaults["debug_tracks"], help="Logga stato e criteri delle track")
|
||||
ap.add_argument("--flash-alpha", type=float, default=defaults["flash_alpha"], help="Intensita' flash 0..1 al momento dello scatto")
|
||||
ap.add_argument("--no-display", action="store_true", default=defaults["no_display"], help="Disabilita finestra video")
|
||||
return ap.parse_args()
|
||||
|
||||
|
||||
def load_navigation_config(path_str: str) -> dict[str, object]:
|
||||
defaults: dict[str, object] = {
|
||||
"video": "testhd.mp4",
|
||||
"weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt",
|
||||
"ultralytics_device": "cpu",
|
||||
"input_size": 640,
|
||||
"min_confidence": 0.25,
|
||||
"target_class": "gaylord",
|
||||
"max_track_missed": 8,
|
||||
"min_match_score": 0.25,
|
||||
"max_center_distance_ratio": 0.18,
|
||||
"center_tolerance_ratio": 0.18,
|
||||
"snapshot_line_tolerance_ratio": 0.035,
|
||||
"usable_y_min_ratio": 0.15,
|
||||
"usable_y_max_ratio": 0.85,
|
||||
"min_track_hits": 3,
|
||||
"min_gaylord_area_ratio": 0.02,
|
||||
"edge_margin_ratio": 0.0,
|
||||
"ocr_payload_pad_ratio": 0.03,
|
||||
"min_area_trend": -0.35,
|
||||
"snapshot_window_frames": 1,
|
||||
"snapshot_output_dir": "navigate_snapshots",
|
||||
"remote_ack_timeout_sec": 2.0,
|
||||
"remote_ack_mode": "always-ack",
|
||||
"scan_direction": "destra",
|
||||
"preview_width": 1280,
|
||||
"realtime_playback": True,
|
||||
"max_frames": 0,
|
||||
"stats_interval": 2.0,
|
||||
"motion_report_interval": 5,
|
||||
"motion_min_pixels": 1.5,
|
||||
"debug_tracks": True,
|
||||
"flash_alpha": 0.70,
|
||||
"no_display": False,
|
||||
}
|
||||
|
||||
path = Path(path_str)
|
||||
if not path.exists():
|
||||
return defaults
|
||||
|
||||
parser = configparser.ConfigParser()
|
||||
parser.read(path, encoding="utf-8")
|
||||
section = parser["navigation"] if parser.has_section("navigation") else {}
|
||||
|
||||
types = {key: type(value) for key, value in defaults.items()}
|
||||
for key, default_value in defaults.items():
|
||||
if key not in section:
|
||||
continue
|
||||
if types[key] is bool:
|
||||
defaults[key] = parser.getboolean("navigation", key, fallback=bool(default_value))
|
||||
elif types[key] is int:
|
||||
defaults[key] = parser.getint("navigation", key, fallback=int(default_value))
|
||||
elif types[key] is float:
|
||||
defaults[key] = parser.getfloat("navigation", key, fallback=float(default_value))
|
||||
else:
|
||||
value = section.get(key, str(default_value)).strip()
|
||||
defaults[key] = None if value.lower() in ("", "none", "null") else value
|
||||
return defaults
|
||||
|
||||
|
||||
def log(msg: str) -> None:
|
||||
print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)
|
||||
|
||||
|
||||
def require_file(path_str: str, description: str) -> Path:
|
||||
path = Path(path_str)
|
||||
if not path.exists():
|
||||
log(f"ERRORE: {description} non trovato: {path}")
|
||||
sys.exit(1)
|
||||
return path
|
||||
|
||||
|
||||
def open_capture(video_arg: str | None):
|
||||
if video_arg is None:
|
||||
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
|
||||
if not cap.isOpened():
|
||||
cap = cv2.VideoCapture(0)
|
||||
return cap, "camera:0"
|
||||
|
||||
if str(video_arg).isdigit():
|
||||
idx = int(video_arg)
|
||||
cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
|
||||
if not cap.isOpened():
|
||||
cap = cv2.VideoCapture(idx)
|
||||
return cap, f"camera:{idx}"
|
||||
|
||||
return cv2.VideoCapture(video_arg), str(video_arg)
|
||||
|
||||
|
||||
def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
|
||||
x1 = max(0, min(x1, w - 1))
|
||||
y1 = max(0, min(y1, h - 1))
|
||||
x2 = max(0, min(x2, w - 1))
|
||||
y2 = max(0, min(y2, h - 1))
|
||||
return x1, y1, x2, y2
|
||||
|
||||
|
||||
def crop_with_padding(
|
||||
frame: np.ndarray,
|
||||
bbox: tuple[int, int, int, int],
|
||||
pad_ratio: float,
|
||||
) -> np.ndarray:
|
||||
x1, y1, x2, y2 = bbox
|
||||
bw = x2 - x1
|
||||
bh = y2 - y1
|
||||
pad_x = int(max(0.0, pad_ratio) * bw)
|
||||
pad_y = int(max(0.0, pad_ratio) * bh)
|
||||
cx1, cy1, cx2, cy2 = clip_box(
|
||||
x1 - pad_x,
|
||||
y1 - pad_y,
|
||||
x2 + pad_x,
|
||||
y2 + pad_y,
|
||||
frame.shape[1],
|
||||
frame.shape[0],
|
||||
)
|
||||
return frame[cy1:cy2, cx1:cx2].copy()
|
||||
|
||||
|
||||
def bbox_area(bbox: tuple[int, int, int, int]) -> int:
|
||||
x1, y1, x2, y2 = bbox
|
||||
return max(0, x2 - x1) * max(0, y2 - y1)
|
||||
|
||||
|
||||
def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
|
||||
x1, y1, x2, y2 = bbox
|
||||
return (x1 + x2) * 0.5, (y1 + y2) * 0.5
|
||||
|
||||
|
||||
def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
|
||||
ax1, ay1, ax2, ay2 = a
|
||||
bx1, by1, bx2, by2 = b
|
||||
ix1 = max(ax1, bx1)
|
||||
iy1 = max(ay1, by1)
|
||||
ix2 = min(ax2, bx2)
|
||||
iy2 = min(ay2, by2)
|
||||
inter = bbox_area((ix1, iy1, ix2, iy2))
|
||||
union = bbox_area(a) + bbox_area(b) - inter
|
||||
if union <= 0:
|
||||
return 0.0
|
||||
return inter / float(union)
|
||||
|
||||
|
||||
def association_score(
|
||||
track_bbox: tuple[int, int, int, int],
|
||||
det_bbox: tuple[int, int, int, int],
|
||||
max_center_distance: float,
|
||||
) -> float:
|
||||
iou = bbox_iou(track_bbox, det_bbox)
|
||||
tx, ty = bbox_center(track_bbox)
|
||||
dx, dy = bbox_center(det_bbox)
|
||||
center_dist = float(np.hypot(tx - dx, ty - dy))
|
||||
center_similarity = max(0.0, 1.0 - center_dist / max_center_distance)
|
||||
return 0.70 * iou + 0.30 * center_similarity
|
||||
|
||||
|
||||
def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
|
||||
h, w = frame.shape[:2]
|
||||
if max_width <= 0 or w <= max_width:
|
||||
return frame
|
||||
scale = max_width / float(w)
|
||||
return cv2.resize(frame, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LINEAR)
|
||||
|
||||
|
||||
def draw_navigation_debug(
|
||||
frame: np.ndarray,
|
||||
tracks: list[Track],
|
||||
args,
|
||||
last_command_text: str,
|
||||
fps_text: str,
|
||||
) -> np.ndarray:
|
||||
display = frame.copy()
|
||||
h, w = display.shape[:2]
|
||||
center_x = int(w * 0.5)
|
||||
tol = int(w * args.center_tolerance_ratio)
|
||||
y_min = int(h * args.usable_y_min_ratio)
|
||||
y_max = int(h * args.usable_y_max_ratio)
|
||||
|
||||
cv2.rectangle(display, (center_x - tol, y_min), (center_x + tol, y_max), (255, 255, 0), 4)
|
||||
cv2.line(display, (center_x, 0), (center_x, h), (255, 255, 0), 3)
|
||||
cv2.line(display, (0, y_min), (w, y_min), (100, 100, 100), 2)
|
||||
cv2.line(display, (0, y_max), (w, y_max), (100, 100, 100), 2)
|
||||
|
||||
for track in tracks:
|
||||
x1, y1, x2, y2 = track.bbox
|
||||
color = state_color(track.state)
|
||||
thickness = 8 if track.state == "centered" else 5
|
||||
cv2.rectangle(display, (x1, y1), (x2, y2), color, thickness)
|
||||
cx, cy = bbox_center(track.bbox)
|
||||
cv2.circle(display, (int(cx), int(cy)), 12, color, -1)
|
||||
cv2.circle(display, (int(cx), int(cy)), 18, (0, 0, 0), 3)
|
||||
text = (
|
||||
f"id={track.id} {track.state} conf={track.confidence:.2f} "
|
||||
f"hits={track.hits} trend={track.area_trend():+.2f}"
|
||||
)
|
||||
cv2.putText(
|
||||
display,
|
||||
text,
|
||||
(x1, max(24, y1 - 8)),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.78,
|
||||
color,
|
||||
3,
|
||||
cv2.LINE_AA,
|
||||
)
|
||||
|
||||
cv2.putText(display, fps_text, (20, 34), cv2.FONT_HERSHEY_SIMPLEX, 0.85, (0, 0, 255), 2)
|
||||
if last_command_text:
|
||||
cv2.putText(display, last_command_text, (20, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 255), 2)
|
||||
return resize_preview(display, args.preview_width)
|
||||
|
||||
|
||||
def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
|
||||
lines = command_lines if command_lines else ["Nessun comando generato"]
|
||||
canvas_h = max(340, 84 + len(lines[:10]) * 34)
|
||||
canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)
|
||||
cv2.putText(
|
||||
canvas,
|
||||
"COMANDI NAVIGAZIONE",
|
||||
(24, 42),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
1.0,
|
||||
(0, 0, 0),
|
||||
2,
|
||||
cv2.LINE_AA,
|
||||
)
|
||||
cv2.putText(
|
||||
canvas,
|
||||
motion_text,
|
||||
(24, 76),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.80,
|
||||
(120, 0, 120),
|
||||
2,
|
||||
cv2.LINE_AA,
|
||||
)
|
||||
y = 122
|
||||
for idx, line in enumerate(lines[:10]):
|
||||
color = (0, 0, 180) if idx == 0 else (0, 90, 0)
|
||||
cv2.putText(
|
||||
canvas,
|
||||
line,
|
||||
(24, y),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.82,
|
||||
color,
|
||||
2,
|
||||
cv2.LINE_AA,
|
||||
)
|
||||
y += 36
|
||||
return canvas
|
||||
|
||||
|
||||
def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
|
||||
flash = np.full_like(frame, 255)
|
||||
alpha = min(max(alpha, 0.0), 1.0)
|
||||
return cv2.addWeighted(frame, 1.0 - alpha, flash, alpha, 0.0)
|
||||
|
||||
|
||||
def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
|
||||
deltas: list[tuple[float, float]] = []
|
||||
for track in tracks:
|
||||
if track.missed != 0 or len(track.center_history) < 2:
|
||||
continue
|
||||
x0, y0 = track.center_history[-2]
|
||||
x1, y1 = track.center_history[-1]
|
||||
deltas.append((x1 - x0, y1 - y0))
|
||||
|
||||
if not deltas:
|
||||
return "MOTO: n/d"
|
||||
|
||||
dx = sum(delta[0] for delta in deltas) / len(deltas)
|
||||
dy = sum(delta[1] for delta in deltas) / len(deltas)
|
||||
abs_dx = abs(dx)
|
||||
abs_dy = abs(dy)
|
||||
|
||||
if abs_dx < min_pixels and abs_dy < min_pixels:
|
||||
direction = "stabile"
|
||||
elif abs_dx >= abs_dy:
|
||||
direction = "destra" if dx > 0 else "sinistra"
|
||||
else:
|
||||
direction = "giu" if dy > 0 else "su"
|
||||
|
||||
return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={len(deltas)}"
|
||||
|
||||
|
||||
def state_color(state: str) -> tuple[int, int, int]:
|
||||
if state == "centered":
|
||||
return (0, 255, 255)
|
||||
if state == "snapshotted":
|
||||
return (255, 0, 255)
|
||||
if state == "candidate":
|
||||
return (0, 255, 0)
|
||||
if state == "exiting":
|
||||
return (0, 140, 255)
|
||||
return (255, 255, 255)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
require_file(args.weights, "modello Ultralytics")
|
||||
|
||||
detector = UltralyticsDetector(args.weights, args.ultralytics_device)
|
||||
log(f"Classi modello: {detector.classes}")
|
||||
log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.")
|
||||
|
||||
cap, source_name = open_capture(args.video)
|
||||
if not cap.isOpened():
|
||||
log(f"ERRORE: impossibile aprire sorgente video: {source_name}")
|
||||
return 1
|
||||
|
||||
video_fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
frame_delay = 1.0 / video_fps if args.realtime_playback and video_fps and video_fps > 1 else 0.0
|
||||
tracker = LightweightTracker(
|
||||
max_missed=args.max_track_missed,
|
||||
min_match_score=args.min_match_score,
|
||||
max_center_distance_ratio=args.max_center_distance_ratio,
|
||||
)
|
||||
navigator = NavigationController(args)
|
||||
|
||||
if not args.no_display:
|
||||
cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL)
|
||||
cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL)
|
||||
cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL)
|
||||
|
||||
frame_id = 0
|
||||
start_time = time.perf_counter()
|
||||
last_stats = start_time
|
||||
last_loop_end = start_time
|
||||
yolo_total_ms = 0.0
|
||||
yolo_cycles = 0
|
||||
|
||||
try:
|
||||
while True:
|
||||
if frame_delay > 0:
|
||||
now = time.perf_counter()
|
||||
sleep_for = frame_delay - (now - last_loop_end)
|
||||
if sleep_for > 0:
|
||||
time.sleep(sleep_for)
|
||||
last_loop_end = time.perf_counter()
|
||||
|
||||
ok, frame = cap.read()
|
||||
if not ok:
|
||||
log("Fine stream")
|
||||
break
|
||||
frame_id += 1
|
||||
timestamp = time.perf_counter()
|
||||
if args.max_frames > 0 and frame_id > args.max_frames:
|
||||
log(f"Raggiunto --max-frames={args.max_frames}")
|
||||
break
|
||||
|
||||
detections, yolo_ms = detector.detect(frame, args.min_confidence, args.input_size)
|
||||
yolo_total_ms += yolo_ms
|
||||
yolo_cycles += 1
|
||||
gaylords = [
|
||||
det for det in detections
|
||||
if det.class_name.strip().lower() == args.target_class.strip().lower()
|
||||
]
|
||||
|
||||
tracks = tracker.update(gaylords, frame_id, frame.shape[1])
|
||||
if args.motion_report_interval > 0 and frame_id % args.motion_report_interval == 0:
|
||||
navigator.set_motion_text(
|
||||
estimate_motion_from_tracks(tracks, args.motion_min_pixels)
|
||||
)
|
||||
new_snapshots: list[NavigationSnapshot] = []
|
||||
for track in tracks:
|
||||
if track.missed == 0:
|
||||
snapshot = navigator.process_track(track, frame, frame_id, timestamp)
|
||||
if snapshot is not None:
|
||||
new_snapshots.append(snapshot)
|
||||
if args.no_display and new_snapshots:
|
||||
if args.remote_ack_timeout_sec > 0:
|
||||
time.sleep(args.remote_ack_timeout_sec)
|
||||
for snapshot in new_snapshots:
|
||||
navigator.simulate_remote_response(snapshot)
|
||||
|
||||
now = time.perf_counter()
|
||||
if now - last_stats >= args.stats_interval:
|
||||
elapsed = max(now - start_time, 0.001)
|
||||
avg_yolo = yolo_total_ms / max(yolo_cycles, 1)
|
||||
active = sum(1 for t in tracks if t.missed == 0)
|
||||
log(
|
||||
f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} "
|
||||
f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} tracks={len(tracks)} active={active} "
|
||||
f"snapshots={navigator.snapshot_counter} {navigator.motion_text}"
|
||||
)
|
||||
if args.debug_tracks:
|
||||
for track in tracks:
|
||||
cx, cy = bbox_center(track.bbox)
|
||||
area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1])
|
||||
log(
|
||||
f" track={track.id} state={track.state} hits={track.hits} "
|
||||
f"missed={track.missed} center=({cx:.0f},{cy:.0f}) "
|
||||
f"area={area_ratio:.3f} trend={track.area_trend():+.2f} "
|
||||
f"reason={track.last_candidate_reason}"
|
||||
)
|
||||
last_stats = now
|
||||
|
||||
if not args.no_display:
|
||||
elapsed = max(time.perf_counter() - start_time, 0.001)
|
||||
fps_text = (
|
||||
f"frame={frame_id} fps={frame_id / elapsed:.1f} "
|
||||
f"det={len(gaylords)} tracks={len(tracks)} snap={navigator.snapshot_counter}"
|
||||
)
|
||||
display = draw_navigation_debug(
|
||||
frame,
|
||||
tracks,
|
||||
args,
|
||||
navigator.last_command_text,
|
||||
fps_text,
|
||||
)
|
||||
cv2.imshow("flywms navigate", display)
|
||||
if navigator.last_ocr_payload_frame is not None:
|
||||
snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width)
|
||||
cv2.imshow("flywms snapshot", snapshot_display)
|
||||
cv2.imshow(
|
||||
"flywms comandi",
|
||||
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
|
||||
)
|
||||
|
||||
if new_snapshots:
|
||||
flash_display = apply_flash(display, args.flash_alpha)
|
||||
cv2.imshow("flywms navigate", flash_display)
|
||||
if navigator.last_ocr_payload_frame is not None:
|
||||
flash_snapshot = apply_flash(
|
||||
resize_preview(navigator.last_ocr_payload_frame, args.preview_width),
|
||||
args.flash_alpha,
|
||||
)
|
||||
cv2.imshow("flywms snapshot", flash_snapshot)
|
||||
cv2.imshow(
|
||||
"flywms comandi",
|
||||
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
|
||||
)
|
||||
pause_ms = max(1, int(args.remote_ack_timeout_sec * 1000))
|
||||
key = cv2.waitKey(pause_ms) & 0xFF
|
||||
if key in (27, ord("q")):
|
||||
log("Interrotto da tastiera")
|
||||
break
|
||||
for snapshot in new_snapshots:
|
||||
navigator.simulate_remote_response(snapshot)
|
||||
cv2.imshow(
|
||||
"flywms comandi",
|
||||
draw_commands_window(navigator.last_command_lines, navigator.motion_text),
|
||||
)
|
||||
|
||||
key = cv2.waitKey(1) & 0xFF
|
||||
if key in (27, ord("q")):
|
||||
log("Interrotto da tastiera")
|
||||
break
|
||||
finally:
|
||||
cap.release()
|
||||
if not args.no_display:
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
403
flywms_navigation_gui.py
Normal file
403
flywms_navigation_gui.py
Normal file
@@ -0,0 +1,403 @@
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
import flywms_navigation as nav
|
||||
|
||||
try:
|
||||
import dearpygui.dearpygui as dpg
|
||||
except ModuleNotFoundError as exc:
|
||||
dpg = None
|
||||
DEARPYGUI_IMPORT_ERROR = exc
|
||||
else:
|
||||
DEARPYGUI_IMPORT_ERROR = None
|
||||
|
||||
|
||||
TEXTURE_MAIN = "texture_main"
|
||||
TEXTURE_SNAPSHOT = "texture_snapshot"
|
||||
TEXTURE_REGISTRY = "texture_registry"
|
||||
IMAGE_MAIN = "image_main"
|
||||
IMAGE_SNAPSHOT = "image_snapshot"
|
||||
TEXT_STATUS = "text_status"
|
||||
TEXT_COMMANDS = "text_commands"
|
||||
TEXT_TRACKS = "text_tracks"
|
||||
TEXT_METRICS = "text_metrics"
|
||||
BUTTON_RUN = "button_run"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DemoFrameState:
|
||||
frame_id: int
|
||||
elapsed: float
|
||||
source_frame: np.ndarray
|
||||
display_frame: np.ndarray
|
||||
snapshot_frame: np.ndarray | None
|
||||
tracks: list[nav.Track]
|
||||
detections_count: int
|
||||
snapshots: list[nav.NavigationSnapshot]
|
||||
yolo_ms: float
|
||||
fps_text: str
|
||||
|
||||
|
||||
class NavigationDemoEngine:
|
||||
"""One-step navigation simulator used by the DearPyGUI shell.
|
||||
|
||||
The original flywms_navigation.py command-line program stays untouched.
|
||||
This class reuses its detector/tracker/navigation objects and exposes a
|
||||
frame-by-frame API suitable for a GUI event loop.
|
||||
"""
|
||||
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
nav.require_file(args.weights, "modello Ultralytics")
|
||||
self.detector = nav.UltralyticsDetector(args.weights, args.ultralytics_device)
|
||||
self.cap, self.source_name = nav.open_capture(args.video)
|
||||
if not self.cap.isOpened():
|
||||
raise RuntimeError(f"impossibile aprire sorgente video: {self.source_name}")
|
||||
|
||||
video_fps = self.cap.get(cv2.CAP_PROP_FPS)
|
||||
self.frame_delay = (
|
||||
1.0 / video_fps
|
||||
if args.realtime_playback and video_fps and video_fps > 1
|
||||
else 0.0
|
||||
)
|
||||
self.tracker = nav.LightweightTracker(
|
||||
max_missed=args.max_track_missed,
|
||||
min_match_score=args.min_match_score,
|
||||
max_center_distance_ratio=args.max_center_distance_ratio,
|
||||
)
|
||||
self.navigator = nav.NavigationController(args)
|
||||
self.frame_id = 0
|
||||
self.start_time = time.perf_counter()
|
||||
self.last_loop_end = self.start_time
|
||||
self.yolo_total_ms = 0.0
|
||||
self.yolo_cycles = 0
|
||||
self.stop_reason = ""
|
||||
|
||||
def close(self) -> None:
|
||||
self.cap.release()
|
||||
|
||||
def step(self) -> DemoFrameState | None:
|
||||
if self.frame_delay > 0:
|
||||
now = time.perf_counter()
|
||||
sleep_for = self.frame_delay - (now - self.last_loop_end)
|
||||
if sleep_for > 0:
|
||||
time.sleep(sleep_for)
|
||||
self.last_loop_end = time.perf_counter()
|
||||
|
||||
ok, frame = self.cap.read()
|
||||
if not ok:
|
||||
self.stop_reason = "Fine stream"
|
||||
return None
|
||||
|
||||
self.frame_id += 1
|
||||
timestamp = time.perf_counter()
|
||||
if self.args.max_frames > 0 and self.frame_id > self.args.max_frames:
|
||||
self.stop_reason = f"Raggiunto max_frames={self.args.max_frames}"
|
||||
return None
|
||||
|
||||
detections, yolo_ms = self.detector.detect(
|
||||
frame,
|
||||
self.args.min_confidence,
|
||||
self.args.input_size,
|
||||
)
|
||||
self.yolo_total_ms += yolo_ms
|
||||
self.yolo_cycles += 1
|
||||
|
||||
gaylords = [
|
||||
det for det in detections
|
||||
if det.class_name.strip().lower() == self.args.target_class.strip().lower()
|
||||
]
|
||||
tracks = self.tracker.update(gaylords, self.frame_id, frame.shape[1])
|
||||
|
||||
if (
|
||||
self.args.motion_report_interval > 0
|
||||
and self.frame_id % self.args.motion_report_interval == 0
|
||||
):
|
||||
self.navigator.set_motion_text(
|
||||
nav.estimate_motion_from_tracks(tracks, self.args.motion_min_pixels)
|
||||
)
|
||||
|
||||
new_snapshots: list[nav.NavigationSnapshot] = []
|
||||
for track in tracks:
|
||||
if track.missed == 0:
|
||||
snapshot = self.navigator.process_track(
|
||||
track,
|
||||
frame,
|
||||
self.frame_id,
|
||||
timestamp,
|
||||
)
|
||||
if snapshot is not None:
|
||||
new_snapshots.append(snapshot)
|
||||
|
||||
for snapshot in new_snapshots:
|
||||
self.navigator.simulate_remote_response(snapshot)
|
||||
|
||||
elapsed = max(time.perf_counter() - self.start_time, 0.001)
|
||||
fps_text = (
|
||||
f"frame={self.frame_id} fps={self.frame_id / elapsed:.1f} "
|
||||
f"det={len(gaylords)} tracks={len(tracks)} "
|
||||
f"snap={self.navigator.snapshot_counter}"
|
||||
)
|
||||
display = nav.draw_navigation_debug(
|
||||
frame,
|
||||
tracks,
|
||||
self.args,
|
||||
self.navigator.last_command_text,
|
||||
fps_text,
|
||||
)
|
||||
|
||||
return DemoFrameState(
|
||||
frame_id=self.frame_id,
|
||||
elapsed=elapsed,
|
||||
source_frame=frame,
|
||||
display_frame=display,
|
||||
snapshot_frame=self.navigator.last_ocr_payload_frame,
|
||||
tracks=tracks,
|
||||
detections_count=len(gaylords),
|
||||
snapshots=new_snapshots,
|
||||
yolo_ms=yolo_ms,
|
||||
fps_text=fps_text,
|
||||
)
|
||||
|
||||
|
||||
class NavigationDemoGui:
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
self.engine: NavigationDemoEngine | None = None
|
||||
self.running = False
|
||||
self.main_texture_size: tuple[int, int] | None = None
|
||||
self.snapshot_texture_size: tuple[int, int] | None = None
|
||||
self.main_texture_tag = TEXTURE_MAIN
|
||||
self.snapshot_texture_tag = TEXTURE_SNAPSHOT
|
||||
self.last_state: DemoFrameState | None = None
|
||||
|
||||
def start(self) -> None:
|
||||
if self.engine is None:
|
||||
self.engine = NavigationDemoEngine(self.args)
|
||||
dpg.set_value(TEXT_STATUS, f"Sorgente: {self.engine.source_name}")
|
||||
self.running = True
|
||||
dpg.configure_item(BUTTON_RUN, label="Pausa")
|
||||
|
||||
def pause(self) -> None:
|
||||
self.running = False
|
||||
dpg.configure_item(BUTTON_RUN, label="Avvia")
|
||||
|
||||
def toggle_run(self) -> None:
|
||||
if self.running:
|
||||
self.pause()
|
||||
else:
|
||||
self.start()
|
||||
|
||||
def reset(self) -> None:
|
||||
self.pause()
|
||||
if self.engine is not None:
|
||||
self.engine.close()
|
||||
self.engine = NavigationDemoEngine(self.args)
|
||||
self.last_state = None
|
||||
self.main_texture_size = None
|
||||
self.snapshot_texture_size = None
|
||||
dpg.set_value(TEXT_COMMANDS, "Nessun comando generato")
|
||||
dpg.set_value(TEXT_TRACKS, "Nessuna track")
|
||||
dpg.set_value(TEXT_METRICS, "Metriche non disponibili")
|
||||
dpg.set_value(TEXT_STATUS, f"Reset completato. Sorgente: {self.engine.source_name}")
|
||||
|
||||
def step_once(self) -> None:
|
||||
if self.engine is None:
|
||||
self.engine = NavigationDemoEngine(self.args)
|
||||
state = self.engine.step()
|
||||
if state is None:
|
||||
self.pause()
|
||||
dpg.set_value(TEXT_STATUS, self.engine.stop_reason if self.engine else "Stop")
|
||||
return
|
||||
self.last_state = state
|
||||
self._update_ui(state)
|
||||
|
||||
def tick(self) -> None:
|
||||
if self.running:
|
||||
self.step_once()
|
||||
|
||||
def close(self) -> None:
|
||||
if self.engine is not None:
|
||||
self.engine.close()
|
||||
|
||||
def _update_ui(self, state: DemoFrameState) -> None:
|
||||
self._set_image_texture(
|
||||
frame=state.display_frame,
|
||||
image_tag=IMAGE_MAIN,
|
||||
size_attr="main_texture_size",
|
||||
tag_attr="main_texture_tag",
|
||||
max_display_width=960,
|
||||
)
|
||||
if state.snapshot_frame is not None:
|
||||
self._set_image_texture(
|
||||
frame=nav.resize_preview(state.snapshot_frame, 520),
|
||||
image_tag=IMAGE_SNAPSHOT,
|
||||
size_attr="snapshot_texture_size",
|
||||
tag_attr="snapshot_texture_tag",
|
||||
max_display_width=520,
|
||||
)
|
||||
|
||||
assert self.engine is not None
|
||||
avg_yolo = self.engine.yolo_total_ms / max(self.engine.yolo_cycles, 1)
|
||||
dpg.set_value(
|
||||
TEXT_METRICS,
|
||||
"\n".join([
|
||||
state.fps_text,
|
||||
f"YOLO ultimo: {state.yolo_ms:.1f} ms",
|
||||
f"YOLO medio: {avg_yolo:.1f} ms",
|
||||
self.engine.navigator.motion_text,
|
||||
f"Snapshot salvati: {self.engine.navigator.snapshot_counter}",
|
||||
f"Output: {Path(self.args.snapshot_output_dir).resolve()}",
|
||||
]),
|
||||
)
|
||||
dpg.set_value(
|
||||
TEXT_COMMANDS,
|
||||
"\n".join(self.engine.navigator.last_command_lines)
|
||||
if self.engine.navigator.last_command_lines
|
||||
else "Nessun comando generato",
|
||||
)
|
||||
dpg.set_value(TEXT_TRACKS, self._format_tracks(state.tracks))
|
||||
if state.snapshots:
|
||||
dpg.set_value(
|
||||
TEXT_STATUS,
|
||||
f"Snapshot {state.snapshots[-1].snapshot_id:04d} acquisito "
|
||||
f"su {state.snapshots[-1].simulated_position}",
|
||||
)
|
||||
|
||||
def _set_image_texture(
|
||||
self,
|
||||
frame: np.ndarray,
|
||||
image_tag: str,
|
||||
size_attr: str,
|
||||
tag_attr: str,
|
||||
max_display_width: int,
|
||||
) -> None:
|
||||
h, w = frame.shape[:2]
|
||||
rgba = bgr_to_rgba_float(frame)
|
||||
display_scale = min(1.0, max_display_width / float(w))
|
||||
display_w = max(1, int(w * display_scale))
|
||||
display_h = max(1, int(h * display_scale))
|
||||
old_size = getattr(self, size_attr)
|
||||
if old_size != (w, h):
|
||||
old_texture_tag = getattr(self, tag_attr)
|
||||
new_texture_tag = dpg.generate_uuid()
|
||||
dpg.add_dynamic_texture(
|
||||
w,
|
||||
h,
|
||||
rgba,
|
||||
tag=new_texture_tag,
|
||||
parent=TEXTURE_REGISTRY,
|
||||
)
|
||||
dpg.configure_item(
|
||||
image_tag,
|
||||
texture_tag=new_texture_tag,
|
||||
width=display_w,
|
||||
height=display_h,
|
||||
)
|
||||
if dpg.does_item_exist(old_texture_tag):
|
||||
dpg.delete_item(old_texture_tag)
|
||||
setattr(self, tag_attr, new_texture_tag)
|
||||
setattr(self, size_attr, (w, h))
|
||||
return
|
||||
dpg.configure_item(image_tag, width=display_w, height=display_h)
|
||||
dpg.set_value(getattr(self, tag_attr), rgba)
|
||||
|
||||
@staticmethod
|
||||
def _format_tracks(tracks: list[nav.Track]) -> str:
|
||||
if not tracks:
|
||||
return "Nessuna track"
|
||||
lines = []
|
||||
for track in tracks[:12]:
|
||||
cx, cy = nav.bbox_center(track.bbox)
|
||||
lines.append(
|
||||
f"#{track.id:03d} {track.state:<11} "
|
||||
f"conf={track.confidence:.2f} hits={track.hits:<3} "
|
||||
f"missed={track.missed:<2} center=({cx:.0f},{cy:.0f}) "
|
||||
f"trend={track.area_trend():+.2f} {track.last_candidate_reason}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def bgr_to_rgba_float(frame: np.ndarray) -> np.ndarray:
|
||||
rgba = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
|
||||
return np.asarray(rgba, dtype=np.float32).ravel() / 255.0
|
||||
|
||||
|
||||
def build_ui(app: NavigationDemoGui) -> None:
|
||||
dpg.create_context()
|
||||
|
||||
with dpg.font_registry():
|
||||
default_font = dpg.add_font("C:/Windows/Fonts/segoeui.ttf", 17)
|
||||
mono_font = dpg.add_font("C:/Windows/Fonts/consola.ttf", 15)
|
||||
|
||||
with dpg.texture_registry(show=False, tag=TEXTURE_REGISTRY):
|
||||
blank = np.zeros((32, 32, 4), dtype=np.float32).ravel()
|
||||
dpg.add_dynamic_texture(32, 32, blank, tag=TEXTURE_MAIN)
|
||||
dpg.add_dynamic_texture(32, 32, blank, tag=TEXTURE_SNAPSHOT)
|
||||
|
||||
with dpg.window(tag="main_window", label="FlyWMS Navigation Demo"):
|
||||
with dpg.group(horizontal=True):
|
||||
dpg.add_button(label="Avvia", tag=BUTTON_RUN, callback=lambda: app.toggle_run())
|
||||
dpg.add_button(label="Step", callback=lambda: app.step_once())
|
||||
dpg.add_button(label="Reset", callback=lambda: app.reset())
|
||||
dpg.add_text("Pronto", tag=TEXT_STATUS)
|
||||
|
||||
dpg.add_separator()
|
||||
|
||||
with dpg.group(horizontal=True):
|
||||
with dpg.child_window(width=980, height=-1, border=False):
|
||||
dpg.add_image(TEXTURE_MAIN, tag=IMAGE_MAIN)
|
||||
|
||||
with dpg.child_window(width=560, height=-1, border=False):
|
||||
dpg.add_text("Metriche")
|
||||
dpg.add_text("Metriche non disponibili", tag=TEXT_METRICS)
|
||||
dpg.bind_item_font(TEXT_METRICS, mono_font)
|
||||
dpg.add_separator()
|
||||
dpg.add_text("Comandi")
|
||||
dpg.add_text("Nessun comando generato", tag=TEXT_COMMANDS)
|
||||
dpg.bind_item_font(TEXT_COMMANDS, mono_font)
|
||||
dpg.add_separator()
|
||||
dpg.add_text("Payload OCR")
|
||||
dpg.add_image(TEXTURE_SNAPSHOT, tag=IMAGE_SNAPSHOT)
|
||||
dpg.add_separator()
|
||||
dpg.add_text("Track")
|
||||
dpg.add_text("Nessuna track", tag=TEXT_TRACKS)
|
||||
dpg.bind_item_font(TEXT_TRACKS, mono_font)
|
||||
|
||||
dpg.bind_font(default_font)
|
||||
dpg.create_viewport(title="FlyWMS Navigation Demo", width=1600, height=980)
|
||||
dpg.setup_dearpygui()
|
||||
dpg.show_viewport()
|
||||
dpg.set_primary_window("main_window", True)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if dpg is None:
|
||||
print(
|
||||
"DearPyGUI non e' installato. Installa il pacchetto con:\n"
|
||||
" python -m pip install dearpygui\n"
|
||||
f"Dettaglio import: {DEARPYGUI_IMPORT_ERROR}",
|
||||
flush=True,
|
||||
)
|
||||
return 1
|
||||
|
||||
args = nav.parse_args()
|
||||
args.no_display = True
|
||||
app = NavigationDemoGui(args)
|
||||
build_ui(app)
|
||||
try:
|
||||
while dpg.is_dearpygui_running():
|
||||
app.tick()
|
||||
dpg.render_dearpygui_frame()
|
||||
finally:
|
||||
app.close()
|
||||
dpg.destroy_context()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
116
handoff.md
Normal file
116
handoff.md
Normal file
@@ -0,0 +1,116 @@
|
||||
# FlyWMS handoff - GUI navigazione
|
||||
|
||||
Data: 2026-05-15
|
||||
|
||||
## Stato repository
|
||||
|
||||
- Branch: `master`
|
||||
- Remote: `origin` su Gitea
|
||||
- Ultimo lavoro: aggiunta shell DearPyGUI separata per la simulazione dimostrativa di `flywms_navigation.py`.
|
||||
|
||||
## Contesto recuperato
|
||||
|
||||
L'obiettivo discusso era trasformare `flywms_navigation.py` in una simulazione piu' ordinata e dimostrativa, senza contaminare la logica esistente. Il programma originale contiene gia':
|
||||
|
||||
- detector YOLO/Ultralytics;
|
||||
- tracking geometrico leggero dei `gaylord`;
|
||||
- decisione di snapshot quando una track e' centrata;
|
||||
- salvataggio frame debug, payload OCR e `snapshots.jsonl`;
|
||||
- simulazione comandi `STOP`, `SCATTA_FOTO`, `ASSOCIA_POSIZIONE`, `INVIA_ROI_REMOTA`, `ATTENDI_ACK`, `RIPARTI_*`;
|
||||
- UI debug OpenCV integrata nel `main()`.
|
||||
|
||||
## Decisione architetturale
|
||||
|
||||
La GUI DearPyGUI e' stata tenuta in un file separato, `flywms_navigation_gui.py`.
|
||||
|
||||
Motivo: non toccare il ciclo originale e non rischiare regressioni nella logica di navigazione. La GUI importa `flywms_navigation.py` e riusa:
|
||||
|
||||
- `UltralyticsDetector`;
|
||||
- `LightweightTracker`;
|
||||
- `NavigationController`;
|
||||
- `draw_navigation_debug`;
|
||||
- configurazione da `flywms_navigation.ini`.
|
||||
|
||||
In pratica `flywms_navigation.py` resta il simulatore CLI/OpenCV funzionante; `flywms_navigation_gui.py` e' una shell dimostrativa sopra la stessa logica.
|
||||
|
||||
## Implementato
|
||||
|
||||
File nuovo:
|
||||
|
||||
- `flywms_navigation_gui.py`
|
||||
|
||||
Funzioni principali:
|
||||
|
||||
- `NavigationDemoEngine`: esegue un singolo step del ciclo video/YOLO/tracking/navigazione e restituisce uno stato frame-by-frame.
|
||||
- `NavigationDemoGui`: gestisce finestra DearPyGUI, comandi Avvia/Pausa, Step, Reset, preview, metriche, comandi, payload OCR e track.
|
||||
- conversione frame OpenCV BGR in texture RGBA DearPyGUI.
|
||||
- gestione texture corretta con tag univoci quando cambia la dimensione immagine.
|
||||
- resize visuale della preview principale a 960x540.
|
||||
|
||||
DearPyGUI e' stato installato nell'ambiente Python corrente con:
|
||||
|
||||
```powershell
|
||||
python -m pip install dearpygui
|
||||
```
|
||||
|
||||
## Verifiche eseguite
|
||||
|
||||
Compilazione:
|
||||
|
||||
```powershell
|
||||
python -m py_compile flywms_navigation_gui.py flywms_navigation.py
|
||||
```
|
||||
|
||||
Test motore GUI senza finestra persistente:
|
||||
|
||||
```text
|
||||
state1 1 (720, 1280, 3)
|
||||
state2 2 (720, 1280, 3)
|
||||
```
|
||||
|
||||
Test aggiornamento UI/texture:
|
||||
|
||||
```text
|
||||
ui step ok
|
||||
960 540
|
||||
```
|
||||
|
||||
Avvio manuale usato:
|
||||
|
||||
```powershell
|
||||
python flywms_navigation_gui.py
|
||||
```
|
||||
|
||||
## Osservazioni performance
|
||||
|
||||
Con CPU, la GUI scende circa da 6 fps a 3.8 fps. Il dato e' credibile perche' la shell GUI aggiunge:
|
||||
|
||||
- disegno overlay OpenCV;
|
||||
- conversione BGR -> RGBA;
|
||||
- normalizzazione/copia texture `float32`;
|
||||
- aggiornamento pannelli DearPyGUI nello stesso thread di YOLO.
|
||||
|
||||
Il carico CPU alto su tutti gli 8 core e' credibile: PyTorch/OpenCV/NumPy usano thread nativi anche se il loop Python e' singolo.
|
||||
|
||||
Quando sara' disponibile una GPU corretta, conviene rimisurare separando:
|
||||
|
||||
- fps YOLO/tracking;
|
||||
- fps GUI;
|
||||
- tempo medio YOLO;
|
||||
- tempo medio conversione texture.
|
||||
|
||||
## Prossimi passi consigliati
|
||||
|
||||
1. Migliorare la GUI come demo:
|
||||
- timeline eventi `STOP`, `SCATTA_FOTO`, `ACK/NACK`, `RIPARTI`;
|
||||
- pannello soglie principali;
|
||||
- evidenza piu' leggibile dello stato `entering/candidate/centered/snapshotted`;
|
||||
- contatori snapshot e posizioni simulate.
|
||||
|
||||
2. Ottimizzare solo se serve:
|
||||
- aggiornare texture a frequenza limitata;
|
||||
- convertire per GUI un frame gia' ridotto;
|
||||
- separare inferenza e GUI con stato latest-frame;
|
||||
- aggiornare testo solo quando cambia.
|
||||
|
||||
3. Non modificare la logica di `flywms_navigation.py` finche' la GUI non ha stabilizzato i requisiti dimostrativi.
|
||||
Reference in New Issue
Block a user