Files
ware_house/async_msssql_query.py
2026-03-31 19:15:33 +02:00

158 lines
5.5 KiB
Python

"""Async SQL Server access layer used by the warehouse application.
The module centralizes DSN creation and exposes :class:`AsyncMSSQLClient`,
which lazily binds a SQLAlchemy async engine to the running event loop. The
implementation intentionally avoids pooling because the GUI schedules work on a
single shared background loop and pooled connections were a source of
cross-loop errors.
"""
from __future__ import annotations
import asyncio
import logging
import time
import urllib.parse
from typing import Any, Dict, Optional
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
# Pick the JSON backend once at import time: orjson when it can be loaded,
# otherwise the standard library. Both variants expose the same ``_dumps``.
try:
    import orjson as _json
except Exception:
    # Any failure while loading orjson selects the stdlib implementation.
    import json as _json

    def _dumps(obj: Any) -> str:
        """Return ``obj`` as JSON text via the standard-library ``json`` module.

        Values the encoder cannot handle natively are converted with ``str``.
        """
        return _json.dumps(obj, default=str)
else:
    def _dumps(obj: Any) -> str:
        """Return ``obj`` as JSON text via ``orjson``.

        ``orjson.dumps`` produces ``bytes``, so the result is decoded as UTF-8.
        Values the encoder cannot handle natively are converted with ``str``.
        """
        encoded = _json.dumps(obj, default=str)
        return encoded.decode("utf-8")
def _odbc_value(value: Any) -> str:
    """Render one attribute value for use inside an ODBC connection string.

    Values containing ``;``, ``{`` or ``}``, or carrying leading/trailing
    whitespace, are wrapped in braces with every ``}`` doubled, so they cannot
    terminate the attribute early. Values that already start with ``{`` and
    end with ``}`` are treated as pre-quoted by the caller and returned
    unchanged, as are plain values.
    """
    rendered = str(value)
    if rendered.startswith("{") and rendered.endswith("}"):
        return rendered
    has_special = any(ch in rendered for ch in ";{}")
    has_edge_space = rendered != rendered.strip()
    if not (has_special or has_edge_space):
        return rendered
    return "{" + rendered.replace("}", "}}") + "}"


def make_mssql_dsn(
    *,
    server: str,
    database: str,
    user: Optional[str] = None,
    password: Optional[str] = None,
    driver: str = "ODBC Driver 17 for SQL Server",
    trust_server_certificate: bool = True,
    encrypt: Optional[str] = None,
    extra_odbc_kv: Optional[Dict[str, str]] = None,
) -> str:
    """Build a SQLAlchemy ``mssql+aioodbc`` DSN from SQL Server parameters.

    Args:
        server: Value of the ``SERVER`` attribute.
        database: Value of the ``DATABASE`` attribute.
        user: Value of ``UID``; omitted when empty or ``None``.
        password: Value of ``PWD``; omitted when empty or ``None``.
        driver: Value of the ``DRIVER`` attribute.
        trust_server_certificate: Emitted as ``TrustServerCertificate=Yes``
            or ``No``.
        encrypt: Value of ``Encrypt``; omitted when empty or ``None``.
        extra_odbc_kv: Additional attributes merged last, so they override
            any attribute of the same name set above.

    Returns:
        ``mssql+aioodbc:///?odbc_connect=<url-quoted ODBC string>``.
    """
    kv = {
        "DRIVER": driver,
        "SERVER": server,
        "DATABASE": database,
        "TrustServerCertificate": "Yes" if trust_server_certificate else "No",
    }
    if user:
        kv["UID"] = user
    if password:
        kv["PWD"] = password
    if encrypt:
        kv["Encrypt"] = encrypt
    if extra_odbc_kv:
        kv.update(extra_odbc_kv)
    # Quote each value so characters such as ';' in a password do not split
    # the attribute list; plain values pass through unchanged.
    odbc = ";".join(f"{k}={_odbc_value(v)}" for k, v in kv.items()) + ";"
    return f"mssql+aioodbc:///?odbc_connect={urllib.parse.quote_plus(odbc)}"
class AsyncMSSQLClient:
    """Thin async query client for SQL Server.

    The engine is created lazily on the currently running event loop and uses
    :class:`sqlalchemy.pool.NullPool` to avoid recycling connections across
    loops or threads.
    """

    # Seconds to wait for a dispose that has to run on another event loop.
    _DISPOSE_TIMEOUT_S = 2

    def __init__(self, dsn: str, *, echo: bool = False, log: bool = True):
        """Initialize the client without opening any connection immediately.

        Args:
            dsn: SQLAlchemy URL handed to ``create_async_engine`` on first use.
            echo: Forwarded as the engine's ``echo`` option.
            log: When ``True``, engine creation/disposal is logged, and a
                stream handler is attached to the ``AsyncMSSQLClient`` logger
                if that logger has no handlers yet.
        """
        self._dsn = dsn
        self._echo = echo
        self._engine: Optional[Any] = None
        self._engine_loop: Optional[asyncio.AbstractEventLoop] = None
        self._logger = logging.getLogger("AsyncMSSQLClient")
        if log and not self._logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
            self._logger.addHandler(handler)
        self._enable_log = log

    async def _ensure_engine(self) -> None:
        """Create the async engine on first use for the current running loop.

        An engine that already exists is reused as-is; this method does not
        check whether the running loop is still the one it was created on.
        """
        if self._engine is not None:
            return
        loop = asyncio.get_running_loop()
        self._engine = create_async_engine(
            self._dsn,
            echo=self._echo,
            # NullPool avoids reusing connections bound to a different event loop.
            poolclass=NullPool,
            # aioodbc must explicitly receive the loop to bind to.
            connect_args={"loop": loop},
        )
        self._engine_loop = loop
        if self._enable_log:
            self._logger.info("Engine created on loop %s", id(loop))

    async def dispose(self) -> None:
        """Dispose the engine on the loop where it was created.

        Does nothing when no engine exists. When called from a different loop
        than the engine's, the dispose coroutine is submitted to the engine's
        loop and awaited without blocking the calling loop.

        Raises:
            concurrent.futures.TimeoutError: The cross-loop dispose did not
                finish within ``_DISPOSE_TIMEOUT_S`` seconds. The engine
                reference is kept in that case.
        """
        engine = self._engine
        if engine is None:
            return
        current_loop = asyncio.get_running_loop()
        if current_loop is self._engine_loop:
            await engine.dispose()
        else:
            # Local import: only this branch needs the exception type.
            from concurrent.futures import TimeoutError as FutureTimeoutError

            remote = asyncio.run_coroutine_threadsafe(engine.dispose(), self._engine_loop)
            # Await the cross-thread future instead of calling
            # ``remote.result(timeout=...)``: a blocking wait here would
            # freeze every other task on the calling loop for the duration.
            waiter = asyncio.wrap_future(remote)
            done, _ = await asyncio.wait([waiter], timeout=self._DISPOSE_TIMEOUT_S)
            if not done:
                raise FutureTimeoutError()
            # Re-raise any error the dispose itself produced.
            waiter.result()
        self._engine = None
        # Drop the loop reference together with the engine it belonged to.
        self._engine_loop = None
        if self._enable_log:
            self._logger.info("Engine disposed")

    async def query_json(
        self,
        sql: str,
        params: Optional[Dict[str, Any]] = None,
        *,
        as_dict_rows: bool = False,
    ) -> Dict[str, Any]:
        """Execute a query and return a JSON-friendly payload.

        Args:
            sql: SQL statement to execute.
            params: Optional named parameters bound to the statement.
            as_dict_rows: When ``True`` returns rows as dictionaries keyed by
                column name; otherwise rows are returned as lists.

        Returns:
            A dictionary with ``columns`` (column names), ``rows`` and
            ``elapsed_ms``. The elapsed time is measured from just before the
            connection is opened until the payload is built, rounded to three
            decimals.
        """
        await self._ensure_engine()
        t0 = time.perf_counter()
        async with self._engine.connect() as conn:
            res = await conn.execute(text(sql), params or {})
            rows = res.fetchall()
            cols = list(res.keys())
            if as_dict_rows:
                rows_out = [dict(zip(cols, row)) for row in rows]
            else:
                rows_out = [list(row) for row in rows]
            return {
                "columns": cols,
                "rows": rows_out,
                "elapsed_ms": round((time.perf_counter() - t0) * 1000, 3),
            }

    async def exec(self, sql: str, params: Optional[Dict[str, Any]] = None, *, commit: bool = False) -> int:
        """Execute a DML statement and return its row count.

        Args:
            sql: SQL statement to execute.
            params: Optional named parameters bound to the statement.
            commit: When ``True`` the statement runs inside
                ``engine.begin()``; otherwise it runs on a plain
                ``engine.connect()`` connection and this method issues no
                commit of its own.

        Returns:
            The result's ``rowcount``, or ``0`` when that value is falsy.
        """
        await self._ensure_engine()
        async with (self._engine.begin() if commit else self._engine.connect()) as conn:
            res = await conn.execute(text(sql), params or {})
            return res.rowcount or 0