Files
ware_house/barcode_client.py

479 lines
17 KiB
Python

"""Lightweight standalone barcode client for the warehouse WMS."""
from __future__ import annotations
import asyncio
import sys
import tkinter as tk
from concurrent.futures import Future
from tkinter import messagebox, ttk
from typing import Callable
from runtime_support import ensure_stdio, run_with_fatal_log
ensure_stdio("warehouse_barcode")
from async_loop_singleton import get_global_loop, stop_global_loop
from async_msssql_query import AsyncMSSQLClient
from barcode_repository import BarcodeRepository
from barcode_service import BarcodeActionResult, BarcodeService, BarcodeViewState
from db_config import build_dsn_from_config, ensure_db_config
from login_window import prompt_login_compact
class BarcodeClientApp:
    """Single-window Tk barcode client modeled after the C# legacy form.

    The window holds two entries (pallet and destination cell), a coloured
    status band, three informational lines and four action buttons.
    Service calls are coroutines: they are submitted to ``loop`` with
    ``asyncio.run_coroutine_threadsafe`` and their result is collected by
    polling from Tk's ``after`` scheduler, so the Tk main loop never blocks
    on them.
    """

    BARCODE_MAX_WIDTH = 320
    BARCODE_MAX_HEIGHT = 400
    # The two thresholds below are no longer consulted by the geometry code
    # (the branch that read them recomputed values identical to the regular
    # desktop branch); they are kept so external readers of these class
    # attributes keep working.
    DESKTOP_THRESHOLD_WIDTH = 1024
    DESKTOP_THRESHOLD_HEIGHT = 768
    DESKTOP_WINDOW_WIDTH = 465
    DESKTOP_WINDOW_HEIGHT = 531
    # Maximum number of characters kept in the pallet / cell variables.
    FIELD_MAX_CHARS = 8
    # Delay between two checks of the in-flight future, in milliseconds.
    POLL_INTERVAL_MS = 40
    # Delay before the next queue item is requested after an "Ok Scarico".
    AUTO_ADVANCE_DELAY_MS = 1200
    # Window / frame background shared by every plain Tk widget.
    _BACKGROUND = "#f1f1f1"

    def __init__(self, root: tk.Tk, db_client: AsyncMSSQLClient, session, loop: asyncio.AbstractEventLoop):
        """Create the service layer, build the widgets and bind the hotkeys.

        Args:
            root: Tk root window that hosts the whole UI.
            db_client: Async database client; handed to the repository and
                disposed in :meth:`_shutdown`.
            session: Login session; only ``session.operator_id`` is read.
            loop: Event loop that executes the service coroutines submitted
                through ``asyncio.run_coroutine_threadsafe``.
        """
        self.root = root
        self.db_client = db_client
        self.session = session
        self.loop = loop
        self.repository = BarcodeRepository(db_client)
        self.service = BarcodeService(self.repository, session.operator_id)
        # Future of the operation currently in flight (None when idle).
        self._pending: Future | None = None
        # Identifier of the scheduled auto-advance ``after`` call, if any.
        self._auto_advance_id: str | None = None
        self._status_colors = {
            "red": "#f4cccc",
            "green": "#d9ead3",
            "yellow": "#e2f0cb",
        }
        self._is_barcode_desktop = False
        self.queue_var = tk.StringVar(value="")
        self.info1_var = tk.StringVar(value="")
        self.info2_var = tk.StringVar(value="")
        self.info3_var = tk.StringVar(value="")
        self.info4_var = tk.StringVar(value="")
        self.destination_var = tk.StringVar(value="")
        self.scanned_var = tk.StringVar(value="")
        self._build_ui()
        self._apply_state(self.service.state)
        self._bind_keys()
        self.root.protocol("WM_DELETE_WINDOW", self._shutdown)

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _build_ui(self) -> None:
        """Create every widget, sized for a handheld or a desktop screen."""
        background = self._BACKGROUND
        self.root.title("WMS")
        self.root.configure(bg=background)
        self._apply_responsive_geometry()
        compact = self._is_barcode_desktop

        if compact:
            field_font = ("Segoe UI", 9)
            entry_font = ("Segoe UI", 11)
            band_font = ("Segoe UI", 9, "bold")
            info_font = ("Segoe UI", 9)
            queue_font = ("Segoe UI", 8)
            pad_x, pad_y = 5, 4
            band_wrap, info_wrap = 190, 194
            button_pad_y = 1
        else:
            field_font = ("Segoe UI", 12)
            entry_font = ("Segoe UI", 15)
            band_font = ("Segoe UI", 12, "bold")
            info_font = ("Segoe UI", 12)
            queue_font = ("Segoe UI", 10)
            pad_x, pad_y = 16, 12
            band_wrap, info_wrap = 400, 408
            button_pad_y = 4

        wrap = tk.Frame(self.root, bg=background, padx=pad_x, pady=pad_y)
        wrap.pack(fill="both", expand=True)
        # Row 3 (the info box) absorbs any spare vertical space.
        wrap.rowconfigure(3, weight=1)
        wrap.columnconfigure(0, weight=1)

        header = tk.Frame(wrap, bg=background)
        header.grid(row=0, column=0, sticky="ew", pady=(0, 4 if compact else 10))
        tk.Label(
            header,
            textvariable=self.queue_var,
            bg=background,
            fg="#333333",
            font=queue_font,
            anchor="e",
        ).pack(side="right")

        form = tk.Frame(wrap, bg=background)
        form.grid(row=1, column=0, sticky="ew")
        self._add_compact_field(
            form, 0, "Pallet", self.scanned_var,
            scanned=True, label_font=field_font, entry_font=entry_font,
        )
        self._add_compact_field(
            form, 1, "Cella", self.destination_var,
            scanned=False, label_font=field_font, entry_font=entry_font,
        )

        self.status_band = tk.Label(
            wrap,
            textvariable=self.info1_var,
            bg=self._status_colors["red"],
            fg="#1f2937",
            font=band_font,
            anchor="w",
            justify="left",
            padx=6,
            pady=3 if compact else 6,
            wraplength=band_wrap,
        )
        self.status_band.grid(row=2, column=0, sticky="ew", pady=(8 if compact else 14, 4))

        info_box = tk.Frame(wrap, bg=background)
        info_box.grid(row=3, column=0, sticky="nsew")
        info_box.columnconfigure(0, weight=1)
        for row_index in range(3):
            info_box.rowconfigure(row_index, weight=1)

        self.info2_label = self._make_info_label(info_box, self.info2_var, info_font, info_wrap)
        self.info2_label.grid(row=0, column=0, sticky="ew", pady=(2, 0))
        self.info3_label = self._make_info_label(info_box, self.info3_var, info_font, info_wrap)
        self.info3_label.grid(row=1, column=0, sticky="ew", pady=(2, 0))
        self.info4_label = self._make_info_label(info_box, self.info4_var, info_font, info_wrap)
        self.info4_label.grid(row=2, column=0, sticky="ew", pady=(2, 4 if compact else 10))

        buttons = tk.Frame(wrap, bg=background)
        buttons.grid(row=4, column=0, sticky="ew", pady=(4, 0))
        buttons.columnconfigure(0, weight=1)
        buttons.columnconfigure(1, weight=1)
        self.btn_f1 = ttk.Button(buttons, text="[F1] H Priority", command=lambda: self._start_queue(1))
        self.btn_f1.grid(row=0, column=0, padx=(0, 4), pady=(0, button_pad_y), sticky="ew")
        self.btn_submit = ttk.Button(buttons, text="[Ent] Salva", command=self._submit)
        self.btn_submit.grid(row=0, column=1, padx=(4, 0), pady=(0, button_pad_y), sticky="ew")
        self.btn_f2 = ttk.Button(buttons, text="[F2] L Priority", command=lambda: self._start_queue(0))
        self.btn_f2.grid(row=1, column=0, padx=(0, 4), sticky="ew")
        self.btn_unload = ttk.Button(buttons, text="[F4] Elimina", command=self._begin_manual_unload)
        self.btn_unload.grid(row=1, column=1, padx=(4, 0), sticky="ew")

        # Overlay that covers the whole window while an operation runs; it is
        # only placed/unplaced by _set_busy.
        self.busy_cover = tk.Frame(self.root, bg="#d9d9d9")
        panel = ttk.Frame(self.busy_cover, padding=16)
        panel.place(relx=0.5, rely=0.5, anchor="center")
        self.busy_label = ttk.Label(panel, text="Attendere...", font=("Segoe UI", 11, "bold"))
        self.busy_label.pack(pady=(0, 8))
        self.busy_bar = ttk.Progressbar(panel, mode="indeterminate", length=260)
        self.busy_bar.pack()

    def _make_info_label(self, parent: tk.Frame, variable: tk.StringVar, font, wraplength: int) -> tk.Label:
        """Return one left-aligned, wrapping info line bound to ``variable``."""
        return tk.Label(
            parent,
            textvariable=variable,
            bg=self._BACKGROUND,
            fg="#222222",
            font=font,
            anchor="w",
            justify="left",
            wraplength=wraplength,
        )

    def _add_compact_field(
        self,
        parent: tk.Frame,
        row: int,
        label: str,
        variable: tk.StringVar,
        *,
        scanned: bool,
        label_font,
        entry_font,
    ) -> None:
        """Add one "label + entry" row to ``parent``.

        Args:
            parent: Frame that receives the two widgets (grid layout).
            row: Grid row for both widgets.
            label: Caption shown on the left.
            variable: Variable bound to the entry; its content is truncated
                to ``FIELD_MAX_CHARS`` characters on every write.
            scanned: ``True`` stores the entry as ``self.pallet_entry``,
                ``False`` as ``self.destination_entry``.
            label_font: Font of the caption.
            entry_font: Font of the entry.
        """
        compact = self._is_barcode_desktop
        row_pad = 4 if compact else 6
        tk.Label(
            parent,
            text=label,
            bg=self._BACKGROUND,
            fg="#1f1f1f",
            font=label_font,
            anchor="w",
        ).grid(row=row, column=0, sticky="w", padx=(0, 8), pady=row_pad)
        entry = ttk.Entry(parent, textvariable=variable, font=entry_font, width=8)
        entry.grid(row=row, column=1, sticky="ew", pady=row_pad, ipady=1 if compact else 4)
        parent.columnconfigure(1, weight=1)
        if scanned:
            self.pallet_entry = entry
        else:
            self.destination_entry = entry
        # Trace the variable that was actually bound to this entry, so the
        # limit follows the ``variable`` argument instead of a fixed attribute.
        variable.trace_add("write", lambda *_: self._limit_var(variable, self.FIELD_MAX_CHARS))

    def _limit_var(self, variable: tk.StringVar, max_len: int) -> None:
        """Truncate the content of ``variable`` to at most ``max_len`` characters."""
        value = str(variable.get() or "")
        if len(value) > max_len:
            variable.set(value[:max_len])

    def _apply_responsive_geometry(self) -> None:
        """Adapt the window size to barcode-sized or desktop-sized screens.

        A screen counts as barcode-sized when its width is at most
        ``BARCODE_MAX_WIDTH`` or its height is at most ``BARCODE_MAX_HEIGHT``;
        the window then fills the screen. Otherwise the fixed desktop size is
        used, capped to the screen. The window is anchored at the top-left
        corner in both cases. Also updates ``self._is_barcode_desktop``.
        """
        self.root.update_idletasks()
        screen_width = max(1, int(self.root.winfo_screenwidth() or 0))
        screen_height = max(1, int(self.root.winfo_screenheight() or 0))
        self._is_barcode_desktop = (
            screen_width <= self.BARCODE_MAX_WIDTH or screen_height <= self.BARCODE_MAX_HEIGHT
        )
        if self._is_barcode_desktop:
            width, height = screen_width, screen_height
            self.root.minsize(max(220, width), max(300, height))
        else:
            width = min(self.DESKTOP_WINDOW_WIDTH, screen_width)
            height = min(self.DESKTOP_WINDOW_HEIGHT, screen_height)
            self.root.minsize(420, 500)
        self.root.geometry(f"{width}x{height}+0+0")

    def _bind_keys(self) -> None:
        """Bind the function-key shortcuts and the Return handlers."""
        self.root.bind("<F1>", lambda _e: self._start_queue(1))
        self.root.bind("<F2>", lambda _e: self._start_queue(0))
        self.root.bind("<F4>", lambda _e: self._begin_manual_unload())
        self.pallet_entry.bind("<Return>", self._on_pallet_enter)
        self.destination_entry.bind("<Return>", self._on_destination_enter)

    # ------------------------------------------------------------------
    # View state
    # ------------------------------------------------------------------
    def _is_busy(self) -> bool:
        """Return True while a submitted operation has not completed yet."""
        return self._pending is not None and not self._pending.done()

    def _set_busy(self, busy: bool, message: str = "") -> None:
        """Disable/enable the inputs and show/hide the busy overlay.

        Args:
            busy: ``True`` to lock the UI, ``False`` to unlock it.
            message: Overlay caption; "Attendere..." is used when empty.
        """
        widget_state = "disabled" if busy else "normal"
        for widget in (self.btn_f1, self.btn_f2, self.btn_unload, self.btn_submit, self.pallet_entry):
            try:
                widget.configure(state=widget_state)
            except tk.TclError:
                # Best effort: the widget may already be destroyed.
                pass
        try:
            # When unlocking, a destination entry that is currently read-only
            # is left alone so it does not become editable.
            if busy or str(self.destination_entry.cget("state")) != "readonly":
                self.destination_entry.configure(state=widget_state)
        except tk.TclError:
            pass
        if busy:
            self.busy_label.configure(text=message or "Attendere...")
            self.busy_cover.place(relx=0, rely=0, relwidth=1, relheight=1)
            self.busy_bar.start(10)
        else:
            self.busy_bar.stop()
            self.busy_cover.place_forget()

    def _cancel_auto_advance(self) -> None:
        """Cancel the scheduled auto-advance call, if one is outstanding."""
        if self._auto_advance_id is None:
            return
        try:
            self.root.after_cancel(self._auto_advance_id)
        except (tk.TclError, ValueError):
            pass
        self._auto_advance_id = None

    def _apply_state(self, state: BarcodeViewState) -> None:
        """Render ``state`` into the widgets.

        Besides copying the text fields, this makes the destination entry
        read-only for the ``priority_high``, ``priority_low`` and
        ``manual_unload`` modes, schedules loading of the next queue item
        after a confirmed "Ok Scarico", and moves focus to the pallet entry.
        """
        self._cancel_auto_advance()
        self.queue_var.set(state.queue_label)
        self.destination_var.set(state.destination_barcode)
        self.scanned_var.set(state.scanned_pallet)
        self.info1_var.set(state.status_text)
        self.info2_var.set(state.document)
        self.info3_var.set(state.customer)
        self.info4_var.set(state.expected_pallet)
        self.status_band.configure(bg=state.status_color or self._status_colors["red"])

        destination_readonly = state.mode in ("priority_high", "priority_low", "manual_unload")
        try:
            self.destination_entry.configure(state="readonly" if destination_readonly else "normal")
        except tk.TclError:
            pass

        if state.mode == "confirm" and state.status_text == "Ok Scarico":
            next_queue = self._queue_id_from_label(state.queue_label)
            if next_queue is not None:
                self._auto_advance_id = self.root.after(
                    self.AUTO_ADVANCE_DELAY_MS,
                    lambda q=next_queue: self._start_queue(q),
                )
        # Deferred so the focus change happens after the pending UI updates.
        self.root.after(20, self._focus_primary_input)

    def _focus_primary_input(self) -> None:
        """Focus the pallet entry and select its whole content."""
        try:
            self.pallet_entry.focus_force()
            self.pallet_entry.selection_range(0, "end")
        except tk.TclError:
            pass

    def _focus_destination_input(self) -> None:
        """Make the destination entry editable, focus it and select its content."""
        try:
            self.destination_entry.configure(state="normal")
        except tk.TclError:
            pass
        try:
            self.destination_entry.focus_force()
            self.destination_entry.selection_range(0, "end")
        except tk.TclError:
            pass

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------
    def _on_pallet_enter(self, _event=None) -> str:
        """Handle Return in the pallet entry.

        An empty pallet is ignored. When the destination already reads
        ``"9000000"`` the movement is submitted immediately; otherwise the
        destination is cleared and focused so it can be scanned next.
        Always returns ``"break"`` to stop further event propagation.
        """
        pallet = str(self.scanned_var.get() or "").strip()
        destination = str(self.destination_var.get() or "").strip()
        if not pallet:
            return "break"
        # NOTE(review): "9000000" looks like a fixed pseudo-destination that
        # needs no cell scan - confirm against the service layer.
        if destination == "9000000":
            self._submit()
            return "break"
        self.destination_var.set("")
        self._focus_destination_input()
        return "break"

    def _on_destination_enter(self, _event=None) -> str:
        """Handle Return in the destination entry.

        Submits when both fields are filled, otherwise sends the focus back
        to the pallet entry. Always returns ``"break"``.
        """
        pallet = str(self.scanned_var.get() or "").strip()
        destination = str(self.destination_var.get() or "").strip()
        if pallet and destination:
            self._submit()
        else:
            self._focus_primary_input()
        return "break"

    def _begin_manual_unload(self) -> None:
        """Switch the service to manual-unload mode and render the new state.

        Ignored while an operation is in flight: the F4 hotkey stays active
        even when the button is disabled, and changing the service state (and
        re-enabling entries) under a running operation must not happen.
        """
        if self._is_busy():
            return
        self._apply_state(self.service.begin_manual_unload())

    def _start_queue(self, id_stato: int) -> None:
        """Ask the service to load the priority queue ``id_stato`` (1 = F1, 0 = F2)."""
        self._run_async(
            lambda: self.service.start_priority_queue(id_stato),
            busy_message="Carico la coda selezionata...",
        )

    def _submit(self) -> None:
        """Submit the current pallet/destination pair to the service."""
        self._run_async(
            lambda: self.service.submit(
                scanned_pallet=self.scanned_var.get(),
                destination_barcode=self.destination_var.get(),
            ),
            busy_message="Eseguo il movimento...",
        )

    # ------------------------------------------------------------------
    # Async plumbing
    # ------------------------------------------------------------------
    def _run_async(self, coro_factory: Callable[[], object], busy_message: str) -> None:
        """Run a service coroutine on ``self.loop`` and poll for its result.

        Args:
            coro_factory: Zero-argument callable returning the coroutine.
            busy_message: Caption of the busy overlay while it runs.

        The call is ignored while another operation is still in flight. If
        the coroutine cannot be created or scheduled, the busy overlay is
        removed again and the error is reported, so the UI never stays
        locked without an operation behind it.
        """
        if self._is_busy():
            return
        self._cancel_auto_advance()
        self._set_busy(True, busy_message)
        coro = None
        try:
            coro = coro_factory()
            self._pending = asyncio.run_coroutine_threadsafe(coro, self.loop)
        except Exception as exc:
            if asyncio.iscoroutine(coro):
                # Avoid a "coroutine was never awaited" warning.
                coro.close()
            self._pending = None
            self._set_busy(False)
            self._report_failure(exc)
            return
        self.root.after(self.POLL_INTERVAL_MS, self._poll_future)

    def _queue_id_from_label(self, queue_label: str) -> int | None:
        """Map a queue label to its queue id.

        Returns 1 for labels starting with "Alta priorita'", 0 for labels
        starting with "Bassa priorita'", and ``None`` for anything else.
        """
        text = str(queue_label or "")
        if text.startswith("Alta priorita'"):
            return 1
        if text.startswith("Bassa priorita'"):
            return 0
        return None

    def _report_failure(self, exc: BaseException) -> None:
        """Show ``exc`` in the status band (red) and in an error dialog.

        The error text is written into the service's current state object,
        which is then rendered.
        """
        current = self.service.state
        current.status_text = f"Errore operativo: {exc}"
        current.status_color = "#f4cccc"
        self._apply_state(current)
        messagebox.showerror("Barcode WMS", f"Operazione fallita:\n{exc}", parent=self.root)

    def _poll_future(self) -> None:
        """Collect the result of the in-flight operation once it is done.

        Re-schedules itself every ``POLL_INTERVAL_MS`` until the future
        completes, then unlocks the UI and renders either the returned state
        or the error. A failed ``BarcodeActionResult`` that carries a message
        additionally rings the bell.
        """
        future = self._pending
        if future is None:
            self._set_busy(False)
            return
        if not future.done():
            self.root.after(self.POLL_INTERVAL_MS, self._poll_future)
            return
        self._pending = None
        self._set_busy(False)
        try:
            result = future.result()
        except Exception as exc:
            self._report_failure(exc)
            return
        if isinstance(result, BarcodeActionResult):
            self._apply_state(result.state)
            if result.message and not result.ok:
                self.root.bell()
        elif isinstance(result, BarcodeViewState):
            self._apply_state(result)

    def _shutdown(self) -> None:
        """Close the application (WM_DELETE_WINDOW handler).

        The request is ignored while an operation is in flight. Otherwise the
        database client is disposed (waiting at most two seconds), the window
        is destroyed and the global event loop is stopped. Every cleanup step
        is best effort so that a failing step cannot keep the window open.
        """
        if self._is_busy():
            return
        try:
            fut = asyncio.run_coroutine_threadsafe(self.db_client.dispose(), self.loop)
            fut.result(timeout=2)
        except Exception:
            # Deliberately broad: timeout, closed loop or a dispose error must
            # not prevent the window from closing.
            pass
        try:
            self.root.destroy()
        finally:
            try:
                stop_global_loop()
            except Exception:
                pass
def _abort_startup(bootstrap, db_client, loop) -> None:
    """Best-effort teardown for every path that leaves main() before mainloop.

    Disposes ``db_client`` when one was created (waiting at most two
    seconds), destroys ``bootstrap`` when it exists and stops the global
    event loop.
    """
    if db_client is not None:
        try:
            fut = asyncio.run_coroutine_threadsafe(db_client.dispose(), loop)
            fut.result(timeout=2)
        except Exception:
            # Deliberately broad: a failing dispose must not block the exit.
            pass
    try:
        if bootstrap is not None:
            bootstrap.destroy()
    finally:
        stop_global_loop()


def main() -> int:
    """Run the barcode client.

    Obtains the global event loop, makes sure a database configuration
    exists, prompts for login and then runs the Tk main loop.

    Returns:
        0 after the main loop ends, 1 when the configuration is missing or
        the login prompt returns no session.

    Raises:
        Any exception raised during startup is re-raised after the database
        client, the root window and the global loop have been torn down, so
        a failed startup does not leave them behind.
    """
    loop = get_global_loop()
    bootstrap = None
    db_client = None
    app = None
    try:
        bootstrap = tk.Tk()
        bootstrap.withdraw()
        config = ensure_db_config(loop, parent=bootstrap)
        if config:
            db_client = AsyncMSSQLClient(build_dsn_from_config(config))
            session = prompt_login_compact(bootstrap, db_client)
            if session is not None:
                app = BarcodeClientApp(bootstrap, db_client, session, loop)
    except BaseException:
        _abort_startup(bootstrap, db_client, loop)
        raise
    if app is None:
        # Missing configuration or cancelled login.
        _abort_startup(bootstrap, db_client, loop)
        return 1
    bootstrap.deiconify()
    bootstrap.lift()
    bootstrap.focus_force()
    app._focus_primary_input()
    # Regular teardown happens in BarcodeClientApp._shutdown when the window
    # is closed, so nothing is cleaned up again after mainloop returns.
    bootstrap.mainloop()
    return 0
if __name__ == "__main__":
    # Script entry point: run main() under the fatal-error logger and use
    # its return value as the process exit status.
    sys.exit(run_with_fatal_log("Barcode WMS", main))