Files
ware_house/prenota_sprenota_sql.py

303 lines
11 KiB
Python

"""Async port of the packing list reservation stored procedure."""
from __future__ import annotations
import json
import logging
import sys
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
    from loguru import logger
except Exception:  # pragma: no cover - safety fallback if dependency is missing locally

    class _FallbackLogger:
        """Stand-in used only when Loguru cannot be imported.

        Wraps a standard-library logger and exposes just the Loguru-style
        methods this module calls (``bind``, ``add``, ``log``, ``debug``,
        ``info``, ``exception``).
        """

        def __init__(self):
            # MODULE_LOG_NAME is used only if it already exists in the module
            # globals when the instance is created; otherwise fall back to
            # the module's import name.
            if "MODULE_LOG_NAME" in globals():
                channel = MODULE_LOG_NAME
            else:
                channel = __name__
            backend = logging.getLogger(channel)
            backend.setLevel(logging.DEBUG)
            # Records stay with the handlers attached through add().
            backend.propagate = False
            self._logger = backend

        def bind(self, **_kwargs):
            """Accept and ignore contextual fields; return the same logger."""
            return self

        def add(self, sink, level="INFO", format=None, encoding="utf-8", **_kwargs):
            """Attach a handler for *sink* and return a dummy handler id (0).

            Objects with a ``write`` attribute are treated as streams; anything
            else is converted to ``str`` and opened as a log file. The
            ``format`` argument and extra keyword options are ignored.
            """
            is_stream = hasattr(sink, "write")
            handler: logging.Handler = (
                logging.StreamHandler(sink)
                if is_stream
                else logging.FileHandler(str(sink), encoding=encoding)
            )
            # Unknown level names degrade to INFO.
            threshold = getattr(logging, str(level).upper(), logging.INFO)
            handler.setLevel(threshold)
            layout = logging.Formatter("%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
            handler.setFormatter(layout)
            self._logger.addHandler(handler)
            return 0

        def log(self, level, message):
            """Emit *message* through the method named after *level* (default: info)."""
            emit = getattr(self._logger, str(level).lower(), self._logger.info)
            emit(message)

        def debug(self, message):
            """Emit *message* at DEBUG level."""
            self._logger.debug(message)

        def info(self, message):
            """Emit *message* at INFO level."""
            self._logger.info(message)

        def exception(self, message):
            """Emit *message* at ERROR level together with the active traceback."""
            self._logger.exception(message)

    logger = _FallbackLogger()
# Verbosity switch for this module's logging.
PACKINGLIST_SP_LOG_MODE = "INFO"  # "OFF" | "INFO" | "DEBUG"

# The log file sits next to this source file (same name, ".log" suffix) and
# the log name is this file's stem.
MODULE_LOG_PATH = Path(__file__).with_suffix(".log")
MODULE_LOG_NAME = Path(__file__).stem

# Anything other than "OFF" enables logging; anything other than "DEBUG"
# is logged at INFO level.
_MODULE_LOG_ENABLED = not (PACKINGLIST_SP_LOG_MODE.upper() == "OFF")
_MODULE_LOG_LEVEL = "INFO" if PACKINGLIST_SP_LOG_MODE.upper() != "DEBUG" else "DEBUG"

# Set to True once the sinks have been installed (or deliberately skipped).
_MODULE_LOGGING_CONFIGURED = False

# Logger tagged with this module's name so sink filters can recognise its records.
_MODULE_LOGGER = logger.bind(warehouse_module=MODULE_LOG_NAME)
def _configure_module_logger():
    """Install the console and file sinks for this module, once.

    Does nothing after the first successful call. When logging is disabled
    the module is simply marked as configured and no sink is added.

    Both sinks only accept records whose ``extra["warehouse_module"]`` equals
    ``MODULE_LOG_NAME``. The file sink is best-effort: if the log file cannot
    be opened (for example a read-only install directory) the failure is
    reported on the console sink and the module keeps working instead of
    failing at import time.
    """
    global _MODULE_LOGGING_CONFIGURED
    if _MODULE_LOGGING_CONFIGURED:
        return
    if not _MODULE_LOG_ENABLED:
        _MODULE_LOGGING_CONFIGURED = True
        return

    def record_filter(record):
        # Keep only records bound to this module's name.
        return record["extra"].get("warehouse_module") == MODULE_LOG_NAME

    logger.add(
        sys.stderr,
        level=_MODULE_LOG_LEVEL,
        colorize=True,
        filter=record_filter,
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>" + MODULE_LOG_NAME + "</cyan> | "
            "<level>{message}</level>"
        ),
    )
    try:
        logger.add(
            MODULE_LOG_PATH,
            level=_MODULE_LOG_LEVEL,
            colorize=False,
            encoding="utf-8",
            filter=record_filter,
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | " + MODULE_LOG_NAME + " | {message}",
        )
    except OSError as exc:
        # The console sink is already installed, so the problem is still visible.
        _MODULE_LOGGER.log(
            "WARNING",
            f"File di log non disponibile ({MODULE_LOG_PATH}): {exc}",
        )
    _MODULE_LOGGING_CONFIGURED = True
def _format_payload(payload: Any) -> str:
    """Render *payload* as indented JSON text for log messages.

    Values JSON cannot encode natively are converted with ``str``. If
    serialization still fails, the ``repr`` of the payload is returned instead.
    """
    try:
        rendered = json.dumps(payload, default=str, indent=2, ensure_ascii=False)
    except Exception:
        # Logging must never fail because of an odd payload.
        rendered = repr(payload)
    return rendered
def _log_call(level: Optional[str] = None):
    """Build a decorator that logs entry, exit and failure of an async callable.

    Args:
        level: Log level for the CALL/RETURN messages. When omitted (or
            empty) the module-wide level is used.

    The first positional argument of the wrapped callable is left out of the
    CALL message. Failures are logged with the traceback and re-raised
    unchanged.
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            name = func.__qualname__
            log_level = _MODULE_LOG_LEVEL if not level else level
            # args[1:] is already empty when there are zero or one positionals.
            positional = _format_payload(args[1:])
            keyword = _format_payload(kwargs)
            _MODULE_LOGGER.log(log_level, f"CALL {name} args={positional} kwargs={keyword}")
            try:
                outcome = await func(*args, **kwargs)
            except Exception:
                _MODULE_LOGGER.exception(f"FAIL {name}")
                raise
            else:
                _MODULE_LOGGER.log(log_level, f"RETURN {name}")
                return outcome

        return wrapper

    return decorator
def _log_sql(query_name: str, sql: str, params: Dict[str, Any]):
    """Log a SQL statement: parameters at the module level, text at DEBUG.

    Args:
        query_name: Label used to identify the statement in the log.
        sql: Statement text; surrounding whitespace is stripped before logging.
        params: Bound parameters, serialized for the log message.
    """
    rendered_params = _format_payload(params)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} params={rendered_params}")
    statement = sql.strip()
    _MODULE_LOGGER.debug(f"SQL {query_name} text:\n{statement}")
def _log_dataset(query_name: str, rows: Any):
    """Log the outcome of a query.

    A list is summarized by its row count, and dumped in full only when the
    module runs in DEBUG mode. Any other value is logged as a scalar.
    """
    if not isinstance(rows, list):
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} scalar={_format_payload(rows)}")
        return

    row_count = len(rows)
    _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"SQL {query_name} returned {row_count} rows")
    debug_mode = PACKINGLIST_SP_LOG_MODE.upper() == "DEBUG"
    if debug_mode:
        _MODULE_LOGGER.debug(f"SQL {query_name} dataset:\n{_format_payload(rows)}")
# Import-time side effect: install the sinks, then announce the active
# configuration (only when logging is enabled).
_configure_module_logger()
if _MODULE_LOG_ENABLED:
    _MODULE_LOGGER.info(
        f"Logging inizializzato su {MODULE_LOG_PATH.name} "
        f"livello={_MODULE_LOG_LEVEL} mode={PACKINGLIST_SP_LOG_MODE.upper()}"
    )
@dataclass()
class SPResult:
    """Outcome of a stored-procedure call made through this module.

    Attributes:
        rc: Return code; defaults to 0.
        message: Text attached to the outcome; defaults to an empty string.
        id_result: Optional integer identifier; defaults to ``None``.
    """

    rc: int = 0
    message: Optional[str] = ""
    id_result: Optional[int] = None
@_log_call("DEBUG")
async def _query_one_value(db, sql: str, params: Dict[str, Any]) -> Optional[Any]:
    """Run *sql* and return the first column of the first row, if any.

    The DB client is probed in this order: ``query_json``, ``query_value``,
    ``scalar``. With ``query_json`` the result may be a bare list of dict
    rows, or a dict holding the rows under ``rows``/``data``/``result``/
    ``records`` (dict, list or tuple rows). Anything else yields ``None``.

    Raises:
        RuntimeError: If the client exposes none of the supported methods.
    """
    label = "_query_one_value"
    _log_sql(label, sql, params)

    if hasattr(db, "query_json"):
        payload = await db.query_json(sql, params)
        scalar = None
        if isinstance(payload, list):
            # A bare list is only understood when its first row is a mapping.
            if payload and isinstance(payload[0], dict):
                scalar = next(iter(payload[0].values()), None)
        elif isinstance(payload, dict):
            # The first list-valued container key wins, even if it is empty.
            container = next(
                (
                    payload[name]
                    for name in ("rows", "data", "result", "records")
                    if name in payload and isinstance(payload[name], list)
                ),
                None,
            )
            head = container[0] if container else None
            if isinstance(head, dict):
                scalar = next(iter(head.values()), None)
            elif isinstance(head, (list, tuple)) and head:
                scalar = head[0]
        _log_dataset(label, scalar)
        return scalar

    # Clients without query_json: try the scalar-style accessors in order.
    for accessor in ("query_value", "scalar"):
        if hasattr(db, accessor):
            scalar = await getattr(db, accessor)(sql, params)
            _log_dataset(label, scalar)
            return scalar

    raise RuntimeError("Il client DB non espone query_json/query_value/scalar")
@_log_call("DEBUG")
async def _query_all(db, sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run *sql* and return every row as a dictionary.

    ``query_json`` is preferred. Its result may be a bare list of dict rows,
    or a dict holding the rows under ``rows``/``data``/``result``/``records``.
    In the dict form, list/tuple rows are paired with the names found under
    ``columns`` (or ``cols``). Unrecognized shapes produce an empty list.
    Without ``query_json`` the rows returned by ``fetch_all`` are passed
    through unchanged.

    Raises:
        RuntimeError: If the client exposes neither ``query_json`` nor
            ``fetch_all``.
    """
    label = "_query_all"
    _log_sql(label, sql, params)

    if not hasattr(db, "query_json"):
        if not hasattr(db, "fetch_all"):
            raise RuntimeError("Il client DB non espone query_json/fetch_all")
        fetched = await db.fetch_all(sql, params)
        _log_dataset(label, fetched)
        return fetched

    payload = await db.query_json(sql, params)
    records: List[Dict[str, Any]] = []
    if isinstance(payload, list):
        # A bare list is only understood when its first row is a mapping.
        if payload and isinstance(payload[0], dict):
            records = payload
    elif isinstance(payload, dict):
        # The first list-valued container key wins, even if it is empty.
        container = next(
            (
                payload[name]
                for name in ("rows", "data", "result", "records")
                if name in payload and isinstance(payload[name], list)
            ),
            None,
        )
        if container is not None:
            if container and isinstance(container[0], dict):
                records = container
            else:
                header = payload.get("columns") or payload.get("cols") or []
                # NOTE(review): values beyond the declared columns are dropped,
                # and rows are skipped entirely when no column names exist.
                records = [
                    {header[i]: row[i] for i in range(min(len(header), len(row)))}
                    for row in container
                    if isinstance(row, (list, tuple)) and header
                ]
    _log_dataset(label, records)
    return records
@_log_call("DEBUG")
async def _execute(db, sql: str, params: Dict[str, Any]) -> int:
    """Run a DML statement through the first suitable client method.

    ``execute``, ``exec`` and ``execute_non_query`` are probed in that order;
    the method's result is returned when it is an ``int``, otherwise 0.
    When none of them exists, ``query_json`` is used and 0 is returned.

    Raises:
        RuntimeError: If the client exposes no known execution method.
    """
    label = "_execute"
    _log_sql(label, sql, params)

    runner_name = next(
        (name for name in ("execute", "exec", "execute_non_query") if hasattr(db, name)),
        None,
    )
    if runner_name is not None:
        outcome = await getattr(db, runner_name)(sql, params)
        affected = outcome if isinstance(outcome, int) else 0
        _log_dataset(label, affected)
        return affected

    if not hasattr(db, "query_json"):
        raise RuntimeError("Il client DB non espone metodi di esecuzione DML noti")

    # NOTE(review): no commit flag is passed here, unlike the stored-procedure
    # call elsewhere in this module - confirm the client commits by default.
    await db.query_json(sql, params)
    _log_dataset(label, 0)
    return 0
def _rows_from_sp_result(res: Any) -> List[Any]:
    """Return the row list carried by a ``query_json`` result, or ``[]``.

    Accepts a bare list/tuple of rows, or a dict holding the rows under
    ``rows``/``data``/``result``/``records`` (first list-valued key wins).
    """
    if isinstance(res, (list, tuple)):
        return list(res)
    if isinstance(res, dict):
        for key in ("rows", "data", "result", "records"):
            candidate = res.get(key)
            if isinstance(candidate, (list, tuple)):
                return list(candidate)
    return []


@_log_call()
async def sp_xExePackingListPallet_async(db, IDOperatore: int, Documento: str, Azione: str = "P") -> SPResult:
    """Call ``dbo.py_sp_xExePackingListPallet`` and report its return code.

    Args:
        db: DB client; must expose an awaitable ``query_json`` accepting the
            ``as_dict_rows`` and ``commit`` keyword arguments.
        IDOperatore: Operator id passed to the procedure.
        Documento: Document identifier passed to the procedure.
        Azione: Action code, ``"P"`` or ``"S"`` (case-insensitive, surrounding
            whitespace ignored; empty/None is treated as ``"P"``).

    Returns:
        ``SPResult`` with:
        * ``rc=-10`` and a message when the action code is not valid;
        * ``rc=-1`` and the exception text when anything raises;
        * otherwise the ``RC`` value selected by the batch. If no readable
          ``RC`` comes back, ``rc`` stays 0 and a warning is logged.

    This function never raises: every exception is logged and converted
    into an ``SPResult``.
    """
    procedure = "py_sp_xExePackingListPallet"
    try:
        azione = str(Azione or "P").strip().upper()
        if azione not in ("P", "S"):
            return SPResult(rc=-10, message=f"Azione non valida: {Azione}", id_result=None)
        _MODULE_LOGGER.log(
            _MODULE_LOG_LEVEL,
            f"Procedura packing list via stored procedure documento={Documento} azione={azione} id_operatore={IDOperatore}",
        )
        sql = """
            SET NOCOUNT ON;
            DECLARE @RC int = 0;
            EXEC dbo.py_sp_xExePackingListPallet
                @IDOperatore = :IDOperatore,
                @Documento = :Documento,
                @Azione = :Azione,
                @RC = @RC OUTPUT;
            SELECT CAST(@RC AS int) AS RC;
        """
        # Built once so the logged parameters are exactly the ones sent.
        params = {"IDOperatore": IDOperatore, "Documento": Documento, "Azione": azione}
        _log_sql(procedure, sql, params)
        if not hasattr(db, "query_json"):
            raise RuntimeError("Il client DB non espone query_json necessario per eseguire la stored procedure.")
        res = await db.query_json(sql, params, as_dict_rows=True, commit=True)

        # Accept the same result shapes as the other query helpers, so a
        # non-zero RC is not lost just because the rows arrive differently.
        rows = _rows_from_sp_result(res)
        _log_dataset(procedure, rows)

        rc = 0
        first_row = rows[0] if rows else None
        if isinstance(first_row, dict):
            raw_rc = first_row.get("RC")
            try:
                rc = int(raw_rc or 0)
            except (TypeError, ValueError, OverflowError):
                # Historical behaviour is kept (rc=0), but no longer silently.
                _MODULE_LOGGER.log(
                    "WARNING",
                    f"RC non numerico ({raw_rc!r}) documento={Documento} azione={azione}: assumo rc=0",
                )
                rc = 0
        else:
            _MODULE_LOGGER.log(
                "WARNING",
                f"Stored procedure senza RC leggibile documento={Documento} azione={azione}: assumo rc=0",
            )
        _MODULE_LOGGER.log(_MODULE_LOG_LEVEL, f"Stored procedure completata documento={Documento} azione={azione} rc={rc}")
        return SPResult(rc=rc, message="", id_result=None)
    except Exception as exc:
        # The raw Azione is logged because the normalized value may not exist yet.
        _MODULE_LOGGER.exception(f"Procedura fallita documento={Documento} azione={Azione}: {exc}")
        return SPResult(rc=-1, message=str(exc), id_result=None)