# Source listing metadata (not part of the module): 191 lines, 5.9 KiB, Python.
"""Shared Tk/async helpers used by multiple warehouse windows.
|
|
|
|
The module bundles three concerns used throughout the GUI:
|
|
|
|
* lifecycle of the shared background asyncio loop;
|
|
* a modal-like busy overlay shown during long-running tasks;
|
|
* an ``AsyncRunner`` that schedules coroutines and re-enters Tk safely.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import threading
|
|
import tkinter as tk
|
|
from typing import Any, Callable, Optional
|
|
|
|
import customtkinter as ctk
|
|
|
|
# Version tag of this helper module.
__VERSION__ = "GestioneAreeFrame v3.2.5-singleloop"

# The SQL client is imported only so the name is available from this module
# (hence the F401 suppression); nothing in the visible code calls it.
try:
    from async_msssql_query import AsyncMSSQLClient  # noqa: F401
except Exception:
    # Deliberately broad: whatever goes wrong while importing the client, fall
    # back to a placeholder so the GUI helpers stay importable.
    AsyncMSSQLClient = object  # type: ignore
|
|
|
|
|
|
class _LoopHolder:
    """Keep references to the shared event loop and its worker thread.

    A single instance is stored in the module-level ``_GLOBAL`` and is
    read/written by ``_run_loop``, ``get_global_loop`` and
    ``stop_global_loop``.
    """

    def __init__(self) -> None:
        """Start with no loop, no worker thread and a cleared ready flag."""
        # Event loop created inside the worker thread; None until published.
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        # Thread running the loop; None while no worker has been started.
        self.thread: Optional[threading.Thread] = None
        # Set by the worker right after it has stored ``loop``.
        self.ready = threading.Event()
|
|
|
|
|
|
# Module-wide holder shared by _run_loop(), get_global_loop() and
# stop_global_loop(); it tracks the single background loop and its thread.
_GLOBAL = _LoopHolder()
|
|
|
|
|
|
def _run_loop():
    """Create and run the shared event loop inside the worker thread.

    Used as the thread target by ``get_global_loop``. The new loop is stored
    in ``_GLOBAL.loop`` and ``_GLOBAL.ready`` is set *before* the loop starts
    running, so a waiter may observe the loop slightly before
    ``loop.is_running()`` becomes true. The call then blocks in
    ``run_forever()`` until the loop is stopped.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _GLOBAL.loop = loop
    _GLOBAL.ready.set()
    try:
        loop.run_forever()
    finally:
        # run_forever() only returns once the loop has been stopped; this
        # thread then exits, so nothing can run on the loop any more. Close it
        # to release its selector and self-pipe instead of leaking them.
        loop.close()
|
|
|
|
|
|
def get_global_loop() -> asyncio.AbstractEventLoop:
    """Return the shared background event loop, creating it if needed.

    A previously created loop is reused only while it is still usable, i.e.
    it has not been closed and its worker thread is still alive. A stale loop
    (closed, or left behind by a dead worker) would accept coroutines that
    never complete, so in that case a fresh worker thread is started.

    Returns:
        The event loop running in the ``warehouse-asyncio`` daemon thread.

    Raises:
        RuntimeError: if the worker does not publish a loop within 5 seconds.
    """
    loop = _GLOBAL.loop
    worker = _GLOBAL.thread
    worker_alive = worker is not None and worker.is_alive()

    if loop is not None:
        # ``worker is None`` with a loop present is treated as usable, which
        # matches the previous "return whatever is stored" behaviour.
        if not loop.is_closed() and (worker is None or worker_alive):
            return loop
        # Stale loop: release it if its thread is gone and forget it, so the
        # readiness wait below cannot be satisfied by the old state.
        if not worker_alive and not loop.is_closed() and not loop.is_running():
            loop.close()
        _GLOBAL.loop = None
        _GLOBAL.ready.clear()
        worker_alive = False  # always start a fresh worker for a stale loop

    if not worker_alive:
        _GLOBAL.thread = threading.Thread(
            target=_run_loop, name="warehouse-asyncio", daemon=True
        )
        _GLOBAL.thread.start()
    # else: a worker was already started but has not published its loop yet
    # (e.g. an earlier call timed out); wait for it instead of spawning another.

    _GLOBAL.ready.wait(timeout=5.0)
    if _GLOBAL.loop is None:
        raise RuntimeError("Impossibile avviare l'event loop globale")
    return _GLOBAL.loop
|
|
|
|
|
|
def stop_global_loop():
    """Stop the shared event loop and release thread references.

    The stop request is scheduled whenever a non-closed loop exists, even if
    it is not running yet: the worker publishes the loop before entering
    ``run_forever()``, and a queued ``stop`` makes it exit on its first
    iteration instead of running forever unreferenced. The worker is then
    joined for at most 2 seconds and, once it has finished, the loop is
    closed. The holder is always reset so a later ``get_global_loop()``
    starts from scratch.
    """
    loop = _GLOBAL.loop
    worker = _GLOBAL.thread

    if loop is not None and not loop.is_closed():
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was closed between the check and the call; it is
            # already stopped, which is all we wanted.
            pass

    # Joining the current thread raises RuntimeError, so skip the join when
    # this function is invoked from code running on the loop itself.
    if worker is not None and worker is not threading.current_thread():
        worker.join(timeout=2.0)

    worker_finished = worker is None or not worker.is_alive()
    if (
        loop is not None
        and worker_finished
        and not loop.is_running()
        and not loop.is_closed()
    ):
        # Safe only once no thread is driving the loop any more.
        loop.close()

    _GLOBAL.loop = None
    _GLOBAL.thread = None
    _GLOBAL.ready.clear()
|
|
|
|
|
|
class BusyOverlay:
    """Semi-transparent overlay used to block interaction during async tasks.

    The overlay is a borderless, always-on-top ``CTkToplevel`` sized to cover
    the parent's toplevel window, showing a message and an indeterminate
    progress bar. ``show()`` and ``hide()`` may be called repeatedly.
    """

    def __init__(self, parent: tk.Misc):
        """Bind the overlay lifecycle to the given parent widget.

        Args:
            parent: any widget inside the window the overlay should cover.
        """
        self.parent = parent
        self._top: Optional[ctk.CTkToplevel] = None
        self._pb: Optional[ctk.CTkProgressBar] = None
        self._lbl: Optional[ctk.CTkLabel] = None
        # Toplevel the <Configure> handler is bound to, remembered so hide()
        # can unbind without querying a parent that may already be destroyed.
        self._root: Optional[tk.Misc] = None
        self._bind_id = None

    def _reposition(self):
        """Resize the overlay so it keeps covering the parent toplevel."""
        if self._top is None:
            return
        root = self._root if self._root is not None else self.parent.winfo_toplevel()
        # Flush pending geometry work so the reported size/position is current.
        root.update_idletasks()
        x, y = root.winfo_rootx(), root.winfo_rooty()
        w, h = root.winfo_width(), root.winfo_height()
        self._top.geometry(f"{w}x{h}+{x}+{y}")

    def _on_root_configure(self, event):
        """Follow the toplevel when it is moved or resized."""
        # A <Configure> binding on a toplevel is also triggered for its
        # descendants (the toplevel is part of their bindtags). Only the
        # toplevel's own geometry matters, so ignore everything else.
        root = self._root
        if root is not None and str(event.widget) == str(root):
            self._reposition()

    def show(self, message="Attendere..."):
        """Display the overlay or just update its message if already visible.

        Args:
            message: text shown above the progress bar.
        """
        if self._top is not None:
            if self._lbl is not None:
                self._lbl.configure(text=message)
            return

        root = self.parent.winfo_toplevel()
        top = ctk.CTkToplevel(root)
        self._top = top
        self._root = root
        top.overrideredirect(True)  # no title bar / borders
        try:
            top.attributes("-alpha", 0.22)
        except tk.TclError:
            # Transparency is optional; carry on with an opaque overlay.
            pass
        try:
            top.configure(fg_color="#000000")
        except Exception:
            # Fall back to the plain Tk option when fg_color is rejected.
            top.configure(bg="#000000")
        top.attributes("-topmost", True)

        wrap = ctk.CTkFrame(top, corner_radius=8)
        wrap.place(relx=0.5, rely=0.5, anchor="center")
        self._lbl = ctk.CTkLabel(wrap, text=message, font=("Segoe UI", 11, "bold"))
        self._lbl.pack(pady=(0, 10))
        self._pb = ctk.CTkProgressBar(wrap, mode="indeterminate", width=260)
        self._pb.pack(fill="x")
        try:
            self._pb.start()
        except Exception:
            # Best effort: a static bar is still better than no overlay.
            pass

        self._reposition()
        self._bind_id = root.bind("<Configure>", self._on_root_configure, add="+")

    def hide(self):
        """Dismiss the overlay and unregister resize bindings.

        Every step is best effort and the internal state is always reset, so
        the method is safe to call when nothing is shown or when the parent
        window has already been destroyed.
        """
        if self._pb is not None:
            try:
                self._pb.stop()
            except Exception:
                pass
            self._pb = None
        if self._top is not None:
            try:
                self._top.destroy()
            except Exception:
                pass
            self._top = None
        # The label was a child of the destroyed toplevel; drop the reference.
        self._lbl = None

        root, bind_id = self._root, self._bind_id
        self._root = None
        self._bind_id = None
        if root is not None and bind_id:
            try:
                # NOTE(review): older Python releases had a tkinter bug where
                # two-argument unbind() removed every handler bound to the
                # sequence, not just this one - confirm the target version.
                root.unbind("<Configure>", bind_id)
            except Exception:
                pass
|
|
|
|
|
|
class AsyncRunner:
    """Run awaitables on the shared loop and callback on Tk's main thread.

    Coroutines are submitted to the background loop obtained from
    ``get_global_loop()``; completion is detected by polling from Tk via
    ``widget.after`` so that callbacks always run in the Tk thread.
    """

    def __init__(self, widget: tk.Misc):
        """Capture the widget used to marshal callbacks back to Tk.

        Args:
            widget: any live Tk widget; its ``after`` method drives polling.
        """
        self.widget = widget
        self.loop = get_global_loop()

    def run(
        self,
        awaitable,
        on_success: Callable[[Any], None],
        on_error: Optional[Callable[[BaseException], None]] = None,
        busy: Optional[BusyOverlay] = None,
        message: str = "Operazione in corso...",
    ):
        """Schedule ``awaitable`` and dispatch completion callbacks in Tk.

        Args:
            awaitable: coroutine object to run on the background loop
                (``asyncio.run_coroutine_threadsafe`` rejects anything else).
            on_success: called in the Tk thread with the coroutine's result.
            on_error: called in the Tk thread with the raised exception; when
                omitted the error is only printed.
            busy: optional overlay shown while the coroutine is running.
            message: text displayed by ``busy``.

        Raises:
            TypeError, RuntimeError: propagated from
                ``asyncio.run_coroutine_threadsafe`` when the work cannot be
                scheduled (not a coroutine, loop closed). The overlay is
                hidden again before the exception propagates.
        """
        if busy:
            busy.show(message)
        try:
            fut = asyncio.run_coroutine_threadsafe(awaitable, self.loop)
        except Exception:
            # Nothing was scheduled, so no poll will ever hide the overlay:
            # undo it here, and close the coroutine to avoid a
            # "never awaited" warning.
            if asyncio.iscoroutine(awaitable):
                awaitable.close()
            if busy:
                busy.hide()
            raise

        def _poll():
            # Runs in the Tk thread; re-arms itself until the future is done.
            if not fut.done():
                self.widget.after(60, _poll)
                return
            if busy:
                busy.hide()
            try:
                res = fut.result()
            except BaseException as ex:
                if on_error:
                    # Bind the exception as a default: ``ex`` is cleared when
                    # the except block ends.
                    self.widget.after(0, lambda e=ex: on_error(e))
                else:
                    print("[AsyncRunner] Unhandled error:", repr(ex))
            else:
                self.widget.after(0, lambda r=res: on_success(r))

        _poll()

    def close(self):
        """No-op kept for API compatibility with older callers."""
        pass
|