Files
ware_house/storico_pickinglist.py

549 lines
20 KiB
Python

"""Read-only picking-list history window."""
from __future__ import annotations
from datetime import date, datetime, timedelta
import tkinter as tk
from tkinter import messagebox, ttk
from typing import Any
import customtkinter as ctk
from busy_overlay import InlineBusyOverlay
from gestione_aree import AsyncRunner
from gestione_scarico import move_pallet_async
from locale_text import load_locale_catalog, text as loc_text
from ui_theme import theme_color, theme_font, theme_section, theme_value
from ui_tables import merge_tags, style_treeview, zebra_tag
from version_info import module_version, versioned_title
from window_placement import place_window_fullsize_below_parent_later
__version__ = module_version(__name__)
SQL_STORICO_PL = """
WITH base AS (
SELECT
*,
NULLIF(LTRIM(RTRIM(CAST(Pallet AS varchar(32)))), '') AS PalletKey
FROM dbo.py_XMag_ViewPackingListStorico
),
pallets AS (
SELECT
Documento,
PalletKey,
MAX(CASE WHEN Cella <> 9999 OR Cella IS NULL THEN 1 ELSE 0 END) AS HasResiduo,
MAX(CASE WHEN Cella = 9999 THEN 1 ELSE 0 END) AS HasSpedito
FROM base
WHERE PalletKey IS NOT NULL
GROUP BY Documento, PalletKey
),
meta AS (
SELECT
Documento,
MAX(DataDocumento) AS DataDocumento,
MAX(StatoDocumento) AS StatoDocumento,
MAX(NAZIONE) AS NAZIONE,
MAX(CodNazione) AS CodNazione,
COUNT(*) AS RigheTotali,
MIN(Ordinamento) AS PrimoOrdine,
MAX(IDStato) AS IDStato
FROM base
GROUP BY Documento
),
agg AS (
SELECT
Documento,
COUNT(*) AS TotUDC,
SUM(CASE WHEN p.HasResiduo = 0 AND p.HasSpedito = 1 THEN 1 ELSE 0 END) AS RigheSpedite,
SUM(CASE WHEN p.HasResiduo = 1 THEN 1 ELSE 0 END) AS RigheResidue
FROM pallets p
GROUP BY Documento
)
SELECT
m.Documento,
m.DataDocumento,
m.StatoDocumento,
m.NAZIONE,
m.CodNazione,
COALESCE(a.TotUDC, 0) AS TotUDC,
COALESCE(a.RigheResidue, 0) AS RigheResidue,
COALESCE(a.RigheSpedite, 0) AS RigheSpedite,
m.RigheTotali,
CASE
WHEN m.StatoDocumento = 'D' AND COALESCE(a.RigheResidue, 0) > 0 THEN 'Chiusa ERP con residui'
WHEN m.StatoDocumento = 'D' THEN 'Chiusa'
WHEN COALESCE(a.RigheResidue, 0) = 0 THEN 'Esaurita'
WHEN COALESCE(a.RigheSpedite, 0) > 0 THEN 'In corso'
ELSE 'Da lavorare'
END AS StatoOperativo,
m.IDStato,
m.PrimoOrdine
FROM meta m
LEFT JOIN agg a ON a.Documento = m.Documento
WHERE (:documento IS NULL OR CAST(m.Documento AS varchar(32)) LIKE CONCAT('%', :documento, '%'))
ORDER BY m.Documento DESC;
"""
SQL_STORICO_PL_DETAILS = """
SELECT
Documento,
Pallet,
Lotto,
Articolo,
Descrizione,
Qta,
DataDocumento,
StatoDocumento,
Cella,
Ubicazione,
Ordinamento,
IDStato
FROM dbo.py_XMag_ViewPackingListStorico
WHERE Documento = :documento
ORDER BY Ordinamento, Pallet;
"""
def _rows_to_dicts(res: dict[str, Any] | None) -> list[dict[str, Any]]:
if not isinstance(res, dict):
return []
rows = res.get("rows") or []
cols = res.get("columns") or []
if rows and isinstance(rows[0], dict):
return [row for row in rows if isinstance(row, dict)]
out: list[dict[str, Any]] = []
for row in rows:
if isinstance(row, (list, tuple)) and cols:
out.append({str(cols[i]): row[i] for i in range(min(len(cols), len(row)))})
return out
def _format_date(value: Any) -> str:
"""Format SQL Server/SAMA date values into dd/mm/yyyy for operators."""
if value in (None, ""):
return ""
if isinstance(value, datetime):
return value.strftime("%d/%m/%Y")
if isinstance(value, date):
return value.strftime("%d/%m/%Y")
if isinstance(value, (int, float)):
try:
# SQL Server numeric datetime: CAST(datetime AS int), day 0 = 1900-01-01.
return (datetime(1900, 1, 1) + timedelta(days=int(value))).strftime("%d/%m/%Y")
except Exception:
return str(value)
text = str(value).strip()
if not text:
return ""
if text.isdigit() and len(text) == 8:
try:
return datetime.strptime(text, "%Y%m%d").strftime("%d/%m/%Y")
except ValueError:
pass
try:
return datetime.fromisoformat(text.replace("Z", "+00:00")).strftime("%d/%m/%Y")
except Exception:
return text
class StoricoPickingListWindow(ctk.CTkToplevel):
"""Window that shows historical picking-list status and details."""
def __init__(self, parent: tk.Widget, db_client, session=None):
super().__init__(parent)
self.db_client = db_client
self.session = session
self._theme = theme_section("history_picking_window", theme_section("pickinglist_window", {}))
self._locale_catalog = load_locale_catalog()
self._async = AsyncRunner(self)
self._busy = InlineBusyOverlay(self, self._theme)
self.var_documento = tk.StringVar()
self._selected_documento: str | None = None
self._selected_stato_operativo: str = ""
self._detail_rows: list[dict[str, Any]] = []
self.title(versioned_title(loc_text("history.picking.title", catalog=self._locale_catalog, default="Storico Picking List"), __name__))
self.geometry(str(theme_value(self._theme, "window_geometry", "1200x720")))
minsize = theme_value(self._theme, "window_minsize", [980, 560])
self.minsize(int(minsize[0]), int(minsize[1]))
try:
self.configure(fg_color=theme_color(self._theme, "window_fg_color", ("#efefef", "#2f2f2f")))
except Exception:
pass
self._build_ui()
self.after(300, self._load_master)
def _build_ui(self) -> None:
self.grid_rowconfigure(1, weight=1)
self.grid_rowconfigure(3, weight=1)
self.grid_columnconfigure(0, weight=1)
top = ctk.CTkFrame(
self,
fg_color=theme_color(self._theme, "toolbar_frame_fg_color", ("#d7d7d7", "#3b3b3b")),
)
top.grid(row=0, column=0, sticky="ew", padx=8, pady=8)
top.grid_columnconfigure(4, weight=1)
label_font = theme_font(self._theme, "toolbar_label_font", ("Segoe UI", 10))
entry_font = theme_font(self._theme, "entry_font", ("Segoe UI", 10))
button_font = theme_font(self._theme, "toolbar_button_font", ("Segoe UI", 10, "bold"))
ctk.CTkLabel(top, text="Documento:", font=label_font).grid(row=0, column=0, sticky="w")
ctk.CTkEntry(top, textvariable=self.var_documento, width=140, font=entry_font).grid(
row=0, column=1, sticky="w", padx=(4, 12)
)
ctk.CTkButton(
top,
text=loc_text("history.picking.button.reload", catalog=self._locale_catalog, default="Ricarica"),
command=self._load_master,
font=button_font,
).grid(
row=0, column=2, sticky="w"
)
self.btn_ship_residuals = ctk.CTkButton(
top,
text=loc_text(
"history.picking.button.ship_residuals",
catalog=self._locale_catalog,
default="Versa residui in 7G.1.1",
),
command=self._ship_selected_residuals,
state="disabled",
font=button_font,
)
self.btn_ship_residuals.grid(row=0, column=3, sticky="w", padx=(12, 0))
self.master_tree = self._make_tree(
row=1,
columns=("Documento", "Data", "StatoDoc", "NAZIONE", "TotUDC", "Residue", "Spedite", "Stato", "IDStato"),
widths={
"Documento": 100,
"Data": 110,
"StatoDoc": 75,
"NAZIONE": 260,
"TotUDC": 90,
"Residue": 90,
"Spedite": 90,
"Stato": 120,
"IDStato": 80,
},
)
self.master_tree.bind("<<TreeviewSelect>>", self._on_master_select)
ctk.CTkLabel(
self,
text=loc_text("history.picking.detail_title", catalog=self._locale_catalog, default="Dettaglio contenuto"),
anchor="w",
font=button_font,
).grid(
row=2, column=0, sticky="ew", padx=8, pady=(4, 2)
)
self.detail_tree = self._make_tree(
row=3,
columns=("Pallet", "Lotto", "Articolo", "Descrizione", "Qta", "Data", "StatoDoc", "Cella", "Ubicazione", "Ordine"),
widths={
"Pallet": 120,
"Lotto": 120,
"Articolo": 130,
"Descrizione": 320,
"Qta": 80,
"Data": 110,
"StatoDoc": 75,
"Cella": 80,
"Ubicazione": 180,
"Ordine": 80,
},
)
def _make_tree(self, *, row: int, columns: tuple[str, ...], widths: dict[str, int]) -> ttk.Treeview:
wrap = ctk.CTkFrame(self)
wrap.grid(row=row, column=0, sticky="nsew", padx=8, pady=(0, 8))
wrap.grid_rowconfigure(0, weight=1)
wrap.grid_columnconfigure(0, weight=1)
tree = ttk.Treeview(wrap, columns=columns, show="headings")
for col in columns:
tree.heading(col, text=col)
tree.column(col, width=widths.get(col, 100), anchor="w")
style_treeview(tree, style_name="HistoryPicking.Treeview", rowheight=24)
tree.tag_configure("done", background="#ECECEC")
tree.tag_configure("active", background="#EAF7EA")
tree.tag_configure("warning", background="#FFE3B3")
sy = ttk.Scrollbar(wrap, orient="vertical", command=tree.yview)
sx = ttk.Scrollbar(wrap, orient="horizontal", command=tree.xview)
tree.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
tree.grid(row=0, column=0, sticky="nsew")
sy.grid(row=0, column=1, sticky="ns")
sx.grid(row=1, column=0, sticky="ew")
return tree
def _load_master(self) -> None:
params = {"documento": str(self.var_documento.get() or "").strip() or None}
previous_documento = self._selected_documento
async def _job():
return await self.db_client.query_json(SQL_STORICO_PL, params)
def _ok(res):
self._fill_master(_rows_to_dicts(res))
if previous_documento:
self._restore_master_selection(previous_documento)
def _err(ex):
messagebox.showerror(
loc_text("history.picking.msg.title", catalog=self._locale_catalog, default="Storico Picking List"),
loc_text(
"history.picking.msg.load_error",
catalog=self._locale_catalog,
default="Errore caricamento:\n{error}",
).format(error=ex),
parent=self,
)
self._async.run(
_job(),
_ok,
_err,
busy=self._busy,
message=loc_text(
"history.picking.busy.master",
catalog=self._locale_catalog,
default="Carico storico picking list...",
),
)
def _fill_master(self, rows: list[dict[str, Any]]) -> None:
self.master_tree.delete(*self.master_tree.get_children(""))
self.detail_tree.delete(*self.detail_tree.get_children(""))
self._selected_documento = None
self._selected_stato_operativo = ""
self._detail_rows = []
self._update_residual_button()
for index, row in enumerate(rows):
stato = str(row.get("StatoOperativo") or "")
is_open_with_shipped = (
str(row.get("StatoDocumento") or "") == "P"
and int(row.get("RigheSpedite") or 0) > 0
)
tag = (
"warning"
if is_open_with_shipped
else "done"
if stato in {"Chiusa", "Esaurita"}
else "active"
if int(row.get("IDStato") or 0) == 1
else ""
)
self.master_tree.insert(
"",
"end",
iid=f"doc_{row.get('Documento')}_{index}",
values=(
row.get("Documento", ""),
_format_date(row.get("DataDocumento", "")),
row.get("StatoDocumento", ""),
row.get("NAZIONE", ""),
row.get("TotUDC", ""),
row.get("RigheResidue", ""),
row.get("RigheSpedite", ""),
stato,
row.get("IDStato", ""),
),
tags=merge_tags(zebra_tag(index), tag),
)
def _on_master_select(self, _event=None) -> None:
selected = self.master_tree.selection()
if not selected:
self._selected_documento = None
self._selected_stato_operativo = ""
self._detail_rows = []
self._update_residual_button()
return
values = self.master_tree.item(selected[0], "values")
if not values:
return
documento = values[0]
self._selected_documento = str(documento)
self._selected_stato_operativo = str(values[7] if len(values) > 7 else "")
self._detail_rows = []
self._update_residual_button()
async def _job():
return await self.db_client.query_json(SQL_STORICO_PL_DETAILS, {"documento": documento})
def _ok(res):
self._fill_detail(_rows_to_dicts(res))
self._update_residual_button()
def _err(ex):
messagebox.showerror(
loc_text("history.picking.msg.title", catalog=self._locale_catalog, default="Storico Picking List"),
loc_text(
"history.picking.msg.detail_error",
catalog=self._locale_catalog,
default="Errore dettaglio:\n{error}",
).format(error=ex),
parent=self,
)
self._async.run(
_job(),
_ok,
_err,
busy=self._busy,
message=loc_text(
"history.picking.busy.detail",
catalog=self._locale_catalog,
default="Carico dettaglio picking list...",
),
)
def _fill_detail(self, rows: list[dict[str, Any]]) -> None:
self.detail_tree.delete(*self.detail_tree.get_children(""))
self._detail_rows = rows
for index, row in enumerate(rows):
is_open_shipped = str(row.get("StatoDocumento") or "") == "P" and int(row.get("Cella") or 0) == 9999
done = str(row.get("StatoDocumento") or "") == "D" or int(row.get("Cella") or 0) == 9999
tag = "warning" if is_open_shipped else "done" if done else ""
self.detail_tree.insert(
"",
"end",
values=(
row.get("Pallet", ""),
row.get("Lotto", ""),
row.get("Articolo", ""),
row.get("Descrizione", ""),
row.get("Qta", ""),
_format_date(row.get("DataDocumento", "")),
row.get("StatoDocumento", ""),
row.get("Cella", ""),
row.get("Ubicazione", ""),
row.get("Ordinamento", ""),
),
tags=merge_tags(zebra_tag(index), tag),
)
def _update_residual_button(self) -> None:
"""Enable the bulk shipment button only for closed picking lists with residual UDCs."""
enabled = self._selected_stato_operativo == "Chiusa ERP con residui"
try:
self.btn_ship_residuals.configure(state="normal" if enabled else "disabled")
except Exception:
pass
def _restore_master_selection(self, documento: str) -> None:
"""Re-select a document after a reload, when it is still visible."""
for iid in self.master_tree.get_children(""):
values = self.master_tree.item(iid, "values")
if values and str(values[0]) == str(documento):
self.master_tree.selection_set(iid)
self.master_tree.focus(iid)
self.master_tree.see(iid)
self._on_master_select()
return
def _residual_pallets_from_rows(self, rows: list[dict[str, Any]]) -> list[str]:
"""Return distinct residual UDCs that are not already in 7G.1.1."""
pallets: list[str] = []
seen: set[str] = set()
for row in rows:
pallet = str(row.get("Pallet") or "").strip()
if not pallet or pallet in seen:
continue
try:
cella = int(row.get("Cella") or 0)
except Exception:
cella = 0
if cella == 9999:
continue
seen.add(pallet)
pallets.append(pallet)
return pallets
def _operator_login(self) -> str:
"""Return the user recorded on generated warehouse movements."""
login = str(getattr(self.session, "login", "") or "").strip()
return login or "warehouse_ui"
def _ship_selected_residuals(self) -> None:
"""Move all residual UDCs of the selected closed PL to the shipment cell 7G.1.1."""
if self._selected_stato_operativo != "Chiusa ERP con residui" or not self._selected_documento:
return
estimated = self._residual_pallets_from_rows(self._detail_rows)
count_text = str(len(estimated)) if estimated else "le"
if not messagebox.askyesno(
loc_text("history.picking.msg.title", catalog=self._locale_catalog, default="Storico Picking List"),
(
f"Documento {self._selected_documento}\n\n"
f"Verranno versate in 7G.1.1 {count_text} UDC residue della picking list chiusa.\n"
"L'operazione registra i movimenti nello storico UDC.\n\n"
"Procedere?"
),
parent=self,
):
return
documento = self._selected_documento
utente = self._operator_login()
async def _job():
detail_res = await self.db_client.query_json(SQL_STORICO_PL_DETAILS, {"documento": documento})
detail_rows = _rows_to_dicts(detail_res)
pallets = self._residual_pallets_from_rows(detail_rows)
results: list[dict[str, Any]] = []
for pallet in pallets:
result = await move_pallet_async(
self.db_client,
barcode_pallet=pallet,
target_idcella=9999,
target_barcode_cella="9000000",
utente=utente,
)
results.append(result)
return {"pallets": pallets, "results": results}
def _ok(res):
pallets = res.get("pallets", []) if isinstance(res, dict) else []
results = res.get("results", []) if isinstance(res, dict) else []
moved = sum(1 for row in results if int(row.get("ok") or 0) == 1)
messagebox.showinfo(
loc_text("history.picking.msg.title", catalog=self._locale_catalog, default="Storico Picking List"),
f"Documento {documento}\nUDC residue trovate: {len(pallets)}\nUDC versate in 7G.1.1: {moved}",
parent=self,
)
self._selected_documento = str(documento)
self._load_master()
def _err(ex):
messagebox.showerror(
loc_text("history.picking.msg.title", catalog=self._locale_catalog, default="Storico Picking List"),
f"Versamento residui fallito:\n{ex}",
parent=self,
)
self._async.run(
_job(),
_ok,
_err,
busy=self._busy,
message=f"Verso residui PL {documento} in 7G.1.1...",
)
def open_storico_pickinglist_window(parent: tk.Misc, db_client, session=None) -> tk.Misc:
"""Open the picking-list history window."""
win = StoricoPickingListWindow(parent, db_client, session=session)
place_window_fullsize_below_parent_later(parent, win)
win.bind("<Escape>", lambda _e: win.destroy())
return win