import json import os import queue import re import sys import tempfile import threading import time import traceback import zipfile from pathlib import Path from dataclasses import dataclass from datetime import datetime, timedelta from typing import List, Optional, Tuple import customtkinter as ctk import tkinter as tk from tkinter import filedialog, messagebox import tkinter.font as tkfont from tkinter import ttk try: import pythoncom import win32timezone # required by pywin32 when Outlook returns timezone-aware COM dates import win32com.client except ImportError: pythoncom = None win32com = None APP_TITLE = "Outlook email exporter ver. 1.0" APP_GEOMETRY = "1220x760+0+0" MAIL_CLASS = 43 # OlObjectClass.olMail MAX_LOG_LINES = 3000 PREFS_APP_DIR = "OutlookExporter" PREFS_FILE_NAME = "prefs.json" LOG_FILE_NAME = "mail_exporter.log" EXPORT_MANIFEST_NAME = "export_manifest.json" EXPORT_MODE_ALL = "all" EXPORT_MODE_WITH_ATTACHMENTS = "with_attachments" EXPORT_MODE_FILTER = "filter" OUTLOOK_NOT_RUNNING_MESSAGE = ( "Outlook does not appear to be open.\n\n" "Open Microsoft Outlook with the email profile you want to export, wait for it to finish loading, " "then minimize it and come back here to press 'Connect Outlook' again." ) HELP_WINDOW_TITLE = "Help" HELP_TEXT = """Outlook Email Exporter 1.0 Overview - Connect to a running Outlook session. - Select an Outlook folder from the tree. - Choose an output folder. - Use immediate export to export the selected folder and all subfolders. - Use Load preview to inspect a folder before exporting a subset. Preview and filters - Sender and Subject filters narrow the visible preview. - Period filters the preview by recent time ranges. - New shows only emails not yet exported. - With attachments shows only emails with Outlook attachments. - Selected limits export to the rows currently selected in the preview. Export modes - Create zip builds a ZIP archive. - Create folders on file system writes the export as folders and files. - Export preview exports only the emails currently visible in the filtered preview. Notes - Outlook must already be installed and configured on this PC. - Large folders can take a few minutes to load or export. - The program uses an incremental manifest to avoid exporting the same email twice. """ ALLOWED_ATTACHMENT_EXTENSIONS = { ".pdf", ".doc", ".docx", ".docm", ".dot", ".dotx", ".dotm", ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlt", ".xltx", ".xltm", ".csv", ".htm", ".html", ".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".ppsm", ".pot", ".potx", ".potm", ".rtf", ".fodp", ".fods", ".fodt", ".odb", ".odf", ".odg", ".odt", ".ods", ".odp", ".otg", ".oth", ".otp", ".ots", ".ott", ".sxc", ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tif", ".tiff", ".webp", } IMAGE_ATTACHMENT_EXTENSIONS = { ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tif", ".tiff", ".webp", } MAPI_ATTACHMENT_HIDDEN = "http://schemas.microsoft.com/mapi/proptag/0x7FFE000B" MAPI_ATTACH_CONTENT_ID = ( "http://schemas.microsoft.com/mapi/proptag/0x3712001F", "http://schemas.microsoft.com/mapi/proptag/0x3712001E", ) MAPI_ATTACH_CONTENT_LOCATION = ( "http://schemas.microsoft.com/mapi/proptag/0x3713001F", "http://schemas.microsoft.com/mapi/proptag/0x3713001E", ) MAPI_ATTACH_CONTENT_DISPOSITION = ( "http://schemas.microsoft.com/mapi/proptag/0x3716001F", "http://schemas.microsoft.com/mapi/proptag/0x3716001E", ) MAPI_RENDERING_POSITION = "http://schemas.microsoft.com/mapi/proptag/0x370B0003" MAPI_RENDERING_POSITION_NOT_RENDERED = -1 INLINE_ATTACHMENT_NAME_RE = re.compile( r"^(image\d{3,}|att\d{5}|oledata|logo|spacer|facebook|linkedin|twitter|instagram|youtube)\b", re.IGNORECASE, ) @dataclass class FolderNode: store_name: str path: str folder_obj: object display_name: Optional[str] = None mail_count: Optional[int] = None @dataclass class ExportOptions: mode: str = EXPORT_MODE_ALL only_with_attachments: bool = False folder_key: str = "" selected_keys: Optional[set[str]] = None selected_entry_refs: Optional[list[tuple[str, str]]] = None @dataclass class MailRecord: key: str entry_id: str store_id: str folder_key: str received_time: str txt_relative_path: str attachment_count: int exported_attachment_count: int export_mode: str subject: str = "" internet_message_id: str = "" status: str = "exported" @dataclass class EmailHeader: key: str entry_id: str store_id: str received_time: str sender: str subject: str attachment_count: int attachment_types: str exported: bool class ExportCancelled(Exception): pass class ToolTip: def __init__(self, widget: object, text: str, delay_ms: int = 450) -> None: self.widget = widget self.text = text self.delay_ms = delay_ms self._after_id = None self._tip_window = None try: self.widget.bind("", self._on_enter, add="+") self.widget.bind("", self._on_leave, add="+") self.widget.bind("", self._on_leave, add="+") except Exception: pass def _on_enter(self, _event=None) -> None: self._cancel_scheduled() try: self._after_id = self.widget.after(self.delay_ms, self._show) except Exception: self._after_id = None def _on_leave(self, _event=None) -> None: self._cancel_scheduled() self._hide() def _cancel_scheduled(self) -> None: if self._after_id is not None: try: self.widget.after_cancel(self._after_id) except Exception: pass self._after_id = None def _show(self) -> None: self._after_id = None if self._tip_window is not None: return try: if not self.widget.winfo_exists(): return x = self.widget.winfo_rootx() + 18 y = self.widget.winfo_rooty() + self.widget.winfo_height() + 6 tip = tk.Toplevel(self.widget) tip.wm_overrideredirect(True) tip.wm_geometry(f"+{x}+{y}") tip.attributes("-topmost", True) label = tk.Label( tip, text=self.text, justify="left", background="#111827", foreground="#f9fafb", relief="solid", borderwidth=1, padx=8, pady=4, font=("Segoe UI", 9), ) label.pack() self._tip_window = tip except Exception: self._tip_window = None def _hide(self) -> None: if self._tip_window is not None: try: self._tip_window.destroy() except Exception: pass self._tip_window = None class ExportManifest: def __init__(self, root_dir: Path) -> None: self.root_dir = root_dir self.path = root_dir / EXPORT_MANIFEST_NAME self.data = self._load() def _load(self) -> dict: if not self.path.exists(): return {"version": 2, "records": {}, "exports": {}} try: with self.path.open("r", encoding="utf-8") as fh: data = json.load(fh) if not isinstance(data, dict): return {"version": 2, "records": {}, "exports": {}} records = data.get("records") if not isinstance(records, dict): data["records"] = {} exports = data.get("exports") if not isinstance(exports, dict): data["exports"] = {} data["version"] = 2 return data except Exception: return {"version": 2, "records": {}, "exports": {}} def has_record(self, key: str) -> bool: return bool(self.data.get("records", {}).get(key)) def get_record(self, key: str) -> Optional[dict]: return self.data.get("records", {}).get(key) def record_file_exists(self, key: str) -> bool: record = self.get_record(key) if not isinstance(record, dict): return False rel_path = str(record.get("p") or "").strip() if not rel_path: return False try: return (self.root_dir / Path(rel_path)).exists() except Exception: return False def remove_record(self, key: str) -> None: self.data.get("records", {}).pop(key, None) def update_record(self, record: MailRecord) -> None: self.data.setdefault("records", {})[record.key] = { "eid": record.entry_id, "sid": record.store_id, "f": record.folder_key, "r": record.received_time, "p": record.txt_relative_path, "a": record.attachment_count, "ea": record.exported_attachment_count, "m": record.export_mode, "subject": record.subject, "imid": record.internet_message_id, "s": record.status, "u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } def get_export_scope(self, folder_key: str) -> Optional[dict]: return self.data.get("exports", {}).get(folder_key) def update_export_scope(self, folder_key: str, relative_path: str, result_path: Path, zip_mode: bool) -> None: self.data.setdefault("exports", {})[folder_key] = { "rp": relative_path, "lp": str(result_path), "z": bool(zip_mode), "u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } def folder_record_count(self, folder_key: str) -> int: count = 0 for record in self.data.get("records", {}).values(): if isinstance(record, dict) and record.get("f") == folder_key: count += 1 return count def total_record_count(self) -> int: return len(self.data.get("records", {})) def clear_folder(self, folder_key: str) -> int: records = self.data.get("records", {}) to_delete = [key for key, value in records.items() if isinstance(value, dict) and value.get("f") == folder_key] for key in to_delete: records.pop(key, None) self.data.get("exports", {}).pop(folder_key, None) return len(to_delete) def save(self) -> None: self.root_dir.mkdir(parents=True, exist_ok=True) with self.path.open("w", encoding="utf-8") as fh: json.dump(self.data, fh, ensure_ascii=False, separators=(",", ":")) class PreferenceStore: def __init__(self, default_output_dir: Path) -> None: self.default_output_dir = default_output_dir self.path = self._prefs_path() self.data = self._load_or_create() def get_output_dir(self) -> str: value = str(self.data.get("output_dir") or "").strip() return value or str(self.default_output_dir) def set_output_dir(self, output_dir: str) -> None: output_dir = str(output_dir or "").strip() if not output_dir: return self.data["output_dir"] = output_dir self._save() @classmethod def _prefs_path(cls) -> Path: appdata = os.environ.get("APPDATA") base_dir = Path(appdata) if appdata else Path.home() / ".config" return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME @classmethod def app_data_dir(cls) -> Path: return cls._prefs_path().parent def _load_or_create(self) -> dict: if self.path.exists(): try: with self.path.open("r", encoding="utf-8") as fh: data = json.load(fh) return data if isinstance(data, dict) else {} except Exception: return {"output_dir": str(self.default_output_dir)} data = {"output_dir": str(self.default_output_dir)} self.data = data self._save() return data def _save(self) -> None: self.path.parent.mkdir(parents=True, exist_ok=True) with self.path.open("w", encoding="utf-8") as fh: json.dump(self.data, fh, ensure_ascii=False, indent=2) class OutlookService: def __init__(self) -> None: self.outlook = None self.namespace = None self.root_folders: List[FolderNode] = [] def connect(self) -> None: if win32com is None or pythoncom is None: raise RuntimeError( "pywin32 non รจ installato. Esegui: pip install pywin32" ) pythoncom.CoInitialize() try: try: self.outlook = win32com.client.GetActiveObject("Outlook.Application") except Exception as exc: raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc self.namespace = self.outlook.GetNamespace("MAPI") self.root_folders = [] for i in range(1, self.namespace.Folders.Count + 1): store = self.namespace.Folders.Item(i) store_name = str(store.Name) node = FolderNode( store_name=store_name, path=NameCodec.store_display_name(store_name), folder_obj=store, display_name=NameCodec.store_display_name(store_name), ) self.root_folders.append(node) except Exception: pythoncom.CoUninitialize() raise def disconnect(self) -> None: try: self.root_folders = [] self.namespace = None self.outlook = None finally: if pythoncom is not None: try: pythoncom.CoUninitialize() except Exception: pass def list_root_folders(self) -> List[FolderNode]: return list(self.root_folders) def list_children(self, folder_obj: object, current_path: str) -> List[FolderNode]: children = [] folders = folder_obj.Folders for i in range(1, folders.Count + 1): child = folders.Item(i) child_name = str(child.Name) child_path = f"{current_path} / {child_name}" children.append( FolderNode( store_name=current_path.split(" / ")[0], path=child_path, folder_obj=child, display_name=child_name, mail_count=self.count_mail_items(child), ) ) return children @staticmethod def count_mail_items(folder_obj: object) -> Optional[int]: diagnostics = OutlookService.folder_item_diagnostics(folder_obj) restricted_count = diagnostics.get("restricted_count") raw_count = diagnostics.get("raw_count") if restricted_count is None: return raw_count if restricted_count == 0 and raw_count: return raw_count return restricted_count @staticmethod def folder_item_diagnostics(folder_obj: object) -> dict: diagnostics = { "folder_name": None, "raw_count": None, "restricted_count": None, "items_error": None, "restrict_error": None, "folder_path": None, "store_id": None, } try: diagnostics["folder_name"] = str(folder_obj.Name) except Exception: pass try: diagnostics["folder_path"] = str(folder_obj.FolderPath) except Exception: pass try: items = folder_obj.Items except Exception as exc: diagnostics["items_error"] = str(exc) return diagnostics try: diagnostics["raw_count"] = items.Count except Exception as exc: diagnostics["items_error"] = str(exc) try: diagnostics["restricted_count"] = items.Restrict("[MessageClass] = 'IPM.Note'").Count except Exception as exc: diagnostics["restrict_error"] = str(exc) try: diagnostics["store_id"] = str(folder_obj.StoreID) except Exception: pass return diagnostics @staticmethod def get_namespace_for_worker() -> object: if win32com is None: raise RuntimeError("pywin32 non disponibile.") try: outlook = win32com.client.GetActiveObject("Outlook.Application") except Exception as exc: raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc return outlook.GetNamespace("MAPI") class NameCodec: INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]') SPACE_RE = re.compile(r'\s+') @classmethod def compact_text(cls, value: str, max_len: int = 32) -> str: value = cls.SPACE_RE.sub(" ", value or "").strip() if len(value) <= max_len: return value return value[: max_len - 3].rstrip() + "..." @classmethod def store_display_name(cls, store_name: str) -> str: store_name = store_name or "Account" if "@" in store_name: local_part = store_name.split("@", 1)[0] return cls.compact_text(local_part, max_len=34) return cls.compact_text(store_name, max_len=34) @classmethod def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str: display_name = cls.compact_text(display_name or "Folder", max_len=46) if mail_count is None: return display_name return f"{display_name} ({mail_count})" @classmethod def sanitize(cls, value: str, max_len: int = 80) -> str: value = value or "" value = cls.INVALID_CHARS_RE.sub("_", value) value = cls.SPACE_RE.sub(" ", value).strip() value = value.replace(".", "_") if not value: value = "empty" if len(value) > max_len: value = value[:max_len].rstrip(" _") return value @classmethod def folder_name_from_outlook_path(cls, folder_path: str) -> str: # Rimuove lo store iniziale quando il path arriva dall'albero "Store / Cartella". parts = [p.strip() for p in folder_path.split(" / ")] parts = [p for p in parts if p] export_parts = parts[1:] if len(parts) > 1 else parts safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts] return os.path.join(*safe_parts) if safe_parts else "folder" @classmethod def archive_base_name_from_outlook_path(cls, folder_path: str) -> str: parts = [p.strip() for p in folder_path.split(" / ")] parts = [p for p in parts if p] folder_name = parts[-1] if parts else "export" return cls.sanitize(folder_name, max_len=60) @classmethod def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str: if received_time is None: stamp = "data_sconosciuta" else: stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S") return f"mail_{stamp}_{seq:05d}" @classmethod def mail_folder_name(cls, base_mail_name: str, subject: str, attachment_count: int = 0) -> str: safe_subject = cls.sanitize(subject or "no_subject", max_len=50) attachment_marker = f"CON_ALLEGATI_{attachment_count:02d}" if attachment_count else "SOLO_MAIL" return f"{base_mail_name}__{attachment_marker}__{safe_subject}" @classmethod def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str: original_filename = original_filename or f"attachment_{att_index}" original_filename = os.path.basename(original_filename) stem, ext = os.path.splitext(original_filename) stem = cls.sanitize(stem, max_len=80) ext = cls.sanitize_extension(ext) return f"attachment__{base_mail_name}__{att_index:02d}__{stem}{ext}" @classmethod def sanitize_extension(cls, ext: str, max_len: int = 20) -> str: ext = (ext or "").strip().lower() if not ext: return "" if not ext.startswith("."): ext = f".{ext}" ext = cls.INVALID_CHARS_RE.sub("_", ext) ext = ext.replace(" ", "_") if len(ext) > max_len: ext = ext[:max_len] return ext @classmethod def is_allowed_attachment(cls, filename: str) -> bool: ext = cls.sanitize_extension(os.path.splitext(filename or "")[1]) return ext in ALLOWED_ATTACHMENT_EXTENSIONS class Exporter: MANIFEST_FLUSH_INTERVAL_SECONDS = 20.0 def __init__(self, app: "OutlookExporterApp") -> None: self.app = app self.cancel_requested = False self.global_seq = 0 self.exported_mail_count = 0 self.exported_attachment_count = 0 self.skipped_mail_count = 0 self.skipped_manifest_count = 0 self.skipped_no_attachments_count = 0 self.non_mail_item_count = 0 self.datetime_diagnostic_count = 0 self.datetime_diagnostic_limit = 20 self.manifest: Optional[ExportManifest] = None self.manifest_dirty = False self.last_manifest_flush_monotonic = time.monotonic() self.options = ExportOptions() def configure(self, manifest: Optional[ExportManifest], options: ExportOptions) -> None: self.manifest = manifest self.manifest_dirty = False self.last_manifest_flush_monotonic = time.monotonic() self.options = options def cancel(self) -> None: self.cancel_requested = True def check_cancel(self) -> None: if self.cancel_requested: raise ExportCancelled("Export cancelled by the user.") def _mark_manifest_dirty(self) -> None: if self.manifest is None: return self.manifest_dirty = True def _flush_manifest_if_due(self, force: bool = False) -> None: if self.manifest is None or not self.manifest_dirty: return now = time.monotonic() if not force and (now - self.last_manifest_flush_monotonic) < self.MANIFEST_FLUSH_INTERVAL_SECONDS: return self.manifest.save() self.last_manifest_flush_monotonic = now self.manifest_dirty = False def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int, int]: self.check_cancel() folder_out_path.mkdir(parents=True, exist_ok=True) folder_name = self._safe_get(folder_obj, "Name") or "(unnamed folder)" self.app.log(f"Outlook folder: {folder_name}") self.app.log(f"Destination folder: {folder_out_path}") self._log_folder_count_diagnostics(folder_obj) items = self._safe_get(folder_obj, "Items") if items is None: self.app.log("Warning: unable to read folder items, continuing with subfolders.") count = 0 else: try: items.Sort("[ReceivedTime]", False) except Exception: self.app.log("Warning: unable to sort by ReceivedTime, continuing without sorting.") try: count = items.Count except Exception as exc: self.app.log(f"Warning: unable to count folder items: {exc}") count = 0 local_mail_count = 0 local_attachment_count = 0 local_skipped_count = 0 self.app.log(f"Detected items: {count}") for idx in range(1, count + 1): self.check_cancel() try: item = items.Item(idx) except Exception as exc: self.app.log(f"Error accessing item {idx}: {exc}") continue message_class = self._safe_get(item, "Class") if message_class != MAIL_CLASS: self.non_mail_item_count += 1 continue if self.options.selected_keys is not None: key = self._build_manifest_key_for_item(item) if key not in self.options.selected_keys: continue try: self.global_seq += 1 mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path) local_mail_count += mail_count local_attachment_count += att_count local_skipped_count += skipped_count self.exported_mail_count += mail_count self.exported_attachment_count += att_count self.skipped_mail_count += skipped_count except Exception as exc: subject = self._safe_get(item, "Subject") or "(no subject)" self.app.log(f"Error exporting email '{subject}': {exc}") if idx % 10 == 0: self.app.set_status( "Export in progress... " f"item {idx}/{count} | emails {self.exported_mail_count} | " f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}" ) self.app.pump_ui() if self.options.selected_keys is not None: return local_mail_count, local_attachment_count, local_skipped_count subfolders = self._safe_get(folder_obj, "Folders") subfolder_count = 0 try: subfolder_count = subfolders.Count if subfolders is not None else 0 except Exception as exc: self.app.log(f"Warning: unable to read subfolders: {exc}") for i in range(1, subfolder_count + 1): self.check_cancel() try: child = subfolders.Item(i) except Exception as exc: self.app.log(f"Error accessing subfolder {i}: {exc}") continue child_name = self._safe_get(child, "Name") or f"subfolder_{i}" child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80) m, a, s = self.export_folder(child, child_dir) local_mail_count += m local_attachment_count += a local_skipped_count += s return local_mail_count, local_attachment_count, local_skipped_count def export_selected_items( self, namespace: object, selected_entry_refs: list[tuple[str, str]], folder_out_path: Path, ) -> Tuple[int, int, int]: self.check_cancel() folder_out_path.mkdir(parents=True, exist_ok=True) local_mail_count = 0 local_attachment_count = 0 local_skipped_count = 0 total = len(selected_entry_refs) self.app.log(f"Direct Outlook selective export: {total} selected emails") for idx, (store_id, entry_id) in enumerate(selected_entry_refs, start=1): self.check_cancel() try: if store_id: item = namespace.GetItemFromID(entry_id, store_id) else: item = namespace.GetItemFromID(entry_id) except Exception as exc: self.app.log(f"Error retrieving selected email {idx}/{total}: {exc}") continue message_class = self._safe_get(item, "Class") if message_class != MAIL_CLASS: self.non_mail_item_count += 1 continue try: self.global_seq += 1 mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path) local_mail_count += mail_count local_attachment_count += att_count local_skipped_count += skipped_count self.exported_mail_count += mail_count self.exported_attachment_count += att_count self.skipped_mail_count += skipped_count except Exception as exc: subject = self._safe_get(item, "Subject") or "(no subject)" self.app.log(f"Error exporting selected email '{subject}': {exc}") self.app.set_status( "Selective export in progress... " f"{idx}/{total} | emails {self.exported_mail_count} | " f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}" ) self.app.pump_ui() return local_mail_count, local_attachment_count, local_skipped_count def _log_folder_count_diagnostics(self, folder_obj: object) -> None: diagnostics = OutlookService.folder_item_diagnostics(folder_obj) self.app.log("Folder count diagnostics:") self.app.log(f" folder_name: {diagnostics.get('folder_name')}") self.app.log(f" folder_path: {diagnostics.get('folder_path')}") self.app.log(f" raw Items.Count: {diagnostics.get('raw_count')}") self.app.log(f" Restrict IPM.Note Count: {diagnostics.get('restricted_count')}") self.app.log(f" items_error: {diagnostics.get('items_error')}") self.app.log(f" restrict_error: {diagnostics.get('restrict_error')}") def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int, int]: received_dt = self._get_best_mail_datetime(item) subject = self._safe_get(item, "Subject") or "" manifest_record = self._build_mail_record(item, subject, received_dt, 0) if self.manifest is not None and self.manifest.has_record(manifest_record.key): if self.manifest.record_file_exists(manifest_record.key): self.app.log(f"Email already present in the manifest, skipping: {subject or '(no subject)'}") self.skipped_manifest_count += 1 return 0, 0, 1 self.app.log( f"Obsolete manifest record, re-exporting the email: {subject or '(no subject)'}" ) self.manifest.remove_record(manifest_record.key) self._mark_manifest_dirty() body = self._safe_get(item, "Body") or "" html_body = self._safe_get(item, "HTMLBody") or "" exportable_attachments = self._collect_exportable_attachments(item, html_body) manifest_record.attachment_count = len(exportable_attachments) if self.options.only_with_attachments and not exportable_attachments: self.app.log(f"Email without exportable attachments, skipping: {subject or '(no subject)'}") self.skipped_no_attachments_count += 1 return 0, 0, 1 base_name = NameCodec.email_base_name(received_dt, self.global_seq) canonical_mail_dir = folder_out_path / NameCodec.mail_folder_name(base_name, subject, len(exportable_attachments)) canonical_txt_path = canonical_mail_dir / f"{base_name}.txt" if canonical_txt_path.exists(): self.app.log(f"Email already present on the file system, skipping: {subject or '(no subject)'}") self.skipped_manifest_count += 1 if self.manifest is not None: try: manifest_record.txt_relative_path = canonical_txt_path.relative_to(self.manifest.root_dir).as_posix() except Exception: manifest_record.txt_relative_path = "" attachments_dir = canonical_mail_dir / "attachments" if attachments_dir.exists(): try: manifest_record.exported_attachment_count = len([p for p in attachments_dir.iterdir() if p.is_file()]) except Exception: manifest_record.exported_attachment_count = 0 manifest_record.status = "exported" self.manifest.update_record(manifest_record) self._mark_manifest_dirty() self._flush_manifest_if_due() return 0, 0, 1 mail_dir = self._unique_path(canonical_mail_dir) mail_dir.mkdir(parents=True, exist_ok=True) txt_path = mail_dir / f"{base_name}.txt" header_lines = [ f"Subject: {subject}", f"Received: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}", "", "--- EMAIL TEXT ---", body, ] txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace") self.app.log(f"Mail salvata: {txt_path.name}") if self.manifest is not None: try: manifest_record.txt_relative_path = txt_path.relative_to(self.manifest.root_dir).as_posix() except Exception: manifest_record.txt_relative_path = "" attachment_count = 0 if exportable_attachments: attachments_dir = mail_dir / "attachments" for att_index, attachment, original_filename in exportable_attachments: self.check_cancel() try: export_name = NameCodec.attachment_name(base_name, original_filename, att_index) attachments_dir.mkdir(parents=True, exist_ok=True) export_path = self._unique_path(attachments_dir / export_name) attachment.SaveAsFile(str(export_path)) attachment_count += 1 self.app.log(f" Attachment saved: {export_path.name}") except Exception as exc: self.app.log(f" Error saving attachment {att_index}: {exc}") manifest_record.exported_attachment_count = attachment_count manifest_record.status = "exported" if self.manifest is not None: self.manifest.update_record(manifest_record) self._mark_manifest_dirty() self._flush_manifest_if_due() return 1, attachment_count, 0 def _collect_exportable_attachments(self, item: object, html_body: str) -> List[Tuple[int, object, str]]: exportable = [] attachments = self._safe_get(item, "Attachments") if attachments is None: return exportable try: attachment_total = attachments.Count except Exception as exc: self.app.log(f" Error reading the number of attachments: {exc}") return exportable for att_index in range(1, attachment_total + 1): self.check_cancel() try: attachment = attachments.Item(att_index) original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}" inline_score, inline_reasons = self._signature_image_score( attachment, original_filename, html_body ) if inline_score >= 3: self.app.log( f" Ignored inline/signature attachment: {original_filename} | " f"score={inline_score} | reasons={', '.join(inline_reasons)}" ) elif NameCodec.is_allowed_attachment(original_filename): exportable.append((att_index, attachment, original_filename)) else: self.app.log(f" Ignored attachment: {original_filename}") except Exception as exc: self.app.log(f" Error reading attachment {att_index}: {exc}") return exportable def _signature_image_score(self, attachment: object, filename: str, html_body: str) -> Tuple[int, List[str]]: ext = NameCodec.sanitize_extension(os.path.splitext(filename or "")[1]) if ext not in IMAGE_ATTACHMENT_EXTENSIONS: return 0, [] score = 0 reasons = [] html_lower = (html_body or "").lower() filename_lower = os.path.basename(filename or "").lower() hidden = self._get_attachment_mapi_property(attachment, MAPI_ATTACHMENT_HIDDEN) if hidden is True: score += 4 reasons.append("hidden") rendering_position = self._get_attachment_mapi_property(attachment, MAPI_RENDERING_POSITION) rendering_position_int = self._safe_int(rendering_position) if rendering_position_int is not None: if rendering_position_int not in (MAPI_RENDERING_POSITION_NOT_RENDERED, 0xFFFFFFFF): score += 1 reasons.append(f"rendering-position={rendering_position_int}") else: reasons.append("rendering-position=-1") content_id = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_ID) if self._has_value(content_id): cid = str(content_id).strip() if self._html_references_cid(html_lower, cid): score += 3 reasons.append(f"cid nel corpo HTML:{self._short_diag(cid)}") else: score += 1 reasons.append(f"cid presente:{self._short_diag(cid)}") content_location = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_LOCATION) if self._has_value(content_location): location = str(content_location).strip() if location.lower() in html_lower: score += 2 reasons.append(f"content-location nel corpo HTML:{self._short_diag(location)}") else: score += 1 reasons.append(f"content-location presente:{self._short_diag(location)}") disposition = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_DISPOSITION) if self._has_value(disposition) and "inline" in str(disposition).lower(): score += 3 reasons.append(f"disposition inline:{self._short_diag(disposition)}") if filename_lower and filename_lower in html_lower: score += 2 reasons.append("nome file nel corpo HTML") stem = os.path.splitext(filename_lower)[0] if INLINE_ATTACHMENT_NAME_RE.match(stem or "") is not None: score += 2 reasons.append("nome tipico inline/firma") return score, reasons @staticmethod def _html_references_cid(html_lower: str, content_id: str) -> bool: if not content_id: return False cid = content_id.strip().strip("<>").lower() if not cid: return False return f"cid:{cid}" in html_lower or cid in html_lower def _get_attachment_mapi_property(self, attachment: object, schema: str): try: accessor = attachment.PropertyAccessor return accessor.GetProperty(schema) except Exception: return None def _get_first_attachment_mapi_property(self, attachment: object, schemas): for schema in schemas: value = self._get_attachment_mapi_property(attachment, schema) if self._has_value(value): return value return None @staticmethod def _safe_int(value: object) -> Optional[int]: if value is None: return None try: return int(value) except Exception: return None @staticmethod def _short_diag(value: object, max_len: int = 60) -> str: try: text = str(value).strip() except Exception: text = repr(value) text = text.replace("\r", " ").replace("\n", " ") if len(text) > max_len: return text[:max_len] + "..." return text @staticmethod def _has_value(value: object) -> bool: if value is None: return False try: return bool(str(value).strip()) except Exception: return True def _get_best_mail_datetime(self, item: object) -> Optional[datetime]: for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"): value = self._safe_get(item, attr) converted = self._convert_outlook_datetime(value) if converted is not None: if attr != "ReceivedTime": self.app.log(f" ReceivedTime not available, using {attr}.") return converted self._log_datetime_diagnostics(item) return None def _log_datetime_diagnostics(self, item: object) -> None: if self.datetime_diagnostic_count >= self.datetime_diagnostic_limit: if self.datetime_diagnostic_count == self.datetime_diagnostic_limit: self.app.log( " Date diagnostics: limit reached, further emails without a date will not be logged in detail." ) self.datetime_diagnostic_count += 1 return self.datetime_diagnostic_count += 1 subject = self._safe_get(item, "Subject") or "(no subject)" self.app.log(f" Email date diagnostics #{self.global_seq}: {subject}") for label, value in self._mail_identity_details(item): self.app.log(f" {label}: {value}") for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"): ok, value_or_error = self._safe_get_with_error(item, attr) if not ok: self.app.log(f" {attr}: ACCESS ERROR: {value_or_error}") continue value = value_or_error self.app.log( f" {attr}: tipo={type(value).__name__}; repr={repr(value)}; str={self._safe_str(value)}" ) def _mail_identity_details(self, item: object) -> List[Tuple[str, str]]: parent = self._safe_get(item, "Parent") folder_name = self._safe_get(parent, "Name") if parent is not None else None store_id = self._safe_get(parent, "StoreID") if parent is not None else None details = [ ("EntryID", self._safe_str(self._safe_get(item, "EntryID"))), ("ConversationID", self._safe_str(self._safe_get(item, "ConversationID"))), ("InternetMessageID", self._safe_str(self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F"))), ("Outlook folder", self._safe_str(folder_name)), ("StoreID", self._safe_str(store_id)), ] return details def _build_mail_record( self, item: object, subject: str, received_dt: Optional[datetime], attachment_count: int, ) -> MailRecord: received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else "" entry_id = self._safe_full_str(self._safe_get(item, "EntryID")) parent = self._safe_get(item, "Parent") store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None) internet_message_id = self._safe_full_str( self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F") ) key = self._build_manifest_key_for_item(item) return MailRecord( key=key, entry_id=entry_id, store_id=store_id, folder_key=self.options.folder_key, received_time=received_text, txt_relative_path="", attachment_count=attachment_count, exported_attachment_count=0, export_mode=self.options.mode, subject=self._safe_str(subject), internet_message_id=internet_message_id, ) def _build_manifest_key_for_item( self, item: object, subject: Optional[str] = None, received_text: Optional[str] = None, ) -> str: entry_id = self._safe_full_str(self._safe_get(item, "EntryID")) parent = self._safe_get(item, "Parent") store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None) return self._build_manifest_key_from_ids(store_id, entry_id) @staticmethod def _build_manifest_key_from_ids(store_id: object, entry_id: object) -> str: return f"{Exporter._safe_full_str(store_id)}|{Exporter._safe_full_str(entry_id)}" def _get_mail_mapi_property(self, item: object, schema: str): try: accessor = item.PropertyAccessor return accessor.GetProperty(schema) except Exception: return None @staticmethod def _unique_path(path: Path) -> Path: if not path.exists(): return path stem = path.stem suffix = path.suffix parent = path.parent counter = 2 while True: candidate = parent / f"{stem}__{counter}{suffix}" if not candidate.exists(): return candidate counter += 1 @staticmethod def _safe_get(obj: object, attr: str): try: return getattr(obj, attr) except Exception: return None @staticmethod def _safe_get_with_error(obj: object, attr: str) -> Tuple[bool, object]: try: return True, getattr(obj, attr) except Exception as exc: return False, exc @staticmethod def _safe_str(value: object, max_len: int = 300) -> str: try: text = str(value) except Exception as exc: text = f"" if len(text) > max_len: return text[:max_len] + "..." return text @staticmethod def _safe_full_str(value: object) -> str: try: return str(value) except Exception: return "" @staticmethod def _convert_outlook_datetime(value) -> Optional[datetime]: if value is None: return None if isinstance(value, datetime): return value try: return datetime( value.year, value.month, value.day, value.hour, value.minute, value.second, ) except Exception: pass for converter in (getattr(value, "Format", None), getattr(value, "strftime", None)): if converter is None: continue try: text = str(converter("%Y-%m-%d %H:%M:%S")) return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") except Exception: continue try: return datetime.fromisoformat(str(value)) except Exception: return None class OutlookExporterApp(ctk.CTk): def __init__(self) -> None: super().__init__() self.title(APP_TITLE) self.geometry(APP_GEOMETRY) self.minsize(1080, 680) self.withdraw() ctk.set_appearance_mode("system") ctk.set_default_color_theme("blue") self.splash = None self.show_startup_splash() self.outlook_service = OutlookService() self.exporter = Exporter(self) self.export_thread = None self.main_thread_id = threading.get_ident() self.ui_queue = queue.Queue() self.connected = False self.tree_folder_map = {} self.tree_path_map = {} self.empty_folder_alert_shown = set() self.selected_folder_obj = None self.filter_thread = None self.shutting_down = False self.ui_queue_after_id = None self.loaded_email_headers: List[EmailHeader] = [] self.filtered_email_headers: List[EmailHeader] = [] self.filter_tree_key_map = {} self.filter_tree = None self.filter_loaded_folder_text = "" self.filter_live_after_id = None self.filter_live_delay_ms = 300 self.filter_status_var = ctk.StringVar(value="No emails loaded") self.filter_loaded_folder_var = ctk.StringVar(value="No folder loaded") self.filter_subject_var = ctk.StringVar(value="") self.filter_sender_var = ctk.StringVar(value="") self.filter_date_range_var = ctk.StringVar(value="") self.filter_export_new_var = ctk.IntVar(value=0) self.filter_export_with_attachments_var = ctk.IntVar(value=0) self.filter_export_selected_var = ctk.IntVar(value=0) self.log_lines: List[str] = [] self.pending_file_log_lines: List[str] = [] self.tooltips: List[ToolTip] = [] self.log_window = None self.log_window_box = None self.help_window = None self.help_window_box = None default_output_dir = Path.home() / "Desktop" / "outlook_export" self.prefs = PreferenceStore(default_output_dir) self.log_file_path = PreferenceStore.app_data_dir() / LOG_FILE_NAME self._init_file_log() self.selected_folder_display = ctk.StringVar(value="No folder selected") self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir()) self.status_var = ctk.StringVar(value="Ready") self.busy_title_var = ctk.StringVar(value="") self.busy_detail_var = ctk.StringVar(value="") self.busy_overlay_visible = False self.last_export_duration_seconds = 0.0 self.last_filter_load_duration_seconds = 0.0 self.create_zip_var = ctk.IntVar(value=0) self.create_hierarchy_var = ctk.IntVar(value=1) self._build_ui() self._install_tooltips() self.hide_startup_splash() self.deiconify() self.close_pyinstaller_splash() self.log(f"Log file: {self.log_file_path}") self.ui_queue_after_id = self.after(100, self.process_ui_queue) self.protocol("WM_DELETE_WINDOW", self.on_close) def show_startup_splash(self) -> None: splash = ctk.CTkToplevel(self) splash.title(APP_TITLE) splash.overrideredirect(True) splash.resizable(False, False) splash.attributes("-topmost", True) width = 420 height = 180 x = max(0, int((splash.winfo_screenwidth() - width) / 2)) y = max(0, int((splash.winfo_screenheight() - height) / 2)) splash.geometry(f"{width}x{height}+{x}+{y}") splash.grid_columnconfigure(0, weight=1) splash.grid_rowconfigure(0, weight=1) content = ctk.CTkFrame(splash) content.grid(row=0, column=0, sticky="nsew", padx=12, pady=12) content.grid_columnconfigure(0, weight=1) ctk.CTkLabel( content, text="Outlook Email Exporter", font=ctk.CTkFont(size=26, weight="bold"), ).grid(row=0, column=0, pady=(24, 8)) ctk.CTkLabel(content, text="Starting...").grid(row=1, column=0, pady=(0, 18)) progress = ctk.CTkProgressBar(content, mode="indeterminate") progress.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18)) progress.start() self.splash = splash self.update_idletasks() self.update() def hide_startup_splash(self) -> None: if self.splash is not None: try: self.splash.destroy() except Exception: pass self.splash = None @staticmethod def close_pyinstaller_splash() -> None: try: import pyi_splash pyi_splash.close() except Exception: pass def _build_ui(self) -> None: self.grid_columnconfigure(0, weight=1) self.grid_rowconfigure(1, weight=1) top = ctk.CTkFrame(self) top.grid(row=0, column=0, sticky="nsew", padx=8, pady=(8, 4)) top.grid_columnconfigure(3, weight=1) top.grid_columnconfigure(4, weight=0) self.btn_connect = ctk.CTkButton(top, text="Connect Outlook", command=self.connect_outlook) self.btn_connect.grid(row=0, column=0, padx=6, pady=6) self.btn_refresh = ctk.CTkButton(top, text="Reload folders", command=self.reload_tree, state="disabled") self.btn_refresh.grid(row=0, column=1, padx=6, pady=6) self.btn_expand = ctk.CTkButton(top, text="Expand tree", command=self.expand_all, state="disabled") self.btn_expand.grid(row=0, column=2, padx=6, pady=6) lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w") lbl_selected.grid(row=0, column=3, padx=6, pady=6, sticky="ew") self.btn_help = ctk.CTkButton(top, text="?", width=34, command=self.open_help_window) self.btn_help.grid(row=0, column=4, padx=(6, 10), pady=6, sticky="e") main_pane = ttk.Panedwindow(self, orient="horizontal") main_pane.grid(row=1, column=0, sticky="nsew", padx=8, pady=(4, 8)) left = ctk.CTkFrame(main_pane) left.grid_rowconfigure(1, weight=1) left.grid_columnconfigure(0, weight=1) ctk.CTkLabel(left, text="Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid( row=0, column=0, sticky="w", padx=8, pady=(8, 2) ) tree_container = ctk.CTkFrame(left) tree_container.grid(row=1, column=0, sticky="nsew", padx=8, pady=(2, 8)) tree_container.grid_rowconfigure(0, weight=1) tree_container.grid_columnconfigure(0, weight=1) style = ttk.Style() try: style.theme_use("default") except Exception: pass style.configure("Treeview", rowheight=24) self.tree_font_bold = ("Segoe UI", 10, "bold") style.map( "Treeview", background=[("selected", "#93c5fd")], foreground=[("selected", "#111827")], ) self.tree = ttk.Treeview(tree_container, show="tree") self.tree.grid(row=0, column=0, sticky="nsew") self.tree.column("#0", width=280, minwidth=220, stretch=True) self.tree.tag_configure("root", background="#d9d9d9", font=self.tree_font_bold) self.tree.tag_configure("odd", background="#f3f4f6") self.tree.tag_configure("even", background="#e5e7eb") self.tree.tag_configure("branch_odd", background="#f3f4f6", font=self.tree_font_bold) self.tree.tag_configure("branch_even", background="#e5e7eb", font=self.tree_font_bold) self.tree.bind("<>", self.on_tree_open) self.tree.bind("<>", self.on_tree_select) yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview) yscroll.grid(row=0, column=1, sticky="ns") self.tree.configure(yscrollcommand=yscroll.set) right = ctk.CTkFrame(main_pane) right.grid_rowconfigure(1, weight=1) right.grid_columnconfigure(0, weight=1) main_pane.add(left, weight=7) main_pane.add(right, weight=13) right_top = ctk.CTkFrame(right) right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 4)) right_top.grid_columnconfigure(1, weight=1) ctk.CTkLabel(right_top, text="Immediate export", font=ctk.CTkFont(size=18, weight="bold")).grid( row=0, column=0, columnspan=3, sticky="w", padx=8, pady=(8, 4) ) ctk.CTkLabel(right_top, text="Output folder:").grid(row=1, column=0, padx=8, pady=4, sticky="w") self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var) self.entry_output.grid(row=1, column=1, padx=8, pady=4, sticky="ew") self.btn_browse = ctk.CTkButton(right_top, text="Browse", width=110, command=self.choose_output_dir) self.btn_browse.grid(row=1, column=2, padx=8, pady=4) help_text = ( "The selected folder will be exported together with all its subfolders.\n" "Each email is saved as a TXT file and attachments are saved in their original format." ) ctk.CTkLabel(right_top, text=help_text, justify="left").grid( row=2, column=0, columnspan=3, padx=8, pady=(2, 4), sticky="w" ) options = ctk.CTkFrame(right_top, fg_color="transparent") options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=8, pady=(2, 4)) options.grid_columnconfigure(0, weight=1) options.grid_columnconfigure(1, weight=1) self.chk_zip = ctk.CTkCheckBox( options, text="Create zip", variable=self.create_zip_var, command=lambda: self.on_export_mode_change("zip"), ) self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12)) self.chk_hierarchy = ctk.CTkCheckBox( options, text="Create folders on file system", variable=self.create_hierarchy_var, command=lambda: self.on_export_mode_change("hierarchy"), ) self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0)) self.btn_export = ctk.CTkButton( right_top, text="Export selected folder without preview", command=lambda: self.start_export(EXPORT_MODE_ALL), state="disabled", height=32, ) self.btn_export.grid(row=4, column=0, columnspan=2, padx=8, pady=(4, 6), sticky="ew") right_bottom = ctk.CTkFrame(right) right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(4, 0)) right_bottom.grid_rowconfigure(4, weight=1) right_bottom.grid_columnconfigure(1, weight=1) right_bottom.grid_columnconfigure(3, weight=1) filter_group = ctk.CTkFrame(right_bottom) filter_group.grid(row=0, column=0, columnspan=4, sticky="nsew", padx=(8, 4), pady=(8, 4)) filter_group.grid_columnconfigure(1, weight=1) filter_group.grid_columnconfigure(3, weight=1) ctk.CTkLabel(filter_group, text="Preview and filters", font=ctk.CTkFont(size=18, weight="bold")).grid( row=0, column=0, columnspan=4, sticky="w", padx=8, pady=(8, 2) ) ctk.CTkLabel(filter_group, textvariable=self.filter_loaded_folder_var, anchor="w").grid( row=1, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 4) ) ctk.CTkLabel(filter_group, text="Sender contains").grid(row=2, column=0, sticky="w", padx=8, pady=(0, 4)) self.entry_filter_sender = ctk.CTkEntry(filter_group, textvariable=self.filter_sender_var, state="disabled") self.entry_filter_sender.grid(row=2, column=1, sticky="ew", padx=(0, 8), pady=(0, 4)) self.entry_filter_sender.bind("", self._on_live_filter_input) ctk.CTkLabel(filter_group, text="Subject contains").grid(row=2, column=2, sticky="w", padx=8, pady=(0, 4)) self.entry_filter_subject = ctk.CTkEntry(filter_group, textvariable=self.filter_subject_var, state="disabled") self.entry_filter_subject.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(0, 4)) self.entry_filter_subject.bind("", self._on_live_filter_input) ctk.CTkLabel(filter_group, text="Period").grid(row=3, column=0, sticky="w", padx=8, pady=(0, 4)) self.combo_filter_date_range = ctk.CTkComboBox( filter_group, variable=self.filter_date_range_var, values=["", "Last week", "Last month", "Last 6 months", "Last year"], command=lambda _value: self.schedule_live_filter(), state="disabled", ) self.combo_filter_date_range.grid(row=3, column=1, sticky="w", padx=(0, 8), pady=(0, 4)) filter_options = ctk.CTkFrame(filter_group, fg_color="transparent") filter_options.grid(row=4, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 8)) filter_options.grid_columnconfigure(3, weight=1) self.chk_filter_export_new = ctk.CTkCheckBox( filter_options, text="New", variable=self.filter_export_new_var, command=self.schedule_live_filter, state="disabled", ) self.chk_filter_export_new.grid(row=0, column=0, sticky="w", padx=(0, 12)) self.chk_filter_export_with_attachments = ctk.CTkCheckBox( filter_options, text="With attachments", variable=self.filter_export_with_attachments_var, command=self.schedule_live_filter, state="disabled", ) self.chk_filter_export_with_attachments.grid(row=0, column=1, sticky="w", padx=(0, 12)) self.chk_filter_export_selected = ctk.CTkCheckBox( filter_options, text="Selected", variable=self.filter_export_selected_var, command=self.schedule_live_filter, state="disabled", ) self.chk_filter_export_selected.grid(row=0, column=2, sticky="w", padx=(0, 12)) actions_group = ctk.CTkFrame(right_bottom) actions_group.grid(row=0, column=4, sticky="nsew", padx=(5, 10), pady=(10, 8)) actions_group.grid_columnconfigure(0, weight=1) actions_group.grid_rowconfigure(4, weight=1) ctk.CTkLabel(actions_group, text="Actions", font=ctk.CTkFont(size=15, weight="bold")).grid( row=0, column=0, sticky="w", padx=8, pady=(8, 4) ) self.btn_load_headers = ctk.CTkButton( actions_group, text="Load preview", width=130, command=self.load_folder_for_filtering, state="disabled", ) self.btn_load_headers.grid(row=1, column=0, sticky="ew", padx=8, pady=(0, 6)) self.btn_apply_filters = ctk.CTkButton( actions_group, text="Apply filters", command=self.apply_email_filters, width=120, state="disabled", ) self.btn_apply_filters.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 6)) self.btn_reset_filters = ctk.CTkButton( actions_group, text="Reset filters", command=self.reset_email_filters, width=110, state="disabled", ) self.btn_reset_filters.grid(row=3, column=0, sticky="ew", padx=8, pady=(0, 8)) table_frame = ctk.CTkFrame(right_bottom) table_frame.grid(row=4, column=0, columnspan=5, sticky="nsew", padx=8, pady=(0, 8)) table_frame.grid_columnconfigure(0, weight=1) table_frame.grid_rowconfigure(0, weight=1) columns = ("date", "sender", "subject", "attachments", "types", "state") self.filter_tree = ttk.Treeview(table_frame, columns=columns, show="headings", selectmode="extended") self.filter_tree.grid(row=0, column=0, sticky="nsew") self.filter_tree.tag_configure("odd", background="#f8fafc") self.filter_tree.tag_configure("even", background="#e5e7eb") self.filter_tree.heading("date", text="Date") self.filter_tree.heading("sender", text="Sender") self.filter_tree.heading("subject", text="Subject") self.filter_tree.heading("attachments", text="Att.") self.filter_tree.heading("types", text="Type") self.filter_tree.heading("state", text="State") self.filter_tree.column("date", width=150, anchor="w") self.filter_tree.column("sender", width=220, anchor="w") self.filter_tree.column("subject", width=430, anchor="w") self.filter_tree.column("attachments", width=36, minwidth=32, anchor="center", stretch=False) self.filter_tree.column("types", width=120, minwidth=90, anchor="center") self.filter_tree.column("state", width=135, minwidth=120, anchor="center") filter_yscroll = ttk.Scrollbar(table_frame, orient="vertical", command=self.filter_tree.yview) filter_yscroll.grid(row=0, column=1, sticky="ns") filter_xscroll = ttk.Scrollbar(table_frame, orient="horizontal", command=self.filter_tree.xview) filter_xscroll.grid(row=1, column=0, sticky="ew") self.filter_tree.configure(yscrollcommand=filter_yscroll.set, xscrollcommand=filter_xscroll.set) self.filter_tree.bind("<>", self._on_filter_tree_select) preview_actions = ctk.CTkFrame(right_bottom, fg_color="transparent") preview_actions.grid(row=5, column=0, columnspan=5, sticky="ew", padx=8, pady=(0, 8)) preview_actions.grid_columnconfigure(0, weight=1) ctk.CTkLabel(preview_actions, textvariable=self.filter_status_var, anchor="w").grid( row=0, column=0, sticky="ew", padx=(0, 10), pady=0 ) self.btn_export_preview = ctk.CTkButton( preview_actions, text="Export preview", width=150, command=self.export_preview_subset, state="disabled", ) self.btn_export_preview.grid(row=0, column=1, sticky="e", pady=0) status = ctk.CTkFrame(self) status.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 8)) status.grid_columnconfigure(0, weight=1) status.grid_columnconfigure(1, weight=0) status.grid_columnconfigure(2, weight=0) status.grid_columnconfigure(3, weight=0) ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid( row=0, column=0, sticky="ew", padx=8, pady=6 ) self.btn_status_log = ctk.CTkButton(status, text="Open log", width=90, command=self.toggle_log_window) self.btn_status_log.grid(row=0, column=1, sticky="e", padx=(0, 8), pady=6) self.btn_cancel = ctk.CTkButton( status, text="Cancel export", command=self.cancel_export, state="disabled", width=150, fg_color="#9b2c2c", hover_color="#7f1d1d", ) self.btn_cancel.grid(row=0, column=2, sticky="e", padx=(0, 8), pady=6) self.btn_exit = ctk.CTkButton(status, text="Exit", width=100, command=self.on_close) self.btn_exit.grid(row=0, column=3, sticky="e", padx=8, pady=6) self.busy_overlay = ctk.CTkFrame(self, corner_radius=10, border_width=1, height=150) self.busy_overlay.grid_columnconfigure(0, weight=1) ctk.CTkLabel( self.busy_overlay, textvariable=self.busy_title_var, font=ctk.CTkFont(size=18, weight="bold"), ).grid(row=0, column=0, sticky="ew", padx=18, pady=(18, 8)) ctk.CTkLabel( self.busy_overlay, textvariable=self.busy_detail_var, justify="center", ).grid(row=1, column=0, sticky="ew", padx=18, pady=(0, 10)) self.busy_progress = ctk.CTkProgressBar(self.busy_overlay, mode="indeterminate") self.busy_progress.grid(row=2, column=0, sticky="ew", padx=18, pady=(0, 18)) self.busy_overlay.place_forget() self._update_filter_controls_state() def _install_tooltips(self) -> None: tooltip_specs = [ (self.btn_connect, "Connect to the already running Outlook session and load the available accounts."), (self.btn_refresh, "Reload the Outlook folder tree from the current session."), (self.btn_expand, "Expand all folders that are already loaded in the tree."), (self.btn_help, "Open the quick help window."), (self.btn_browse, "Choose the destination folder for export files and the manifest."), (self.btn_export, "Export the selected folder and all subfolders immediately, without using the preview."), (self.btn_load_headers, "Load a lightweight preview of the emails in the selected folder."), (self.btn_apply_filters, "Apply the current filters to the loaded preview."), (self.btn_reset_filters, "Clear preview filters and show all loaded emails again."), (self.btn_export_preview, "Export the subset of emails currently visible in the filtered preview."), (self.btn_status_log, "Open or close the application log window."), (self.btn_cancel, "Request cancellation of the long-running operation in progress."), (self.btn_exit, "Close the application and try to shut it down cleanly."), (self.entry_output, "Destination folder used for exported files and the export manifest."), (self.entry_filter_sender, "Filter the preview by sender name or sender address."), (self.entry_filter_subject, "Filter the preview by words contained in the email subject."), (self.combo_filter_date_range, "Limit the preview to a recent time range."), (self.chk_zip, "Create a ZIP archive as export output."), (self.chk_hierarchy, "Create folders and files directly on the file system."), (self.chk_filter_export_new, "Show only emails that are not yet exported."), (self.chk_filter_export_with_attachments, "Show only emails that have Outlook attachments."), (self.chk_filter_export_selected, "Limit the preview and export to the rows currently selected."), ] self.tooltips = [ToolTip(widget, text) for widget, text in tooltip_specs] def connect_outlook(self) -> None: try: self.set_status("Connecting to Outlook...") self.log("Connecting to Outlook...") self.outlook_service.connect() self.connected = True self.log("Connected to Outlook.") self.reload_tree() self.btn_refresh.configure(state="normal") self.btn_expand.configure(state="normal") self.set_status("Connected to Outlook") except Exception as exc: self.connected = False self.log(f"Outlook connection error: {exc}") self.set_status("Connection error") messagebox.showerror(APP_TITLE, f"Unable to connect to Outlook.\n\n{exc}") def reload_tree(self) -> None: if not self.connected: return if self.filter_live_after_id is not None: try: self.after_cancel(self.filter_live_after_id) except Exception: pass self.filter_live_after_id = None self.tree.delete(*self.tree.get_children()) self.tree_folder_map.clear() self.tree_path_map.clear() self.empty_folder_alert_shown.clear() self.selected_folder_obj = None self.loaded_email_headers = [] self.filtered_email_headers = [] self.filter_tree_key_map = {} self.filter_loaded_folder_text = "" self.filter_loaded_folder_var.set("No folder loaded") self.filter_status_var.set("No emails loaded") self.selected_folder_display.set("No folder selected") self.btn_export.configure(state="disabled") self.refresh_filter_tree() self._update_filter_controls_state() for root_node in self.outlook_service.list_root_folders(): node_text = NameCodec.tree_label(root_node.display_name or root_node.path, root_node.mail_count) node_id = self.tree.insert("", "end", text=node_text, open=False, tags=("root",)) self.tree_folder_map[node_id] = root_node.folder_obj self.tree_path_map[node_id] = root_node.path self._insert_dummy(node_id) self.log("Folder tree reloaded.") def _insert_dummy(self, node_id: str) -> None: self.tree.insert(node_id, "end", text="__dummy__") def on_tree_open(self, _event=None) -> None: selected = self.tree.focus() if not selected: return children = self.tree.get_children(selected) if len(children) == 1 and self.tree.item(children[0], "text") == "__dummy__": self.tree.delete(children[0]) folder_obj = self.tree_folder_map.get(selected) current_path = self.tree_path_map.get(selected, self.tree.item(selected, "text")) try: child_nodes = self.outlook_service.list_children(folder_obj, current_path) for idx, node in enumerate(child_nodes, start=1): node_text = NameCodec.tree_label(node.display_name or node.path, node.mail_count) has_children = False try: has_children = bool(node.folder_obj.Folders.Count > 0) except Exception: has_children = False if has_children: row_tag = "branch_even" if idx % 2 == 0 else "branch_odd" else: row_tag = "even" if idx % 2 == 0 else "odd" child_id = self.tree.insert(selected, "end", text=node_text, open=False, tags=(row_tag,)) self.tree_folder_map[child_id] = node.folder_obj self.tree_path_map[child_id] = node.path try: if has_children: self._insert_dummy(child_id) except Exception: pass except Exception as exc: self.log(f"Error loading subfolders: {exc}") def on_tree_select(self, _event=None) -> None: selected = self.tree.selection() if not selected: return node_id = selected[0] folder_obj = self.tree_folder_map.get(node_id) self.selected_folder_obj = folder_obj folder_path = self.tree_path_map.get(node_id, self.tree.item(node_id, "text")) if folder_obj is None: self.selected_folder_display.set("No folder selected") self.btn_export.configure(state="disabled") return self.selected_folder_display.set(f"Selected: {folder_path}") self.btn_export.configure(state="normal") self.btn_load_headers.configure(state="normal") if folder_path != self.filter_loaded_folder_text: self.loaded_email_headers = [] self.filtered_email_headers = [] self.filter_tree_key_map = {} self.filter_loaded_folder_text = "" self.filter_loaded_folder_var.set(f"Selected folder: {folder_path}") self.filter_status_var.set("No emails loaded for the selected folder") self.refresh_filter_tree() self._update_filter_controls_state() self.maybe_show_empty_folder_server_alert(node_id, folder_obj) def maybe_show_empty_folder_server_alert(self, node_id: str, folder_obj: object) -> None: if self.tree.parent(node_id) == "": return if node_id in self.empty_folder_alert_shown: return diagnostics = OutlookService.folder_item_diagnostics(folder_obj) mail_count = OutlookService.count_mail_items(folder_obj) if mail_count != 0: return self.empty_folder_alert_shown.add(node_id) self.log_empty_folder_diagnostics(diagnostics) self.show_empty_folder_server_alert() def log_empty_folder_diagnostics(self, diagnostics: dict) -> None: self.log("Empty folder diagnostics via Outlook COM:") self.log(f" folder_name: {diagnostics.get('folder_name')}") self.log(f" folder_path: {diagnostics.get('folder_path')}") self.log(f" raw Items.Count: {diagnostics.get('raw_count')}") self.log(f" Restrict IPM.Note Count: {diagnostics.get('restricted_count')}") self.log(f" items_error: {diagnostics.get('items_error')}") self.log(f" restrict_error: {diagnostics.get('restrict_error')}") self.log(f" store_id: {diagnostics.get('store_id')}") def show_empty_folder_server_alert(self) -> None: dialog = ctk.CTkToplevel(self) dialog.title("Empty Outlook folder") dialog.geometry("720x430") dialog.minsize(640, 380) dialog.transient(self) dialog.grab_set() dialog.grid_columnconfigure(0, weight=0) dialog.grid_columnconfigure(1, weight=1) dialog.grid_rowconfigure(1, weight=1) alert = ctk.CTkLabel( dialog, text="!", width=72, height=72, corner_radius=36, fg_color="#b45309", text_color="white", font=ctk.CTkFont(size=42, weight="bold"), ) alert.grid(row=0, column=0, rowspan=2, sticky="n", padx=(24, 16), pady=(24, 12)) ctk.CTkLabel( dialog, text="The selected folder appears to be empty", font=ctk.CTkFont(size=22, weight="bold"), anchor="w", ).grid(row=0, column=1, sticky="ew", padx=(0, 24), pady=(26, 8)) message = ( "The selected folder looks empty, but there may still be emails on the Outlook server " "that are not yet synchronized locally.\n\n" "To synchronize, go to:\n\n" "File -> Account settings -> Sync settings and Account -> All\n\n" "Then wait for synchronization to complete and try the export again.\n\n" "Note: if Outlook shows a link such as 'there are more items on the server', the program can " "only see emails that are already synchronized locally." ) ctk.CTkLabel(dialog, text=message, justify="left", anchor="nw", wraplength=560).grid( row=1, column=1, sticky="nsew", padx=(0, 24), pady=(0, 16) ) buttons = ctk.CTkFrame(dialog, fg_color="transparent") buttons.grid(row=2, column=0, columnspan=2, sticky="ew", padx=24, pady=(0, 24)) buttons.grid_columnconfigure(0, weight=1) buttons.grid_columnconfigure(1, weight=0) buttons.grid_columnconfigure(2, weight=0) def open_outlook() -> None: try: os.startfile("outlook") except Exception as exc: messagebox.showerror(APP_TITLE, f"Unable to open Outlook.\n\n{exc}") ctk.CTkButton(buttons, text="Open Outlook", width=140, command=open_outlook).grid( row=0, column=1, sticky="e", padx=(0, 12) ) ctk.CTkButton( buttons, text="Close", width=120, command=dialog.destroy, fg_color="#5f6368", hover_color="#4b4f52", ).grid(row=0, column=2, sticky="e") dialog.protocol("WM_DELETE_WINDOW", dialog.destroy) dialog.after(100, dialog.focus_force) def expand_all(self) -> None: def _expand(node_id: str) -> None: self.tree.item(node_id, open=True) self.tree.focus(node_id) self.on_tree_open() for child in self.tree.get_children(node_id): if self.tree.item(child, "text") != "__dummy__": _expand(child) for root in self.tree.get_children(""): _expand(root) self.log("Tree expansion completed.") def choose_output_dir(self) -> None: selected = filedialog.askdirectory(initialdir=self.output_dir_var.get() or str(Path.home())) if selected: self.output_dir_var.set(selected) self.prefs.set_output_dir(selected) def load_folder_for_filtering(self) -> None: if self.selected_folder_obj is None: messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.") return if self.filter_thread is not None and self.filter_thread.is_alive(): messagebox.showwarning(APP_TITLE, "An email loading task is already in progress.") return folder_text = self.selected_folder_display.get().replace("Selected: ", "") self.set_busy_overlay( True, "Please wait... loading many emails may take a few minutes", f"Loading email headers for:\n{folder_text}", ) self._set_controls_enabled(False) self.filter_status_var.set("Loading in progress...") output_dir = self.output_dir_var.get().strip() self.filter_thread = threading.Thread( target=self._run_filter_load_worker, args=(self._marshal_folder_for_worker(self.selected_folder_obj), folder_text, output_dir), daemon=True, ) self.filter_thread.start() @staticmethod def _format_elapsed(seconds: float) -> str: total_seconds = max(0, int(round(seconds))) minutes, secs = divmod(total_seconds, 60) return f"{minutes}m {secs:02d}s" def on_export_mode_change(self, selected_mode: str) -> None: if selected_mode == "zip": self.create_zip_var.set(1) self.create_hierarchy_var.set(0) else: self.create_zip_var.set(0) self.create_hierarchy_var.set(1) if not self.create_zip_var.get() and not self.create_hierarchy_var.get(): self.create_zip_var.set(1) def _build_export_options(self, export_mode: str) -> ExportOptions: if export_mode == EXPORT_MODE_WITH_ATTACHMENTS: return ExportOptions(mode=export_mode, only_with_attachments=True) if export_mode == EXPORT_MODE_FILTER: return ExportOptions(mode=export_mode, only_with_attachments=False) return ExportOptions(mode=EXPORT_MODE_ALL, only_with_attachments=False) def _prepare_manifest_for_export( self, output_root: Path, relative_path: str, folder_text: str, zip_mode: bool, ) -> bool: manifest = ExportManifest(output_root) folder_key = folder_text scope = manifest.get_export_scope(folder_key) expected_path = output_root / relative_path record_count = manifest.folder_record_count(folder_key) self.log( "Pre-export manifest check: " f"folder_key={folder_key} | scope={'yes' if scope is not None else 'no'} | " f"record_count={record_count} | expected_path={expected_path}" ) if scope is None: if record_count <= 0: return True last_path = expected_path target_label = relative_path or folder_text else: last_path_text = str(scope.get("lp") or "").strip() last_path = Path(last_path_text) if last_path_text else expected_path target_label = scope.get("rp") or relative_path or folder_text self.log(f"Checking previous export path: {last_path}") if last_path.exists(): return True recreate = messagebox.askyesno( APP_TITLE, f"The previous export for the folder\n\n" f"\"{target_label}\"\n\n" "is no longer present on disk.\n\n" "Do you want to recreate it?", ) if not recreate: return False removed = manifest.clear_folder(folder_key) manifest.save() self.log( f"Manifest cleaned to recreate the export for '{target_label}'. " f"Removed records: {removed}" ) return True def _export_mode_label(self, export_mode: str) -> str: labels = { EXPORT_MODE_ALL: "Export all", EXPORT_MODE_WITH_ATTACHMENTS: "Export emails with attachments only", EXPORT_MODE_FILTER: "Export preview", } return labels.get(export_mode, export_mode) def start_export( self, export_mode: str = EXPORT_MODE_ALL, selected_keys: Optional[set[str]] = None, selected_entry_refs: Optional[list[tuple[str, str]]] = None, ) -> None: if self.export_thread is not None and self.export_thread.is_alive(): messagebox.showwarning(APP_TITLE, "An export is already in progress.") return if self.selected_folder_obj is None: messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.") return output_dir = self.output_dir_var.get().strip() if not output_dir: messagebox.showwarning(APP_TITLE, "Choose a valid output folder.") return output_root = Path(output_dir) try: output_root.mkdir(parents=True, exist_ok=True) except Exception as exc: messagebox.showerror(APP_TITLE, f"Unable to create the output folder.\n\n{exc}") return self.prefs.set_output_dir(str(output_root)) options = self._build_export_options(export_mode) if selected_keys is not None: if not selected_keys: messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.") return options.selected_keys = set(selected_keys) if selected_entry_refs is not None: if not selected_entry_refs: messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.") return options.selected_entry_refs = list(selected_entry_refs) folder_text = self.selected_folder_display.get().replace("Selected: ", "") options.folder_key = folder_text relative_path = NameCodec.folder_name_from_outlook_path(folder_text) zip_mode = bool(self.create_zip_var.get()) mode_label = self._export_mode_label(export_mode) if not self._prepare_manifest_for_export(output_root, relative_path, folder_text, zip_mode): return if zip_mode: confirm_message = ( f"Mode: {mode_label}\n\n" "A ZIP archive will be created inside the output folder.\n" "The temporary folder structure used to prepare the archive will be removed at the end.\n\n" "Do you want to continue?" ) else: confirm_message = ( f"Mode: {mode_label}\n\n" "A folder hierarchy will be created in the output folder.\n\n" "Do you want to continue?" ) if not messagebox.askyesno(APP_TITLE, confirm_message): return self._set_export_controls_running() self.exporter = Exporter(self) self.log("=" * 80) self.log(f"Starting export: {folder_text}") self.log(f"Action: {mode_label}") self.log(f"Output mode: {'ZIP' if zip_mode else 'Folder hierarchy'}") self.set_status("Export in progress...") self.export_thread = threading.Thread( target=self._run_export_worker, args=( self._marshal_folder_for_worker(self.selected_folder_obj), output_root, relative_path, folder_text, zip_mode, options, ), daemon=True, ) self.export_thread.start() def cancel_export(self) -> None: self.exporter.cancel() self.set_status("Cancellation requested...") self.log("Cancellation request sent.") def _set_controls_enabled(self, enabled: bool) -> None: state = "normal" if enabled else "disabled" export_state = state if enabled and self.selected_folder_obj else "disabled" self.btn_export.configure(state=export_state) self.btn_refresh.configure(state=state if self.connected else "disabled") self.btn_connect.configure(state=state) self.btn_expand.configure(state=state if self.connected else "disabled") self.btn_browse.configure(state=state) self.btn_exit.configure(state=state) self.chk_zip.configure(state=state) self.chk_hierarchy.configure(state=state) self.btn_status_log.configure(state=state) if enabled: self._update_filter_controls_state() else: self.btn_load_headers.configure(state="disabled") self.entry_filter_subject.configure(state="disabled") self.entry_filter_sender.configure(state="disabled") self.combo_filter_date_range.configure(state="disabled") self.chk_filter_export_new.configure(state="disabled") self.chk_filter_export_with_attachments.configure(state="disabled") self.chk_filter_export_selected.configure(state="disabled") self.btn_apply_filters.configure(state="disabled") self.btn_reset_filters.configure(state="disabled") self.btn_export_preview.configure(state="disabled") def _set_export_controls_running(self) -> None: self._set_controls_enabled(False) self.btn_cancel.configure(state="normal") self.set_busy_overlay( True, "Please wait... exporting many emails may take a few minutes", "Export in progress...", ) def _restore_export_controls(self) -> None: self._set_controls_enabled(True) self.btn_cancel.configure(state="disabled") self.set_busy_overlay(False) def _run_export_worker( self, folder_ref: object, output_root: Path, relative_path: str, folder_text: str, zip_mode: bool, options: ExportOptions, ) -> None: com_initialized = False started_at = datetime.now() try: if pythoncom is not None: pythoncom.CoInitialize() com_initialized = True manifest = ExportManifest(output_root) self.exporter.configure(manifest, options) self.log(f"Manifest export: {manifest.path}") if zip_mode: with tempfile.TemporaryDirectory(prefix="outlook_export_") as temp_dir: temp_root = Path(temp_dir) temp_export_target = temp_root / relative_path self.log(f"Destinazione temporanea: {temp_export_target}") if options.selected_entry_refs is not None: namespace = OutlookService.get_namespace_for_worker() mails, attachments, skipped = self.exporter.export_selected_items( namespace, options.selected_entry_refs, temp_export_target, ) else: folder_obj = self._resolve_worker_folder(folder_ref) mails, attachments, skipped = self.exporter.export_folder(folder_obj, temp_export_target) self.exporter.check_cancel() zip_path = self._create_zip_archive(temp_root, output_root, folder_text) result_path = zip_path else: export_target = output_root / relative_path self.log(f"Destinazione: {export_target}") if options.selected_entry_refs is not None: namespace = OutlookService.get_namespace_for_worker() mails, attachments, skipped = self.exporter.export_selected_items( namespace, options.selected_entry_refs, export_target, ) else: folder_obj = self._resolve_worker_folder(folder_ref) mails, attachments, skipped = self.exporter.export_folder(folder_obj, export_target) result_path = export_target manifest.update_export_scope(options.folder_key, relative_path, result_path, zip_mode) manifest.save() elapsed_seconds = (datetime.now() - started_at).total_seconds() self.ui_queue.put(("complete", mails, attachments, skipped, result_path, zip_mode, options.mode, elapsed_seconds)) except ExportCancelled: self.ui_queue.put(("cancelled",)) except Exception as exc: self.ui_queue.put(("error", str(exc), traceback.format_exc())) finally: if com_initialized: try: pythoncom.CoUninitialize() except Exception: pass def _run_filter_load_worker(self, folder_ref: object, folder_text: str, output_dir: str) -> None: com_initialized = False started_at = datetime.now() try: if pythoncom is not None: pythoncom.CoInitialize() com_initialized = True folder_obj = self._resolve_worker_folder(folder_ref) headers = self._load_email_headers(folder_obj, output_dir) elapsed_seconds = (datetime.now() - started_at).total_seconds() self.ui_queue.put(("filter_loaded", folder_text, headers, elapsed_seconds)) except Exception as exc: self.ui_queue.put(("filter_error", str(exc), traceback.format_exc())) finally: if com_initialized: try: pythoncom.CoUninitialize() except Exception: pass def _load_email_headers(self, folder_obj: object, output_dir: str) -> List[EmailHeader]: headers: List[EmailHeader] = [] output_root = Path(output_dir) if output_dir else None manifest = ExportManifest(output_root) if output_root and output_root.exists() else None items = getattr(folder_obj, "Items", None) if items is None: return headers try: items.Sort("[ReceivedTime]", True) except Exception: pass try: count = items.Count except Exception: count = 0 for idx in range(1, count + 1): try: item = items.Item(idx) except Exception: continue try: if getattr(item, "Class", None) != MAIL_CLASS: continue except Exception: continue subject_raw = getattr(item, "Subject", "") or "" subject = Exporter._safe_str(subject_raw, max_len=180) sender = Exporter._safe_str( getattr(item, "SenderName", None) or getattr(item, "SenderEmailAddress", None) or "", max_len=120, ) entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None)) parent = getattr(item, "Parent", None) store_id = Exporter._safe_full_str(getattr(parent, "StoreID", None) if parent is not None else None) received_dt = Exporter._convert_outlook_datetime(getattr(item, "ReceivedTime", None)) if received_dt is None: received_dt = Exporter._convert_outlook_datetime(getattr(item, "SentOn", None)) received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else "" attachment_count, attachment_types = self._collect_preview_attachment_info_minimal(item) key = self._build_manifest_key_for_item(item, Exporter._safe_str(subject_raw), received_text) exported = bool(manifest and manifest.has_record(key) and manifest.record_file_exists(key)) headers.append( EmailHeader( key=key, entry_id=entry_id, store_id=store_id, received_time=received_text, sender=sender, subject=subject, attachment_count=attachment_count, attachment_types=attachment_types, exported=exported, ) ) if idx % 100 == 0: self.ui_queue.put( ( "status", f"Loading emails... {idx}/{count} | found {len(headers)}", ) ) return headers def _refresh_loaded_header_export_flags(self) -> None: if not self.loaded_email_headers: return output_dir = self.output_dir_var.get().strip() if not output_dir: return output_root = Path(output_dir) if not output_root.exists(): return manifest = ExportManifest(output_root) changed = False for header in self.loaded_email_headers: exported = bool(manifest.has_record(header.key) and manifest.record_file_exists(header.key)) if header.exported != exported: header.exported = exported changed = True if changed: self.apply_email_filters() def _collect_preview_attachment_info(self, item: object, html_body: str) -> Tuple[int, str]: attachments = getattr(item, "Attachments", None) if attachments is None: return 0, "" try: attachment_total = attachments.Count except Exception: return 0, "" exported_exts: List[str] = [] for att_index in range(1, attachment_total + 1): try: attachment = attachments.Item(att_index) except Exception: continue original_filename = Exporter._safe_str(getattr(attachment, "FileName", None) or f"attachment_{att_index}") inline_score, _ = self.exporter._signature_image_score(attachment, original_filename, html_body) if inline_score >= 3: continue if not NameCodec.is_allowed_attachment(original_filename): continue ext = NameCodec.sanitize_extension(os.path.splitext(original_filename or "")[1]).lstrip(".") if ext: exported_exts.append(ext.lower()) if not exported_exts: return 0, "" counts: dict[str, int] = {} ordered_exts: List[str] = [] for ext in exported_exts: if ext not in counts: counts[ext] = 0 ordered_exts.append(ext) counts[ext] += 1 summary = ", ".join(f"{ext}({counts[ext]})" for ext in ordered_exts) return len(exported_exts), summary @staticmethod def _collect_preview_attachment_info_minimal(item: object) -> Tuple[int, str]: attachments = getattr(item, "Attachments", None) if attachments is None: return 0, "" try: attachment_total = int(attachments.Count or 0) except Exception: return 0, "" if attachment_total <= 0: return 0, "" return attachment_total, "" def _build_manifest_key_for_item(self, item: object, subject: str, received_text: str) -> str: entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None)) parent = getattr(item, "Parent", None) store_id = Exporter._safe_full_str(getattr(parent, "StoreID", None) if parent is not None else None) return Exporter._build_manifest_key_from_ids(store_id, entry_id) @staticmethod def _get_mail_mapi_property(item: object) -> object: try: accessor = item.PropertyAccessor return accessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x1035001F") except Exception: return None @staticmethod def _marshal_folder_for_worker(folder_obj: object) -> object: if pythoncom is None: return folder_obj try: ole_obj = getattr(folder_obj, "_oleobj_", None) if ole_obj is None: return folder_obj return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, ole_obj) except Exception: return folder_obj @staticmethod def _resolve_worker_folder(folder_ref: object) -> object: if pythoncom is None or win32com is None: return folder_ref try: dispatch = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch) return win32com.client.Dispatch(dispatch) except Exception: return folder_ref def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path: stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") base_name = NameCodec.archive_base_name_from_outlook_path(folder_text) zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip") self.log(f"Creating ZIP archive: {zip_path}") with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: for path in source_root.rglob("*"): self.exporter.check_cancel() if path.is_file(): archive.write(path, path.relative_to(source_root).as_posix()) return zip_path def process_ui_queue(self) -> None: if self.shutting_down: self.ui_queue_after_id = None return try: while True: event = self.ui_queue.get_nowait() event_type = event[0] if event_type == "log": self._write_log(event[1]) elif event_type == "status": self.status_var.set(event[1]) if self.busy_overlay_visible: self.busy_detail_var.set(event[1]) elif event_type == "complete": _, mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_seconds = event self.last_export_duration_seconds = elapsed_seconds elapsed_text = self._format_elapsed(elapsed_seconds) self.set_status( f"Export completed | emails: {mails} | attachments: {attachments} | skipped: {skipped} | time: {elapsed_text}" ) self.log( f"Export completed ({self._export_mode_label(export_mode)}). " f"Emails: {mails}, Attachments: {attachments}, Skipped: {skipped}, Time: {elapsed_text}" ) self.log( "Export diagnostics summary: " f"skipped_from_manifest={self.exporter.skipped_manifest_count}, " f"skipped_without_attachments={self.exporter.skipped_no_attachments_count}, " f"non_mail_items={self.exporter.non_mail_item_count}" ) self._refresh_loaded_header_export_flags() self._restore_export_controls() self._flush_file_log() self.show_export_complete_dialog(mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_text) elif event_type == "cancelled": self.set_status("Export cancelled") self.log("Export cancelled by the user.") self._restore_export_controls() self._flush_file_log() messagebox.showinfo(APP_TITLE, "Export cancelled.") elif event_type == "filter_loaded": _, folder_text, headers, elapsed_seconds = event self.filter_thread = None self.last_filter_load_duration_seconds = elapsed_seconds elapsed_text = self._format_elapsed(elapsed_seconds) self.loaded_email_headers = headers self.filtered_email_headers = list(headers) self.filter_loaded_folder_text = folder_text self.filter_loaded_folder_var.set(f"Loaded folder: {folder_text}") self.filter_status_var.set( f"Loaded emails: {len(headers)} | time: {elapsed_text} | folder: {folder_text}" ) self.set_status(f"Loading completed | emails: {len(headers)} | time: {elapsed_text}") self.log(f"Preview loaded: {len(headers)} emails | folder: {folder_text} | time: {elapsed_text}") self.set_busy_overlay(False) self._set_controls_enabled(True) self.apply_email_filters() self._flush_file_log() elif event_type == "filter_error": _, message, details = event self.filter_thread = None self.set_status("Error while loading emails") self.log(f"Email loading error: {message}") self.log(details) self.set_busy_overlay(False) self._set_controls_enabled(True) self._flush_file_log() messagebox.showerror(APP_TITLE, f"Error while loading emails.\n\n{message}") elif event_type == "error": _, message, details = event self.set_status("Error during export") self.log(f"Export error: {message}") self.log(details) self._restore_export_controls() self._flush_file_log() messagebox.showerror(APP_TITLE, f"Error during export.\n\n{message}") except queue.Empty: pass finally: if not self.shutting_down: self.ui_queue_after_id = self.after(100, self.process_ui_queue) else: self.ui_queue_after_id = None def show_export_complete_dialog( self, mails: int, attachments: int, skipped: int, export_target: Path, zip_mode: bool, export_mode: str, elapsed_text: str, ) -> None: dialog = ctk.CTkToplevel(self) dialog.title("Export completed") dialog.geometry("520x260") dialog.resizable(False, False) dialog.transient(self) dialog.grab_set() dialog.grid_columnconfigure(0, weight=1) dialog.grid_rowconfigure(1, weight=1) ctk.CTkLabel( dialog, text="Export completed", font=ctk.CTkFont(size=20, weight="bold"), ).grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8)) summary = ( f"Action: {self._export_mode_label(export_mode)}\n" f"Exported emails: {mails}\n" f"Exported attachments: {attachments}\n\n" f"Skipped emails: {skipped}\n\n" f"Elapsed time: {elapsed_text}\n\n" f"Skipped from manifest: {self.exporter.skipped_manifest_count}\n" f"Skipped without attachments: {self.exporter.skipped_no_attachments_count}\n" f"Non-mail items detected: {self.exporter.non_mail_item_count}\n\n" f"{'ZIP archive' if zip_mode else 'Folder'}:\n{export_target}" ) summary_box = ctk.CTkTextbox(dialog, wrap="word") summary_box.grid(row=1, column=0, sticky="nsew", padx=20, pady=(0, 16)) summary_box.insert("1.0", summary) summary_box.configure(state="disabled") buttons = ctk.CTkFrame(dialog, fg_color="transparent") buttons.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20)) buttons.grid_columnconfigure(0, weight=1) buttons.grid_columnconfigure(1, weight=1) buttons.grid_columnconfigure(2, weight=1) def copy_summary() -> None: try: self.clipboard_clear() self.clipboard_append(summary) except Exception: pass def open_export_folder() -> None: try: target = export_target.parent if zip_mode else export_target os.startfile(str(target)) except Exception as exc: messagebox.showerror(APP_TITLE, f"Unable to open the export destination.\n\n{exc}") finally: dialog.destroy() ctk.CTkButton( buttons, text="Copy report", command=copy_summary, height=38, ).grid(row=0, column=0, sticky="ew", padx=(0, 8)) ctk.CTkButton( buttons, text="Open zip folder" if zip_mode else "Open export folder", command=open_export_folder, height=38, ).grid(row=0, column=1, sticky="ew", padx=8) ctk.CTkButton( buttons, text="Close", command=dialog.destroy, height=38, fg_color="#5f6368", hover_color="#4b4f52", ).grid(row=0, column=2, sticky="ew", padx=(8, 0)) dialog.protocol("WM_DELETE_WINDOW", dialog.destroy) dialog.after(100, dialog.focus_force) def set_busy_overlay(self, visible: bool, title: str = "", detail: str = "") -> None: self.busy_overlay_visible = visible if visible: self.busy_title_var.set(title or "Please wait...") self.busy_detail_var.set(detail or "") self.busy_overlay.place(relx=0.5, rely=0.52, anchor="center", relwidth=0.42) self.busy_overlay.lift() try: self.busy_progress.start() except Exception: pass else: try: self.busy_progress.stop() except Exception: pass self.busy_overlay.place_forget() def apply_email_filters(self, autosize: bool = True) -> None: if not self.loaded_email_headers: self.filtered_email_headers = [] self.refresh_filter_tree(autosize=autosize) return subject_filter = self.filter_subject_var.get().strip().lower() sender_filter = self.filter_sender_var.get().strip().lower() date_range = self.filter_date_range_var.get().strip() only_new = bool(self.filter_export_new_var.get()) only_with_attachments = bool(self.filter_export_with_attachments_var.get()) only_selected = bool(self.filter_export_selected_var.get()) selected_keys: set[str] = set() if only_selected and self.filter_tree is not None: selected_keys = { self.filter_tree_key_map[item_id] for item_id in self.filter_tree.selection() if item_id in self.filter_tree_key_map } filtered: List[EmailHeader] = [] for header in self.loaded_email_headers: if subject_filter and subject_filter not in header.subject.lower(): continue if sender_filter and sender_filter not in header.sender.lower(): continue if date_range and not self._header_matches_date_range(header, date_range): continue if only_new and header.exported: continue if only_with_attachments and header.attachment_count <= 0: continue if only_selected and header.key not in selected_keys: continue filtered.append(header) self.filtered_email_headers = filtered self.refresh_filter_tree(autosize=autosize) def reset_email_filters(self) -> None: if self.filter_live_after_id is not None: try: self.after_cancel(self.filter_live_after_id) except Exception: pass self.filter_live_after_id = None self.filter_subject_var.set("") self.filter_sender_var.set("") self.filter_date_range_var.set("") self.filter_export_new_var.set(0) self.filter_export_with_attachments_var.set(0) self.filter_export_selected_var.set(0) self.apply_email_filters() def _on_live_filter_input(self, _event=None) -> None: self.schedule_live_filter() def _on_filter_tree_select(self, _event=None) -> None: self._update_filter_controls_state() if self.filter_export_selected_var.get(): self.schedule_live_filter() def schedule_live_filter(self) -> None: if not self.loaded_email_headers: return if self.filter_live_after_id is not None: try: self.after_cancel(self.filter_live_after_id) except Exception: pass self.filter_live_after_id = None self.filter_live_after_id = self.after(self.filter_live_delay_ms, self._apply_live_filters) def _apply_live_filters(self) -> None: self.filter_live_after_id = None self.apply_email_filters(autosize=False) @staticmethod def _header_matches_date_range(header: EmailHeader, date_range: str) -> bool: received_text = (header.received_time or "").strip() if not received_text: return False try: received_dt = datetime.strptime(received_text, "%Y-%m-%d %H:%M:%S") except Exception: return False now = datetime.now() range_days = { "Last week": 7, "Last month": 30, "Last 6 months": 183, "Last year": 365, } days = range_days.get(date_range) if days is None: return True return received_dt >= now - timedelta(days=days) def _get_export_headers_from_preview(self) -> List[EmailHeader]: return list(self.filtered_email_headers) def refresh_filter_tree(self, autosize: bool = True) -> None: if self.filter_tree is None: return self.filter_tree.delete(*self.filter_tree.get_children()) self.filter_tree_key_map = {} for idx, header in enumerate(self.filtered_email_headers, start=1): state = "Exported" if header.exported else "New" item_id = f"mail_{idx}" row_tag = "even" if idx % 2 == 0 else "odd" self.filter_tree.insert( "", "end", iid=item_id, tags=(row_tag,), values=( header.received_time, header.sender, header.subject, header.attachment_count, header.attachment_types, state, ), ) self.filter_tree_key_map[item_id] = header.key if autosize: self._autosize_preview_columns() exported = sum(1 for item in self.filtered_email_headers if item.exported) self.filter_status_var.set( f"Visible emails: {len(self.filtered_email_headers)} | already exported: {exported} | new: {len(self.filtered_email_headers) - exported}" ) self._update_filter_controls_state() def _autosize_preview_columns(self) -> None: if self.filter_tree is None: return try: body_font = tkfont.nametofont("TkDefaultFont") except Exception: body_font = None try: heading_font = tkfont.nametofont("TkHeadingFont") except Exception: heading_font = body_font column_limits = { "date": (110, 190), "sender": (140, 360), "subject": (180, 900), "attachments": (32, 60), "types": (90, 260), "state": (120, 180), } sample_size = 400 for column_id in ("date", "sender", "subject", "attachments", "types", "state"): header_text = self.filter_tree.heading(column_id, "text") or "" if heading_font is not None: max_width = heading_font.measure(str(header_text)) else: max_width = max(40, len(str(header_text)) * 8) children = self.filter_tree.get_children("") for item_id in children[:sample_size]: values = self.filter_tree.item(item_id, "values") column_index = self.filter_tree["columns"].index(column_id) if column_index >= len(values): continue value_text = str(values[column_index]) width = body_font.measure(value_text) if body_font is not None else max(20, len(value_text) * 8) if width > max_width: max_width = width min_width, max_allowed = column_limits[column_id] final_width = max(min_width, min(max_allowed, max_width + 24)) self.filter_tree.column(column_id, width=final_width, minwidth=min_width) def export_preview_subset(self) -> None: headers_to_export = self._get_export_headers_from_preview() if not headers_to_export: messagebox.showwarning(APP_TITLE, "There are no emails in the current preview to export.") return selected_keys = {header.key for header in headers_to_export} selected_refs = [] for header in headers_to_export: if not header.entry_id: continue selected_refs.append((header.store_id, header.entry_id)) self.start_export(EXPORT_MODE_FILTER, selected_keys=selected_keys, selected_entry_refs=selected_refs) def open_help_window(self) -> None: if self.help_window is not None: try: if self.help_window.winfo_exists(): self.help_window.deiconify() self.help_window.lift() self.help_window.attributes("-topmost", True) self.help_window.after(200, lambda: self.help_window.attributes("-topmost", False)) self.help_window.update_idletasks() self.help_window.focus_force() return except Exception: pass dialog = ctk.CTkToplevel(self) dialog.withdraw() dialog.title(HELP_WINDOW_TITLE) dialog.geometry(self._suggest_log_window_geometry()) dialog.minsize(560, 440) self.help_window = dialog dialog.grid_columnconfigure(0, weight=1) dialog.grid_rowconfigure(1, weight=1) ctk.CTkLabel( dialog, text="Quick help", font=ctk.CTkFont(size=20, weight="bold"), anchor="w", ).grid(row=0, column=0, sticky="ew", padx=14, pady=(14, 8)) self.help_window_box = ctk.CTkTextbox(dialog, wrap="word") self.help_window_box.grid(row=1, column=0, sticky="nsew", padx=14, pady=(0, 14)) self.help_window_box.insert("1.0", HELP_TEXT) self.help_window_box.configure(state="disabled") self.help_window_box.see("1.0") dialog.protocol("WM_DELETE_WINDOW", self.close_help_window) dialog.deiconify() dialog.lift() dialog.attributes("-topmost", True) dialog.after(200, lambda: dialog.attributes("-topmost", False)) dialog.after(100, dialog.focus_force) def close_help_window(self) -> None: if self.help_window is None: return try: self.help_window.destroy() except Exception: pass self.help_window = None self.help_window_box = None def log(self, message: str) -> None: if threading.get_ident() != self.main_thread_id: self.ui_queue.put(("log", message)) return self._write_log(message) def _init_file_log(self) -> None: try: self.log_file_path.parent.mkdir(parents=True, exist_ok=True) header = ( f"MailExporter log started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" f"Log file: {self.log_file_path}\n" "=" * 80 + "\n" ) self.log_file_path.write_text(header, encoding="utf-8", errors="replace") except Exception: pass def _write_log(self, message: str) -> None: timestamp = datetime.now().strftime("%H:%M:%S") line = f"[{timestamp}] {message}\n" self._append_file_log(line) self.log_lines.append(line) if len(self.log_lines) > MAX_LOG_LINES: self.log_lines = self.log_lines[-MAX_LOG_LINES:] if self.log_window_box is not None: try: self.log_window_box.configure(state="normal") self.log_window_box.delete("1.0", "end") self.log_window_box.insert("1.0", "".join(self.log_lines)) self.log_window_box.see("end") self.log_window_box.configure(state="disabled") except Exception: self.log_window_box = None self.pump_ui() def open_log_window(self) -> None: if self.log_window is not None: try: if self.log_window.winfo_exists(): self.log_window.deiconify() self.log_window.lift() self.log_window.attributes("-topmost", True) self.log_window.after(200, lambda: self.log_window.attributes("-topmost", False)) self.log_window.update_idletasks() self.log_window.focus_force() if hasattr(self, "btn_status_log"): self.btn_status_log.configure(text="Close log") return except Exception: pass dialog = ctk.CTkToplevel(self) dialog.withdraw() dialog.title("Export log") dialog.geometry(self._suggest_log_window_geometry()) dialog.minsize(520, 400) self.log_window = dialog if hasattr(self, "btn_status_log"): self.btn_status_log.configure(text="Close log") dialog.grid_columnconfigure(0, weight=1) dialog.grid_rowconfigure(0, weight=1) self.log_window_box = ctk.CTkTextbox(dialog, wrap="word") self.log_window_box.grid(row=0, column=0, sticky="nsew", padx=12, pady=12) self.log_window_box.insert("1.0", "".join(self.log_lines)) self.log_window_box.configure(state="disabled") self.log_window_box.see("end") dialog.protocol("WM_DELETE_WINDOW", self.close_log_window) dialog.deiconify() dialog.lift() dialog.attributes("-topmost", True) dialog.after(200, lambda: dialog.attributes("-topmost", False)) dialog.after(50, dialog.deiconify) dialog.after(100, dialog.lift) dialog.after(150, dialog.focus_force) def toggle_log_window(self) -> None: if self.log_window is not None: try: if self.log_window.winfo_exists(): self.close_log_window() return except Exception: pass self.open_log_window() def close_log_window(self) -> None: if self.log_window is None: return try: self.log_window.destroy() except Exception: pass self.log_window = None self.log_window_box = None if hasattr(self, "btn_status_log"): try: self.btn_status_log.configure(text="Open log") except Exception: pass def _suggest_log_window_geometry(self) -> str: try: self.update_idletasks() width = 620 height = 760 screen_width = max(800, int(self.winfo_screenwidth() or 0)) screen_height = max(600, int(self.winfo_screenheight() or 0)) main_x = int(self.winfo_x()) main_y = int(self.winfo_y()) main_width = max(800, int(self.winfo_width() or 0)) preferred_x = main_x + main_width + 20 max_x = max(0, screen_width - width - 20) x = min(preferred_x, max_x) y = max(0, min(main_y, screen_height - height - 60)) return f"{width}x{height}+{x}+{y}" except Exception: return "620x760" def _update_filter_controls_state(self) -> None: has_selection = self.selected_folder_obj is not None has_headers = bool(self.loaded_email_headers) if hasattr(self, "btn_load_headers"): self.btn_load_headers.configure( state="normal" if has_selection and self.connected and self.filter_thread is None else "disabled" ) field_state = "normal" if has_headers else "disabled" button_state = "normal" if has_headers else "disabled" export_base_state = "normal" if has_selection and self.connected and self.filter_thread is None else "disabled" self.entry_filter_subject.configure(state=field_state) self.entry_filter_sender.configure(state=field_state) self.combo_filter_date_range.configure(state="readonly" if has_headers else "disabled") self.chk_filter_export_new.configure(state=field_state) self.chk_filter_export_with_attachments.configure(state=field_state) self.chk_filter_export_selected.configure(state=field_state) self.btn_apply_filters.configure(state=button_state) self.btn_reset_filters.configure(state=button_state) export_headers = self._get_export_headers_from_preview() if has_headers else [] export_ready = bool(export_headers) and export_base_state == "normal" self.btn_export_preview.configure(state="normal" if export_ready else "disabled") def _append_file_log(self, line: str) -> None: self.pending_file_log_lines.append(line) def _flush_file_log(self) -> None: if not self.pending_file_log_lines: return try: self.log_file_path.parent.mkdir(parents=True, exist_ok=True) with self.log_file_path.open("a", encoding="utf-8", errors="replace") as fh: fh.writelines(self.pending_file_log_lines) self.pending_file_log_lines.clear() except Exception: pass def set_status(self, message: str) -> None: if self.shutting_down: return if threading.get_ident() != self.main_thread_id: self.ui_queue.put(("status", message)) return self.status_var.set(message) if self.busy_overlay_visible: self.busy_detail_var.set(message) self.pump_ui() def pump_ui(self) -> None: if self.shutting_down: return if threading.get_ident() != self.main_thread_id: return try: self.update_idletasks() self.update() except Exception: pass def on_close(self) -> None: if self.shutting_down: return self.shutting_down = True try: if self.filter_live_after_id is not None: try: self.after_cancel(self.filter_live_after_id) except Exception: pass self.filter_live_after_id = None if self.ui_queue_after_id is not None: try: self.after_cancel(self.ui_queue_after_id) except Exception: pass self.ui_queue_after_id = None try: self.exporter.cancel() except Exception: pass try: self.busy_progress.stop() except Exception: pass try: self.set_busy_overlay(False) except Exception: pass self._flush_file_log() self.close_help_window() self.close_log_window() self.outlook_service.disconnect() finally: try: self.destroy() except Exception: pass if __name__ == "__main__": if sys.platform != "win32": raise SystemExit("This program runs on Windows only.") app = OutlookExporterApp() try: app.mainloop() except KeyboardInterrupt: try: app.on_close() except Exception: pass