"""Lightweight standalone barcode client for the warehouse WMS.""" from __future__ import annotations import asyncio import sys import tkinter as tk from concurrent.futures import Future from tkinter import messagebox, ttk from typing import Callable from async_loop_singleton import get_global_loop, stop_global_loop from async_msssql_query import AsyncMSSQLClient from barcode_repository import BarcodeRepository from barcode_service import BarcodeActionResult, BarcodeService, BarcodeViewState from db_config import build_dsn_from_config, ensure_db_config from login_window import prompt_login_compact if sys.platform.startswith("win"): try: asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) except Exception: pass class BarcodeClientApp: """Single-window Tk barcode client modeled after the C# legacy form.""" BARCODE_MAX_WIDTH = 320 BARCODE_MAX_HEIGHT = 400 DESKTOP_THRESHOLD_WIDTH = 1024 DESKTOP_THRESHOLD_HEIGHT = 768 DESKTOP_WINDOW_WIDTH = 465 DESKTOP_WINDOW_HEIGHT = 531 def __init__(self, root: tk.Tk, db_client: AsyncMSSQLClient, session, loop: asyncio.AbstractEventLoop): self.root = root self.db_client = db_client self.session = session self.loop = loop self.repository = BarcodeRepository(db_client) self.service = BarcodeService(self.repository, session.operator_id) self._pending: Future | None = None self._auto_advance_id: str | None = None self._status_colors = { "red": "#f4cccc", "green": "#d9ead3", "yellow": "#e2f0cb", } self._is_barcode_desktop = False self.queue_var = tk.StringVar(value="") self.info1_var = tk.StringVar(value="") self.info2_var = tk.StringVar(value="") self.info3_var = tk.StringVar(value="") self.info4_var = tk.StringVar(value="") self.destination_var = tk.StringVar(value="") self.scanned_var = tk.StringVar(value="") self._build_ui() self._apply_state(self.service.state) self._bind_keys() self.root.protocol("WM_DELETE_WINDOW", self._shutdown) def _build_ui(self) -> None: self.root.title("WMS") self.root.configure(bg="#f1f1f1") self._apply_responsive_geometry() if self._is_barcode_desktop: field_font = ("Segoe UI", 9) entry_font = ("Segoe UI", 11) band_font = ("Segoe UI", 9, "bold") info_font = ("Segoe UI", 9) queue_font = ("Segoe UI", 8) pad_x = 5 pad_y = 4 band_wrap = 190 info_wrap = 194 button_pad_y = 1 else: field_font = ("Segoe UI", 12) entry_font = ("Segoe UI", 15) band_font = ("Segoe UI", 12, "bold") info_font = ("Segoe UI", 12) queue_font = ("Segoe UI", 10) pad_x = 16 pad_y = 12 band_wrap = 400 info_wrap = 408 button_pad_y = 4 wrap = tk.Frame(self.root, bg="#f1f1f1", padx=pad_x, pady=pad_y) wrap.pack(fill="both", expand=True) wrap.rowconfigure(3, weight=1) wrap.columnconfigure(0, weight=1) header = tk.Frame(wrap, bg="#f1f1f1") header.grid(row=0, column=0, sticky="ew", pady=(0, 4 if self._is_barcode_desktop else 10)) tk.Label( header, textvariable=self.queue_var, bg="#f1f1f1", fg="#333333", font=queue_font, anchor="e", ).pack(side="right") form = tk.Frame(wrap, bg="#f1f1f1") form.grid(row=1, column=0, sticky="ew") self._add_compact_field(form, 0, "Pallet", self.scanned_var, scanned=True, label_font=field_font, entry_font=entry_font) self._add_compact_field(form, 1, "Cella", self.destination_var, scanned=False, label_font=field_font, entry_font=entry_font) self.status_band = tk.Label( wrap, textvariable=self.info1_var, bg=self._status_colors["red"], fg="#1f2937", font=band_font, anchor="w", justify="left", padx=6, pady=3 if self._is_barcode_desktop else 6, wraplength=band_wrap, ) self.status_band.grid(row=2, column=0, sticky="ew", pady=(8 if self._is_barcode_desktop else 14, 4)) info_box = tk.Frame(wrap, bg="#f1f1f1") info_box.grid(row=3, column=0, sticky="nsew") info_box.columnconfigure(0, weight=1) for row_index in range(3): info_box.rowconfigure(row_index, weight=1) self.info2_label = tk.Label( info_box, textvariable=self.info2_var, bg="#f1f1f1", fg="#222222", font=info_font, anchor="w", justify="left", wraplength=info_wrap, ) self.info2_label.grid(row=0, column=0, sticky="ew", pady=(2, 0)) self.info3_label = tk.Label( info_box, textvariable=self.info3_var, bg="#f1f1f1", fg="#222222", font=info_font, anchor="w", justify="left", wraplength=info_wrap, ) self.info3_label.grid(row=1, column=0, sticky="ew", pady=(2, 0)) self.info4_label = tk.Label( info_box, textvariable=self.info4_var, bg="#f1f1f1", fg="#222222", font=info_font, anchor="w", justify="left", wraplength=info_wrap, ) self.info4_label.grid(row=2, column=0, sticky="ew", pady=(2, 4 if self._is_barcode_desktop else 10)) buttons = tk.Frame(wrap, bg="#f1f1f1") buttons.grid(row=4, column=0, sticky="ew", pady=(4, 0)) buttons.columnconfigure(0, weight=1) buttons.columnconfigure(1, weight=1) self.btn_f1 = ttk.Button(buttons, text="[F1] H Priority", command=lambda: self._start_queue(1)) self.btn_f1.grid(row=0, column=0, padx=(0, 4), pady=(0, button_pad_y), sticky="ew") self.btn_submit = ttk.Button(buttons, text="[Ent] Salva", command=self._submit) self.btn_submit.grid(row=0, column=1, padx=(4, 0), pady=(0, button_pad_y), sticky="ew") self.btn_f2 = ttk.Button(buttons, text="[F2] L Priority", command=lambda: self._start_queue(0)) self.btn_f2.grid(row=1, column=0, padx=(0, 4), sticky="ew") self.btn_unload = ttk.Button(buttons, text="[F4] Elimina", command=self._begin_manual_unload) self.btn_unload.grid(row=1, column=1, padx=(4, 0), sticky="ew") self.busy_cover = tk.Frame(self.root, bg="#d9d9d9") panel = ttk.Frame(self.busy_cover, padding=16) panel.place(relx=0.5, rely=0.5, anchor="center") self.busy_label = ttk.Label(panel, text="Attendere...", font=("Segoe UI", 11, "bold")) self.busy_label.pack(pady=(0, 8)) self.busy_bar = ttk.Progressbar(panel, mode="indeterminate", length=260) self.busy_bar.pack() def _add_compact_field( self, parent: tk.Frame, row: int, label: str, variable: tk.StringVar, *, scanned: bool, label_font, entry_font, ) -> None: tk.Label( parent, text=label, bg="#f1f1f1", fg="#1f1f1f", font=label_font, anchor="w", ).grid(row=row, column=0, sticky="w", padx=(0, 8), pady=4 if self._is_barcode_desktop else 6) entry = ttk.Entry(parent, textvariable=variable, font=entry_font, width=8) entry.grid(row=row, column=1, sticky="ew", pady=4 if self._is_barcode_desktop else 6, ipady=1 if self._is_barcode_desktop else 4) parent.columnconfigure(1, weight=1) if scanned: self.pallet_entry = entry self.scanned_var.trace_add("write", lambda *_: self._limit_var(self.scanned_var, 8)) else: self.destination_entry = entry self.destination_var.trace_add("write", lambda *_: self._limit_var(self.destination_var, 8)) def _limit_var(self, variable: tk.StringVar, max_len: int) -> None: value = str(variable.get() or "") if len(value) > max_len: variable.set(value[:max_len]) def _apply_responsive_geometry(self) -> None: """Adapt the window size to barcode-sized or desktop-sized screens.""" self.root.update_idletasks() screen_width = max(1, int(self.root.winfo_screenwidth() or 0)) screen_height = max(1, int(self.root.winfo_screenheight() or 0)) is_barcode_desktop = ( screen_width <= self.BARCODE_MAX_WIDTH or screen_height <= self.BARCODE_MAX_HEIGHT ) self._is_barcode_desktop = is_barcode_desktop if is_barcode_desktop: width = screen_width height = screen_height x = 0 y = 0 self.root.minsize(max(220, width), max(300, height)) else: width = min(self.DESKTOP_WINDOW_WIDTH, screen_width) height = min(self.DESKTOP_WINDOW_HEIGHT, screen_height) x = 0 y = 0 self.root.minsize(420, 500) if ( screen_width >= self.DESKTOP_THRESHOLD_WIDTH and screen_height >= self.DESKTOP_THRESHOLD_HEIGHT and not is_barcode_desktop ): width = min(self.DESKTOP_WINDOW_WIDTH, screen_width) height = min(self.DESKTOP_WINDOW_HEIGHT, screen_height) x = 0 y = 0 self.root.geometry(f"{width}x{height}+{x}+{y}") def _bind_keys(self) -> None: self.root.bind("", lambda _e: self._start_queue(1)) self.root.bind("", lambda _e: self._start_queue(0)) self.root.bind("", lambda _e: self._begin_manual_unload()) self.pallet_entry.bind("", self._on_pallet_enter) self.destination_entry.bind("", self._on_destination_enter) def _set_busy(self, busy: bool, message: str = "") -> None: state = "disabled" if busy else "normal" for button in (self.btn_f1, self.btn_f2, self.btn_unload, self.btn_submit): try: button.configure(state=state) except Exception: pass try: self.pallet_entry.configure(state=state) except Exception: pass try: if str(self.destination_entry.cget("state")) != "readonly" or busy: self.destination_entry.configure(state=state) except Exception: pass if busy: self.busy_label.configure(text=message or "Attendere...") self.busy_cover.place(relx=0, rely=0, relwidth=1, relheight=1) self.busy_bar.start(10) else: self.busy_bar.stop() self.busy_cover.place_forget() def _apply_state(self, state: BarcodeViewState) -> None: if self._auto_advance_id is not None: try: self.root.after_cancel(self._auto_advance_id) except Exception: pass self._auto_advance_id = None self.queue_var.set(state.queue_label) self.destination_var.set(state.destination_barcode) self.scanned_var.set(state.scanned_pallet) self.info1_var.set(state.status_text) self.info2_var.set(state.document) self.info3_var.set(state.customer) self.info4_var.set(state.expected_pallet) self.status_band.configure(bg=state.status_color or self._status_colors["red"]) destination_readonly = state.mode in ("priority_high", "priority_low", "manual_unload") try: self.destination_entry.configure(state="normal") if destination_readonly: self.destination_entry.configure(state="readonly") except Exception: pass if state.mode == "confirm" and state.status_text == "Ok Scarico": next_queue = self._queue_id_from_label(state.queue_label) if next_queue is not None: self._auto_advance_id = self.root.after(1200, lambda q=next_queue: self._start_queue(q)) self.root.after(20, self._focus_primary_input) def _focus_primary_input(self) -> None: try: self.pallet_entry.focus_force() self.pallet_entry.selection_range(0, "end") except Exception: pass def _focus_destination_input(self) -> None: try: self.destination_entry.configure(state="normal") except Exception: pass try: self.destination_entry.focus_force() self.destination_entry.selection_range(0, "end") except Exception: pass def _on_pallet_enter(self, _event=None) -> str: pallet = str(self.scanned_var.get() or "").strip() destination = str(self.destination_var.get() or "").strip() if not pallet: return "break" if destination == "9000000": self._submit() return "break" self.destination_var.set("") self._focus_destination_input() return "break" def _on_destination_enter(self, _event=None) -> str: pallet = str(self.scanned_var.get() or "").strip() destination = str(self.destination_var.get() or "").strip() if pallet and destination: self._submit() else: self._focus_primary_input() return "break" def _begin_manual_unload(self) -> None: self._apply_state(self.service.begin_manual_unload()) def _start_queue(self, id_stato: int) -> None: self._run_async( lambda: self.service.start_priority_queue(id_stato), busy_message="Carico la coda selezionata...", ) def _submit(self) -> None: self._run_async( lambda: self.service.submit( scanned_pallet=self.scanned_var.get(), destination_barcode=self.destination_var.get(), ), busy_message="Eseguo il movimento...", ) def _run_async(self, coro_factory: Callable[[], object], busy_message: str) -> None: if self._pending is not None and not self._pending.done(): return if self._auto_advance_id is not None: try: self.root.after_cancel(self._auto_advance_id) except Exception: pass self._auto_advance_id = None self._set_busy(True, busy_message) self._pending = asyncio.run_coroutine_threadsafe(coro_factory(), self.loop) self.root.after(40, self._poll_future) def _queue_id_from_label(self, queue_label: str) -> int | None: text = str(queue_label or "") if text.startswith("Alta priorita'"): return 1 if text.startswith("Bassa priorita'"): return 0 return None def _poll_future(self) -> None: if self._pending is None: self._set_busy(False) return if not self._pending.done(): self.root.after(40, self._poll_future) return future = self._pending self._pending = None self._set_busy(False) try: result = future.result() except Exception as exc: current = self.service.state current.status_text = f"Errore operativo: {exc}" current.status_color = "#f4cccc" self._apply_state(current) messagebox.showerror("Barcode WMS", f"Operazione fallita:\n{exc}", parent=self.root) return if isinstance(result, BarcodeActionResult): self._apply_state(result.state) if result.message and not result.ok: self.root.bell() elif isinstance(result, BarcodeViewState): self._apply_state(result) def _shutdown(self) -> None: try: if self._pending is not None and not self._pending.done(): return except Exception: pass try: fut = asyncio.run_coroutine_threadsafe(self.db_client.dispose(), self.loop) fut.result(timeout=2) except Exception: pass try: self.root.destroy() finally: try: stop_global_loop() except Exception: pass def main() -> int: loop = get_global_loop() bootstrap = tk.Tk() bootstrap.withdraw() config = ensure_db_config(loop, parent=bootstrap) if not config: bootstrap.destroy() stop_global_loop() return 1 db_client = AsyncMSSQLClient(build_dsn_from_config(config)) session = prompt_login_compact(bootstrap, db_client) if session is None: try: fut = asyncio.run_coroutine_threadsafe(db_client.dispose(), loop) fut.result(timeout=2) except Exception: pass bootstrap.destroy() stop_global_loop() return 1 app = BarcodeClientApp(bootstrap, db_client, session, loop) bootstrap.deiconify() bootstrap.lift() bootstrap.focus_force() app._focus_primary_input() bootstrap.mainloop() return 0 if __name__ == "__main__": raise SystemExit(main())