1113 lines
43 KiB
Python
1113 lines
43 KiB
Python
"""Exploration window for cells containing more than one pallet."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import sys
|
|
import tkinter as tk
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from tkinter import filedialog, messagebox, ttk
|
|
from typing import Any
|
|
|
|
import customtkinter as ctk
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Font
|
|
|
|
from busy_overlay import InlineBusyOverlay
|
|
from gestione_aree import AsyncRunner
|
|
from gestione_scarico import move_pallet_async
|
|
from locale_text import load_locale_catalog, text as loc_text
|
|
from tooltips import WidgetToolTip, load_tooltip_catalog, tooltip_text
|
|
from ui_theme import theme_color, theme_font, theme_section, theme_value
|
|
from window_placement import place_window_fullsize_below_parent_later
|
|
|
|
try:
|
|
from loguru import logger
|
|
except Exception: # pragma: no cover - fallback used only when Loguru is not available
|
|
class _FallbackLogger:
|
|
"""Minimal adapter used only when Loguru is not installed yet."""
|
|
|
|
def __init__(self):
|
|
self._logger = logging.getLogger(MODULE_LOG_NAME if "MODULE_LOG_NAME" in globals() else __name__)
|
|
self._logger.setLevel(logging.DEBUG)
|
|
self._logger.propagate = False
|
|
|
|
def bind(self, **_kwargs):
|
|
return self
|
|
|
|
def add(self, sink, level="INFO", format=None, encoding="utf-8", **_kwargs):
|
|
handler: logging.Handler
|
|
if hasattr(sink, "write"):
|
|
handler = logging.StreamHandler(sink)
|
|
else:
|
|
handler = logging.FileHandler(str(sink), encoding=encoding)
|
|
handler.setLevel(getattr(logging, str(level).upper(), logging.INFO))
|
|
handler.setFormatter(
|
|
logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
|
|
)
|
|
self._logger.addHandler(handler)
|
|
return 0
|
|
|
|
def log(self, level, message):
|
|
getattr(self._logger, str(level).lower(), self._logger.info)(message)
|
|
|
|
def debug(self, message):
|
|
self._logger.debug(message)
|
|
|
|
def info(self, message):
|
|
self._logger.info(message)
|
|
|
|
def exception(self, message):
|
|
self._logger.exception(message)
|
|
|
|
logger = _FallbackLogger()
|
|
|
|
|
|
MULTI_UDC_LOG_MODE = "INFO" # "OFF" | "INFO" | "DEBUG"
|
|
MODULE_LOG_NAME = Path(__file__).stem
|
|
MODULE_LOG_PATH = Path(__file__).with_suffix(".log")
|
|
_MODULE_LOG_ENABLED = MULTI_UDC_LOG_MODE.upper() != "OFF"
|
|
_MODULE_LOG_LEVEL = "DEBUG" if MULTI_UDC_LOG_MODE.upper() == "DEBUG" else "INFO"
|
|
_MODULE_LOGGER = logger.bind(warehouse_module=MODULE_LOG_NAME)
|
|
_MODULE_LOGGING_CONFIGURED = False
|
|
|
|
|
|
def _configure_module_logger():
|
|
"""Configure console and file logging for this module."""
|
|
global _MODULE_LOGGING_CONFIGURED
|
|
if _MODULE_LOGGING_CONFIGURED:
|
|
return
|
|
if not _MODULE_LOG_ENABLED:
|
|
_MODULE_LOGGING_CONFIGURED = True
|
|
return
|
|
|
|
record_filter = lambda record: record["extra"].get("warehouse_module") == MODULE_LOG_NAME
|
|
|
|
logger.add(
|
|
sys.stderr,
|
|
level=_MODULE_LOG_LEVEL,
|
|
colorize=True,
|
|
filter=record_filter,
|
|
format=(
|
|
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
|
|
"<level>{level: <8}</level> | "
|
|
"<cyan>" + MODULE_LOG_NAME + "</cyan> | "
|
|
"<level>{message}</level>"
|
|
),
|
|
)
|
|
logger.add(
|
|
MODULE_LOG_PATH,
|
|
level=_MODULE_LOG_LEVEL,
|
|
colorize=False,
|
|
encoding="utf-8",
|
|
filter=record_filter,
|
|
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | " + MODULE_LOG_NAME + " | {message}",
|
|
)
|
|
_MODULE_LOGGING_CONFIGURED = True
|
|
|
|
|
|
def _format_payload(payload: Any) -> str:
|
|
"""Serialize payloads for human-readable logging."""
|
|
try:
|
|
return json.dumps(payload, ensure_ascii=False, indent=2, default=str)
|
|
except Exception:
|
|
return repr(payload)
|
|
|
|
|
|
def _log_call(level: str | None = None):
|
|
"""Trace entry, exit and failure of selected high-level functions."""
|
|
|
|
def decorator(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
effective_level = level or _MODULE_LOG_LEVEL
|
|
_MODULE_LOGGER.log(
|
|
effective_level,
|
|
f"CALL {func.__qualname__} args={_format_payload(args[1:] if args else ())} kwargs={_format_payload(kwargs)}",
|
|
)
|
|
try:
|
|
result = func(*args, **kwargs)
|
|
except Exception:
|
|
_MODULE_LOGGER.exception(f"FAIL {func.__qualname__}")
|
|
raise
|
|
_MODULE_LOGGER.log(effective_level, f"RETURN {func.__qualname__}")
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
def _log_sql(query_name: str, sql: str, params: dict[str, Any] | None = None):
|
|
"""Log one SQL statement and its parameters."""
|
|
_MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} params={_format_payload(params or {})}")
|
|
_MODULE_LOGGER.debug(f"SQL {query_name} text:\n{sql.strip()}")
|
|
|
|
|
|
def _log_dataset(query_name: str, rows: list[Any]):
|
|
"""Log query results at summary or full-debug level depending on the flag."""
|
|
_MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} returned {len(rows)} rows")
|
|
if MULTI_UDC_LOG_MODE.upper() == "DEBUG":
|
|
_MODULE_LOGGER.debug(f"SQL {query_name} dataset:\n{_format_payload(rows)}")
|
|
|
|
|
|
_configure_module_logger()
|
|
if _MODULE_LOG_ENABLED:
|
|
_MODULE_LOGGER.info(
|
|
f"Logging inizializzato su {MODULE_LOG_PATH.name} livello={_MODULE_LOG_LEVEL} mode={MULTI_UDC_LOG_MODE.upper()}"
|
|
)
|
|
|
|
|
|
def _json_obj(res):
|
|
"""Normalize raw DB responses into a dictionary with a ``rows`` key."""
|
|
if isinstance(res, str):
|
|
try:
|
|
res = json.loads(res)
|
|
except Exception as ex:
|
|
raise RuntimeError(f"Risposta non JSON: {ex}\nRaw: {res!r}")
|
|
if isinstance(res, dict) and "error" in res:
|
|
err = res.get("error") or "Errore sconosciuto"
|
|
detail = res.get("sql") or ""
|
|
raise RuntimeError(f"{err}\n{detail}")
|
|
return res if isinstance(res, dict) else {"rows": res}
|
|
|
|
|
|
UBI_B = (
|
|
"UPPER("
|
|
" CONCAT("
|
|
" RTRIM(b.Corsia), '.', RTRIM(CAST(b.Colonna AS varchar(32))), '.', RTRIM(CAST(b.Fila AS varchar(32)))"
|
|
" )"
|
|
")"
|
|
)
|
|
|
|
BASE_CTE = """
|
|
WITH base AS (
|
|
SELECT
|
|
g.IDCella,
|
|
g.BarcodePallet,
|
|
RTRIM(c.Corsia) AS Corsia,
|
|
c.Colonna,
|
|
c.Fila
|
|
FROM dbo.XMag_GiacenzaPallet AS g
|
|
JOIN dbo.Celle AS c ON c.ID = g.IDCella
|
|
WHERE g.IDCella <> 9999 AND RTRIM(c.Corsia) <> '7G'
|
|
)
|
|
"""
|
|
|
|
SQL_CORSIE = BASE_CTE + """
|
|
, dup_celle AS (
|
|
SELECT IDCCella = b.IDCella
|
|
FROM base b
|
|
GROUP BY b.IDCella
|
|
HAVING COUNT(DISTINCT b.BarcodePallet) > 1
|
|
)
|
|
SELECT DISTINCT b.Corsia
|
|
FROM base b
|
|
WHERE EXISTS (SELECT 1 FROM dup_celle d WHERE d.IDCCella = b.IDCella)
|
|
ORDER BY b.Corsia;
|
|
"""
|
|
|
|
SQL_CELLE_DUP_PER_CORSIA = BASE_CTE + f"""
|
|
, dup_celle AS (
|
|
SELECT b.IDCella, COUNT(DISTINCT b.BarcodePallet) AS NumUDC
|
|
FROM base b
|
|
GROUP BY b.IDCella
|
|
HAVING COUNT(DISTINCT b.BarcodePallet) > 1
|
|
)
|
|
SELECT dc.IDCella,
|
|
{UBI_B} AS Ubicazione,
|
|
b.Colonna, b.Fila, b.Corsia,
|
|
dc.NumUDC
|
|
FROM dup_celle dc
|
|
JOIN base b ON b.IDCella = dc.IDCella
|
|
WHERE b.Corsia = RTRIM(:corsia)
|
|
GROUP BY dc.IDCella, {UBI_B}, b.Colonna, b.Fila, b.Corsia, dc.NumUDC
|
|
ORDER BY b.Colonna, b.Fila;
|
|
"""
|
|
|
|
SQL_PALLET_IN_CELLA = BASE_CTE + """
|
|
, cell_pallets AS (
|
|
SELECT DISTINCT b.BarcodePallet
|
|
FROM base b
|
|
WHERE b.IDCella = :idcella
|
|
),
|
|
latest_any AS (
|
|
SELECT
|
|
ranked.BarcodePallet,
|
|
ranked.IDCella
|
|
FROM (
|
|
SELECT
|
|
mp.Attributo AS BarcodePallet,
|
|
mp.IDCella,
|
|
ROW_NUMBER() OVER (
|
|
PARTITION BY mp.Attributo
|
|
ORDER BY mp.ID DESC
|
|
) AS rn
|
|
FROM dbo.MagazziniPallet mp
|
|
JOIN cell_pallets cp
|
|
ON cp.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
mp.Attributo COLLATE Latin1_General_CI_AS
|
|
WHERE mp.Tipo = 'V'
|
|
AND mp.PesoUnitario > 0
|
|
) ranked
|
|
WHERE ranked.rn = 1
|
|
),
|
|
shipped AS (
|
|
SELECT DISTINCT shipped.BarcodePallet
|
|
FROM dbo.XMag_GiacenzaPalletPlistChiuse shipped
|
|
JOIN cell_pallets cp
|
|
ON cp.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
shipped.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
)
|
|
SELECT
|
|
b.BarcodePallet AS Pallet,
|
|
ta.Descrizione,
|
|
ta.Lotto,
|
|
CASE
|
|
WHEN shipped.BarcodePallet IS NOT NULL THEN CAST(1 AS int)
|
|
ELSE CAST(0 AS int)
|
|
END AS IsShippedGhost,
|
|
CASE
|
|
WHEN la.IDCella IS NOT NULL
|
|
AND la.IDCella <> :idcella
|
|
THEN CAST(1 AS int)
|
|
ELSE CAST(0 AS int)
|
|
END AS IsMovedGhost
|
|
FROM base b
|
|
OUTER APPLY (
|
|
SELECT TOP (1) t.Descrizione, t.Lotto
|
|
FROM dbo.vXTracciaProdotti AS t
|
|
WHERE t.Pallet = b.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
ORDER BY t.Lotto
|
|
) AS ta
|
|
LEFT JOIN latest_any la
|
|
ON la.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
b.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
LEFT JOIN shipped
|
|
ON shipped.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
b.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
WHERE b.IDCella = :idcella
|
|
GROUP BY b.BarcodePallet, ta.Descrizione, ta.Lotto, shipped.BarcodePallet, la.IDCella
|
|
ORDER BY b.BarcodePallet;
|
|
"""
|
|
|
|
SQL_PALLET_IN_CORSIA = BASE_CTE + f"""
|
|
, dup_celle AS (
|
|
SELECT b.IDCella
|
|
FROM base b
|
|
WHERE b.Corsia = RTRIM(:corsia)
|
|
GROUP BY b.IDCella
|
|
HAVING COUNT(DISTINCT b.BarcodePallet) > 1
|
|
),
|
|
corsia_pallets AS (
|
|
SELECT DISTINCT b.IDCella, b.BarcodePallet
|
|
FROM base b
|
|
JOIN dup_celle dc ON dc.IDCella = b.IDCella
|
|
),
|
|
latest_any AS (
|
|
SELECT
|
|
ranked.BarcodePallet,
|
|
ranked.IDCella
|
|
FROM (
|
|
SELECT
|
|
mp.Attributo AS BarcodePallet,
|
|
mp.IDCella,
|
|
ROW_NUMBER() OVER (
|
|
PARTITION BY mp.Attributo
|
|
ORDER BY mp.ID DESC
|
|
) AS rn
|
|
FROM dbo.MagazziniPallet mp
|
|
JOIN (SELECT DISTINCT BarcodePallet FROM corsia_pallets) cp
|
|
ON cp.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
mp.Attributo COLLATE Latin1_General_CI_AS
|
|
WHERE mp.Tipo = 'V'
|
|
AND mp.PesoUnitario > 0
|
|
) ranked
|
|
WHERE ranked.rn = 1
|
|
),
|
|
shipped AS (
|
|
SELECT DISTINCT shipped.BarcodePallet
|
|
FROM dbo.XMag_GiacenzaPalletPlistChiuse shipped
|
|
JOIN (SELECT DISTINCT BarcodePallet FROM corsia_pallets) cp
|
|
ON cp.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
shipped.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
)
|
|
SELECT
|
|
cp.IDCella,
|
|
{UBI_B} AS Ubicazione,
|
|
cp.BarcodePallet AS Pallet,
|
|
ta.Descrizione,
|
|
ta.Lotto,
|
|
CASE
|
|
WHEN shipped.BarcodePallet IS NOT NULL THEN CAST(1 AS int)
|
|
ELSE CAST(0 AS int)
|
|
END AS IsShippedGhost,
|
|
CASE
|
|
WHEN la.IDCella IS NOT NULL
|
|
AND la.IDCella <> cp.IDCella
|
|
THEN CAST(1 AS int)
|
|
ELSE CAST(0 AS int)
|
|
END AS IsMovedGhost
|
|
FROM corsia_pallets cp
|
|
JOIN base b
|
|
ON b.IDCella = cp.IDCella
|
|
AND b.BarcodePallet = cp.BarcodePallet
|
|
OUTER APPLY (
|
|
SELECT TOP (1) t.Descrizione, t.Lotto
|
|
FROM dbo.vXTracciaProdotti AS t
|
|
WHERE t.Pallet = cp.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
ORDER BY t.Lotto
|
|
) AS ta
|
|
LEFT JOIN latest_any la
|
|
ON la.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
cp.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
LEFT JOIN shipped
|
|
ON shipped.BarcodePallet COLLATE Latin1_General_CI_AS =
|
|
cp.BarcodePallet COLLATE Latin1_General_CI_AS
|
|
GROUP BY
|
|
cp.IDCella,
|
|
{UBI_B},
|
|
cp.BarcodePallet,
|
|
ta.Descrizione,
|
|
ta.Lotto,
|
|
shipped.BarcodePallet,
|
|
la.IDCella
|
|
ORDER BY cp.IDCella, cp.BarcodePallet;
|
|
"""
|
|
|
|
|
|
def _build_diagnostic_note(is_shipped: int | bool, is_moved: int | bool) -> str:
|
|
"""Translate anomaly flags into the operator-facing ghost cause."""
|
|
|
|
notes: list[str] = []
|
|
if bool(is_shipped):
|
|
notes.append("Mancato scarico: spedita")
|
|
if bool(is_moved):
|
|
notes.append("Mancato scarico: spostata")
|
|
return " | ".join(notes)
|
|
|
|
SQL_RIEPILOGO_PERCENTUALI = BASE_CTE + """
|
|
, tot AS (
|
|
SELECT b.Corsia, COUNT(DISTINCT b.IDCella) AS TotCelle
|
|
FROM base b GROUP BY b.Corsia
|
|
),
|
|
dup_celle AS (
|
|
SELECT b.Corsia, b.IDCella
|
|
FROM base b
|
|
GROUP BY b.Corsia, b.IDCella
|
|
HAVING COUNT(DISTINCT b.BarcodePallet) > 1
|
|
),
|
|
per_corsia AS (
|
|
SELECT t.Corsia, t.TotCelle, COALESCE(d.CelleMultiple, 0) AS CelleMultiple
|
|
FROM tot t
|
|
LEFT JOIN (
|
|
SELECT Corsia, COUNT(IDCella) AS CelleMultiple
|
|
FROM dup_celle GROUP BY Corsia
|
|
) d ON d.Corsia = t.Corsia
|
|
),
|
|
unione AS (
|
|
SELECT Corsia, TotCelle, CelleMultiple,
|
|
CAST(100.0 * CelleMultiple / NULLIF(TotCelle, 0) AS decimal(5,2)) AS Percentuale,
|
|
CAST(0 AS int) AS Ord
|
|
FROM per_corsia
|
|
UNION ALL
|
|
SELECT 'TOTALE' AS Corsia,
|
|
SUM(TotCelle), SUM(CelleMultiple),
|
|
CAST(100.0 * SUM(CelleMultiple) / NULLIF(SUM(TotCelle), 0) AS decimal(5,2)),
|
|
CAST(1 AS int) AS Ord
|
|
FROM per_corsia
|
|
)
|
|
SELECT Corsia, TotCelle, CelleMultiple, Percentuale
|
|
FROM unione
|
|
ORDER BY Ord, Corsia;
|
|
"""
|
|
|
|
|
|
class CelleMultipleWindow(ctk.CTkToplevel):
|
|
"""Tree-based explorer for duplicated pallet allocations."""
|
|
|
|
@_log_call()
|
|
def __init__(self, root, db_client, runner: AsyncRunner | None = None, session=None):
|
|
"""Bind the shared DB client and immediately load the tree summary."""
|
|
super().__init__(root)
|
|
self._theme = theme_section("multi_udc", {})
|
|
self._locale_catalog = load_locale_catalog()
|
|
self._tooltip_catalog = load_tooltip_catalog()
|
|
self.title(loc_text("multi.title", catalog=self._locale_catalog, default="Celle con piu' pallet"))
|
|
self.session = session
|
|
self.geometry(str(theme_value(self._theme, "window_geometry", "1100x700")))
|
|
minsize = theme_value(self._theme, "window_minsize", [900, 550])
|
|
self.minsize(int(minsize[0]), int(minsize[1]))
|
|
self.resizable(True, True)
|
|
try:
|
|
self.configure(fg_color=theme_color(self._theme, "window_fg_color", ("#efefef", "#2f2f2f")))
|
|
except Exception:
|
|
pass
|
|
|
|
self.db = db_client
|
|
self.runner = runner or AsyncRunner(self)
|
|
self._busy = InlineBusyOverlay(self, self._theme)
|
|
self.selected_corsia_id: str | None = None
|
|
self.selected_udc_keys: set[str] = set()
|
|
self.udc_meta_by_key: dict[str, dict[str, Any]] = {}
|
|
|
|
self._build_layout()
|
|
self._bind_events()
|
|
self.refresh_all()
|
|
|
|
def _build_layout(self):
|
|
"""Create the toolbar, lazy-loaded tree and percentage summary table."""
|
|
self.grid_rowconfigure(0, weight=5)
|
|
self.grid_rowconfigure(1, weight=70)
|
|
self.grid_rowconfigure(2, weight=25, minsize=160)
|
|
self.grid_columnconfigure(0, weight=1)
|
|
|
|
toolbar = ctk.CTkFrame(self)
|
|
toolbar.grid(row=0, column=0, sticky="nsew")
|
|
try:
|
|
toolbar.configure(fg_color=theme_color(self._theme, "toolbar_frame_fg_color", ("#d7d7d7", "#3b3b3b")))
|
|
except Exception:
|
|
pass
|
|
btn_refresh = ctk.CTkButton(toolbar, text=loc_text("multi.button.refresh", catalog=self._locale_catalog, default="Aggiorna"), command=self.refresh_all, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_refresh.pack(side="left", padx=6, pady=4)
|
|
btn_expand = ctk.CTkButton(toolbar, text=loc_text("multi.button.expand", catalog=self._locale_catalog, default="Espandi tutto"), command=self.expand_all, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_expand.pack(side="left", padx=6, pady=4)
|
|
btn_collapse = ctk.CTkButton(toolbar, text=loc_text("multi.button.collapse", catalog=self._locale_catalog, default="Comprimi tutto"), command=self.collapse_all, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_collapse.pack(side="left", padx=6, pady=4)
|
|
btn_preselect = ctk.CTkButton(toolbar, text=loc_text("multi.button.preselect", catalog=self._locale_catalog, default="Preselezione fantasmi corsia"), command=self._preselect_selected_corsia, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_preselect.pack(side="left", padx=6, pady=4)
|
|
btn_remove = ctk.CTkButton(toolbar, text=loc_text("multi.button.remove", catalog=self._locale_catalog, default="Rimuovi fantasmi corsia"), command=self._remove_selected_ghosts_for_corsia, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_remove.pack(side="left", padx=6, pady=4)
|
|
btn_export = ctk.CTkButton(toolbar, text=loc_text("multi.button.export", catalog=self._locale_catalog, default="Esporta in XLSX"), command=self.export_to_xlsx, font=theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold")))
|
|
btn_export.pack(side="left", padx=6, pady=4)
|
|
WidgetToolTip(btn_refresh, tooltip_text("multi_udc.refresh", catalog=self._tooltip_catalog))
|
|
WidgetToolTip(btn_expand, tooltip_text("multi_udc.expand_all", catalog=self._tooltip_catalog))
|
|
WidgetToolTip(btn_collapse, tooltip_text("multi_udc.collapse_all", catalog=self._tooltip_catalog))
|
|
WidgetToolTip(btn_preselect, tooltip_text("multi_udc.preselect", catalog=self._tooltip_catalog))
|
|
WidgetToolTip(btn_remove, tooltip_text("multi_udc.remove_ghosts", catalog=self._tooltip_catalog))
|
|
WidgetToolTip(btn_export, tooltip_text("multi_udc.export_xlsx", catalog=self._tooltip_catalog))
|
|
|
|
frame = ctk.CTkFrame(self)
|
|
frame.grid(row=1, column=0, sticky="nsew", padx=6, pady=(0, 6))
|
|
try:
|
|
frame.configure(fg_color=theme_color(self._theme, "content_frame_fg_color", ("#e5e5e5", "#383838")))
|
|
except Exception:
|
|
pass
|
|
frame.grid_rowconfigure(0, weight=1)
|
|
frame.grid_columnconfigure(0, weight=1)
|
|
self.tree = ttk.Treeview(frame, columns=("col2", "col3", "col4"), show="tree headings", selectmode="browse")
|
|
self.tree.heading("#0", text="Nodo")
|
|
self.tree.heading("col2", text="Descrizione")
|
|
self.tree.heading("col3", text="Lotto")
|
|
self.tree.heading("col4", text="Causale")
|
|
self.tree.column("#0", width=220, anchor="w")
|
|
self.tree.column("col2", width=250, anchor="w")
|
|
self.tree.column("col3", width=120, anchor="w")
|
|
self.tree.column("col4", width=260, anchor="w")
|
|
y = ttk.Scrollbar(frame, orient="vertical", command=self.tree.yview)
|
|
x = ttk.Scrollbar(frame, orient="horizontal", command=self.tree.xview)
|
|
self.tree.configure(yscrollcommand=y.set, xscrollcommand=x.set)
|
|
self.tree.grid(row=0, column=0, sticky="nsew")
|
|
y.grid(row=0, column=1, sticky="ns")
|
|
x.grid(row=1, column=0, sticky="ew")
|
|
|
|
sumf = ctk.CTkFrame(self)
|
|
sumf.grid(row=2, column=0, sticky="nsew", padx=6, pady=(0, 6))
|
|
try:
|
|
sumf.configure(fg_color=theme_color(self._theme, "summary_frame_fg_color", ("#dcdcdc", "#363636")))
|
|
except Exception:
|
|
pass
|
|
ctk.CTkLabel(
|
|
sumf,
|
|
text=loc_text("multi.summary", catalog=self._locale_catalog, default="Riepilogo % celle multiple per corsia"),
|
|
font=theme_font(self._theme, "summary_title_font", ("Segoe UI", 12, "bold")),
|
|
).pack(anchor="w", padx=8, pady=(8, 0))
|
|
inner = ctk.CTkFrame(sumf)
|
|
inner.pack(fill="both", expand=True, padx=6, pady=6)
|
|
try:
|
|
inner.configure(fg_color=theme_color(self._theme, "inner_summary_frame_fg_color", ("#d4d4d4", "#404040")))
|
|
except Exception:
|
|
pass
|
|
inner.grid_rowconfigure(0, weight=1)
|
|
inner.grid_columnconfigure(0, weight=1)
|
|
self.sum_tbl = ttk.Treeview(inner, columns=("Corsia", "TotCelle", "CelleMultiple", "Percentuale"), show="headings")
|
|
for key, title, width, anchor in (
|
|
("Corsia", "Corsia", 100, "center"),
|
|
("TotCelle", "Totale celle", 120, "e"),
|
|
("CelleMultiple", ">1 UDC", 120, "e"),
|
|
("Percentuale", "%", 80, "e"),
|
|
):
|
|
self.sum_tbl.heading(key, text=title)
|
|
self.sum_tbl.column(key, width=width, anchor=anchor)
|
|
y2 = ttk.Scrollbar(inner, orient="vertical", command=self.sum_tbl.yview)
|
|
x2 = ttk.Scrollbar(inner, orient="horizontal", command=self.sum_tbl.xview)
|
|
self.sum_tbl.configure(yscrollcommand=y2.set, xscrollcommand=x2.set)
|
|
self.sum_tbl.grid(row=0, column=0, sticky="nsew")
|
|
y2.grid(row=0, column=1, sticky="ns")
|
|
x2.grid(row=1, column=0, sticky="ew")
|
|
|
|
def _bind_events(self):
|
|
"""Attach lazy-load behavior when nodes are expanded."""
|
|
self.tree.bind("<<TreeviewOpen>>", self._on_open_node)
|
|
self.tree.bind("<Button-1>", self._on_tree_click, add="+")
|
|
|
|
def _format_corsia_text(self, corsia: str) -> str:
|
|
"""Render one aisle root with its exclusive selection marker."""
|
|
|
|
return f"[{'x' if self.selected_corsia_id == f'corsia:{corsia}' else ' '}] Corsia {corsia}"
|
|
|
|
def _format_pallet_text(self, pallet: str, selected: bool) -> str:
|
|
"""Render one pallet leaf with its selection marker."""
|
|
|
|
return f"[{'x' if selected else ' '}] {pallet}"
|
|
|
|
def _selected_corsia_value(self) -> str | None:
|
|
"""Return the code of the currently active aisle."""
|
|
|
|
if self.selected_corsia_id and self.selected_corsia_id.startswith("corsia:"):
|
|
return self.selected_corsia_id.split(":", 1)[1]
|
|
return None
|
|
|
|
def _set_selected_corsia(self, node_id: str | None):
|
|
"""Keep exactly one selected aisle and refresh root labels."""
|
|
|
|
previous = self.selected_corsia_id
|
|
self.selected_corsia_id = node_id
|
|
for iid in self.tree.get_children(""):
|
|
if not iid.startswith("corsia:"):
|
|
continue
|
|
self.tree.item(iid, text=self._format_corsia_text(iid.split(":", 1)[1]))
|
|
if previous and previous != node_id and previous.startswith("corsia:"):
|
|
self._clear_leaf_selection_for_corsia(previous.split(":", 1)[1])
|
|
|
|
def _set_leaf_selected(self, leaf_id: str, selected: bool):
|
|
"""Toggle one selected pallet leaf and refresh its label if visible."""
|
|
|
|
meta = self.udc_meta_by_key.get(leaf_id)
|
|
if selected:
|
|
self.selected_udc_keys.add(leaf_id)
|
|
else:
|
|
self.selected_udc_keys.discard(leaf_id)
|
|
if meta and self.tree.exists(leaf_id):
|
|
self.tree.item(leaf_id, text=self._format_pallet_text(str(meta.get("pallet", "")), selected))
|
|
|
|
def _clear_leaf_selection_for_corsia(self, corsia: str):
|
|
"""Clear all selected pallet leaves for one aisle."""
|
|
|
|
for leaf_id, meta in list(self.udc_meta_by_key.items()):
|
|
if meta.get("corsia") == corsia:
|
|
self._set_leaf_selected(leaf_id, False)
|
|
|
|
def _on_tree_click(self, event):
|
|
"""Handle custom selection on aisle roots and pallet leaves."""
|
|
|
|
try:
|
|
element = self.tree.identify_element(event.x, event.y)
|
|
if "indicator" in str(element).lower():
|
|
return
|
|
item_id = self.tree.identify_row(event.y)
|
|
except Exception:
|
|
return
|
|
if not item_id:
|
|
return
|
|
tags = self.tree.item(item_id, "tags") or ()
|
|
if "corsia" in tags:
|
|
self._set_selected_corsia(item_id)
|
|
self.tree.selection_set(item_id)
|
|
return "break"
|
|
if "pallet" in tags:
|
|
corsia = next((tag.split(":", 1)[1] for tag in tags if tag.startswith("corsia:")), "")
|
|
self._set_selected_corsia(f"corsia:{corsia}")
|
|
self._set_leaf_selected(item_id, item_id not in self.selected_udc_keys)
|
|
self.tree.selection_set(item_id)
|
|
return "break"
|
|
|
|
@_log_call()
|
|
def refresh_all(self):
|
|
"""Reload both the duplication tree and the summary percentage table."""
|
|
self.selected_corsia_id = None
|
|
self.selected_udc_keys.clear()
|
|
self.udc_meta_by_key.clear()
|
|
self._load_corsie()
|
|
self._load_riepilogo()
|
|
|
|
@_log_call()
|
|
def _load_corsie(self):
|
|
"""Load root nodes representing aisles with duplicated cells."""
|
|
self.tree.delete(*self.tree.get_children())
|
|
_log_sql("multi_udc_corsie", SQL_CORSIE, {})
|
|
|
|
async def _q(db):
|
|
return await db.query_json(SQL_CORSIE, as_dict_rows=True)
|
|
|
|
def _err(ex):
|
|
_MODULE_LOGGER.exception(f"Errore caricamento corsie UDC fantasma: {ex}")
|
|
messagebox.showerror("Errore", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
self._fill_corsie,
|
|
_err,
|
|
busy=self._busy,
|
|
message="Carico corsie UDC fantasma...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _fill_corsie(self, res):
|
|
"""Populate root tree nodes after the aisle query completes."""
|
|
rows = _json_obj(res).get("rows", [])
|
|
_log_dataset("multi_udc_corsie", rows)
|
|
for row in rows:
|
|
corsia = row.get("Corsia")
|
|
if not corsia:
|
|
continue
|
|
node_id = f"corsia:{corsia}"
|
|
self.tree.insert(
|
|
"",
|
|
"end",
|
|
iid=node_id,
|
|
text=self._format_corsia_text(corsia),
|
|
values=("", "", ""),
|
|
open=False,
|
|
tags=("corsia",),
|
|
)
|
|
self.tree.insert(node_id, "end", iid=f"{node_id}::lazy", text="...", values=("", "", ""))
|
|
|
|
def _on_open_node(self, _evt):
|
|
"""Lazy-load children when a tree node is expanded."""
|
|
sel = self.tree.focus()
|
|
if not sel:
|
|
return
|
|
if sel.startswith("corsia:"):
|
|
lazy_id = f"{sel}::lazy"
|
|
if lazy_id in self.tree.get_children(sel):
|
|
self.tree.delete(lazy_id)
|
|
corsia = sel.split(":", 1)[1]
|
|
self._load_celle_for_corsia(sel, corsia)
|
|
elif sel.startswith("cella:"):
|
|
lazy_id = f"{sel}::lazy"
|
|
if lazy_id in self.tree.get_children(sel):
|
|
self.tree.delete(lazy_id)
|
|
idcella = int(sel.split(":", 1)[1])
|
|
for child in self.tree.get_children(sel):
|
|
self.tree.delete(child)
|
|
self._load_pallet_for_cella(sel, idcella)
|
|
|
|
@_log_call()
|
|
def _load_celle_for_corsia(self, parent_iid, corsia):
|
|
"""Query duplicated cells for the selected aisle."""
|
|
_log_sql("multi_udc_celle_per_corsia", SQL_CELLE_DUP_PER_CORSIA, {"corsia": corsia})
|
|
|
|
async def _q(db):
|
|
return await db.query_json(SQL_CELLE_DUP_PER_CORSIA, params={"corsia": corsia}, as_dict_rows=True)
|
|
|
|
def _err(ex):
|
|
_MODULE_LOGGER.exception(f"Errore caricamento celle duplicate corsia={corsia}: {ex}")
|
|
messagebox.showerror("Errore", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
lambda res: self._fill_celle(parent_iid, res),
|
|
_err,
|
|
busy=self._busy,
|
|
message=f"Carico celle duplicate corsia {corsia}...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _fill_celle(self, parent_iid, res):
|
|
"""Populate duplicated-cell nodes under an aisle node."""
|
|
rows = _json_obj(res).get("rows", [])
|
|
_log_dataset("multi_udc_celle_per_corsia", rows)
|
|
if not rows:
|
|
self.tree.insert(parent_iid, "end", text="(nessuna cella con >1 UDC)", values=("", "", ""))
|
|
return
|
|
for row in rows:
|
|
idc = row["IDCella"]
|
|
ubi = row["Ubicazione"]
|
|
corsia = row.get("Corsia")
|
|
num = row.get("NumUDC", 0)
|
|
node_id = f"cella:{idc}"
|
|
label = f"{ubi} [x{num}]"
|
|
if self.tree.exists(node_id):
|
|
self.tree.item(node_id, text=label, values=(f"IDCella {idc}", "", ""))
|
|
else:
|
|
self.tree.insert(
|
|
parent_iid,
|
|
"end",
|
|
iid=node_id,
|
|
text=label,
|
|
values=(f"IDCella {idc}", "", ""),
|
|
open=False,
|
|
tags=("cella", f"corsia:{corsia}"),
|
|
)
|
|
if not any(child.endswith("::lazy") for child in self.tree.get_children(node_id)):
|
|
self.tree.insert(node_id, "end", iid=f"{node_id}::lazy", text="...", values=("", "", ""))
|
|
|
|
@_log_call()
|
|
def _load_pallet_for_cella(self, parent_iid, idcella: int):
|
|
"""Query pallet details for a duplicated cell."""
|
|
_log_sql("multi_udc_pallet_in_cella", SQL_PALLET_IN_CELLA, {"idcella": idcella})
|
|
|
|
async def _q(db):
|
|
return await db.query_json(SQL_PALLET_IN_CELLA, params={"idcella": idcella}, as_dict_rows=True)
|
|
|
|
def _err(ex):
|
|
_MODULE_LOGGER.exception(f"Errore caricamento pallet cella idcella={idcella}: {ex}")
|
|
messagebox.showerror("Errore", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
lambda res: self._fill_pallet(parent_iid, res),
|
|
_err,
|
|
busy=self._busy,
|
|
message=f"Carico pallet della cella {idcella}...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _fill_pallet(self, parent_iid, res):
|
|
"""Add pallet leaves under the selected cell node."""
|
|
rows = _json_obj(res).get("rows", [])
|
|
_log_dataset("multi_udc_pallet_in_cella", rows)
|
|
if not rows:
|
|
self.tree.insert(parent_iid, "end", text="(nessun pallet)", values=("", "", ""))
|
|
return
|
|
parent_tags = self.tree.item(parent_iid, "tags") or ()
|
|
corsia_tag = next((tag for tag in parent_tags if tag.startswith("corsia:")), None)
|
|
corsia_val = corsia_tag.split(":", 1)[1] if corsia_tag else ""
|
|
cella_ubi = self.tree.item(parent_iid, "text")
|
|
idcella_txt = self.tree.item(parent_iid, "values")[0]
|
|
idcella_num = int(idcella_txt.split()[-1]) if idcella_txt else None
|
|
|
|
for row in rows:
|
|
pallet = row.get("Pallet", "")
|
|
desc = row.get("Descrizione", "")
|
|
lotto = row.get("Lotto", "")
|
|
is_shipped = int(row.get("IsShippedGhost", 0) or 0)
|
|
is_moved = int(row.get("IsMovedGhost", 0) or 0)
|
|
causale = _build_diagnostic_note(is_shipped, is_moved)
|
|
leaf_id = f"pallet:{idcella_num}:{pallet}"
|
|
self.udc_meta_by_key[leaf_id] = {
|
|
"corsia": corsia_val,
|
|
"ubicazione": cella_ubi,
|
|
"idcella": idcella_num,
|
|
"pallet": str(pallet),
|
|
"descrizione": desc,
|
|
"lotto": lotto,
|
|
"causale": causale,
|
|
"is_shipped": is_shipped,
|
|
"is_moved": is_moved,
|
|
}
|
|
if self.tree.exists(leaf_id):
|
|
self.tree.item(
|
|
leaf_id,
|
|
text=self._format_pallet_text(str(pallet), leaf_id in self.selected_udc_keys),
|
|
values=(desc, lotto, causale),
|
|
)
|
|
continue
|
|
self.tree.insert(
|
|
parent_iid,
|
|
"end",
|
|
iid=leaf_id,
|
|
text=self._format_pallet_text(str(pallet), leaf_id in self.selected_udc_keys),
|
|
values=(desc, lotto, causale),
|
|
tags=("pallet", f"corsia:{corsia_val}", f"ubicazione:{cella_ubi}", f"idcella:{idcella_num}"),
|
|
)
|
|
|
|
@_log_call()
|
|
def _preselect_selected_corsia(self):
|
|
"""Expand one aisle and preselect only shipped/moved ghost pallets."""
|
|
|
|
corsia = self._selected_corsia_value()
|
|
if not corsia:
|
|
messagebox.showinfo("Preselezione", "Seleziona prima una corsia.", parent=self)
|
|
return
|
|
|
|
corsia_node = f"corsia:{corsia}"
|
|
self.tree.item(corsia_node, open=True)
|
|
self._clear_leaf_selection_for_corsia(corsia)
|
|
self._busy.show(f"Preselezione fantasmi corsia {corsia.strip()}...")
|
|
|
|
_log_sql("multi_udc_celle_per_corsia.preselect", SQL_CELLE_DUP_PER_CORSIA, {"corsia": corsia})
|
|
_log_sql("multi_udc_pallet_in_corsia.preselect", SQL_PALLET_IN_CORSIA, {"corsia": corsia})
|
|
|
|
async def _q(db):
|
|
celle_res = _json_obj(await db.query_json(SQL_CELLE_DUP_PER_CORSIA, params={"corsia": corsia}, as_dict_rows=True))
|
|
celle_rows = celle_res.get("rows", [])
|
|
pallet_res = _json_obj(await db.query_json(SQL_PALLET_IN_CORSIA, params={"corsia": corsia}, as_dict_rows=True))
|
|
return {"cells": celle_rows, "pallets": pallet_res.get("rows", [])}
|
|
|
|
def _ok(res):
|
|
self._busy.hide()
|
|
payload = _json_obj(res)
|
|
cell_rows = payload.get("cells", [])
|
|
pallet_rows = payload.get("pallets", [])
|
|
selected_count = 0
|
|
grouped_pallets: dict[int, list[dict[str, Any]]] = {}
|
|
for pallet_row in pallet_rows:
|
|
grouped_pallets.setdefault(int(pallet_row["IDCella"]), []).append(pallet_row)
|
|
|
|
if self.tree.exists(f"{corsia_node}::lazy"):
|
|
self.tree.delete(f"{corsia_node}::lazy")
|
|
|
|
for cell_row in cell_rows:
|
|
idcella = int(cell_row["IDCella"])
|
|
cell_node = f"cella:{idcella}"
|
|
if not self.tree.exists(cell_node):
|
|
self._fill_celle(corsia_node, {"rows": [cell_row]})
|
|
self.tree.item(cell_node, open=True)
|
|
for child in list(self.tree.get_children(cell_node)):
|
|
self.tree.delete(child)
|
|
self._fill_pallet(cell_node, {"rows": grouped_pallets.get(idcella, [])})
|
|
for leaf_id, meta in list(self.udc_meta_by_key.items()):
|
|
if meta.get("corsia") != corsia or meta.get("idcella") != idcella:
|
|
continue
|
|
if meta.get("is_shipped") or meta.get("is_moved"):
|
|
self._set_leaf_selected(leaf_id, True)
|
|
selected_count += 1
|
|
|
|
messagebox.showinfo(
|
|
"Preselezione completata",
|
|
f"Corsia {corsia}\nUDC candidate selezionate automaticamente: {selected_count}",
|
|
parent=self,
|
|
)
|
|
|
|
def _err(ex):
|
|
self._busy.hide()
|
|
_MODULE_LOGGER.exception(f"Errore preselezione fantasmi corsia={corsia}: {ex}")
|
|
messagebox.showerror("Errore", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
_ok,
|
|
_err,
|
|
busy=self._busy,
|
|
message=f"Preselezione fantasmi corsia {corsia.strip()}...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _remove_selected_ghosts_for_corsia(self):
|
|
"""Remove the selected ghost pallets only for the active aisle."""
|
|
|
|
corsia = self._selected_corsia_value()
|
|
if not corsia:
|
|
messagebox.showinfo("Bonifica fantasmi", "Seleziona prima una corsia.", parent=self)
|
|
return
|
|
|
|
selected_meta = [
|
|
meta
|
|
for leaf_id, meta in self.udc_meta_by_key.items()
|
|
if leaf_id in self.selected_udc_keys and meta.get("corsia") == corsia
|
|
]
|
|
if not selected_meta:
|
|
messagebox.showinfo("Bonifica fantasmi", "Nessuna UDC selezionata nella corsia attiva.", parent=self)
|
|
return
|
|
|
|
shipped_count = sum(1 for meta in selected_meta if meta.get("is_shipped"))
|
|
moved_count = sum(1 for meta in selected_meta if meta.get("is_moved"))
|
|
cell_count = len({meta.get("idcella") for meta in selected_meta})
|
|
if not messagebox.askyesno(
|
|
"Conferma bonifica corsia",
|
|
(
|
|
f"Corsia {corsia}\n"
|
|
f"UDC selezionate: {len(selected_meta)}\n"
|
|
f"Celle coinvolte: {cell_count}\n"
|
|
f"Spedite: {shipped_count}\n"
|
|
f"Spostate: {moved_count}\n\n"
|
|
"Procedere con la rimozione delle sole UDC selezionate?"
|
|
),
|
|
parent=self,
|
|
):
|
|
return
|
|
self._busy.show(f"Rimozione fantasmi corsia {corsia.strip()}...")
|
|
|
|
async def _q(_db):
|
|
results: list[dict[str, Any]] = []
|
|
for meta in selected_meta:
|
|
result = await move_pallet_async(
|
|
self.db,
|
|
barcode_pallet=str(meta.get("pallet", "")).strip(),
|
|
target_idcella=9999,
|
|
target_barcode_cella="9000000",
|
|
utente=str(getattr(self.session, "login", "") or "warehouse_ui").strip(),
|
|
)
|
|
results.append({"meta": meta, "result": result})
|
|
return {"rows": results}
|
|
|
|
def _ok(res):
|
|
self._busy.hide()
|
|
rows = _json_obj(res).get("rows", [])
|
|
removed = sum(1 for row in rows if row.get("result"))
|
|
self.refresh_all()
|
|
messagebox.showinfo(
|
|
"Bonifica completata",
|
|
f"Corsia {corsia}\nUDC rimosse: {removed}",
|
|
parent=self,
|
|
)
|
|
|
|
def _err(ex):
|
|
self._busy.hide()
|
|
_MODULE_LOGGER.exception(f"Errore bonifica fantasmi corsia={corsia}: {ex}")
|
|
messagebox.showerror("Errore bonifica", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
_ok,
|
|
_err,
|
|
busy=self._busy,
|
|
message=f"Rimozione fantasmi corsia {corsia.strip()}...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _load_riepilogo(self):
|
|
"""Load the percentage summary by aisle."""
|
|
_log_sql("multi_udc_riepilogo", SQL_RIEPILOGO_PERCENTUALI, {})
|
|
|
|
async def _q(db):
|
|
return await db.query_json(SQL_RIEPILOGO_PERCENTUALI, as_dict_rows=True)
|
|
|
|
def _err(ex):
|
|
_MODULE_LOGGER.exception(f"Errore caricamento riepilogo UDC fantasma: {ex}")
|
|
messagebox.showerror("Errore", str(ex), parent=self)
|
|
|
|
self.runner.run(
|
|
_q(self.db),
|
|
self._fill_riepilogo,
|
|
_err,
|
|
busy=self._busy,
|
|
message="Carico riepilogo UDC fantasma...",
|
|
)
|
|
|
|
@_log_call()
|
|
def _fill_riepilogo(self, res):
|
|
"""Refresh the bottom summary table."""
|
|
rows = _json_obj(res).get("rows", [])
|
|
_log_dataset("multi_udc_riepilogo", rows)
|
|
for item in self.sum_tbl.get_children():
|
|
self.sum_tbl.delete(item)
|
|
for row in rows:
|
|
self.sum_tbl.insert(
|
|
"",
|
|
"end",
|
|
values=(row.get("Corsia"), row.get("TotCelle", 0), row.get("CelleMultiple", 0), f"{row.get('Percentuale', 0):.2f}"),
|
|
)
|
|
|
|
def expand_all(self):
|
|
"""Expand all aisle roots and trigger lazy loading where needed."""
|
|
for iid in self.tree.get_children(""):
|
|
self.tree.item(iid, open=True)
|
|
if f"{iid}::lazy" in self.tree.get_children(iid):
|
|
self.tree.delete(f"{iid}::lazy")
|
|
corsia = iid.split(":", 1)[1]
|
|
self._load_celle_for_corsia(iid, corsia)
|
|
|
|
def collapse_all(self):
|
|
"""Collapse all root nodes in the duplication tree."""
|
|
for iid in self.tree.get_children(""):
|
|
self.tree.item(iid, open=False)
|
|
|
|
@_log_call()
|
|
def export_to_xlsx(self):
|
|
"""Export both the detailed tree and the summary table to Excel."""
|
|
ts = datetime.now().strftime("%d_%m_%Y_%H-%M")
|
|
default_name = f"esportazione_celle_udc_multiple_{ts}.xlsx"
|
|
fname = filedialog.asksaveasfilename(
|
|
parent=self,
|
|
title="Esporta in Excel",
|
|
defaultextension=".xlsx",
|
|
filetypes=[("Excel Workbook", "*.xlsx")],
|
|
initialfile=default_name,
|
|
)
|
|
if not fname:
|
|
return
|
|
try:
|
|
wb = Workbook()
|
|
ws_det = wb.active
|
|
ws_det.title = "Dettaglio"
|
|
ws_sum = wb.create_sheet("Riepilogo")
|
|
det_headers = ["Corsia", "Ubicazione", "IDCella", "Pallet", "Descrizione", "Lotto", "Causale"]
|
|
sum_headers = ["Corsia", "TotCelle", "CelleMultiple", "Percentuale"]
|
|
|
|
def _hdr(ws, headers):
|
|
"""Write formatted headers into the given worksheet."""
|
|
for j, header in enumerate(headers, start=1):
|
|
cell = ws.cell(row=1, column=j, value=header)
|
|
cell.font = Font(bold=True)
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
|
|
_hdr(ws_det, det_headers)
|
|
_hdr(ws_sum, sum_headers)
|
|
|
|
row_idx = 2
|
|
for corsia_node in self.tree.get_children(""):
|
|
for cella_node in self.tree.get_children(corsia_node):
|
|
for pallet_node in self.tree.get_children(cella_node):
|
|
tags = self.tree.item(pallet_node, "tags") or ()
|
|
if "pallet" not in tags:
|
|
continue
|
|
meta = self.udc_meta_by_key.get(pallet_node, {})
|
|
corsia = meta.get("corsia") or next((tag.split(":", 1)[1] for tag in tags if tag.startswith("corsia:")), "")
|
|
ubi = meta.get("ubicazione") or next((tag.split(":", 1)[1] for tag in tags if tag.startswith("ubicazione:")), "")
|
|
idcella = meta.get("idcella") or next((tag.split(":", 1)[1] for tag in tags if tag.startswith("idcella:")), "")
|
|
pallet = meta.get("pallet") or self.tree.item(pallet_node, "text")
|
|
desc, lotto, causale = self.tree.item(pallet_node, "values")
|
|
for j, value in enumerate([corsia, ubi, idcella, pallet, desc, lotto, causale], start=1):
|
|
ws_det.cell(row=row_idx, column=j, value=value)
|
|
row_idx += 1
|
|
|
|
row_idx = 2
|
|
for iid in self.sum_tbl.get_children(""):
|
|
vals = self.sum_tbl.item(iid, "values")
|
|
for j, value in enumerate(vals, start=1):
|
|
ws_sum.cell(row=row_idx, column=j, value=value)
|
|
row_idx += 1
|
|
|
|
def _autosize(ws):
|
|
"""Resize worksheet columns based on their longest value."""
|
|
widths = {}
|
|
for row in ws.iter_rows(values_only=True):
|
|
for j, value in enumerate(row, start=1):
|
|
value_s = "" if value is None else str(value)
|
|
widths[j] = max(widths.get(j, 0), len(value_s))
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
for j, width in widths.items():
|
|
ws.column_dimensions[get_column_letter(j)].width = min(max(width + 2, 10), 60)
|
|
|
|
_autosize(ws_det)
|
|
_autosize(ws_sum)
|
|
wb.save(fname)
|
|
messagebox.showinfo("Esportazione completata", f"File creato:\n{fname}", parent=self)
|
|
except Exception as ex:
|
|
_MODULE_LOGGER.exception(f"Errore esportazione UDC fantasma: {ex}")
|
|
messagebox.showerror("Errore esportazione", str(ex), parent=self)
|
|
|
|
|
|
def open_celle_multiple_window(
|
|
root: tk.Tk,
|
|
db_client,
|
|
runner: AsyncRunner | None = None,
|
|
session=None,
|
|
):
|
|
"""Create, focus and return the duplicated-cells explorer."""
|
|
key = "_celle_multiple_window_singleton"
|
|
ex = getattr(root, key, None)
|
|
if ex and ex.winfo_exists():
|
|
try:
|
|
ex.lift()
|
|
ex.focus_force()
|
|
return ex
|
|
except Exception:
|
|
pass
|
|
win = CelleMultipleWindow(root, db_client, runner=runner, session=session)
|
|
setattr(root, key, win)
|
|
place_window_fullsize_below_parent_later(root, win)
|
|
try:
|
|
win.lift()
|
|
win.focus_force()
|
|
except Exception:
|
|
pass
|
|
return win
|