# Source: ware_house/gestione_layout.py (1435 lines, 58 KiB, Python)

"""Warehouse layout management window.
This module renders one aisle as a high-volume matrix, exposes search and
statistics helpers, and relies on ``tksheet`` to keep the layout responsive
even when the number of cells grows significantly.
"""
from __future__ import annotations
import json
import logging
import sys
import tkinter as tk
from tkinter import Menu, messagebox, filedialog, simpledialog
import customtkinter as ctk
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Any
from audit_log import log_user_action
from busy_overlay import InlineBusyOverlay
from gestione_aree import AsyncRunner
from gestione_scarico import DEFAULT_SCARICO_USER, move_pallet_async, open_scarico_dialog
from tksheet import Sheet, natural_sort_key
from tooltips import WidgetToolTip, load_tooltip_catalog, tooltip_text
from ui_theme import theme_color, theme_font, theme_section, theme_value
from user_session import UserSession
from window_placement import place_window_fullsize_below_parent_later
try:
    from loguru import logger
except Exception:  # pragma: no cover - fallback used only when Loguru is not available

    class _FallbackLogger:
        """Small stand-in offering the subset of the Loguru API this module calls.

        Every call is forwarded to a private, non-propagating stdlib logger.
        """

        _LINE_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"

        def __init__(self):
            # Falls back to ``__name__`` when MODULE_LOG_NAME is not a module
            # global at the time the adapter is instantiated.
            backend = logging.getLogger(globals().get("MODULE_LOG_NAME", __name__))
            backend.setLevel(logging.DEBUG)
            backend.propagate = False
            self._logger = backend

        def bind(self, **_kwargs):
            """Accept Loguru-style context binding and return the same adapter."""
            return self

        def add(self, sink, level="INFO", format=None, encoding="utf-8", **_kwargs):
            """Attach a handler for ``sink`` and return ``0`` as the sink id.

            ``sink`` is treated as a stream when it has a ``write`` attribute,
            otherwise as a file path. ``format`` and any extra keyword are
            accepted for signature compatibility and ignored.
            """
            is_stream = hasattr(sink, "write")
            handler: logging.Handler = (
                logging.StreamHandler(sink)
                if is_stream
                else logging.FileHandler(str(sink), encoding=encoding)
            )
            # Level names unknown to ``logging`` fall back to INFO.
            threshold = getattr(logging, str(level).upper(), logging.INFO)
            handler.setLevel(threshold)
            handler.setFormatter(logging.Formatter(self._LINE_FORMAT))
            self._logger.addHandler(handler)
            return 0

        def log(self, level, message):
            """Emit ``message`` via the logger method named after ``level`` (``info`` if none)."""
            emit = getattr(self._logger, str(level).lower(), self._logger.info)
            emit(message)

        def debug(self, message):
            """Emit ``message`` at DEBUG level."""
            self._logger.debug(message)

        def info(self, message):
            """Emit ``message`` at INFO level."""
            self._logger.info(message)

        def exception(self, message):
            """Emit ``message`` through ``logging.Logger.exception``."""
            self._logger.exception(message)

    logger = _FallbackLogger()
# ---- Color palette ----
# Occupancy backgrounds, chosen from the number of UDCs stored in a cell.
COLOR_EMPTY = "#B0B0B0" # grey (empty cell)
COLOR_FULL = "#FFA500" # orange (one UDC)
COLOR_DOUBLE = "#D62728" # red (>= 2 UDCs)
# Text colours paired with the backgrounds above.
FG_DARK = "#111111"
FG_LIGHT = "#FFFFFF"
# Colours used for the sheet header, the row index and the corner label.
HEADER_BG = "#E8EDF5"
HEADER_FG = "#1A1A1A"
# Logical cell states ("prenotata" = reserved, "disabilitata" = disabled);
# when one of them applies it overrides the occupancy colour of the cell.
STATE_PRENOTATA = 1
STATE_DISABILITATA = 3
PRENOTATA_BG = "#F6E26B"
DISABLED_BG = "#555555"
DISABLED_FG = "#F4F4F4"
# Parameterised UPDATE writing IDStato plus the modifying user and timestamp
# for the cell identified by :idcella.
SQL_SET_CELL_PRENOTAZIONE = """
UPDATE dbo.Celle
SET IDStato = :stato,
ModUtente = :utente,
ModDataOra = GETDATE()
WHERE ID = :idcella;
"""
# Module logging switch; DEBUG additionally dumps full query datasets.
LAYOUT_LOG_MODE = "INFO" # "OFF" | "INFO" | "DEBUG"
MODULE_LOG_NAME = Path(__file__).stem
# Log file path: this module's path with a ".log" suffix.
MODULE_LOG_PATH = Path(__file__).with_suffix(".log")
_MODULE_LOG_ENABLED = LAYOUT_LOG_MODE.upper() != "OFF"
_MODULE_LOG_LEVEL = "DEBUG" if LAYOUT_LOG_MODE.upper() == "DEBUG" else "INFO"
# Logger bound with the ``warehouse_module`` extra that the sink filter checks.
_MODULE_LOGGER = logger.bind(warehouse_module=MODULE_LOG_NAME)
# Set once _configure_module_logger() has run, so sinks are not added twice.
_MODULE_LOGGING_CONFIGURED = False
def _configure_module_logger():
    """Attach the stderr and file sinks used by this module.

    The function is idempotent: once it has completed, later calls return
    immediately. When logging is disabled no sink is added, but the module is
    still marked as configured.
    """
    global _MODULE_LOGGING_CONFIGURED
    if _MODULE_LOGGING_CONFIGURED:
        return

    if _MODULE_LOG_ENABLED:

        def _belongs_to_module(record):
            # Keep only records emitted through this module's bound logger.
            return record["extra"].get("warehouse_module") == MODULE_LOG_NAME

        console_format = (
            "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>" + MODULE_LOG_NAME + "</cyan> | "
            "<level>{message}</level>"
        )
        file_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | " + MODULE_LOG_NAME + " | {message}"

        logger.add(
            sys.stderr,
            level=_MODULE_LOG_LEVEL,
            colorize=True,
            filter=_belongs_to_module,
            format=console_format,
        )
        logger.add(
            MODULE_LOG_PATH,
            level=_MODULE_LOG_LEVEL,
            colorize=False,
            encoding="utf-8",
            filter=_belongs_to_module,
            format=file_format,
        )

    _MODULE_LOGGING_CONFIGURED = True
def _format_payload(payload: Any) -> str:
"""Serialize payloads for human-readable logging."""
try:
return json.dumps(payload, ensure_ascii=False, indent=2, default=str)
except Exception:
return repr(payload)
def _log_call(level: str | None = None):
    """Build a decorator that logs the call, return and failure of a function.

    Args:
        level: Level for the CALL/RETURN lines; when falsy the module level
            ``_MODULE_LOG_LEVEL`` is used.

    The first positional argument is left out of the logged arguments (the
    decorator is applied to methods, where it is ``self``). Exceptions are
    logged and then re-raised unchanged.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            effective_level = level or _MODULE_LOG_LEVEL
            positional = args[1:] if args else ()
            _MODULE_LOGGER.log(
                effective_level,
                f"CALL {func.__qualname__} args={_format_payload(positional)} kwargs={_format_payload(kwargs)}",
            )
            try:
                outcome = func(*args, **kwargs)
            except Exception:
                _MODULE_LOGGER.exception(f"FAIL {func.__qualname__}")
                raise
            else:
                _MODULE_LOGGER.log(effective_level, f"RETURN {func.__qualname__}")
                return outcome

        return wrapper

    return decorator
def _log_sql(query_name: str, sql: str, params: dict[str, Any]):
    """Log the parameters of a SQL statement, then its stripped text at DEBUG level."""
    rendered_params = _format_payload(params)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} params={rendered_params}")
    statement = sql.strip()
    _MODULE_LOGGER.debug(f"SQL {query_name} text:\n{statement}")
def _log_dataset(query_name: str, rows: list[Any]):
    """Log how many rows a query returned; in DEBUG mode also dump the rows."""
    row_count = len(rows)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} returned {row_count} rows")
    if LAYOUT_LOG_MODE.upper() != "DEBUG":
        return
    _MODULE_LOGGER.debug(f"SQL {query_name} dataset:\n{_format_payload(rows)}")
# Configure the sinks at import time and, when logging is enabled, record
# where this module is logging to.
_configure_module_logger()
if _MODULE_LOG_ENABLED:
    _MODULE_LOGGER.info(f"Logging inizializzato su {MODULE_LOG_PATH.name} livello={_MODULE_LOG_LEVEL} mode={LAYOUT_LOG_MODE.upper()}")
def pct_text(p_full: float, p_double: float | None = None) -> str:
    """Format occupancy ratios for the progress-bar labels.

    Args:
        p_full: Occupied share as a ratio; clamped to ``[0.0, 1.0]``.
        p_double: Optional share of cells holding two or more UDCs, as a
            ratio. Values above ``1.0`` are clamped so the label can never
            exceed 100%; ``None``, zero and negative values omit the suffix.

    Returns:
        ``"Pieno X% · Vuoto Y%"``, followed by ``" (di cui doppie Z%)"`` when
        ``p_double`` is positive. Percentages are rounded to one decimal.
    """
    p_full = max(0.0, min(1.0, p_full))
    full_pct = round(p_full * 100, 1)
    empty_pct = round(100 - full_pct, 1)
    base = f"Pieno {full_pct}% · Vuoto {empty_pct}%"
    if p_double and p_double > 0:
        # Apply the same upper bound used for p_full.
        double_pct = round(min(1.0, p_double) * 100, 1)
        return f"{base} (di cui doppie {double_pct}%)"
    return base
class LayoutWindow(ctk.CTkToplevel):
"""
Visualizzazione layout corsie con matrice di celle.
- Ogni cella è resa in una griglia ad alte prestazioni (vuota/piena/doppia)
- Etichetta su DUE righe:
1) "Corsia.Colonna.Fila" (una sola riga, senza andare a capo)
2) barcode UDC (primo, se presente)
- Ricerca per barcode UDC con cambio automatico corsia + highlight cella
- Statistiche: globale e corsia selezionata
- Export XLSX
"""
@_log_call()
def __init__(self, parent: tk.Widget, db_app, session: UserSession | None = None):
    """Build the window, reset the matrix state and start loading the aisles.

    Args:
        parent: Widget that owns this toplevel.
        db_app: Database client, stored as ``self.db``.
        session: Current user session, or ``None``; without a session every
            permission check done through ``_can`` answers ``False``.
    """
    super().__init__(parent)
    self._theme = theme_section("layout_window", {})
    self._tooltip_catalog = load_tooltip_catalog()

    # Window chrome, driven by the theme with hard-coded fallbacks.
    self.title("Warehouse - Layout corsie")
    geometry = theme_value(self._theme, "window_geometry", "1200x740")
    self.geometry(str(geometry))
    minsize = theme_value(self._theme, "window_minsize", [980, 560])
    self.minsize(int(minsize[0]), int(minsize[1]))
    self.resizable(True, True)
    try:
        window_bg = theme_color(self._theme, "window_fg_color", ("#efefef", "#2f2f2f"))
        self.configure(fg_color=window_bg)
    except Exception:
        pass  # theming is best-effort

    self.db = db_app
    self.session = session
    self._busy = InlineBusyOverlay(self, self._theme)
    self._async = AsyncRunner(self)

    # Main layout: three stacked rows weighted 5% / 80% / 15%.
    for row_index, weight in enumerate((5, 80, 15)):
        self.grid_rowconfigure(row_index, weight=weight)
    self.grid_columnconfigure(0, weight=1)

    # Runtime state of the currently rendered aisle.
    self.corsia_selezionata = tk.StringVar()
    self.matrix_state: list[list[int]] = []  # renamed: it used to be self.state
    self.fila_txt: list[list[str]] = []
    self.col_txt: list[list[str]] = []
    self.desc: list[list[str]] = []
    self.udc1: list[list[str]] = []  # first UDC barcode found (or "")
    self.cell_ids: list[list[int]] = []
    self.cell_logical_state: list[list[int]] = []
    self.sheet_to_matrix_rows: list[int] = []

    # Search -> deferred focus (aisle, column, row, barcode).
    self._pending_focus: tuple[str, str, str, str] | None = None
    self._highlighted: tuple[int, int] | None = None

    # Anti-race: request tokens used to ignore stale responses.
    self._req_counter = 0
    self._last_req = 0
    self._alive = True
    self._stats_after_id = None  # slot for cancelling a periodic refresh, should one be added

    self._build_top()
    self._build_matrix_host()
    self._build_stats()
    self._load_corsie()
    # Disabled: refreshing on every <Configure> produced many queries and lag.
    # self.bind("<Configure>", lambda e: self.after_idle(self._refresh_stats))
# ---------------- TOP BAR ----------------
def _build_top(self):
    """Build the top bar: aisle listbox, barcode search box and action buttons.

    Stores ``self.lb``, ``self.search_var`` and ``self.search_entry`` for the
    other methods of the window.
    """
    panel_fallback = ("#dcdcdc", "#363636")

    def _tint(frame, key, fallback):
        # Theme colouring is best-effort: any failure leaves the default colour.
        try:
            frame.configure(fg_color=theme_color(self._theme, key, fallback))
        except Exception:
            pass

    def _button_font():
        return theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold"))

    top = ctk.CTkFrame(self)
    top.grid(row=0, column=0, sticky="nsew", padx=8, pady=6)
    _tint(top, "top_frame_fg_color", ("#d7d7d7", "#3b3b3b"))
    for column in range(4):
        top.grid_columnconfigure(column, weight=0)
    top.grid_columnconfigure(1, weight=1)  # only the search column stretches

    # Aisle list.
    lf = ctk.CTkFrame(top)
    lf.grid(row=0, column=0, sticky="nsw")
    _tint(lf, "panel_frame_fg_color", panel_fallback)
    lf.grid_columnconfigure(0, weight=1)
    aisle_title = ctk.CTkLabel(
        lf,
        text="Corsie",
        font=theme_font(self._theme, "toolbar_label_font", ("Segoe UI", 12, "bold")),
    )
    aisle_title.grid(row=0, column=0, sticky="w", padx=6, pady=(6, 2))
    self.lb = tk.Listbox(lf, height=6, exportselection=False)
    self.lb.grid(row=1, column=0, sticky="nsw", padx=6, pady=(0, 6))
    self.lb.bind("<<ListboxSelect>>", self._on_select)

    # Search by UDC barcode.
    srch = ctk.CTkFrame(top)
    srch.grid(row=0, column=1, sticky="nsew", padx=(10, 10))
    self.search_var = tk.StringVar()
    _tint(srch, "panel_frame_fg_color", panel_fallback)
    self.search_entry = ctk.CTkEntry(
        srch,
        textvariable=self.search_var,
        width=260,
        font=theme_font(self._theme, "entry_font", ("Segoe UI", 10)),
    )
    self.search_entry.grid(row=0, column=0, sticky="w")
    btn_search = ctk.CTkButton(
        srch,
        text="Cerca per barcode UDC",
        command=self._search_udc,
        font=_button_font(),
    )
    btn_search.grid(row=0, column=1, padx=(8, 0))
    srch.grid_columnconfigure(0, weight=1)
    WidgetToolTip(btn_search, tooltip_text("layout.search_udc", catalog=self._tooltip_catalog))

    # Toolbar with refresh and export.
    tb = ctk.CTkFrame(top)
    tb.grid(row=0, column=3, sticky="ne")
    _tint(tb, "panel_frame_fg_color", panel_fallback)
    btn_refresh = ctk.CTkButton(
        tb,
        text="Aggiorna",
        command=self._refresh_current,
        font=_button_font(),
    )
    btn_refresh.grid(row=0, column=0, padx=4)
    btn_export = ctk.CTkButton(
        tb,
        text="Export XLSX",
        command=self._export_xlsx,
        font=_button_font(),
    )
    btn_export.grid(row=0, column=1, padx=4)
    WidgetToolTip(btn_refresh, tooltip_text("layout.refresh", catalog=self._tooltip_catalog))
    WidgetToolTip(btn_export, tooltip_text("layout.export_xlsx", catalog=self._tooltip_catalog))
# ---------------- MATRIX HOST ----------------
def _build_matrix_host(self):
    """Build the central frame, the read-only ``tksheet`` grid and its corner label.

    Stores ``self.host``, ``self.sheet`` and ``self.sheet_corner``.
    """
    center = ctk.CTkFrame(self)
    center.grid(row=1, column=0, sticky="nsew", padx=8, pady=(0, 6))
    center.grid_rowconfigure(0, weight=1)
    center.grid_columnconfigure(0, weight=1)

    self.host = ctk.CTkFrame(center)
    self.host.grid(row=0, column=0, sticky="nsew", padx=4, pady=4)
    self.host.grid_rowconfigure(0, weight=1)
    self.host.grid_columnconfigure(0, weight=1)

    behaviour = {
        "show_header": True,
        "show_row_index": True,
        "show_top_left": True,
        "width": 1000,
        "height": 520,
        "default_column_width": 118,
        "default_row_height": 44,
        "set_all_heights_and_widths": True,
        "align": "center",
        "table_wrap": "c",
        "show_default_header_for_empty": False,
        "show_default_index_for_empty": False,
        "sort_key": natural_sort_key,
        "outline_thickness": 0,
    }
    fonts = {
        "font": ("Segoe UI", 10, "normal"),
        "header_font": ("Segoe UI", 11, "bold"),
        "index_font": ("Segoe UI", 11, "bold"),
    }
    colours = {
        "table_bg": "#F6F7F9",
        "table_fg": FG_DARK,
        "table_grid_fg": "#FFFFFF",
        "header_bg": HEADER_BG,
        "header_fg": HEADER_FG,
        "header_grid_fg": "#D5DCE7",
        "header_selected_cells_bg": HEADER_BG,
        "header_selected_cells_fg": HEADER_FG,
        "index_bg": HEADER_BG,
        "index_fg": HEADER_FG,
        "index_grid_fg": "#D5DCE7",
        "index_selected_cells_bg": HEADER_BG,
        "index_selected_cells_fg": HEADER_FG,
        "table_selected_cells_border_fg": "#0B57D0",
        "table_selected_cells_bg": "#DCE9FF",
        "table_selected_cells_fg": "#0B57D0",
        "frame_bg": "#F6F7F9",
    }
    self.sheet = Sheet(self.host, **behaviour, **fonts, **colours)

    enabled_bindings = (
        "single_select",
        "drag_select",
        "arrowkeys",
        "prior",
        "next",
        "copy",
        "select_all",
        "rc_select",
    )
    self.sheet.enable_bindings(*enabled_bindings)
    self.sheet.readonly("all")
    self.sheet.set_index_width(92, redraw=False)
    self.sheet.bind("<Button-3>", self._on_sheet_right_click, add="+")
    self.sheet.grid(row=0, column=0, sticky="nsew")

    # The top-left corner of the grid acts as the visual "scaffale" marker.
    self.sheet_corner = ctk.CTkLabel(
        self.host,
        text="",
        width=92,
        height=28,
        corner_radius=0,
        fg_color=HEADER_BG,
        text_color=HEADER_FG,
        font=("Segoe UI", 12, "bold"),
    )
    self.sheet_corner.place(x=1, y=1)
    self.sheet_corner.lift()
def _column_header_labels(self, cols: int, col_txt: list[list[str]]) -> list[str]:
"""Build fixed column headers using the physical stack numbers."""
labels: list[str] = []
for c in range(cols):
value = ""
for r in range(len(col_txt)):
if c < len(col_txt[r]) and str(col_txt[r][c] or "").strip():
value = str(col_txt[r][c]).strip()
break
try:
normalized = str(int(value))
except Exception:
normalized = value or str(c + 1)
labels.append(f"Pila {normalized}")
return labels
def _row_header_labels(self, rows: int, fila_txt: list[list[str]]) -> list[str]:
"""Build fixed row headers using the lowercase physical level labels."""
labels: list[str] = []
for sheet_row in range(rows):
matrix_row = (rows - 1) - sheet_row
value = ""
if 0 <= matrix_row < len(fila_txt):
for cell_value in fila_txt[matrix_row]:
if str(cell_value or "").strip():
value = str(cell_value).strip()
break
labels.append(f"Livello {(value or str(matrix_row + 1)).lower()}")
return labels
def _scaffale_corner_label(self, corsia: str) -> str:
"""Return the uppercase scaffale letter shown in the top-left corner."""
letters = "".join(ch for ch in str(corsia or "") if ch.isalpha()).upper()
if letters:
return letters[-1]
return str(corsia or "").strip().upper()
def _state_colors(self, state: int) -> tuple[str, str]:
    """Return ``(background, foreground)`` for an occupancy state.

    ``0`` maps to the empty colours, ``1`` to the full colours and every
    other value to the double-occupancy colours.
    """
    if state == 1:
        return COLOR_FULL, FG_DARK
    if state == 0:
        return COLOR_EMPTY, FG_DARK
    return COLOR_DOUBLE, FG_LIGHT
def _effective_cell_style(self, occupancy_state: int, logical_state: int) -> tuple[str, str]:
    """Return ``(background, foreground)`` for a cell.

    A disabled or reserved logical state takes precedence, in that order;
    otherwise the colours come from the occupancy state.
    """
    logical = int(logical_state or 0)
    if logical == STATE_DISABILITATA:
        return DISABLED_BG, DISABLED_FG
    if logical == STATE_PRENOTATA:
        return PRENOTATA_BG, FG_DARK
    return self._state_colors(occupancy_state)
def _sheet_row_for_matrix_row(self, matrix_row: int) -> int:
"""Map one logical matrix row to the displayed tksheet row."""
return (len(self.matrix_state) - 1) - matrix_row
def _apply_sheet_cell_style(self, matrix_row: int, matrix_col: int, state: int, redraw: bool = False):
"""Apply the visual state associated with a cell occupancy level."""
if not self.matrix_state:
return
logical_state = 0
if self.cell_logical_state and matrix_row < len(self.cell_logical_state):
if matrix_col < len(self.cell_logical_state[matrix_row]):
logical_state = int(self.cell_logical_state[matrix_row][matrix_col] or 0)
bg, fg = self._effective_cell_style(state, logical_state)
sheet_row = self._sheet_row_for_matrix_row(matrix_row)
self.sheet.highlight_cells(cells=[(sheet_row, matrix_col)], bg=bg, fg=fg, redraw=redraw, overwrite=True)
def _apply_sheet_styles(self):
"""Paint all visible cells according to the current matrix state."""
self.sheet.dehighlight_cells(all_=True, redraw=False)
for r, row in enumerate(self.matrix_state):
for c, state in enumerate(row):
self._apply_sheet_cell_style(r, c, state, redraw=False)
self.sheet.refresh()
def _clear_highlight(self):
"""Remove the temporary highlight from the previously focused cell."""
if self._highlighted and self.matrix_state:
r, c = self._highlighted
try:
self._apply_sheet_cell_style(r, c, self.matrix_state[r][c], redraw=True)
except Exception:
pass
self._highlighted = None
def _rebuild_matrix(self, rows: int, cols: int, state, logical_state, fila_txt, col_txt, desc, udc1, corsia):
    """Replace the displayed matrix with the data of one aisle.

    The per-cell grids are stored on the instance, the sheet is refilled
    bottom-up (matrix row 0 becomes the last displayed row), headers and
    sizes are reset, all cells are repainted and, when a search is waiting
    on this aisle, the matching cell is highlighted.
    """
    _MODULE_LOGGER.log(
        _MODULE_LOG_LEVEL,
        f"Rendering matrice corsia={corsia} rows={rows} cols={cols}",
    )
    self._clear_highlight()

    self.matrix_state = state
    self.cell_logical_state = logical_state
    self.fila_txt = fila_txt
    self.col_txt = col_txt
    self.desc = desc
    self.udc1 = udc1
    self.sheet_to_matrix_rows = list(range(rows - 1, -1, -1))

    def _cell_text(r: int, c: int) -> str:
        # Line 1: "aisle.column.row" plus an optional state marker; line 2: first UDC.
        code = f"{corsia}.{col_txt[r][c]}.{fila_txt[r][c]}"
        udc = udc1[r][c] or ""
        logical = int(logical_state[r][c] or 0) if logical_state else 0
        if logical == STATE_PRENOTATA:
            marker = " [PREN]"
        elif logical == STATE_DISABILITATA:
            marker = " [DIS]"
        else:
            marker = ""
        return f"{code}{marker}\n{udc}"

    data = [[_cell_text(r, c) for c in range(cols)] for r in reversed(range(rows))]

    self.sheet.set_sheet_data(data, reset_col_positions=True, reset_row_positions=True, redraw=False)
    self.sheet.headers(self._column_header_labels(cols, col_txt), redraw=False)
    self.sheet.row_index(self._row_header_labels(rows, fila_txt), redraw=False)
    self.sheet.readonly("all")
    self.sheet.set_all_column_widths(width=118, redraw=False)
    self.sheet.set_all_row_heights(height=44, redraw=False)
    self.sheet.set_index_width(92, redraw=False)
    self.sheet_corner.configure(text=self._scaffale_corner_label(corsia))
    self._apply_sheet_styles()

    pending = self._pending_focus
    if getattr(self, '_alive', True) and pending and pending[0] == corsia:
        _, col, fila, _barcode = pending
        self._pending_focus = None
        self._highlight_cell_by_labels(col, fila)
def _sheet_cell_to_matrix(self, sheet_row: int, sheet_col: int) -> tuple[int, int] | None:
"""Map one displayed tksheet coordinate back to the logical matrix coordinate."""
if not self.matrix_state or sheet_row < 0 or sheet_col < 0:
return None
if sheet_row >= len(self.sheet_to_matrix_rows):
return None
matrix_row = self.sheet_to_matrix_rows[sheet_row]
if sheet_col >= len(self.matrix_state[matrix_row]):
return None
return matrix_row, sheet_col
def _on_sheet_right_click(self, event):
    """Select the table cell under the pointer and open its context menu.

    Clicks outside the table region, on unmapped cells, or for which the
    sheet cannot identify a row/column are ignored.
    """
    try:
        region = self.sheet.identify_region(event)
        if region != "table":
            return
        sheet_row = self.sheet.identify_row(event, exclude_index=True, allow_end=False)
        sheet_col = self.sheet.identify_column(event, exclude_header=True, allow_end=False)
    except Exception:
        return

    if None in (sheet_row, sheet_col):
        return
    mapped = self._sheet_cell_to_matrix(sheet_row, sheet_col)
    if not mapped:
        return

    matrix_row, matrix_col = mapped[0], mapped[1]
    self.sheet.select_cell(sheet_row, sheet_col, redraw=True, run_binding_func=False)
    self._open_menu(event, matrix_row, matrix_col)
# ---------------- CONTEXT MENU ----------------
def _open_menu(self, event, r, c):
    """Open the context menu for the matrix cell ``(r, c)``.

    Args:
        event: Mouse event giving the popup position, or ``None`` to use the
            current pointer position.
        r: Matrix row of the cell.
        c: Matrix column of the cell.

    Each action is enabled only when the session permits it. The menu built
    by the previous call is destroyed first, so repeated right clicks do not
    accumulate ``Menu`` child widgets on the window.
    """
    label = self._cell_label(r, c)

    stale = getattr(self, "_context_menu", None)
    if stale is not None:
        try:
            stale.destroy()
        except tk.TclError:
            pass  # already destroyed together with its parent

    m = Menu(self, tearoff=0)
    self._context_menu = m

    entries = (
        ("Carico", "layout.carico", lambda: self._prompt_carico(r, c)),
        ("Scarico", "layout.scarico", lambda: self._scarica_from_menu(r, c)),
        ("Abilita cella", "layout.abilita_cella", lambda: self._set_logical_state(r, c, 0)),
        (
            "Disabilita cella",
            "layout.disabilita_cella",
            lambda: self._set_logical_state(r, c, STATE_DISABILITATA),
        ),
    )
    for text, permission, handler in entries:
        m.add_command(
            label=text,
            state="normal" if self._can(permission) else "disabled",
            command=handler,
        )
    m.add_separator()
    m.add_command(label="Copia ubicazione", command=lambda: self._copy(label))

    x = self.winfo_pointerx() if event is None else event.x_root
    y = self.winfo_pointery() if event is None else event.y_root
    try:
        m.tk_popup(x, y)
    finally:
        # Standard Tk popup idiom: always release the grab taken by the popup.
        m.grab_release()
def _can(self, action: str) -> bool:
"""Return whether the current session can execute one layout action."""
return self.session.can(action) if self.session else False
def _actor_login(self) -> str:
"""Return the current application login or the technical fallback."""
if self.session and str(self.session.login or "").strip():
return str(self.session.login).strip()
return DEFAULT_SCARICO_USER
def _guard_action(self, action: str, message: str) -> bool:
"""Gate one action and inform the user if it is not currently allowed."""
if self._can(action):
return True
messagebox.showwarning("Permesso negato", message, parent=self)
return False
def _cell_label(self, r: int, c: int) -> str:
"""Return the human-readable cell label used across the layout UI."""
return f"{self.corsia_selezionata.get()}.{self.col_txt[r][c]}.{self.fila_txt[r][c]}"
def _cell_barcode(self, r: int, c: int) -> str:
"""Return the barcode-style cell description used by the legacy movement logic."""
return str(self.desc[r][c] or "").strip()
def _cell_id(self, r: int, c: int) -> int:
"""Return the internal cell id, or ``0`` when the matrix slot is not mapped."""
if not self.cell_ids or r >= len(self.cell_ids) or c >= len(self.cell_ids[r]):
return 0
try:
return int(self.cell_ids[r][c] or 0)
except Exception:
return 0
@_log_call()
def _prompt_carico(self, r: int, c: int):
    """Ask for a UDC barcode and move that pallet into cell ``(r, c)``.

    The request is dropped when the operator lacks the ``layout.carico``
    permission, when the cell is disabled (the same rule the unload path
    applies), or when no barcode is entered.
    """
    if not self._guard_action("layout.carico", "L'operatore corrente non puo' eseguire carichi."):
        return
    # Keep loading consistent with unloading: a disabled cell accepts neither.
    if self._logical_state_at(r, c) == STATE_DISABILITATA:
        self._toast("La cella e' disabilitata.")
        return

    ubicazione = self._cell_label(r, c)
    barcode = simpledialog.askstring(
        "Carico",
        f"Inserisci il barcode UDC da caricare in {ubicazione}:",
        parent=self,
    )
    # askstring returns None when the dialog is cancelled.
    barcode = str(barcode or "").strip()
    if not barcode:
        return

    self._run_pallet_move(
        barcode_pallet=barcode,
        target_idcella=self._cell_id(r, c),
        target_barcode_cella=self._cell_barcode(r, c),
        success_message=f"Carico completato su {ubicazione}.",
        busy_message=f"Carico UDC su {ubicazione}...",
    )
@_log_call()
def _scarica_from_menu(self, r: int, c: int):
    """Unload cell ``(r, c)`` from the context menu.

    Disabled or empty cells are refused with a toast. A cell holding two or
    more UDCs opens the unload dialog; a cell holding one UDC is unloaded
    after confirmation by moving the pallet to cell id ``9999``.
    """
    allowed = self._guard_action("layout.scarico", "L'operatore corrente non puo' eseguire scarichi.")
    if not allowed:
        return
    if self._logical_state_at(r, c) == STATE_DISABILITATA:
        self._toast("La cella e' disabilitata.")
        return

    udc_count = int(self.matrix_state[r][c] or 0)
    if udc_count <= 0:
        self._toast("La cella selezionata non contiene alcuna UDC da scaricare.")
        return
    if udc_count >= 2:
        self._open_scarico_dialog(r, c)
        return

    barcode = str(self.udc1[r][c] or "").strip()
    if not barcode:
        self._toast("UDC non disponibile per lo scarico.")
        return

    confirmed = messagebox.askyesno(
        "Scarico",
        f"Scaricare l'UDC {barcode} da {self._cell_label(r, c)}?",
        parent=self,
    )
    if not confirmed:
        return

    self._run_pallet_move(
        barcode_pallet=barcode,
        target_idcella=9999,
        target_barcode_cella="9000000",
        success_message=f"Scarico completato per {barcode}.",
        busy_message=f"Scarico {barcode}...",
    )
@_log_call()
def _run_pallet_move(
    self,
    *,
    barcode_pallet: str,
    target_idcella: int,
    target_barcode_cella: str,
    success_message: str,
    busy_message: str,
):
    """Move one pallet asynchronously and refresh the current aisle afterwards.

    Args:
        barcode_pallet: UDC barcode to move; blank values are rejected.
        target_idcella: Destination cell id; must be positive. The id
            ``9999`` marks the movement as an unload in the audit log.
        target_barcode_cella: Destination cell barcode passed to the mover.
        success_message: Toast shown after a successful movement.
        busy_message: Text shown by the busy overlay while running.

    The outcome is always written to the audit log. The UI is touched only
    if the window still exists when the callback fires, matching the guard
    used by ``_load_corsie``.
    """
    barcode_pallet = str(barcode_pallet or "").strip()
    if not barcode_pallet:
        self._toast("Barcode UDC non valido.")
        return
    if int(target_idcella or 0) <= 0:
        self._toast("Cella di destinazione non valida.")
        return

    target_idcella = int(target_idcella)
    action = "layout.scarico" if target_idcella == 9999 else "layout.carico"

    def _window_alive() -> bool:
        # The operation may finish after the window has been closed.
        if not getattr(self, "_alive", True):
            return False
        try:
            return bool(self.winfo_exists())
        except tk.TclError:
            return False

    def _ok(result):
        _MODULE_LOGGER.log(
            _MODULE_LOG_LEVEL,
            f"Movimento completato udc={barcode_pallet} target={target_barcode_cella} result={_format_payload(result)}",
        )
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="ok",
            target=target_barcode_cella,
            details={"barcode_pallet": barcode_pallet},
        )
        if not _window_alive():
            return
        self._refresh_current()
        self._toast(success_message)

    def _err(ex):
        _MODULE_LOGGER.exception(
            f"Errore movimento udc={barcode_pallet} target={target_barcode_cella}: {ex}"
        )
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="error",
            target=target_barcode_cella,
            details={"barcode_pallet": barcode_pallet, "error": str(ex)},
        )
        if not _window_alive():
            return
        messagebox.showerror("Movimento UDC", f"Operazione fallita:\n{ex}", parent=self)

    self._async.run(
        move_pallet_async(
            self.db,
            barcode_pallet=barcode_pallet,
            target_idcella=target_idcella,
            target_barcode_cella=str(target_barcode_cella or ""),
            utente=self._actor_login(),
        ),
        _ok,
        _err,
        busy=self._busy,
        message=busy_message,
    )
@_log_call()
def _open_scarico_dialog(self, r: int, c: int):
    """Open the unload dialog for cell ``(r, c)``.

    Requires the ``layout.scarico`` permission and a positive cell id; the
    current aisle is refreshed through ``on_completed``.
    """
    allowed = self._guard_action("layout.scarico", "L'operatore corrente non puo' eseguire scarichi.")
    if not allowed:
        return

    ids = self.cell_ids
    slot_missing = not ids or r >= len(ids) or c >= len(ids[r])
    if slot_missing:
        _MODULE_LOGGER.info(f"Scarico non disponibile per cella r={r} c={c}: IDCella assente")
        self._toast("IDCella non disponibile per lo scarico.")
        return

    idcella = self._cell_id(r, c)
    if idcella <= 0:
        _MODULE_LOGGER.info(f"Scarico non disponibile per cella r={r} c={c}: IDCella non valida ({idcella})")
        self._toast("IDCella non valida per lo scarico.")
        return

    ubicazione = self._cell_label(r, c)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Apro dialog di scarico per ubicazione={ubicazione} idcella={idcella}")
    open_scarico_dialog(
        self,
        db_client=self.db,
        idcella=idcella,
        ubicazione=ubicazione,
        on_completed=self._refresh_current,
        session=self.session,
    )
@_log_call()
def _set_prenotazione(self, r: int, c: int, stato: int):
    """Set or clear the reservation state of cell ``(r, c)``.

    Args:
        r: Matrix row of the cell.
        c: Matrix column of the cell.
        stato: ``STATE_PRENOTATA`` reserves the cell; any other value is
            written as-is and audited as ``layout.libera_prenotazione``.

    The cell label is resolved before the update starts, so the log and
    audit entries name the cell the operator acted on even if the aisle
    changes or the window closes while the update is running.
    """
    stato = int(stato)
    reserving = stato == STATE_PRENOTATA
    action = "layout.prenota" if reserving else "layout.libera_prenotazione"
    if not self._guard_action(action, "L'operatore corrente non puo' modificare la prenotazione."):
        return
    idcella = self._cell_id(r, c)
    if idcella <= 0:
        self._toast("IDCella non valida per la prenotazione.")
        return

    ubicazione = self._cell_label(r, c)
    params = {
        "idcella": idcella,
        "stato": stato,
        "utente": self._actor_login(),
    }
    _log_sql("set_prenotazione", SQL_SET_CELL_PRENOTAZIONE, params)

    def _window_alive() -> bool:
        # The update may finish after the window has been closed.
        if not getattr(self, "_alive", True):
            return False
        try:
            return bool(self.winfo_exists())
        except tk.TclError:
            return False

    def _ok(_affected):
        _MODULE_LOGGER.log(
            _MODULE_LOG_LEVEL,
            f"Prenotazione aggiornata idcella={idcella} stato={stato} ubicazione={ubicazione}",
        )
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="ok",
            target=ubicazione,
            details={"idcella": idcella, "stato": stato},
        )
        if not _window_alive():
            return
        self._refresh_current()
        self._toast("Prenotazione impostata." if reserving else "Prenotazione liberata.")

    def _err(ex):
        _MODULE_LOGGER.exception(
            f"Errore aggiornamento prenotazione idcella={idcella} stato={stato}: {ex}"
        )
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="error",
            target=ubicazione,
            details={"idcella": idcella, "stato": stato, "error": str(ex)},
        )
        if not _window_alive():
            return
        messagebox.showerror("Prenotazione", f"Aggiornamento fallito:\n{ex}", parent=self)

    self._async.run(
        self.db.exec(SQL_SET_CELL_PRENOTAZIONE, params, commit=True),
        _ok,
        _err,
        busy=self._busy,
        message="Aggiorno prenotazione...",
    )
def _logical_state_at(self, r: int, c: int) -> int:
"""Return the logical state stored for one cell."""
if not self.cell_logical_state or r >= len(self.cell_logical_state):
return 0
if c >= len(self.cell_logical_state[r]):
return 0
try:
return int(self.cell_logical_state[r][c] or 0)
except Exception:
return 0
@_log_call()
def _set_logical_state(self, r: int, c: int, stato: int):
    """Write a logical state directly on cell ``(r, c)``.

    Args:
        r: Matrix row of the cell.
        c: Matrix column of the cell.
        stato: ``STATE_DISABILITATA`` disables the cell; any other value is
            written as-is and audited as ``layout.abilita_cella``.

    The cell label is resolved before the update starts, so the audit entry
    names the cell the operator acted on even if the aisle changes or the
    window closes while the update is running.
    """
    stato = int(stato)
    disabling = stato == STATE_DISABILITATA
    action = "layout.disabilita_cella" if disabling else "layout.abilita_cella"
    if not self._guard_action(action, "L'operatore corrente non puo' modificare lo stato della cella."):
        return
    idcella = self._cell_id(r, c)
    if idcella <= 0:
        self._toast("IDCella non valida.")
        return

    ubicazione = self._cell_label(r, c)
    params = {
        "idcella": idcella,
        "stato": stato,
        "utente": self._actor_login(),
    }
    _log_sql("set_logical_state", SQL_SET_CELL_PRENOTAZIONE, params)

    def _window_alive() -> bool:
        # The update may finish after the window has been closed.
        if not getattr(self, "_alive", True):
            return False
        try:
            return bool(self.winfo_exists())
        except tk.TclError:
            return False

    def _ok(_affected):
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="ok",
            target=ubicazione,
            details={"idcella": idcella, "stato": stato},
        )
        if not _window_alive():
            return
        self._refresh_current()
        self._toast("Cella disabilitata." if disabling else "Cella abilitata.")

    def _err(ex):
        log_user_action(
            self.session,
            module=MODULE_LOG_NAME,
            action=action,
            outcome="error",
            target=ubicazione,
            details={"idcella": idcella, "stato": stato, "error": str(ex)},
        )
        if not _window_alive():
            return
        messagebox.showerror("Stato cella", f"Aggiornamento fallito:\n{ex}", parent=self)

    self._async.run(
        self.db.exec(SQL_SET_CELL_PRENOTAZIONE, params, commit=True),
        _ok,
        _err,
        busy=self._busy,
        message="Aggiorno stato cella...",
    )
@_log_call()
def _set_cell(self, r, c, val):
    """Store ``val`` as the occupancy state of cell ``(r, c)``, repaint it and refresh the statistics."""
    target_row = self.matrix_state[r]
    target_row[c] = val
    # Only this cell is repainted; the statistics are refreshed afterwards.
    self._apply_sheet_cell_style(r, c, val, redraw=True)
    self._refresh_stats()
# ---------------- STATS ----------------
def _build_stats(self):
    """Build the bottom panel: two occupancy bars with captions and the colour legend.

    Stores ``tot_canvas``/``tot_text`` for the global bar and
    ``sel_canvas``/``sel_text`` for the selected-aisle bar.
    """
    bottom = ctk.CTkFrame(self)
    bottom.grid(row=2, column=0, sticky="nsew", padx=8, pady=6)
    bottom.grid_columnconfigure(0, weight=1)

    def _add_bar(title, title_row, title_pady):
        # Title on its own row; bar and percentage caption share the next row.
        heading = ctk.CTkLabel(bottom, text=title, font=("", 10, "bold"))
        heading.grid(row=title_row, column=0, sticky="w", pady=title_pady)
        canvas = tk.Canvas(bottom, height=22, highlightthickness=0)
        canvas.grid(row=title_row + 1, column=0, sticky="ew", padx=(0, 260))
        caption = ctk.CTkLabel(bottom, text=pct_text(0.0, 0.0))
        caption.grid(row=title_row + 1, column=0, sticky="e")
        return canvas, caption

    self.tot_canvas, self.tot_text = _add_bar("Riempimento globale", 0, (0, 2))
    self.sel_canvas, self.sel_text = _add_bar("Riempimento corsia selezionata", 2, (10, 2))

    leg = ctk.CTkFrame(bottom)
    leg.grid(row=4, column=0, sticky="w", pady=(10, 0))
    ctk.CTkLabel(leg, text="Legenda celle:").grid(row=0, column=0, padx=(0, 8))
    legend_entries = (
        ("Vuota", COLOR_EMPTY),
        ("Piena", COLOR_FULL),
        ("Doppia UDC", COLOR_DOUBLE),
        ("Prenotata", PRENOTATA_BG),
        ("Disabilitata", DISABLED_BG),
    )
    # Each entry takes two grid columns (swatch + caption), starting at column 1.
    for slot, (caption, color) in enumerate(legend_entries):
        self._legend(leg, 1 + 2 * slot, caption, color)
def _legend(self, parent, col, text, color):
    """Place a colour swatch at grid column ``col`` and its caption at ``col + 1``."""
    swatch_width, swatch_height = 18, 12
    swatch = tk.Canvas(parent, width=swatch_width, height=swatch_height, highlightthickness=0)
    swatch.create_rectangle(0, 0, swatch_width, swatch_height, fill=color, width=1, outline="#444")
    swatch.grid(row=0, column=col)
    caption = ctk.CTkLabel(parent, text=text)
    caption.grid(row=0, column=col + 1, padx=(4, 12))
# ---------------- DATA LOADING ----------------
@_log_call()
def _load_corsie(self):
    """Query the available aisles asynchronously and fill the listbox.

    On success aisle ``"1A"`` is selected when present, otherwise the first
    aisle, and its matrix load is scheduled. Callbacks arriving after the
    window has been closed are ignored. The error dialog is parented to this
    window like the other dialogs of the class.
    """
    sql = """
WITH C AS (
SELECT DISTINCT LTRIM(RTRIM(Corsia)) AS Corsia
FROM dbo.Celle
WHERE ID <> 9999 AND (DelDataOra IS NULL) AND LTRIM(RTRIM(Corsia)) <> '7G'
)
SELECT Corsia
FROM C
ORDER BY
CASE
WHEN LEFT(Corsia,3)='MAG' AND TRY_CONVERT(int, SUBSTRING(Corsia,4,50)) IS NOT NULL THEN 0
WHEN TRY_CONVERT(int, Corsia) IS NOT NULL THEN 1
ELSE 2
END,
CASE WHEN LEFT(Corsia,3)='MAG' THEN TRY_CONVERT(int, SUBSTRING(Corsia,4,50)) END,
CASE WHEN TRY_CONVERT(int, Corsia) IS NOT NULL THEN TRY_CONVERT(int, Corsia) END,
CASE WHEN TRY_CONVERT(int, Corsia) IS NOT NULL THEN SUBSTRING(Corsia, LEN(CAST(TRY_CONVERT(int, Corsia) AS varchar(20)))+1, 50) END,
Corsia;
"""
    _log_sql("load_corsie", sql, {})

    def _ok(res):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        rows = res.get("rows", []) if isinstance(res, dict) else []
        _log_dataset("load_corsie", rows)
        self.lb.delete(0, tk.END)
        corsie = [row[0] for row in rows]
        for corsia in corsie:
            self.lb.insert(tk.END, corsia)
        # Prefer aisle "1A"; otherwise fall back to the first entry, if any.
        idx = corsie.index("1A") if "1A" in corsie else (0 if corsie else -1)
        if idx >= 0:
            self.lb.selection_clear(0, tk.END)
            self.lb.selection_set(idx)
            self.lb.see(idx)
            self._busy.hide()
            self.after_idle(lambda: self._on_select(None))
        else:
            self._toast("Nessuna corsia trovata.")
            self._busy.hide()

    def _err(ex):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        self._busy.hide()
        _MODULE_LOGGER.exception(f"Errore caricamento corsie: {ex}")
        # parent=self keeps the dialog attached to this toplevel.
        messagebox.showerror("Errore", f"Caricamento corsie fallito:\n{ex}", parent=self)

    self._async.run(self.db.query_json(sql, {}), _ok, _err, busy=self._busy, message="Carico corsie…")
@_log_call()
def _on_select(self, _):
    """React to a listbox selection by loading the chosen aisle.

    Does nothing when the listbox has no current selection. The event
    argument is ignored.
    """
    selection = self.lb.curselection()
    if selection:
        chosen = self.lb.get(selection[0])
        self.corsia_selezionata.set(chosen)
        self._load_matrix(chosen)
def _select_corsia_in_listbox(self, corsia: str):
"""Select a given aisle inside the listbox if it is present."""
for i in range(self.lb.size()):
if self.lb.get(i) == corsia:
self.lb.selection_clear(0, tk.END)
self.lb.selection_set(i)
self.lb.see(i)
break
@_log_call()
def _load_matrix(self, corsia: str):
    """Query the cells of one aisle and render them as a matrix.

    Every call takes a fresh request token. Callbacks that belong to an older
    token return without touching the UI, so a slow response cannot overwrite
    the result of a newer request.

    Each result row is expected to carry, in order: RowN, ColN, Stato
    (0 = no pallet, 1 = one pallet, 2 = more than one), StatoLogico, IDCella,
    Descrizione, FilaTxt, ColTxt, LastUDC. RowN and ColN are 1-based ranks.

    Args:
        corsia: Aisle code, bound to the ``:corsia`` query parameter.
    """
    # New request token: responses of older requests must not wipe the UI.
    self._req_counter += 1
    req_id = self._req_counter
    self._last_req = req_id
    sql = """
WITH C AS (
SELECT
ID,
LTRIM(RTRIM(Corsia)) AS Corsia,
LTRIM(RTRIM(Fila)) AS Fila,
LTRIM(RTRIM(Colonna)) AS Colonna,
Descrizione,
IDStato
FROM dbo.Celle
WHERE ID <> 9999 AND (DelDataOra IS NULL)
AND LTRIM(RTRIM(Corsia)) <> '7G' AND LTRIM(RTRIM(Corsia)) = :corsia
),
R AS (
SELECT Fila,
DENSE_RANK() OVER (
ORDER BY CASE WHEN TRY_CONVERT(int, Fila) IS NULL THEN 1 ELSE 0 END,
TRY_CONVERT(int, Fila), Fila
) AS RowN
FROM C GROUP BY Fila
),
K AS (
SELECT Colonna,
DENSE_RANK() OVER (
ORDER BY CASE WHEN TRY_CONVERT(int, Colonna) IS NULL THEN 1 ELSE 0 END,
TRY_CONVERT(int, Colonna), Colonna
) AS ColN
FROM C GROUP BY Colonna
),
S AS (
SELECT c.ID, COUNT(DISTINCT g.BarcodePallet) AS n
FROM C AS c
LEFT JOIN dbo.XMag_GiacenzaPallet AS g ON g.IDCella = c.ID
GROUP BY c.ID
),
P AS (
SELECT g.IDCella, g.BarcodePallet
FROM dbo.XMag_GiacenzaPallet g
JOIN C c ON c.ID = g.IDCella
),
U AS (
SELECT ranked.IDCella AS ID, ranked.BarcodePallet AS LastUDC
FROM (
SELECT
p.IDCella,
p.BarcodePallet,
ROW_NUMBER() OVER (
PARTITION BY p.IDCella
ORDER BY
COALESCE(last_move.ModDataOra, last_move.InsDataOra, last_move.DataMagazzino) DESC,
last_move.ID DESC,
p.BarcodePallet DESC
) AS rn
FROM P p
OUTER APPLY (
SELECT TOP (1)
mp.ID,
mp.ModDataOra,
mp.InsDataOra,
mp.DataMagazzino
FROM dbo.MagazziniPallet mp
WHERE mp.IDCella = p.IDCella
AND mp.Attributo = p.BarcodePallet
AND mp.Tipo = 'V'
ORDER BY
COALESCE(mp.ModDataOra, mp.InsDataOra, mp.DataMagazzino) DESC,
mp.ID DESC
) AS last_move
) ranked
WHERE ranked.rn = 1
)
SELECT
r.RowN, k.ColN,
CASE WHEN s.n IS NULL OR s.n = 0 THEN 0
WHEN s.n = 1 THEN 1
ELSE 2 END AS Stato,
ISNULL(c.IDStato, 0) AS StatoLogico,
c.ID AS IDCella,
c.Descrizione,
LTRIM(RTRIM(c.Fila)) AS FilaTxt,
LTRIM(RTRIM(c.Colonna)) AS ColTxt,
U.LastUDC
FROM C c
JOIN R r ON r.Fila = c.Fila
JOIN K k ON k.Colonna = c.Colonna
LEFT JOIN S s ON s.ID = c.ID
LEFT JOIN U ON U.ID = c.ID
ORDER BY r.RowN, k.ColN;
"""
    _log_sql("load_matrix", sql, {"corsia": corsia})

    def _ok(res):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        # Ignore superseded responses.
        if req_id < self._last_req:
            return
        rows = res.get("rows", []) if isinstance(res, dict) else []
        _log_dataset(f"load_matrix[{corsia}]", rows)
        if not rows:
            # Render an empty matrix without removing the frame (avoids a
            # blank screen).
            self._rebuild_matrix(0, 0, [], [], [], [], [], [], corsia)
            self._refresh_stats()
            self._busy.hide()
            return

        # Rows without usable 1-based coordinates are dropped once, so that
        # sizing and filling agree on which rows belong to the grid.
        placed = [row for row in rows if row[0] and row[1]]
        max_r = max((int(row[0]) for row in placed), default=0)
        max_c = max((int(row[1]) for row in placed), default=0)

        def _grid(fill):
            # Independent row lists: [[fill] * max_c] * max_r would alias rows.
            return [[fill] * max_c for _ in range(max_r)]

        mat = _grid(0)
        logical = _grid(0)
        cell_ids = _grid(0)
        fila = _grid("")
        col = _grid("")
        desc = _grid("")
        udc = _grid("")
        for row in placed:
            rown, coln, stato, stato_logico, idcella, descr, fila_txt, col_txt, last_udc = row
            r = int(rown) - 1
            c = int(coln) - 1
            mat[r][c] = int(stato)
            logical[r][c] = int(stato_logico or 0)
            cell_ids[r][c] = int(idcella or 0)
            fila[r][c] = str(fila_txt or "")
            col[r][c] = str(col_txt or "")
            # Fall back to "<aisle>.<column>.<row>" when no description is stored.
            desc[r][c] = str(descr or f"{corsia}.{col_txt}.{fila_txt}")
            udc[r][c] = str(last_udc or "")

        def _finish_render():
            # Re-check liveness and the token: this runs later, from the idle queue.
            if not getattr(self, '_alive', True) or not self.winfo_exists():
                return
            if req_id < self._last_req:
                return
            self.cell_ids = cell_ids
            self._rebuild_matrix(max_r, max_c, mat, logical, fila, col, desc, udc, corsia)
            self._refresh_stats()
            self._busy.hide()

        self.update_idletasks()
        self.after_idle(_finish_render)

    def _err(ex):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        if req_id < self._last_req:
            return
        self._busy.hide()
        _MODULE_LOGGER.exception(f"Errore caricamento matrice corsia={corsia}: {ex}")
        messagebox.showerror("Errore", f"Caricamento matrice {corsia} fallito:\n{ex}")

    self._async.run(self.db.query_json(sql, {"corsia": corsia}), _ok, _err, busy=self._busy, message=f"Carico corsia {corsia}…")
# ---------------- SEARCH ----------------
@_log_call()
def _search_udc(self):
    """Look up a pallet barcode and load the aisle of the cell that holds it.

    The barcode is read from ``self.search_var``. When a cell is found, its
    aisle/column/row labels and the barcode are stored in
    ``self._pending_focus``, the listbox selection is synchronized and the
    aisle matrix is reloaded. When nothing is found or the query fails, a
    dialog is shown and the busy overlay is released, as the sibling loaders
    do on their own terminal paths.
    """
    barcode = (self.search_var.get() or "").strip()
    if not barcode:
        self._toast("Inserisci un barcode UDC da cercare.")
        return
    # Bump the request token so that a pending _load_matrix response cannot
    # overwrite the UI.
    self._req_counter += 1
    search_req_id = self._req_counter
    self._last_req = search_req_id
    sql = """
SELECT TOP (1)
RTRIM(c.Corsia) AS Corsia,
RTRIM(c.Colonna) AS Colonna,
RTRIM(c.Fila) AS Fila,
c.ID AS IDCella
FROM dbo.XMag_GiacenzaPallet g
JOIN dbo.Celle c ON c.ID = g.IDCella
WHERE g.BarcodePallet = :barcode
AND c.ID <> 9999 AND RTRIM(c.Corsia) <> '7G'
"""
    _log_sql("search_udc", sql, {"barcode": barcode})

    def _ok(res):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        # A newer request owns the UI (and the overlay) now.
        if search_req_id < self._last_req:
            return
        rows = res.get("rows", []) if isinstance(res, dict) else []
        _log_dataset(f"search_udc[{barcode}]", rows)
        if not rows:
            # No matrix load follows on this path, so release the overlay here.
            self._busy.hide()
            messagebox.showinfo("Ricerca", f"UDC {barcode} non trovata.", parent=self)
            return
        corsia, col, fila, _idc = rows[0]
        corsia = str(corsia).strip()
        col = str(col).strip()
        fila = str(fila).strip()
        self._pending_focus = (corsia, col, fila, barcode)
        # Sync the listbox and ALWAYS load the aisle that holds the pallet.
        self._select_corsia_in_listbox(corsia)
        self.corsia_selezionata.set(corsia)
        # Per the original note, the highlight is applied later by
        # _rebuild_matrix (not visible from this block).
        self._load_matrix(corsia)

    def _err(ex):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        if search_req_id < self._last_req:
            return
        self._busy.hide()
        _MODULE_LOGGER.exception(f"Errore ricerca UDC barcode={barcode}: {ex}")
        messagebox.showerror("Ricerca", f"Errore ricerca UDC:\n{ex}", parent=self)

    self._async.run(self.db.query_json(sql, {"barcode": barcode}), _ok, _err, busy=self._busy, message="Cerco UDC…")
def _try_highlight(self, col_txt: str, fila_txt: str) -> bool:
"""Highlight a cell by its textual row and column labels."""
for r in range(len(self.col_txt)):
for c in range(len(self.col_txt[r])):
if self.col_txt[r][c] == col_txt and self.fila_txt[r][c] == fila_txt:
self._clear_highlight()
sheet_row = self._sheet_row_for_matrix_row(r)
self.sheet.highlight_cells(cells=[(sheet_row, c)], bg="#0B57D0", fg="#FFFFFF", redraw=True, overwrite=True)
self.sheet.set_currently_selected(row=sheet_row, column=c)
self._highlighted = (r, c)
return True
return False
def _highlight_cell_by_labels(self, col_txt: str, fila_txt: str):
"""Show a toast when a searched cell cannot be highlighted."""
if not self._try_highlight(col_txt, fila_txt):
self._toast("Cella trovata ma non evidenziabile nella griglia.")
# ---------------- COMMANDS ----------------
@_log_call()
def _refresh_current(self):
    """Reload the matrix of the aisle held in ``corsia_selezionata``, if any."""
    current = self.corsia_selezionata.get()
    if not current:
        return
    self._load_matrix(current)
@_log_call()
def _export_xlsx(self):
    """Export the in-memory matrix of the current aisle to an Excel workbook.

    Two sheets are written: a detail sheet with one row per cell (aisle,
    1-based row/column index, state label, description, row/column labels and
    pallet barcode) and a matrix sheet that mirrors the grid with one fill
    color per occupancy state.

    The export stops early, with a dialog, when there is no matrix or when
    openpyxl cannot be imported; it stops silently when the user cancels the
    save dialog.
    """
    if not self.matrix_state:
        messagebox.showinfo("Export", "Nessuna matrice da esportare.")
        return
    # Check for openpyxl before asking for a destination, so the user is not
    # prompted for a file name when the export cannot run at all.
    try:
        from openpyxl import Workbook
        from openpyxl.styles import PatternFill, Alignment, Font
    except Exception as ex:
        messagebox.showerror("Export", f"Manca openpyxl: {ex}\nInstalla con: pip install openpyxl")
        return
    corsia = self.corsia_selezionata.get() or "NA"
    ts = datetime.now().strftime("%d_%m_%Y_%H-%M")
    default = f"layout_matrice_{corsia}_{ts}.xlsx"
    path = filedialog.asksaveasfilename(
        title="Esporta matrice",
        defaultextension=".xlsx",
        initialfile=default,
        filetypes=[("Excel", "*.xlsx")]
    )
    if not path:
        return
    rows = len(self.matrix_state)
    cols = len(self.matrix_state[0]) if self.matrix_state else 0
    wb = Workbook()

    # Sheet 1: flat detail list, one line per cell.
    ws1 = wb.active
    ws1.title = f"Dettaglio {corsia}"
    ws1.append(["Corsia", "FilaIdx", "ColIdx", "Stato", "Descrizione", "FilaTxt", "ColTxt", "UDC1"])
    for r in range(rows):
        for c in range(cols):
            st = self.matrix_state[r][c]
            stato_lbl = "Vuota" if st == 0 else ("Piena" if st == 1 else "Doppia")
            ws1.append([corsia, r + 1, c + 1, stato_lbl,
                        self.desc[r][c], self.fila_txt[r][c], self.col_txt[r][c], self.udc1[r][c]])
    for cell in ws1[1]:
        cell.font = Font(bold=True)

    # Sheet 2: the grid itself, colored by occupancy state.
    ws2 = wb.create_sheet(f"Matrice {corsia}")
    fills = {
        0: PatternFill("solid", fgColor="B0B0B0"),
        1: PatternFill("solid", fgColor="FFA500"),
        2: PatternFill("solid", fgColor="D62728"),
    }
    center = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for r in range(rows):
        for c in range(cols):
            value = f"{corsia}.{self.col_txt[r][c]}.{self.fila_txt[r][c]}\n{self.udc1[r][c]}"
            # Flipped vertically: matrix row 0 goes to the last sheet row
            # (per the original note, so that 'a' ends up at the bottom).
            cell = ws2.cell(row=(rows - r), column=c + 1, value=value)
            # Unknown states fall back to the fill used for state 0.
            cell.fill = fills.get(self.matrix_state[r][c], fills[0])
            cell.alignment = center
    try:
        wb.save(path)
        self._toast(f"Esportato: {path}")
    except Exception as ex:
        messagebox.showerror("Export", f"Salvataggio fallito:\n{ex}")
# ---------------- STATS ----------------
def _refresh_stats(self):
    """Refresh the global and the current-aisle occupancy indicators.

    The global figures come from an asynchronous database query, issued
    without a busy overlay; a failure of that query is logged and otherwise
    ignored. The current-aisle figures are computed right away from
    ``self.matrix_state``, counting states 1 and 2 as occupied and state 2 as
    double.
    """
    # Global figures, from the database.
    sql_tot = """
WITH C AS (
SELECT ID
FROM dbo.Celle
WHERE ID <> 9999 AND (DelDataOra IS NULL)
AND LTRIM(RTRIM(Corsia)) <> '7G'
AND LTRIM(RTRIM(Fila)) IS NOT NULL
AND LTRIM(RTRIM(Colonna)) IS NOT NULL
),
S AS (
SELECT c.ID, COUNT(DISTINCT g.BarcodePallet) AS n
FROM C AS c LEFT JOIN dbo.XMag_GiacenzaPallet AS g ON g.IDCella = c.ID
GROUP BY c.ID
)
SELECT
CAST(SUM(CASE WHEN s.n>0 THEN 1 ELSE 0 END) AS float)/NULLIF(COUNT(*),0) AS PercPieno,
CAST(SUM(CASE WHEN s.n>1 THEN 1 ELSE 0 END) AS float)/NULLIF(COUNT(*),0) AS PercDoppie
FROM C LEFT JOIN S s ON s.ID = C.ID;
"""
    _log_sql("refresh_stats_global", sql_tot, {})

    def _ok(res):
        if not getattr(self, '_alive', True) or not self.winfo_exists():
            return
        rows = res.get("rows", []) if isinstance(res, dict) else []
        _log_dataset("refresh_stats_global", rows)
        # NULL ratios (no cells at all) are shown as 0.
        p_full = float(rows[0][0] or 0.0) if rows else 0.0
        p_dbl = float(rows[0][1] or 0.0) if rows else 0.0
        self._draw_bar(self.tot_canvas, p_full)
        self.tot_text.configure(text=pct_text(p_full, p_dbl))

    def _err(ex):
        # These figures are auxiliary: record the failure instead of dropping
        # it silently, but do not interrupt the user with a dialog.
        _MODULE_LOGGER.exception(f"Errore caricamento statistiche globali: {ex}")

    self._async.run(self.db.query_json(sql_tot, {}), _ok, _err, busy=None, message=None)

    # Current aisle, from the matrix held in memory.
    if self.matrix_state:
        tot = sum(len(r) for r in self.matrix_state)
        full = sum(1 for row in self.matrix_state for v in row if v in (1, 2))
        doubles = sum(1 for row in self.matrix_state for v in row if v == 2)
        p_full = (full / tot) if tot else 0.0
        p_dbl = (doubles / tot) if tot else 0.0
    else:
        p_full = p_dbl = 0.0
    self._draw_bar(self.sel_canvas, p_full)
    self.sel_text.configure(text=pct_text(p_full, p_dbl))
def _draw_bar(self, cv: tk.Canvas, p_full: float):
"""Draw a horizontal occupancy bar on the given canvas."""
cv.delete("all")
w = max(300, cv.winfo_width() or 600)
h = 18
fw = int(w * max(0.0, min(1.0, p_full)))
cv.create_rectangle(2, 2, 2 + fw, 2 + h, fill="#D62728", width=0)
cv.create_rectangle(2 + fw, 2, 2 + w, 2 + h, fill="#2CA02C", width=0)
cv.create_rectangle(2, 2, 2 + w, 2 + h, outline="#555", width=1)
# ---------------- UTIL ----------------
def _toast(self, msg, ms=1400):
"""Show a transient status message at the bottom of the window."""
if not hasattr(self, "_status"):
self._status = ctk.CTkLabel(self, anchor="w")
self._status.grid(row=3, column=0, sticky="ew")
self._status.configure(text=msg)
self.after(ms, lambda: self._status.configure(text=""))
def _copy(self, txt: str):
"""Copy a string to the clipboard and inform the user."""
self.clipboard_clear()
self.clipboard_append(txt)
self._toast(f"Copiato: {txt}")
@_log_call()
def destroy(self):
    """Flag the window as closed, then tear it down on a best-effort basis.

    Every cleanup step swallows its own errors, so a failure in one step does
    not prevent the following ones from running.
    """
    # Async callbacks check this flag and stop touching the UI once it is False.
    self._alive = False

    # Cancel the pending stats timer, if there is one.
    try:
        timer_id = self._stats_after_id
        if timer_id is not None:
            self.after_cancel(timer_id)
    except Exception:
        pass

    # Light UI cleanup: destroy the children of the host container.
    try:
        children = list(self.host.winfo_children())
        for child in children:
            child.destroy()
    except Exception:
        pass

    try:
        super().destroy()
    except Exception:
        pass
@_log_call()
def open_layout_window(parent, db_app, session: UserSession | None = None):
    """Return the layout window attached to ``parent``, creating it if needed.

    The window is cached on ``parent`` under a private attribute. When a
    cached window still exists it receives the new ``session``, is restored
    and brought to the front, and is returned. If it cannot be raised or
    focused, or when no live window is cached, a new ``LayoutWindow`` is
    created, cached and placed relative to ``parent``.
    """
    attr = "_gestione_layout_window_singleton"
    existing = getattr(parent, attr, None)
    if existing and existing.winfo_exists():
        existing.session = session
        try:
            existing.deiconify()
        except Exception:
            pass
        try:
            existing.lift()
            existing.focus_force()
        except Exception:
            # Could not bring the cached window forward: build a new one below.
            pass
        else:
            return existing
    window = LayoutWindow(parent, db_app, session=session)
    setattr(parent, attr, window)
    place_window_fullsize_below_parent_later(parent, window)
    return window