"""Database connection bootstrap helpers for first-run configuration.""" from __future__ import annotations import asyncio import json import tkinter as tk from pathlib import Path from tkinter import messagebox, ttk from typing import Any from async_msssql_query import AsyncMSSQLClient, make_mssql_dsn from locale_text import load_locale_catalog, text as loc_text from tooltips import WidgetToolTip, load_tooltip_catalog, tooltip_text from ui_theme import theme_section, theme_value CONFIG_PATH = Path(__file__).with_name("db_connection.json") DEFAULT_DB_CONFIG: dict[str, Any] = { "server": r"mde3\gesterp", "database": "Mediseawall", "user": "sa", "password": "1Password1", "driver": "ODBC Driver 17 for SQL Server", "trust_server_certificate": True, "encrypt": "", } def load_db_config(path: Path = CONFIG_PATH) -> dict[str, Any] | None: """Return the DB config from disk, or ``None`` when missing/invalid.""" try: if not path.exists(): return None data = json.loads(path.read_text(encoding="utf-8")) if not isinstance(data, dict): return None return {**DEFAULT_DB_CONFIG, **data} except Exception: return None def save_db_config(config: dict[str, Any], path: Path = CONFIG_PATH) -> None: """Persist the DB config as UTF-8 JSON.""" payload = {**DEFAULT_DB_CONFIG, **config} path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def build_dsn_from_config(config: dict[str, Any]) -> str: """Build the SQLAlchemy DSN from the saved configuration.""" return make_mssql_dsn( server=str(config.get("server") or "").strip(), database=str(config.get("database") or "").strip(), user=str(config.get("user") or "").strip() or None, password=str(config.get("password") or "").strip() or None, driver=str(config.get("driver") or DEFAULT_DB_CONFIG["driver"]).strip(), trust_server_certificate=bool(config.get("trust_server_certificate", True)), encrypt=(str(config.get("encrypt") or "").strip() or None), ) def _is_complete(config: dict[str, Any] | None) -> bool: """Return True when the config contains the required connection fields.""" if not isinstance(config, dict): return False required = ("server", "database", "user", "password") return all(str(config.get(key) or "").strip() for key in required) def test_db_config_sync(config: dict[str, Any], loop: asyncio.AbstractEventLoop, timeout: float = 6.0) -> None: """Raise an exception when the DB configuration cannot open a connection.""" client = AsyncMSSQLClient(build_dsn_from_config(config), log=False) async def _job() -> None: try: await client.query_json("SELECT 1 AS Ok", {}, as_dict_rows=True) finally: try: await client.dispose() except Exception: pass fut = asyncio.run_coroutine_threadsafe(_job(), loop) fut.result(timeout=timeout) class DatabaseConfigWindow(tk.Toplevel): """Modal first-run form that collects the SQL Server connection settings.""" def __init__(self, parent: tk.Misc, loop: asyncio.AbstractEventLoop, initial: dict[str, Any] | None = None): super().__init__(parent) self._loop = loop self._theme = theme_section("db_config_window", {}) self._locale_catalog = load_locale_catalog() self._tooltip_catalog = load_tooltip_catalog() self.result_config: dict[str, Any] | None = None merged = {**DEFAULT_DB_CONFIG, **(initial or {})} self.title(loc_text("dbconfig.title", catalog=self._locale_catalog, default="Configurazione Database")) self.geometry(str(theme_value(self._theme, "window_geometry", "520x360"))) self.resizable(False, False) try: if parent is not None and parent.winfo_viewable(): self.transient(parent) except Exception: pass self.protocol("WM_DELETE_WINDOW", self._on_cancel) self.server_var = tk.StringVar(value=str(merged.get("server") or "")) self.database_var = tk.StringVar(value=str(merged.get("database") or "")) self.user_var = tk.StringVar(value=str(merged.get("user") or "")) self.password_var = tk.StringVar(value=str(merged.get("password") or "")) self.driver_var = tk.StringVar(value=str(merged.get("driver") or DEFAULT_DB_CONFIG["driver"])) self.encrypt_var = tk.StringVar(value=str(merged.get("encrypt") or "")) self.tsc_var = tk.BooleanVar(value=bool(merged.get("trust_server_certificate", True))) self._status_var = tk.StringVar(value="") self._busy_cover: tk.Frame | None = None self._busy_label: ttk.Label | None = None self._busy_bar: ttk.Progressbar | None = None self._build_ui() self.update_idletasks() req_w = self.winfo_reqwidth() req_h = self.winfo_reqheight() try: current_w, current_h = [int(v) for v in str(theme_value(self._theme, "window_geometry", "520x360")).split("x", 1)] except Exception: current_w, current_h = 520, 360 final_w = max(current_w, req_w + 16) final_h = max(current_h, req_h + 20) self.geometry(f"{final_w}x{final_h}") self.minsize(final_w, final_h) self.grab_set() self.deiconify() self.lift() self.attributes("-topmost", True) self.after(50, self._show_ready) def _build_ui(self) -> None: body = ttk.Frame(self, padding=14) body.pack(fill="both", expand=True) body.columnconfigure(1, weight=1) heading = ttk.Label( body, text=loc_text( "dbconfig.heading", catalog=self._locale_catalog, default="Configura la connessione al database del magazzino", ), font=("Segoe UI", 11, "bold"), ) heading.grid(row=0, column=0, columnspan=2, sticky="w", pady=(2, 12)) fields = [ ("dbconfig.label.server", "Server", self.server_var, "dbconfig.field.server"), ("dbconfig.label.database", "Database", self.database_var, "dbconfig.field.database"), ("dbconfig.label.user", "Utente", self.user_var, "dbconfig.field.user"), ("dbconfig.label.password", "Password", self.password_var, "dbconfig.field.password"), ("dbconfig.label.driver", "Driver ODBC", self.driver_var, "dbconfig.field.driver"), ("dbconfig.label.encrypt", "Encrypt", self.encrypt_var, "dbconfig.field.encrypt"), ] self._entries: list[ttk.Entry] = [] for row_idx, (key, default, var, tip_key) in enumerate(fields, start=1): label = ttk.Label(body, text=loc_text(key, catalog=self._locale_catalog, default=default)) label.grid( row=row_idx, column=0, sticky="w", padx=(0, 10), pady=6 ) entry = ttk.Entry(body, textvariable=var, width=34, show="*" if var is self.password_var else "") entry.grid(row=row_idx, column=1, sticky="ew", pady=6) self._entries.append(entry) self._attach_tooltip(label, tip_key) self._attach_tooltip(entry, tip_key) tsc = ttk.Checkbutton( body, text=loc_text( "dbconfig.label.trust_server_certificate", catalog=self._locale_catalog, default="Trust server certificate", ), variable=self.tsc_var, ) tsc.grid(row=7, column=0, columnspan=2, sticky="w", pady=(8, 4)) self._attach_tooltip(tsc, "dbconfig.field.trust_server_certificate") info = ttk.Label( body, text=loc_text( "dbconfig.info", catalog=self._locale_catalog, default="Il file verra' salvato localmente e non verra' piu' richiesto ai prossimi avvii.", ), wraplength=420, justify="left", ) info.grid(row=8, column=0, columnspan=2, sticky="w", pady=(6, 8)) ttk.Label(body, textvariable=self._status_var, foreground="#555555").grid( row=9, column=0, columnspan=2, sticky="w", pady=(2, 2) ) actions = ttk.Frame(body) actions.grid(row=10, column=0, columnspan=2, sticky="ew", pady=(10, 0)) actions.columnconfigure(0, weight=1) self._cancel_btn = ttk.Button( actions, text=loc_text("dbconfig.button.cancel", catalog=self._locale_catalog, default="Annulla"), command=self._on_cancel, ) self._cancel_btn.grid(row=0, column=1, padx=(0, 8)) self._attach_tooltip(self._cancel_btn, "dbconfig.button.cancel") self._test_btn = ttk.Button( actions, text=loc_text("dbconfig.button.test", catalog=self._locale_catalog, default="Test connessione"), command=self._on_test, ) self._test_btn.grid(row=0, column=2, padx=(0, 8)) self._attach_tooltip(self._test_btn, "dbconfig.button.test") self._save_btn = ttk.Button( actions, text=loc_text("dbconfig.button.save", catalog=self._locale_catalog, default="Salva"), command=self._on_save, ) self._save_btn.grid(row=0, column=3) self._attach_tooltip(self._save_btn, "dbconfig.button.save") self._attach_tooltip(heading, "dbconfig.heading") self._attach_tooltip(info, "dbconfig.info") def _attach_tooltip(self, widget: tk.Misc, key: str) -> None: """Attach a localized tooltip when a text exists for the given key.""" tip = tooltip_text(key, catalog=self._tooltip_catalog) if tip: WidgetToolTip(widget, tip) def _show_ready(self) -> None: """Ensure the modal is visible even with a hidden bootstrap root.""" try: self.attributes("-topmost", True) self.deiconify() self.lift() self.focus_force() if self._entries: self._entries[0].focus_force() finally: try: self.after(250, lambda: self.attributes("-topmost", False)) except Exception: pass def _show_busy_overlay(self, message: str) -> None: """Show a lightweight inline overlay without CustomTkinter callbacks.""" if self._busy_cover and self._busy_cover.winfo_exists(): if self._busy_label is not None: self._busy_label.configure(text=message) try: self._busy_cover.lift() except Exception: pass return cover = tk.Frame(self, bg="#d9d9d9") cover.place(relx=0, rely=0, relwidth=1, relheight=1) panel = ttk.Frame(cover, padding=14) panel.place(relx=0.5, rely=0.5, anchor="center") label = ttk.Label(panel, text=message, font=("Segoe UI", 10, "bold")) label.pack(padx=16, pady=(4, 8)) bar = ttk.Progressbar(panel, mode="indeterminate", length=220) bar.pack(padx=16, pady=(0, 6)) try: bar.start(10) except Exception: pass self._busy_cover = cover self._busy_label = label self._busy_bar = bar def _hide_busy_overlay(self) -> None: """Hide the lightweight inline overlay.""" if self._busy_bar is not None: try: self._busy_bar.stop() except Exception: pass self._busy_bar = None self._busy_label = None if self._busy_cover is not None and self._busy_cover.winfo_exists(): try: self._busy_cover.destroy() except Exception: pass self._busy_cover = None def _set_busy(self, busy: bool, message: str = "") -> None: state = "disabled" if busy else "normal" try: for entry in self._entries: entry.configure(state=state) self._cancel_btn.configure(state=state) self._test_btn.configure(state=state) self._save_btn.configure(state=state) self.configure(cursor="watch" if busy else "") self._status_var.set(message) if busy: self._show_busy_overlay(message or loc_text("dbconfig.busy", catalog=self._locale_catalog, default="Verifico...")) else: self._hide_busy_overlay() self.update_idletasks() except Exception: pass def _collect(self) -> dict[str, Any]: return { "server": str(self.server_var.get() or "").strip(), "database": str(self.database_var.get() or "").strip(), "user": str(self.user_var.get() or "").strip(), "password": str(self.password_var.get() or "").strip(), "driver": str(self.driver_var.get() or "").strip() or str(DEFAULT_DB_CONFIG["driver"]), "encrypt": str(self.encrypt_var.get() or "").strip(), "trust_server_certificate": bool(self.tsc_var.get()), } def _validate(self, config: dict[str, Any]) -> bool: required = ("server", "database", "user", "password") missing = [name for name in required if not str(config.get(name) or "").strip()] if not missing: return True messagebox.showwarning( loc_text("dbconfig.msg.title", catalog=self._locale_catalog, default="Configurazione Database"), loc_text( "dbconfig.msg.missing", catalog=self._locale_catalog, default="Compila almeno server, database, utente e password.", ), parent=self, ) return False def _test(self, config: dict[str, Any]) -> None: self._set_busy(True, loc_text("dbconfig.busy", catalog=self._locale_catalog, default="Verifico connessione...")) try: test_db_config_sync(config, self._loop) finally: self._set_busy(False, "") def _on_test(self) -> None: config = self._collect() if not self._validate(config): return try: self._test(config) except Exception as ex: messagebox.showerror( loc_text("dbconfig.msg.title", catalog=self._locale_catalog, default="Configurazione Database"), loc_text("dbconfig.msg.test_error", catalog=self._locale_catalog, default="Connessione fallita:\n{error}").format(error=ex), parent=self, ) return messagebox.showinfo( loc_text("dbconfig.msg.title", catalog=self._locale_catalog, default="Configurazione Database"), loc_text("dbconfig.msg.test_ok", catalog=self._locale_catalog, default="Connessione riuscita."), parent=self, ) def _on_save(self) -> None: config = self._collect() if not self._validate(config): return try: self._test(config) save_db_config(config) except Exception as ex: messagebox.showerror( loc_text("dbconfig.msg.title", catalog=self._locale_catalog, default="Configurazione Database"), loc_text("dbconfig.msg.save_error", catalog=self._locale_catalog, default="Salvataggio fallito:\n{error}").format(error=ex), parent=self, ) return self.result_config = config self.destroy() def _on_cancel(self) -> None: self.result_config = None try: self.destroy() except Exception: pass def ensure_db_config(loop: asyncio.AbstractEventLoop, parent: tk.Misc | None = None) -> dict[str, Any] | None: """Return a valid DB config, prompting the user the first time when needed.""" existing = load_db_config() if _is_complete(existing): return existing owns_root = False if parent is None: parent = tk.Tk() parent.geometry("1x1+0+0") parent.overrideredirect(True) parent.attributes("-alpha", 0.0) parent.deiconify() parent.update_idletasks() owns_root = True dlg = DatabaseConfigWindow(parent, loop=loop, initial=existing or DEFAULT_DB_CONFIG) try: parent.wait_window(dlg) finally: if owns_root: try: parent.destroy() except Exception: pass return dlg.result_config