Files
mail_exp/mail_exporter.py
2026-05-06 15:10:36 +02:00

3146 lines
125 KiB
Python

import json
import os
import queue
import re
import sys
import tempfile
import threading
import time
import traceback
import zipfile
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
import customtkinter as ctk
import tkinter as tk
from tkinter import filedialog, messagebox
import tkinter.font as tkfont
from tkinter import ttk
try:
import pythoncom
import win32timezone # required by pywin32 when Outlook returns timezone-aware COM dates
import win32com.client
except ImportError:
pythoncom = None
win32com = None
APP_TITLE = "Outlook email exporter ver. 1.0"
APP_GEOMETRY = "1220x760+0+0"
MAIL_CLASS = 43 # OlObjectClass.olMail
MAX_LOG_LINES = 3000
PREFS_APP_DIR = "OutlookExporter"
PREFS_FILE_NAME = "prefs.json"
LOG_FILE_NAME = "mail_exporter.log"
EXPORT_MANIFEST_NAME = "export_manifest.json"
EXPORT_MODE_ALL = "all"
EXPORT_MODE_WITH_ATTACHMENTS = "with_attachments"
EXPORT_MODE_FILTER = "filter"
OUTLOOK_NOT_RUNNING_MESSAGE = (
"Outlook does not appear to be open.\n\n"
"Open Microsoft Outlook with the email profile you want to export, wait for it to finish loading, "
"then minimize it and come back here to press 'Connect Outlook' again."
)
HELP_WINDOW_TITLE = "Help"
HELP_TEXT = """Outlook Email Exporter 1.0
Overview
- Connect to a running Outlook session.
- Select an Outlook folder from the tree.
- Choose an output folder.
- Use immediate export to export the selected folder and all subfolders.
- Use Load preview to inspect a folder before exporting a subset.
Preview and filters
- Sender and Subject filters narrow the visible preview.
- Period filters the preview by recent time ranges.
- New shows only emails not yet exported.
- With attachments shows only emails with Outlook attachments.
- Selected limits export to the rows currently selected in the preview.
Export modes
- Create zip builds a ZIP archive.
- Create folders on file system writes the export as folders and files.
- Export preview exports only the emails currently visible in the filtered preview.
Notes
- Outlook must already be installed and configured on this PC.
- Large folders can take a few minutes to load or export.
- The program uses an incremental manifest to avoid exporting the same email twice.
"""
ALLOWED_ATTACHMENT_EXTENSIONS = {
".pdf",
".doc",
".docx",
".docm",
".dot",
".dotx",
".dotm",
".xls",
".xlsx",
".xlsm",
".xlsb",
".xlt",
".xltx",
".xltm",
".csv",
".htm",
".html",
".ppt",
".pptx",
".pptm",
".pps",
".ppsx",
".ppsm",
".pot",
".potx",
".potm",
".rtf",
".fodp",
".fods",
".fodt",
".odb",
".odf",
".odg",
".odt",
".ods",
".odp",
".otg",
".oth",
".otp",
".ots",
".ott",
".sxc",
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".tif",
".tiff",
".webp",
}
IMAGE_ATTACHMENT_EXTENSIONS = {
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".tif",
".tiff",
".webp",
}
MAPI_ATTACHMENT_HIDDEN = "http://schemas.microsoft.com/mapi/proptag/0x7FFE000B"
MAPI_ATTACH_CONTENT_ID = (
"http://schemas.microsoft.com/mapi/proptag/0x3712001F",
"http://schemas.microsoft.com/mapi/proptag/0x3712001E",
)
MAPI_ATTACH_CONTENT_LOCATION = (
"http://schemas.microsoft.com/mapi/proptag/0x3713001F",
"http://schemas.microsoft.com/mapi/proptag/0x3713001E",
)
MAPI_ATTACH_CONTENT_DISPOSITION = (
"http://schemas.microsoft.com/mapi/proptag/0x3716001F",
"http://schemas.microsoft.com/mapi/proptag/0x3716001E",
)
MAPI_RENDERING_POSITION = "http://schemas.microsoft.com/mapi/proptag/0x370B0003"
MAPI_RENDERING_POSITION_NOT_RENDERED = -1
INLINE_ATTACHMENT_NAME_RE = re.compile(
r"^(image\d{3,}|att\d{5}|oledata|logo|spacer|facebook|linkedin|twitter|instagram|youtube)\b",
re.IGNORECASE,
)
@dataclass
class FolderNode:
store_name: str
path: str
folder_obj: object
display_name: Optional[str] = None
mail_count: Optional[int] = None
@dataclass
class ExportOptions:
mode: str = EXPORT_MODE_ALL
only_with_attachments: bool = False
folder_key: str = ""
selected_keys: Optional[set[str]] = None
selected_entry_refs: Optional[list[tuple[str, str]]] = None
@dataclass
class MailRecord:
key: str
entry_id: str
store_id: str
folder_key: str
received_time: str
txt_relative_path: str
attachment_count: int
exported_attachment_count: int
export_mode: str
subject: str = ""
internet_message_id: str = ""
status: str = "exported"
@dataclass
class EmailHeader:
key: str
entry_id: str
store_id: str
received_time: str
sender: str
subject: str
attachment_count: int
attachment_types: str
exported: bool
class ExportCancelled(Exception):
pass
class ToolTip:
def __init__(self, widget: object, text: str, delay_ms: int = 450) -> None:
self.widget = widget
self.text = text
self.delay_ms = delay_ms
self._after_id = None
self._tip_window = None
try:
self.widget.bind("<Enter>", self._on_enter, add="+")
self.widget.bind("<Leave>", self._on_leave, add="+")
self.widget.bind("<ButtonPress>", self._on_leave, add="+")
except Exception:
pass
def _on_enter(self, _event=None) -> None:
self._cancel_scheduled()
try:
self._after_id = self.widget.after(self.delay_ms, self._show)
except Exception:
self._after_id = None
def _on_leave(self, _event=None) -> None:
self._cancel_scheduled()
self._hide()
def _cancel_scheduled(self) -> None:
if self._after_id is not None:
try:
self.widget.after_cancel(self._after_id)
except Exception:
pass
self._after_id = None
def _show(self) -> None:
self._after_id = None
if self._tip_window is not None:
return
try:
if not self.widget.winfo_exists():
return
x = self.widget.winfo_rootx() + 18
y = self.widget.winfo_rooty() + self.widget.winfo_height() + 6
tip = tk.Toplevel(self.widget)
tip.wm_overrideredirect(True)
tip.wm_geometry(f"+{x}+{y}")
tip.attributes("-topmost", True)
label = tk.Label(
tip,
text=self.text,
justify="left",
background="#111827",
foreground="#f9fafb",
relief="solid",
borderwidth=1,
padx=8,
pady=4,
font=("Segoe UI", 9),
)
label.pack()
self._tip_window = tip
except Exception:
self._tip_window = None
def _hide(self) -> None:
if self._tip_window is not None:
try:
self._tip_window.destroy()
except Exception:
pass
self._tip_window = None
class ExportManifest:
def __init__(self, root_dir: Path) -> None:
self.root_dir = root_dir
self.path = root_dir / EXPORT_MANIFEST_NAME
self.data = self._load()
def _load(self) -> dict:
if not self.path.exists():
return {"version": 2, "records": {}, "exports": {}}
try:
with self.path.open("r", encoding="utf-8") as fh:
data = json.load(fh)
if not isinstance(data, dict):
return {"version": 2, "records": {}, "exports": {}}
records = data.get("records")
if not isinstance(records, dict):
data["records"] = {}
exports = data.get("exports")
if not isinstance(exports, dict):
data["exports"] = {}
data["version"] = 2
return data
except Exception:
return {"version": 2, "records": {}, "exports": {}}
def has_record(self, key: str) -> bool:
return bool(self.data.get("records", {}).get(key))
def get_record(self, key: str) -> Optional[dict]:
return self.data.get("records", {}).get(key)
def record_file_exists(self, key: str) -> bool:
record = self.get_record(key)
if not isinstance(record, dict):
return False
rel_path = str(record.get("p") or "").strip()
if not rel_path:
return False
try:
return (self.root_dir / Path(rel_path)).exists()
except Exception:
return False
def remove_record(self, key: str) -> None:
self.data.get("records", {}).pop(key, None)
def update_record(self, record: MailRecord) -> None:
self.data.setdefault("records", {})[record.key] = {
"eid": record.entry_id,
"sid": record.store_id,
"f": record.folder_key,
"r": record.received_time,
"p": record.txt_relative_path,
"a": record.attachment_count,
"ea": record.exported_attachment_count,
"m": record.export_mode,
"subject": record.subject,
"imid": record.internet_message_id,
"s": record.status,
"u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
def get_export_scope(self, folder_key: str) -> Optional[dict]:
return self.data.get("exports", {}).get(folder_key)
def update_export_scope(self, folder_key: str, relative_path: str, result_path: Path, zip_mode: bool) -> None:
self.data.setdefault("exports", {})[folder_key] = {
"rp": relative_path,
"lp": str(result_path),
"z": bool(zip_mode),
"u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
def folder_record_count(self, folder_key: str) -> int:
count = 0
for record in self.data.get("records", {}).values():
if isinstance(record, dict) and record.get("f") == folder_key:
count += 1
return count
def total_record_count(self) -> int:
return len(self.data.get("records", {}))
def clear_folder(self, folder_key: str) -> int:
records = self.data.get("records", {})
to_delete = [key for key, value in records.items() if isinstance(value, dict) and value.get("f") == folder_key]
for key in to_delete:
records.pop(key, None)
self.data.get("exports", {}).pop(folder_key, None)
return len(to_delete)
def save(self) -> None:
self.root_dir.mkdir(parents=True, exist_ok=True)
with self.path.open("w", encoding="utf-8") as fh:
json.dump(self.data, fh, ensure_ascii=False, separators=(",", ":"))
class PreferenceStore:
def __init__(self, default_output_dir: Path) -> None:
self.default_output_dir = default_output_dir
self.path = self._prefs_path()
self.data = self._load_or_create()
def get_output_dir(self) -> str:
value = str(self.data.get("output_dir") or "").strip()
return value or str(self.default_output_dir)
def set_output_dir(self, output_dir: str) -> None:
output_dir = str(output_dir or "").strip()
if not output_dir:
return
self.data["output_dir"] = output_dir
self._save()
@classmethod
def _prefs_path(cls) -> Path:
appdata = os.environ.get("APPDATA")
base_dir = Path(appdata) if appdata else Path.home() / ".config"
return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME
@classmethod
def app_data_dir(cls) -> Path:
return cls._prefs_path().parent
def _load_or_create(self) -> dict:
if self.path.exists():
try:
with self.path.open("r", encoding="utf-8") as fh:
data = json.load(fh)
return data if isinstance(data, dict) else {}
except Exception:
return {"output_dir": str(self.default_output_dir)}
data = {"output_dir": str(self.default_output_dir)}
self.data = data
self._save()
return data
def _save(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
with self.path.open("w", encoding="utf-8") as fh:
json.dump(self.data, fh, ensure_ascii=False, indent=2)
class OutlookService:
def __init__(self) -> None:
self.outlook = None
self.namespace = None
self.root_folders: List[FolderNode] = []
def connect(self) -> None:
if win32com is None or pythoncom is None:
raise RuntimeError(
"pywin32 non è installato. Esegui: pip install pywin32"
)
pythoncom.CoInitialize()
try:
try:
self.outlook = win32com.client.GetActiveObject("Outlook.Application")
except Exception as exc:
raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc
self.namespace = self.outlook.GetNamespace("MAPI")
self.root_folders = []
for i in range(1, self.namespace.Folders.Count + 1):
store = self.namespace.Folders.Item(i)
store_name = str(store.Name)
node = FolderNode(
store_name=store_name,
path=NameCodec.store_display_name(store_name),
folder_obj=store,
display_name=NameCodec.store_display_name(store_name),
)
self.root_folders.append(node)
except Exception:
pythoncom.CoUninitialize()
raise
def disconnect(self) -> None:
try:
self.root_folders = []
self.namespace = None
self.outlook = None
finally:
if pythoncom is not None:
try:
pythoncom.CoUninitialize()
except Exception:
pass
def list_root_folders(self) -> List[FolderNode]:
return list(self.root_folders)
def list_children(self, folder_obj: object, current_path: str) -> List[FolderNode]:
children = []
folders = folder_obj.Folders
for i in range(1, folders.Count + 1):
child = folders.Item(i)
child_name = str(child.Name)
child_path = f"{current_path} / {child_name}"
children.append(
FolderNode(
store_name=current_path.split(" / ")[0],
path=child_path,
folder_obj=child,
display_name=child_name,
mail_count=self.count_mail_items(child),
)
)
return children
@staticmethod
def count_mail_items(folder_obj: object) -> Optional[int]:
diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
restricted_count = diagnostics.get("restricted_count")
raw_count = diagnostics.get("raw_count")
if restricted_count is None:
return raw_count
if restricted_count == 0 and raw_count:
return raw_count
return restricted_count
@staticmethod
def folder_item_diagnostics(folder_obj: object) -> dict:
diagnostics = {
"folder_name": None,
"raw_count": None,
"restricted_count": None,
"items_error": None,
"restrict_error": None,
"folder_path": None,
"store_id": None,
}
try:
diagnostics["folder_name"] = str(folder_obj.Name)
except Exception:
pass
try:
diagnostics["folder_path"] = str(folder_obj.FolderPath)
except Exception:
pass
try:
items = folder_obj.Items
except Exception as exc:
diagnostics["items_error"] = str(exc)
return diagnostics
try:
diagnostics["raw_count"] = items.Count
except Exception as exc:
diagnostics["items_error"] = str(exc)
try:
diagnostics["restricted_count"] = items.Restrict("[MessageClass] = 'IPM.Note'").Count
except Exception as exc:
diagnostics["restrict_error"] = str(exc)
try:
diagnostics["store_id"] = str(folder_obj.StoreID)
except Exception:
pass
return diagnostics
@staticmethod
def get_namespace_for_worker() -> object:
if win32com is None:
raise RuntimeError("pywin32 non disponibile.")
try:
outlook = win32com.client.GetActiveObject("Outlook.Application")
except Exception as exc:
raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc
return outlook.GetNamespace("MAPI")
class NameCodec:
INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
SPACE_RE = re.compile(r'\s+')
@classmethod
def compact_text(cls, value: str, max_len: int = 32) -> str:
value = cls.SPACE_RE.sub(" ", value or "").strip()
if len(value) <= max_len:
return value
return value[: max_len - 3].rstrip() + "..."
@classmethod
def store_display_name(cls, store_name: str) -> str:
store_name = store_name or "Account"
if "@" in store_name:
local_part = store_name.split("@", 1)[0]
return cls.compact_text(local_part, max_len=34)
return cls.compact_text(store_name, max_len=34)
@classmethod
def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str:
display_name = cls.compact_text(display_name or "Folder", max_len=46)
if mail_count is None:
return display_name
return f"{display_name} ({mail_count})"
@classmethod
def sanitize(cls, value: str, max_len: int = 80) -> str:
value = value or ""
value = cls.INVALID_CHARS_RE.sub("_", value)
value = cls.SPACE_RE.sub(" ", value).strip()
value = value.replace(".", "_")
if not value:
value = "empty"
if len(value) > max_len:
value = value[:max_len].rstrip(" _")
return value
@classmethod
def folder_name_from_outlook_path(cls, folder_path: str) -> str:
# Rimuove lo store iniziale quando il path arriva dall'albero "Store / Cartella".
parts = [p.strip() for p in folder_path.split(" / ")]
parts = [p for p in parts if p]
export_parts = parts[1:] if len(parts) > 1 else parts
safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts]
return os.path.join(*safe_parts) if safe_parts else "folder"
@classmethod
def archive_base_name_from_outlook_path(cls, folder_path: str) -> str:
parts = [p.strip() for p in folder_path.split(" / ")]
parts = [p for p in parts if p]
folder_name = parts[-1] if parts else "export"
return cls.sanitize(folder_name, max_len=60)
@classmethod
def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str:
if received_time is None:
stamp = "data_sconosciuta"
else:
stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S")
return f"mail_{stamp}_{seq:05d}"
@classmethod
def mail_folder_name(cls, base_mail_name: str, subject: str, attachment_count: int = 0) -> str:
safe_subject = cls.sanitize(subject or "no_subject", max_len=50)
attachment_marker = f"CON_ALLEGATI_{attachment_count:02d}" if attachment_count else "SOLO_MAIL"
return f"{base_mail_name}__{attachment_marker}__{safe_subject}"
@classmethod
def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str:
original_filename = original_filename or f"attachment_{att_index}"
original_filename = os.path.basename(original_filename)
stem, ext = os.path.splitext(original_filename)
stem = cls.sanitize(stem, max_len=80)
ext = cls.sanitize_extension(ext)
return f"attachment__{base_mail_name}__{att_index:02d}__{stem}{ext}"
@classmethod
def sanitize_extension(cls, ext: str, max_len: int = 20) -> str:
ext = (ext or "").strip().lower()
if not ext:
return ""
if not ext.startswith("."):
ext = f".{ext}"
ext = cls.INVALID_CHARS_RE.sub("_", ext)
ext = ext.replace(" ", "_")
if len(ext) > max_len:
ext = ext[:max_len]
return ext
@classmethod
def is_allowed_attachment(cls, filename: str) -> bool:
ext = cls.sanitize_extension(os.path.splitext(filename or "")[1])
return ext in ALLOWED_ATTACHMENT_EXTENSIONS
class Exporter:
MANIFEST_FLUSH_INTERVAL_SECONDS = 20.0
def __init__(self, app: "OutlookExporterApp") -> None:
self.app = app
self.cancel_requested = False
self.global_seq = 0
self.exported_mail_count = 0
self.exported_attachment_count = 0
self.skipped_mail_count = 0
self.skipped_manifest_count = 0
self.skipped_no_attachments_count = 0
self.non_mail_item_count = 0
self.datetime_diagnostic_count = 0
self.datetime_diagnostic_limit = 20
self.manifest: Optional[ExportManifest] = None
self.manifest_dirty = False
self.last_manifest_flush_monotonic = time.monotonic()
self.options = ExportOptions()
def configure(self, manifest: Optional[ExportManifest], options: ExportOptions) -> None:
self.manifest = manifest
self.manifest_dirty = False
self.last_manifest_flush_monotonic = time.monotonic()
self.options = options
def cancel(self) -> None:
self.cancel_requested = True
def check_cancel(self) -> None:
if self.cancel_requested:
raise ExportCancelled("Export cancelled by the user.")
def _mark_manifest_dirty(self) -> None:
if self.manifest is None:
return
self.manifest_dirty = True
def _flush_manifest_if_due(self, force: bool = False) -> None:
if self.manifest is None or not self.manifest_dirty:
return
now = time.monotonic()
if not force and (now - self.last_manifest_flush_monotonic) < self.MANIFEST_FLUSH_INTERVAL_SECONDS:
return
self.manifest.save()
self.last_manifest_flush_monotonic = now
self.manifest_dirty = False
def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int, int]:
self.check_cancel()
folder_out_path.mkdir(parents=True, exist_ok=True)
folder_name = self._safe_get(folder_obj, "Name") or "(unnamed folder)"
self.app.log(f"Outlook folder: {folder_name}")
self.app.log(f"Destination folder: {folder_out_path}")
self._log_folder_count_diagnostics(folder_obj)
items = self._safe_get(folder_obj, "Items")
if items is None:
self.app.log("Warning: unable to read folder items, continuing with subfolders.")
count = 0
else:
try:
items.Sort("[ReceivedTime]", False)
except Exception:
self.app.log("Warning: unable to sort by ReceivedTime, continuing without sorting.")
try:
count = items.Count
except Exception as exc:
self.app.log(f"Warning: unable to count folder items: {exc}")
count = 0
local_mail_count = 0
local_attachment_count = 0
local_skipped_count = 0
self.app.log(f"Detected items: {count}")
for idx in range(1, count + 1):
self.check_cancel()
try:
item = items.Item(idx)
except Exception as exc:
self.app.log(f"Error accessing item {idx}: {exc}")
continue
message_class = self._safe_get(item, "Class")
if message_class != MAIL_CLASS:
self.non_mail_item_count += 1
continue
if self.options.selected_keys is not None:
key = self._build_manifest_key_for_item(item)
if key not in self.options.selected_keys:
continue
try:
self.global_seq += 1
mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path)
local_mail_count += mail_count
local_attachment_count += att_count
local_skipped_count += skipped_count
self.exported_mail_count += mail_count
self.exported_attachment_count += att_count
self.skipped_mail_count += skipped_count
except Exception as exc:
subject = self._safe_get(item, "Subject") or "(no subject)"
self.app.log(f"Error exporting email '{subject}': {exc}")
if idx % 10 == 0:
self.app.set_status(
"Export in progress... "
f"item {idx}/{count} | emails {self.exported_mail_count} | "
f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}"
)
self.app.pump_ui()
if self.options.selected_keys is not None:
return local_mail_count, local_attachment_count, local_skipped_count
subfolders = self._safe_get(folder_obj, "Folders")
subfolder_count = 0
try:
subfolder_count = subfolders.Count if subfolders is not None else 0
except Exception as exc:
self.app.log(f"Warning: unable to read subfolders: {exc}")
for i in range(1, subfolder_count + 1):
self.check_cancel()
try:
child = subfolders.Item(i)
except Exception as exc:
self.app.log(f"Error accessing subfolder {i}: {exc}")
continue
child_name = self._safe_get(child, "Name") or f"subfolder_{i}"
child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80)
m, a, s = self.export_folder(child, child_dir)
local_mail_count += m
local_attachment_count += a
local_skipped_count += s
return local_mail_count, local_attachment_count, local_skipped_count
def export_selected_items(
self,
namespace: object,
selected_entry_refs: list[tuple[str, str]],
folder_out_path: Path,
) -> Tuple[int, int, int]:
self.check_cancel()
folder_out_path.mkdir(parents=True, exist_ok=True)
local_mail_count = 0
local_attachment_count = 0
local_skipped_count = 0
total = len(selected_entry_refs)
self.app.log(f"Direct Outlook selective export: {total} selected emails")
for idx, (store_id, entry_id) in enumerate(selected_entry_refs, start=1):
self.check_cancel()
try:
if store_id:
item = namespace.GetItemFromID(entry_id, store_id)
else:
item = namespace.GetItemFromID(entry_id)
except Exception as exc:
self.app.log(f"Error retrieving selected email {idx}/{total}: {exc}")
continue
message_class = self._safe_get(item, "Class")
if message_class != MAIL_CLASS:
self.non_mail_item_count += 1
continue
try:
self.global_seq += 1
mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path)
local_mail_count += mail_count
local_attachment_count += att_count
local_skipped_count += skipped_count
self.exported_mail_count += mail_count
self.exported_attachment_count += att_count
self.skipped_mail_count += skipped_count
except Exception as exc:
subject = self._safe_get(item, "Subject") or "(no subject)"
self.app.log(f"Error exporting selected email '{subject}': {exc}")
self.app.set_status(
"Selective export in progress... "
f"{idx}/{total} | emails {self.exported_mail_count} | "
f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}"
)
self.app.pump_ui()
return local_mail_count, local_attachment_count, local_skipped_count
def _log_folder_count_diagnostics(self, folder_obj: object) -> None:
diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
self.app.log("Folder count diagnostics:")
self.app.log(f" folder_name: {diagnostics.get('folder_name')}")
self.app.log(f" folder_path: {diagnostics.get('folder_path')}")
self.app.log(f" raw Items.Count: {diagnostics.get('raw_count')}")
self.app.log(f" Restrict IPM.Note Count: {diagnostics.get('restricted_count')}")
self.app.log(f" items_error: {diagnostics.get('items_error')}")
self.app.log(f" restrict_error: {diagnostics.get('restrict_error')}")
def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int, int]:
received_dt = self._get_best_mail_datetime(item)
subject = self._safe_get(item, "Subject") or ""
manifest_record = self._build_mail_record(item, subject, received_dt, 0)
if self.manifest is not None and self.manifest.has_record(manifest_record.key):
if self.manifest.record_file_exists(manifest_record.key):
self.app.log(f"Email already present in the manifest, skipping: {subject or '(no subject)'}")
self.skipped_manifest_count += 1
return 0, 0, 1
self.app.log(
f"Obsolete manifest record, re-exporting the email: {subject or '(no subject)'}"
)
self.manifest.remove_record(manifest_record.key)
self._mark_manifest_dirty()
body = self._safe_get(item, "Body") or ""
html_body = self._safe_get(item, "HTMLBody") or ""
exportable_attachments = self._collect_exportable_attachments(item, html_body)
manifest_record.attachment_count = len(exportable_attachments)
if self.options.only_with_attachments and not exportable_attachments:
self.app.log(f"Email without exportable attachments, skipping: {subject or '(no subject)'}")
self.skipped_no_attachments_count += 1
return 0, 0, 1
base_name = NameCodec.email_base_name(received_dt, self.global_seq)
canonical_mail_dir = folder_out_path / NameCodec.mail_folder_name(base_name, subject, len(exportable_attachments))
canonical_txt_path = canonical_mail_dir / f"{base_name}.txt"
if canonical_txt_path.exists():
self.app.log(f"Email already present on the file system, skipping: {subject or '(no subject)'}")
self.skipped_manifest_count += 1
if self.manifest is not None:
try:
manifest_record.txt_relative_path = canonical_txt_path.relative_to(self.manifest.root_dir).as_posix()
except Exception:
manifest_record.txt_relative_path = ""
attachments_dir = canonical_mail_dir / "attachments"
if attachments_dir.exists():
try:
manifest_record.exported_attachment_count = len([p for p in attachments_dir.iterdir() if p.is_file()])
except Exception:
manifest_record.exported_attachment_count = 0
manifest_record.status = "exported"
self.manifest.update_record(manifest_record)
self._mark_manifest_dirty()
self._flush_manifest_if_due()
return 0, 0, 1
mail_dir = self._unique_path(canonical_mail_dir)
mail_dir.mkdir(parents=True, exist_ok=True)
txt_path = mail_dir / f"{base_name}.txt"
header_lines = [
f"Subject: {subject}",
f"Received: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}",
"",
"--- EMAIL TEXT ---",
body,
]
txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace")
self.app.log(f"Mail salvata: {txt_path.name}")
if self.manifest is not None:
try:
manifest_record.txt_relative_path = txt_path.relative_to(self.manifest.root_dir).as_posix()
except Exception:
manifest_record.txt_relative_path = ""
attachment_count = 0
if exportable_attachments:
attachments_dir = mail_dir / "attachments"
for att_index, attachment, original_filename in exportable_attachments:
self.check_cancel()
try:
export_name = NameCodec.attachment_name(base_name, original_filename, att_index)
attachments_dir.mkdir(parents=True, exist_ok=True)
export_path = self._unique_path(attachments_dir / export_name)
attachment.SaveAsFile(str(export_path))
attachment_count += 1
self.app.log(f" Attachment saved: {export_path.name}")
except Exception as exc:
self.app.log(f" Error saving attachment {att_index}: {exc}")
manifest_record.exported_attachment_count = attachment_count
manifest_record.status = "exported"
if self.manifest is not None:
self.manifest.update_record(manifest_record)
self._mark_manifest_dirty()
self._flush_manifest_if_due()
return 1, attachment_count, 0
def _collect_exportable_attachments(self, item: object, html_body: str) -> List[Tuple[int, object, str]]:
exportable = []
attachments = self._safe_get(item, "Attachments")
if attachments is None:
return exportable
try:
attachment_total = attachments.Count
except Exception as exc:
self.app.log(f" Error reading the number of attachments: {exc}")
return exportable
for att_index in range(1, attachment_total + 1):
self.check_cancel()
try:
attachment = attachments.Item(att_index)
original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}"
inline_score, inline_reasons = self._signature_image_score(
attachment, original_filename, html_body
)
if inline_score >= 3:
self.app.log(
f" Ignored inline/signature attachment: {original_filename} | "
f"score={inline_score} | reasons={', '.join(inline_reasons)}"
)
elif NameCodec.is_allowed_attachment(original_filename):
exportable.append((att_index, attachment, original_filename))
else:
self.app.log(f" Ignored attachment: {original_filename}")
except Exception as exc:
self.app.log(f" Error reading attachment {att_index}: {exc}")
return exportable
def _signature_image_score(self, attachment: object, filename: str, html_body: str) -> Tuple[int, List[str]]:
ext = NameCodec.sanitize_extension(os.path.splitext(filename or "")[1])
if ext not in IMAGE_ATTACHMENT_EXTENSIONS:
return 0, []
score = 0
reasons = []
html_lower = (html_body or "").lower()
filename_lower = os.path.basename(filename or "").lower()
hidden = self._get_attachment_mapi_property(attachment, MAPI_ATTACHMENT_HIDDEN)
if hidden is True:
score += 4
reasons.append("hidden")
rendering_position = self._get_attachment_mapi_property(attachment, MAPI_RENDERING_POSITION)
rendering_position_int = self._safe_int(rendering_position)
if rendering_position_int is not None:
if rendering_position_int not in (MAPI_RENDERING_POSITION_NOT_RENDERED, 0xFFFFFFFF):
score += 1
reasons.append(f"rendering-position={rendering_position_int}")
else:
reasons.append("rendering-position=-1")
content_id = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_ID)
if self._has_value(content_id):
cid = str(content_id).strip()
if self._html_references_cid(html_lower, cid):
score += 3
reasons.append(f"cid nel corpo HTML:{self._short_diag(cid)}")
else:
score += 1
reasons.append(f"cid presente:{self._short_diag(cid)}")
content_location = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_LOCATION)
if self._has_value(content_location):
location = str(content_location).strip()
if location.lower() in html_lower:
score += 2
reasons.append(f"content-location nel corpo HTML:{self._short_diag(location)}")
else:
score += 1
reasons.append(f"content-location presente:{self._short_diag(location)}")
disposition = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_DISPOSITION)
if self._has_value(disposition) and "inline" in str(disposition).lower():
score += 3
reasons.append(f"disposition inline:{self._short_diag(disposition)}")
if filename_lower and filename_lower in html_lower:
score += 2
reasons.append("nome file nel corpo HTML")
stem = os.path.splitext(filename_lower)[0]
if INLINE_ATTACHMENT_NAME_RE.match(stem or "") is not None:
score += 2
reasons.append("nome tipico inline/firma")
return score, reasons
@staticmethod
def _html_references_cid(html_lower: str, content_id: str) -> bool:
if not content_id:
return False
cid = content_id.strip().strip("<>").lower()
if not cid:
return False
return f"cid:{cid}" in html_lower or cid in html_lower
def _get_attachment_mapi_property(self, attachment: object, schema: str):
try:
accessor = attachment.PropertyAccessor
return accessor.GetProperty(schema)
except Exception:
return None
def _get_first_attachment_mapi_property(self, attachment: object, schemas):
for schema in schemas:
value = self._get_attachment_mapi_property(attachment, schema)
if self._has_value(value):
return value
return None
@staticmethod
def _safe_int(value: object) -> Optional[int]:
if value is None:
return None
try:
return int(value)
except Exception:
return None
@staticmethod
def _short_diag(value: object, max_len: int = 60) -> str:
try:
text = str(value).strip()
except Exception:
text = repr(value)
text = text.replace("\r", " ").replace("\n", " ")
if len(text) > max_len:
return text[:max_len] + "..."
return text
@staticmethod
def _has_value(value: object) -> bool:
if value is None:
return False
try:
return bool(str(value).strip())
except Exception:
return True
def _get_best_mail_datetime(self, item: object) -> Optional[datetime]:
for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
value = self._safe_get(item, attr)
converted = self._convert_outlook_datetime(value)
if converted is not None:
if attr != "ReceivedTime":
self.app.log(f" ReceivedTime not available, using {attr}.")
return converted
self._log_datetime_diagnostics(item)
return None
def _log_datetime_diagnostics(self, item: object) -> None:
if self.datetime_diagnostic_count >= self.datetime_diagnostic_limit:
if self.datetime_diagnostic_count == self.datetime_diagnostic_limit:
self.app.log(
" Date diagnostics: limit reached, further emails without a date will not be logged in detail."
)
self.datetime_diagnostic_count += 1
return
self.datetime_diagnostic_count += 1
subject = self._safe_get(item, "Subject") or "(no subject)"
self.app.log(f" Email date diagnostics #{self.global_seq}: {subject}")
for label, value in self._mail_identity_details(item):
self.app.log(f" {label}: {value}")
for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
ok, value_or_error = self._safe_get_with_error(item, attr)
if not ok:
self.app.log(f" {attr}: ACCESS ERROR: {value_or_error}")
continue
value = value_or_error
self.app.log(
f" {attr}: tipo={type(value).__name__}; repr={repr(value)}; str={self._safe_str(value)}"
)
def _mail_identity_details(self, item: object) -> List[Tuple[str, str]]:
parent = self._safe_get(item, "Parent")
folder_name = self._safe_get(parent, "Name") if parent is not None else None
store_id = self._safe_get(parent, "StoreID") if parent is not None else None
details = [
("EntryID", self._safe_str(self._safe_get(item, "EntryID"))),
("ConversationID", self._safe_str(self._safe_get(item, "ConversationID"))),
("InternetMessageID", self._safe_str(self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F"))),
("Outlook folder", self._safe_str(folder_name)),
("StoreID", self._safe_str(store_id)),
]
return details
def _build_mail_record(
self,
item: object,
subject: str,
received_dt: Optional[datetime],
attachment_count: int,
) -> MailRecord:
received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else ""
entry_id = self._safe_full_str(self._safe_get(item, "EntryID"))
parent = self._safe_get(item, "Parent")
store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None)
internet_message_id = self._safe_full_str(
self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F")
)
key = self._build_manifest_key_for_item(item)
return MailRecord(
key=key,
entry_id=entry_id,
store_id=store_id,
folder_key=self.options.folder_key,
received_time=received_text,
txt_relative_path="",
attachment_count=attachment_count,
exported_attachment_count=0,
export_mode=self.options.mode,
subject=self._safe_str(subject),
internet_message_id=internet_message_id,
)
def _build_manifest_key_for_item(
self,
item: object,
subject: Optional[str] = None,
received_text: Optional[str] = None,
) -> str:
entry_id = self._safe_full_str(self._safe_get(item, "EntryID"))
parent = self._safe_get(item, "Parent")
store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None)
return self._build_manifest_key_from_ids(store_id, entry_id)
@staticmethod
def _build_manifest_key_from_ids(store_id: object, entry_id: object) -> str:
return f"{Exporter._safe_full_str(store_id)}|{Exporter._safe_full_str(entry_id)}"
def _get_mail_mapi_property(self, item: object, schema: str):
try:
accessor = item.PropertyAccessor
return accessor.GetProperty(schema)
except Exception:
return None
@staticmethod
def _unique_path(path: Path) -> Path:
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 2
while True:
candidate = parent / f"{stem}__{counter}{suffix}"
if not candidate.exists():
return candidate
counter += 1
@staticmethod
def _safe_get(obj: object, attr: str):
try:
return getattr(obj, attr)
except Exception:
return None
@staticmethod
def _safe_get_with_error(obj: object, attr: str) -> Tuple[bool, object]:
try:
return True, getattr(obj, attr)
except Exception as exc:
return False, exc
@staticmethod
def _safe_str(value: object, max_len: int = 300) -> str:
try:
text = str(value)
except Exception as exc:
text = f"<errore str: {exc}>"
if len(text) > max_len:
return text[:max_len] + "..."
return text
@staticmethod
def _safe_full_str(value: object) -> str:
try:
return str(value)
except Exception:
return ""
@staticmethod
def _convert_outlook_datetime(value) -> Optional[datetime]:
if value is None:
return None
if isinstance(value, datetime):
return value
try:
return datetime(
value.year,
value.month,
value.day,
value.hour,
value.minute,
value.second,
)
except Exception:
pass
for converter in (getattr(value, "Format", None), getattr(value, "strftime", None)):
if converter is None:
continue
try:
text = str(converter("%Y-%m-%d %H:%M:%S"))
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
except Exception:
continue
try:
return datetime.fromisoformat(str(value))
except Exception:
return None
class OutlookExporterApp(ctk.CTk):
def __init__(self) -> None:
super().__init__()
self.title(APP_TITLE)
self.geometry(APP_GEOMETRY)
self.minsize(1080, 680)
self.withdraw()
ctk.set_appearance_mode("system")
ctk.set_default_color_theme("blue")
self.splash = None
self.show_startup_splash()
self.outlook_service = OutlookService()
self.exporter = Exporter(self)
self.export_thread = None
self.main_thread_id = threading.get_ident()
self.ui_queue = queue.Queue()
self.connected = False
self.tree_folder_map = {}
self.tree_path_map = {}
self.empty_folder_alert_shown = set()
self.selected_folder_obj = None
self.filter_thread = None
self.shutting_down = False
self.ui_queue_after_id = None
self.loaded_email_headers: List[EmailHeader] = []
self.filtered_email_headers: List[EmailHeader] = []
self.filter_tree_key_map = {}
self.filter_tree = None
self.filter_loaded_folder_text = ""
self.filter_live_after_id = None
self.filter_live_delay_ms = 300
self.filter_status_var = ctk.StringVar(value="No emails loaded")
self.filter_loaded_folder_var = ctk.StringVar(value="No folder loaded")
self.filter_subject_var = ctk.StringVar(value="")
self.filter_sender_var = ctk.StringVar(value="")
self.filter_date_range_var = ctk.StringVar(value="")
self.filter_export_new_var = ctk.IntVar(value=0)
self.filter_export_with_attachments_var = ctk.IntVar(value=0)
self.filter_export_selected_var = ctk.IntVar(value=0)
self.log_lines: List[str] = []
self.pending_file_log_lines: List[str] = []
self.tooltips: List[ToolTip] = []
self.log_window = None
self.log_window_box = None
self.help_window = None
self.help_window_box = None
default_output_dir = Path.home() / "Desktop" / "outlook_export"
self.prefs = PreferenceStore(default_output_dir)
self.log_file_path = PreferenceStore.app_data_dir() / LOG_FILE_NAME
self._init_file_log()
self.selected_folder_display = ctk.StringVar(value="No folder selected")
self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir())
self.status_var = ctk.StringVar(value="Ready")
self.busy_title_var = ctk.StringVar(value="")
self.busy_detail_var = ctk.StringVar(value="")
self.busy_overlay_visible = False
self.last_export_duration_seconds = 0.0
self.last_filter_load_duration_seconds = 0.0
self.create_zip_var = ctk.IntVar(value=0)
self.create_hierarchy_var = ctk.IntVar(value=1)
self._build_ui()
self._install_tooltips()
self.hide_startup_splash()
self.deiconify()
self.close_pyinstaller_splash()
self.log(f"Log file: {self.log_file_path}")
self.ui_queue_after_id = self.after(100, self.process_ui_queue)
self.protocol("WM_DELETE_WINDOW", self.on_close)
def show_startup_splash(self) -> None:
splash = ctk.CTkToplevel(self)
splash.title(APP_TITLE)
splash.overrideredirect(True)
splash.resizable(False, False)
splash.attributes("-topmost", True)
width = 420
height = 180
x = max(0, int((splash.winfo_screenwidth() - width) / 2))
y = max(0, int((splash.winfo_screenheight() - height) / 2))
splash.geometry(f"{width}x{height}+{x}+{y}")
splash.grid_columnconfigure(0, weight=1)
splash.grid_rowconfigure(0, weight=1)
content = ctk.CTkFrame(splash)
content.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
content.grid_columnconfigure(0, weight=1)
ctk.CTkLabel(
content,
text="Outlook Email Exporter",
font=ctk.CTkFont(size=26, weight="bold"),
).grid(row=0, column=0, pady=(24, 8))
ctk.CTkLabel(content, text="Starting...").grid(row=1, column=0, pady=(0, 18))
progress = ctk.CTkProgressBar(content, mode="indeterminate")
progress.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18))
progress.start()
self.splash = splash
self.update_idletasks()
self.update()
def hide_startup_splash(self) -> None:
if self.splash is not None:
try:
self.splash.destroy()
except Exception:
pass
self.splash = None
@staticmethod
def close_pyinstaller_splash() -> None:
try:
import pyi_splash
pyi_splash.close()
except Exception:
pass
def _build_ui(self) -> None:
self.grid_columnconfigure(0, weight=1)
self.grid_rowconfigure(1, weight=1)
top = ctk.CTkFrame(self)
top.grid(row=0, column=0, sticky="nsew", padx=8, pady=(8, 4))
top.grid_columnconfigure(3, weight=1)
top.grid_columnconfigure(4, weight=0)
self.btn_connect = ctk.CTkButton(top, text="Connect Outlook", command=self.connect_outlook)
self.btn_connect.grid(row=0, column=0, padx=6, pady=6)
self.btn_refresh = ctk.CTkButton(top, text="Reload folders", command=self.reload_tree, state="disabled")
self.btn_refresh.grid(row=0, column=1, padx=6, pady=6)
self.btn_expand = ctk.CTkButton(top, text="Expand tree", command=self.expand_all, state="disabled")
self.btn_expand.grid(row=0, column=2, padx=6, pady=6)
lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w")
lbl_selected.grid(row=0, column=3, padx=6, pady=6, sticky="ew")
self.btn_help = ctk.CTkButton(top, text="?", width=34, command=self.open_help_window)
self.btn_help.grid(row=0, column=4, padx=(6, 10), pady=6, sticky="e")
main_pane = ttk.Panedwindow(self, orient="horizontal")
main_pane.grid(row=1, column=0, sticky="nsew", padx=8, pady=(4, 8))
left = ctk.CTkFrame(main_pane)
left.grid_rowconfigure(1, weight=1)
left.grid_columnconfigure(0, weight=1)
ctk.CTkLabel(left, text="Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid(
row=0, column=0, sticky="w", padx=8, pady=(8, 2)
)
tree_container = ctk.CTkFrame(left)
tree_container.grid(row=1, column=0, sticky="nsew", padx=8, pady=(2, 8))
tree_container.grid_rowconfigure(0, weight=1)
tree_container.grid_columnconfigure(0, weight=1)
style = ttk.Style()
try:
style.theme_use("default")
except Exception:
pass
style.configure("Treeview", rowheight=24)
self.tree_font_bold = ("Segoe UI", 10, "bold")
style.map(
"Treeview",
background=[("selected", "#93c5fd")],
foreground=[("selected", "#111827")],
)
self.tree = ttk.Treeview(tree_container, show="tree")
self.tree.grid(row=0, column=0, sticky="nsew")
self.tree.column("#0", width=280, minwidth=220, stretch=True)
self.tree.tag_configure("root", background="#d9d9d9", font=self.tree_font_bold)
self.tree.tag_configure("odd", background="#f3f4f6")
self.tree.tag_configure("even", background="#e5e7eb")
self.tree.tag_configure("branch_odd", background="#f3f4f6", font=self.tree_font_bold)
self.tree.tag_configure("branch_even", background="#e5e7eb", font=self.tree_font_bold)
self.tree.bind("<<TreeviewOpen>>", self.on_tree_open)
self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)
yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview)
yscroll.grid(row=0, column=1, sticky="ns")
self.tree.configure(yscrollcommand=yscroll.set)
right = ctk.CTkFrame(main_pane)
right.grid_rowconfigure(1, weight=1)
right.grid_columnconfigure(0, weight=1)
main_pane.add(left, weight=7)
main_pane.add(right, weight=13)
right_top = ctk.CTkFrame(right)
right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 4))
right_top.grid_columnconfigure(1, weight=1)
ctk.CTkLabel(right_top, text="Immediate export", font=ctk.CTkFont(size=18, weight="bold")).grid(
row=0, column=0, columnspan=3, sticky="w", padx=8, pady=(8, 4)
)
ctk.CTkLabel(right_top, text="Output folder:").grid(row=1, column=0, padx=8, pady=4, sticky="w")
self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var)
self.entry_output.grid(row=1, column=1, padx=8, pady=4, sticky="ew")
self.btn_browse = ctk.CTkButton(right_top, text="Browse", width=110, command=self.choose_output_dir)
self.btn_browse.grid(row=1, column=2, padx=8, pady=4)
help_text = (
"The selected folder will be exported together with all its subfolders.\n"
"Each email is saved as a TXT file and attachments are saved in their original format."
)
ctk.CTkLabel(right_top, text=help_text, justify="left").grid(
row=2, column=0, columnspan=3, padx=8, pady=(2, 4), sticky="w"
)
options = ctk.CTkFrame(right_top, fg_color="transparent")
options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=8, pady=(2, 4))
options.grid_columnconfigure(0, weight=1)
options.grid_columnconfigure(1, weight=1)
self.chk_zip = ctk.CTkCheckBox(
options,
text="Create zip",
variable=self.create_zip_var,
command=lambda: self.on_export_mode_change("zip"),
)
self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12))
self.chk_hierarchy = ctk.CTkCheckBox(
options,
text="Create folders on file system",
variable=self.create_hierarchy_var,
command=lambda: self.on_export_mode_change("hierarchy"),
)
self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0))
self.btn_export = ctk.CTkButton(
right_top,
text="Export selected folder without preview",
command=lambda: self.start_export(EXPORT_MODE_ALL),
state="disabled",
height=32,
)
self.btn_export.grid(row=4, column=0, columnspan=2, padx=8, pady=(4, 6), sticky="ew")
right_bottom = ctk.CTkFrame(right)
right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(4, 0))
right_bottom.grid_rowconfigure(4, weight=1)
right_bottom.grid_columnconfigure(1, weight=1)
right_bottom.grid_columnconfigure(3, weight=1)
filter_group = ctk.CTkFrame(right_bottom)
filter_group.grid(row=0, column=0, columnspan=4, sticky="nsew", padx=(8, 4), pady=(8, 4))
filter_group.grid_columnconfigure(1, weight=1)
filter_group.grid_columnconfigure(3, weight=1)
ctk.CTkLabel(filter_group, text="Preview and filters", font=ctk.CTkFont(size=18, weight="bold")).grid(
row=0, column=0, columnspan=4, sticky="w", padx=8, pady=(8, 2)
)
ctk.CTkLabel(filter_group, textvariable=self.filter_loaded_folder_var, anchor="w").grid(
row=1, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 4)
)
ctk.CTkLabel(filter_group, text="Sender contains").grid(row=2, column=0, sticky="w", padx=8, pady=(0, 4))
self.entry_filter_sender = ctk.CTkEntry(filter_group, textvariable=self.filter_sender_var, state="disabled")
self.entry_filter_sender.grid(row=2, column=1, sticky="ew", padx=(0, 8), pady=(0, 4))
self.entry_filter_sender.bind("<KeyRelease>", self._on_live_filter_input)
ctk.CTkLabel(filter_group, text="Subject contains").grid(row=2, column=2, sticky="w", padx=8, pady=(0, 4))
self.entry_filter_subject = ctk.CTkEntry(filter_group, textvariable=self.filter_subject_var, state="disabled")
self.entry_filter_subject.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(0, 4))
self.entry_filter_subject.bind("<KeyRelease>", self._on_live_filter_input)
ctk.CTkLabel(filter_group, text="Period").grid(row=3, column=0, sticky="w", padx=8, pady=(0, 4))
self.combo_filter_date_range = ctk.CTkComboBox(
filter_group,
variable=self.filter_date_range_var,
values=["", "Last week", "Last month", "Last 6 months", "Last year"],
command=lambda _value: self.schedule_live_filter(),
state="disabled",
)
self.combo_filter_date_range.grid(row=3, column=1, sticky="w", padx=(0, 8), pady=(0, 4))
filter_options = ctk.CTkFrame(filter_group, fg_color="transparent")
filter_options.grid(row=4, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 8))
filter_options.grid_columnconfigure(3, weight=1)
self.chk_filter_export_new = ctk.CTkCheckBox(
filter_options,
text="New",
variable=self.filter_export_new_var,
command=self.schedule_live_filter,
state="disabled",
)
self.chk_filter_export_new.grid(row=0, column=0, sticky="w", padx=(0, 12))
self.chk_filter_export_with_attachments = ctk.CTkCheckBox(
filter_options,
text="With attachments",
variable=self.filter_export_with_attachments_var,
command=self.schedule_live_filter,
state="disabled",
)
self.chk_filter_export_with_attachments.grid(row=0, column=1, sticky="w", padx=(0, 12))
self.chk_filter_export_selected = ctk.CTkCheckBox(
filter_options,
text="Selected",
variable=self.filter_export_selected_var,
command=self.schedule_live_filter,
state="disabled",
)
self.chk_filter_export_selected.grid(row=0, column=2, sticky="w", padx=(0, 12))
actions_group = ctk.CTkFrame(right_bottom)
actions_group.grid(row=0, column=4, sticky="nsew", padx=(5, 10), pady=(10, 8))
actions_group.grid_columnconfigure(0, weight=1)
actions_group.grid_rowconfigure(4, weight=1)
ctk.CTkLabel(actions_group, text="Actions", font=ctk.CTkFont(size=15, weight="bold")).grid(
row=0, column=0, sticky="w", padx=8, pady=(8, 4)
)
self.btn_load_headers = ctk.CTkButton(
actions_group,
text="Load preview",
width=130,
command=self.load_folder_for_filtering,
state="disabled",
)
self.btn_load_headers.grid(row=1, column=0, sticky="ew", padx=8, pady=(0, 6))
self.btn_apply_filters = ctk.CTkButton(
actions_group,
text="Apply filters",
command=self.apply_email_filters,
width=120,
state="disabled",
)
self.btn_apply_filters.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 6))
self.btn_reset_filters = ctk.CTkButton(
actions_group,
text="Reset filters",
command=self.reset_email_filters,
width=110,
state="disabled",
)
self.btn_reset_filters.grid(row=3, column=0, sticky="ew", padx=8, pady=(0, 8))
table_frame = ctk.CTkFrame(right_bottom)
table_frame.grid(row=4, column=0, columnspan=5, sticky="nsew", padx=8, pady=(0, 8))
table_frame.grid_columnconfigure(0, weight=1)
table_frame.grid_rowconfigure(0, weight=1)
columns = ("date", "sender", "subject", "attachments", "types", "state")
self.filter_tree = ttk.Treeview(table_frame, columns=columns, show="headings", selectmode="extended")
self.filter_tree.grid(row=0, column=0, sticky="nsew")
self.filter_tree.tag_configure("odd", background="#f8fafc")
self.filter_tree.tag_configure("even", background="#e5e7eb")
self.filter_tree.heading("date", text="Date")
self.filter_tree.heading("sender", text="Sender")
self.filter_tree.heading("subject", text="Subject")
self.filter_tree.heading("attachments", text="Att.")
self.filter_tree.heading("types", text="Type")
self.filter_tree.heading("state", text="State")
self.filter_tree.column("date", width=150, anchor="w")
self.filter_tree.column("sender", width=220, anchor="w")
self.filter_tree.column("subject", width=430, anchor="w")
self.filter_tree.column("attachments", width=36, minwidth=32, anchor="center", stretch=False)
self.filter_tree.column("types", width=120, minwidth=90, anchor="center")
self.filter_tree.column("state", width=135, minwidth=120, anchor="center")
filter_yscroll = ttk.Scrollbar(table_frame, orient="vertical", command=self.filter_tree.yview)
filter_yscroll.grid(row=0, column=1, sticky="ns")
filter_xscroll = ttk.Scrollbar(table_frame, orient="horizontal", command=self.filter_tree.xview)
filter_xscroll.grid(row=1, column=0, sticky="ew")
self.filter_tree.configure(yscrollcommand=filter_yscroll.set, xscrollcommand=filter_xscroll.set)
self.filter_tree.bind("<<TreeviewSelect>>", self._on_filter_tree_select)
preview_actions = ctk.CTkFrame(right_bottom, fg_color="transparent")
preview_actions.grid(row=5, column=0, columnspan=5, sticky="ew", padx=8, pady=(0, 8))
preview_actions.grid_columnconfigure(0, weight=1)
ctk.CTkLabel(preview_actions, textvariable=self.filter_status_var, anchor="w").grid(
row=0, column=0, sticky="ew", padx=(0, 10), pady=0
)
self.btn_export_preview = ctk.CTkButton(
preview_actions,
text="Export preview",
width=150,
command=self.export_preview_subset,
state="disabled",
)
self.btn_export_preview.grid(row=0, column=1, sticky="e", pady=0)
status = ctk.CTkFrame(self)
status.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 8))
status.grid_columnconfigure(0, weight=1)
status.grid_columnconfigure(1, weight=0)
status.grid_columnconfigure(2, weight=0)
status.grid_columnconfigure(3, weight=0)
ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid(
row=0, column=0, sticky="ew", padx=8, pady=6
)
self.btn_status_log = ctk.CTkButton(status, text="Open log", width=90, command=self.toggle_log_window)
self.btn_status_log.grid(row=0, column=1, sticky="e", padx=(0, 8), pady=6)
self.btn_cancel = ctk.CTkButton(
status,
text="Cancel export",
command=self.cancel_export,
state="disabled",
width=150,
fg_color="#9b2c2c",
hover_color="#7f1d1d",
)
self.btn_cancel.grid(row=0, column=2, sticky="e", padx=(0, 8), pady=6)
self.btn_exit = ctk.CTkButton(status, text="Exit", width=100, command=self.on_close)
self.btn_exit.grid(row=0, column=3, sticky="e", padx=8, pady=6)
self.busy_overlay = ctk.CTkFrame(self, corner_radius=10, border_width=1, height=150)
self.busy_overlay.grid_columnconfigure(0, weight=1)
ctk.CTkLabel(
self.busy_overlay,
textvariable=self.busy_title_var,
font=ctk.CTkFont(size=18, weight="bold"),
).grid(row=0, column=0, sticky="ew", padx=18, pady=(18, 8))
ctk.CTkLabel(
self.busy_overlay,
textvariable=self.busy_detail_var,
justify="center",
).grid(row=1, column=0, sticky="ew", padx=18, pady=(0, 10))
self.busy_progress = ctk.CTkProgressBar(self.busy_overlay, mode="indeterminate")
self.busy_progress.grid(row=2, column=0, sticky="ew", padx=18, pady=(0, 18))
self.busy_overlay.place_forget()
self._update_filter_controls_state()
def _install_tooltips(self) -> None:
tooltip_specs = [
(self.btn_connect, "Connect to the already running Outlook session and load the available accounts."),
(self.btn_refresh, "Reload the Outlook folder tree from the current session."),
(self.btn_expand, "Expand all folders that are already loaded in the tree."),
(self.btn_help, "Open the quick help window."),
(self.btn_browse, "Choose the destination folder for export files and the manifest."),
(self.btn_export, "Export the selected folder and all subfolders immediately, without using the preview."),
(self.btn_load_headers, "Load a lightweight preview of the emails in the selected folder."),
(self.btn_apply_filters, "Apply the current filters to the loaded preview."),
(self.btn_reset_filters, "Clear preview filters and show all loaded emails again."),
(self.btn_export_preview, "Export the subset of emails currently visible in the filtered preview."),
(self.btn_status_log, "Open or close the application log window."),
(self.btn_cancel, "Request cancellation of the long-running operation in progress."),
(self.btn_exit, "Close the application and try to shut it down cleanly."),
(self.entry_output, "Destination folder used for exported files and the export manifest."),
(self.entry_filter_sender, "Filter the preview by sender name or sender address."),
(self.entry_filter_subject, "Filter the preview by words contained in the email subject."),
(self.combo_filter_date_range, "Limit the preview to a recent time range."),
(self.chk_zip, "Create a ZIP archive as export output."),
(self.chk_hierarchy, "Create folders and files directly on the file system."),
(self.chk_filter_export_new, "Show only emails that are not yet exported."),
(self.chk_filter_export_with_attachments, "Show only emails that have Outlook attachments."),
(self.chk_filter_export_selected, "Limit the preview and export to the rows currently selected."),
]
self.tooltips = [ToolTip(widget, text) for widget, text in tooltip_specs]
def connect_outlook(self) -> None:
try:
self.set_status("Connecting to Outlook...")
self.log("Connecting to Outlook...")
self.outlook_service.connect()
self.connected = True
self.log("Connected to Outlook.")
self.reload_tree()
self.btn_refresh.configure(state="normal")
self.btn_expand.configure(state="normal")
self.set_status("Connected to Outlook")
except Exception as exc:
self.connected = False
self.log(f"Outlook connection error: {exc}")
self.set_status("Connection error")
messagebox.showerror(APP_TITLE, f"Unable to connect to Outlook.\n\n{exc}")
def reload_tree(self) -> None:
if not self.connected:
return
if self.filter_live_after_id is not None:
try:
self.after_cancel(self.filter_live_after_id)
except Exception:
pass
self.filter_live_after_id = None
self.tree.delete(*self.tree.get_children())
self.tree_folder_map.clear()
self.tree_path_map.clear()
self.empty_folder_alert_shown.clear()
self.selected_folder_obj = None
self.loaded_email_headers = []
self.filtered_email_headers = []
self.filter_tree_key_map = {}
self.filter_loaded_folder_text = ""
self.filter_loaded_folder_var.set("No folder loaded")
self.filter_status_var.set("No emails loaded")
self.selected_folder_display.set("No folder selected")
self.btn_export.configure(state="disabled")
self.refresh_filter_tree()
self._update_filter_controls_state()
for root_node in self.outlook_service.list_root_folders():
node_text = NameCodec.tree_label(root_node.display_name or root_node.path, root_node.mail_count)
node_id = self.tree.insert("", "end", text=node_text, open=False, tags=("root",))
self.tree_folder_map[node_id] = root_node.folder_obj
self.tree_path_map[node_id] = root_node.path
self._insert_dummy(node_id)
self.log("Folder tree reloaded.")
def _insert_dummy(self, node_id: str) -> None:
self.tree.insert(node_id, "end", text="__dummy__")
def on_tree_open(self, _event=None) -> None:
selected = self.tree.focus()
if not selected:
return
children = self.tree.get_children(selected)
if len(children) == 1 and self.tree.item(children[0], "text") == "__dummy__":
self.tree.delete(children[0])
folder_obj = self.tree_folder_map.get(selected)
current_path = self.tree_path_map.get(selected, self.tree.item(selected, "text"))
try:
child_nodes = self.outlook_service.list_children(folder_obj, current_path)
for idx, node in enumerate(child_nodes, start=1):
node_text = NameCodec.tree_label(node.display_name or node.path, node.mail_count)
has_children = False
try:
has_children = bool(node.folder_obj.Folders.Count > 0)
except Exception:
has_children = False
if has_children:
row_tag = "branch_even" if idx % 2 == 0 else "branch_odd"
else:
row_tag = "even" if idx % 2 == 0 else "odd"
child_id = self.tree.insert(selected, "end", text=node_text, open=False, tags=(row_tag,))
self.tree_folder_map[child_id] = node.folder_obj
self.tree_path_map[child_id] = node.path
try:
if has_children:
self._insert_dummy(child_id)
except Exception:
pass
except Exception as exc:
self.log(f"Error loading subfolders: {exc}")
def on_tree_select(self, _event=None) -> None:
selected = self.tree.selection()
if not selected:
return
node_id = selected[0]
folder_obj = self.tree_folder_map.get(node_id)
self.selected_folder_obj = folder_obj
folder_path = self.tree_path_map.get(node_id, self.tree.item(node_id, "text"))
if folder_obj is None:
self.selected_folder_display.set("No folder selected")
self.btn_export.configure(state="disabled")
return
self.selected_folder_display.set(f"Selected: {folder_path}")
self.btn_export.configure(state="normal")
self.btn_load_headers.configure(state="normal")
if folder_path != self.filter_loaded_folder_text:
self.loaded_email_headers = []
self.filtered_email_headers = []
self.filter_tree_key_map = {}
self.filter_loaded_folder_text = ""
self.filter_loaded_folder_var.set(f"Selected folder: {folder_path}")
self.filter_status_var.set("No emails loaded for the selected folder")
self.refresh_filter_tree()
self._update_filter_controls_state()
self.maybe_show_empty_folder_server_alert(node_id, folder_obj)
def maybe_show_empty_folder_server_alert(self, node_id: str, folder_obj: object) -> None:
if self.tree.parent(node_id) == "":
return
if node_id in self.empty_folder_alert_shown:
return
diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
mail_count = OutlookService.count_mail_items(folder_obj)
if mail_count != 0:
return
self.empty_folder_alert_shown.add(node_id)
self.log_empty_folder_diagnostics(diagnostics)
self.show_empty_folder_server_alert()
def log_empty_folder_diagnostics(self, diagnostics: dict) -> None:
self.log("Empty folder diagnostics via Outlook COM:")
self.log(f" folder_name: {diagnostics.get('folder_name')}")
self.log(f" folder_path: {diagnostics.get('folder_path')}")
self.log(f" raw Items.Count: {diagnostics.get('raw_count')}")
self.log(f" Restrict IPM.Note Count: {diagnostics.get('restricted_count')}")
self.log(f" items_error: {diagnostics.get('items_error')}")
self.log(f" restrict_error: {diagnostics.get('restrict_error')}")
self.log(f" store_id: {diagnostics.get('store_id')}")
def show_empty_folder_server_alert(self) -> None:
dialog = ctk.CTkToplevel(self)
dialog.title("Empty Outlook folder")
dialog.geometry("720x430")
dialog.minsize(640, 380)
dialog.transient(self)
dialog.grab_set()
dialog.grid_columnconfigure(0, weight=0)
dialog.grid_columnconfigure(1, weight=1)
dialog.grid_rowconfigure(1, weight=1)
alert = ctk.CTkLabel(
dialog,
text="!",
width=72,
height=72,
corner_radius=36,
fg_color="#b45309",
text_color="white",
font=ctk.CTkFont(size=42, weight="bold"),
)
alert.grid(row=0, column=0, rowspan=2, sticky="n", padx=(24, 16), pady=(24, 12))
ctk.CTkLabel(
dialog,
text="The selected folder appears to be empty",
font=ctk.CTkFont(size=22, weight="bold"),
anchor="w",
).grid(row=0, column=1, sticky="ew", padx=(0, 24), pady=(26, 8))
message = (
"The selected folder looks empty, but there may still be emails on the Outlook server "
"that are not yet synchronized locally.\n\n"
"To synchronize, go to:\n\n"
"File -> Account settings -> Sync settings and Account -> All\n\n"
"Then wait for synchronization to complete and try the export again.\n\n"
"Note: if Outlook shows a link such as 'there are more items on the server', the program can "
"only see emails that are already synchronized locally."
)
ctk.CTkLabel(dialog, text=message, justify="left", anchor="nw", wraplength=560).grid(
row=1, column=1, sticky="nsew", padx=(0, 24), pady=(0, 16)
)
buttons = ctk.CTkFrame(dialog, fg_color="transparent")
buttons.grid(row=2, column=0, columnspan=2, sticky="ew", padx=24, pady=(0, 24))
buttons.grid_columnconfigure(0, weight=1)
buttons.grid_columnconfigure(1, weight=0)
buttons.grid_columnconfigure(2, weight=0)
def open_outlook() -> None:
try:
os.startfile("outlook")
except Exception as exc:
messagebox.showerror(APP_TITLE, f"Unable to open Outlook.\n\n{exc}")
ctk.CTkButton(buttons, text="Open Outlook", width=140, command=open_outlook).grid(
row=0, column=1, sticky="e", padx=(0, 12)
)
ctk.CTkButton(
buttons,
text="Close",
width=120,
command=dialog.destroy,
fg_color="#5f6368",
hover_color="#4b4f52",
).grid(row=0, column=2, sticky="e")
dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
dialog.after(100, dialog.focus_force)
def expand_all(self) -> None:
def _expand(node_id: str) -> None:
self.tree.item(node_id, open=True)
self.tree.focus(node_id)
self.on_tree_open()
for child in self.tree.get_children(node_id):
if self.tree.item(child, "text") != "__dummy__":
_expand(child)
for root in self.tree.get_children(""):
_expand(root)
self.log("Tree expansion completed.")
def choose_output_dir(self) -> None:
selected = filedialog.askdirectory(initialdir=self.output_dir_var.get() or str(Path.home()))
if selected:
self.output_dir_var.set(selected)
self.prefs.set_output_dir(selected)
def load_folder_for_filtering(self) -> None:
if self.selected_folder_obj is None:
messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.")
return
if self.filter_thread is not None and self.filter_thread.is_alive():
messagebox.showwarning(APP_TITLE, "An email loading task is already in progress.")
return
folder_text = self.selected_folder_display.get().replace("Selected: ", "")
self.set_busy_overlay(
True,
"Please wait... loading many emails may take a few minutes",
f"Loading email headers for:\n{folder_text}",
)
self._set_controls_enabled(False)
self.filter_status_var.set("Loading in progress...")
output_dir = self.output_dir_var.get().strip()
self.filter_thread = threading.Thread(
target=self._run_filter_load_worker,
args=(self._marshal_folder_for_worker(self.selected_folder_obj), folder_text, output_dir),
daemon=True,
)
self.filter_thread.start()
@staticmethod
def _format_elapsed(seconds: float) -> str:
total_seconds = max(0, int(round(seconds)))
minutes, secs = divmod(total_seconds, 60)
return f"{minutes}m {secs:02d}s"
def on_export_mode_change(self, selected_mode: str) -> None:
if selected_mode == "zip":
self.create_zip_var.set(1)
self.create_hierarchy_var.set(0)
else:
self.create_zip_var.set(0)
self.create_hierarchy_var.set(1)
if not self.create_zip_var.get() and not self.create_hierarchy_var.get():
self.create_zip_var.set(1)
def _build_export_options(self, export_mode: str) -> ExportOptions:
if export_mode == EXPORT_MODE_WITH_ATTACHMENTS:
return ExportOptions(mode=export_mode, only_with_attachments=True)
if export_mode == EXPORT_MODE_FILTER:
return ExportOptions(mode=export_mode, only_with_attachments=False)
return ExportOptions(mode=EXPORT_MODE_ALL, only_with_attachments=False)
def _prepare_manifest_for_export(
self,
output_root: Path,
relative_path: str,
folder_text: str,
zip_mode: bool,
) -> bool:
manifest = ExportManifest(output_root)
folder_key = folder_text
scope = manifest.get_export_scope(folder_key)
expected_path = output_root / relative_path
record_count = manifest.folder_record_count(folder_key)
self.log(
"Pre-export manifest check: "
f"folder_key={folder_key} | scope={'yes' if scope is not None else 'no'} | "
f"record_count={record_count} | expected_path={expected_path}"
)
if scope is None:
if record_count <= 0:
return True
last_path = expected_path
target_label = relative_path or folder_text
else:
last_path_text = str(scope.get("lp") or "").strip()
last_path = Path(last_path_text) if last_path_text else expected_path
target_label = scope.get("rp") or relative_path or folder_text
self.log(f"Checking previous export path: {last_path}")
if last_path.exists():
return True
recreate = messagebox.askyesno(
APP_TITLE,
f"The previous export for the folder\n\n"
f"\"{target_label}\"\n\n"
"is no longer present on disk.\n\n"
"Do you want to recreate it?",
)
if not recreate:
return False
removed = manifest.clear_folder(folder_key)
manifest.save()
self.log(
f"Manifest cleaned to recreate the export for '{target_label}'. "
f"Removed records: {removed}"
)
return True
def _export_mode_label(self, export_mode: str) -> str:
labels = {
EXPORT_MODE_ALL: "Export all",
EXPORT_MODE_WITH_ATTACHMENTS: "Export emails with attachments only",
EXPORT_MODE_FILTER: "Export preview",
}
return labels.get(export_mode, export_mode)
def start_export(
self,
export_mode: str = EXPORT_MODE_ALL,
selected_keys: Optional[set[str]] = None,
selected_entry_refs: Optional[list[tuple[str, str]]] = None,
) -> None:
if self.export_thread is not None and self.export_thread.is_alive():
messagebox.showwarning(APP_TITLE, "An export is already in progress.")
return
if self.selected_folder_obj is None:
messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.")
return
output_dir = self.output_dir_var.get().strip()
if not output_dir:
messagebox.showwarning(APP_TITLE, "Choose a valid output folder.")
return
output_root = Path(output_dir)
try:
output_root.mkdir(parents=True, exist_ok=True)
except Exception as exc:
messagebox.showerror(APP_TITLE, f"Unable to create the output folder.\n\n{exc}")
return
self.prefs.set_output_dir(str(output_root))
options = self._build_export_options(export_mode)
if selected_keys is not None:
if not selected_keys:
messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.")
return
options.selected_keys = set(selected_keys)
if selected_entry_refs is not None:
if not selected_entry_refs:
messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.")
return
options.selected_entry_refs = list(selected_entry_refs)
folder_text = self.selected_folder_display.get().replace("Selected: ", "")
options.folder_key = folder_text
relative_path = NameCodec.folder_name_from_outlook_path(folder_text)
zip_mode = bool(self.create_zip_var.get())
mode_label = self._export_mode_label(export_mode)
if not self._prepare_manifest_for_export(output_root, relative_path, folder_text, zip_mode):
return
if zip_mode:
confirm_message = (
f"Mode: {mode_label}\n\n"
"A ZIP archive will be created inside the output folder.\n"
"The temporary folder structure used to prepare the archive will be removed at the end.\n\n"
"Do you want to continue?"
)
else:
confirm_message = (
f"Mode: {mode_label}\n\n"
"A folder hierarchy will be created in the output folder.\n\n"
"Do you want to continue?"
)
if not messagebox.askyesno(APP_TITLE, confirm_message):
return
self._set_export_controls_running()
self.exporter = Exporter(self)
self.log("=" * 80)
self.log(f"Starting export: {folder_text}")
self.log(f"Action: {mode_label}")
self.log(f"Output mode: {'ZIP' if zip_mode else 'Folder hierarchy'}")
self.set_status("Export in progress...")
self.export_thread = threading.Thread(
target=self._run_export_worker,
args=(
self._marshal_folder_for_worker(self.selected_folder_obj),
output_root,
relative_path,
folder_text,
zip_mode,
options,
),
daemon=True,
)
self.export_thread.start()
def cancel_export(self) -> None:
self.exporter.cancel()
self.set_status("Cancellation requested...")
self.log("Cancellation request sent.")
def _set_controls_enabled(self, enabled: bool) -> None:
state = "normal" if enabled else "disabled"
export_state = state if enabled and self.selected_folder_obj else "disabled"
self.btn_export.configure(state=export_state)
self.btn_refresh.configure(state=state if self.connected else "disabled")
self.btn_connect.configure(state=state)
self.btn_expand.configure(state=state if self.connected else "disabled")
self.btn_browse.configure(state=state)
self.btn_exit.configure(state=state)
self.chk_zip.configure(state=state)
self.chk_hierarchy.configure(state=state)
self.btn_status_log.configure(state=state)
if enabled:
self._update_filter_controls_state()
else:
self.btn_load_headers.configure(state="disabled")
self.entry_filter_subject.configure(state="disabled")
self.entry_filter_sender.configure(state="disabled")
self.combo_filter_date_range.configure(state="disabled")
self.chk_filter_export_new.configure(state="disabled")
self.chk_filter_export_with_attachments.configure(state="disabled")
self.chk_filter_export_selected.configure(state="disabled")
self.btn_apply_filters.configure(state="disabled")
self.btn_reset_filters.configure(state="disabled")
self.btn_export_preview.configure(state="disabled")
def _set_export_controls_running(self) -> None:
self._set_controls_enabled(False)
self.btn_cancel.configure(state="normal")
self.set_busy_overlay(
True,
"Please wait... exporting many emails may take a few minutes",
"Export in progress...",
)
def _restore_export_controls(self) -> None:
self._set_controls_enabled(True)
self.btn_cancel.configure(state="disabled")
self.set_busy_overlay(False)
def _run_export_worker(
self,
folder_ref: object,
output_root: Path,
relative_path: str,
folder_text: str,
zip_mode: bool,
options: ExportOptions,
) -> None:
com_initialized = False
started_at = datetime.now()
try:
if pythoncom is not None:
pythoncom.CoInitialize()
com_initialized = True
manifest = ExportManifest(output_root)
self.exporter.configure(manifest, options)
self.log(f"Manifest export: {manifest.path}")
if zip_mode:
with tempfile.TemporaryDirectory(prefix="outlook_export_") as temp_dir:
temp_root = Path(temp_dir)
temp_export_target = temp_root / relative_path
self.log(f"Destinazione temporanea: {temp_export_target}")
if options.selected_entry_refs is not None:
namespace = OutlookService.get_namespace_for_worker()
mails, attachments, skipped = self.exporter.export_selected_items(
namespace,
options.selected_entry_refs,
temp_export_target,
)
else:
folder_obj = self._resolve_worker_folder(folder_ref)
mails, attachments, skipped = self.exporter.export_folder(folder_obj, temp_export_target)
self.exporter.check_cancel()
zip_path = self._create_zip_archive(temp_root, output_root, folder_text)
result_path = zip_path
else:
export_target = output_root / relative_path
self.log(f"Destinazione: {export_target}")
if options.selected_entry_refs is not None:
namespace = OutlookService.get_namespace_for_worker()
mails, attachments, skipped = self.exporter.export_selected_items(
namespace,
options.selected_entry_refs,
export_target,
)
else:
folder_obj = self._resolve_worker_folder(folder_ref)
mails, attachments, skipped = self.exporter.export_folder(folder_obj, export_target)
result_path = export_target
manifest.update_export_scope(options.folder_key, relative_path, result_path, zip_mode)
manifest.save()
elapsed_seconds = (datetime.now() - started_at).total_seconds()
self.ui_queue.put(("complete", mails, attachments, skipped, result_path, zip_mode, options.mode, elapsed_seconds))
except ExportCancelled:
self.ui_queue.put(("cancelled",))
except Exception as exc:
self.ui_queue.put(("error", str(exc), traceback.format_exc()))
finally:
if com_initialized:
try:
pythoncom.CoUninitialize()
except Exception:
pass
def _run_filter_load_worker(self, folder_ref: object, folder_text: str, output_dir: str) -> None:
com_initialized = False
started_at = datetime.now()
try:
if pythoncom is not None:
pythoncom.CoInitialize()
com_initialized = True
folder_obj = self._resolve_worker_folder(folder_ref)
headers = self._load_email_headers(folder_obj, output_dir)
elapsed_seconds = (datetime.now() - started_at).total_seconds()
self.ui_queue.put(("filter_loaded", folder_text, headers, elapsed_seconds))
except Exception as exc:
self.ui_queue.put(("filter_error", str(exc), traceback.format_exc()))
finally:
if com_initialized:
try:
pythoncom.CoUninitialize()
except Exception:
pass
def _load_email_headers(self, folder_obj: object, output_dir: str) -> List[EmailHeader]:
headers: List[EmailHeader] = []
output_root = Path(output_dir) if output_dir else None
manifest = ExportManifest(output_root) if output_root and output_root.exists() else None
items = getattr(folder_obj, "Items", None)
if items is None:
return headers
try:
items.Sort("[ReceivedTime]", True)
except Exception:
pass
try:
count = items.Count
except Exception:
count = 0
for idx in range(1, count + 1):
try:
item = items.Item(idx)
except Exception:
continue
try:
if getattr(item, "Class", None) != MAIL_CLASS:
continue
except Exception:
continue
subject_raw = getattr(item, "Subject", "") or ""
subject = Exporter._safe_str(subject_raw, max_len=180)
sender = Exporter._safe_str(
getattr(item, "SenderName", None) or getattr(item, "SenderEmailAddress", None) or "",
max_len=120,
)
entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None))
parent = getattr(item, "Parent", None)
store_id = Exporter._safe_full_str(getattr(parent, "StoreID", None) if parent is not None else None)
received_dt = Exporter._convert_outlook_datetime(getattr(item, "ReceivedTime", None))
if received_dt is None:
received_dt = Exporter._convert_outlook_datetime(getattr(item, "SentOn", None))
received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else ""
attachment_count, attachment_types = self._collect_preview_attachment_info_minimal(item)
key = self._build_manifest_key_for_item(item, Exporter._safe_str(subject_raw), received_text)
exported = bool(manifest and manifest.has_record(key) and manifest.record_file_exists(key))
headers.append(
EmailHeader(
key=key,
entry_id=entry_id,
store_id=store_id,
received_time=received_text,
sender=sender,
subject=subject,
attachment_count=attachment_count,
attachment_types=attachment_types,
exported=exported,
)
)
if idx % 100 == 0:
self.ui_queue.put(
(
"status",
f"Loading emails... {idx}/{count} | found {len(headers)}",
)
)
return headers
def _refresh_loaded_header_export_flags(self) -> None:
if not self.loaded_email_headers:
return
output_dir = self.output_dir_var.get().strip()
if not output_dir:
return
output_root = Path(output_dir)
if not output_root.exists():
return
manifest = ExportManifest(output_root)
changed = False
for header in self.loaded_email_headers:
exported = bool(manifest.has_record(header.key) and manifest.record_file_exists(header.key))
if header.exported != exported:
header.exported = exported
changed = True
if changed:
self.apply_email_filters()
def _collect_preview_attachment_info(self, item: object, html_body: str) -> Tuple[int, str]:
attachments = getattr(item, "Attachments", None)
if attachments is None:
return 0, ""
try:
attachment_total = attachments.Count
except Exception:
return 0, ""
exported_exts: List[str] = []
for att_index in range(1, attachment_total + 1):
try:
attachment = attachments.Item(att_index)
except Exception:
continue
original_filename = Exporter._safe_str(getattr(attachment, "FileName", None) or f"attachment_{att_index}")
inline_score, _ = self.exporter._signature_image_score(attachment, original_filename, html_body)
if inline_score >= 3:
continue
if not NameCodec.is_allowed_attachment(original_filename):
continue
ext = NameCodec.sanitize_extension(os.path.splitext(original_filename or "")[1]).lstrip(".")
if ext:
exported_exts.append(ext.lower())
if not exported_exts:
return 0, ""
counts: dict[str, int] = {}
ordered_exts: List[str] = []
for ext in exported_exts:
if ext not in counts:
counts[ext] = 0
ordered_exts.append(ext)
counts[ext] += 1
summary = ", ".join(f"{ext}({counts[ext]})" for ext in ordered_exts)
return len(exported_exts), summary
@staticmethod
def _collect_preview_attachment_info_minimal(item: object) -> Tuple[int, str]:
attachments = getattr(item, "Attachments", None)
if attachments is None:
return 0, ""
try:
attachment_total = int(attachments.Count or 0)
except Exception:
return 0, ""
if attachment_total <= 0:
return 0, ""
return attachment_total, ""
def _build_manifest_key_for_item(self, item: object, subject: str, received_text: str) -> str:
entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None))
parent = getattr(item, "Parent", None)
store_id = Exporter._safe_full_str(getattr(parent, "StoreID", None) if parent is not None else None)
return Exporter._build_manifest_key_from_ids(store_id, entry_id)
@staticmethod
def _get_mail_mapi_property(item: object) -> object:
try:
accessor = item.PropertyAccessor
return accessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x1035001F")
except Exception:
return None
@staticmethod
def _marshal_folder_for_worker(folder_obj: object) -> object:
if pythoncom is None:
return folder_obj
try:
ole_obj = getattr(folder_obj, "_oleobj_", None)
if ole_obj is None:
return folder_obj
return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, ole_obj)
except Exception:
return folder_obj
@staticmethod
def _resolve_worker_folder(folder_ref: object) -> object:
if pythoncom is None or win32com is None:
return folder_ref
try:
dispatch = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch)
return win32com.client.Dispatch(dispatch)
except Exception:
return folder_ref
def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path:
stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
base_name = NameCodec.archive_base_name_from_outlook_path(folder_text)
zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip")
self.log(f"Creating ZIP archive: {zip_path}")
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for path in source_root.rglob("*"):
self.exporter.check_cancel()
if path.is_file():
archive.write(path, path.relative_to(source_root).as_posix())
return zip_path
def process_ui_queue(self) -> None:
if self.shutting_down:
self.ui_queue_after_id = None
return
try:
while True:
event = self.ui_queue.get_nowait()
event_type = event[0]
if event_type == "log":
self._write_log(event[1])
elif event_type == "status":
self.status_var.set(event[1])
if self.busy_overlay_visible:
self.busy_detail_var.set(event[1])
elif event_type == "complete":
_, mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_seconds = event
self.last_export_duration_seconds = elapsed_seconds
elapsed_text = self._format_elapsed(elapsed_seconds)
self.set_status(
f"Export completed | emails: {mails} | attachments: {attachments} | skipped: {skipped} | time: {elapsed_text}"
)
self.log(
f"Export completed ({self._export_mode_label(export_mode)}). "
f"Emails: {mails}, Attachments: {attachments}, Skipped: {skipped}, Time: {elapsed_text}"
)
self.log(
"Export diagnostics summary: "
f"skipped_from_manifest={self.exporter.skipped_manifest_count}, "
f"skipped_without_attachments={self.exporter.skipped_no_attachments_count}, "
f"non_mail_items={self.exporter.non_mail_item_count}"
)
self._refresh_loaded_header_export_flags()
self._restore_export_controls()
self._flush_file_log()
self.show_export_complete_dialog(mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_text)
elif event_type == "cancelled":
self.set_status("Export cancelled")
self.log("Export cancelled by the user.")
self._restore_export_controls()
self._flush_file_log()
messagebox.showinfo(APP_TITLE, "Export cancelled.")
elif event_type == "filter_loaded":
_, folder_text, headers, elapsed_seconds = event
self.filter_thread = None
self.last_filter_load_duration_seconds = elapsed_seconds
elapsed_text = self._format_elapsed(elapsed_seconds)
self.loaded_email_headers = headers
self.filtered_email_headers = list(headers)
self.filter_loaded_folder_text = folder_text
self.filter_loaded_folder_var.set(f"Loaded folder: {folder_text}")
self.filter_status_var.set(
f"Loaded emails: {len(headers)} | time: {elapsed_text} | folder: {folder_text}"
)
self.set_status(f"Loading completed | emails: {len(headers)} | time: {elapsed_text}")
self.log(f"Preview loaded: {len(headers)} emails | folder: {folder_text} | time: {elapsed_text}")
self.set_busy_overlay(False)
self._set_controls_enabled(True)
self.apply_email_filters()
self._flush_file_log()
elif event_type == "filter_error":
_, message, details = event
self.filter_thread = None
self.set_status("Error while loading emails")
self.log(f"Email loading error: {message}")
self.log(details)
self.set_busy_overlay(False)
self._set_controls_enabled(True)
self._flush_file_log()
messagebox.showerror(APP_TITLE, f"Error while loading emails.\n\n{message}")
elif event_type == "error":
_, message, details = event
self.set_status("Error during export")
self.log(f"Export error: {message}")
self.log(details)
self._restore_export_controls()
self._flush_file_log()
messagebox.showerror(APP_TITLE, f"Error during export.\n\n{message}")
except queue.Empty:
pass
finally:
if not self.shutting_down:
self.ui_queue_after_id = self.after(100, self.process_ui_queue)
else:
self.ui_queue_after_id = None
def show_export_complete_dialog(
self,
mails: int,
attachments: int,
skipped: int,
export_target: Path,
zip_mode: bool,
export_mode: str,
elapsed_text: str,
) -> None:
dialog = ctk.CTkToplevel(self)
dialog.title("Export completed")
dialog.geometry("520x260")
dialog.resizable(False, False)
dialog.transient(self)
dialog.grab_set()
dialog.grid_columnconfigure(0, weight=1)
dialog.grid_rowconfigure(1, weight=1)
ctk.CTkLabel(
dialog,
text="Export completed",
font=ctk.CTkFont(size=20, weight="bold"),
).grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8))
summary = (
f"Action: {self._export_mode_label(export_mode)}\n"
f"Exported emails: {mails}\n"
f"Exported attachments: {attachments}\n\n"
f"Skipped emails: {skipped}\n\n"
f"Elapsed time: {elapsed_text}\n\n"
f"Skipped from manifest: {self.exporter.skipped_manifest_count}\n"
f"Skipped without attachments: {self.exporter.skipped_no_attachments_count}\n"
f"Non-mail items detected: {self.exporter.non_mail_item_count}\n\n"
f"{'ZIP archive' if zip_mode else 'Folder'}:\n{export_target}"
)
summary_box = ctk.CTkTextbox(dialog, wrap="word")
summary_box.grid(row=1, column=0, sticky="nsew", padx=20, pady=(0, 16))
summary_box.insert("1.0", summary)
summary_box.configure(state="disabled")
buttons = ctk.CTkFrame(dialog, fg_color="transparent")
buttons.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20))
buttons.grid_columnconfigure(0, weight=1)
buttons.grid_columnconfigure(1, weight=1)
buttons.grid_columnconfigure(2, weight=1)
def copy_summary() -> None:
try:
self.clipboard_clear()
self.clipboard_append(summary)
except Exception:
pass
def open_export_folder() -> None:
try:
target = export_target.parent if zip_mode else export_target
os.startfile(str(target))
except Exception as exc:
messagebox.showerror(APP_TITLE, f"Unable to open the export destination.\n\n{exc}")
finally:
dialog.destroy()
ctk.CTkButton(
buttons,
text="Copy report",
command=copy_summary,
height=38,
).grid(row=0, column=0, sticky="ew", padx=(0, 8))
ctk.CTkButton(
buttons,
text="Open zip folder" if zip_mode else "Open export folder",
command=open_export_folder,
height=38,
).grid(row=0, column=1, sticky="ew", padx=8)
ctk.CTkButton(
buttons,
text="Close",
command=dialog.destroy,
height=38,
fg_color="#5f6368",
hover_color="#4b4f52",
).grid(row=0, column=2, sticky="ew", padx=(8, 0))
dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
dialog.after(100, dialog.focus_force)
def set_busy_overlay(self, visible: bool, title: str = "", detail: str = "") -> None:
self.busy_overlay_visible = visible
if visible:
self.busy_title_var.set(title or "Please wait...")
self.busy_detail_var.set(detail or "")
self.busy_overlay.place(relx=0.5, rely=0.52, anchor="center", relwidth=0.42)
self.busy_overlay.lift()
try:
self.busy_progress.start()
except Exception:
pass
else:
try:
self.busy_progress.stop()
except Exception:
pass
self.busy_overlay.place_forget()
def apply_email_filters(self, autosize: bool = True) -> None:
if not self.loaded_email_headers:
self.filtered_email_headers = []
self.refresh_filter_tree(autosize=autosize)
return
subject_filter = self.filter_subject_var.get().strip().lower()
sender_filter = self.filter_sender_var.get().strip().lower()
date_range = self.filter_date_range_var.get().strip()
only_new = bool(self.filter_export_new_var.get())
only_with_attachments = bool(self.filter_export_with_attachments_var.get())
only_selected = bool(self.filter_export_selected_var.get())
selected_keys: set[str] = set()
if only_selected and self.filter_tree is not None:
selected_keys = {
self.filter_tree_key_map[item_id]
for item_id in self.filter_tree.selection()
if item_id in self.filter_tree_key_map
}
filtered: List[EmailHeader] = []
for header in self.loaded_email_headers:
if subject_filter and subject_filter not in header.subject.lower():
continue
if sender_filter and sender_filter not in header.sender.lower():
continue
if date_range and not self._header_matches_date_range(header, date_range):
continue
if only_new and header.exported:
continue
if only_with_attachments and header.attachment_count <= 0:
continue
if only_selected and header.key not in selected_keys:
continue
filtered.append(header)
self.filtered_email_headers = filtered
self.refresh_filter_tree(autosize=autosize)
def reset_email_filters(self) -> None:
if self.filter_live_after_id is not None:
try:
self.after_cancel(self.filter_live_after_id)
except Exception:
pass
self.filter_live_after_id = None
self.filter_subject_var.set("")
self.filter_sender_var.set("")
self.filter_date_range_var.set("")
self.filter_export_new_var.set(0)
self.filter_export_with_attachments_var.set(0)
self.filter_export_selected_var.set(0)
self.apply_email_filters()
def _on_live_filter_input(self, _event=None) -> None:
self.schedule_live_filter()
def _on_filter_tree_select(self, _event=None) -> None:
self._update_filter_controls_state()
if self.filter_export_selected_var.get():
self.schedule_live_filter()
def schedule_live_filter(self) -> None:
if not self.loaded_email_headers:
return
if self.filter_live_after_id is not None:
try:
self.after_cancel(self.filter_live_after_id)
except Exception:
pass
self.filter_live_after_id = None
self.filter_live_after_id = self.after(self.filter_live_delay_ms, self._apply_live_filters)
def _apply_live_filters(self) -> None:
self.filter_live_after_id = None
self.apply_email_filters(autosize=False)
@staticmethod
def _header_matches_date_range(header: EmailHeader, date_range: str) -> bool:
received_text = (header.received_time or "").strip()
if not received_text:
return False
try:
received_dt = datetime.strptime(received_text, "%Y-%m-%d %H:%M:%S")
except Exception:
return False
now = datetime.now()
range_days = {
"Last week": 7,
"Last month": 30,
"Last 6 months": 183,
"Last year": 365,
}
days = range_days.get(date_range)
if days is None:
return True
return received_dt >= now - timedelta(days=days)
def _get_export_headers_from_preview(self) -> List[EmailHeader]:
return list(self.filtered_email_headers)
def refresh_filter_tree(self, autosize: bool = True) -> None:
if self.filter_tree is None:
return
self.filter_tree.delete(*self.filter_tree.get_children())
self.filter_tree_key_map = {}
for idx, header in enumerate(self.filtered_email_headers, start=1):
state = "Exported" if header.exported else "New"
item_id = f"mail_{idx}"
row_tag = "even" if idx % 2 == 0 else "odd"
self.filter_tree.insert(
"",
"end",
iid=item_id,
tags=(row_tag,),
values=(
header.received_time,
header.sender,
header.subject,
header.attachment_count,
header.attachment_types,
state,
),
)
self.filter_tree_key_map[item_id] = header.key
if autosize:
self._autosize_preview_columns()
exported = sum(1 for item in self.filtered_email_headers if item.exported)
self.filter_status_var.set(
f"Visible emails: {len(self.filtered_email_headers)} | already exported: {exported} | new: {len(self.filtered_email_headers) - exported}"
)
self._update_filter_controls_state()
def _autosize_preview_columns(self) -> None:
if self.filter_tree is None:
return
try:
body_font = tkfont.nametofont("TkDefaultFont")
except Exception:
body_font = None
try:
heading_font = tkfont.nametofont("TkHeadingFont")
except Exception:
heading_font = body_font
column_limits = {
"date": (110, 190),
"sender": (140, 360),
"subject": (180, 900),
"attachments": (32, 60),
"types": (90, 260),
"state": (120, 180),
}
sample_size = 400
for column_id in ("date", "sender", "subject", "attachments", "types", "state"):
header_text = self.filter_tree.heading(column_id, "text") or ""
if heading_font is not None:
max_width = heading_font.measure(str(header_text))
else:
max_width = max(40, len(str(header_text)) * 8)
children = self.filter_tree.get_children("")
for item_id in children[:sample_size]:
values = self.filter_tree.item(item_id, "values")
column_index = self.filter_tree["columns"].index(column_id)
if column_index >= len(values):
continue
value_text = str(values[column_index])
width = body_font.measure(value_text) if body_font is not None else max(20, len(value_text) * 8)
if width > max_width:
max_width = width
min_width, max_allowed = column_limits[column_id]
final_width = max(min_width, min(max_allowed, max_width + 24))
self.filter_tree.column(column_id, width=final_width, minwidth=min_width)
def export_preview_subset(self) -> None:
headers_to_export = self._get_export_headers_from_preview()
if not headers_to_export:
messagebox.showwarning(APP_TITLE, "There are no emails in the current preview to export.")
return
selected_keys = {header.key for header in headers_to_export}
selected_refs = []
for header in headers_to_export:
if not header.entry_id:
continue
selected_refs.append((header.store_id, header.entry_id))
self.start_export(EXPORT_MODE_FILTER, selected_keys=selected_keys, selected_entry_refs=selected_refs)
def open_help_window(self) -> None:
if self.help_window is not None:
try:
if self.help_window.winfo_exists():
self.help_window.deiconify()
self.help_window.lift()
self.help_window.attributes("-topmost", True)
self.help_window.after(200, lambda: self.help_window.attributes("-topmost", False))
self.help_window.update_idletasks()
self.help_window.focus_force()
return
except Exception:
pass
dialog = ctk.CTkToplevel(self)
dialog.withdraw()
dialog.title(HELP_WINDOW_TITLE)
dialog.geometry(self._suggest_log_window_geometry())
dialog.minsize(560, 440)
self.help_window = dialog
dialog.grid_columnconfigure(0, weight=1)
dialog.grid_rowconfigure(1, weight=1)
ctk.CTkLabel(
dialog,
text="Quick help",
font=ctk.CTkFont(size=20, weight="bold"),
anchor="w",
).grid(row=0, column=0, sticky="ew", padx=14, pady=(14, 8))
self.help_window_box = ctk.CTkTextbox(dialog, wrap="word")
self.help_window_box.grid(row=1, column=0, sticky="nsew", padx=14, pady=(0, 14))
self.help_window_box.insert("1.0", HELP_TEXT)
self.help_window_box.configure(state="disabled")
self.help_window_box.see("1.0")
dialog.protocol("WM_DELETE_WINDOW", self.close_help_window)
dialog.deiconify()
dialog.lift()
dialog.attributes("-topmost", True)
dialog.after(200, lambda: dialog.attributes("-topmost", False))
dialog.after(100, dialog.focus_force)
def close_help_window(self) -> None:
if self.help_window is None:
return
try:
self.help_window.destroy()
except Exception:
pass
self.help_window = None
self.help_window_box = None
def log(self, message: str) -> None:
if threading.get_ident() != self.main_thread_id:
self.ui_queue.put(("log", message))
return
self._write_log(message)
def _init_file_log(self) -> None:
try:
self.log_file_path.parent.mkdir(parents=True, exist_ok=True)
header = (
f"MailExporter log started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Log file: {self.log_file_path}\n"
"=" * 80
+ "\n"
)
self.log_file_path.write_text(header, encoding="utf-8", errors="replace")
except Exception:
pass
def _write_log(self, message: str) -> None:
timestamp = datetime.now().strftime("%H:%M:%S")
line = f"[{timestamp}] {message}\n"
self._append_file_log(line)
self.log_lines.append(line)
if len(self.log_lines) > MAX_LOG_LINES:
self.log_lines = self.log_lines[-MAX_LOG_LINES:]
if self.log_window_box is not None:
try:
self.log_window_box.configure(state="normal")
self.log_window_box.delete("1.0", "end")
self.log_window_box.insert("1.0", "".join(self.log_lines))
self.log_window_box.see("end")
self.log_window_box.configure(state="disabled")
except Exception:
self.log_window_box = None
self.pump_ui()
def open_log_window(self) -> None:
if self.log_window is not None:
try:
if self.log_window.winfo_exists():
self.log_window.deiconify()
self.log_window.lift()
self.log_window.attributes("-topmost", True)
self.log_window.after(200, lambda: self.log_window.attributes("-topmost", False))
self.log_window.update_idletasks()
self.log_window.focus_force()
if hasattr(self, "btn_status_log"):
self.btn_status_log.configure(text="Close log")
return
except Exception:
pass
dialog = ctk.CTkToplevel(self)
dialog.withdraw()
dialog.title("Export log")
dialog.geometry(self._suggest_log_window_geometry())
dialog.minsize(520, 400)
self.log_window = dialog
if hasattr(self, "btn_status_log"):
self.btn_status_log.configure(text="Close log")
dialog.grid_columnconfigure(0, weight=1)
dialog.grid_rowconfigure(0, weight=1)
self.log_window_box = ctk.CTkTextbox(dialog, wrap="word")
self.log_window_box.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
self.log_window_box.insert("1.0", "".join(self.log_lines))
self.log_window_box.configure(state="disabled")
self.log_window_box.see("end")
dialog.protocol("WM_DELETE_WINDOW", self.close_log_window)
dialog.deiconify()
dialog.lift()
dialog.attributes("-topmost", True)
dialog.after(200, lambda: dialog.attributes("-topmost", False))
dialog.after(50, dialog.deiconify)
dialog.after(100, dialog.lift)
dialog.after(150, dialog.focus_force)
def toggle_log_window(self) -> None:
if self.log_window is not None:
try:
if self.log_window.winfo_exists():
self.close_log_window()
return
except Exception:
pass
self.open_log_window()
def close_log_window(self) -> None:
if self.log_window is None:
return
try:
self.log_window.destroy()
except Exception:
pass
self.log_window = None
self.log_window_box = None
if hasattr(self, "btn_status_log"):
try:
self.btn_status_log.configure(text="Open log")
except Exception:
pass
def _suggest_log_window_geometry(self) -> str:
try:
self.update_idletasks()
width = 620
height = 760
screen_width = max(800, int(self.winfo_screenwidth() or 0))
screen_height = max(600, int(self.winfo_screenheight() or 0))
main_x = int(self.winfo_x())
main_y = int(self.winfo_y())
main_width = max(800, int(self.winfo_width() or 0))
preferred_x = main_x + main_width + 20
max_x = max(0, screen_width - width - 20)
x = min(preferred_x, max_x)
y = max(0, min(main_y, screen_height - height - 60))
return f"{width}x{height}+{x}+{y}"
except Exception:
return "620x760"
def _update_filter_controls_state(self) -> None:
has_selection = self.selected_folder_obj is not None
has_headers = bool(self.loaded_email_headers)
if hasattr(self, "btn_load_headers"):
self.btn_load_headers.configure(
state="normal" if has_selection and self.connected and self.filter_thread is None else "disabled"
)
field_state = "normal" if has_headers else "disabled"
button_state = "normal" if has_headers else "disabled"
export_base_state = "normal" if has_selection and self.connected and self.filter_thread is None else "disabled"
self.entry_filter_subject.configure(state=field_state)
self.entry_filter_sender.configure(state=field_state)
self.combo_filter_date_range.configure(state="readonly" if has_headers else "disabled")
self.chk_filter_export_new.configure(state=field_state)
self.chk_filter_export_with_attachments.configure(state=field_state)
self.chk_filter_export_selected.configure(state=field_state)
self.btn_apply_filters.configure(state=button_state)
self.btn_reset_filters.configure(state=button_state)
export_headers = self._get_export_headers_from_preview() if has_headers else []
export_ready = bool(export_headers) and export_base_state == "normal"
self.btn_export_preview.configure(state="normal" if export_ready else "disabled")
def _append_file_log(self, line: str) -> None:
self.pending_file_log_lines.append(line)
def _flush_file_log(self) -> None:
if not self.pending_file_log_lines:
return
try:
self.log_file_path.parent.mkdir(parents=True, exist_ok=True)
with self.log_file_path.open("a", encoding="utf-8", errors="replace") as fh:
fh.writelines(self.pending_file_log_lines)
self.pending_file_log_lines.clear()
except Exception:
pass
def set_status(self, message: str) -> None:
if self.shutting_down:
return
if threading.get_ident() != self.main_thread_id:
self.ui_queue.put(("status", message))
return
self.status_var.set(message)
if self.busy_overlay_visible:
self.busy_detail_var.set(message)
self.pump_ui()
def pump_ui(self) -> None:
if self.shutting_down:
return
if threading.get_ident() != self.main_thread_id:
return
try:
self.update_idletasks()
self.update()
except Exception:
pass
def on_close(self) -> None:
if self.shutting_down:
return
self.shutting_down = True
try:
if self.filter_live_after_id is not None:
try:
self.after_cancel(self.filter_live_after_id)
except Exception:
pass
self.filter_live_after_id = None
if self.ui_queue_after_id is not None:
try:
self.after_cancel(self.ui_queue_after_id)
except Exception:
pass
self.ui_queue_after_id = None
try:
self.exporter.cancel()
except Exception:
pass
try:
self.busy_progress.stop()
except Exception:
pass
try:
self.set_busy_overlay(False)
except Exception:
pass
self._flush_file_log()
self.close_help_window()
self.close_log_window()
self.outlook_service.disconnect()
finally:
try:
self.destroy()
except Exception:
pass
if __name__ == "__main__":
if sys.platform != "win32":
raise SystemExit("This program runs on Windows only.")
app = OutlookExporterApp()
try:
app.mainloop()
except KeyboardInterrupt:
try:
app.on_close()
except Exception:
pass