import json
import os
import queue
import re
import sys
import tempfile
import threading
import traceback
import zipfile
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple

import customtkinter as ctk
from tkinter import filedialog, messagebox
from tkinter import ttk

try:
    import pythoncom
    import win32com.client
except ImportError:
    pythoncom = None
    win32com = None

APP_TITLE = "Outlook Exporter"
APP_GEOMETRY = "1220x760"
MAIL_CLASS = 43  # OlObjectClass.olMail
MAX_LOG_LINES = 3000
PREFS_APP_DIR = "OutlookExporter"
PREFS_FILE_NAME = "prefs.json"
OUTLOOK_NOT_RUNNING_MESSAGE = (
    "Outlook non risulta aperto.\n\n"
    "Apri Microsoft Outlook con il profilo email da esportare, attendi che abbia completato il caricamento, "
    "poi riducilo a icona e torna qui per premere di nuovo 'Connetti a Outlook'."
)
ALLOWED_ATTACHMENT_EXTENSIONS = {
    ".pdf",
    ".doc", ".docx", ".docm", ".dot", ".dotx", ".dotm",
    ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlt", ".xltx", ".xltm", ".csv",
    ".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".ppsm", ".pot", ".potx", ".potm",
    ".rtf", ".odt", ".ods", ".odp",
    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tif", ".tiff", ".webp",
}


@dataclass
class FolderNode:
    store_name: str
    path: str
    folder_obj: object
    display_name: Optional[str] = None
    mail_count: Optional[int] = None


class ExportCancelled(Exception):
    pass


class PreferenceStore:
    def __init__(self, default_output_dir: Path) -> None:
        self.default_output_dir = default_output_dir
        self.path = self._prefs_path()
        self.data = self._load_or_create()

    def get_output_dir(self) -> str:
        value = str(self.data.get("output_dir") or "").strip()
        return value or str(self.default_output_dir)

    def set_output_dir(self, output_dir: str) -> None:
        output_dir = str(output_dir or "").strip()
        if not output_dir:
            return
        self.data["output_dir"] = output_dir
        self._save()

    @classmethod
    def _prefs_path(cls) -> Path:
        appdata = os.environ.get("APPDATA")
        base_dir = Path(appdata) if appdata else Path.home() / ".config"
        return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME

    def _load_or_create(self) -> dict:
        if self.path.exists():
            try:
                with self.path.open("r", encoding="utf-8") as fh:
                    data = json.load(fh)
                return data if isinstance(data, dict) else {}
            except Exception:
                return {"output_dir": str(self.default_output_dir)}
        data = {"output_dir": str(self.default_output_dir)}
        self.data = data
        self._save()
        return data

    def _save(self) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with self.path.open("w", encoding="utf-8") as fh:
            json.dump(self.data, fh, ensure_ascii=False, indent=2)


class OutlookService:
    def __init__(self) -> None:
        self.outlook = None
        self.namespace = None
        self.root_folders: List[FolderNode] = []

    def connect(self) -> None:
        if win32com is None or pythoncom is None:
            raise RuntimeError(
                "pywin32 non è installato. Esegui: pip install pywin32"
            )
        pythoncom.CoInitialize()
        try:
            try:
                self.outlook = win32com.client.GetActiveObject("Outlook.Application")
            except Exception as exc:
                raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc
            self.namespace = self.outlook.GetNamespace("MAPI")
            self.root_folders = []
            # Outlook COM collections are 1-based.
            for i in range(1, self.namespace.Folders.Count + 1):
                store = self.namespace.Folders.Item(i)
                store_name = str(store.Name)
                node = FolderNode(
                    store_name=store_name,
                    path=NameCodec.store_display_name(store_name),
                    folder_obj=store,
                    display_name=NameCodec.store_display_name(store_name),
                )
                self.root_folders.append(node)
        except Exception:
            pythoncom.CoUninitialize()
            raise

    def disconnect(self) -> None:
        try:
            self.root_folders = []
            self.namespace = None
            self.outlook = None
        finally:
            if pythoncom is not None:
                try:
                    pythoncom.CoUninitialize()
                except Exception:
                    pass

    def list_root_folders(self) -> List[FolderNode]:
        return list(self.root_folders)

    def list_children(self, folder_obj: object, current_path: str) -> List[FolderNode]:
        children = []
        folders = folder_obj.Folders
        for i in range(1, folders.Count + 1):
            child = folders.Item(i)
            child_name = str(child.Name)
            child_path = f"{current_path} / {child_name}"
            children.append(
                FolderNode(
                    store_name=current_path.split(" / ")[0],
                    path=child_path,
                    folder_obj=child,
                    display_name=child_name,
                    mail_count=self.count_mail_items(child),
                )
            )
        return children

    @staticmethod
    def count_mail_items(folder_obj: object) -> Optional[int]:
        try:
            items = folder_obj.Items
        except Exception:
            return None
        try:
            return items.Restrict("[MessageClass] = 'IPM.Note'").Count
        except Exception:
            # Fall back to the unfiltered item count if the filter is rejected.
            try:
                return items.Count
            except Exception:
                return None


class NameCodec:
    INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    SPACE_RE = re.compile(r'\s+')

    @classmethod
    def compact_text(cls, value: str, max_len: int = 32) -> str:
        value = cls.SPACE_RE.sub(" ", value or "").strip()
        if len(value) <= max_len:
            return value
        return value[: max_len - 3].rstrip() + "..."

    @classmethod
    def store_display_name(cls, store_name: str) -> str:
        store_name = store_name or "Account"
        if "@" in store_name:
            local_part = store_name.split("@", 1)[0]
            return cls.compact_text(local_part, max_len=34)
        return cls.compact_text(store_name, max_len=34)

    @classmethod
    def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str:
        display_name = cls.compact_text(display_name or "Cartella", max_len=46)
        if mail_count is None:
            return display_name
        return f"{display_name} ({mail_count})"

    @classmethod
    def sanitize(cls, value: str, max_len: int = 80) -> str:
        value = value or ""
        value = cls.INVALID_CHARS_RE.sub("_", value)
        value = cls.SPACE_RE.sub(" ", value).strip()
        value = value.replace(".", "_")
        if not value:
            value = "vuoto"
        if len(value) > max_len:
            value = value[:max_len].rstrip(" _")
        return value

    @classmethod
    def folder_name_from_outlook_path(cls, folder_path: str) -> str:
        # Strip the leading store segment when the path comes from the tree as "Store / Folder".
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        export_parts = parts[1:] if len(parts) > 1 else parts
        safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts]
        return os.path.join(*safe_parts) if safe_parts else "cartella"

    @classmethod
    def archive_base_name_from_outlook_path(cls, folder_path: str) -> str:
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        folder_name = parts[-1] if parts else "export"
        return cls.sanitize(folder_name, max_len=60)

    @classmethod
    def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str:
        if received_time is None:
            stamp = "data_sconosciuta"
        else:
            stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S")
        return f"mail_{stamp}_{seq:05d}"

    @classmethod
    def mail_folder_name(cls, base_mail_name: str, subject: str) -> str:
        safe_subject = cls.sanitize(subject or "senza oggetto", max_len=50)
        return f"{base_mail_name}__{safe_subject}"

    @classmethod
    def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str:
        original_filename = original_filename or f"attachment_{att_index}"
        original_filename = os.path.basename(original_filename)
        stem, ext = os.path.splitext(original_filename)
        stem = cls.sanitize(stem, max_len=80)
        ext = cls.sanitize_extension(ext)
        return f"allegato__{base_mail_name}__{att_index:02d}__{stem}{ext}"

    @classmethod
    def sanitize_extension(cls, ext: str, max_len: int = 20) -> str:
        ext = (ext or "").strip().lower()
        if not ext:
            return ""
        if not ext.startswith("."):
            ext = f".{ext}"
        ext = cls.INVALID_CHARS_RE.sub("_", ext)
        ext = ext.replace(" ", "_")
        if len(ext) > max_len:
            ext = ext[:max_len]
        return ext

    @classmethod
    def is_allowed_attachment(cls, filename: str) -> bool:
        ext = cls.sanitize_extension(os.path.splitext(filename or "")[1])
        return ext in ALLOWED_ATTACHMENT_EXTENSIONS


class Exporter:
    def __init__(self, app: "OutlookExporterApp") -> None:
        self.app = app
        self.cancel_requested = False
        self.global_seq = 0
        self.exported_mail_count = 0
        self.exported_attachment_count = 0

    def cancel(self) -> None:
        self.cancel_requested = True

    def check_cancel(self) -> None:
        if self.cancel_requested:
            raise ExportCancelled("Esportazione annullata dall'utente.")

    def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int]:
        self.check_cancel()
        folder_out_path.mkdir(parents=True, exist_ok=True)
        folder_name = self._safe_get(folder_obj, "Name") or "(cartella senza nome)"
        self.app.log(f"Cartella Outlook: {folder_name}")
        self.app.log(f"Cartella destinazione: {folder_out_path}")

        items = self._safe_get(folder_obj, "Items")
        if items is None:
            self.app.log("Attenzione: impossibile leggere gli elementi della cartella, continuo con le sottocartelle.")
            count = 0
        else:
            try:
                items.Sort("[ReceivedTime]", False)
            except Exception:
                self.app.log("Attenzione: impossibile ordinare per ReceivedTime, continuo senza ordinamento.")
            try:
                count = items.Count
            except Exception as exc:
                self.app.log(f"Attenzione: impossibile contare gli elementi della cartella: {exc}")
                count = 0

        local_mail_count = 0
        local_attachment_count = 0
        self.app.log(f"Elementi rilevati: {count}")

        for idx in range(1, count + 1):
            self.check_cancel()
            try:
                item = items.Item(idx)
            except Exception as exc:
                self.app.log(f"Errore accesso elemento {idx}: {exc}")
                continue

            # Skip anything that is not a mail item (meetings, reports, ...).
            message_class = self._safe_get(item, "Class")
            if message_class != MAIL_CLASS:
                continue

            try:
                self.global_seq += 1
                mail_count, att_count = self._export_mail(item, folder_out_path)
                local_mail_count += mail_count
                local_attachment_count += att_count
                self.exported_mail_count += mail_count
                self.exported_attachment_count += att_count
            except ExportCancelled:
                # Let cancellation propagate instead of logging it as a per-mail error.
                raise
            except Exception as exc:
                subject = self._safe_get(item, "Subject") or "(senza oggetto)"
                self.app.log(f"Errore esportando mail '{subject}': {exc}")

            if idx % 10 == 0:
                self.app.set_status(
                    f"Esportazione in corso... elemento {idx}/{count} | mail {self.exported_mail_count} | allegati {self.exported_attachment_count}"
                )
                self.app.pump_ui()

        subfolders = self._safe_get(folder_obj, "Folders")
        subfolder_count = 0
        try:
            subfolder_count = subfolders.Count if subfolders is not None else 0
        except Exception as exc:
            self.app.log(f"Attenzione: impossibile leggere le sottocartelle: {exc}")

        for i in range(1, subfolder_count + 1):
            self.check_cancel()
            try:
                child = subfolders.Item(i)
            except Exception as exc:
                self.app.log(f"Errore accesso sottocartella {i}: {exc}")
                continue
            child_name = self._safe_get(child, "Name") or f"sottocartella_{i}"
            child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80)
            m, a = self.export_folder(child, child_dir)
            local_mail_count += m
            local_attachment_count += a

        return local_mail_count, local_attachment_count

    def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int]:
        received_dt = self._convert_received_time(self._safe_get(item, "ReceivedTime"))
        subject = self._safe_get(item, "Subject") or ""
        body = self._safe_get(item, "Body") or ""
        base_name = NameCodec.email_base_name(received_dt, self.global_seq)
        mail_dir = self._unique_path(folder_out_path / NameCodec.mail_folder_name(base_name, subject))
        mail_dir.mkdir(parents=True, exist_ok=True)

        txt_path = mail_dir / f"{base_name}.txt"
        header_lines = [
            f"Oggetto: {subject}",
            f"Ricevuta: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}",
            "",
            "--- TESTO EMAIL ---",
            body,
        ]
        txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace")
        self.app.log(f"Mail salvata: {txt_path.name}")

        attachment_count = 0
        attachments = self._safe_get(item, "Attachments")
        if attachments is not None:
            try:
                attachment_total = attachments.Count
            except Exception as exc:
                self.app.log(f" Errore leggendo il numero di allegati: {exc}")
                attachment_total = 0

            attachments_dir = mail_dir / "allegati"
            for att_index in range(1, attachment_total + 1):
                self.check_cancel()
                try:
                    attachment = attachments.Item(att_index)
                    original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}"
                    if not NameCodec.is_allowed_attachment(original_filename):
                        self.app.log(f" Allegato ignorato: {original_filename}")
                        continue
                    export_name = NameCodec.attachment_name(base_name, original_filename, att_index)
                    attachments_dir.mkdir(parents=True, exist_ok=True)
                    export_path = self._unique_path(attachments_dir / export_name)
                    attachment.SaveAsFile(str(export_path))
                    attachment_count += 1
                    self.app.log(f" Allegato salvato: {export_path.name}")
                except Exception as exc:
                    self.app.log(f" Errore salvando allegato {att_index}: {exc}")

        return 1, attachment_count

    @staticmethod
    def _unique_path(path: Path) -> Path:
        if not path.exists():
            return path
        stem = path.stem
        suffix = path.suffix
        parent = path.parent
        counter = 2
        while True:
            candidate = parent / f"{stem}__{counter}{suffix}"
            if not candidate.exists():
                return candidate
            counter += 1

    @staticmethod
    def _safe_get(obj: object, attr: str):
        try:
            return getattr(obj, attr)
        except Exception:
            return None

    @staticmethod
    def _convert_received_time(value) -> Optional[datetime]:
        if value is None:
            return None
        if isinstance(value, datetime):
            return value
        try:
            return datetime(
                value.year,
                value.month,
                value.day,
                value.hour,
                value.minute,
                value.second,
            )
        except Exception:
            return None


class OutlookExporterApp(ctk.CTk):
    def __init__(self) -> None:
        super().__init__()
        self.title(APP_TITLE)
        self.geometry(APP_GEOMETRY)
        self.minsize(1080, 680)
        self.withdraw()

        ctk.set_appearance_mode("system")
        ctk.set_default_color_theme("blue")

        self.splash = None
        self.show_startup_splash()

        self.outlook_service = OutlookService()
        self.exporter = Exporter(self)
        self.export_thread = None
        self.main_thread_id = threading.get_ident()
        self.ui_queue = queue.Queue()
        self.connected = False
        self.tree_folder_map = {}
        self.tree_path_map = {}
        self.selected_folder_obj = None

        default_output_dir = Path.home() / "Desktop" / "outlook_export"
        self.prefs = PreferenceStore(default_output_dir)
        self.selected_folder_display = ctk.StringVar(value="Nessuna cartella selezionata")
        self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir())
        self.status_var = ctk.StringVar(value="Pronto")
        self.create_zip_var = ctk.IntVar(value=1)
        self.create_hierarchy_var = ctk.IntVar(value=0)

        self._build_ui()
        self.hide_startup_splash()
        self.deiconify()
        self.after(100, self.process_ui_queue)
        self.protocol("WM_DELETE_WINDOW", self.on_close)

    def show_startup_splash(self) -> None:
        splash = ctk.CTkToplevel(self)
        splash.title(APP_TITLE)
        splash.overrideredirect(True)
        splash.resizable(False, False)
        splash.attributes("-topmost", True)

        width = 420
        height = 180
        x = max(0, int((splash.winfo_screenwidth() - width) / 2))
        y = max(0, int((splash.winfo_screenheight() - height) / 2))
        splash.geometry(f"{width}x{height}+{x}+{y}")
        splash.grid_columnconfigure(0, weight=1)
        splash.grid_rowconfigure(0, weight=1)

        content = ctk.CTkFrame(splash)
        content.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
        content.grid_columnconfigure(0, weight=1)

        ctk.CTkLabel(
            content,
            text="MailExporter",
            font=ctk.CTkFont(size=26, weight="bold"),
        ).grid(row=0, column=0, pady=(24, 8))
        ctk.CTkLabel(content, text="Avvio in corso...").grid(row=1, column=0, pady=(0, 18))

        progress = ctk.CTkProgressBar(content, mode="indeterminate")
        progress.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18))
        progress.start()

        self.splash = splash
        self.update_idletasks()
        self.update()

    def hide_startup_splash(self) -> None:
        if self.splash is not None:
            try:
                self.splash.destroy()
            except Exception:
                pass
            self.splash = None

    def _build_ui(self) -> None:
        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure(1, weight=1)

        top = ctk.CTkFrame(self)
        top.grid(row=0, column=0, sticky="nsew", padx=12, pady=(12, 6))
        top.grid_columnconfigure(3, weight=1)

        self.btn_connect = ctk.CTkButton(top, text="Connetti a Outlook già aperto", command=self.connect_outlook)
        self.btn_connect.grid(row=0, column=0, padx=8, pady=8)

        self.btn_refresh = ctk.CTkButton(top, text="Ricarica cartelle", command=self.reload_tree, state="disabled")
        self.btn_refresh.grid(row=0, column=1, padx=8, pady=8)

        self.btn_expand = ctk.CTkButton(top, text="Espandi tutto", command=self.expand_all, state="disabled")
        self.btn_expand.grid(row=0, column=2, padx=8, pady=8)

        lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w")
        lbl_selected.grid(row=0, column=3, padx=8, pady=8, sticky="ew")

        main_pane = ttk.Panedwindow(self, orient="horizontal")
        main_pane.grid(row=1, column=0, sticky="nsew", padx=12, pady=(6, 12))

        left = ctk.CTkFrame(main_pane)
        left.grid_rowconfigure(1, weight=1)
        left.grid_columnconfigure(0, weight=1)

        ctk.CTkLabel(left, text="Albero cartelle Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, sticky="w", padx=10, pady=(10, 4)
        )

        tree_container = ctk.CTkFrame(left)
        tree_container.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
        tree_container.grid_rowconfigure(0, weight=1)
        tree_container.grid_columnconfigure(0, weight=1)

        style = ttk.Style()
        try:
            style.theme_use("default")
        except Exception:
            pass
        style.configure("Treeview", rowheight=24)

        self.tree = ttk.Treeview(tree_container, show="tree")
        self.tree.grid(row=0, column=0, sticky="nsew")
        # Treeview virtual events: lazy-load children on expand, track the selection.
        self.tree.bind("<<TreeviewOpen>>", self.on_tree_open)
        self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)

        yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview)
        yscroll.grid(row=0, column=1, sticky="ns")
        self.tree.configure(yscrollcommand=yscroll.set)

        right = ctk.CTkFrame(main_pane)
        right.grid_rowconfigure(1, weight=1)
        right.grid_columnconfigure(0, weight=1)

        main_pane.add(left, weight=3)
        main_pane.add(right, weight=2)

        right_top = ctk.CTkFrame(right)
        right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 6))
        right_top.grid_columnconfigure(1, weight=1)

        ctk.CTkLabel(right_top, text="Esportazione", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, columnspan=3, sticky="w", padx=10, pady=(10, 8)
        )

        ctk.CTkLabel(right_top, text="Cartella di output:").grid(row=1, column=0, padx=10, pady=6, sticky="w")
        self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var)
        self.entry_output.grid(row=1, column=1, padx=10, pady=6, sticky="ew")
        self.btn_browse = ctk.CTkButton(right_top, text="Sfoglia", width=110, command=self.choose_output_dir)
        self.btn_browse.grid(row=1, column=2, padx=10, pady=6)

        help_text = (
            "La cartella selezionata verrà esportata insieme a tutte le sue sottocartelle.\n"
            "Ogni email verrà salvata in TXT e gli allegati verranno salvati nel formato originale."
        )
        ctk.CTkLabel(right_top, text=help_text, justify="left").grid(
            row=2, column=0, columnspan=3, padx=10, pady=(4, 8), sticky="w"
        )

        options = ctk.CTkFrame(right_top, fg_color="transparent")
        options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=10, pady=(4, 8))
        options.grid_columnconfigure(0, weight=1)
        options.grid_columnconfigure(1, weight=1)

        self.chk_zip = ctk.CTkCheckBox(
            options,
            text="Crea zip",
            variable=self.create_zip_var,
            command=lambda: self.on_export_mode_change("zip"),
        )
        self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12))

        self.chk_hierarchy = ctk.CTkCheckBox(
            options,
            text="Crea gerarchia",
            variable=self.create_hierarchy_var,
            command=lambda: self.on_export_mode_change("hierarchy"),
        )
        self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0))

        self.btn_export = ctk.CTkButton(
            right_top,
            text="Esporta cartella selezionata",
            command=self.start_export,
            state="disabled",
            height=36,
        )
        self.btn_export.grid(row=4, column=0, columnspan=2, padx=10, pady=(8, 10), sticky="ew")

        self.btn_cancel = ctk.CTkButton(
            right_top,
            text="Annulla esportazione",
            command=self.cancel_export,
            state="disabled",
            fg_color="#9b2c2c",
            hover_color="#7f1d1d",
        )
        self.btn_cancel.grid(row=4, column=2, padx=10, pady=(8, 10), sticky="ew")

        right_bottom = ctk.CTkFrame(right)
        right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(6, 0))
        right_bottom.grid_rowconfigure(1, weight=1)
        right_bottom.grid_columnconfigure(0, weight=1)

        ctk.CTkLabel(right_bottom, text="Log", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, sticky="w", padx=10, pady=(10, 4)
        )
        self.log_box = ctk.CTkTextbox(right_bottom)
        self.log_box.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
        self.log_box.configure(state="disabled")

        status = ctk.CTkFrame(self)
        status.grid(row=2, column=0, sticky="ew", padx=12, pady=(0, 12))
        status.grid_columnconfigure(0, weight=1)
        status.grid_columnconfigure(1, weight=0)
        ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid(
            row=0, column=0, sticky="ew", padx=10, pady=8
        )
        self.btn_exit = ctk.CTkButton(status, text="Esci", width=100, command=self.on_close)
        self.btn_exit.grid(row=0, column=1, sticky="e", padx=10, pady=8)

    def connect_outlook(self) -> None:
        try:
            self.set_status("Connessione a Outlook in corso...")
            self.log("Connessione a Outlook...")
            self.outlook_service.connect()
            self.connected = True
            self.log("Connesso a Outlook.")
            self.reload_tree()
            self.btn_refresh.configure(state="normal")
            self.btn_expand.configure(state="normal")
            self.set_status("Connesso a Outlook")
        except Exception as exc:
            self.connected = False
            self.log(f"Errore connessione Outlook: {exc}")
            self.set_status("Errore di connessione")
            messagebox.showerror(APP_TITLE, f"Impossibile collegarsi a Outlook.\n\n{exc}")

    def reload_tree(self) -> None:
        if not self.connected:
            return
        self.tree.delete(*self.tree.get_children())
        self.tree_folder_map.clear()
        self.tree_path_map.clear()
        self.selected_folder_obj = None
        self.selected_folder_display.set("Nessuna cartella selezionata")
        self.btn_export.configure(state="disabled")

        for root_node in self.outlook_service.list_root_folders():
            node_text = NameCodec.tree_label(root_node.display_name or root_node.path, root_node.mail_count)
            node_id = self.tree.insert("", "end", text=node_text, open=False)
            self.tree_folder_map[node_id] = root_node.folder_obj
            self.tree_path_map[node_id] = root_node.path
            self._insert_dummy(node_id)

        self.log("Albero cartelle ricaricato.")

    def _insert_dummy(self, node_id: str) -> None:
        self.tree.insert(node_id, "end", text="__dummy__")

    def on_tree_open(self, _event=None) -> None:
        selected = self.tree.focus()
        if not selected:
            return
        children = self.tree.get_children(selected)
        if len(children) == 1 and self.tree.item(children[0], "text") == "__dummy__":
            self.tree.delete(children[0])
            folder_obj = self.tree_folder_map.get(selected)
            current_path = self.tree_path_map.get(selected, self.tree.item(selected, "text"))
            try:
                child_nodes = self.outlook_service.list_children(folder_obj, current_path)
                for node in child_nodes:
                    node_text = NameCodec.tree_label(node.display_name or node.path, node.mail_count)
                    child_id = self.tree.insert(selected, "end", text=node_text, open=False)
                    self.tree_folder_map[child_id] = node.folder_obj
                    self.tree_path_map[child_id] = node.path
                    try:
                        if node.folder_obj.Folders.Count > 0:
                            self._insert_dummy(child_id)
                    except Exception:
                        pass
            except Exception as exc:
                self.log(f"Errore caricando sottocartelle: {exc}")

    def on_tree_select(self, _event=None) -> None:
        selected = self.tree.selection()
        if not selected:
            return
        node_id = selected[0]
        folder_obj = self.tree_folder_map.get(node_id)
        self.selected_folder_obj = folder_obj
        folder_path = self.tree_path_map.get(node_id, self.tree.item(node_id, "text"))
        if folder_obj is None:
            self.selected_folder_display.set("Nessuna cartella selezionata")
            self.btn_export.configure(state="disabled")
            return
        self.selected_folder_display.set(f"Selezionata: {folder_path}")
        self.btn_export.configure(state="normal")

    def expand_all(self) -> None:
        def _expand(node_id: str) -> None:
            self.tree.item(node_id, open=True)
            self.tree.focus(node_id)
            self.on_tree_open()
            for child in self.tree.get_children(node_id):
                if self.tree.item(child, "text") != "__dummy__":
                    _expand(child)

        for root in self.tree.get_children(""):
            _expand(root)
        self.log("Espansione albero completata.")

    def choose_output_dir(self) -> None:
        selected = filedialog.askdirectory(initialdir=self.output_dir_var.get() or str(Path.home()))
        if selected:
            self.output_dir_var.set(selected)
            self.prefs.set_output_dir(selected)

    def on_export_mode_change(self, selected_mode: str) -> None:
        # The two checkboxes behave like radio buttons: exactly one mode is active.
        if selected_mode == "zip":
            self.create_zip_var.set(1)
            self.create_hierarchy_var.set(0)
        else:
            self.create_zip_var.set(0)
            self.create_hierarchy_var.set(1)
        if not self.create_zip_var.get() and not self.create_hierarchy_var.get():
            self.create_zip_var.set(1)

    def start_export(self) -> None:
        if self.export_thread is not None and self.export_thread.is_alive():
            messagebox.showwarning(APP_TITLE, "Una esportazione è già in corso.")
            return
        if self.selected_folder_obj is None:
            messagebox.showwarning(APP_TITLE, "Seleziona prima una cartella Outlook.")
            return

        output_dir = self.output_dir_var.get().strip()
        if not output_dir:
            messagebox.showwarning(APP_TITLE, "Scegli una cartella di output valida.")
            return

        output_root = Path(output_dir)
        try:
            output_root.mkdir(parents=True, exist_ok=True)
        except Exception as exc:
            messagebox.showerror(APP_TITLE, f"Impossibile creare la cartella di output.\n\n{exc}")
            return
        self.prefs.set_output_dir(str(output_root))

        folder_text = self.selected_folder_display.get().replace("Selezionata: ", "")
        relative_path = NameCodec.folder_name_from_outlook_path(folder_text)
        zip_mode = bool(self.create_zip_var.get())

        if zip_mode:
            confirm_message = (
                "Verrà creato un archivio ZIP nella cartella di output.\n"
                "La gerarchia temporanea usata per preparare l'archivio verrà rimossa al termine.\n\n"
                "Vuoi continuare?"
            )
        else:
            confirm_message = (
                "Verrà creata una gerarchia di cartelle nella cartella di output.\n\n"
                "Vuoi continuare?"
            )
        if not messagebox.askyesno(APP_TITLE, confirm_message):
            return

        self._set_export_controls_running()
        self.exporter = Exporter(self)
        self.log("=" * 80)
        self.log(f"Avvio esportazione: {folder_text}")
        self.log(f"Modalità: {'ZIP' if zip_mode else 'Gerarchia'}")
        self.set_status("Esportazione in corso...")

        self.export_thread = threading.Thread(
            target=self._run_export_worker,
            args=(
                self._marshal_folder_for_worker(self.selected_folder_obj),
                output_root,
                relative_path,
                folder_text,
                zip_mode,
            ),
            daemon=True,
        )
        self.export_thread.start()

    def cancel_export(self) -> None:
        self.exporter.cancel()
        self.set_status("Richiesta annullamento in corso...")
        self.log("Richiesta di annullamento inviata.")

    def _set_export_controls_running(self) -> None:
        self.btn_export.configure(state="disabled")
        self.btn_cancel.configure(state="normal")
        self.btn_refresh.configure(state="disabled")
        self.btn_connect.configure(state="disabled")
        self.btn_expand.configure(state="disabled")
        self.chk_zip.configure(state="disabled")
        self.chk_hierarchy.configure(state="disabled")

    def _restore_export_controls(self) -> None:
        self.btn_export.configure(state="normal" if self.selected_folder_obj else "disabled")
        self.btn_cancel.configure(state="disabled")
        self.btn_refresh.configure(state="normal" if self.connected else "disabled")
        self.btn_connect.configure(state="normal")
        self.btn_expand.configure(state="normal" if self.connected else "disabled")
        self.chk_zip.configure(state="normal")
        self.chk_hierarchy.configure(state="normal")

    def _run_export_worker(
        self,
        folder_ref: object,
        output_root: Path,
        relative_path: str,
        folder_text: str,
        zip_mode: bool,
    ) -> None:
        com_initialized = False
        try:
            # COM must be initialized on every thread that touches Outlook objects.
            if pythoncom is not None:
                pythoncom.CoInitialize()
                com_initialized = True
            folder_obj = self._resolve_worker_folder(folder_ref)

            if zip_mode:
                with tempfile.TemporaryDirectory(prefix="outlook_export_") as temp_dir:
                    temp_root = Path(temp_dir)
                    temp_export_target = temp_root / relative_path
                    self.log(f"Destinazione temporanea: {temp_export_target}")
                    mails, attachments = self.exporter.export_folder(folder_obj, temp_export_target)
                    self.exporter.check_cancel()
                    zip_path = self._create_zip_archive(temp_root, output_root, folder_text)
                result_path = zip_path
            else:
                export_target = output_root / relative_path
                self.log(f"Destinazione: {export_target}")
                mails, attachments = self.exporter.export_folder(folder_obj, export_target)
                result_path = export_target

            self.ui_queue.put(("complete", mails, attachments, result_path, zip_mode))
        except ExportCancelled:
            self.ui_queue.put(("cancelled",))
        except Exception as exc:
            self.ui_queue.put(("error", str(exc), traceback.format_exc()))
        finally:
            if com_initialized:
                try:
                    pythoncom.CoUninitialize()
                except Exception:
                    pass

    @staticmethod
    def _marshal_folder_for_worker(folder_obj: object) -> object:
        # Marshal the COM interface so the worker thread can use it; fall back to the raw object.
        if pythoncom is None:
            return folder_obj
        try:
            ole_obj = getattr(folder_obj, "_oleobj_", None)
            if ole_obj is None:
                return folder_obj
            return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, ole_obj)
        except Exception:
            return folder_obj

    @staticmethod
    def _resolve_worker_folder(folder_ref: object) -> object:
        if pythoncom is None or win32com is None:
            return folder_ref
        try:
            dispatch = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch)
            return win32com.client.Dispatch(dispatch)
        except Exception:
            return folder_ref

    def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path:
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        base_name = NameCodec.archive_base_name_from_outlook_path(folder_text)
        zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip")
        self.log(f"Creazione archivio ZIP: {zip_path}")
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for path in source_root.rglob("*"):
                self.exporter.check_cancel()
                if path.is_file():
                    archive.write(path, path.relative_to(source_root).as_posix())
        return zip_path

    def process_ui_queue(self) -> None:
        # Drain events posted by the worker thread; runs on the Tk main thread every 100 ms.
        try:
            while True:
                event = self.ui_queue.get_nowait()
                event_type = event[0]
                if event_type == "log":
                    self._write_log(event[1])
                elif event_type == "status":
                    self.status_var.set(event[1])
                elif event_type == "complete":
                    _, mails, attachments, result_path, zip_mode = event
                    self.set_status(f"Esportazione completata | mail: {mails} | allegati: {attachments}")
                    self.log(f"Esportazione completata. Mail: {mails}, Allegati: {attachments}")
                    self._restore_export_controls()
                    self.show_export_complete_dialog(mails, attachments, result_path, zip_mode)
                elif event_type == "cancelled":
                    self.set_status("Esportazione annullata")
                    self.log("Esportazione annullata dall'utente.")
                    self._restore_export_controls()
                    messagebox.showinfo(APP_TITLE, "Esportazione annullata.")
                elif event_type == "error":
                    _, message, details = event
                    self.set_status("Errore durante l'esportazione")
                    self.log(f"Errore esportazione: {message}")
                    self.log(details)
                    self._restore_export_controls()
                    messagebox.showerror(APP_TITLE, f"Errore durante l'esportazione.\n\n{message}")
        except queue.Empty:
            pass
        finally:
            self.after(100, self.process_ui_queue)

    def show_export_complete_dialog(self, mails: int, attachments: int, export_target: Path, zip_mode: bool) -> None:
        dialog = ctk.CTkToplevel(self)
        dialog.title("Esportazione completata")
        dialog.geometry("520x260")
        dialog.resizable(False, False)
        dialog.transient(self)
        dialog.grab_set()
        dialog.grid_columnconfigure(0, weight=1)
        dialog.grid_rowconfigure(1, weight=1)

        ctk.CTkLabel(
            dialog,
            text="Esportazione completata",
            font=ctk.CTkFont(size=20, weight="bold"),
        ).grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8))

        summary = (
            f"Mail esportate: {mails}\n"
            f"Allegati esportati: {attachments}\n\n"
            f"{'Archivio ZIP' if zip_mode else 'Cartella'}:\n{export_target}"
        )
        ctk.CTkLabel(dialog, text=summary, justify="left", anchor="w").grid(
            row=1, column=0, sticky="nsew", padx=20, pady=(0, 16)
        )

        buttons = ctk.CTkFrame(dialog, fg_color="transparent")
        buttons.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20))
        buttons.grid_columnconfigure(0, weight=1)
        buttons.grid_columnconfigure(1, weight=1)

        def open_export_folder() -> None:
            try:
                target = export_target.parent if zip_mode else export_target
                os.startfile(str(target))
            except Exception as exc:
                messagebox.showerror(APP_TITLE, f"Impossibile aprire la destinazione di export.\n\n{exc}")
            finally:
                dialog.destroy()

        ctk.CTkButton(
            buttons,
            text="Vai alla cartella di zip" if zip_mode else "Vai alla cartella di export",
            command=open_export_folder,
            height=38,
        ).grid(row=0, column=0, sticky="ew", padx=(0, 8))
        ctk.CTkButton(
            buttons,
            text="Chiudi",
            command=dialog.destroy,
            height=38,
            fg_color="#5f6368",
            hover_color="#4b4f52",
        ).grid(row=0, column=1, sticky="ew", padx=(8, 0))

        dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
        dialog.after(100, dialog.focus_force)

    def log(self, message: str) -> None:
        # Tk widgets may only be touched from the main thread; other threads enqueue.
        if threading.get_ident() != self.main_thread_id:
            self.ui_queue.put(("log", message))
            return
        self._write_log(message)

    def _write_log(self, message: str) -> None:
        timestamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{timestamp}] {message}\n"
        self.log_box.configure(state="normal")
        self.log_box.insert("end", line)
        current_text = self.log_box.get("1.0", "end")
        lines = current_text.splitlines()
        if len(lines) > MAX_LOG_LINES:
            trimmed = "\n".join(lines[-MAX_LOG_LINES:]) + "\n"
            self.log_box.delete("1.0", "end")
            self.log_box.insert("1.0", trimmed)
        self.log_box.see("end")
        self.log_box.configure(state="disabled")
        self.pump_ui()

    def set_status(self, message: str) -> None:
        if threading.get_ident() != self.main_thread_id:
            self.ui_queue.put(("status", message))
            return
        self.status_var.set(message)
        self.pump_ui()

    def pump_ui(self) -> None:
        if threading.get_ident() != self.main_thread_id:
            return
        try:
            self.update_idletasks()
            self.update()
        except Exception:
            pass

    def on_close(self) -> None:
        try:
            self.outlook_service.disconnect()
        finally:
            self.destroy()


if __name__ == "__main__":
    if sys.platform != "win32":
        raise SystemExit("Questo programma funziona solo su Windows.")
    app = OutlookExporterApp()
    app.mainloop()