import argparse
import asyncio
import configparser
import ctypes
import json
import random
import re
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI, File, Form, UploadFile

try:
    import dearpygui.dearpygui as dpg
except ModuleNotFoundError:
    dpg = None

DEFAULT_CONFIG_PATH = "flywms_navigation.ini"
CUDA_MIN_PIXELS = 640 * 360


def opencv_cuda_available() -> bool:
    try:
        return hasattr(cv2, "cuda") and cv2.cuda.getCudaEnabledDeviceCount() > 0
    except cv2.error:
        return False


OPENCV_CUDA_AVAILABLE = opencv_cuda_available()


def cuda_resize(
    image: np.ndarray,
    size: tuple[int, int],
    interpolation: int = cv2.INTER_LINEAR,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    if not OPENCV_CUDA_AVAILABLE or image.size < min_pixels:
        return cv2.resize(image, size, interpolation=interpolation)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.resize(gpu, size, interpolation=interpolation).download()
    except cv2.error:
        return cv2.resize(image, size, interpolation=interpolation)


def cuda_cvt_color(
    image: np.ndarray,
    code: int,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    if not OPENCV_CUDA_AVAILABLE or image.size < min_pixels:
        return cv2.cvtColor(image, code)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.cvtColor(gpu, code).download()
    except cv2.error:
        return cv2.cvtColor(image, code)


app = FastAPI(title="FlyWMS Demo WMS Server")
server_state = {
    "received_dir": "wms_received",
    "fake_ack_mode": "always-ack",
    "fake_processing_sec": 0.5,
    "ui_enabled": True,
    "ocr_enabled": True,
    "ocr_mode": "paddleocr",
    "tesseract_cmd": "",
    "paddle_python": r"C:\devel\yolo-ocr\.venv-paddle-cpu\Scripts\python.exe",
    "paddle_worker_script": str(Path(__file__).with_name("flywms_paddleocr_worker.py")),
    "paddle_target_heights": "96",
    "paddle_variant_set": "fast",
    "paddle_expected_digits": 6,
    "paddle_min_votes": 2,
    "paddle_min_confidence": 0.70,
    "fake_ocr_prefix": "UDC",
    "undetermined_code_text": "udc non determinato",
    "fake_ocr_delay_sec": 0.2,
    "operator": "udrone",
    "site": "demo-magazzino",
    "counter": 0,
}
state_lock = threading.Lock()
ui_lock = threading.Lock()
ui_state = {
    "image": None,
    "payload_lines": ["In attesa di payload WMS"],
    "updated": False,
    "stop": False,
}


@dataclass(frozen=True)
class OcrServerResult:
    text: str
    raw_text: str
    confidence: float
    backend: str
    fallback_used: bool
    votes: int = 0
    variant: str = ""


class PaddleOcrWorker:
    def __init__(
        self,
        python_path: str,
        script_path: str,
        target_heights: str,
        variant_set: str,
        expected_digits: int,
        min_votes: int,
        min_confidence: float,
    ) -> None:
        self.python_path = python_path
        self.script_path = script_path
        self.target_heights = target_heights
        self.variant_set = variant_set
        self.expected_digits = expected_digits
        self.min_votes = min_votes
        self.min_confidence = min_confidence
        self.lock = threading.Lock()
        self.process: subprocess.Popen[str] | None = None

    def close(self) -> None:
        with self.lock:
            process = self.process
            self.process = None
        if process is None:
            return
        try:
            if process.stdin:
                process.stdin.write("__quit__\n")
                process.stdin.flush()
        except Exception:
            pass
        try:
            process.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            process.kill()

    def predict(self, image_path: Path) -> dict[str, object]:
        with self.lock:
            process = self._ensure_process()
            if process.stdin is None or process.stdout is None:
                raise RuntimeError("worker stdio non disponibile")
            request = {
                "image_path": str(image_path),
                "target_heights": self.target_heights,
                "variant_set": self.variant_set,
                "expected_digits": self.expected_digits,
                "min_votes": self.min_votes,
                "min_confidence": self.min_confidence,
            }
            process.stdin.write(json.dumps(request, ensure_ascii=True) + "\n")
            process.stdin.flush()
            line = process.stdout.readline()
            if not line:
                self.process = None
                raise RuntimeError("worker PaddleOCR terminato")
            return json.loads(line)

    def _ensure_process(self) -> subprocess.Popen[str]:
        if self.process is not None and self.process.poll() is None:
            return self.process
        python_path = Path(self.python_path)
        script_path = Path(self.script_path)
        if not python_path.exists():
            raise RuntimeError(f"python PaddleOCR non trovato: {python_path}")
        if not script_path.exists():
            raise RuntimeError(f"worker PaddleOCR non trovato: {script_path}")
        self.process = subprocess.Popen(
            [str(python_path), str(script_path)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            encoding="utf-8",
            errors="replace",
            creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0,
        )
        assert self.process.stdout is not None
        ready_line = self.process.stdout.readline()
        if not ready_line:
            raise RuntimeError("worker PaddleOCR non avviato")
        ready = json.loads(ready_line)
        if not ready.get("ready"):
            raise RuntimeError(f"worker PaddleOCR risposta inattesa: {ready}")
        return self.process


_paddle_worker: PaddleOcrWorker | None = None
_paddle_worker_lock = threading.Lock()


def log(msg: str) -> None:
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


def load_server_config(path_str: str) -> dict[str, object]:
    defaults: dict[str, object] = {
        "wms_server_host": "0.0.0.0",
        "wms_server_port": 8088,
        "wms_received_dir": "wms_received",
        "wms_fake_ack_mode": "always-ack",
        "wms_fake_processing_sec": 0.5,
        "wms_ui_enabled": True,
        "wms_ocr_enabled": True,
        "wms_ocr_mode": "paddleocr",
        "wms_tesseract_cmd": "",
        "wms_paddle_python": r"C:\devel\yolo-ocr\.venv-paddle-cpu\Scripts\python.exe",
        "wms_paddle_worker_script": str(Path(__file__).with_name("flywms_paddleocr_worker.py")),
        "wms_paddle_target_heights": "96",
        "wms_paddle_variant_set": "fast",
        "wms_paddle_expected_digits": 6,
        "wms_paddle_min_votes": 2,
        "wms_paddle_min_confidence": 0.70,
        "wms_fake_ocr_prefix": "UDC",
        "wms_undetermined_code_text": "udc non determinato",
        "wms_fake_ocr_delay_sec": 0.2,
        "wms_operator": "udrone",
        "wms_site": "demo-magazzino",
    }
    path = Path(path_str)
    if not path.exists():
        return defaults
    parser = configparser.ConfigParser()
    parser.read(path, encoding="utf-8")
    section = parser["navigation"] if parser.has_section("navigation") else {}
    for key, default_value in defaults.items():
        if key not in section:
            continue
        if isinstance(default_value, bool):
            defaults[key] = parser.getboolean("navigation", key, fallback=default_value)
        elif isinstance(default_value, int):
            defaults[key] = parser.getint("navigation", key, fallback=default_value)
        elif isinstance(default_value, float):
            defaults[key] = parser.getfloat("navigation", key, fallback=default_value)
        else:
            defaults[key] = section.get(key, str(default_value)).strip()
    return defaults


def parse_args():
    pre = argparse.ArgumentParser(add_help=False)
    pre.add_argument("--config", default=DEFAULT_CONFIG_PATH, help="File configurazione INI")
    pre_args, _ = pre.parse_known_args()
    defaults = load_server_config(pre_args.config)
    ap = argparse.ArgumentParser(parents=[pre])
    ap.add_argument("--host", default=defaults["wms_server_host"], help="Host bind server")
    ap.add_argument("--port", type=int, default=defaults["wms_server_port"], help="Porta server")
    ap.add_argument("--received-dir", default=defaults["wms_received_dir"], help="Directory upload ricevuti")
    ap.add_argument(
        "--ui-backend",
        choices=["auto", "opencv", "dearpygui"],
        default="dearpygui",
        help="Backend UI server WMS",
    )
    ap.add_argument(
        "--fake-ack-mode",
        choices=["always-ack", "always-nack", "alternate", "random"],
        default=defaults["wms_fake_ack_mode"],
        help="Modalita risposta WMS demo",
    )
    ap.add_argument(
        "--fake-processing-sec",
        type=float,
        default=defaults["wms_fake_processing_sec"],
        help="Ritardo elaborazione finta",
    )
    ap.add_argument(
        "--ui-enabled",
        action="store_true",
        default=defaults["wms_ui_enabled"],
        help="Mostra finestre OpenCV server",
    )
    ap.add_argument("--no-ui", action="store_false", dest="ui_enabled", help="Disabilita finestre OpenCV server")
    ap.add_argument(
        "--ocr-enabled",
        action="store_true",
        default=defaults["wms_ocr_enabled"],
        help="Esegue OCR reale se disponibile",
    )
    ap.add_argument(
        "--no-ocr",
        action="store_false",
        dest="ocr_enabled",
        help="Disabilita OCR reale e usa fallback demo",
    )
    ap.add_argument(
        "--ocr-mode",
        choices=["paddleocr", "easyocr", "tesseract", "fake"],
        default=defaults["wms_ocr_mode"],
        help="Motore OCR server",
    )
    ap.add_argument(
        "--tesseract-cmd",
        default=defaults["wms_tesseract_cmd"],
        help="Percorso esplicito tesseract.exe",
    )
    ap.add_argument(
        "--paddle-python",
        default=defaults["wms_paddle_python"],
        help="Python del virtualenv PaddleOCR",
    )
    ap.add_argument(
        "--paddle-worker-script",
        default=defaults["wms_paddle_worker_script"],
        help="Script worker PaddleOCR",
    )
    ap.add_argument(
        "--paddle-target-heights",
        default=defaults["wms_paddle_target_heights"],
        help="Altezze resize PaddleOCR, es. 96 oppure 64,96,128",
    )
    ap.add_argument(
        "--paddle-variant-set",
        choices=["fast", "balanced", "full"],
        default=defaults["wms_paddle_variant_set"],
        help="Numero varianti preprocessing PaddleOCR",
    )
    ap.add_argument(
        "--paddle-expected-digits",
        type=int,
        default=defaults["wms_paddle_expected_digits"],
        help="Numero cifre atteso codice UDC",
    )
    ap.add_argument(
        "--paddle-min-votes",
        type=int,
        default=defaults["wms_paddle_min_votes"],
        help="Consenso minimo varianti PaddleOCR",
    )
    ap.add_argument(
        "--paddle-min-confidence",
        type=float,
        default=defaults["wms_paddle_min_confidence"],
        help="Confidenza minima PaddleOCR",
    )
    ap.add_argument(
        "--fake-ocr-prefix",
        default=defaults["wms_fake_ocr_prefix"],
        help="Prefisso fallback OCR demo",
    )
    ap.add_argument(
        "--undetermined-code-text",
        default=defaults["wms_undetermined_code_text"],
        help="Testo usato se OCR non determina il codice",
    )
    ap.add_argument(
        "--fake-ocr-delay-sec",
        type=float,
        default=defaults["wms_fake_ocr_delay_sec"],
        help="Ritardo OCR demo/fallback",
    )
    ap.add_argument("--operator", default=defaults["wms_operator"], help="Operatore WMS demo")
    ap.add_argument(
        "--site",
        default=defaults["wms_site"],
        help="Sito/magazzino demo",
    )
    return ap.parse_args()


@app.get("/health")
def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/api/v1/navigation-snapshot")
async def receive_navigation_snapshot(
    metadata: str = Form(...),
    label_image: UploadFile = File(...),
    gaylord_image: UploadFile | None = File(None),
) -> dict[str, object]:
    started = time.perf_counter()
    try:
        meta = json.loads(metadata)
    except json.JSONDecodeError:
        meta = {"raw_metadata": metadata}
    if not isinstance(meta, dict):
        # Valid JSON that is not an object (a list, a number, ...) has no .get().
        meta = {"raw_metadata": metadata}
    with state_lock:
        server_state["counter"] = int(server_state["counter"]) + 1
        counter = int(server_state["counter"])
    request_id = str(meta.get("request_id") or f"server-{counter:06d}")
    snapshot_id = meta.get("snapshot_id", counter)
    try:
        snapshot_number = int(snapshot_id)
    except (TypeError, ValueError):
        # A non-numeric snapshot_id must not turn the request into an HTTP 500.
        snapshot_number = counter
    received_dir = Path(str(server_state["received_dir"]))
    request_dir = received_dir / f"{counter:06d}_{safe_name(request_id)}"
    request_dir.mkdir(parents=True, exist_ok=True)
    label_path = request_dir / safe_upload_name(label_image.filename, "label.jpg")
    label_bytes = await label_image.read()
    label_path.write_bytes(label_bytes)
    gaylord_path = None
    gaylord_bytes = b""
    if gaylord_image is not None:
        gaylord_path = request_dir / safe_upload_name(gaylord_image.filename, "gaylord.jpg")
        gaylord_bytes = await gaylord_image.read()
        gaylord_path.write_bytes(gaylord_bytes)
    (request_dir / "metadata.json").write_text(
        json.dumps(meta, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )
    image = cv2.imdecode(np.frombuffer(label_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    ocr_result = await asyncio.to_thread(run_server_ocr, image, snapshot_number, label_path)
    fake_processing_sec = float(server_state["fake_processing_sec"])
    if fake_processing_sec > 0:
        await asyncio.sleep(fake_processing_sec)
    status = choose_status(str(server_state["fake_ack_mode"]), counter)
    udc_code = ocr_result.text if status == "ACK" else ""
    message = (
        f"codice valido su WMS: {udc_code}"
        if status == "ACK"
        else f"codice non riconosciuto: {ocr_result.text}"
    )
    processing_ms = (time.perf_counter() - started) * 1000.0
    received_at = datetime.now(timezone.utc).isoformat()
    wms_payload = {
        "request_id": request_id,
        "client_id": meta.get("client_id", ""),
        "site": server_state["site"],
        "operator": server_state["operator"],
        "received_at": received_at,
        "snapshot_id": snapshot_id,
        "position": meta.get("simulated_position", ""),
        "track_id": meta.get("track_id", ""),
        "udc_code": udc_code,
        "ocr_raw_text": ocr_result.raw_text,
        "ocr_confidence": ocr_result.confidence,
        "ocr_backend": ocr_result.backend,
        "ocr_fallback_used": ocr_result.fallback_used,
        "ocr_votes": ocr_result.votes,
        "ocr_variant": ocr_result.variant,
        "validation_status": status,
        "validation_message": message,
        "label_bbox": meta.get("label_bbox", []),
        "gaylord_bbox": meta.get("gaylord_bbox", []),
        "movement_vector_px": meta.get("movement_vector_px", {}),
        "label_image_path": str(label_path),
        "gaylord_image_path": str(gaylord_path) if gaylord_path else "",
    }
    (request_dir / "wms_payload.json").write_text(
        json.dumps(wms_payload, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )
    update_ui_state(image, wms_payload)
    log(
        f"received request_id={request_id} snapshot={snapshot_id} "
        f"label_bytes={len(label_bytes)} gaylord_bytes={len(gaylord_bytes)} "
        f"ocr='{ocr_result.text}' status={status}"
    )
    return {
        "status": status,
        "request_id": request_id,
        "message": message,
        "ocr_text": udc_code,
        "ocr_raw_text": ocr_result.raw_text,
        "ocr_confidence": ocr_result.confidence,
        "ocr_backend": ocr_result.backend,
        "ocr_fallback_used": ocr_result.fallback_used,
        "ocr_votes": ocr_result.votes,
        "ocr_variant": ocr_result.variant,
        "processing_ms": round(processing_ms, 1),
        "saved_dir": str(request_dir),
        "wms_payload_path": str(request_dir / "wms_payload.json"),
        "label_image_path": str(label_path),
        "gaylord_image_path": str(gaylord_path) if gaylord_path else "",
    }


def run_server_ocr(image: np.ndarray | None, snapshot_id: int, label_path: Path | None = None) -> OcrServerResult:
    fake_delay = float(server_state["fake_ocr_delay_sec"])
    if fake_delay > 0:
        time.sleep(fake_delay)
    fallback = str(server_state["undetermined_code_text"])
    if image is None:
        return OcrServerResult(fallback, "", 0.0, "fallback", True)
    if not bool(server_state["ocr_enabled"]) or str(server_state["ocr_mode"]) == "fake":
        return OcrServerResult(fallback, fallback, 1.0, "fake", True)
    if str(server_state["ocr_mode"]) == "paddleocr":
        if label_path is None:
            return OcrServerResult(fallback, "", 0.0, "paddleocr-missing-path", True)
        return run_paddleocr_ocr(label_path, fallback)
    if str(server_state["ocr_mode"]) == "tesseract":
        return run_tesseract_ocr(image, fallback)
    try:
        reader = get_easyocr_reader()
        ocr_input = preprocess_label_for_ocr(image)
        results = reader.readtext(
            ocr_input,
            allowlist="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-",
            detail=1,
            paragraph=False,
        )
    except Exception as exc:
        log(f"OCR fallback: {exc}")
        return OcrServerResult(fallback, "", 0.0, "easyocr-error", True)
    raw_parts: list[str] = []
    best_text = ""
    best_conf = 0.0
    for item in results:
        text = str(item[1]).strip()
        conf = float(item[2]) if len(item) > 2 else 0.0
        if text:
            raw_parts.append(text)
        candidate = normalize_ocr_code(text)
        if candidate and conf >= best_conf:
            best_text = candidate
            best_conf = conf
    raw_text = " ".join(raw_parts)
    if not best_text:
        digits = re.sub(r"\D+", "", raw_text)
        if digits:
            best_text = digits
            best_conf = max(best_conf, 0.01)
    if best_text:
        return OcrServerResult(best_text, raw_text, best_conf, "easyocr", False)
    return OcrServerResult(fallback, raw_text, best_conf, "easyocr-fallback", True)


def get_paddle_worker() -> PaddleOcrWorker:
    global _paddle_worker
    with _paddle_worker_lock:
        if _paddle_worker is None:
            _paddle_worker = PaddleOcrWorker(
                python_path=str(server_state["paddle_python"]),
                script_path=str(server_state["paddle_worker_script"]),
                target_heights=str(server_state["paddle_target_heights"]),
                variant_set=str(server_state["paddle_variant_set"]),
                expected_digits=int(server_state["paddle_expected_digits"]),
                min_votes=int(server_state["paddle_min_votes"]),
                min_confidence=float(server_state["paddle_min_confidence"]),
            )
        return _paddle_worker


def close_paddle_worker() -> None:
    global _paddle_worker
    with _paddle_worker_lock:
        worker = _paddle_worker
        _paddle_worker = None
    if worker is not None:
        worker.close()


def run_paddleocr_ocr(label_path: Path, fallback: str) -> OcrServerResult:
    try:
        response = get_paddle_worker().predict(label_path)
    except Exception as exc:
        log(f"PaddleOCR fallback: {exc}")
        close_paddle_worker()
        return OcrServerResult(fallback, "", 0.0, "paddleocr-error", True)
    text = str(response.get("text") or "")
    best_text = str(response.get("best_text") or text)
    raw_text = str(response.get("raw_text") or best_text)
    confidence = float(response.get("confidence") or 0.0)
    votes = int(response.get("votes") or 0)
    variant = str(response.get("variant") or "")
    if response.get("ok") and text:
        return OcrServerResult(text, raw_text, confidence, "paddleocr", False, votes=votes, variant=variant)
    reason = str(response.get("reason") or "fallback")
    raw_with_reason = f"{raw_text} [{reason}]".strip()
    return OcrServerResult(
        fallback,
        raw_with_reason,
        confidence,
        "paddleocr-fallback",
        True,
        votes=votes,
        variant=variant,
    )


def run_tesseract_ocr(image: np.ndarray, fallback: str) -> OcrServerResult:
    try:
        import pytesseract
    except Exception as exc:
        log(f"Tesseract fallback: pytesseract non disponibile: {exc}")
        return OcrServerResult(fallback, "", 0.0, "tesseract-unavailable", True)
    tesseract_cmd = str(server_state.get("tesseract_cmd") or "").strip()
    if tesseract_cmd:
        if not Path(tesseract_cmd).exists():
            log(f"Tesseract fallback: binario non trovato: {tesseract_cmd}")
            return OcrServerResult(fallback, "", 0.0, "tesseract-missing", True)
        pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
    elif shutil.which("tesseract") is None:
        log("Tesseract fallback: tesseract.exe non trovato nel PATH")
        return OcrServerResult(fallback, "", 0.0, "tesseract-missing", True)
    variants = build_tesseract_variants(image)
    candidates: list[tuple[str, float, str]] = []
    config = "--psm 7 --oem 3 -c tessedit_char_whitelist=0123456789"
    for name, variant in variants:
        try:
            data = pytesseract.image_to_data(
                variant,
                config=config,
                output_type=pytesseract.Output.DICT,
            )
        except Exception as exc:
            log(f"Tesseract fallback su variante {name}: {exc}")
            return OcrServerResult(fallback, "", 0.0, "tesseract-error", True)
        texts = [text.strip() for text in data.get("text", []) if text and text.strip()]
        confs: list[float] = []
        for conf in data.get("conf", []):
            try:
                value = float(conf)
            except (TypeError, ValueError):
                continue
            if value >= 0:
                confs.append(value / 100.0)
        raw = "".join(texts)
        digits = re.sub(r"\D+", "", raw)
        confidence = sum(confs) / len(confs) if confs else 0.0
        if digits:
            candidates.append((digits, confidence, name))
    if not candidates:
        return OcrServerResult(fallback, "", 0.0, "tesseract-fallback", True)
    votes: dict[str, list[float]] = {}
    for digits, confidence, _ in candidates:
        votes.setdefault(digits, []).append(confidence)
    best_digits, best_confs = max(
        votes.items(),
        key=lambda item: (len(item[1]), sum(item[1]) / max(len(item[1]), 1)),
    )
    consensus_count = len(best_confs)
    avg_conf = sum(best_confs) / consensus_count
    raw_text = "; ".join(f"{digits}@{conf:.2f}/{name}" for digits, conf, name in candidates)
    if consensus_count < 2 or avg_conf < 0.45:
        return OcrServerResult(fallback, raw_text, avg_conf, "tesseract-ambiguous", True)
    return OcrServerResult(best_digits, raw_text, avg_conf, "tesseract", False)


def build_tesseract_variants(image: np.ndarray) -> list[tuple[str, np.ndarray]]:
    h, w = image.shape[:2]
    scale = 4.0 if w < 500 else 2.0
    resized = cuda_resize(image, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_CUBIC)
    gray = cuda_cvt_color(resized, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 45, 45)
    sharpen = cv2.addWeighted(gray, 1.6, cv2.GaussianBlur(gray, (0, 0), 1.2), -0.6, 0)
    _, otsu = cv2.threshold(
        sharpen,
        0,
        255,
        cv2.THRESH_BINARY + cv2.THRESH_OTSU,
    )
    adaptive = cv2.adaptiveThreshold(
        sharpen,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        31,
        7,
    )
    return [
        ("gray", gray),
        ("sharpen", sharpen),
        ("otsu", otsu),
        ("adaptive", adaptive),
        ("otsu_inv", cv2.bitwise_not(otsu)),
    ]


_easyocr_reader = None
_easyocr_lock = threading.Lock()


def get_easyocr_reader():
    global _easyocr_reader
    with _easyocr_lock:
        if _easyocr_reader is None:
            import easyocr

            _easyocr_reader = easyocr.Reader(["en"], gpu=False, verbose=False)
        return _easyocr_reader


def preprocess_label_for_ocr(image: np.ndarray) -> np.ndarray:
    h, w = image.shape[:2]
    scale = 3.0 if w < 450 else 2.0
    resized = cuda_resize(image, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_CUBIC)
    gray = cuda_cvt_color(resized, cv2.COLOR_BGR2GRAY)
    gray = cv2.bilateralFilter(gray, 7, 45, 45)
    return cuda_cvt_color(gray, cv2.COLOR_GRAY2BGR)


def normalize_ocr_code(text: str) -> str:
    compact = re.sub(r"[^0-9A-Za-z-]+", "", text).strip("-")
    digits = re.sub(r"\D+", "", compact)
    return digits if digits else compact


def update_ui_state(image: np.ndarray | None, payload: dict[str, object]) -> None:
    if not bool(server_state["ui_enabled"]):
        return
    lines = [
        "ULTIMO PAYLOAD WMS",
        f"request_id: {payload.get('request_id', '')}",
        f"client_id: {payload.get('client_id', '')}",
        f"site: {payload.get('site', '')}",
        f"operator: {payload.get('operator', '')}",
        f"received_at: {payload.get('received_at', '')}",
        f"snapshot_id: {payload.get('snapshot_id', '')}",
        f"position: {payload.get('position', '')}",
        f"track_id: {payload.get('track_id', '')}",
        f"udc_code: {payload.get('udc_code', '')}",
        f"ocr_raw: {payload.get('ocr_raw_text', '')}",
        f"ocr_backend: {payload.get('ocr_backend', '')}",
        f"ocr_fallback: {payload.get('ocr_fallback_used', '')}",
        f"ocr_votes: {payload.get('ocr_votes', '')}",
        f"ocr_variant: {payload.get('ocr_variant', '')}",
        f"status: {payload.get('validation_status', '')}",
        f"label_bbox: {payload.get('label_bbox', '')}",
        f"movement: {payload.get('movement_vector_px', '')}",
    ]
    with ui_lock:
        ui_state["image"] = None if image is None else image.copy()
        ui_state["payload_lines"] = lines
        ui_state["updated"] = True


def choose_status(mode: str, counter: int) -> str:
    if mode == "always-nack":
        return "NACK"
    if mode == "alternate":
        return "ACK" if counter % 2 == 1 else "NACK"
    if mode == "random":
        return "ACK" if random.random() >= 0.5 else "NACK"
    return "ACK"


def safe_upload_name(filename: str | None, fallback: str) -> str:
    name = Path(filename or fallback).name
    return safe_name(name) or fallback


def safe_name(value: str) -> str:
    allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._-"
    return "".join(ch if ch in allowed else "_" for ch in value)[:160]


def choose_ui_backend(requested: str) -> str:
    if requested == "opencv":
        return "opencv"
    if requested == "dearpygui":
        if dpg is None:
            raise RuntimeError("DearPyGUI non disponibile")
        return "dearpygui"
    if dpg is not None:
        return "dearpygui"
    return "opencv"


def placeholder_frame(width: int, height: int, text: str) -> np.ndarray:
    canvas = np.full((height, width, 3), 235, dtype=np.uint8)
    cv2.putText(
        canvas,
        text,
        (24, height // 2),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        (30, 30, 30),
        2,
        cv2.LINE_AA,
    )
    return canvas


def frame_to_texture_data(frame: np.ndarray, target_w: int, target_h: int) -> list[float]:
    src_h, src_w = frame.shape[:2]
    if src_h <= 0 or src_w <= 0:
        canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8)
    else:
        scale = min(target_w / float(src_w), target_h / float(src_h))
        draw_w = max(1, int(round(src_w * scale)))
        draw_h = max(1, int(round(src_h * scale)))
        resized = cv2.resize(
            frame,
            (draw_w, draw_h),
            interpolation=cv2.INTER_AREA if scale < 1.0 else cv2.INTER_LINEAR,
        )
        canvas = np.full((target_h, target_w, 3), 18, dtype=np.uint8)
        x0 = (target_w - draw_w) // 2
        y0 = (target_h - draw_h) // 2
        canvas[y0:y0 + draw_h, x0:x0 + draw_w] = resized
    rgba = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGBA)
    return (
        rgba.astype(np.float32).ravel()
        / 255.0
    ).tolist()


def restore_window_by_title(title: str) -> None:
    if not hasattr(ctypes, "windll"):
        return
    user32 = ctypes.windll.user32
    hwnd = user32.FindWindowW(None, title)
    if hwnd:
        user32.ShowWindow(hwnd, 9)  # SW_RESTORE
        user32.SetForegroundWindow(hwnd)


def server_ui_loop() -> None:
    try:
        cv2.namedWindow("wms immagine ricevuta", cv2.WINDOW_NORMAL)
        cv2.namedWindow("wms payload", cv2.WINDOW_NORMAL)
    except cv2.error as exc:
        log(f"UI OpenCV disabilitata: highgui non disponibile ({exc})")
        with ui_lock:
            ui_state["stop"] = True
        return
    cv2.resizeWindow("wms immagine ricevuta", 640, 360)
    cv2.resizeWindow("wms payload", 900, 560)
    cv2.moveWindow("wms immagine ricevuta", 40, 40)
    cv2.moveWindow("wms payload", 720, 40)
    while True:
        with ui_lock:
            if ui_state["stop"]:
                break
            image = None if ui_state["image"] is None else ui_state["image"].copy()
            lines = list(ui_state["payload_lines"])
        if image is None:
            image_display = np.full((360, 640, 3), 240, dtype=np.uint8)
            cv2.putText(
                image_display,
                "In attesa immagine etichetta",
                (30, 180),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.9,
                (0, 0, 0),
                2,
                cv2.LINE_AA,
            )
        else:
            image_display = resize_preview(image, 640)
        cv2.imshow("wms immagine ricevuta", image_display)
        cv2.imshow("wms payload", draw_payload_window(lines))
        key = cv2.waitKey(100) & 0xFF
        if key in (27, ord("q")):
            with ui_lock:
                ui_state["stop"] = True
            break
    cv2.destroyWindow("wms immagine ricevuta")
    cv2.destroyWindow("wms payload")


def server_dpg_ui_loop() -> None:
    if dpg is None:
        log("UI DearPyGUI non disponibile, impossibile avviare il monitor WMS")
        with ui_lock:
            ui_state["stop"] = True
        return
    image_texture = "wms_image_texture"
    payload_text = "wms_payload_text"
    status_text = "wms_status_text"
    image_w = 720
    image_h = 420

    def create_texture(tag: str, data: list[float], width: int, height: int) -> None:
        dpg.add_dynamic_texture(width=width, height=height, default_value=data, tag=tag)

    def set_texture(frame: np.ndarray) -> None:
        dpg.set_value(
            image_texture,
            frame_to_texture_data(
                frame,
                image_w,
                image_h,
            ),
        )

    dpg.create_context()
    with dpg.texture_registry(show=False):
        create_texture(
            image_texture,
            frame_to_texture_data(
                placeholder_frame(image_w, image_h, "In attesa immagine etichetta"),
                image_w,
                image_h,
            ),
            image_w,
            image_h,
        )
    with dpg.window(label="FlyWMS WMS Server", width=1450, height=900):
        with dpg.group(horizontal=True):
            with dpg.child_window(width=760, height=840, border=True):
                dpg.add_text("Ultima immagine ricevuta")
                dpg.add_image(image_texture)
            with dpg.child_window(width=640, height=840, border=True):
                dpg.add_text("Stato")
                dpg.add_text("Server WMS in attesa payload", tag=status_text, wrap=600)
                dpg.add_separator()
                dpg.add_text("Payload")
                dpg.add_input_text(
                    tag=payload_text,
                    multiline=True,
                    readonly=True,
                    width=600,
                    height=720,
                    default_value="In attesa di payload WMS",
                )
    dpg.create_viewport(title="FlyWMS WMS Server", width=1450, height=900)
    dpg.setup_dearpygui()
    dpg.show_viewport()
    time.sleep(0.05)
    restore_window_by_title("FlyWMS WMS Server")
    try:
        while dpg.is_dearpygui_running():
            with ui_lock:
                stop = bool(ui_state["stop"])
                image = None if ui_state["image"] is None else ui_state["image"].copy()
                lines = list(ui_state["payload_lines"])
            if stop:
                break
            if image is not None:
                set_texture(image)
            dpg.set_value(status_text, f"Richieste ricevute: {server_state['counter']}")
            dpg.set_value(payload_text, "\n".join(lines))
            dpg.render_dearpygui_frame()
            time.sleep(0.03)
    finally:
        dpg.destroy_context()


def resize_preview(frame: np.ndarray, max_width: int) -> np.ndarray:
    h, w = frame.shape[:2]
    if max_width <= 0 or w <= max_width:
        return frame
    scale = max_width / float(w)
    return cuda_resize(frame, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LINEAR)


def draw_payload_window(lines: list[str]) -> np.ndarray:
    canvas = np.full((560, 900, 3), 245, dtype=np.uint8)
    y = 42
    for idx, line in enumerate(lines[:18]):
        color = (0, 0, 160) if idx == 0 else (0, 0, 0)
        scale = 0.9 if idx == 0 else 0.63
        thickness = 2 if idx == 0 else 1
        cv2.putText(
            canvas,
            str(line)[:110],
            (24, y),
            cv2.FONT_HERSHEY_SIMPLEX,
            scale,
            color,
            thickness,
            cv2.LINE_AA,
        )
        y += 34 if idx == 0 else 30
    return canvas


def main() -> int:
    args = parse_args()
    ui_backend = choose_ui_backend(args.ui_backend) if args.ui_enabled else "disabled"
    server_state["received_dir"] = args.received_dir
    server_state["fake_ack_mode"] = args.fake_ack_mode
    server_state["fake_processing_sec"] = args.fake_processing_sec
    server_state["ui_enabled"] = args.ui_enabled
    server_state["ocr_enabled"] = args.ocr_enabled
    server_state["ocr_mode"] = args.ocr_mode
    server_state["tesseract_cmd"] = args.tesseract_cmd
    server_state["paddle_python"] = args.paddle_python
    server_state["paddle_worker_script"] = args.paddle_worker_script
    server_state["paddle_target_heights"] = args.paddle_target_heights
    server_state["paddle_variant_set"] = args.paddle_variant_set
    server_state["paddle_expected_digits"] = args.paddle_expected_digits
    server_state["paddle_min_votes"] = args.paddle_min_votes
    server_state["paddle_min_confidence"] = args.paddle_min_confidence
    server_state["fake_ocr_prefix"] = args.fake_ocr_prefix
    server_state["undetermined_code_text"] = args.undetermined_code_text
    server_state["fake_ocr_delay_sec"] = args.fake_ocr_delay_sec
    server_state["operator"] = args.operator
    server_state["site"] = args.site
    Path(args.received_dir).mkdir(parents=True, exist_ok=True)
    log(
        f"FlyWMS WMS demo server host={args.host} port={args.port} "
        f"mode={args.fake_ack_mode} received_dir={args.received_dir}"
    )
    log(f"OpenCV CUDA: {'attivo' if OPENCV_CUDA_AVAILABLE else 'non disponibile'}")
    log(f"UI backend: {ui_backend}")
    ui_thread = None
    if args.ui_enabled:
        ui_target = server_dpg_ui_loop if ui_backend == "dearpygui" else server_ui_loop
        ui_thread = threading.Thread(target=ui_target, name="wms-server-ui", daemon=True)
        ui_thread.start()
    try:
        uvicorn.run(app, host=args.host, port=args.port, log_level="info")
    finally:
        close_paddle_worker()
        with ui_lock:
            ui_state["stop"] = True
        if ui_thread is not None:
            ui_thread.join(timeout=2.0)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())