Files
ware_house/barcode_repository.py

259 lines
7.5 KiB
Python

"""Low-level DB access for the barcode WMS client.
The goal of this module is to mirror the legacy C# barcode form as closely as
possible while keeping SQL isolated from the UI.
"""
from __future__ import annotations
from dataclasses import dataclass
from runtime_support import log_exception, log_runtime_event
from typing import Any
from version_info import module_version
__version__ = module_version(__name__)
SQL_NEXT_PICKING = """
SELECT TOP (1)
Documento,
CodNazione,
NAZIONE,
Stato,
Pallet,
Cella,
Ubicazione,
Ordinamento,
IDStato
FROM dbo.py_XMag_ViewPackingList
WHERE Ordinamento > 0
AND IDStato = :id_stato
ORDER BY Ordinamento;
"""
SQL_PICKING_BY_PALLET = """
SELECT TOP (1)
Documento,
CodNazione,
NAZIONE,
Stato,
Pallet,
Cella,
Ubicazione,
Ordinamento,
IDStato
FROM dbo.py_XMag_ViewPackingList
WHERE Pallet = :pallet
ORDER BY Ordinamento;
"""
SQL_TRACE_BY_PALLET = """
SELECT TOP (1)
Pallet,
Lotto,
Prodotto,
Descrizione
FROM dbo.vXTracciaProdotti
WHERE Pallet = :pallet
ORDER BY Lotto;
"""
SQL_CURRENT_LOCATION_BY_PALLET = """
SELECT TOP (1)
g.BarcodePallet,
g.IDCella,
RTRIM(c.Corsia) AS Corsia,
RTRIM(CAST(c.Colonna AS varchar(32))) AS Colonna,
RTRIM(CAST(c.Fila AS varchar(32))) AS Fila
FROM dbo.XMag_GiacenzaPallet AS g
LEFT JOIN dbo.Celle AS c
ON c.ID = g.IDCella
WHERE g.BarcodePallet = :pallet;
"""
SQL_RESOLVE_PHYSICAL_CELL = """
DECLARE @raw int = TRY_CONVERT(int, :destination);
DECLARE @cell_id int =
CASE
WHEN @raw IS NULL THEN NULL
WHEN @raw >= 9000000 THEN @raw - 9000000
ELSE @raw
END;
SELECT TOP (1)
c.ID AS IDCella,
CAST(9000000 + c.ID AS varchar(8)) AS BarcodeCella,
9000000 + c.ID AS NumeroCella,
CONCAT(RTRIM(c.Corsia), '.', RTRIM(CAST(c.Colonna AS varchar(32))), '.', RTRIM(CAST(c.Fila AS varchar(32)))) AS Ubicazione
FROM dbo.Celle AS c
WHERE c.ID = @cell_id
AND c.ID <> 9999
AND c.DelDataOra IS NULL;
"""
SQL_LEGACY_MOVE = """
SET NOCOUNT ON;
SET XACT_ABORT ON;
DECLARE @RC int = 0;
EXEC dbo.sp_xMagGestioneMagazziniPallet
@IDOperatore = :id_operatore,
@BarcodeCella = :barcode_cella,
@BarcodePallet = :barcode_pallet,
@NumeroCella = :numero_cella,
@RC = @RC OUTPUT;
EXEC dbo.py_sp_ControllaPrenotazionePackingListPalletNew;
SELECT
@RC AS RC,
:barcode_cella AS BarcodeCella,
:barcode_pallet AS BarcodePallet,
:numero_cella AS NumeroCella;
"""
def _rows_to_dicts(res: dict[str, Any] | None) -> list[dict[str, Any]]:
"""Convert ``query_json`` payloads to a list of row dictionaries."""
if not isinstance(res, dict):
return []
rows = res.get("rows") or []
cols = res.get("columns") or []
if rows and isinstance(rows[0], dict):
return [row for row in rows if isinstance(row, dict)]
out: list[dict[str, Any]] = []
for row in rows:
if isinstance(row, (list, tuple)) and cols:
out.append({str(cols[i]): row[i] for i in range(min(len(cols), len(row)))})
return out
@dataclass
class LegacyMoveResult:
"""Result of one movement executed through the legacy stored procedure."""
rc: int
barcode_cella: str
barcode_pallet: str
numero_cella: int
@dataclass
class DestinationCell:
"""Resolved warehouse cell accepted by the legacy movement procedure."""
barcode_cella: str
numero_cella: int
id_cella: int
ubicazione: str
class BarcodeRepository:
"""Thin async repository used by the lightweight barcode client."""
def __init__(self, db_client):
self.db_client = db_client
async def fetch_next_picking(self, id_stato: int) -> dict[str, Any] | None:
"""Return the next pallet proposed by the legacy F1/F2 queue logic."""
res = await self.db_client.query_json(SQL_NEXT_PICKING, {"id_stato": int(id_stato)})
rows = _rows_to_dicts(res)
return rows[0] if rows else None
async def fetch_picking_by_pallet(self, pallet: str) -> dict[str, Any] | None:
"""Return one picking row for the given pallet, if still present in the queue."""
res = await self.db_client.query_json(SQL_PICKING_BY_PALLET, {"pallet": str(pallet or "").strip()})
rows = _rows_to_dicts(res)
return rows[0] if rows else None
async def fetch_trace_by_pallet(self, pallet: str) -> dict[str, Any] | None:
"""Return trace information used by the C# fallback confirmation path."""
res = await self.db_client.query_json(SQL_TRACE_BY_PALLET, {"pallet": str(pallet or "").strip()})
rows = _rows_to_dicts(res)
return rows[0] if rows else None
async def fetch_current_location_by_pallet(self, pallet: str) -> dict[str, Any] | None:
"""Return the current stock location for a pallet, if it exists in giacenza."""
res = await self.db_client.query_json(SQL_CURRENT_LOCATION_BY_PALLET, {"pallet": str(pallet or "").strip()})
rows = _rows_to_dicts(res)
return rows[0] if rows else None
async def resolve_physical_cell(self, destination: str) -> DestinationCell | None:
"""Accept either an internal cell ID or the scanned legacy cell barcode."""
res = await self.db_client.query_json(SQL_RESOLVE_PHYSICAL_CELL, {"destination": str(destination or "").strip()})
rows = _rows_to_dicts(res)
if not rows:
return None
row = rows[0]
return DestinationCell(
barcode_cella=str(row.get("BarcodeCella") or ""),
numero_cella=int(row.get("NumeroCella") or 0),
id_cella=int(row.get("IDCella") or 0),
ubicazione=str(row.get("Ubicazione") or ""),
)
async def execute_legacy_move(
self,
*,
operator_id: int,
barcode_cella: str,
barcode_pallet: str,
numero_cella: int,
) -> LegacyMoveResult:
"""Execute the same stored procedure used by the C# barcode form."""
params = {
"id_operatore": int(operator_id),
"barcode_cella": str(barcode_cella or "").strip(),
"barcode_pallet": str(barcode_pallet or "").strip(),
"numero_cella": int(numero_cella),
}
log_runtime_event(
"Barcode WMS",
(
"MOVE START "
f"operator={params['id_operatore']} "
f"cell={params['barcode_cella']} "
f"pallet={params['barcode_pallet']}"
),
)
try:
res = await self.db_client.query_json(SQL_LEGACY_MOVE, params, commit=True)
except Exception as exc:
log_exception(
"Barcode WMS",
exc,
context=(
"execute_legacy_move "
f"operator={params['id_operatore']} "
f"cell={params['barcode_cella']} "
f"pallet={params['barcode_pallet']}"
),
)
raise
rows = _rows_to_dicts(res)
row = rows[0] if rows else {}
result = LegacyMoveResult(
rc=int(row.get("RC") or 0),
barcode_cella=str(row.get("BarcodeCella") or params["barcode_cella"]),
barcode_pallet=str(row.get("BarcodePallet") or params["barcode_pallet"]),
numero_cella=int(row.get("NumeroCella") or params["numero_cella"]),
)
log_runtime_event(
"Barcode WMS",
(
"MOVE OK "
f"rc={result.rc} "
f"cell={result.barcode_cella} "
f"pallet={result.barcode_pallet}"
),
)
return result