"""Low-level DB access for the barcode WMS client. The goal of this module is to mirror the legacy C# barcode form as closely as possible while keeping SQL isolated from the UI. """ from __future__ import annotations from dataclasses import dataclass from runtime_support import log_exception, log_runtime_event from typing import Any from version_info import module_version __version__ = module_version(__name__) SQL_NEXT_PICKING = """ SELECT TOP (1) Documento, CodNazione, NAZIONE, Stato, Pallet, Cella, Ubicazione, Ordinamento, IDStato FROM dbo.py_XMag_ViewPackingList WHERE Ordinamento > 0 AND IDStato = :id_stato ORDER BY Ordinamento; """ SQL_PICKING_BY_PALLET = """ SELECT TOP (1) Documento, CodNazione, NAZIONE, Stato, Pallet, Cella, Ubicazione, Ordinamento, IDStato FROM dbo.py_XMag_ViewPackingList WHERE Pallet = :pallet ORDER BY Ordinamento; """ SQL_TRACE_BY_PALLET = """ SELECT TOP (1) Pallet, Lotto, Prodotto, Descrizione FROM dbo.vXTracciaProdotti WHERE Pallet = :pallet ORDER BY Lotto; """ SQL_CURRENT_LOCATION_BY_PALLET = """ SELECT TOP (1) g.BarcodePallet, g.IDCella, RTRIM(c.Corsia) AS Corsia, RTRIM(CAST(c.Colonna AS varchar(32))) AS Colonna, RTRIM(CAST(c.Fila AS varchar(32))) AS Fila FROM dbo.XMag_GiacenzaPallet AS g LEFT JOIN dbo.Celle AS c ON c.ID = g.IDCella WHERE g.BarcodePallet = :pallet; """ SQL_RESOLVE_PHYSICAL_CELL = """ DECLARE @raw int = TRY_CONVERT(int, :destination); DECLARE @cell_id int = CASE WHEN @raw IS NULL THEN NULL WHEN @raw >= 9000000 THEN @raw - 9000000 ELSE @raw END; SELECT TOP (1) c.ID AS IDCella, CAST(9000000 + c.ID AS varchar(8)) AS BarcodeCella, 9000000 + c.ID AS NumeroCella, CONCAT(RTRIM(c.Corsia), '.', RTRIM(CAST(c.Colonna AS varchar(32))), '.', RTRIM(CAST(c.Fila AS varchar(32)))) AS Ubicazione FROM dbo.Celle AS c WHERE c.ID = @cell_id AND c.ID <> 9999 AND c.DelDataOra IS NULL; """ SQL_LEGACY_MOVE = """ SET NOCOUNT ON; SET XACT_ABORT ON; DECLARE @RC int = 0; EXEC dbo.sp_xMagGestioneMagazziniPallet @IDOperatore = :id_operatore, @BarcodeCella = :barcode_cella, @BarcodePallet = :barcode_pallet, @NumeroCella = :numero_cella, @RC = @RC OUTPUT; EXEC dbo.py_sp_ControllaPrenotazionePackingListPalletNew; SELECT @RC AS RC, :barcode_cella AS BarcodeCella, :barcode_pallet AS BarcodePallet, :numero_cella AS NumeroCella; """ def _rows_to_dicts(res: dict[str, Any] | None) -> list[dict[str, Any]]: """Convert ``query_json`` payloads to a list of row dictionaries.""" if not isinstance(res, dict): return [] rows = res.get("rows") or [] cols = res.get("columns") or [] if rows and isinstance(rows[0], dict): return [row for row in rows if isinstance(row, dict)] out: list[dict[str, Any]] = [] for row in rows: if isinstance(row, (list, tuple)) and cols: out.append({str(cols[i]): row[i] for i in range(min(len(cols), len(row)))}) return out @dataclass class LegacyMoveResult: """Result of one movement executed through the legacy stored procedure.""" rc: int barcode_cella: str barcode_pallet: str numero_cella: int @dataclass class DestinationCell: """Resolved warehouse cell accepted by the legacy movement procedure.""" barcode_cella: str numero_cella: int id_cella: int ubicazione: str class BarcodeRepository: """Thin async repository used by the lightweight barcode client.""" def __init__(self, db_client): self.db_client = db_client async def fetch_next_picking(self, id_stato: int) -> dict[str, Any] | None: """Return the next pallet proposed by the legacy F1/F2 queue logic.""" res = await self.db_client.query_json(SQL_NEXT_PICKING, {"id_stato": int(id_stato)}) rows = _rows_to_dicts(res) return rows[0] if rows else None async def fetch_picking_by_pallet(self, pallet: str) -> dict[str, Any] | None: """Return one picking row for the given pallet, if still present in the queue.""" res = await self.db_client.query_json(SQL_PICKING_BY_PALLET, {"pallet": str(pallet or "").strip()}) rows = _rows_to_dicts(res) return rows[0] if rows else None async def fetch_trace_by_pallet(self, pallet: str) -> dict[str, Any] | None: """Return trace information used by the C# fallback confirmation path.""" res = await self.db_client.query_json(SQL_TRACE_BY_PALLET, {"pallet": str(pallet or "").strip()}) rows = _rows_to_dicts(res) return rows[0] if rows else None async def fetch_current_location_by_pallet(self, pallet: str) -> dict[str, Any] | None: """Return the current stock location for a pallet, if it exists in giacenza.""" res = await self.db_client.query_json(SQL_CURRENT_LOCATION_BY_PALLET, {"pallet": str(pallet or "").strip()}) rows = _rows_to_dicts(res) return rows[0] if rows else None async def resolve_physical_cell(self, destination: str) -> DestinationCell | None: """Accept either an internal cell ID or the scanned legacy cell barcode.""" res = await self.db_client.query_json(SQL_RESOLVE_PHYSICAL_CELL, {"destination": str(destination or "").strip()}) rows = _rows_to_dicts(res) if not rows: return None row = rows[0] return DestinationCell( barcode_cella=str(row.get("BarcodeCella") or ""), numero_cella=int(row.get("NumeroCella") or 0), id_cella=int(row.get("IDCella") or 0), ubicazione=str(row.get("Ubicazione") or ""), ) async def execute_legacy_move( self, *, operator_id: int, barcode_cella: str, barcode_pallet: str, numero_cella: int, ) -> LegacyMoveResult: """Execute the same stored procedure used by the C# barcode form.""" params = { "id_operatore": int(operator_id), "barcode_cella": str(barcode_cella or "").strip(), "barcode_pallet": str(barcode_pallet or "").strip(), "numero_cella": int(numero_cella), } log_runtime_event( "Barcode WMS", ( "MOVE START " f"operator={params['id_operatore']} " f"cell={params['barcode_cella']} " f"pallet={params['barcode_pallet']}" ), ) try: res = await self.db_client.query_json(SQL_LEGACY_MOVE, params, commit=True) except Exception as exc: log_exception( "Barcode WMS", exc, context=( "execute_legacy_move " f"operator={params['id_operatore']} " f"cell={params['barcode_cella']} " f"pallet={params['barcode_pallet']}" ), ) raise rows = _rows_to_dicts(res) row = rows[0] if rows else {} result = LegacyMoveResult( rc=int(row.get("RC") or 0), barcode_cella=str(row.get("BarcodeCella") or params["barcode_cella"]), barcode_pallet=str(row.get("BarcodePallet") or params["barcode_pallet"]), numero_cella=int(row.get("NumeroCella") or params["numero_cella"]), ) log_runtime_event( "Barcode WMS", ( "MOVE OK " f"rc={result.rc} " f"cell={result.barcode_cella} " f"pallet={result.barcode_pallet}" ), ) return result