# Source listing metadata: 1264 lines, 48 KiB, Python
"""Picking list management window.

The module presents a master/detail UI for packing lists, supports reservation
and unreservation through an async stored-procedure port and keeps rendering
smooth by relying on deferred updates and lightweight progress indicators.
"""
|
|
|
|
from __future__ import annotations
|
|
import json
|
|
import sys
|
|
import tkinter as tk
|
|
import customtkinter as ctk
|
|
from tkinter import messagebox
|
|
from typing import Optional, Any, Dict, List, Callable
|
|
from dataclasses import dataclass
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
import logging
|
|
|
|
from audit_log import log_user_action
|
|
try:
    from loguru import logger
except Exception:  # pragma: no cover - safety fallback if dependency is missing locally
    class _FallbackLogger:
        """Minimal stdlib ``logging`` adapter used only when Loguru is not installed.

        It exposes the subset of the Loguru logger API this module relies on
        (``bind``, ``add``, ``log`` and the per-level helpers) so the rest of the
        file can keep calling ``logger`` without caring which backend is active.
        """

        def __init__(self):
            # MODULE_LOG_NAME is only used when it already exists at
            # instantiation time; otherwise the module ``__name__`` is used.
            self._logger = logging.getLogger(
                MODULE_LOG_NAME if "MODULE_LOG_NAME" in globals() else __name__
            )
            self._logger.setLevel(logging.DEBUG)
            # Keep records out of the root logger handlers.
            self._logger.propagate = False

        def bind(self, **_kwargs):
            """Accept Loguru-style context binding and return the same adapter."""
            return self

        def add(self, sink, level="INFO", format=None, encoding="utf-8", **_kwargs):
            """Attach a sink and return a handler id (always ``0``).

            ``sink`` may be a file-like object exposing ``write`` or anything
            convertible to a filesystem path. ``format`` and any extra keyword
            arguments (``colorize``, ``filter``...) are accepted for signature
            compatibility and ignored: a fixed stdlib format is applied.
            """
            handler: logging.Handler
            if hasattr(sink, "write"):
                handler = logging.StreamHandler(sink)
            else:
                handler = logging.FileHandler(str(sink), encoding=encoding)
            # Unknown level names fall back to INFO.
            handler.setLevel(getattr(logging, str(level).upper(), logging.INFO))
            handler.setFormatter(
                logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
            )
            self._logger.addHandler(handler)
            return 0

        def log(self, level, message):
            """Emit ``message`` at ``level``; unknown level names are logged as INFO."""
            getattr(self._logger, str(level).lower(), self._logger.info)(message)

        def debug(self, message):
            """Emit a DEBUG record."""
            self._logger.debug(message)

        def info(self, message):
            """Emit an INFO record."""
            self._logger.info(message)

        def warning(self, message):
            """Emit a WARNING record (part of the Loguru API the adapter must mirror)."""
            self._logger.warning(message)

        def error(self, message):
            """Emit an ERROR record (part of the Loguru API the adapter must mirror)."""
            self._logger.error(message)

        def critical(self, message):
            """Emit a CRITICAL record (part of the Loguru API the adapter must mirror)."""
            self._logger.critical(message)

        def exception(self, message):
            """Emit an ERROR record including the active exception traceback."""
            self._logger.exception(message)

    logger = _FallbackLogger()
|
|
|
|
try:
    from tksheet import Sheet, natural_sort_key
except Exception:
    # tksheet is optional at import time: both names become None so the module
    # still loads; the detail table builder checks ``Sheet`` before using it.
    Sheet = natural_sort_key = None  # type: ignore[assignment]
|
|
|
|
# Usa overlay e runner "collaudati"
|
|
from gestione_aree import BusyOverlay, AsyncRunner
|
|
from user_session import UserSession
|
|
|
|
# === IMPORT procedura async prenota/s-prenota (no pyodbc qui) ===
|
|
import asyncio
|
|
try:
    from prenota_sprenota_sql import sp_xExePackingListPallet_async, SPResult
except Exception:
    # Fallback definitions keep the module importable when the stored-procedure
    # port cannot be imported; any attempt to run the procedure fails loudly.

    async def sp_xExePackingListPallet_async(*args, **kwargs):
        """Placeholder coroutine that always raises ``RuntimeError``."""
        raise RuntimeError("sp_xExePackingListPallet_async non importabile: verifica prenota_sprenota_sql.py")

    class SPResult:
        """Stand-in result object carrying ``rc``, ``message`` and ``id_result``."""

        def __init__(self, rc=-1, message="Procedura non disponibile", id_result=None):
            self.rc = rc
            self.message = message
            self.id_result = id_result
|
|
|
|
|
|
# Logging verbosity switch for this module.
PICKINGLIST_LOG_MODE = "INFO"  # "OFF" | "INFO" | "DEBUG"
# Number of artificial copies of each detail row produced for UI stress tests.
PICKINGLIST_DETAIL_TEST_MULTIPLIER = 1  # 1 disables artificial row expansion for UI stress tests
# Name bound into log records and printed in every formatted line.
MODULE_LOG_NAME = Path(__file__).stem
# Log file written next to this source file (same name, ".log" suffix).
MODULE_LOG_PATH = Path(__file__).with_suffix(".log")
# Sinks are only registered when the mode is not "OFF".
_MODULE_LOG_ENABLED = PICKINGLIST_LOG_MODE.upper() != "OFF"
# Any mode other than "DEBUG" logs at INFO level.
_MODULE_LOG_LEVEL = "DEBUG" if PICKINGLIST_LOG_MODE.upper() == "DEBUG" else "INFO"
# Logger tagged with the module name; the sink filter matches on this tag.
_MODULE_LOGGER = logger.bind(warehouse_module=MODULE_LOG_NAME)
# Guard flag so that sinks are registered at most once.
_MODULE_LOGGING_CONFIGURED = False
|
|
|
|
|
|
def _configure_module_logger():
    """Register the console and file sinks for this module, at most once.

    When logging is disabled no sink is added, but the call is still recorded
    as done so later invocations return immediately.
    """
    global _MODULE_LOGGING_CONFIGURED
    if _MODULE_LOGGING_CONFIGURED:
        return

    if _MODULE_LOG_ENABLED:
        def _only_this_module(record):
            # Keep only records bound with this module's ``warehouse_module`` tag.
            return record["extra"].get("warehouse_module") == MODULE_LOG_NAME

        console_format = "".join(
            (
                "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | ",
                "<level>{level: <8}</level> | ",
                "<cyan>",
                MODULE_LOG_NAME,
                "</cyan> | ",
                "<level>{message}</level>",
            )
        )
        file_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | " + MODULE_LOG_NAME + " | {message}"

        # Colourised console sink on stderr.
        logger.add(
            sys.stderr,
            level=_MODULE_LOG_LEVEL,
            colorize=True,
            filter=_only_this_module,
            format=console_format,
        )
        # Plain-text file sink next to the module.
        logger.add(
            MODULE_LOG_PATH,
            level=_MODULE_LOG_LEVEL,
            colorize=False,
            encoding="utf-8",
            filter=_only_this_module,
            format=file_format,
        )

    _MODULE_LOGGING_CONFIGURED = True
|
|
|
|
|
|
def _format_payload(payload: Any) -> str:
|
|
"""Serialize payloads for human-readable logging."""
|
|
try:
|
|
return json.dumps(payload, ensure_ascii=False, indent=2, default=str)
|
|
except Exception:
|
|
return repr(payload)
|
|
|
|
|
|
def _log_call(level: Optional[str] = None):
    """Build a decorator that logs CALL / RETURN / FAIL lines around a function.

    ``level`` overrides the module log level for the CALL and RETURN lines.
    The first positional argument is left out of the logged arguments, and
    exceptions are logged and re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            effective_level = level or _MODULE_LOG_LEVEL
            positional = args[1:] if args else ()
            call_line = (
                f"CALL {func.__qualname__} args={_format_payload(positional)} "
                f"kwargs={_format_payload(kwargs)}"
            )
            _MODULE_LOGGER.log(effective_level, call_line)
            try:
                outcome = func(*args, **kwargs)
            except Exception:
                _MODULE_LOGGER.exception(f"FAIL {func.__qualname__}")
                raise
            else:
                _MODULE_LOGGER.log(effective_level, f"RETURN {func.__qualname__}")
                return outcome

        return wrapper

    return decorator
|
|
|
|
|
|
def _log_sql(query_name: str, sql: str, params: Dict[str, Any]):
    """Log a query's parameters at the module level and its text at DEBUG."""
    rendered_params = _format_payload(params)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} params={rendered_params}")
    statement = sql.strip()
    _MODULE_LOGGER.debug(f"SQL {query_name} text:\n{statement}")
|
|
|
|
|
|
def _log_dataset(query_name: str, rows: List[Dict[str, Any]]):
    """Log the row count of a result set, plus the full rows in DEBUG mode."""
    row_count = len(rows)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} returned {row_count} rows")
    if PICKINGLIST_LOG_MODE.upper() != "DEBUG":
        return
    _MODULE_LOGGER.debug(f"SQL {query_name} dataset:\n{_format_payload(rows)}")
|
|
|
|
|
|
def _expand_detail_rows_for_test(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Duplicate detail rows ``PICKINGLIST_DETAIL_TEST_MULTIPLIER`` times for UI stress tests.

    With a multiplier of 1 (or below) or an empty input the original list is
    returned untouched. Otherwise each copy is a shallow clone tagged with
    ``__test_copy__`` (copy number) and ``__test_row__`` (source row index).
    """
    copies = max(1, int(PICKINGLIST_DETAIL_TEST_MULTIPLIER))
    if not rows or copies == 1:
        return rows

    expanded: List[Dict[str, Any]] = [
        {**source, "__test_copy__": copy_no, "__test_row__": position}
        for copy_no in range(copies)
        for position, source in enumerate(rows)
    ]
    _MODULE_LOGGER.info(
        f"Dataset dettaglio espanso artificialmente da {len(rows)} a {len(expanded)} righe per test UI"
    )
    return expanded
|
|
|
|
|
|
# Import-time side effect: register the module sinks once, then announce the
# active log file, level and mode when logging is enabled.
_configure_module_logger()
if _MODULE_LOG_ENABLED:
    _MODULE_LOGGER.info(
        f"Logging inizializzato su {MODULE_LOG_PATH.name} livello={_MODULE_LOG_LEVEL} mode={PICKINGLIST_LOG_MODE.upper()}"
    )
|
|
|
|
|
|
# -------------------- SQL --------------------
|
|
# Master query: one aggregated row per (Documento, CodNazione, NAZIONE, Stato)
# with distinct counts, total quantity and min/max helper columns.
SQL_PL = """
SELECT
COUNT(DISTINCT Pallet) AS Pallet,
COUNT(DISTINCT Lotto) AS Lotto,
COUNT(DISTINCT Articolo) AS Articolo,
COUNT(DISTINCT Descrizione) AS Descrizione,
SUM(Qta) AS Qta,
Documento,
CodNazione,
NAZIONE,
Stato,
MAX(PalletCella) AS PalletCella,
MAX(Magazzino) AS Magazzino,
MAX(Area) AS Area,
MAX(Cella) AS Cella,
MIN(Ordinamento) AS Ordinamento,
MAX(IDStato) AS IDStato
FROM dbo.XMag_ViewPackingList
GROUP BY Documento, CodNazione, NAZIONE, Stato
ORDER BY MIN(Ordinamento), Documento, NAZIONE, Stato;
"""

# Detail query: every row of one document, selected through the named
# parameter ``:Documento``.
# NOTE(review): this view is referenced without the ``dbo.`` schema prefix
# used by SQL_PL - confirm that is intentional.
SQL_PL_DETAILS = """
SELECT *
FROM ViewPackingListRestante
WHERE Documento = :Documento
ORDER BY Ordinamento;
"""
|
|
|
|
# -------------------- helpers --------------------
|
|
def _rows_to_dicts(res: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Converte il payload ritornato da query_json in lista di dict.
|
|
Supporta:
|
|
- res = [ {..}, {..} ]
|
|
- res = { "rows": [..], "columns": [...] }
|
|
- res = { "data": [..], "columns": [...] }
|
|
- res = { "rows": [tuple,..], "columns": [...] }
|
|
"""
|
|
if res is None:
|
|
return []
|
|
|
|
if isinstance(res, list):
|
|
if not res:
|
|
return []
|
|
if isinstance(res[0], dict):
|
|
return res
|
|
return []
|
|
|
|
if isinstance(res, dict):
|
|
for rows_key in ("rows", "data", "result", "records"):
|
|
if rows_key in res and isinstance(res[rows_key], list):
|
|
rows = res[rows_key]
|
|
if not rows:
|
|
return []
|
|
if isinstance(rows[0], dict):
|
|
return rows
|
|
cols = res.get("columns") or res.get("cols") or []
|
|
out = []
|
|
for r in rows:
|
|
if cols and isinstance(r, (list, tuple)):
|
|
out.append({ (cols[i] if i < len(cols) else f"c{i}") : r[i]
|
|
for i in range(min(len(cols), len(r))) })
|
|
else:
|
|
if isinstance(r, (list, tuple)):
|
|
out.append({ f"c{i}": r[i] for i in range(len(r)) })
|
|
return out
|
|
if res and all(not isinstance(v, (list, tuple, dict)) for v in res.values()):
|
|
return [res]
|
|
|
|
return []
|
|
|
|
def _s(v) -> str:
|
|
"""Return a string representation, converting ``None`` to an empty string."""
|
|
return "" if v is None else str(v)
|
|
|
|
def _first(d: Dict[str, Any], keys: List[str], default: str = ""):
|
|
"""Return the first non-empty value found among the provided keys."""
|
|
for k in keys:
|
|
if k in d and d[k] not in (None, ""):
|
|
return d[k]
|
|
return default
|
|
|
|
# -------------------- column specs --------------------
|
|
@dataclass
class ColSpec:
    """Describe one logical column rendered in a ``ScrollTable``."""

    title: str  # caption shown in the header cell
    key: str  # column identifier; "__check__" marks the checkbox column
    width: int  # width given to the header and body cell frames
    anchor: str  # 'w' | 'e' | 'center'
|
|
|
|
# Picking-list columns (top table) - includes IDStato, used for row colouring
PL_COLS: List[ColSpec] = [
    ColSpec("", "__check__", 36, "w"),
    ColSpec("Documento", "Documento", 120, "w"),
    ColSpec("NAZIONE", "NAZIONE", 240, "w"),
    ColSpec("Stato", "Stato", 110, "w"),
    ColSpec("IDStato", "IDStato", 80, "e"),  # newly added column
    ColSpec("#Pallet", "Pallet", 100, "e"),
    ColSpec("#Lotti", "Lotto", 100, "e"),
    ColSpec("#Articoli", "Articolo", 110, "e"),
    ColSpec("Qta", "Qta", 120, "e"),
]

# Detail columns (bottom table), in the order used for the detail sheet rows.
DET_COLS: List[ColSpec] = [
    ColSpec("UDC/Pallet", "Pallet", 150, "w"),
    ColSpec("Lotto", "Lotto", 130, "w"),
    ColSpec("Articolo", "Articolo", 150, "w"),
    ColSpec("Descrizione","Descrizione", 320, "w"),
    ColSpec("Qta", "Qta", 110, "e"),
    ColSpec("Ubicazione", "Ubicazione", 320, "w"),
]

# Height used for the header canvas and for every table row frame.
ROW_H = 28
|
|
|
|
|
|
# -------------------- Micro spinner (toolbar) --------------------
|
|
class ToolbarSpinner:
    """Very light activity indicator for the toolbar.

    While active, a label cycles through the ``FRAMES`` glyphs
    (◐ ◓ ◑ ◒ ...) followed by an optional short status text.
    """

    FRAMES = ("◐", "◓", "◑", "◒")

    def __init__(self, parent: tk.Widget):
        """Create the spinner label attached to the given parent widget."""
        self.parent = parent
        self.lbl = ctk.CTkLabel(parent, text="", width=28)
        self._i = 0
        self._active = False
        self._job = None
        # Status text kept separately so every frame renders it identically.
        self._text = ""

    def widget(self) -> ctk.CTkLabel:
        """Return the label widget hosting the spinner animation."""
        return self.lbl

    def _render(self):
        """Draw the current frame glyph followed by the stored status text."""
        frame = self.FRAMES[self._i]
        caption = getattr(self, "_text", "")
        self.lbl.configure(text=f"{frame} {caption}" if caption else frame)

    def start(self, text: str = ""):
        """Start the animation and optionally show a short status message.

        Calling ``start`` while the spinner is already running is a no-op:
        the running animation and its text are left untouched.
        """
        if self._active:
            return
        self._active = True
        # Surrounding whitespace is dropped; a single space separates glyph and text.
        self._text = str(text).strip()
        self._render()
        self._tick()

    def stop(self):
        """Stop the animation, cancel the pending timer and clear the label."""
        self._active = False
        if self._job is not None:
            try:
                self.parent.after_cancel(self._job)
            except Exception:
                # Best effort: a failed cancel must not break the caller.
                pass
            self._job = None
        self.lbl.configure(text="")

    def _tick(self):
        """Advance to the next frame and schedule the following tick."""
        if not self._active:
            return
        self._i = (self._i + 1) % len(self.FRAMES)
        # Re-render from the stored text instead of slicing the label content,
        # which used to drop the separator between glyph and text.
        self._render()
        self._job = self.parent.after(120, self._tick)  # 120 ms per frame
|
|
|
|
|
|
# -------------------- Scrollable table --------------------
|
|
class ScrollTable(ctk.CTkFrame):
    """Fixed-header scrollable table built from two canvases and CTk frames.

    The header and the body live in separate canvases: the body scrolls on
    both axes, the header follows the body's horizontal position only. Each
    row is a frame holding one bordered cell frame per column.
    """

    GRID_COLOR = "#D0D5DD"
    PADX_L = 8
    PADX_R = 8
    PADY = 2

    # Event sequences that carry wheel movement.
    _WHEEL_SEQUENCES = ("<MouseWheel>",)
    # Tk 8.6 on X11 reports the wheel as mouse buttons 4 (up) and 5 (down).
    _X11_BUTTON_WHEEL_SEQUENCES = ("<Button-4>", "<Button-5>")

    def __init__(
        self,
        master,
        columns: List[ColSpec],
        on_header_click: Optional[Callable[[ColSpec], None]] = None,
    ):
        """Create a fixed-header scrollable table rendered with Tk/CTk widgets.

        Args:
            master: parent widget.
            columns: column specifications, in display order.
            on_header_click: optional callback invoked with the clicked
                column; the ``__check__`` column never triggers it.
        """
        super().__init__(master)
        self.columns = columns
        self.on_header_click = on_header_click
        self._sort_key: Optional[str] = None
        self._sort_reverse = False
        self.total_w = sum(c.width for c in self.columns)

        self.grid_rowconfigure(1, weight=1)
        self.grid_columnconfigure(0, weight=1)

        # header
        self.h_canvas = tk.Canvas(self, height=ROW_H, highlightthickness=0, bd=0)
        self.h_inner = ctk.CTkFrame(self.h_canvas, fg_color="#f3f3f3",
                                    height=ROW_H, width=self.total_w)
        self.h_canvas.create_window((0, 0), window=self.h_inner, anchor="nw",
                                    width=self.total_w, height=ROW_H)
        self.h_canvas.grid(row=0, column=0, sticky="ew")

        # body
        self.b_canvas = tk.Canvas(self, highlightthickness=0, bd=0)
        self.b_inner = ctk.CTkFrame(self.b_canvas, fg_color="transparent",
                                    width=self.total_w)
        self.body_window = self.b_canvas.create_window((0, 0), window=self.b_inner,
                                                       anchor="nw", width=self.total_w)
        self.b_canvas.grid(row=1, column=0, sticky="nsew")

        # scrollbars
        self.vbar = tk.Scrollbar(self, orient="vertical", command=self.b_canvas.yview)
        self.xbar = tk.Scrollbar(self, orient="horizontal", command=self._xscroll_both)
        self.vbar.grid(row=1, column=1, sticky="ns")
        self.xbar.grid(row=2, column=0, sticky="ew")

        # link scroll
        self.b_canvas.configure(yscrollcommand=self.vbar.set, xscrollcommand=self._xscroll_set_both)
        self.h_canvas.configure(xscrollcommand=self.xbar.set)

        # bind
        self.h_inner.bind("<Configure>", lambda e: self._sync_header_width())
        self.b_inner.bind("<Configure>", lambda e: self._on_body_configure())
        self.b_canvas.bind("<Enter>", self._bind_mousewheel)
        self.b_canvas.bind("<Leave>", self._unbind_mousewheel)
        self.b_inner.bind("<Enter>", self._bind_mousewheel)
        self.b_inner.bind("<Leave>", self._unbind_mousewheel)

        self._build_header()

    def _build_header(self):
        """Rebuild the header row from the configured columns and sort state."""
        for w in self.h_inner.winfo_children():
            w.destroy()

        row = ctk.CTkFrame(self.h_inner, fg_color="#f3f3f3",
                           height=ROW_H, width=self.total_w)
        row.pack(fill="x", expand=False)
        row.pack_propagate(False)

        for col in self.columns:
            holder = ctk.CTkFrame(
                row, fg_color="#f3f3f3",
                width=col.width, height=ROW_H,
                border_width=1, border_color=self.GRID_COLOR
            )
            holder.pack(side="left", fill="y")
            holder.pack_propagate(False)

            header_text = col.title
            if col.key == self._sort_key:
                # Arrow marks the active sort column and its direction.
                header_text = f"{col.title} {'↓' if self._sort_reverse else '↑'}"

            lbl = ctk.CTkLabel(holder, text=header_text, anchor="w")
            lbl.pack(fill="both", padx=(self.PADX_L, self.PADX_R), pady=self.PADY)

            if self.on_header_click and col.key != "__check__":
                for widget in (holder, lbl):
                    # ``c=col`` freezes the column for this handler.
                    widget.bind("<Button-1>", lambda e, c=col: self.on_header_click(c))
                    widget.configure(cursor="hand2")

        self.h_inner.configure(width=self.total_w, height=ROW_H)
        self.h_canvas.configure(scrollregion=(0, 0, self.total_w, ROW_H))

    def set_sort_state(self, key: Optional[str], reverse: bool = False):
        """Update the header labels so the active sort is visible."""
        self._sort_key = key
        self._sort_reverse = reverse
        self._build_header()

    def _update_body_width(self):
        """Keep the scroll region aligned with the current body content size."""
        self.b_canvas.itemconfigure(self.body_window, width=self.total_w)
        sr = self.b_canvas.bbox("all")
        if sr:
            self.b_canvas.configure(scrollregion=(0, 0, max(self.total_w, sr[2]), sr[3]))
        else:
            self.b_canvas.configure(scrollregion=(0, 0, self.total_w, 0))

    def _on_body_configure(self):
        """React to body resize events by syncing dimensions and header scroll."""
        self._update_body_width()
        self._sync_header_width()

    def _sync_header_width(self):
        """Mirror the body horizontal scroll position on the header canvas."""
        first, _ = self.b_canvas.xview()
        self.h_canvas.xview_moveto(first)

    def _xscroll_both(self, *args):
        """Scroll header and body together when the horizontal bar moves."""
        self.h_canvas.xview(*args)
        self.b_canvas.xview(*args)

    def _xscroll_set_both(self, first, last):
        """Update the header viewport and scrollbar thumb in one place."""
        self.h_canvas.xview_moveto(first)
        self.xbar.set(first, last)

    def _wheel_sequences(self):
        """Return the event sequences that deliver wheel movement on this display."""
        sequences = self._WHEEL_SEQUENCES
        try:
            on_x11 = self.tk.call("tk", "windowingsystem") == "x11"
        except Exception:
            on_x11 = False
        # Tk 8.6 on X11 never emits <MouseWheel>; newer Tk releases do, so the
        # button sequences are only added for the older versions.
        if on_x11 and tk.TkVersion < 8.7:
            sequences = sequences + self._X11_BUTTON_WHEEL_SEQUENCES
        return sequences

    def _bind_mousewheel(self, _event=None):
        """Route wheel scrolling to the body canvas while the cursor is over the table."""
        for sequence in self._wheel_sequences():
            self.b_canvas.bind_all(sequence, self._on_mousewheel)

    def _unbind_mousewheel(self, _event=None):
        """Stop routing global wheel events when the pointer leaves the table."""
        for sequence in self._wheel_sequences():
            self.b_canvas.unbind_all(sequence)

    def _on_mousewheel(self, event):
        """Scroll the body canvas vertically by one unit per wheel event."""
        delta = getattr(event, "delta", 0)
        if not delta:
            # Button-based wheel events carry no delta: 4 is up, 5 is down.
            button = getattr(event, "num", None)
            if button == 4:
                delta = 1
            elif button == 5:
                delta = -1
            else:
                return
        step = -1 if delta > 0 else 1
        self.b_canvas.yview_scroll(step, "units")

    def clear_rows(self):
        """Remove all rendered body rows."""
        for w in self.b_inner.winfo_children():
            w.destroy()
        self._update_body_width()

    def add_row(
        self,
        values: List[str],
        row_index: int,
        anchors: Optional[List[str]] = None,
        checkbox_builder: Optional[Callable[[tk.Widget], ctk.CTkCheckBox]] = None,
    ):
        """Append one row to the table body.

        Args:
            values: cell texts, one per column (the ``__check__`` slot is
                ignored); missing trailing values render as empty cells.
            row_index: accepted for the callers' convenience; not used here.
            anchors: optional per-column anchors overriding ``ColSpec.anchor``.
            checkbox_builder: factory creating the checkbox inside the
                ``__check__`` cell; an empty label is used when omitted.
        """
        row = ctk.CTkFrame(self.b_inner, fg_color="transparent",
                           height=ROW_H, width=self.total_w)
        row.pack(fill="x", expand=False)
        row.pack_propagate(False)

        for i, col in enumerate(self.columns):
            holder = ctk.CTkFrame(
                row, fg_color="transparent",
                width=col.width, height=ROW_H,
                border_width=1, border_color=self.GRID_COLOR
            )
            holder.pack(side="left", fill="y")
            holder.pack_propagate(False)

            if col.key == "__check__":
                if checkbox_builder:
                    cb = checkbox_builder(holder)
                    cb.pack(padx=(self.PADX_L, self.PADX_R), pady=self.PADY, anchor="w")
                else:
                    ctk.CTkLabel(holder, text="").pack(fill="both")
            else:
                anchor = (anchors[i] if anchors else col.anchor)
                # A short ``values`` list must not abort the whole row.
                cell_text = values[i] if i < len(values) else ""
                ctk.CTkLabel(holder, text=cell_text, anchor=anchor).pack(
                    fill="both", padx=(self.PADX_L, self.PADX_R), pady=self.PADY
                )

        self._update_body_width()
|
|
|
|
|
|
# -------------------- PL row model --------------------
|
|
class PLRow:
    """Pair one picking-list record with the variable behind its checkbox."""

    def __init__(self, pl: Dict[str, Any], on_check):
        """Store the record, the callback and a fresh unchecked ``BooleanVar``."""
        self._callback = on_check
        self.var = ctk.BooleanVar(value=False)
        self.pl = pl

    def is_checked(self) -> bool:
        """Tell whether the row's checkbox variable is currently set."""
        checked = self.var.get()
        return checked

    def set_checked(self, val: bool):
        """Set the checkbox variable from code."""
        self.var.set(val)

    def build_checkbox(self, parent) -> ctk.CTkCheckBox:
        """Create the checkbox for this row inside ``parent``.

        Toggling it calls the callback with this row and the new state.
        """
        def _notify():
            self._callback(self, self.var.get())

        return ctk.CTkCheckBox(parent, text="", variable=self.var, command=_notify)
|
|
|
|
|
|
# -------------------- main frame (no-flicker + UX tuning + spinner) --------------------
|
|
class GestionePickingListFrame(ctk.CTkFrame):
|
|
@_log_call()
def __init__(self, master, *, db_client=None, conn_str=None, session: UserSession | None = None):
    """Build the master/detail picking-list frame.

    Args:
        master: parent widget.
        db_client: shared database client; required.
        conn_str: accepted but not used by this constructor.
        session: optional user session, read by the permission and
            operator-id helpers.

    Raises:
        ValueError: when ``db_client`` is missing.
    """
    super().__init__(master)
    if db_client is None:
        raise ValueError("GestionePickingListFrame richiede un db_client condiviso.")

    # Collaborators
    self.db_client = db_client
    self.session = session
    self.runner = AsyncRunner(self)  # shared runner
    self.busy = BusyOverlay(self)  # busy overlay

    # Master/detail state
    self.rows_models: list[PLRow] = []
    self._detail_cache: Dict[Any, list] = {}
    self.detail_doc = None

    # Detail sort state
    self._detail_sort_key: Optional[str] = None
    self._detail_sort_reverse = False
    self._detail_sorting = False

    # Loading / rendering bookkeeping
    self._first_loading: bool = False  # wait cursor is shown only during the first load
    self._render_job = None  # handle of the rendering job in progress

    self._build_layout()
    # No immediate reload: data is loaded once the window is idle (already drawn).
    self.after_idle(self._first_show)
|
|
|
|
def _can(self, action: str) -> bool:
|
|
"""Return whether the current user can execute one picking-list action."""
|
|
|
|
return self.session.can(action) if self.session else False
|
|
|
|
def _operator_id(self) -> int:
|
|
"""Return the authenticated operator id or ``0`` if no session is present."""
|
|
|
|
return int(self.session.operator_id) if self.session else 0
|
|
|
|
def _first_show(self):
|
|
"""Chiamato a finestra già resa → evitiamo sfarfallio del primo paint e mostriamo wait-cursor."""
|
|
self._first_loading = True
|
|
try:
|
|
self.winfo_toplevel().configure(cursor="watch")
|
|
except Exception:
|
|
pass
|
|
# spinner inizia
|
|
self.spinner.start(" Carico…")
|
|
self.reload_from_db(first=True)
|
|
|
|
# ---------- UI ----------
|
|
def _build_layout(self):
    """Lay out the toolbar (row 0), master table (row 1) and detail host (row 3)."""
    # Both table rows share the vertical space.
    self.grid_rowconfigure(1, weight=1)
    self.grid_rowconfigure(3, weight=1)
    self.grid_columnconfigure(0, weight=1)

    top = ctk.CTkFrame(self)
    top.grid(row=0, column=0, sticky="ew", padx=10, pady=(8, 4))

    toolbar_actions = (
        ("Ricarica", self.reload_from_db),
        ("Prenota", self.on_prenota),
        ("S-prenota", self.on_sprenota),
        ("Esporta XLSX", self.on_export),
    )
    for column, (caption, handler) in enumerate(toolbar_actions):
        button = ctk.CTkButton(top, text=caption, command=handler)
        button.grid(row=0, column=column, padx=6)

    # --- micro spinner on the right of the toolbar buttons ---
    self.spinner = ToolbarSpinner(top)
    self.spinner.widget().grid(row=0, column=10, padx=(8, 0))  # column 10: well past the buttons

    self.pl_table = ScrollTable(self, PL_COLS)
    self.pl_table.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 8))

    detail_host = tk.Frame(self, bd=0, highlightthickness=0)
    self.det_host = detail_host
    detail_host.grid(row=3, column=0, sticky="nsew", padx=10, pady=(4, 10))
    detail_host.grid_rowconfigure(0, weight=1)
    detail_host.grid_columnconfigure(0, weight=1)
    self._build_detail_sheet()

    self._draw_details_hint()
|
|
|
|
def _build_detail_sheet(self):
    """Create the tksheet widget used for the detail rows.

    Raises:
        RuntimeError: when tksheet could not be imported.
    """
    if Sheet is None:
        raise RuntimeError("tksheet non disponibile: installa la dipendenza per usare la tabella dettagli.")

    sheet = Sheet(
        self.det_host,
        data=[],
        show_row_index=False,
        show_top_left=False,
        width=1000,
        height=320,
        sort_key=natural_sort_key,
    )
    self.detail_sheet = sheet

    sheet.change_theme("light green")
    sheet.enable_bindings("all")
    sheet.headers(self._detail_headers(), redraw=False)
    # Added alongside the sheet's own bindings ("+") to catch header clicks.
    sheet.bind("<ButtonRelease-1>", self._on_detail_sheet_left_click, add="+")
    sheet.grid(row=0, column=0, sticky="nsew")
|
|
|
|
def _draw_details_hint(self):
|
|
"""Render the placeholder row shown when no document is selected."""
|
|
self._load_detail_sheet_data(
|
|
[["", "", "", "Seleziona una Picking List per vedere le UDC...", "", ""]]
|
|
)
|
|
|
|
def _detail_headers(self) -> List[str]:
    """Return the detail column titles, tagging the sorted one with [asc]/[desc]."""
    direction = "[desc]" if self._detail_sort_reverse else "[asc]"
    return [
        f"{col.title} {direction}" if col.key == self._detail_sort_key else col.title
        for col in DET_COLS
    ]
|
|
|
|
def _detail_rows_to_sheet_data(self, rows: List[Dict[str, Any]]) -> List[List[str]]:
    """Turn detail dicts into tksheet rows: pallet, lot, article, description, qty, location.

    Each cell takes the first non-empty value among a list of alternative
    keys; a blank location is rendered as ``"Non scaffalata"``.
    """
    text_columns = (
        ["Pallet", "UDC", "PalletID"],
        ["Lotto"],
        ["Articolo", "CodArticolo", "CodiceArticolo", "Art", "Codice"],
        ["Descrizione", "Descr", "DescrArticolo", "DescArticolo", "DesArticolo"],
        ["Qta", "Quantita", "Qty", "QTY"],
    )

    def _to_sheet_row(record: Dict[str, Any]) -> List[str]:
        cells = [_s(_first(record, aliases)) for aliases in text_columns]
        raw_location = _first(record, ["Ubicazione", "Cella", "PalletCella"])
        location = "" if raw_location is None else str(raw_location).strip()
        cells.append(location if location != "" else "Non scaffalata")
        return cells

    return [_to_sheet_row(record) for record in rows]
|
|
|
|
def _load_detail_sheet_data(self, data: List[List[str]]):
|
|
"""Push one full dataset into the tksheet detail widget."""
|
|
self.detail_sheet.headers(self._detail_headers(), redraw=False)
|
|
self.detail_sheet.set_sheet_data(
|
|
data,
|
|
reset_col_positions=True,
|
|
reset_row_positions=True,
|
|
redraw=True,
|
|
)
|
|
self.detail_sheet.set_all_column_widths()
|
|
|
|
def _detail_sort_value(self, row: Dict[str, Any], key: str):
    """Return the sort key of ``row`` for the logical detail column ``key``.

    The result is a ``(group, value)`` tuple:

    - ``Qta`` sorts numerically as ``(0, float)``; values that cannot be
      converted fall into group ``1`` and sort as lower-cased text after
      the numeric ones.
    - ``Ubicazione`` is normalised exactly like the displayed cell: the text
      is stripped and a blank location becomes ``"Non scaffalata"``, so rows
      shown with the same label sort together.
    - Every other column sorts as lower-cased text, taken from the first
      non-empty alias key (or from ``row[key]`` for unknown columns).
    """
    if key == "Qta":
        raw_qty = _first(row, ["Qta", "Quantita", "Qty", "QTY"], 0)
        try:
            return (0, float(raw_qty))
        except (TypeError, ValueError):
            return (1, _s(raw_qty).lower())

    if key == "Ubicazione":
        location = _s(_first(row, ["Ubicazione", "Cella", "PalletCella"])).strip()
        return (0, (location or "Non scaffalata").lower())

    # Alternative source keys accepted for each logical column.
    aliases = {
        "Articolo": ["Articolo", "CodArticolo", "CodiceArticolo", "Art", "Codice"],
        "Descrizione": ["Descrizione", "Descr", "DescrArticolo", "DescArticolo", "DesArticolo"],
        "Pallet": ["Pallet", "UDC", "PalletID"],
    }
    if key in aliases:
        value = _first(row, aliases[key])
    else:
        value = row.get(key)

    return (0, _s(value).lower())
|
|
|
|
def _sort_detail_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
"""Return detail rows sorted using the current header state."""
|
|
if not self._detail_sort_key:
|
|
return rows
|
|
|
|
return sorted(
|
|
rows,
|
|
key=lambda row: self._detail_sort_value(row, self._detail_sort_key),
|
|
reverse=self._detail_sort_reverse,
|
|
)
|
|
|
|
def _finish_detail_sort_feedback(self, root: tk.Misc):
|
|
"""Dismiss busy feedback only after Tk has flushed the detail redraw."""
|
|
try:
|
|
root.update_idletasks()
|
|
except Exception:
|
|
pass
|
|
|
|
self._detail_sorting = False
|
|
self.spinner.stop()
|
|
self.busy.hide()
|
|
try:
|
|
self.detail_sheet.configure(cursor="")
|
|
root.configure(cursor="")
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_detail_header_click(self, col: ColSpec):
|
|
"""Toggle detail sorting when the user clicks a detail header."""
|
|
if not self.detail_doc or self._detail_sorting:
|
|
return
|
|
|
|
if self._detail_sort_key == col.key:
|
|
self._detail_sort_reverse = not self._detail_sort_reverse
|
|
else:
|
|
self._detail_sort_key = col.key
|
|
self._detail_sort_reverse = False
|
|
|
|
self._detail_sorting = True
|
|
self.spinner.start(" Ordino dettagli...")
|
|
self.busy.show(f"Ordinamento per {col.title}...")
|
|
root = self.winfo_toplevel()
|
|
try:
|
|
root.configure(cursor="watch")
|
|
self.detail_sheet.configure(cursor="watch")
|
|
root.update_idletasks()
|
|
except Exception:
|
|
pass
|
|
|
|
def _apply_sort():
|
|
try:
|
|
rows = list(self._detail_cache.get(self.detail_doc, []))
|
|
rows = self._sort_detail_rows(rows)
|
|
self._detail_cache[self.detail_doc] = rows
|
|
self._refresh_details()
|
|
finally:
|
|
# Wait one more UI turn so the redraw becomes visible before removing feedback.
|
|
self.after_idle(lambda r=root: self.after(15, lambda: self._finish_detail_sort_feedback(r)))
|
|
|
|
self.after(25, _apply_sort)
|
|
|
|
def _on_detail_sheet_left_click(self, event):
    """Translate a left click on a tksheet header cell into a detail sort request."""
    sheet = self.detail_sheet
    try:
        clicked_region = sheet.identify_region(event)
        clicked_column = sheet.identify_column(event, exclude_header=False, allow_end=False)
    except Exception:
        return

    # Only clicks on a header cell that maps to a known detail column count.
    if clicked_region != "header" or clicked_column is None:
        return
    if not (0 <= clicked_column < len(DET_COLS)):
        return

    self._on_detail_header_click(DET_COLS[clicked_column])
|
|
|
|
def _apply_row_colors(self, rows: List[Dict[str, Any]]):
|
|
"""Colorazione differita (after_idle) per evitare micro-jank durante l'inserimento righe."""
|
|
try:
|
|
for idx, d in enumerate(rows):
|
|
row_widget = self.pl_table.b_inner.winfo_children()[idx]
|
|
if int(d.get("IDStato") or 0) == 1:
|
|
row_widget.configure(fg_color="#ffe6f2") # rosa tenue
|
|
else:
|
|
row_widget.configure(fg_color="transparent")
|
|
except Exception:
|
|
pass
|
|
|
|
def _refresh_mid_rows(self, rows: List[Dict[str, Any]]):
    """Rebuild the master table and its row models from ``rows``.

    Existing rows and models are discarded, one ``PLRow`` plus one table row
    is created per record, and row colouring is deferred to idle time.
    """
    self.pl_table.clear_rows()
    self.rows_models.clear()

    # Record keys shown after the checkbox cell, in column order.
    value_keys = ("Documento", "NAZIONE", "Stato", "IDStato", "Pallet", "Lotto", "Articolo", "Qta")

    for position, record in enumerate(rows):
        row_model = PLRow(record, self.on_row_checked)
        self.rows_models.append(row_model)

        cell_texts = [""]  # checkbox slot
        cell_texts.extend(_s(record.get(name)) for name in value_keys)

        self.pl_table.add_row(
            values=cell_texts,
            row_index=position,
            anchors=[spec.anchor for spec in PL_COLS],
            checkbox_builder=row_model.build_checkbox,
        )

    def _colour_rows():
        self._apply_row_colors(rows)

    # Colour once the UI is idle so the rows do not visibly jump.
    self.after_idle(_colour_rows)
|
|
|
|
# ----- helpers -----
|
|
def _get_selected_model(self) -> Optional[PLRow]:
|
|
"""Return the currently checked picking list row, if any."""
|
|
for m in self.rows_models:
|
|
if m.is_checked():
|
|
return m
|
|
return None
|
|
|
|
def _recolor_row_by_documento(self, documento: str, idstato: int):
    """Store ``idstato`` on the first row matching ``documento`` and repaint it.

    The model is updated immediately; the widget update (row background and
    the text of the fifth cell, the IDStato column) runs at idle time and
    ignores any error.
    """
    wanted = _s(documento)
    for position, row_model in enumerate(self.rows_models):
        if _s(row_model.pl.get("Documento")) != wanted:
            continue

        row_model.pl["IDStato"] = idstato

        def _paint(position=position):
            try:
                row_widget = self.pl_table.b_inner.winfo_children()[position]
                background = "#ffe6f2" if idstato == 1 else "transparent"
                row_widget.configure(fg_color=background)

                cells = row_widget.winfo_children()
                if len(cells) < 5:
                    return
                cell_content = cells[4].winfo_children()
                if not cell_content:
                    return
                label = cell_content[0]
                if hasattr(label, "configure"):
                    label.configure(text=str(idstato))
            except Exception:
                pass

        # Defer the repaint for a smoother update.
        self.after_idle(_paint)
        return
|
|
|
|
def _reselect_documento_after_reload(self, documento: str):
    """(Optional) After a DB reload, re-check the row with the same Documento.

    Only the first matching row is selected, and its selection handler is
    invoked as if the user had checked it.
    """
    wanted = _s(documento)
    match = next(
        (candidate for candidate in self.rows_models if _s(candidate.pl.get("Documento")) == wanted),
        None,
    )
    if match is None:
        return
    match.set_checked(True)
    self.on_row_checked(match, True)
|
|
|
|
# ----- eventi -----
|
|
@_log_call()
def on_row_checked(self, model: PLRow, is_checked: bool):
    """Handle a checkbox toggle on the master table and refresh the detail area.

    Checking a row unchecks every other row (exclusive selection), records
    its document as the current one and starts an async query for its detail
    rows. Unchecking the last checked row clears the current document and
    refreshes the detail area.

    The document is captured in a local variable when the query starts, so a
    result that arrives after the user picked another row is still cached
    under the document it belongs to.
    """
    if not is_checked:
        if not any(m.is_checked() for m in self.rows_models):
            self.detail_doc = None
            _MODULE_LOGGER.info("Nessun documento selezionato: ripristino placeholder del dettaglio.")
            self._refresh_details()
        return

    # exclusive selection
    for m in self.rows_models:
        if m is not model and m.is_checked():
            m.set_checked(False)

    # Frozen for this request: self.detail_doc may change before the query returns.
    doc = model.pl.get("Documento")
    self.detail_doc = doc
    _MODULE_LOGGER.info(f"Documento selezionato per il dettaglio: {doc}")
    self.spinner.start(" Carico dettagli…")  # spinner ON

    params = {"Documento": doc}

    async def _job():
        _log_sql("SQL_PL_DETAILS", SQL_PL_DETAILS, params)
        return await self.db_client.query_json(SQL_PL_DETAILS, params)

    def _ok(res):
        # Do NOT stop the spinner here: _refresh_details_incremental takes care of it.
        rows = _expand_detail_rows_for_test(_rows_to_dicts(res))
        _log_dataset("SQL_PL_DETAILS", rows)
        # Cache under the document that was queried, not the current selection.
        self._detail_cache[doc] = rows
        # Start the incremental rendering, which keeps the overlay active.
        self._refresh_details_incremental()

    def _err(ex):
        _MODULE_LOGGER.exception(f"Errore durante il caricamento dettagli del documento {doc}: {ex}")
        self.spinner.stop()
        self.busy.hide()  # close the overlay on error
        messagebox.showerror("DB", f"Errore nel caricamento dettagli:\n{ex}")

    self.runner.run(
        _job(),
        on_success=_ok,
        on_error=_err,
        busy=None,  # no automatic busy handling: it is managed manually during rendering
        message=f"Carico UDC per Documento {doc}…"
    )
    # Show the overlay manually for the query.
    self.busy.show(f"Carico UDC per Documento {doc}…")
|
|
|
|
# ----- load PL -----
|
|
@_log_call()
def reload_from_db(self, first: bool = False):
    """Load or reload the picking list summary table from the database.

    Runs ``SQL_PL`` through ``self.runner`` and, on success, hands the rows
    to ``_refresh_mid_rows``. The spinner is stopped and, while
    ``self._first_loading`` is set, the toplevel cursor is reset on both the
    success and the error path.

    Args:
        first: Selects the busy message: "Caricamento Picking List…" when
            true, "Aggiornamento…" otherwise.
    """
    self.spinner.start(" Carico…")  # spinner ON

    def _finish():
        """Stop the spinner and restore the cursor after the first load."""
        self.spinner.stop()  # spinner OFF
        if self._first_loading:
            try:
                self.winfo_toplevel().configure(cursor="")
            except Exception:
                # Best effort: cursor reset failures are ignored.
                pass
            self._first_loading = False

    async def _job():
        _log_sql("SQL_PL", SQL_PL, {})
        return await self.db_client.query_json(SQL_PL, {})

    def _on_success(res):
        try:
            rows = _rows_to_dicts(res)
            _log_dataset("SQL_PL", rows)
            self._refresh_mid_rows(rows)
        finally:
            # Also runs when rendering raises, so the spinner and the
            # first-load cursor are never left in their "loading" state.
            _finish()

    def _on_error(ex):
        _MODULE_LOGGER.exception(f"Errore durante il caricamento della picking list: {ex}")
        _finish()
        messagebox.showerror("DB", f"Errore nel caricamento:\n{ex}")

    self.runner.run(
        _job(),
        on_success=_on_success,
        on_error=_on_error,
        busy=self.busy,
        message="Caricamento Picking List…" if first else "Aggiornamento…"
    )
|
|
|
|
@_log_call("DEBUG")
def _refresh_details(self):
    """Redraw the detail sheet for ``self.detail_doc``.

    Without a current document the hint is drawn. Otherwise the cached rows
    are copied, sorted and loaded into the sheet; an empty result loads a
    single "Nessuna UDC trovata." row instead.
    """
    doc = self.detail_doc
    if not doc:
        self._draw_details_hint()
        return

    cached = list(self._detail_cache.get(doc, []))
    ordered = self._sort_detail_rows(cached)
    _MODULE_LOGGER.debug(f"Ridisegno tabella dettaglio per documento={doc} righe={len(ordered)}")

    if ordered:
        sheet_data = self._detail_rows_to_sheet_data(ordered)
    else:
        sheet_data = [["", "", "", "Nessuna UDC trovata.", "", ""]]
    self._load_detail_sheet_data(sheet_data)
|
|
|
|
@_log_call("DEBUG")
def _refresh_details_incremental(self, batch_size: int = 25):
    """Render the detail sheet, then release the spinner and busy overlay.

    Without a current document the hint is drawn. Otherwise the cached rows
    are sorted, written back to the cache and loaded into the sheet; an
    empty result loads a single "Nessuna UDC trovata." row. Whatever path is
    taken, ``self.spinner.stop()`` and ``self.busy.hide()`` run last.

    Args:
        batch_size: Not used by this implementation.
    """
    try:
        if not self.detail_doc:
            self._draw_details_hint()
            return

        rows = self._sort_detail_rows(list(self._detail_cache.get(self.detail_doc, [])))
        # Keep the cache in sorted order.
        self._detail_cache[self.detail_doc] = rows
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Avvio rendering dettagli documento={self.detail_doc} righe={len(rows)}")
        if not rows:
            self._load_detail_sheet_data([["", "", "", "Nessuna UDC trovata.", "", ""]])
            return

        self.busy.show(f"Rendering {len(rows)} UDC...")
        self._load_detail_sheet_data(self._detail_rows_to_sheet_data(rows))
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Rendering dettagli completato documento={self.detail_doc} righe={len(rows)}")
    finally:
        # Single release point: also reached when rendering raises, so the
        # overlay cannot stay up after a failed redraw.
        self.spinner.stop()
        self.busy.hide()
|
|
|
|
@_log_call("DEBUG")
def _render_batch(self, rows: List[Dict[str, Any]], batch_size: int, start_idx: int, total_rows: int):
    """Legacy helper kept for compatibility after the move to tksheet.

    Loads all of *rows* into the detail sheet in one call; the batching
    arguments are accepted but ignored.
    """
    # Drop the unused batching arguments explicitly.
    del batch_size, start_idx, total_rows
    sheet_data = self._detail_rows_to_sheet_data(rows)
    self._load_detail_sheet_data(sheet_data)
|
|
|
|
# ----- azioni -----
|
|
@_log_call()
def on_prenota(self):
    """Reserve the selected picking list.

    Steps: permission check, selection check, "already reserved" check
    (``IDStato`` equal to 1), operator-session check, then an async call to
    ``sp_xExePackingListPallet_async``. When the result has ``rc == 0`` the
    row is recoloured with state 1; any other result or an exception shows an
    error dialog. Every outcome is recorded through ``log_user_action``.
    """
    title = "Prenota"
    action = "pickinglist.prenota"
    desired = 1

    # All synchronous dialogs are parented to this frame so they open on top
    # of the picking list window.
    if not self._can(action):
        messagebox.showwarning("Permesso negato", "L'operatore corrente non puo' prenotare picking list.", parent=self)
        return
    model = self._get_selected_model()
    if not model:
        messagebox.showinfo(title, "Seleziona una Picking List (checkbox) prima di prenotare.", parent=self)
        return

    documento = _s(model.pl.get("Documento"))
    current = int(model.pl.get("IDStato") or 0)
    if current == desired:
        messagebox.showinfo(title, f"La Picking List {documento} è già prenotata.", parent=self)
        return

    id_operatore = self._operator_id()
    if id_operatore <= 0:
        messagebox.showerror(title, "Sessione operatore non valida.", parent=self)
        return
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Richiesta prenotazione documento={documento} id_operatore={id_operatore}")
    self.spinner.start(" Prenoto…")

    def _audit(outcome, **extra):
        """Write one audit entry for this reservation attempt."""
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome=outcome,
            target=documento,
            **extra,
        )

    async def _job():
        return await sp_xExePackingListPallet_async(self.db_client, id_operatore, documento)

    # NOTE(review): dialogs raised from the async callbacks are left
    # unparented, as before; confirm the frame is still alive when they fire
    # before attaching them to it.
    def _ok(res: SPResult):
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Esito prenotazione documento={documento} rc={getattr(res, 'rc', None)} message={getattr(res, 'message', None)}")
        self.spinner.stop()
        if res and res.rc == 0:
            _audit("ok")
            self._recolor_row_by_documento(documento, desired)
            return
        msg = (res.message if res else "Errore sconosciuto")
        _audit("denied", details={"message": msg})
        messagebox.showerror(title, f"Operazione non riuscita:\n{msg}")

    def _err(ex):
        _MODULE_LOGGER.exception(f"Errore prenotazione documento={documento}: {ex}")
        self.spinner.stop()
        _audit("error", details={"error": str(ex)})
        messagebox.showerror(title, f"Errore:\n{ex}")

    self.runner.run(
        _job(),
        on_success=_ok,
        on_error=_err,
        busy=self.busy,
        message=f"Prenoto la Picking List {documento}…"
    )
|
|
|
|
@_log_call()
def on_sprenota(self):
    """Unreserve the selected picking list.

    Steps: permission check, selection check, "already not reserved" check
    (``IDStato`` equal to 0), operator-session check, then an async call to
    ``sp_xExePackingListPallet_async`` with the operator id and document.
    When the result has ``rc == 0`` the row is recoloured with state 0; any
    other result or an exception shows an error dialog. Every outcome is
    recorded through ``log_user_action``.
    """
    title = "S-prenota"
    action = "pickinglist.sprenota"
    desired = 0

    # All synchronous dialogs are parented to this frame so they open on top
    # of the picking list window.
    if not self._can(action):
        messagebox.showwarning("Permesso negato", "L'operatore corrente non puo' s-prenotare picking list.", parent=self)
        return
    model = self._get_selected_model()
    if not model:
        messagebox.showinfo(title, "Seleziona una Picking List (checkbox) prima di s-prenotare.", parent=self)
        return

    documento = _s(model.pl.get("Documento"))
    current = int(model.pl.get("IDStato") or 0)
    if current == desired:
        messagebox.showinfo(title, f"La Picking List {documento} è già NON prenotata.", parent=self)
        return

    id_operatore = self._operator_id()
    if id_operatore <= 0:
        messagebox.showerror(title, "Sessione operatore non valida.", parent=self)
        return
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Richiesta s-prenotazione documento={documento} id_operatore={id_operatore}")
    self.spinner.start(" S-prenoto…")

    def _audit(outcome, **extra):
        """Write one audit entry for this unreservation attempt."""
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome=outcome,
            target=documento,
            **extra,
        )

    async def _job():
        return await sp_xExePackingListPallet_async(self.db_client, id_operatore, documento)

    # NOTE(review): dialogs raised from the async callbacks are left
    # unparented, as before; confirm the frame is still alive when they fire
    # before attaching them to it.
    def _ok(res: SPResult):
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Esito s-prenotazione documento={documento} rc={getattr(res, 'rc', None)} message={getattr(res, 'message', None)}")
        self.spinner.stop()
        if res and res.rc == 0:
            _audit("ok")
            self._recolor_row_by_documento(documento, desired)
            return
        msg = (res.message if res else "Errore sconosciuto")
        _audit("denied", details={"message": msg})
        messagebox.showerror(title, f"Operazione non riuscita:\n{msg}")

    def _err(ex):
        _MODULE_LOGGER.exception(f"Errore s-prenotazione documento={documento}: {ex}")
        self.spinner.stop()
        _audit("error", details={"error": str(ex)})
        messagebox.showerror(title, f"Errore:\n{ex}")

    self.runner.run(
        _job(),
        on_success=_ok,
        on_error=_err,
        busy=self.busy,
        message=f"S-prenoto la Picking List {documento}…"
    )
|
|
|
|
def on_export(self):
    """Show the export stub dialog; no export is performed here."""
    dialog_title, dialog_text = "Esporta", "Stub esportazione."
    messagebox.showinfo(dialog_title, dialog_text)
|
|
|
|
|
|
# factory per main
|
|
@_log_call()
def create_frame(parent, *, db_client=None, conn_str=None, session: UserSession | None = None) -> 'GestionePickingListFrame':
    """Build the picking list frame for the launcher.

    Sets the customtkinter appearance mode to "light" and the colour theme to
    "green", then creates the frame inside *parent*. ``conn_str`` is accepted
    but not used here.
    """
    appearance, theme = "light", "green"
    ctk.set_appearance_mode(appearance)
    ctk.set_default_color_theme(theme)
    frame = GestionePickingListFrame(parent, db_client=db_client, session=session)
    return frame
|
|
|
|
|
|
@_log_call()
def open_pickinglist_window(parent: tk.Misc, db_client, session: UserSession | None = None) -> tk.Misc:
    """Open the picking list window while minimizing the first paint flicker.

    The toplevel is stored on *parent* as a singleton. When one already
    exists it receives the new *session*, is de-iconified, raised and
    focused, and is returned. Otherwise a new toplevel is created hidden
    (withdrawn, alpha 0), populated with the frame from ``create_frame`` and
    only then revealed.

    Every cosmetic Tk call is best effort: a failure is logged at debug level
    and the remaining steps still run, so one failing call cannot leave the
    window withdrawn or fully transparent.

    Args:
        parent: Widget that owns the singleton attribute and the new toplevel.
        db_client: Database client forwarded to ``create_frame``.
        session: Session forwarded to the frame.

    Returns:
        The existing or newly created toplevel window.
    """
    key = "_gestione_pickinglist_window_singleton"

    def _attempt(widget, method, *args, **kwargs):
        """Call ``widget.method(...)``; return False instead of raising."""
        try:
            getattr(widget, method)(*args, **kwargs)
        except Exception as exc:
            _MODULE_LOGGER.debug(f"open_pickinglist_window: {method} fallito: {exc}")
            return False
        return True

    existing = getattr(parent, key, None)
    if existing and existing.winfo_exists():
        frame = getattr(existing, "_pickinglist_frame", None)
        if frame is not None:
            try:
                frame.session = session
            except Exception as exc:
                _MODULE_LOGGER.debug(f"open_pickinglist_window: aggiornamento sessione fallito: {exc}")
        _attempt(existing, "deiconify")
        # Reuse the window only when it could be raised and focused;
        # otherwise fall through and build a fresh one.
        if _attempt(existing, "lift") and _attempt(existing, "focus_force"):
            return existing

    win = ctk.CTkToplevel(parent)
    win.title("Gestione Picking List")
    win.geometry("1200x700+0+100")
    win.minsize(1000, 560)
    setattr(parent, key, win)

    # Keep the toplevel hidden until the child frame has built its initial
    # layout. Alpha is only zeroed when the withdraw succeeded.
    if _attempt(win, "withdraw"):
        _attempt(win, "attributes", "-alpha", 0.0)

    frame = create_frame(win, db_client=db_client, session=session)
    _attempt(frame, "pack", fill="both", expand=True)
    setattr(win, "_pickinglist_frame", frame)

    # Reveal the laid-out window once pending geometry work has completed.
    # Each step is independent so the alpha restore is always attempted.
    _attempt(win, "update_idletasks")
    _attempt(win, "transient", parent)
    _attempt(win, "deiconify")
    _attempt(win, "lift")
    _attempt(win, "focus_force")
    _attempt(win, "attributes", "-alpha", 1.0)

    win.bind("<Escape>", lambda e: win.destroy())
    win.protocol("WM_DELETE_WINDOW", win.destroy)
    return win
|
|
|
|
# =================== /gestione_pickinglist.py ===================
|