"""Picking list management window. The module presents a master/detail UI for packing lists, supports reservation and unreservation through an async stored-procedure port and keeps rendering smooth by relying on deferred updates and lightweight progress indicators. """ from __future__ import annotations import json import sys import tkinter as tk import customtkinter as ctk from tkinter import messagebox from typing import Optional, Any, Dict, List, Callable from dataclasses import dataclass from functools import wraps from pathlib import Path import logging from audit_log import log_user_action try: from loguru import logger except Exception: # pragma: no cover - safety fallback if dependency is missing locally class _FallbackLogger: """Minimal adapter used only when Loguru is not installed yet.""" def __init__(self): self._logger = logging.getLogger(MODULE_LOG_NAME if "MODULE_LOG_NAME" in globals() else __name__) self._logger.setLevel(logging.DEBUG) self._logger.propagate = False def bind(self, **_kwargs): return self def add(self, sink, level="INFO", format=None, encoding="utf-8", **_kwargs): handler: logging.Handler if hasattr(sink, "write"): handler = logging.StreamHandler(sink) else: handler = logging.FileHandler(str(sink), encoding=encoding) handler.setLevel(getattr(logging, str(level).upper(), logging.INFO)) handler.setFormatter( logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s") ) self._logger.addHandler(handler) return 0 def log(self, level, message): getattr(self._logger, str(level).lower(), self._logger.info)(message) def debug(self, message): self._logger.debug(message) def info(self, message): self._logger.info(message) def exception(self, message): self._logger.exception(message) logger = _FallbackLogger() try: from tksheet import Sheet, natural_sort_key except Exception: Sheet = None # type: ignore[assignment] natural_sort_key = None # type: ignore[assignment] # Usa overlay e runner "collaudati" from busy_overlay import InlineBusyOverlay from gestione_aree import AsyncRunner from locale_text import load_locale_catalog, text as loc_text from ui_theme import theme_color, theme_font, theme_section, theme_value from user_session import UserSession from window_placement import place_window_fullsize_below_parent_later # === IMPORT procedura async prenota/s-prenota (no pyodbc qui) === import asyncio try: from prenota_sprenota_sql import sp_xExePackingListPallet_async, SPResult except Exception: async def sp_xExePackingListPallet_async(*args, **kwargs): raise RuntimeError("sp_xExePackingListPallet_async non importabile: verifica prenota_sprenota_sql.py") class SPResult: def __init__(self, rc=-1, message="Procedura non disponibile", id_result=None): self.rc = rc; self.message = message; self.id_result = id_result PICKINGLIST_LOG_MODE = "INFO" # "OFF" | "INFO" | "DEBUG" PICKINGLIST_DETAIL_TEST_MULTIPLIER = 1 # 1 disables artificial row expansion for UI stress tests MODULE_LOG_NAME = Path(__file__).stem MODULE_LOG_PATH = Path(__file__).with_suffix(".log") _MODULE_LOG_ENABLED = PICKINGLIST_LOG_MODE.upper() != "OFF" _MODULE_LOG_LEVEL = "DEBUG" if PICKINGLIST_LOG_MODE.upper() == "DEBUG" else "INFO" _MODULE_LOGGER = logger.bind(warehouse_module=MODULE_LOG_NAME) _MODULE_LOGGING_CONFIGURED = False def _configure_module_logger(): """Configure console and file logging for this module.""" global _MODULE_LOGGING_CONFIGURED if _MODULE_LOGGING_CONFIGURED: return if not _MODULE_LOG_ENABLED: _MODULE_LOGGING_CONFIGURED = True return record_filter = lambda record: record["extra"].get("warehouse_module") == MODULE_LOG_NAME logger.add( sys.stderr, level=_MODULE_LOG_LEVEL, colorize=True, filter=record_filter, format=( "{time:YYYY-MM-DD HH:mm:ss.SSS} | " "{level: <8} | " "" + MODULE_LOG_NAME + " | " "{message}" ), ) logger.add( MODULE_LOG_PATH, level=_MODULE_LOG_LEVEL, colorize=False, encoding="utf-8", filter=record_filter, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | " + MODULE_LOG_NAME + " | {message}", ) _MODULE_LOGGING_CONFIGURED = True def _format_payload(payload: Any) -> str: """Serialize payloads for human-readable logging.""" try: return json.dumps(payload, ensure_ascii=False, indent=2, default=str) except Exception: return repr(payload) def _log_call(level: Optional[str] = None): """Trace entry, exit and failure of selected high-level functions.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): effective_level = level or _MODULE_LOG_LEVEL _MODULE_LOGGER.log( effective_level, f"CALL {func.__qualname__} args={_format_payload(args[1:] if args else ())} kwargs={_format_payload(kwargs)}", ) try: result = func(*args, **kwargs) except Exception: _MODULE_LOGGER.exception(f"FAIL {func.__qualname__}") raise _MODULE_LOGGER.log(effective_level, f"RETURN {func.__qualname__}") return result return wrapper return decorator def _log_sql(query_name: str, sql: str, params: Dict[str, Any]): """Log one SQL statement and its parameters.""" _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} params={_format_payload(params)}") _MODULE_LOGGER.debug(f"SQL {query_name} text:\n{sql.strip()}") def _log_dataset(query_name: str, rows: List[Dict[str, Any]]): """Log query results at summary or full-debug level depending on the flag.""" _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} returned {len(rows)} rows") if PICKINGLIST_LOG_MODE.upper() == "DEBUG": _MODULE_LOGGER.debug(f"SQL {query_name} dataset:\n{_format_payload(rows)}") def _expand_detail_rows_for_test(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Artificially duplicate detail rows to stress-test the UI with larger datasets.""" multiplier = max(1, int(PICKINGLIST_DETAIL_TEST_MULTIPLIER)) if multiplier == 1 or not rows: return rows expanded: List[Dict[str, Any]] = [] for copy_idx in range(multiplier): for row_idx, row in enumerate(rows): cloned = dict(row) cloned["__test_copy__"] = copy_idx cloned["__test_row__"] = row_idx expanded.append(cloned) _MODULE_LOGGER.info( f"Dataset dettaglio espanso artificialmente da {len(rows)} a {len(expanded)} righe per test UI" ) return expanded _configure_module_logger() if _MODULE_LOG_ENABLED: _MODULE_LOGGER.info( f"Logging inizializzato su {MODULE_LOG_PATH.name} livello={_MODULE_LOG_LEVEL} mode={PICKINGLIST_LOG_MODE.upper()}" ) # -------------------- SQL -------------------- SQL_PL = """ SELECT COUNT(DISTINCT Pallet) AS Pallet, COUNT(DISTINCT Lotto) AS Lotto, COUNT(DISTINCT Articolo) AS Articolo, COUNT(DISTINCT Descrizione) AS Descrizione, SUM(Qta) AS Qta, Documento, CodNazione, NAZIONE, Stato, MAX(PalletCella) AS PalletCella, MAX(Magazzino) AS Magazzino, MAX(Area) AS Area, MAX(Cella) AS Cella, MIN(Ordinamento) AS Ordinamento, MAX(IDStato) AS IDStato FROM dbo.py_ViewPackingListRestante GROUP BY Documento, CodNazione, NAZIONE, Stato ORDER BY MIN(Ordinamento), Documento, NAZIONE, Stato; """ SQL_PL_DETAILS = """ SELECT * FROM dbo.py_ViewPackingListRestante WHERE Documento = :Documento ORDER BY Ordinamento; """ # -------------------- helpers -------------------- def _rows_to_dicts(res: Dict[str, Any]) -> List[Dict[str, Any]]: """ Converte il payload ritornato da query_json in lista di dict. Supporta: - res = [ {..}, {..} ] - res = { "rows": [..], "columns": [...] } - res = { "data": [..], "columns": [...] } - res = { "rows": [tuple,..], "columns": [...] } """ if res is None: return [] if isinstance(res, list): if not res: return [] if isinstance(res[0], dict): return res return [] if isinstance(res, dict): for rows_key in ("rows", "data", "result", "records"): if rows_key in res and isinstance(res[rows_key], list): rows = res[rows_key] if not rows: return [] if isinstance(rows[0], dict): return rows cols = res.get("columns") or res.get("cols") or [] out = [] for r in rows: if cols and isinstance(r, (list, tuple)): out.append({ (cols[i] if i < len(cols) else f"c{i}") : r[i] for i in range(min(len(cols), len(r))) }) else: if isinstance(r, (list, tuple)): out.append({ f"c{i}": r[i] for i in range(len(r)) }) return out if res and all(not isinstance(v, (list, tuple, dict)) for v in res.values()): return [res] return [] def _s(v) -> str: """Return a string representation, converting ``None`` to an empty string.""" return "" if v is None else str(v) def _first(d: Dict[str, Any], keys: List[str], default: str = ""): """Return the first non-empty value found among the provided keys.""" for k in keys: if k in d and d[k] not in (None, ""): return d[k] return default # -------------------- column specs -------------------- @dataclass class ColSpec: """Describe one logical column rendered in a ``ScrollTable``.""" title: str key: str width: int anchor: str # 'w' | 'e' | 'center' # Colonne PL (in alto) — include IDStato per la colorazione PL_COLS: List[ColSpec] = [ ColSpec("", "__check__", 36, "w"), ColSpec("Documento", "Documento", 120, "w"), ColSpec("NAZIONE", "NAZIONE", 240, "w"), ColSpec("Stato", "Stato", 110, "w"), ColSpec("IDStato", "IDStato", 80, "e"), # nuova colonna ColSpec("#Pallet", "Pallet", 100, "e"), ColSpec("#Lotti", "Lotto", 100, "e"), ColSpec("#Articoli", "Articolo", 110, "e"), ColSpec("Qta", "Qta", 120, "e"), ] DET_COLS: List[ColSpec] = [ ColSpec("UDC/Pallet", "Pallet", 150, "w"), ColSpec("Lotto", "Lotto", 130, "w"), ColSpec("Articolo", "Articolo", 150, "w"), ColSpec("Descrizione","Descrizione", 320, "w"), ColSpec("Qta", "Qta", 110, "e"), ColSpec("Ubicazione", "Ubicazione", 320, "w"), ] ROW_H = 28 # -------------------- Micro spinner (toolbar) -------------------- class ToolbarSpinner: """ Micro-animazione leggerissima per indicare attività: mostra una label con frame: ◐ ◓ ◑ ◒ ... finché è attivo. """ FRAMES = ("◐", "◓", "◑", "◒") def __init__(self, parent: tk.Widget): """Create the spinner label attached to the given parent widget.""" self.parent = parent self.lbl = ctk.CTkLabel(parent, text="", width=28) self._i = 0 self._active = False self._job = None def widget(self) -> ctk.CTkLabel: """Return the label widget hosting the spinner animation.""" return self.lbl def start(self, text: str = ""): """Start the animation and optionally show a short status message.""" if self._active: return self._active = True self.lbl.configure(text=f"{self.FRAMES[self._i]} {text}".strip()) self._tick() def stop(self): """Stop the animation and clear the label text.""" self._active = False if self._job is not None: try: self.parent.after_cancel(self._job) except Exception: pass self._job = None self.lbl.configure(text="") def _tick(self): """Advance the spinner animation frame.""" if not self._active: return self._i = (self._i + 1) % len(self.FRAMES) current = self.lbl.cget("text") # Mantieni l'eventuale testo dopo il simbolo txt_suffix = "" if isinstance(current, str) and len(current) > 2: txt_suffix = current[2:] self.lbl.configure(text=f"{self.FRAMES[self._i]}{txt_suffix}") self._job = self.parent.after(120, self._tick) # 8 fps soft # -------------------- Scrollable table -------------------- class ScrollTable(ctk.CTkFrame): GRID_COLOR = "#D0D5DD" PADX_L = 8 PADX_R = 8 PADY = 2 def __init__( self, master, columns: List[ColSpec], on_header_click: Optional[Callable[[ColSpec], None]] = None, ): """Create a fixed-header scrollable table rendered with Tk/CTk widgets.""" super().__init__(master) self.columns = columns self.on_header_click = on_header_click self._sort_key: Optional[str] = None self._sort_reverse = False self.total_w = sum(c.width for c in self.columns) self.grid_rowconfigure(1, weight=1) self.grid_columnconfigure(0, weight=1) # header self.h_canvas = tk.Canvas(self, height=ROW_H, highlightthickness=0, bd=0) self.h_inner = ctk.CTkFrame(self.h_canvas, fg_color="#f3f3f3", height=ROW_H, width=self.total_w) self.h_canvas.create_window((0,0), window=self.h_inner, anchor="nw", width=self.total_w, height=ROW_H) self.h_canvas.grid(row=0, column=0, sticky="ew") # body self.b_canvas = tk.Canvas(self, highlightthickness=0, bd=0) self.b_inner = ctk.CTkFrame(self.b_canvas, fg_color="transparent", width=self.total_w) self.body_window = self.b_canvas.create_window((0,0), window=self.b_inner, anchor="nw", width=self.total_w) self.b_canvas.grid(row=1, column=0, sticky="nsew") # scrollbars self.vbar = tk.Scrollbar(self, orient="vertical", command=self.b_canvas.yview) self.xbar = tk.Scrollbar(self, orient="horizontal", command=self._xscroll_both) self.vbar.grid(row=1, column=1, sticky="ns") self.xbar.grid(row=2, column=0, sticky="ew") # link scroll self.b_canvas.configure(yscrollcommand=self.vbar.set, xscrollcommand=self._xscroll_set_both) self.h_canvas.configure(xscrollcommand=self.xbar.set) # bind self.h_inner.bind("", lambda e: self._sync_header_width()) self.b_inner.bind("", lambda e: self._on_body_configure()) self.b_canvas.bind("", self._bind_mousewheel) self.b_canvas.bind("", self._unbind_mousewheel) self.b_inner.bind("", self._bind_mousewheel) self.b_inner.bind("", self._unbind_mousewheel) self._build_header() def _build_header(self): """Build the static header row using the configured columns.""" for w in self.h_inner.winfo_children(): w.destroy() row = ctk.CTkFrame(self.h_inner, fg_color="#f3f3f3", height=ROW_H, width=self.total_w) row.pack(fill="x", expand=False) row.pack_propagate(False) for col in self.columns: holder = ctk.CTkFrame( row, fg_color="#f3f3f3", width=col.width, height=ROW_H, border_width=1, border_color=self.GRID_COLOR ) holder.pack(side="left", fill="y") holder.pack_propagate(False) header_text = col.title if col.key == self._sort_key: header_text = f"{col.title} {'↓' if self._sort_reverse else '↑'}" lbl = ctk.CTkLabel(holder, text=header_text, anchor="w") lbl.pack(fill="both", padx=(self.PADX_L, self.PADX_R), pady=self.PADY) if self.on_header_click and col.key != "__check__": for widget in (holder, lbl): widget.bind("", lambda e, c=col: self.on_header_click(c)) widget.configure(cursor="hand2") self.h_inner.configure(width=self.total_w, height=ROW_H) self.h_canvas.configure(scrollregion=(0,0,self.total_w,ROW_H)) def set_sort_state(self, key: Optional[str], reverse: bool = False): """Update the header labels so the active sort is visible.""" self._sort_key = key self._sort_reverse = reverse self._build_header() def _update_body_width(self): """Keep the scroll region aligned with the current body content width.""" self.b_canvas.itemconfigure(self.body_window, width=self.total_w) sr = self.b_canvas.bbox("all") if sr: self.b_canvas.configure(scrollregion=(0,0,max(self.total_w, sr[2]), sr[3])) else: self.b_canvas.configure(scrollregion=(0,0,self.total_w,0)) def _on_body_configure(self): """React to body resize events by syncing dimensions and header scroll.""" self._update_body_width() self._sync_header_width() def _sync_header_width(self): """Mirror the body horizontal scroll position on the header canvas.""" first, _ = self.b_canvas.xview() self.h_canvas.xview_moveto(first) def _xscroll_both(self, *args): """Scroll header and body together when the horizontal bar moves.""" self.h_canvas.xview(*args) self.b_canvas.xview(*args) def _xscroll_set_both(self, first, last): """Update the header viewport and scrollbar thumb in one place.""" self.h_canvas.xview_moveto(first) self.xbar.set(first, last) def _bind_mousewheel(self, _event=None): """Route mouse-wheel scrolling to the body canvas while the cursor is over the table.""" self.b_canvas.bind_all("", self._on_mousewheel) def _unbind_mousewheel(self, _event=None): """Stop routing global mouse-wheel events when the pointer leaves the table.""" self.b_canvas.unbind_all("") def _on_mousewheel(self, event): """Scroll the body canvas vertically in response to wheel movement.""" delta = getattr(event, "delta", 0) if delta == 0: return step = -1 if delta > 0 else 1 self.b_canvas.yview_scroll(step, "units") def clear_rows(self): """Remove all rendered body rows.""" for w in self.b_inner.winfo_children(): w.destroy() self._update_body_width() def add_row( self, values: List[str], row_index: int, anchors: Optional[List[str]] = None, checkbox_builder: Optional[Callable[[tk.Widget], ctk.CTkCheckBox]] = None, ): """Append one row to the table body.""" row = ctk.CTkFrame(self.b_inner, fg_color="transparent", height=ROW_H, width=self.total_w) row.pack(fill="x", expand=False) row.pack_propagate(False) for i, col in enumerate(self.columns): holder = ctk.CTkFrame( row, fg_color="transparent", width=col.width, height=ROW_H, border_width=1, border_color=self.GRID_COLOR ) holder.pack(side="left", fill="y") holder.pack_propagate(False) if col.key == "__check__": if checkbox_builder: cb = checkbox_builder(holder) cb.pack(padx=(self.PADX_L, self.PADX_R), pady=self.PADY, anchor="w") else: ctk.CTkLabel(holder, text="").pack(fill="both") else: anchor = (anchors[i] if anchors else col.anchor) ctk.CTkLabel(holder, text=values[i], anchor=anchor).pack( fill="both", padx=(self.PADX_L, self.PADX_R), pady=self.PADY ) self._update_body_width() # -------------------- PL row model -------------------- class PLRow: """State holder for one picking list row and its selection checkbox.""" def __init__(self, pl: Dict[str, Any], on_check): """Bind a picking list payload to a ``BooleanVar`` and callback.""" self.pl = pl self.var = ctk.BooleanVar(value=False) self._callback = on_check def is_checked(self) -> bool: """Return whether the row is currently selected.""" return self.var.get() def set_checked(self, val: bool): """Programmatically update the checkbox state.""" self.var.set(val) def build_checkbox(self, parent) -> ctk.CTkCheckBox: """Create the checkbox widget bound to this row model.""" return ctk.CTkCheckBox(parent, text="", variable=self.var, command=lambda: self._callback(self, self.var.get())) # -------------------- main frame (no-flicker + UX tuning + spinner) -------------------- class GestionePickingListFrame(ctk.CTkFrame): @_log_call() def __init__(self, master, *, db_client=None, conn_str=None, session: UserSession | None = None): """Create the master/detail picking list frame.""" super().__init__(master) self._theme = theme_section("pickinglist_window", {}) self._locale_catalog = load_locale_catalog() if db_client is None: raise ValueError("GestionePickingListFrame richiede un db_client condiviso.") self.db_client = db_client self.session = session self.runner = AsyncRunner(self) # runner condiviso (usa loop globale) self.busy = InlineBusyOverlay(self, self._theme) try: self.configure(fg_color=theme_color(self._theme, "window_fg_color", ("#efefef", "#2f2f2f"))) except Exception: pass self.rows_models: list[PLRow] = [] self._detail_cache: Dict[Any, list] = {} self.detail_doc = None self._detail_sort_key: Optional[str] = None self._detail_sort_reverse = False self._detail_sorting = False self._first_loading: bool = False # flag per cursore d'attesa solo al primo load self._render_job = None # Tracking del job di rendering in corso self._build_layout() self._initial_load_started = False def _can(self, action: str) -> bool: """Return whether the current user can execute one picking-list action.""" return self.session.can(action) if self.session else False def _operator_id(self) -> int: """Return the authenticated operator id or ``0`` if no session is present.""" return int(self.session.operator_id) if self.session else 0 def _first_show(self): """Chiamato a finestra già resa → evitiamo sfarfallio del primo paint e mostriamo wait-cursor.""" if self._initial_load_started: return self._initial_load_started = True self._first_loading = True try: self.winfo_toplevel().configure(cursor="watch") except Exception: pass # spinner inizia self.spinner.start(" Carico…") self.reload_from_db(first=True) # ---------- UI ---------- def _build_layout(self): """Build toolbar, master table and detail table.""" for r in (1, 3): self.grid_rowconfigure(r, weight=1) self.grid_columnconfigure(0, weight=1) top = ctk.CTkFrame(self) top.grid(row=0, column=0, sticky="ew", padx=10, pady=(8,4)) try: top.configure(fg_color=theme_color(self._theme, "toolbar_frame_fg_color", ("#d7d7d7", "#3b3b3b"))) except Exception: pass for i, (text, cmd) in enumerate([ (loc_text("picking.button.reload", catalog=self._locale_catalog, default="Ricarica"), self.reload_from_db), (loc_text("picking.button.prenota", catalog=self._locale_catalog, default="Prenota"), self.on_prenota), (loc_text("picking.button.sprenota", catalog=self._locale_catalog, default="S-prenota"), self.on_sprenota), (loc_text("picking.button.export", catalog=self._locale_catalog, default="Esporta XLSX"), self.on_export) ]): ctk.CTkButton( top, text=text, command=cmd, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")), ).grid(row=0, column=i, padx=6) # --- micro spinner a destra della toolbar --- self.spinner = ToolbarSpinner(top) self.spinner.widget().grid(row=0, column=10, padx=(8,0)) # largo spazio a destra self.pl_table = ScrollTable(self, PL_COLS) self.pl_table.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4,8)) self.det_host = tk.Frame(self, bd=0, highlightthickness=0) self.det_host.grid(row=3, column=0, sticky="nsew", padx=10, pady=(4,10)) self.det_host.grid_rowconfigure(0, weight=1) self.det_host.grid_columnconfigure(0, weight=1) self._build_detail_sheet() self._draw_details_hint() def _build_detail_sheet(self): """Create the high-volume detail table using tksheet.""" if Sheet is None: raise RuntimeError("tksheet non disponibile: installa la dipendenza per usare la tabella dettagli.") self.detail_sheet = Sheet( self.det_host, data=[], show_row_index=False, show_top_left=False, width=1000, height=320, sort_key=natural_sort_key, ) self.detail_sheet.change_theme("light green") self.detail_sheet.enable_bindings("all") self.detail_sheet.headers(self._detail_headers(), redraw=False) self.detail_sheet.bind("", self._on_detail_sheet_left_click, add="+") self.detail_sheet.grid(row=0, column=0, sticky="nsew") def _draw_details_hint(self): """Render the placeholder row shown when no document is selected.""" self._load_detail_sheet_data( [["", "", "", "Seleziona una Picking List per vedere le UDC...", "", ""]] ) def _detail_headers(self) -> List[str]: """Return detail headers with the active sort indicator, if any.""" headers: List[str] = [] for col in DET_COLS: title = col.title if col.key == self._detail_sort_key: title = f"{title} {'[desc]' if self._detail_sort_reverse else '[asc]'}" headers.append(title) return headers def _detail_rows_to_sheet_data(self, rows: List[Dict[str, Any]]) -> List[List[str]]: """Convert detail dictionaries to the row format expected by tksheet.""" data: List[List[str]] = [] for d in rows: pallet = _s(_first(d, ["Pallet", "UDC", "PalletID"])) lotto = _s(_first(d, ["Lotto"])) articolo = _s(_first(d, ["Articolo", "CodArticolo", "CodiceArticolo", "Art", "Codice"])) descr = _s(_first(d, ["Descrizione", "Descr", "DescrArticolo", "DescArticolo", "DesArticolo"])) qta = _s(_first(d, ["Qta", "Quantita", "Qty", "QTY"])) ubi_raw = _first(d, ["Ubicazione", "Cella", "PalletCella"]) loc = "Non scaffalata" if (ubi_raw is None or str(ubi_raw).strip() == "") else str(ubi_raw).strip() data.append([pallet, lotto, articolo, descr, qta, loc]) return data def _load_detail_sheet_data(self, data: List[List[str]]): """Push one full dataset into the tksheet detail widget.""" self.detail_sheet.headers(self._detail_headers(), redraw=False) self.detail_sheet.set_sheet_data( data, reset_col_positions=True, reset_row_positions=True, redraw=True, ) self.detail_sheet.set_all_column_widths() def _detail_sort_value(self, row: Dict[str, Any], key: str): """Return a normalized value used to sort detail rows by one logical column.""" if key == "Ubicazione": value = _first(row, ["Ubicazione", "Cella", "PalletCella"]) value = "Non scaffalata" if value in (None, "") else value elif key == "Qta": value = _first(row, ["Qta", "Quantita", "Qty", "QTY"], 0) try: return (0, float(value)) except Exception: return (1, _s(value).lower()) elif key == "Articolo": value = _first(row, ["Articolo", "CodArticolo", "CodiceArticolo", "Art", "Codice"]) elif key == "Descrizione": value = _first(row, ["Descrizione", "Descr", "DescrArticolo", "DescArticolo", "DesArticolo"]) elif key == "Pallet": value = _first(row, ["Pallet", "UDC", "PalletID"]) else: value = row.get(key) return (0, _s(value).lower()) def _sort_detail_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Return detail rows sorted using the current header state.""" if not self._detail_sort_key: return rows return sorted( rows, key=lambda row: self._detail_sort_value(row, self._detail_sort_key), reverse=self._detail_sort_reverse, ) def _finish_detail_sort_feedback(self, root: tk.Misc): """Dismiss busy feedback only after Tk has flushed the detail redraw.""" try: root.update_idletasks() except Exception: pass self._detail_sorting = False self.spinner.stop() self.busy.hide() try: self.detail_sheet.configure(cursor="") root.configure(cursor="") except Exception: pass def _on_detail_header_click(self, col: ColSpec): """Toggle detail sorting when the user clicks a detail header.""" if not self.detail_doc or self._detail_sorting: return if self._detail_sort_key == col.key: self._detail_sort_reverse = not self._detail_sort_reverse else: self._detail_sort_key = col.key self._detail_sort_reverse = False self._detail_sorting = True self.spinner.start(" Ordino dettagli...") self.busy.show(f"Ordinamento per {col.title}...") root = self.winfo_toplevel() try: root.configure(cursor="watch") self.detail_sheet.configure(cursor="watch") root.update_idletasks() except Exception: pass def _apply_sort(): try: rows = list(self._detail_cache.get(self.detail_doc, [])) rows = self._sort_detail_rows(rows) self._detail_cache[self.detail_doc] = rows self._refresh_details() finally: # Wait one more UI turn so the redraw becomes visible before removing feedback. self.after_idle(lambda r=root: self.after(15, lambda: self._finish_detail_sort_feedback(r))) self.after(25, _apply_sort) def _on_detail_sheet_left_click(self, event): """Sort detail rows when the user clicks a tksheet header cell.""" try: region = self.detail_sheet.identify_region(event) column = self.detail_sheet.identify_column(event, exclude_header=False, allow_end=False) except Exception: return if region != "header" or column is None or column < 0 or column >= len(DET_COLS): return self._on_detail_header_click(DET_COLS[column]) def _apply_row_colors(self, rows: List[Dict[str, Any]]): """Colorazione differita (after_idle) per evitare micro-jank durante l'inserimento righe.""" try: for idx, d in enumerate(rows): row_widget = self.pl_table.b_inner.winfo_children()[idx] if int(d.get("IDStato") or 0) == 1: row_widget.configure(fg_color="#ffe6f2") # rosa tenue else: row_widget.configure(fg_color="transparent") except Exception: pass def _refresh_mid_rows(self, rows: List[Dict[str, Any]]): """Rebuild the master table using the latest query results.""" self.pl_table.clear_rows() self.rows_models.clear() for r, d in enumerate(rows): model = PLRow(d, self.on_row_checked) self.rows_models.append(model) values = [ "", # checkbox _s(d.get("Documento")), _s(d.get("NAZIONE")), _s(d.get("Stato")), _s(d.get("IDStato")), # nuova colonna visibile _s(d.get("Pallet")), _s(d.get("Lotto")), _s(d.get("Articolo")), _s(d.get("Qta")), ] self.pl_table.add_row( values=values, row_index=r, anchors=[c.anchor for c in PL_COLS], checkbox_builder=model.build_checkbox ) # 🎯 Colora dopo che la UI è resa → no balzi visivi self.after_idle(lambda: self._apply_row_colors(rows)) # ----- helpers ----- def _get_selected_model(self) -> Optional[PLRow]: """Return the currently checked picking list row, if any.""" for m in self.rows_models: if m.is_checked(): return m return None def _recolor_row_by_documento(self, documento: str, idstato: int): """Aggiorna colore riga e cella IDStato per il Documento indicato.""" for idx, m in enumerate(self.rows_models): if _s(m.pl.get("Documento")) == _s(documento): m.pl["IDStato"] = idstato def _paint(): try: row_widget = self.pl_table.b_inner.winfo_children()[idx] row_widget.configure(fg_color="#ffe6f2" if idstato == 1 else "transparent") row_children = row_widget.winfo_children() if len(row_children) >= 5: holder = row_children[4] if holder.winfo_children(): lbl = holder.winfo_children()[0] if hasattr(lbl, "configure"): lbl.configure(text=str(idstato)) except Exception: pass # differisci la colorazione (smooth) self.after_idle(_paint) break def _reselect_documento_after_reload(self, documento: str): """(Opzionale) Dopo un reload DB, riseleziona la PL con lo stesso Documento.""" for m in self.rows_models: if _s(m.pl.get("Documento")) == _s(documento): m.set_checked(True) self.on_row_checked(m, True) break # ----- eventi ----- @_log_call() def on_row_checked(self, model: PLRow, is_checked: bool): """Handle row selection changes and refresh the detail section.""" # selezione esclusiva if is_checked: for m in self.rows_models: if m is not model and m.is_checked(): m.set_checked(False) self.detail_doc = model.pl.get("Documento") _MODULE_LOGGER.info(f"Documento selezionato per il dettaglio: {self.detail_doc}") self.spinner.start(" Carico dettagli…") # spinner ON async def _job(): _log_sql("SQL_PL_DETAILS", SQL_PL_DETAILS, {"Documento": self.detail_doc}) return await self.db_client.query_json(SQL_PL_DETAILS, {"Documento": self.detail_doc}) def _ok(res): # NON fermare lo spinner subito: lo farà _refresh_details_incremental rows = _expand_detail_rows_for_test(_rows_to_dicts(res)) _log_dataset("SQL_PL_DETAILS", rows) self._detail_cache[self.detail_doc] = rows # Avvia il rendering incrementale che mantiene l'overlay attivo self._refresh_details_incremental() def _err(ex): _MODULE_LOGGER.exception(f"Errore durante il caricamento dettagli del documento {self.detail_doc}: {ex}") self.spinner.stop() self.busy.hide() # Chiudi l'overlay in caso di errore messagebox.showerror("DB", f"Errore nel caricamento dettagli:\n{ex}") self.runner.run( _job(), on_success=_ok, on_error=_err, busy=None, # NON usare busy automatico: lo gestiamo manualmente nel rendering message=f"Carico UDC per Documento {self.detail_doc}…" ) # Mostra manualmente l'overlay per la query self.busy.show(f"Carico UDC per Documento {self.detail_doc}…") else: if not any(m.is_checked() for m in self.rows_models): self.detail_doc = None _MODULE_LOGGER.info("Nessun documento selezionato: ripristino placeholder del dettaglio.") self._refresh_details() # ----- load PL ----- @_log_call() def reload_from_db(self, first: bool = False, reselect_documento: str | None = None): """Load or reload the picking list summary table from the database.""" self.spinner.start(" Carico…") # spinner ON async def _job(): _log_sql("SQL_PL", SQL_PL, {}) return await self.db_client.query_json(SQL_PL, {}) def _on_success(res): rows = _rows_to_dicts(res) _log_dataset("SQL_PL", rows) self._refresh_mid_rows(rows) if reselect_documento: self.after_idle(lambda doc=reselect_documento: self._reselect_documento_after_reload(doc)) self.spinner.stop() # spinner OFF # se era il primo load, ripristina il cursore standard if self._first_loading: try: self.winfo_toplevel().configure(cursor="") except Exception: pass self._first_loading = False def _on_error(ex): _MODULE_LOGGER.exception(f"Errore durante il caricamento della picking list: {ex}") self.spinner.stop() if self._first_loading: try: self.winfo_toplevel().configure(cursor="") except Exception: pass self._first_loading = False messagebox.showerror("DB", f"Errore nel caricamento:\n{ex}") self.runner.run( _job(), on_success=_on_success, on_error=_on_error, busy=self.busy, message="Caricamento Picking List…" if first else "Aggiornamento…" ) @_log_call("DEBUG") def _refresh_details(self): """Render the detail table for the currently selected document.""" if not self.detail_doc: self._draw_details_hint() return rows = list(self._detail_cache.get(self.detail_doc, [])) rows = self._sort_detail_rows(rows) _MODULE_LOGGER.debug(f"Ridisegno tabella dettaglio per documento={self.detail_doc} righe={len(rows)}") if not rows: self._load_detail_sheet_data([["", "", "", "Nessuna UDC trovata.", "", ""]]) return self._load_detail_sheet_data(self._detail_rows_to_sheet_data(rows)) @_log_call("DEBUG") def _refresh_details_incremental(self, batch_size: int = 25): """ Render detail table using tksheet while keeping busy feedback consistent. """ if not self.detail_doc: self._draw_details_hint() self.spinner.stop() self.busy.hide() return rows = list(self._detail_cache.get(self.detail_doc, [])) rows = self._sort_detail_rows(rows) self._detail_cache[self.detail_doc] = rows _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Avvio rendering dettagli documento={self.detail_doc} righe={len(rows)}") if not rows: self._load_detail_sheet_data([["", "", "", "Nessuna UDC trovata.", "", ""]]) self.spinner.stop() self.busy.hide() return self.busy.show(f"Rendering {len(rows)} UDC...") self._load_detail_sheet_data(self._detail_rows_to_sheet_data(rows)) _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Rendering dettagli completato documento={self.detail_doc} righe={len(rows)}") self.spinner.stop() self.busy.hide() @_log_call("DEBUG") def _render_batch(self, rows: List[Dict[str, Any]], batch_size: int, start_idx: int, total_rows: int): """ Legacy helper kept for compatibility after the move to tksheet. """ del batch_size, start_idx, total_rows self._load_detail_sheet_data(self._detail_rows_to_sheet_data(rows)) # ----- azioni ----- @_log_call() def on_prenota(self): """Reserve the selected picking list.""" if not self._can("pickinglist.prenota"): messagebox.showwarning("Permesso negato", "L'operatore corrente non puo' prenotare picking list.", parent=self) return model = self._get_selected_model() if not model: messagebox.showinfo("Prenota", "Seleziona una Picking List (checkbox) prima di prenotare.") return documento = _s(model.pl.get("Documento")) current = int(model.pl.get("IDStato") or 0) desired = 1 if current == desired: messagebox.showinfo("Prenota", f"La Picking List {documento} è già prenotata.") return id_operatore = self._operator_id() if id_operatore <= 0: messagebox.showerror("Prenota", "Sessione operatore non valida.", parent=self) return _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Richiesta prenotazione documento={documento} id_operatore={id_operatore}") self.spinner.start(" Prenoto…") async def _job(): return await sp_xExePackingListPallet_async(self.db_client, id_operatore, documento, "P") def _ok(res: SPResult): _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Esito prenotazione documento={documento} rc={getattr(res, 'rc', None)} message={getattr(res, 'message', None)}") self.spinner.stop() if res and res.rc == 0: log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.prenota", outcome="ok", target=documento, ) self._detail_cache.pop(documento, None) self.reload_from_db(reselect_documento=documento) else: msg = (res.message if res else "Errore sconosciuto") log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.prenota", outcome="denied", target=documento, details={"message": msg}, ) messagebox.showerror("Prenota", f"Operazione non riuscita:\n{msg}") def _err(ex): _MODULE_LOGGER.exception(f"Errore prenotazione documento={documento}: {ex}") self.spinner.stop() log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.prenota", outcome="error", target=documento, details={"error": str(ex)}, ) messagebox.showerror("Prenota", f"Errore:\n{ex}") self.runner.run( _job(), on_success=_ok, on_error=_err, busy=self.busy, message=f"Prenoto la Picking List {documento}…" ) @_log_call() def on_sprenota(self): """Unreserve the selected picking list.""" if not self._can("pickinglist.sprenota"): messagebox.showwarning("Permesso negato", "L'operatore corrente non puo' s-prenotare picking list.", parent=self) return model = self._get_selected_model() if not model: messagebox.showinfo("S-prenota", "Seleziona una Picking List (checkbox) prima di s-prenotare.") return documento = _s(model.pl.get("Documento")) current = int(model.pl.get("IDStato") or 0) desired = 0 if current == desired: messagebox.showinfo("S-prenota", f"La Picking List {documento} è già NON prenotata.") return id_operatore = self._operator_id() if id_operatore <= 0: messagebox.showerror("S-prenota", "Sessione operatore non valida.", parent=self) return _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Richiesta s-prenotazione documento={documento} id_operatore={id_operatore}") self.spinner.start(" S-prenoto…") async def _job(): return await sp_xExePackingListPallet_async(self.db_client, id_operatore, documento, "S") def _ok(res: SPResult): _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Esito s-prenotazione documento={documento} rc={getattr(res, 'rc', None)} message={getattr(res, 'message', None)}") self.spinner.stop() if res and res.rc == 0: log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.sprenota", outcome="ok", target=documento, ) self._detail_cache.pop(documento, None) self.reload_from_db(reselect_documento=documento) else: msg = (res.message if res else "Errore sconosciuto") log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.sprenota", outcome="denied", target=documento, details={"message": msg}, ) messagebox.showerror("S-prenota", f"Operazione non riuscita:\n{msg}") def _err(ex): _MODULE_LOGGER.exception(f"Errore s-prenotazione documento={documento}: {ex}") self.spinner.stop() log_user_action( self.session, module=MODULE_LOG_NAME, action="pickinglist.sprenota", outcome="error", target=documento, details={"error": str(ex)}, ) messagebox.showerror("S-prenota", f"Errore:\n{ex}") self.runner.run( _job(), on_success=_ok, on_error=_err, busy=self.busy, message=f"S-prenoto la Picking List {documento}…" ) def on_export(self): """Placeholder for a future export implementation.""" messagebox.showinfo("Esporta", "Stub esportazione.") # factory per main @_log_call() def create_frame(parent, *, db_client=None, conn_str=None, session: UserSession | None = None) -> 'GestionePickingListFrame': """Factory used by the launcher to build the picking list frame.""" ctk.set_appearance_mode("light") ctk.set_default_color_theme("green") return GestionePickingListFrame(parent, db_client=db_client, session=session) @_log_call() def open_pickinglist_window(parent: tk.Misc, db_client, session: UserSession | None = None) -> tk.Misc: """Open the picking list window while minimizing the first paint flicker.""" key = "_gestione_pickinglist_window_singleton" ex = getattr(parent, key, None) if ex and ex.winfo_exists(): frame = getattr(ex, "_pickinglist_frame", None) if frame is not None: try: frame.session = session except Exception: pass try: ex.deiconify() except Exception: pass try: ex.lift() ex.focus_force() return ex except Exception: pass win = ctk.CTkToplevel(parent) locale_catalog = load_locale_catalog() win.title(loc_text("picking.title", catalog=locale_catalog, default="Gestione Picking List")) theme = theme_section("pickinglist_window", {}) win.geometry(str(theme_value(theme, "window_geometry", "1200x700"))) minsize = theme_value(theme, "window_minsize", [1000, 560]) win.minsize(int(minsize[0]), int(minsize[1])) setattr(parent, key, win) # Keep the toplevel hidden until the child frame has built its initial layout. try: win.withdraw() win.attributes("-alpha", 0.0) except Exception: pass frame = create_frame(win, db_client=db_client, session=session) try: frame.pack(fill="both", expand=True) except Exception: pass setattr(win, "_pickinglist_frame", frame) # Reveal the fully-laid out window only after pending geometry work completes. try: win.update_idletasks() place_window_fullsize_below_parent_later(parent, win) win.after(340, lambda: frame._first_show()) win.after(360, lambda: win.lift() if getattr(win, "winfo_exists", lambda: False)() else None) win.after(380, lambda: win.focus_force() if getattr(win, "winfo_exists", lambda: False)() else None) except Exception: pass win.bind("", lambda e: win.destroy()) win.protocol("WM_DELETE_WINDOW", win.destroy) return win # =================== /gestione_pickinglist.py ===================