"""Async port of the packing list reservation stored procedure.""" from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Optional @dataclass class SPResult: """Container returned by the async stored-procedure port.""" rc: int = 0 message: Optional[str] = "" id_result: Optional[int] = None async def _query_one_value(db, sql: str, params: Dict[str, Any]) -> Optional[Any]: """Return the first column of the first row from a query result.""" if hasattr(db, "query_json"): res = await db.query_json(sql, params) if isinstance(res, list) and res: row0 = res[0] if isinstance(row0, dict): return next(iter(row0.values()), None) elif isinstance(res, dict): rows = None for key in ("rows", "data", "result", "records"): if key in res and isinstance(res[key], list): rows = res[key] break if rows: row0 = rows[0] if isinstance(row0, dict): return next(iter(row0.values()), None) if isinstance(row0, (list, tuple)) and row0: return row0[0] return None if hasattr(db, "query_value"): return await db.query_value(sql, params) if hasattr(db, "scalar"): return await db.scalar(sql, params) raise RuntimeError("Il client DB non espone query_json/query_value/scalar") async def _query_all(db, sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]: """Return all rows as dictionaries, normalizing different DB client APIs.""" if hasattr(db, "query_json"): res = await db.query_json(sql, params) if res is None: return [] if isinstance(res, list): return res if res and isinstance(res[0], dict) else [] if isinstance(res, dict): for key in ("rows", "data", "result", "records"): if key in res and isinstance(res[key], list): rows = res[key] if rows and isinstance(rows[0], dict): return rows cols = res.get("columns") or res.get("cols") or [] out = [] for row in rows: if isinstance(row, (list, tuple)) and cols: out.append({(cols[i] if i < len(cols) else f"c{i}"): row[i] for i in range(min(len(cols), len(row)))}) return out return [] if hasattr(db, "fetch_all"): return await db.fetch_all(sql, params) raise RuntimeError("Il client DB non espone query_json/fetch_all") async def _execute(db, sql: str, params: Dict[str, Any]) -> int: """Execute a DML statement using the best method exposed by the DB client.""" for name in ("execute", "exec", "execute_non_query"): if hasattr(db, name): rc = await getattr(db, name)(sql, params) if isinstance(rc, int): return rc return 0 if hasattr(db, "query_json"): await db.query_json(sql, params) return 0 raise RuntimeError("Il client DB non espone metodi di esecuzione DML noti") async def sp_xExePackingListPallet_async(db, IDOperatore: int, Documento: str) -> SPResult: """Toggle the reservation state of all cells belonging to a packing list. The implementation mirrors the original SQL stored procedure while using the shared async DB client already managed by the application. """ try: nominativo = await _query_one_value( db, "SELECT LOGIN FROM Operatori WHERE id = :IDOperatore", {"IDOperatore": IDOperatore}, ) or "" celle = await _query_all( db, """ SELECT DISTINCT Cella FROM dbo.XMag_ViewPackingList WHERE Documento = :Documento """, {"Documento": Documento}, ) id_celle = [row.get("Cella") for row in celle if "Cella" in row] # Each cell is toggled individually because the original procedure also # updates metadata such as operator and timestamp per row. for id_cella in id_celle: if id_cella is None: continue stato = await _query_one_value( db, "SELECT IDStato FROM Celle WHERE ID = :IDC", {"IDC": id_cella}, ) if stato == 0: await _execute( db, """ UPDATE Celle SET IDStato = 1, ModUtente = :N, ModDataOra = GETDATE() WHERE ID = :IDC """, {"N": nominativo, "IDC": id_cella}, ) else: await _execute( db, """ UPDATE Celle SET IDStato = 0, ModUtente = :N, ModDataOra = GETDATE() WHERE ID = :IDC """, {"N": nominativo, "IDC": id_cella}, ) description = await _query_one_value( db, """ SELECT TOP 1 NAZIONE FROM dbo.XMag_ViewPackingList WHERE Documento = :Documento GROUP BY Documento, NAZIONE ORDER BY NAZIONE """, {"Documento": Documento}, ) await _execute( db, """ INSERT INTO dbo.LogPackingList (Code, Description, IDInsUser, InsDateTime) VALUES (:Code, :Descr, :IDInsUser, GETDATE()); """, {"Code": Documento, "Descr": description, "IDInsUser": IDOperatore}, ) new_id = await _query_one_value(db, "SELECT SCOPE_IDENTITY() AS ID", {}) return SPResult(rc=0, message="", id_result=int(new_id) if new_id is not None else None) except Exception as exc: return SPResult(rc=-1, message=str(exc), id_result=None)