Files
flywms/flywms_navigation.py
2026-05-19 08:52:44 +02:00

1789 lines
73 KiB
Python

import argparse
import configparser
import json
import queue
import sys
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from urllib import request, error
import cv2
import numpy as np
# Default INI path used by the --config command-line option.
DEFAULT_CONFIG_PATH = "flywms_navigation.ini"
# Default size threshold of cuda_resize/cuda_cvt_color: smaller inputs stay on the CPU path.
CUDA_MIN_PIXELS = 640 * 360
def opencv_cuda_available() -> bool:
    """Report whether this OpenCV build exposes at least one CUDA device.

    Returns False when ``cv2`` has no ``cuda`` attribute, when no device is
    enumerated, or when OpenCV raises ``cv2.error`` while counting devices.
    """
    if not hasattr(cv2, "cuda"):
        return False
    try:
        device_count = cv2.cuda.getCudaEnabledDeviceCount()
    except cv2.error:
        return False
    return device_count > 0
# Probed once at import time; cuda_resize and cuda_cvt_color read this flag on every call.
OPENCV_CUDA_AVAILABLE = opencv_cuda_available()
def cuda_resize(
    image: np.ndarray,
    size: tuple[int, int],
    interpolation: int = cv2.INTER_LINEAR,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    """Resize *image*, using the OpenCV CUDA module for large inputs.

    Args:
        image: Source image array.
        size: Target size, passed unchanged to ``cv2.resize`` / ``cv2.cuda.resize``.
        interpolation: OpenCV interpolation flag.
        min_pixels: Images with fewer pixels than this are resized on the CPU.

    Returns:
        The resized image as a host ``numpy`` array.
    """
    # ndarray.size counts every element, channels included, so comparing it to
    # a pixel threshold would send a 3-channel frame to the GPU at a third of
    # the intended size. Count rows * columns instead.
    pixel_count = image.shape[0] * image.shape[1] if image.ndim >= 2 else image.size
    if not OPENCV_CUDA_AVAILABLE or pixel_count < min_pixels:
        return cv2.resize(image, size, interpolation=interpolation)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.resize(gpu, size, interpolation=interpolation).download()
    except cv2.error:
        # Best effort: any CUDA failure falls back to the CPU implementation.
        return cv2.resize(image, size, interpolation=interpolation)
def cuda_cvt_color(
    image: np.ndarray,
    code: int,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    """Convert the colour space of *image*, using OpenCV CUDA for large inputs.

    Args:
        image: Source image array.
        code: OpenCV colour conversion code, passed unchanged to ``cvtColor``.
        min_pixels: Images with fewer pixels than this are converted on the CPU.

    Returns:
        The converted image as a host ``numpy`` array.
    """
    # ndarray.size counts every element, channels included; the threshold is
    # expressed in pixels, so compare rows * columns instead.
    pixel_count = image.shape[0] * image.shape[1] if image.ndim >= 2 else image.size
    if not OPENCV_CUDA_AVAILABLE or pixel_count < min_pixels:
        return cv2.cvtColor(image, code)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.cvtColor(gpu, code).download()
    except cv2.error:
        # Best effort: any CUDA failure falls back to the CPU implementation.
        return cv2.cvtColor(image, code)
@dataclass(frozen=True)
class Detection:
    """Immutable record describing one detected object box."""

    # Numeric class index of the detection.
    class_id: int
    # Human-readable class name for class_id.
    class_name: str
    # Detection confidence score.
    confidence: float
    # Box corners as (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
@dataclass(frozen=True)
class CandidateSnapshot:
    """One frame in which a track qualified for a snapshot, with its scores."""

    frame_id: int
    timestamp: float
    # Image data kept for this candidate (NavigationController stores a copy of the frame).
    frame: np.ndarray
    # Track box at the time of the candidate, as (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
    # Box and confidence of the label detection associated with the track box.
    label_bbox: tuple[int, int, int, int]
    label_confidence: float
    # Combined score used to pick the best candidate; the three parts follow.
    score: float
    center_score: float
    size_score: float
    cut_score: float
@dataclass
class Track:
    """Mutable state of one tracked bounding box across frames.

    Besides the latest box and confidence, the track keeps bounded histories
    of boxes, centers and areas plus the snapshot candidates collected so far.
    """

    id: int
    bbox: tuple[int, int, int, int]
    confidence: float
    first_seen_frame: int
    last_seen_frame: int
    hits: int = 1
    missed: int = 0
    state: str = "entering"
    last_candidate_reason: str = ""
    pending_remote_response: str = "none"
    already_snapshotted: bool = False
    bbox_history: list[tuple[int, int, int, int]] = field(default_factory=list)
    center_history: list[tuple[float, float]] = field(default_factory=list)
    area_history: list[float] = field(default_factory=list)
    candidates: list[CandidateSnapshot] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Seed the histories with the box the track was created from."""
        self._append_history(self.bbox)

    def update(self, bbox: tuple[int, int, int, int], confidence: float, frame_id: int) -> None:
        """Record a matched detection and reset the missed-frame counter."""
        self.bbox, self.confidence, self.last_seen_frame = bbox, confidence, frame_id
        self.hits += 1
        self.missed = 0
        self._append_history(bbox)

    def mark_missed(self) -> None:
        """Count a frame without a match; non-snapshotted tracks become "exiting"."""
        self.missed += 1
        if self.missed <= 0 or self.state == "snapshotted":
            return
        self.state = "exiting"

    def _append_history(self, bbox: tuple[int, int, int, int]) -> None:
        """Append *bbox*, its center and its area, keeping the newest 20 entries."""
        window = 20
        self.bbox_history.append(bbox)
        self.bbox_history = self.bbox_history[-window:]
        self.center_history.append(bbox_center(bbox))
        self.center_history = self.center_history[-window:]
        self.area_history.append(float(bbox_area(bbox)))
        self.area_history = self.area_history[-window:]

    def area_trend(self) -> float:
        """Return the relative area change between the 4th-last and last sample.

        Returns 0.0 while fewer than four samples exist. The denominator is
        clamped to at least 1.0 to avoid dividing by a zero area.
        """
        samples = self.area_history
        if len(samples) < 4:
            return 0.0
        baseline, latest = samples[-4], samples[-1]
        return (latest - baseline) / max(baseline, 1.0)
@dataclass(frozen=True)
class NavigationSnapshot:
    """Immutable description of one finalized snapshot and its saved files."""

    snapshot_id: int
    frame_id: int
    timestamp: float
    # Position label attached to the snapshot (e.g. "gaylord 1").
    simulated_position: str
    track_id: int
    # Track box and associated label box, each as (x1, y1, x2, y2).
    bbox: tuple[int, int, int, int]
    label_bbox: tuple[int, int, int, int]
    score: float
    # Paths of the images written for this snapshot.
    debug_frame_path: str
    ocr_payload_path: str
    label_payload_path: str
    # (dx, dy) in pixels; NavigationController fills it with label center minus box center.
    movement_vector_px: tuple[float, float]
@dataclass(frozen=True)
class WmsSnapshotJob:
    """Unit of work queued by WmsAsyncClient for one snapshot upload."""

    # Identifier used to match the eventual WmsResult to this job.
    request_id: str
    snapshot: NavigationSnapshot
    # Metadata dictionary built for the request.
    metadata: dict[str, object]
    label_image_path: str
    # None when the gaylord/debug image is not sent.
    gaylord_image_path: str | None
@dataclass(frozen=True)
class WmsResult:
    """Outcome of one WMS request, successful or not."""

    request_id: str
    snapshot_id: int
    # Status string; NavigationController.apply_wms_result handles "ACK" and
    # "NACK" explicitly and treats any other value as an error.
    status: str
    message: str
    # Decoded server payload when one was received.
    response: dict[str, object] | None = None
    # Error text for failures produced on the client side.
    error: str = ""
class UltralyticsDetector:
    """Wrapper around an Ultralytics YOLO model that yields ``Detection`` records."""

    def __init__(self, model_path: str, device: str, half: bool):
        """Load the model and cache its class names.

        Args:
            model_path: Path of the model weights handed to ``YOLO``.
            device: Device string forwarded to ``predict``.
            half: Request half precision; ignored when *device* is "cpu".
        """
        # Imported inside the constructor rather than at module level.
        from ultralytics import YOLO

        self.model = YOLO(model_path)
        self.device = device
        runs_on_cpu = str(device).strip().lower() == "cpu"
        self.half = bool(half) and not runs_on_cpu
        names = self.model.names
        if isinstance(names, dict):
            ordered_names = [names[key] for key in sorted(names)]
        else:
            ordered_names = names
        self.classes = [str(name) for name in ordered_names]

    def detect(
        self,
        frame: np.ndarray,
        min_confidence: float,
        input_size: int,
    ) -> tuple[list[Detection], float]:
        """Run inference on *frame*.

        Returns:
            A pair ``(detections, elapsed_ms)`` where ``elapsed_ms`` is the wall
            time of the ``predict`` call only. Boxes are rounded to integers,
            passed through ``clip_box`` with the frame width and height, and
            dropped when they end up with no positive width or height.
        """
        started = time.perf_counter()
        results = self.model.predict(
            source=frame,
            imgsz=input_size,
            conf=min_confidence,
            device=self.device,
            half=self.half,
            verbose=False,
        )
        elapsed_ms = (time.perf_counter() - started) * 1000.0

        detections: list[Detection] = []
        boxes = results[0].boxes if results else None
        if boxes is None:
            return detections, elapsed_ms

        corners = boxes.xyxy.cpu().numpy()
        scores = boxes.conf.cpu().numpy()
        class_ids = boxes.cls.cpu().numpy().astype(int)
        known = len(self.classes)
        for box, conf, cls_id in zip(corners, scores, class_ids):
            left, top, right, bottom = [int(round(value)) for value in box]
            left, top, right, bottom = clip_box(
                left, top, right, bottom, frame.shape[1], frame.shape[0]
            )
            if right <= left or bottom <= top:
                continue
            # Fall back to the numeric id when the model reports an unknown class.
            class_name = self.classes[cls_id] if 0 <= cls_id < known else str(cls_id)
            detections.append(
                Detection(
                    class_id=int(cls_id),
                    class_name=class_name,
                    confidence=float(conf),
                    bbox=(left, top, right, bottom),
                )
            )
        return detections, elapsed_ms
class WmsAsyncClient:
    """Background HTTP client that posts snapshot jobs to the WMS endpoint.

    Jobs are queued by ``submit`` and sent by a daemon worker thread; outcomes
    are delivered as ``WmsResult`` objects on the ``results`` queue. When
    ``args.wms_enabled`` is false no thread is started and ``submit`` is a no-op.
    """

    def __init__(self, args):
        """Create the queues and, when enabled, start the worker thread."""
        self.args = args
        self.enabled = bool(args.wms_enabled)
        self.jobs: queue.Queue[WmsSnapshotJob | None] = queue.Queue(maxsize=args.wms_queue_max_size)
        self.results: queue.Queue[WmsResult] = queue.Queue()
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        if self.enabled:
            self._thread = threading.Thread(target=self._worker, name="flywms-wms-client", daemon=True)
            self._thread.start()

    def close(self) -> None:
        """Ask the worker to stop and wait up to two seconds for it."""
        if not self.enabled:
            return
        self._stop.set()
        try:
            # None is the wake-up sentinel for a worker blocked on the queue.
            self.jobs.put_nowait(None)
        except queue.Full:
            # Deliberately ignored: the worker also polls the stop event.
            pass
        if self._thread is not None:
            self._thread.join(timeout=2.0)

    def submit(self, snapshot: NavigationSnapshot) -> str | None:
        """Queue *snapshot* for upload.

        Returns:
            The generated request id, or None when the client is disabled.
            When the job queue is full an ``ERROR`` result with message
            ``WMS_QUEUE_FULL`` is published and the request id is still returned.
        """
        if not self.enabled:
            return None
        request_id = str(uuid.uuid4())
        metadata = {
            "request_id": request_id,
            "client_id": self.args.wms_client_id,
            "snapshot_id": snapshot.snapshot_id,
            "frame_id": snapshot.frame_id,
            "timestamp": snapshot.timestamp,
            "simulated_position": snapshot.simulated_position,
            "track_id": snapshot.track_id,
            "gaylord_bbox": list(snapshot.bbox),
            "label_bbox": list(snapshot.label_bbox),
            "movement_vector_px": {
                "dx": snapshot.movement_vector_px[0],
                "dy": snapshot.movement_vector_px[1],
            },
        }
        job = WmsSnapshotJob(
            request_id=request_id,
            snapshot=snapshot,
            metadata=metadata,
            label_image_path=snapshot.label_payload_path,
            gaylord_image_path=snapshot.debug_frame_path if self.args.wms_send_gaylord_debug else None,
        )
        try:
            self.jobs.put_nowait(job)
        except queue.Full:
            self.results.put(WmsResult(
                request_id=request_id,
                snapshot_id=snapshot.snapshot_id,
                status="ERROR",
                message="WMS_QUEUE_FULL",
                error="queue full",
            ))
            return request_id
        log(f"[WMS] job accodato request_id={request_id} snapshot={snapshot.snapshot_id}")
        return request_id

    def poll_results(self) -> list[WmsResult]:
        """Drain and return every result currently available, without blocking."""
        results: list[WmsResult] = []
        while True:
            try:
                results.append(self.results.get_nowait())
            except queue.Empty:
                return results

    def wait_for_result(self, request_id: str | None, timeout_sec: float) -> WmsResult | None:
        """Block until the result for *request_id* arrives or the timeout expires.

        Results belonging to other requests that are dequeued while waiting are
        put back on the queue before returning. Returns None on timeout or when
        *request_id* is empty.
        """
        if not request_id:
            return None
        deadline = time.perf_counter() + max(0.0, timeout_sec)
        deferred: list[WmsResult] = []
        try:
            while time.perf_counter() <= deadline:
                try:
                    result = self.results.get(timeout=0.05)
                except queue.Empty:
                    continue
                if result.request_id == request_id:
                    return result
                deferred.append(result)
            return None
        finally:
            for result in deferred:
                self.results.put(result)

    def _worker(self) -> None:
        """Worker loop: send queued jobs until stopped or the sentinel arrives."""
        while not self._stop.is_set():
            try:
                job = self.jobs.get(timeout=0.2)
            except queue.Empty:
                continue
            if job is None:
                break
            result = self._send_job(job)
            self.results.put(result)

    def _send_job(self, job: WmsSnapshotJob) -> WmsResult:
        """POST one job and translate the outcome into a ``WmsResult``.

        Transport errors, malformed URLs, undecodable bodies, invalid JSON and
        non-object JSON payloads are all reported as an ``ERROR`` result with
        message ``WMS_SEND_ERROR``; none of them may escape, because an
        unhandled exception would terminate the worker thread and leave every
        later request without a result.
        """
        from http.client import HTTPException

        try:
            body, content_type = build_wms_multipart(job)
            req = request.Request(
                self.args.wms_server_url,
                data=body,
                headers={"Content-Type": content_type},
                method="POST",
            )
            with request.urlopen(req, timeout=self.args.wms_timeout_sec) as response:
                payload = json.loads(response.read().decode("utf-8"))
            if not isinstance(payload, dict):
                raise ValueError(f"unexpected WMS payload type: {type(payload).__name__}")
            status = str(payload.get("status", "ERROR"))
            message = str(payload.get("message", ""))
            log(f"[WMS] risposta request_id={job.request_id} status={status} message={message}")
            return WmsResult(job.request_id, job.snapshot.snapshot_id, status, message, payload)
        except (OSError, ValueError, HTTPException) as exc:
            # OSError covers URLError and timeouts; ValueError covers
            # JSONDecodeError, UnicodeDecodeError and malformed URLs.
            log(f"[WMS] errore request_id={job.request_id}: {exc}")
            return WmsResult(
                request_id=job.request_id,
                snapshot_id=job.snapshot.snapshot_id,
                status="ERROR",
                message="WMS_SEND_ERROR",
                error=str(exc),
            )
class LightweightTracker:
    """Greedy bbox tracker: enough to explain and test navigation decisions."""

    def __init__(
        self,
        max_missed: int,
        min_match_score: float,
        max_center_distance_ratio: float,
    ):
        """Store the matching thresholds and start with no tracks.

        Args:
            max_missed: A track is removed once its missed count exceeds this.
            min_match_score: Minimum association score for a track/detection pair.
            max_center_distance_ratio: Fraction of the frame width used as the
                maximum center distance passed to ``association_score``.
        """
        self.max_missed = max_missed
        self.min_match_score = min_match_score
        self.max_center_distance_ratio = max_center_distance_ratio
        self._next_id = 1
        self.tracks: dict[int, Track] = {}

    def update(
        self,
        detections: list[Detection],
        frame_id: int,
        frame_width: int,
    ) -> list[Track]:
        """Associate *detections* with existing tracks and return all live tracks.

        Pairs are matched best-score-first, each track and detection being used
        at most once. Unmatched tracks are marked missed (and dropped past
        ``max_missed``); unmatched detections start new tracks.
        """
        open_tracks = set(self.tracks.keys())
        open_detections = set(range(len(detections)))
        max_center_distance = max(1.0, frame_width * self.max_center_distance_ratio)

        scored: list[tuple[float, int, int]] = [
            (score, track_id, det_idx)
            for track_id, track in self.tracks.items()
            for det_idx, det in enumerate(detections)
            if (score := association_score(track.bbox, det.bbox, max_center_distance))
            >= self.min_match_score
        ]
        scored.sort(key=lambda item: item[0], reverse=True)

        for _, track_id, det_idx in scored:
            if track_id in open_tracks and det_idx in open_detections:
                matched = detections[det_idx]
                self.tracks[track_id].update(matched.bbox, matched.confidence, frame_id)
                open_tracks.remove(track_id)
                open_detections.remove(det_idx)

        for track_id in list(open_tracks):
            stale = self.tracks[track_id]
            stale.mark_missed()
            if stale.missed > self.max_missed:
                del self.tracks[track_id]

        for det_idx in open_detections:
            fresh = detections[det_idx]
            new_id = self._next_id
            self._next_id += 1
            self.tracks[new_id] = Track(
                id=new_id,
                bbox=fresh.bbox,
                confidence=fresh.confidence,
                first_seen_frame=frame_id,
                last_seen_frame=frame_id,
            )
        return list(self.tracks.values())
class NavigationController:
    """Decide when a track is snapshotted and build the simulated command lines.

    The controller scores each track against the frame geometry, collects
    candidate frames, writes the chosen snapshot images plus a JSONL metadata
    record into ``args.snapshot_output_dir``, and keeps the most recent command
    text in ``last_command_text`` / ``last_command_lines``.
    """

    def __init__(self, args):
        """Create the output directory and reset all counters and cached frames."""
        self.args = args
        self.output_dir = Path(args.snapshot_output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_path = self.output_dir / "snapshots.jsonl"
        self.snapshot_counter = 0
        self.position_counter = 0
        self.last_command_text = ""
        self.last_command_lines: list[str] = []
        self.last_snapshot_frame: np.ndarray | None = None
        self.last_ocr_payload_frame: np.ndarray | None = None
        self.last_label_payload_frame: np.ndarray | None = None
        self.last_remote_result_text = ""
        self.motion_text = "MOTO: n/d"
        self.label_movement_arrow: tuple[tuple[int, int], tuple[int, int]] | None = None

    def process_track(
        self,
        track: Track,
        frame: np.ndarray,
        frame_id: int,
        timestamp: float,
        labels: list[Detection] | None = None,
    ) -> NavigationSnapshot | None:
        """Evaluate *track* on this frame and return a snapshot when one is taken.

        While the track is eligible and a label is found inside its box, the
        frame is stored as a candidate; the snapshot is finalized once
        ``args.snapshot_window_frames`` candidates are buffered, or as soon as
        the track stops being eligible while candidates are pending. Returns
        None in every other case, including tracks already snapshotted.
        """
        labels = labels or []
        frame_h, frame_w = frame.shape[:2]
        eligible, score_parts = self._is_snapshot_candidate(track, frame_w, frame_h)
        self._update_track_state(track, eligible, frame_w)
        if track.already_snapshotted:
            return None
        if eligible:
            label = find_label_inside_bbox(track.bbox, labels)
            if label is None:
                track.last_candidate_reason = "label_missing"
                self.last_command_text = f"ETICHETTA_NON_TROVATA track={track.id}"
                self.last_command_lines = [
                    self.last_command_text,
                    "MICRO_RICERCA_ETICHETTA",
                    "ATTENDI_FRAME_SUCCESSIVO",
                ]
                log(f"[NAV] ETICHETTA_NON_TROVATA track={track.id}: attendo frame successivo")
                return None
            candidate = CandidateSnapshot(
                frame_id=frame_id,
                timestamp=timestamp,
                frame=frame.copy(),
                bbox=track.bbox,
                label_bbox=label.bbox,
                label_confidence=label.confidence,
                score=score_parts["score"],
                center_score=score_parts["center_score"],
                size_score=score_parts["size_score"],
                cut_score=score_parts["cut_score"],
            )
            track.candidates.append(candidate)
            track.candidates = track.candidates[-self.args.snapshot_window_frames:]
            if len(track.candidates) >= self.args.snapshot_window_frames:
                return self._finalize_snapshot(track)
        elif track.candidates:
            # Eligibility was lost: use the best candidate collected so far.
            return self._finalize_snapshot(track)
        return None

    def _is_snapshot_candidate(
        self,
        track: Track,
        frame_w: int,
        frame_h: int,
    ) -> tuple[bool, dict[str, float]]:
        """Check every snapshot criterion for *track*.

        Returns:
            ``(eligible, scores)`` where ``scores`` holds ``score``,
            ``center_score``, ``size_score`` and ``cut_score``. As a side
            effect ``track.last_candidate_reason`` is set to "ok" or to a
            comma-separated list of the failed criteria.
        """
        x1, y1, x2, y2 = track.bbox
        cx, cy = bbox_center(track.bbox)
        center_x = frame_w * 0.5
        center_tolerance = max(1.0, frame_w * self.args.center_tolerance_ratio)
        snapshot_tolerance = max(1.0, frame_w * self.args.snapshot_line_tolerance_ratio)
        center_delta = abs(cx - center_x)
        center_score = max(0.0, 1.0 - center_delta / center_tolerance)
        area_ratio = bbox_area(track.bbox) / float(frame_w * frame_h)
        size_score = min(1.0, area_ratio / max(self.args.min_gaylord_area_ratio * 4.0, 0.001))
        if self.args.edge_margin_ratio <= 0:
            # A non-positive margin disables the edge-cut test entirely.
            cut = False
        else:
            edge_margin_x = frame_w * self.args.edge_margin_ratio
            edge_margin_y = frame_h * self.args.edge_margin_ratio
            cut = (
                x1 <= edge_margin_x
                or y1 <= edge_margin_y
                or x2 >= frame_w - edge_margin_x
                or y2 >= frame_h - edge_margin_y
            )
        cut_score = 0.0 if cut else 1.0
        score = 0.50 * center_score + 0.30 * size_score + 0.20 * cut_score
        in_center_band = center_delta <= center_tolerance
        on_snapshot_line = center_delta <= snapshot_tolerance
        in_y_band = (
            frame_h * self.args.usable_y_min_ratio
            <= cy
            <= frame_h * self.args.usable_y_max_ratio
        )
        enough_hits = track.hits >= self.args.min_track_hits
        large_enough = area_ratio >= self.args.min_gaylord_area_ratio
        area_trend = track.area_trend()
        trend_ok = area_trend >= self.args.min_area_trend
        eligible = (
            enough_hits
            and on_snapshot_line
            and in_y_band
            and large_enough
            and not cut
            and trend_ok
            and track.missed == 0
        )
        failed: list[str] = []
        if not enough_hits:
            failed.append(f"hits<{self.args.min_track_hits}")
        if not in_center_band:
            failed.append(f"outside_band={center_delta:.0f}>{center_tolerance:.0f}")
        elif not on_snapshot_line:
            failed.append(f"wait_line={center_delta:.0f}>{snapshot_tolerance:.0f}")
        if not in_y_band:
            failed.append("y_band")
        if not large_enough:
            failed.append(f"area={area_ratio:.3f}<{self.args.min_gaylord_area_ratio:.3f}")
        if cut:
            failed.append("edge_cut")
        if not trend_ok:
            failed.append(f"trend={area_trend:+.2f}<{self.args.min_area_trend:+.2f}")
        if track.missed != 0:
            failed.append(f"missed={track.missed}")
        track.last_candidate_reason = "ok" if eligible else ",".join(failed)
        return eligible, {
            "score": score,
            "center_score": center_score,
            "size_score": size_score,
            "cut_score": cut_score,
        }

    def _update_track_state(self, track: Track, eligible: bool, frame_w: int) -> None:
        """Set ``track.state`` from its snapshot flag, misses, hits and position."""
        if track.already_snapshotted:
            track.state = "snapshotted"
            return
        if track.missed > 0:
            track.state = "exiting"
            return
        cx, _ = bbox_center(track.bbox)
        center_delta = abs(cx - frame_w * 0.5)
        snapshot_tolerance = frame_w * self.args.snapshot_line_tolerance_ratio
        if eligible:
            track.state = "centered"
        elif track.hits < self.args.min_track_hits:
            track.state = "entering"
        elif center_delta <= snapshot_tolerance:
            track.state = "centered"
        elif center_delta <= frame_w * self.args.center_tolerance_ratio:
            track.state = "candidate"
        else:
            track.state = "entering"

    @staticmethod
    def _write_image(path: Path, image: np.ndarray) -> bool:
        """Write *image* to *path* and log when OpenCV reports a failed write."""
        written = bool(cv2.imwrite(str(path), image))
        if not written:
            log(f"[NAV] ERRORE salvataggio immagine fallito: {path}")
        return written

    def _finalize_snapshot(self, track: Track) -> NavigationSnapshot | None:
        """Turn the best buffered candidate of *track* into a snapshot.

        Writes the full frame, the padded track crop and the padded label crop
        to the output directory, appends the metadata record, refreshes the
        command lines and marks the track as snapshotted. A failed image write
        is logged but does not abort the snapshot. Returns None when the track
        has no candidates.
        """
        if not track.candidates:
            return None
        best = max(track.candidates, key=lambda item: item.score)
        track.candidates.clear()
        track.already_snapshotted = True
        track.state = "snapshotted"
        self.snapshot_counter += 1
        self.position_counter += 1
        simulated_position = f"gaylord {self.position_counter}"
        debug_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_frame.jpg"
        payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_ocr_payload.jpg"
        label_payload_name = f"snapshot_{self.snapshot_counter:04d}_track_{track.id:03d}_label_payload.jpg"
        debug_path = self.output_dir / debug_name
        payload_path = self.output_dir / payload_name
        label_payload_path = self.output_dir / label_payload_name
        self._write_image(debug_path, best.frame)
        ocr_payload = crop_with_padding(
            best.frame,
            best.bbox,
            self.args.ocr_payload_pad_ratio,
        )
        self._write_image(payload_path, ocr_payload)
        label_payload = crop_with_padding(
            best.frame,
            best.label_bbox,
            self.args.label_payload_pad_ratio,
        )
        self._write_image(label_payload_path, label_payload)
        self.last_snapshot_frame = best.frame.copy()
        self.last_ocr_payload_frame = ocr_payload.copy()
        self.last_label_payload_frame = label_payload.copy()
        gaylord_center = bbox_center(best.bbox)
        label_center = bbox_center(best.label_bbox)
        # Vector from the track box center to the label center, in pixels.
        movement_vector = (
            label_center[0] - gaylord_center[0],
            label_center[1] - gaylord_center[1],
        )
        self.label_movement_arrow = (
            (int(round(gaylord_center[0])), int(round(gaylord_center[1]))),
            (int(round(label_center[0])), int(round(label_center[1]))),
        )
        snapshot = NavigationSnapshot(
            snapshot_id=self.snapshot_counter,
            frame_id=best.frame_id,
            timestamp=best.timestamp,
            simulated_position=simulated_position,
            track_id=track.id,
            bbox=best.bbox,
            label_bbox=best.label_bbox,
            score=best.score,
            debug_frame_path=str(debug_path),
            ocr_payload_path=str(payload_path),
            label_payload_path=str(label_payload_path),
            movement_vector_px=movement_vector,
        )
        self._write_metadata(snapshot)
        self._print_commands(snapshot)
        return snapshot

    def simulate_remote_response(self, snapshot: NavigationSnapshot) -> str:
        """Produce a simulated "ACK"/"NACK" according to ``args.remote_ack_mode``.

        Any mode other than "always-ack" / "always-nack" alternates: odd
        snapshot ids are acknowledged, even ones are not. The matching command
        lines are appended to ``last_command_lines`` and logged.
        """
        mode = self.args.remote_ack_mode
        if mode == "always-ack":
            result = "ACK"
        elif mode == "always-nack":
            result = "NACK"
        else:
            result = "ACK" if snapshot.snapshot_id % 2 == 1 else "NACK"
        if result == "ACK":
            self.last_remote_result_text = "ACK_RICEVUTO: codice valido su WMS"
            resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                resume_command,
            ])
            log("[REMOTE] ACK_RICEVUTO codice valido su WMS")
            log(f"[CMD] {resume_command}")
        else:
            self.last_remote_result_text = "NACK_RICEVUTO: riprovare foto"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                "MICRO_MOVE_CORRETTIVO",
                "SCATTA_FOTO_RETRY",
            ])
            log("[REMOTE] NACK_RICEVUTO codice assente/non valido")
            log("[CMD] MICRO_MOVE_CORRETTIVO")
            log("[CMD] SCATTA_FOTO_RETRY")
        return result

    def apply_wms_result(self, result: WmsResult | None, snapshot: NavigationSnapshot) -> str:
        """Translate a WMS result into command lines.

        Returns:
            "TIMEOUT" when *result* is None, otherwise "ACK", "NACK" or
            "ERROR" (any status other than ACK/NACK, compared case-insensitively).
        """
        if result is None:
            self.last_remote_result_text = "WMS_TIMEOUT: nessuna risposta entro il timeout"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                "MICRO_MOVE_CORRETTIVO",
                "SCATTA_FOTO_RETRY",
            ])
            log("[WMS] TIMEOUT nessuna risposta entro il timeout")
            return "TIMEOUT"
        status = result.status.upper()
        if status == "ACK":
            message = result.message or "codice valido su WMS"
            ocr_text = ""
            if result.response:
                ocr_text = str(result.response.get("ocr_text") or result.response.get("fake_ocr_text") or "")
            self.last_remote_result_text = f"WMS_ACK: {message}"
            resume_command = f"RIPARTI_{self.args.scan_direction.upper()}"
            lines = [self.last_remote_result_text]
            if ocr_text:
                lines.append(f"OCR_CODICE {ocr_text}")
            lines.append(resume_command)
            self.last_command_lines.extend(lines)
            log(f"[WMS] ACK request_id={result.request_id} snapshot={result.snapshot_id} {message}")
            log(f"[CMD] {resume_command}")
            return "ACK"
        if status == "NACK":
            message = result.message or "codice non riconosciuto"
            self.last_remote_result_text = f"WMS_NACK: {message}"
            self.last_command_lines.extend([
                self.last_remote_result_text,
                "MICRO_MOVE_CORRETTIVO",
                "SCATTA_FOTO_RETRY",
            ])
            log(f"[WMS] NACK request_id={result.request_id} snapshot={result.snapshot_id} {message}")
            return "NACK"
        message = result.error or result.message or "errore WMS"
        self.last_remote_result_text = f"WMS_ERROR: {message}"
        self.last_command_lines.extend([
            self.last_remote_result_text,
            "MICRO_MOVE_CORRETTIVO",
            "SCATTA_FOTO_RETRY",
        ])
        log(f"[WMS] ERROR request_id={result.request_id} snapshot={result.snapshot_id} {message}")
        return "ERROR"

    def set_motion_text(self, text: str) -> None:
        """Store the motion description text."""
        self.motion_text = text

    def _write_metadata(self, snapshot: NavigationSnapshot) -> None:
        """Append one JSON line describing *snapshot* to ``snapshots.jsonl``."""
        record = {
            "snapshot_id": snapshot.snapshot_id,
            "frame_id": snapshot.frame_id,
            "timestamp": snapshot.timestamp,
            "simulated_position": snapshot.simulated_position,
            "drone_pose_simulated": {
                "mode": "linear_shelf_scan",
                "position_label": snapshot.simulated_position,
            },
            "track_id": snapshot.track_id,
            "gaylord_bbox": list(snapshot.bbox),
            "label_bbox": list(snapshot.label_bbox),
            "score": snapshot.score,
            "movement_vector_px": {
                "dx": snapshot.movement_vector_px[0],
                "dy": snapshot.movement_vector_px[1],
            },
            "debug_frame_path": snapshot.debug_frame_path,
            "ocr_payload_path": snapshot.ocr_payload_path,
            "label_payload_path": snapshot.label_payload_path,
        }
        with self.metadata_path.open("at", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=True) + "\n")

    def _print_commands(self, snapshot: NavigationSnapshot) -> None:
        """Build the command sequence for *snapshot*, store it and log it."""
        self.last_command_text = (
            f"SNAPSHOT {snapshot.snapshot_id:04d} "
            f"track={snapshot.track_id} frame={snapshot.frame_id} "
            f"pos={snapshot.simulated_position} score={snapshot.score:.2f}"
        )
        dx, dy = snapshot.movement_vector_px
        label_name = Path(snapshot.label_payload_path).name
        debug_name = Path(snapshot.debug_frame_path).name
        # Single source for both the stored lines and the "[CMD]" log output.
        commands = [
            "STOP",
            f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
            f"ASSOCIA_ETICHETTA {snapshot.simulated_position} - etichetta {snapshot.snapshot_id}",
            f"CENTRA_ETICHETTA dx={dx:+.0f}px dy={dy:+.0f}px durata={self.args.label_move_sec:.1f}s",
            f"ATTENDI_STABILIZZAZIONE {self.args.label_stabilization_sec:.1f}s",
            f"SCATTA_FOTO_ETICHETTA {label_name}",
            f"RITORNA_CENTRO_GAYLORD durata={self.args.label_return_sec:.1f}s",
            f"SCATTA_FOTO_GAYLORD {debug_name}",
            f"INVIA_ROI_REMOTA {label_name}",
            f"ATTENDI_ACK timeout={self.args.remote_ack_timeout_sec:.1f}s",
        ]
        self.last_command_lines = [self.last_command_text, *commands]
        log(f"[NAV] {self.last_command_text}")
        for command in commands:
            log(f"[CMD] {command}")

    def set_label_sequence_phase(self, snapshot: NavigationSnapshot, phase: str) -> None:
        """Replace ``last_command_lines`` with the lines for one sequence phase.

        Recognized phases are "move", "stabilize", "capture", "wait_wms" and
        "return"; any other value leaves only the four common header lines.
        """
        dx, dy = snapshot.movement_vector_px
        base = [
            self.last_command_text,
            "STOP",
            f"ASSOCIA_POSIZIONE {snapshot.simulated_position}",
            f"ASSOCIA_ETICHETTA {snapshot.simulated_position} - etichetta {snapshot.snapshot_id}",
        ]
        if phase == "move":
            detail = [
                f"CENTRA_ETICHETTA dx={dx:+.0f}px dy={dy:+.0f}px durata={self.args.label_move_sec:.1f}s",
                "FRECCIA_MOVIMENTO_GAYLORD_ETICHETTA",
            ]
        elif phase == "stabilize":
            detail = [
                "DRONE_CENTRATO_SU_ETICHETTA",
                f"ATTENDI_STABILIZZAZIONE {self.args.label_stabilization_sec:.1f}s",
            ]
        elif phase == "capture":
            detail = [
                f"SCATTA_FOTO_ETICHETTA {Path(snapshot.label_payload_path).name}",
                f"INVIA_ROI_REMOTA {Path(snapshot.label_payload_path).name}",
            ]
        elif phase == "wait_wms":
            detail = [
                f"INVIA_ROI_REMOTA {Path(snapshot.label_payload_path).name}",
                f"ATTENDI_WMS timeout={self.args.remote_ack_timeout_sec:.1f}s",
            ]
        elif phase == "return":
            # The return move is the inverse of the move towards the label.
            rdx = -dx
            rdy = -dy
            detail = [
                f"RITORNA_CENTRO_GAYLORD dx={rdx:+.0f}px dy={rdy:+.0f}px durata={self.args.label_return_sec:.1f}s",
                "FRECCIA_RITORNO_ETICHETTA_GAYLORD",
            ]
        else:
            detail = []
        self.last_command_lines = base + detail
def parse_args():
    """Parse the command line, using the INI file for every option default.

    ``--config`` is parsed first with a helper parser so that the INI file it
    names can provide the defaults of the full parser.

    Boolean options backed by a configurable default use
    ``argparse.BooleanOptionalAction``: with ``store_true`` an option whose
    default is already true can never be switched off from the command line.
    The positive flags keep working unchanged; ``--no-<flag>`` is added.
    ``--no-display`` stays a plain ``store_true`` flag.

    Returns:
        The populated ``argparse.Namespace``.
    """
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
    pre_args, _ = pre.parse_known_args()
    defaults = load_navigation_config(pre_args.config)
    toggle = argparse.BooleanOptionalAction

    ap = argparse.ArgumentParser(parents=[pre])
    ap.add_argument("-v", "--video", default=defaults["video"], help="Percorso video. Se omesso usa webcam 0")
    ap.add_argument(
        "--weights",
        default=defaults["weights"],
        help="Modello Ultralytics .pt",
    )
    ap.add_argument("--ultralytics-device", default=defaults["ultralytics_device"], help="Device Ultralytics: cpu oppure 0")
    ap.add_argument("--yolo-half", action=toggle, default=defaults["yolo_half"],
                    help="Usa FP16/half precision per YOLO quando il device e' GPU")
    ap.add_argument("--input-size", type=int, default=defaults["input_size"], help="Dimensione input YOLO")
    ap.add_argument("--min-confidence", type=float, default=defaults["min_confidence"], help="Confidenza minima")
    ap.add_argument("--target-class", default=defaults["target_class"], help="Classe da tracciare")
    ap.add_argument("--label-class", default=defaults["label_class"], help="Classe etichetta da associare al gaylord")
    ap.add_argument("--max-track-missed", type=int, default=defaults["max_track_missed"], help="Frame persi prima di rimuovere una track")
    ap.add_argument("--min-match-score", type=float, default=defaults["min_match_score"], help="Soglia associazione detection-track")
    ap.add_argument("--max-center-distance-ratio", type=float, default=defaults["max_center_distance_ratio"], help="Distanza max centri per matching")
    ap.add_argument("--center-tolerance-ratio", type=float, default=defaults["center_tolerance_ratio"], help="Mezza ampiezza zona centrale")
    ap.add_argument("--snapshot-line-tolerance-ratio", type=float, default=defaults["snapshot_line_tolerance_ratio"],
                    help="Tolleranza stretta dalla linea centrale per scattare")
    ap.add_argument("--usable-y-min-ratio", type=float, default=defaults["usable_y_min_ratio"], help="Limite alto fascia utile Y")
    ap.add_argument("--usable-y-max-ratio", type=float, default=defaults["usable_y_max_ratio"], help="Limite basso fascia utile Y")
    ap.add_argument("--min-track-hits", type=int, default=defaults["min_track_hits"], help="Detection consecutive minime")
    ap.add_argument("--min-gaylord-area-ratio", type=float, default=defaults["min_gaylord_area_ratio"], help="Area bbox minima sul frame")
    ap.add_argument("--edge-margin-ratio", type=float, default=defaults["edge_margin_ratio"], help="Margine per considerare bbox tagliato")
    ap.add_argument("--ocr-payload-pad-ratio", type=float, default=defaults["ocr_payload_pad_ratio"],
                    help="Padding intorno al bbox centrale inviato all'OCR remoto")
    ap.add_argument("--label-payload-pad-ratio", type=float, default=defaults["label_payload_pad_ratio"],
                    help="Padding intorno al bbox etichetta inviato all'OCR remoto")
    ap.add_argument("--min-area-trend", type=float, default=defaults["min_area_trend"], help="Trend area minimo ammesso")
    ap.add_argument("--snapshot-window-frames", type=int, default=defaults["snapshot_window_frames"], help="Candidati da valutare prima dello snapshot")
    ap.add_argument("--snapshot-output-dir", default=defaults["snapshot_output_dir"], help="Directory snapshot e JSONL")
    ap.add_argument("--remote-ack-timeout-sec", type=float, default=defaults["remote_ack_timeout_sec"],
                    help="Tempo simulato di attesa OCR remoto/WMS")
    ap.add_argument("--remote-ack-mode", choices=["always-ack", "always-nack", "alternate"],
                    default=defaults["remote_ack_mode"], help="Risposta remota simulata")
    ap.add_argument("--wms-enabled", action=toggle, default=defaults["wms_enabled"],
                    help="Invia realmente snapshot al WMS demo")
    ap.add_argument("--wms-server-url", default=defaults["wms_server_url"], help="Endpoint HTTP WMS")
    ap.add_argument("--wms-client-id", default=defaults["wms_client_id"], help="Identificativo client WMS")
    ap.add_argument("--wms-timeout-sec", type=float, default=defaults["wms_timeout_sec"], help="Timeout chiamata WMS")
    ap.add_argument("--wms-queue-max-size", type=int, default=defaults["wms_queue_max_size"], help="Massimo job WMS in coda")
    ap.add_argument("--wms-send-gaylord-debug", action=toggle, default=defaults["wms_send_gaylord_debug"],
                    help="Invia anche immagine/debug gaylord al server WMS")
    ap.add_argument("--scan-direction", choices=["destra", "sinistra"], default=defaults["scan_direction"],
                    help="Direzione simulata di ripartenza dopo ACK")
    ap.add_argument("--label-move-sec", type=float, default=defaults["label_move_sec"],
                    help="Durata simulata movimento verso etichetta")
    ap.add_argument("--label-stabilization-sec", type=float, default=defaults["label_stabilization_sec"],
                    help="Attesa simulata stabilizzazione su etichetta")
    ap.add_argument("--label-return-sec", type=float, default=defaults["label_return_sec"],
                    help="Durata simulata ritorno al centro gaylord")
    ap.add_argument("--preview-width", type=int, default=defaults["preview_width"], help="Larghezza preview")
    ap.add_argument("--benchmark-mode", action=toggle, default=defaults["benchmark_mode"],
                    help="Profilo benchmark: niente finestre, preview a 30 fps e log tempi dedicato")
    ap.add_argument("--benchmark-preview-fps", type=float, default=defaults["benchmark_preview_fps"],
                    help="FPS target preview/cattura usati dal profilo benchmark")
    ap.add_argument("--realtime-playback", action=toggle, default=defaults["realtime_playback"], help="Rispetta FPS video")
    ap.add_argument("--preview-fps", type=float, default=defaults["preview_fps"],
                    help="FPS massimo per lettura/preview realtime. 0 = FPS sorgente")
    ap.add_argument("--yolo-fps", type=float, default=defaults["yolo_fps"],
                    help="FPS massimo per inferenza YOLO. 0 = ogni frame di preview")
    ap.add_argument("--max-frames", type=int, default=defaults["max_frames"], help="Numero massimo frame; 0 = tutto")
    ap.add_argument("--stats-interval", type=float, default=defaults["stats_interval"], help="Intervallo log prestazioni")
    ap.add_argument("--perf-log-path", default=defaults["perf_log_path"], help="File log tempi dettagliati")
    ap.add_argument("--perf-log-flush-interval-sec", type=float, default=defaults["perf_log_flush_interval_sec"],
                    help="Intervallo flush file log tempi")
    ap.add_argument("--perf-log-flush-lines", type=int, default=defaults["perf_log_flush_lines"],
                    help="Numero righe bufferizzate prima del flush del log tempi")
    ap.add_argument("--motion-report-interval", type=int, default=defaults["motion_report_interval"],
                    help="Ogni quanti frame aggiornare la direzione moto stimata")
    ap.add_argument("--motion-min-pixels", type=float, default=defaults["motion_min_pixels"],
                    help="Spostamento medio minimo per dichiarare una direzione")
    ap.add_argument("--debug-tracks", action=toggle, default=defaults["debug_tracks"], help="Logga stato e criteri delle track")
    ap.add_argument("--flash-alpha", type=float, default=defaults["flash_alpha"], help="Intensita' flash 0..1 al momento dello scatto")
    # Left as store_true: BooleanOptionalAction would expose "--no-no-display".
    ap.add_argument("--no-display", action="store_true", default=defaults["no_display"], help="Disabilita finestra video")
    ap.add_argument("--window-layout-enabled", action=toggle, default=defaults["window_layout_enabled"],
                    help="Posiziona e ridimensiona le finestre OpenCV")
    ap.add_argument("--navigate-window", default=defaults["navigate_window"], help="Layout finestra navigate: x,y,w,h")
    ap.add_argument("--commands-window", default=defaults["commands_window"], help="Layout finestra comandi: x,y,w,h")
    ap.add_argument("--snapshot-window", default=defaults["snapshot_window"], help="Layout finestra snapshot: x,y,w,h")
    ap.add_argument("--label-window", default=defaults["label_window"], help="Layout finestra etichetta: x,y,w,h")
    return ap.parse_args()
def load_navigation_config(path_str: str) -> dict[str, object]:
    """Return the navigation settings, overridden by an INI file when present.

    The built-in table below supplies every known key.  If ``path_str`` exists,
    options found in its ``[navigation]`` section replace the built-in values;
    each option is parsed with the type of its built-in value (bool, int or
    float), anything else is kept as stripped text where an empty string,
    ``none`` or ``null`` (case-insensitive) becomes ``None``.  Options that are
    not in the table are ignored.
    """
    defaults: dict[str, object] = {
        "video": "testhd.mp4",
        "weights": r"C:\devel\flywms\runs\flywms_yolo11n_quick20\weights\best.pt",
        "ultralytics_device": "cpu",
        "yolo_half": True,
        "input_size": 640,
        "min_confidence": 0.25,
        "target_class": "gaylord",
        "label_class": "etichetta",
        "max_track_missed": 8,
        "min_match_score": 0.25,
        "max_center_distance_ratio": 0.18,
        "center_tolerance_ratio": 0.18,
        "snapshot_line_tolerance_ratio": 0.035,
        "usable_y_min_ratio": 0.15,
        "usable_y_max_ratio": 0.85,
        "min_track_hits": 3,
        "min_gaylord_area_ratio": 0.02,
        "edge_margin_ratio": 0.0,
        "ocr_payload_pad_ratio": 0.03,
        "label_payload_pad_ratio": 0.20,
        "min_area_trend": -0.35,
        "snapshot_window_frames": 1,
        "snapshot_output_dir": "navigate_snapshots",
        "remote_ack_timeout_sec": 2.0,
        "remote_ack_mode": "always-ack",
        "wms_enabled": False,
        "wms_server_url": "http://127.0.0.1:8088/api/v1/navigation-snapshot",
        "wms_client_id": "flywms-demo-01",
        "wms_timeout_sec": 2.0,
        "wms_queue_max_size": 8,
        "wms_send_gaylord_debug": True,
        "scan_direction": "destra",
        "label_move_sec": 3.0,
        "label_stabilization_sec": 2.0,
        "label_return_sec": 3.0,
        "preview_width": 1280,
        "benchmark_mode": False,
        "benchmark_preview_fps": 30.0,
        "realtime_playback": True,
        "preview_fps": 24.0,
        "yolo_fps": 15.0,
        "max_frames": 0,
        "stats_interval": 2.0,
        "perf_log_path": "tempistiche.txt",
        "perf_log_flush_interval_sec": 2.0,
        "perf_log_flush_lines": 120,
        "motion_report_interval": 5,
        "motion_min_pixels": 1.5,
        "debug_tracks": True,
        "flash_alpha": 0.70,
        "no_display": False,
        "window_layout_enabled": True,
        "navigate_window": "20,40,1100,620",
        "commands_window": "1140,40,760,520",
        "snapshot_window": "1140,590,520,360",
        "label_window": "1140,980,520,260",
    }

    ini_path = Path(path_str)
    if not ini_path.exists():
        return defaults

    ini = configparser.ConfigParser()
    ini.read(ini_path, encoding="utf-8")
    if not ini.has_section("navigation"):
        # Nothing to override: the file has no [navigation] section.
        return defaults
    nav_section = ini["navigation"]

    # Exact-type dispatch: bool must not be treated as int, so the lookup is
    # keyed on type(value) rather than done with isinstance().
    typed_readers = {
        bool: ini.getboolean,
        int: ini.getint,
        float: ini.getfloat,
    }
    null_spellings = ("", "none", "null")

    for key in list(defaults):
        if key not in nav_section:
            continue
        builtin_value = defaults[key]
        reader = typed_readers.get(type(builtin_value))
        if reader is not None:
            defaults[key] = reader("navigation", key, fallback=builtin_value)
            continue
        text = nav_section.get(key, str(builtin_value)).strip()
        defaults[key] = None if text.lower() in null_spellings else text
    return defaults
def log(msg: str) -> None:
    """Print ``msg`` to stdout, prefixed with the local time as [HH:MM:SS]."""
    stamp = time.strftime("%H:%M:%S")
    # Flush on every call so the line appears immediately.
    print(f"[{stamp}] {msg}", flush=True)
def format_fps_value(value: float | int | None) -> str:
if value is None:
return "n/d"
try:
numeric = float(value)
except (TypeError, ValueError):
return "n/d"
if numeric <= 0:
return "n/d"
return f"{numeric:.1f}"
class PerfLogWriter:
    """Append-mode text log whose flushes are batched by line count and time."""

    def __init__(self, path_str: str, flush_interval_sec: float, flush_lines: int):
        """Open ``path_str`` for appending, creating parent directories.

        ``flush_interval_sec`` is clamped to at least 0.2 seconds and
        ``flush_lines`` to at least 1.
        """
        target = Path(path_str)
        self.path = target
        target.parent.mkdir(parents=True, exist_ok=True)
        self.file = target.open("a", encoding="utf-8")
        self.flush_interval_sec = max(0.2, float(flush_interval_sec))
        self.flush_lines = max(1, int(flush_lines))
        self.buffered_lines = 0
        self.last_flush = time.perf_counter()

    def write(self, line: str) -> None:
        """Write ``line`` terminated by exactly one newline, then maybe flush."""
        text = line.rstrip("\n")
        self.file.write(f"{text}\n")
        self.buffered_lines += 1
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Flush when enough lines are pending or enough time has elapsed."""
        now = time.perf_counter()
        batch_full = self.buffered_lines >= self.flush_lines
        interval_elapsed = (now - self.last_flush) >= self.flush_interval_sec
        if not (batch_full or interval_elapsed):
            return
        self.file.flush()
        self.buffered_lines = 0
        self.last_flush = now

    def close(self) -> None:
        """Flush pending data and close the file, even if the flush fails."""
        try:
            self.file.flush()
        finally:
            self.file.close()
def require_file(path_str: str, description: str) -> Path:
    """Return ``path_str`` as a ``Path``, or log an error and exit with status 1.

    ``description`` names the missing item in the error message.
    """
    candidate = Path(path_str)
    if candidate.exists():
        return candidate
    log(f"ERRORE: {description} non trovato: {candidate}")
    sys.exit(1)
def open_capture(video_arg: str | None):
    """Open the video source and return ``(capture, source_name)``.

    ``None`` selects camera 0 and an all-digit argument selects that camera
    index; cameras are first opened with the DirectShow backend and, if that
    capture is not opened, retried with the default backend.  Any other
    argument is handed to ``cv2.VideoCapture`` unchanged.
    """
    is_camera = video_arg is None or str(video_arg).isdigit()
    if not is_camera:
        return cv2.VideoCapture(video_arg), str(video_arg)

    camera_index = 0 if video_arg is None else int(video_arg)
    capture = cv2.VideoCapture(camera_index, cv2.CAP_DSHOW)
    if not capture.isOpened():
        capture = cv2.VideoCapture(camera_index)
    return capture, f"camera:{camera_index}"
def build_wms_multipart(job: WmsSnapshotJob) -> tuple[bytes, str]:
    """Encode ``job`` as a multipart/form-data body.

    The body holds a JSON ``metadata`` field, the ``label_image`` file and,
    when ``job.gaylord_image_path`` is not ``None``, a ``gaylord_image`` file.
    Both files are declared as ``image/jpeg``.

    Returns:
        ``(body, content_type)`` where ``content_type`` carries the randomly
        generated boundary.
    """
    boundary = f"flywms-{uuid.uuid4().hex}"
    delimiter = f"--{boundary}\r\n".encode("ascii")
    parts: list[bytes] = []

    def append_text(name: str, value: str, content_type: str = "text/plain") -> None:
        header = (
            f'Content-Disposition: form-data; name="{name}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n"
        ).encode("ascii")
        parts.append(delimiter + header + value.encode("utf-8") + b"\r\n")

    def append_image(name: str, path_str: str) -> None:
        image_path = Path(path_str)
        # The disposition line is UTF-8 encoded because it embeds the file name.
        disposition = (
            f'Content-Disposition: form-data; name="{name}"; '
            f'filename="{image_path.name}"\r\n'
        ).encode("utf-8")
        parts.append(delimiter)
        parts.append(disposition)
        parts.append(b"Content-Type: image/jpeg\r\n\r\n")
        parts.append(image_path.read_bytes())
        parts.append(b"\r\n")

    append_text("metadata", json.dumps(job.metadata, ensure_ascii=True), "application/json")
    append_image("label_image", job.label_image_path)
    if job.gaylord_image_path is not None:
        append_image("gaylord_image", job.gaylord_image_path)

    # Closing delimiter of the multipart body.
    parts.append(f"--{boundary}--\r\n".encode("ascii"))
    body = b"".join(parts)
    return body, f"multipart/form-data; boundary={boundary}"
def parse_window_rect(value: str | None) -> tuple[int, int, int, int] | None:
if value is None:
return None
parts = [part.strip() for part in str(value).split(",")]
if len(parts) != 4:
return None
try:
x, y, w, h = [int(part) for part in parts]
except ValueError:
return None
if w <= 0 or h <= 0:
return None
return x, y, w, h
def apply_window_layout(args) -> None:
    """Resize and move the four OpenCV windows according to ``args``.

    Does nothing when ``args.window_layout_enabled`` is false.  A window whose
    layout string cannot be parsed is skipped with a log message.
    """
    if not args.window_layout_enabled:
        return

    windows = (
        ("flywms navigate", args.navigate_window),
        ("flywms comandi", args.commands_window),
        ("flywms snapshot", args.snapshot_window),
        ("flywms etichetta", args.label_window),
    )
    for title, spec in windows:
        geometry = parse_window_rect(spec)
        if geometry is None:
            log(f"Layout finestra ignorato per {title}: {spec}")
        else:
            left, top, width, height = geometry
            cv2.resizeWindow(title, width, height)
            cv2.moveWindow(title, left, top)
def clip_box(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> tuple[int, int, int, int]:
    """Clamp both box corners to ``[0, w - 1]`` horizontally and ``[0, h - 1]`` vertically."""

    def clamp(value: int, size: int) -> int:
        return max(0, min(value, size - 1))

    return clamp(x1, w), clamp(y1, h), clamp(x2, w), clamp(y2, h)
def crop_with_padding(
    frame: np.ndarray,
    bbox: tuple[int, int, int, int],
    pad_ratio: float,
) -> np.ndarray:
    """Return a copy of ``frame`` cropped to ``bbox`` enlarged by ``pad_ratio``.

    Args:
        frame: Image array indexed as ``[row, column, ...]``.
        bbox: ``(x1, y1, x2, y2)`` with exclusive right/bottom edges, the same
            convention used to compute box areas (``x2 - x1`` by ``y2 - y1``).
        pad_ratio: Padding added on every side as a fraction of the box width
            and height; negative values are treated as 0.

    Returns:
        A new array with the padded region, limited to the frame bounds.
    """
    x1, y1, x2, y2 = bbox
    ratio = max(0.0, pad_ratio)
    pad_x = int(ratio * (x2 - x1))
    pad_y = int(ratio * (y2 - y1))
    frame_h, frame_w = frame.shape[:2]

    # The right/bottom values are used as exclusive slice ends, so they are
    # clamped to the frame size itself.  Clamping them to size - 1 (as a
    # pixel-coordinate clip would) silently drops the last column/row whenever
    # the padded box reaches the frame border.
    left = min(max(x1 - pad_x, 0), frame_w)
    top = min(max(y1 - pad_y, 0), frame_h)
    right = min(max(x2 + pad_x, 0), frame_w)
    bottom = min(max(y2 + pad_y, 0), frame_h)
    return frame[top:bottom, left:right].copy()
def bbox_area(bbox: tuple[int, int, int, int]) -> int:
    """Return the area of an ``(x1, y1, x2, y2)`` box, or 0 for an inverted/empty box."""
    left, top, right, bottom = bbox
    width = right - left
    height = bottom - top
    if width <= 0 or height <= 0:
        return 0
    return width * height
def bbox_center(bbox: tuple[int, int, int, int]) -> tuple[float, float]:
    """Return the ``(x, y)`` midpoint of an ``(x1, y1, x2, y2)`` box."""
    left, top, right, bottom = bbox
    center_x = (left + right) * 0.5
    center_y = (top + bottom) * 0.5
    return center_x, center_y
def bbox_contains(
    outer: tuple[int, int, int, int],
    inner: tuple[int, int, int, int],
) -> bool:
    """Return True when ``inner`` lies entirely within ``outer`` (edges may touch)."""
    outer_left, outer_top, outer_right, outer_bottom = outer
    inner_left, inner_top, inner_right, inner_bottom = inner
    starts_inside = outer_left <= inner_left and outer_top <= inner_top
    ends_inside = inner_right <= outer_right and inner_bottom <= outer_bottom
    return starts_inside and ends_inside
def find_label_inside_bbox(
    gaylord_bbox: tuple[int, int, int, int],
    labels: list[Detection],
) -> Detection | None:
    """Return the label detection contained in ``gaylord_bbox``, if any.

    When several labels are contained, the one whose center is closest to the
    gaylord center wins; ties are broken by higher confidence.
    """
    inside = [
        candidate
        for candidate in labels
        if bbox_contains(gaylord_bbox, candidate.bbox)
    ]
    if len(inside) <= 1:
        return inside[0] if inside else None

    # Multiple contained labels should not happen; choose a deterministic fallback.
    center_x, center_y = bbox_center(gaylord_bbox)

    def rank(candidate: Detection):
        label_x, label_y = bbox_center(candidate.bbox)
        distance = np.hypot(label_x - center_x, label_y - center_y)
        return distance, -candidate.confidence

    return min(inside, key=rank)
def bbox_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    The result is 0.0 when the union has no area.
    """
    a_left, a_top, a_right, a_bottom = a
    b_left, b_top, b_right, b_bottom = b
    overlap = (
        max(a_left, b_left),
        max(a_top, b_top),
        min(a_right, b_right),
        min(a_bottom, b_bottom),
    )
    # bbox_area() yields 0 for an inverted box, i.e. when the boxes are disjoint.
    inter = bbox_area(overlap)
    union = bbox_area(a) + bbox_area(b) - inter
    return inter / float(union) if union > 0 else 0.0
def association_score(
    track_bbox: tuple[int, int, int, int],
    det_bbox: tuple[int, int, int, int],
    max_center_distance: float,
) -> float:
    """Score how well a detection matches a track.

    The score is ``0.70 * IoU + 0.30 * center_similarity`` where the center
    similarity falls linearly from 1 (coincident centers) to 0 (centers at
    least ``max_center_distance`` apart).

    Args:
        track_bbox: Track box as ``(x1, y1, x2, y2)``.
        det_bbox: Detection box as ``(x1, y1, x2, y2)``.
        max_center_distance: Distance at which the center similarity reaches 0.
            A non-positive value leaves no tolerance: only coincident centers
            count as similar.

    Returns:
        The combined score.
    """
    iou = bbox_iou(track_bbox, det_bbox)
    tx, ty = bbox_center(track_bbox)
    dx, dy = bbox_center(det_bbox)
    center_dist = float(np.hypot(tx - dx, ty - dy))
    if max_center_distance > 0:
        center_similarity = max(0.0, 1.0 - center_dist / max_center_distance)
    else:
        # Guard the division: a zero distance would raise ZeroDivisionError and
        # a negative one would push the similarity above 1.
        center_similarity = 1.0 if center_dist == 0.0 else 0.0
    return 0.70 * iou + 0.30 * center_similarity
def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
    """Downscale ``frame`` to ``max_width`` columns, keeping the aspect ratio.

    The frame is returned unchanged (same object) when ``max_width`` is not
    positive or the frame is already narrow enough.
    """
    height, width = frame.shape[:2]
    needs_downscale = max_width > 0 and width > max_width
    if not needs_downscale:
        return frame
    factor = max_width / float(width)
    target_size = (int(width * factor), int(height * factor))
    return cuda_resize(frame, target_size, interpolation=cv2.INTER_LINEAR)
def draw_navigation_debug(
    frame: np.ndarray,
    tracks: list[Track],
    args,
    labels: list[Detection] | None = None,
    movement_arrow: tuple[tuple[int, int], tuple[int, int]] | None = None,
) -> np.ndarray:
    """Render the navigation overlay on a copy of ``frame``.

    Draws the central tolerance band and the usable vertical range, every track
    box with its center marker, every label box, and the optional movement
    arrow.  The result is passed through ``resize_preview`` with
    ``args.preview_width``.
    """
    label_detections = labels or []
    canvas = frame.copy()
    frame_h, frame_w = canvas.shape[:2]

    # Reference geometry derived from the configured ratios.
    mid_x = int(frame_w * 0.5)
    half_band = int(frame_w * args.center_tolerance_ratio)
    usable_top = int(frame_h * args.usable_y_min_ratio)
    usable_bottom = int(frame_h * args.usable_y_max_ratio)
    guide_color = (255, 255, 0)
    limit_color = (100, 100, 100)
    cv2.rectangle(
        canvas,
        (mid_x - half_band, usable_top),
        (mid_x + half_band, usable_bottom),
        guide_color,
        4,
    )
    cv2.line(canvas, (mid_x, 0), (mid_x, frame_h), guide_color, 3)
    cv2.line(canvas, (0, usable_top), (frame_w, usable_top), limit_color, 2)
    cv2.line(canvas, (0, usable_bottom), (frame_w, usable_bottom), limit_color, 2)

    for track in tracks:
        left, top, right, bottom = track.bbox
        track_color = state_color(track.state)
        # Centered tracks get a thicker outline.
        outline = 8 if track.state == "centered" else 5
        cv2.rectangle(canvas, (left, top), (right, bottom), track_color, outline)
        center_x, center_y = bbox_center(track.bbox)
        cv2.circle(canvas, (int(center_x), int(center_y)), 12, track_color, -1)
        cv2.circle(canvas, (int(center_x), int(center_y)), 18, (0, 0, 0), 3)

    label_color = (0, 170, 255)
    for detection in label_detections:
        left, top, right, bottom = detection.bbox
        cv2.rectangle(canvas, (left, top), (right, bottom), label_color, 4)
        label_x, label_y = bbox_center(detection.bbox)
        cv2.circle(canvas, (int(label_x), int(label_y)), 8, label_color, -1)

    if movement_arrow is not None:
        tail, head = movement_arrow
        # NOTE(review): the arrow is drawn twice with swapped endpoints, which
        # yields a double-headed arrow - confirm this is intended.
        cv2.arrowedLine(canvas, tail, head, (0, 0, 0), 12, cv2.LINE_AA, tipLength=0.12)
        cv2.arrowedLine(canvas, head, tail, (0, 0, 0), 12, cv2.LINE_AA, tipLength=0.12)

    return resize_preview(canvas, args.preview_width)
def draw_commands_window(command_lines: list[str], motion_text: str) -> np.ndarray:
    """Render the command list (and optional arrow panel) on a new canvas.

    Args:
        command_lines: Command lines to show; at most the first 14 are drawn.
            A placeholder line is shown when the list is empty.
        motion_text: Motion summary drawn under the title.

    Returns:
        A light-grey BGR canvas, 980 pixels wide and at least 340 pixels high,
        tall enough for every drawn row and for the arrow panel when a command
        requires one.
    """
    first_row_y = 122  # text baseline of the first command row
    row_pitch = 36  # vertical distance between consecutive rows

    lines = command_lines if command_lines else ["Nessun comando generato"]
    visible_lines = lines[:14]
    arrow_mode = command_arrow_mode(lines)
    arrow_h = 210 if arrow_mode else 0
    # The height is derived from the same origin and pitch used to draw the
    # rows below.  Sizing it with a smaller per-row step than the drawing loop
    # lets the last rows fall outside the canvas once the list grows.
    canvas_h = max(340, first_row_y + len(visible_lines) * row_pitch + arrow_h)
    canvas = np.full((canvas_h, 980, 3), 245, dtype=np.uint8)
    cv2.putText(
        canvas,
        "COMANDI NAVIGAZIONE",
        (24, 42),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (0, 0, 0),
        2,
        cv2.LINE_AA,
    )
    cv2.putText(
        canvas,
        motion_text,
        (24, 76),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.80,
        (120, 0, 120),
        2,
        cv2.LINE_AA,
    )
    y = first_row_y
    for idx, line in enumerate(visible_lines):
        # The first row is drawn in a different colour from the others.
        color = (0, 0, 180) if idx == 0 else (0, 90, 0)
        cv2.putText(
            canvas,
            line,
            (24, y),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.82,
            color,
            2,
            cv2.LINE_AA,
        )
        y += row_pitch
    if arrow_mode:
        draw_command_arrow(canvas, arrow_mode, y + 16)
    return canvas
def command_arrow_mode(lines: list[str]) -> tuple[str, float, float] | None:
for line in lines:
if line.startswith("CENTRA_ETICHETTA"):
return "to_label", *parse_command_delta(line)
if line.startswith("RITORNA_CENTRO_GAYLORD"):
dx, dy = parse_command_delta(line)
return "to_gaylord", -dx, -dy
return None
def parse_command_delta(line: str) -> tuple[float, float]:
    """Extract the ``dx=`` / ``dy=`` offsets from a command line.

    Tokens are whitespace separated and any ``px`` text in the value is
    removed before conversion.  A missing offset defaults to 0.0; when an
    offset appears more than once, the last occurrence wins.
    """
    offsets = {"dx": 0.0, "dy": 0.0}
    for token in line.split():
        axis = token[:2]
        if axis in offsets and token[2:3] == "=":
            offsets[axis] = float(token[3:].replace("px", ""))
    return offsets["dx"], offsets["dy"]
def draw_command_arrow(canvas: np.ndarray, arrow_info: tuple[str, float, float], y0: int) -> None:
    """Draw the movement panel (title, offsets and arrow) onto ``canvas`` in place.

    ``arrow_info`` is ``(mode, dx, dy)``; mode ``"to_label"`` points the arrow
    from the gaylord center marker to the label marker, any other mode points
    it the opposite way.  ``y0`` is the requested top of the panel and is
    limited to the range ``[110, canvas height - 160]``.
    """
    mode, dx, dy = arrow_info
    canvas_h, canvas_w = canvas.shape[:2]
    panel_top = min(max(y0, 110), canvas_h - 160)
    panel_bottom = min(canvas_h - 18, panel_top + 170)
    gaylord_point = (canvas_w // 2 - 120, panel_top + 130)
    label_point = (canvas_w // 2 + 150, panel_top + 34)

    if mode == "to_label":
        tail, head = gaylord_point, label_point
        title = "MOVIMENTO VERSO ETICHETTA"
        tail_caption, head_caption = "CENTRO GAYLORD", "ETICHETTA"
    else:
        tail, head = label_point, gaylord_point
        title = "RITORNO AL CENTRO GAYLORD"
        tail_caption, head_caption = "ETICHETTA", "CENTRO GAYLORD"

    font = cv2.FONT_HERSHEY_SIMPLEX
    black = (0, 0, 0)
    value_color = (0, 0, 180)

    # Panel background followed by its border.
    cv2.rectangle(canvas, (24, panel_top), (canvas_w - 24, panel_bottom), (232, 232, 232), -1)
    cv2.rectangle(canvas, (24, panel_top), (canvas_w - 24, panel_bottom), (120, 120, 120), 2)

    cv2.putText(canvas, title, (44, panel_top + 34), font, 0.86, black, 2, cv2.LINE_AA)
    cv2.putText(
        canvas,
        f"DX ORIZZONTALE: {dx:+.0f} px",
        (44, panel_top + 72),
        font,
        0.86,
        value_color,
        2,
        cv2.LINE_AA,
    )
    cv2.putText(
        canvas,
        f"DY VERTICALE: {dy:+.0f} px",
        (44, panel_top + 108),
        font,
        0.86,
        value_color,
        2,
        cv2.LINE_AA,
    )

    cv2.circle(canvas, gaylord_point, 16, (70, 70, 70), -1)
    cv2.circle(canvas, label_point, 16, (0, 120, 255), -1)
    cv2.arrowedLine(canvas, tail, head, black, 12, cv2.LINE_AA, tipLength=0.22)

    tail_caption_pos = (tail[0] - 115, tail[1] + 42)
    head_caption_pos = (head[0] - 78, max(24, head[1] - 24))
    cv2.putText(canvas, tail_caption, tail_caption_pos, font, 0.62, black, 2, cv2.LINE_AA)
    cv2.putText(canvas, head_caption, head_caption_pos, font, 0.62, black, 2, cv2.LINE_AA)
def apply_flash(frame: np.ndarray, alpha: float) -> np.ndarray:
    """Blend ``frame`` with a white image of the same shape.

    ``alpha`` is the weight of the white image and is limited to ``[0, 1]``.
    """
    if alpha < 0.0:
        weight = 0.0
    elif alpha > 1.0:
        weight = 1.0
    else:
        weight = alpha
    white = np.full_like(frame, 255)
    return cv2.addWeighted(frame, 1.0 - weight, white, weight, 0.0)
def estimate_motion_from_tracks(tracks: list[Track], min_pixels: float) -> str:
    """Summarise the mean displacement of the tracks as a ``MOTO: ...`` string.

    Only tracks with ``missed == 0`` and at least two entries in
    ``center_history`` contribute; the displacement is the difference between
    their last two centers.  The direction is ``stabile`` when both mean
    components are below ``min_pixels``, otherwise it follows the dominant
    axis.  Returns ``"MOTO: n/d"`` when no track contributes.
    """
    steps: list[tuple[float, float]] = []
    for track in tracks:
        if track.missed == 0 and len(track.center_history) >= 2:
            prev_x, prev_y = track.center_history[-2]
            cur_x, cur_y = track.center_history[-1]
            steps.append((cur_x - prev_x, cur_y - prev_y))
    if not steps:
        return "MOTO: n/d"

    count = len(steps)
    x_steps, y_steps = zip(*steps)
    dx = sum(x_steps) / count
    dy = sum(y_steps) / count

    horizontal, vertical = abs(dx), abs(dy)
    if horizontal < min_pixels and vertical < min_pixels:
        direction = "stabile"
    elif horizontal >= vertical:
        direction = "destra" if dx > 0 else "sinistra"
    else:
        direction = "giu" if dy > 0 else "su"
    return f"MOTO: {direction} dx={dx:+.1f}px dy={dy:+.1f}px tracks={count}"
def state_color(state: str) -> tuple[int, int, int]:
    """Return the drawing colour for a track state; unknown states map to white."""
    palette = {
        "centered": (0, 255, 255),
        "snapshotted": (255, 0, 255),
        "candidate": (0, 255, 0),
        "exiting": (0, 140, 255),
    }
    return palette.get(state, (255, 255, 255))
def main() -> int:
    """Run the navigation loop on the configured video source.

    Parses the arguments, loads the detector, opens the video source and then,
    for every frame read: optionally runs detection and tracking, handles the
    snapshots produced by the navigator (pausing and exchanging a result with
    the WMS client, or simulating the remote response), updates the windows
    unless the display is disabled, and appends one row to the performance log.

    Returns:
        1 when the video source cannot be opened, otherwise 0.
    """
    args = parse_args()
    if args.benchmark_mode:
        # The benchmark profile overrides display and pacing options.
        args.no_display = True
        args.window_layout_enabled = False
        args.realtime_playback = True
        args.preview_fps = args.benchmark_preview_fps
        # Only the default log path is redirected; an explicit path is kept.
        if args.perf_log_path == "tempistiche.txt":
            args.perf_log_path = "tempistiche-benchmark.txt"
        log(
            f"Profilo benchmark attivo: no_display=true "
            f"preview_fps={format_fps_value(args.preview_fps)} "
            f"log={args.perf_log_path}"
        )
    require_file(args.weights, "modello Ultralytics")
    detector = UltralyticsDetector(args.weights, args.ultralytics_device, args.yolo_half)
    log(f"Classi modello: {detector.classes}")
    log(f"OpenCV CUDA: {'attivo' if OPENCV_CUDA_AVAILABLE else 'non disponibile'}")
    log(f"YOLO half precision: {'attiva' if detector.half else 'disattiva'}")
    log("Nota tracker: questa versione usa tracking geometrico interno; ByteTrack/BoT-SORT restano candidati per confronto successivo.")
    cap, source_name = open_capture(args.video)
    if not cap.isOpened():
        log(f"ERRORE: impossibile aprire sorgente video: {source_name}")
        return 1
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    # A positive --preview-fps wins over the frame rate reported by the source.
    preview_fps = args.preview_fps if args.preview_fps and args.preview_fps > 0 else video_fps
    # The requested rate is pushed to the device only for camera sources.
    if args.preview_fps and args.preview_fps > 0 and (args.video is None or str(args.video).isdigit()):
        cap.set(cv2.CAP_PROP_FPS, float(args.preview_fps))
    # A delay/interval of 0.0 disables the corresponding throttle.
    frame_delay = 1.0 / preview_fps if args.realtime_playback and preview_fps and preview_fps > 1 else 0.0
    yolo_interval = 1.0 / args.yolo_fps if args.yolo_fps and args.yolo_fps > 0 else 0.0
    log(
        f"FPS sorgente={format_fps_value(video_fps)} "
        f"preview_target={format_fps_value(preview_fps)} "
        f"yolo_target={format_fps_value(args.yolo_fps)}"
    )
    perf_writer = PerfLogWriter(
        args.perf_log_path,
        args.perf_log_flush_interval_sec,
        args.perf_log_flush_lines,
    )
    log(f"Log tempistiche: {perf_writer.path.resolve()}")
    # Header row: tab-separated column names matching the per-frame rows below.
    perf_writer.write(
        "ts\tframe_id\trun_yolo\tread_ms\tyolo_ms\ttrack_ms\tdraw_ms\tui_ms\t"
        "snapshot_pause_ms\twms_wait_ms\tloop_ms\tloop_fps\tyolo_real_fps\t"
        "src_fps\tpreview_target\tyolo_target\tdet\tlabels\ttracks\tactive\t"
        "snapshots\tcommand"
    )
    tracker = LightweightTracker(
        max_missed=args.max_track_missed,
        min_match_score=args.min_match_score,
        max_center_distance_ratio=args.max_center_distance_ratio,
    )
    navigator = NavigationController(args)
    wms_client = WmsAsyncClient(args)
    if not args.no_display:
        cv2.namedWindow("flywms navigate", cv2.WINDOW_NORMAL)
        cv2.namedWindow("flywms snapshot", cv2.WINDOW_NORMAL)
        cv2.namedWindow("flywms etichetta", cv2.WINDOW_NORMAL)
        cv2.namedWindow("flywms comandi", cv2.WINDOW_NORMAL)
        apply_window_layout(args)
    frame_id = 0
    start_time = time.perf_counter()
    last_stats = start_time
    last_loop_end = start_time
    yolo_total_ms = 0.0
    yolo_cycles = 0
    next_yolo_time = start_time
    last_yolo_ms = 0.0
    # Detections and tracks persist across frames on which detection is skipped.
    gaylords: list[Detection] = []
    labels: list[Detection] = []
    tracks: list[Track] = []
    try:
        while True:
            frame_loop_start = time.perf_counter()
            # Per-frame timing accumulators, written to the performance log.
            read_ms = 0.0
            track_ms = 0.0
            draw_ms = 0.0
            ui_ms = 0.0
            snapshot_pause_ms = 0.0
            wms_wait_ms = 0.0
            if frame_delay > 0:
                # Pace the loop: sleep for what is left of the frame period.
                now = time.perf_counter()
                sleep_for = frame_delay - (now - last_loop_end)
                if sleep_for > 0:
                    time.sleep(sleep_for)
                last_loop_end = time.perf_counter()
            read_t0 = time.perf_counter()
            ok, frame = cap.read()
            read_ms = (time.perf_counter() - read_t0) * 1000.0
            if not ok:
                log("Fine stream")
                break
            frame_id += 1
            timestamp = time.perf_counter()
            if args.max_frames > 0 and frame_id > args.max_frames:
                log(f"Raggiunto --max-frames={args.max_frames}")
                break
            new_snapshots: list[NavigationSnapshot] = []
            # Detection runs on every frame when no interval is configured,
            # otherwise only once the next scheduled time has been reached.
            run_yolo = yolo_interval <= 0 or timestamp >= next_yolo_time
            if run_yolo:
                next_yolo_time = timestamp + yolo_interval
                detections, last_yolo_ms = detector.detect(frame, args.min_confidence, args.input_size)
                yolo_total_ms += last_yolo_ms
                yolo_cycles += 1
                # Split the detections by class name (case-insensitive, trimmed).
                gaylords = [
                    det for det in detections
                    if det.class_name.strip().lower() == args.target_class.strip().lower()
                ]
                labels = [
                    det for det in detections
                    if det.class_name.strip().lower() == args.label_class.strip().lower()
                ]
                track_t0 = time.perf_counter()
                tracks = tracker.update(gaylords, frame_id, frame.shape[1])
                # Refresh the motion text every motion_report_interval detection cycles.
                if args.motion_report_interval > 0 and yolo_cycles % args.motion_report_interval == 0:
                    navigator.set_motion_text(
                        estimate_motion_from_tracks(tracks, args.motion_min_pixels)
                    )
                for track in tracks:
                    # Only tracks matched on this cycle are handed to the navigator.
                    if track.missed == 0:
                        snapshot = navigator.process_track(track, frame, frame_id, timestamp, labels)
                        if snapshot is not None:
                            new_snapshots.append(snapshot)
                track_ms = (time.perf_counter() - track_t0) * 1000.0
            if args.no_display and new_snapshots:
                # Headless mode: sleep for the total duration of the label
                # move/stabilization/return phases instead of showing them.
                sequence_sec = (
                    args.label_move_sec
                    + args.label_stabilization_sec
                    + args.label_return_sec
                )
                if sequence_sec > 0:
                    pause_t0 = time.perf_counter()
                    time.sleep(sequence_sec)
                    snapshot_pause_ms += (time.perf_counter() - pause_t0) * 1000.0
                for snapshot in new_snapshots:
                    if args.wms_enabled:
                        request_id = wms_client.submit(snapshot)
                        wait_t0 = time.perf_counter()
                        result = wms_client.wait_for_result(request_id, args.remote_ack_timeout_sec)
                        wms_wait_ms += (time.perf_counter() - wait_t0) * 1000.0
                        navigator.apply_wms_result(result, snapshot)
                    else:
                        # No WMS: sleep for the ack timeout, then simulate the response.
                        if args.remote_ack_timeout_sec > 0:
                            wait_t0 = time.perf_counter()
                            time.sleep(args.remote_ack_timeout_sec)
                            wms_wait_ms += (time.perf_counter() - wait_t0) * 1000.0
                        navigator.simulate_remote_response(snapshot)
            now = time.perf_counter()
            if now - last_stats >= args.stats_interval:
                # Periodic console statistics, averaged since start_time.
                elapsed = max(now - start_time, 0.001)
                avg_yolo = yolo_total_ms / max(yolo_cycles, 1)
                active = sum(1 for t in tracks if t.missed == 0)
                log(
                    f"fps={frame_id / elapsed:.1f} yolo_fps={yolo_cycles / elapsed:.1f} "
                    f"avg_yolo={avg_yolo:.1f}ms det={len(gaylords)} labels={len(labels)} "
                    f"tracks={len(tracks)} active={active} "
                    f"snapshots={navigator.snapshot_counter} {navigator.motion_text}"
                )
                if args.debug_tracks:
                    for track in tracks:
                        cx, cy = bbox_center(track.bbox)
                        area_ratio = bbox_area(track.bbox) / float(frame.shape[0] * frame.shape[1])
                        log(
                            f" track={track.id} state={track.state} hits={track.hits} "
                            f"missed={track.missed} center=({cx:.0f},{cy:.0f}) "
                            f"area={area_ratio:.3f} trend={track.area_trend():+.2f} "
                            f"reason={track.last_candidate_reason}"
                        )
                last_stats = now
            if not args.no_display:
                draw_t0 = time.perf_counter()
                display = draw_navigation_debug(
                    frame,
                    tracks,
                    args,
                    labels,
                    navigator.label_movement_arrow,
                )
                draw_ms += (time.perf_counter() - draw_t0) * 1000.0
                ui_t0 = time.perf_counter()
                cv2.imshow("flywms navigate", display)
                if navigator.last_ocr_payload_frame is not None:
                    snapshot_display = resize_preview(navigator.last_ocr_payload_frame, args.preview_width)
                    cv2.imshow("flywms snapshot", snapshot_display)
                # With new snapshots the label window is refreshed by the
                # sequence below instead.
                if navigator.last_label_payload_frame is not None and not new_snapshots:
                    label_display = resize_preview(navigator.last_label_payload_frame, args.preview_width)
                    cv2.imshow("flywms etichetta", label_display)
                cv2.imshow(
                    "flywms comandi",
                    draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                )
                ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                if new_snapshots:
                    # Interactive label sequence: move -> stabilize -> capture
                    # -> return -> wait_wms.  Esc or "q" during any wait aborts
                    # the whole run.
                    stop_requested = False
                    for snapshot in new_snapshots:
                        navigator.set_label_sequence_phase(snapshot, "move")
                        ui_t0 = time.perf_counter()
                        cv2.imshow("flywms navigate", display)
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                        wait_t0 = time.perf_counter()
                        key = cv2.waitKey(max(1, int(args.label_move_sec * 1000))) & 0xFF
                        snapshot_pause_ms += (time.perf_counter() - wait_t0) * 1000.0
                        if key in (27, ord("q")):
                            log("Interrotto da tastiera")
                            stop_requested = True
                            break
                        navigator.set_label_sequence_phase(snapshot, "stabilize")
                        ui_t0 = time.perf_counter()
                        cv2.imshow("flywms navigate", display)
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                        wait_t0 = time.perf_counter()
                        key = cv2.waitKey(max(1, int(args.label_stabilization_sec * 1000))) & 0xFF
                        snapshot_pause_ms += (time.perf_counter() - wait_t0) * 1000.0
                        if key in (27, ord("q")):
                            log("Interrotto da tastiera")
                            stop_requested = True
                            break
                        navigator.set_label_sequence_phase(snapshot, "capture")
                        # The capture phase shows the frame blended with white for 500 ms.
                        flash_display = apply_flash(display, args.flash_alpha)
                        ui_t0 = time.perf_counter()
                        cv2.imshow("flywms navigate", flash_display)
                        if navigator.last_label_payload_frame is not None:
                            flash_label = apply_flash(
                                resize_preview(navigator.last_label_payload_frame, args.preview_width),
                                args.flash_alpha,
                            )
                            cv2.imshow("flywms etichetta", flash_label)
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                        wait_t0 = time.perf_counter()
                        key = cv2.waitKey(500) & 0xFF
                        snapshot_pause_ms += (time.perf_counter() - wait_t0) * 1000.0
                        if key in (27, ord("q")):
                            log("Interrotto da tastiera")
                            stop_requested = True
                            break
                        # The snapshot is submitted before the return phase and
                        # its result is collected in the wait_wms phase.
                        request_id = wms_client.submit(snapshot) if args.wms_enabled else None
                        navigator.set_label_sequence_phase(snapshot, "return")
                        ui_t0 = time.perf_counter()
                        cv2.imshow("flywms navigate", display)
                        if navigator.last_label_payload_frame is not None:
                            cv2.imshow(
                                "flywms etichetta",
                                resize_preview(navigator.last_label_payload_frame, args.preview_width),
                            )
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                        wait_t0 = time.perf_counter()
                        key = cv2.waitKey(max(1, int(args.label_return_sec * 1000))) & 0xFF
                        snapshot_pause_ms += (time.perf_counter() - wait_t0) * 1000.0
                        if key in (27, ord("q")):
                            log("Interrotto da tastiera")
                            stop_requested = True
                            break
                        navigator.set_label_sequence_phase(snapshot, "wait_wms")
                        ui_t0 = time.perf_counter()
                        cv2.imshow("flywms navigate", display)
                        if navigator.last_label_payload_frame is not None:
                            cv2.imshow(
                                "flywms etichetta",
                                resize_preview(navigator.last_label_payload_frame, args.preview_width),
                            )
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                        wait_deadline = time.perf_counter() + max(0.0, args.remote_ack_timeout_sec)
                        wms_result: WmsResult | None = None
                        if args.wms_enabled:
                            wait_t0 = time.perf_counter()
                            # Poll in short slices so the keyboard is still
                            # checked while waiting for the result.
                            while time.perf_counter() <= wait_deadline:
                                wms_result = wms_client.wait_for_result(request_id, 0.05)
                                key = cv2.waitKey(1) & 0xFF
                                if key in (27, ord("q")):
                                    log("Interrotto da tastiera")
                                    stop_requested = True
                                    break
                                if wms_result is not None:
                                    break
                            wms_wait_ms += (time.perf_counter() - wait_t0) * 1000.0
                        else:
                            wait_t0 = time.perf_counter()
                            key = cv2.waitKey(max(1, int(args.remote_ack_timeout_sec * 1000))) & 0xFF
                            wms_wait_ms += (time.perf_counter() - wait_t0) * 1000.0
                            if key in (27, ord("q")):
                                log("Interrotto da tastiera")
                                stop_requested = True
                        if stop_requested:
                            break
                        if args.wms_enabled:
                            navigator.apply_wms_result(wms_result, snapshot)
                        else:
                            navigator.simulate_remote_response(snapshot)
                        cv2.imshow(
                            "flywms comandi",
                            draw_commands_window(navigator.last_command_lines, navigator.motion_text),
                        )
                        navigator.label_movement_arrow = None
                    if stop_requested:
                        break
                ui_t0 = time.perf_counter()
                key = cv2.waitKey(1) & 0xFF
                ui_ms += (time.perf_counter() - ui_t0) * 1000.0
                if key in (27, ord("q")):
                    log("Interrotto da tastiera")
                    break
            loop_end = time.perf_counter()
            elapsed = max(loop_end - start_time, 0.001)
            loop_ms = (loop_end - frame_loop_start) * 1000.0
            active = sum(1 for t in tracks if t.missed == 0)
            # One tab-separated row per frame; yolo_ms is 0 on frames where
            # detection was skipped.
            perf_writer.write(
                f"{time.strftime('%Y-%m-%d %H:%M:%S')}\t{frame_id}\t{int(run_yolo)}\t"
                f"{read_ms:.3f}\t{last_yolo_ms if run_yolo else 0.0:.3f}\t{track_ms:.3f}\t"
                f"{draw_ms:.3f}\t{ui_ms:.3f}\t{snapshot_pause_ms:.3f}\t{wms_wait_ms:.3f}\t"
                f"{loop_ms:.3f}\t{frame_id / elapsed:.3f}\t{yolo_cycles / elapsed:.3f}\t"
                f"{format_fps_value(video_fps)}\t{format_fps_value(preview_fps)}\t{format_fps_value(args.yolo_fps)}\t"
                f"{len(gaylords)}\t{len(labels)}\t{len(tracks)}\t{active}\t{navigator.snapshot_counter}\t"
                f"{json.dumps(navigator.last_command_text, ensure_ascii=True)}"
            )
    finally:
        # Release the capture, the WMS client and the log however the loop ended.
        cap.release()
        wms_client.close()
        perf_writer.close()
        if not args.no_display:
            cv2.destroyAllWindows()
    log(f"Snapshot salvati in: {Path(args.snapshot_output_dir).resolve()}")
    return 0
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())