3146 lines
125 KiB
Python
3146 lines
125 KiB
Python
import json
|
|
import os
|
|
import queue
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import traceback
|
|
import zipfile
|
|
from pathlib import Path
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Optional, Tuple
|
|
|
|
import customtkinter as ctk
|
|
import tkinter as tk
|
|
from tkinter import filedialog, messagebox
|
|
import tkinter.font as tkfont
|
|
from tkinter import ttk
|
|
|
|
try:
|
|
import pythoncom
|
|
import win32timezone # required by pywin32 when Outlook returns timezone-aware COM dates
|
|
import win32com.client
|
|
except ImportError:
|
|
pythoncom = None
|
|
win32com = None
|
|
|
|
|
|
# --- Application-wide constants -------------------------------------------

# Title string for the application (carries the version number).
APP_TITLE = "Outlook email exporter ver. 1.0"
# Tk-style geometry string: <width>x<height>+<x offset>+<y offset>.
APP_GEOMETRY = "1220x760+0+0"
MAIL_CLASS = 43  # OlObjectClass.olMail
# NOTE(review): presumably the cap on lines kept in the on-screen log; the
# consumer is outside this part of the file.
MAX_LOG_LINES = 3000
# Directory (under %APPDATA%, or ~/.config when APPDATA is unset) and file
# name used by PreferenceStore for the preferences JSON.
PREFS_APP_DIR = "OutlookExporter"
PREFS_FILE_NAME = "prefs.json"
# NOTE(review): log file name; where it is written is not visible here.
LOG_FILE_NAME = "mail_exporter.log"
# File name of the incremental export manifest (see ExportManifest).
EXPORT_MANIFEST_NAME = "export_manifest.json"
# Values for ExportOptions.mode.
EXPORT_MODE_ALL = "all"
EXPORT_MODE_WITH_ATTACHMENTS = "with_attachments"
EXPORT_MODE_FILTER = "filter"
# Error text raised when no running Outlook instance can be obtained.
OUTLOOK_NOT_RUNNING_MESSAGE = (
    "Outlook does not appear to be open.\n\n"
    "Open Microsoft Outlook with the email profile you want to export, wait for it to finish loading, "
    "then minimize it and come back here to press 'Connect Outlook' again."
)
# Title and body of the in-app help window.
HELP_WINDOW_TITLE = "Help"
HELP_TEXT = """Outlook Email Exporter 1.0

Overview
- Connect to a running Outlook session.
- Select an Outlook folder from the tree.
- Choose an output folder.
- Use immediate export to export the selected folder and all subfolders.
- Use Load preview to inspect a folder before exporting a subset.

Preview and filters
- Sender and Subject filters narrow the visible preview.
- Period filters the preview by recent time ranges.
- New shows only emails not yet exported.
- With attachments shows only emails with Outlook attachments.
- Selected limits export to the rows currently selected in the preview.

Export modes
- Create zip builds a ZIP archive.
- Create folders on file system writes the export as folders and files.
- Export preview exports only the emails currently visible in the filtered preview.

Notes
- Outlook must already be installed and configured on this PC.
- Large folders can take a few minutes to load or export.
- The program uses an incremental manifest to avoid exporting the same email twice.
"""
|
|
# Image extensions. Attachments with one of these extensions are additionally
# scored as possible inline/signature images before being exported.
IMAGE_ATTACHMENT_EXTENSIONS = {
    ".jpg", ".jpeg", ".png", ".gif",
    ".bmp", ".tif", ".tiff", ".webp",
}

# Every extension (lower-case, with the leading dot) that may be exported:
# the document formats listed here plus all image extensions above.
ALLOWED_ATTACHMENT_EXTENSIONS = {
    # PDF, rich text and Word-family documents
    ".pdf", ".rtf",
    ".doc", ".docx", ".docm", ".dot", ".dotx", ".dotm",
    # Excel-family spreadsheets and CSV
    ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlt", ".xltx", ".xltm", ".csv",
    # HTML
    ".htm", ".html",
    # PowerPoint-family presentations
    ".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".ppsm", ".pot", ".potx", ".potm",
    # OpenDocument-style formats
    ".fodp", ".fods", ".fodt",
    ".odb", ".odf", ".odg", ".odt", ".ods", ".odp",
    ".otg", ".oth", ".otp", ".ots", ".ott",
    ".sxc",
} | IMAGE_ATTACHMENT_EXTENSIONS
|
|
# --- MAPI property identifiers used when scoring inline/signature images ---
# Each value is a "proptag" schema URL read from an attachment. The tuples
# list two tag variants for the same property.
# NOTE(review): the variants look like the Unicode (...001F) and 8-bit string
# (...001E) forms, presumably tried in order by the lookup helper - confirm.

# Property tag read to find out whether an attachment is flagged as hidden.
MAPI_ATTACHMENT_HIDDEN = "http://schemas.microsoft.com/mapi/proptag/0x7FFE000B"
# Content-ID of the attachment (matched against "cid:" references in the HTML body).
MAPI_ATTACH_CONTENT_ID = (
    "http://schemas.microsoft.com/mapi/proptag/0x3712001F",
    "http://schemas.microsoft.com/mapi/proptag/0x3712001E",
)
# Content-Location of the attachment.
MAPI_ATTACH_CONTENT_LOCATION = (
    "http://schemas.microsoft.com/mapi/proptag/0x3713001F",
    "http://schemas.microsoft.com/mapi/proptag/0x3713001E",
)
# Content-Disposition of the attachment (checked for "inline").
MAPI_ATTACH_CONTENT_DISPOSITION = (
    "http://schemas.microsoft.com/mapi/proptag/0x3716001F",
    "http://schemas.microsoft.com/mapi/proptag/0x3716001E",
)
# Rendering position of the attachment inside the message body.
MAPI_RENDERING_POSITION = "http://schemas.microsoft.com/mapi/proptag/0x370B0003"
# Sentinel meaning "not rendered in the body"; the scoring code also accepts
# the unsigned form 0xFFFFFFFF.
MAPI_RENDERING_POSITION_NOT_RENDERED = -1
# File-name prefixes (case-insensitive, anchored at the start of the name)
# listed as typical for inline images, embedded objects and signature icons.
# NOTE(review): the code that applies this pattern is outside this chunk.
INLINE_ATTACHMENT_NAME_RE = re.compile(
    r"^(image\d{3,}|att\d{5}|oledata|logo|spacer|facebook|linkedin|twitter|instagram|youtube)\b",
    re.IGNORECASE,
)
|
|
|
|
|
|
@dataclass
class FolderNode:
    """One node of the Outlook folder tree (a store root or a subfolder)."""

    # Store the folder belongs to. Root nodes carry the raw Outlook store
    # name; nodes built by OutlookService.list_children carry the first
    # component of their " / "-separated path instead.
    store_name: str
    # Display path of the folder, components joined with " / ".
    path: str
    # The underlying Outlook COM folder object.
    folder_obj: object
    # Label for this node; None when the creator did not supply one.
    display_name: Optional[str] = None
    # Number of mail items in the folder; None when unknown (root nodes are
    # created without a count).
    mail_count: Optional[int] = None
|
|
|
|
|
|
@dataclass
class ExportOptions:
    """Settings that control a single export run."""

    # One of the EXPORT_MODE_* constants.
    mode: str = EXPORT_MODE_ALL
    # When True, emails without exportable attachments are skipped.
    only_with_attachments: bool = False
    # NOTE(review): presumably the manifest folder key of the exported
    # folder; it is not read in the code visible here.
    folder_key: str = ""
    # When not None, Exporter.export_folder exports only the items whose
    # manifest key is in this set and does not descend into subfolders.
    selected_keys: Optional[set[str]] = None
    # NOTE(review): presumably the (store_id, entry_id) pairs handed to
    # Exporter.export_selected_items - confirm against the caller.
    selected_entry_refs: Optional[list[tuple[str, str]]] = None
|
|
|
|
|
|
@dataclass
class MailRecord:
    """Manifest entry describing one exported email.

    ExportManifest.update_record stores these fields under short JSON keys;
    the key used for each field is noted next to it.
    """

    # Key of the record inside the manifest "records" mapping.
    key: str
    # Stored as "eid".
    entry_id: str
    # Stored as "sid".
    store_id: str
    # Stored as "f"; used to count and clear the records of one folder.
    folder_key: str
    # Stored as "r".
    received_time: str
    # Stored as "p"; path of the exported .txt file relative to the manifest
    # root directory (empty when it could not be made relative).
    txt_relative_path: str
    # Stored as "a"; number of exportable attachments found on the email.
    attachment_count: int
    # Stored as "ea"; number of attachments actually written to disk.
    exported_attachment_count: int
    # Stored as "m".
    export_mode: str
    # Stored as "subject".
    subject: str = ""
    # Stored as "imid".
    internet_message_id: str = ""
    # Stored as "s".
    status: str = "exported"
|
|
|
|
|
|
@dataclass
class EmailHeader:
    """Summary information about one email.

    NOTE(review): the code that builds and consumes these objects is outside
    this chunk; presumably each instance is one row of the preview list.
    """

    key: str
    entry_id: str
    store_id: str
    received_time: str
    sender: str
    subject: str
    attachment_count: int
    # NOTE(review): textual summary of the attachment types; exact format is
    # not visible here.
    attachment_types: str
    # NOTE(review): presumably True when the email already has a manifest record.
    exported: bool
|
|
|
|
|
|
class ExportCancelled(Exception):
    """Raised by Exporter.check_cancel when the user has requested cancellation."""

    pass
|
|
|
|
|
|
class ToolTip:
    """Small hover tooltip attached to a Tk widget.

    The tooltip appears ``delay_ms`` milliseconds after the pointer enters
    the widget and disappears when the pointer leaves or a mouse button is
    pressed. All Tk interaction is best-effort: failures never propagate to
    the caller, the tooltip simply does not show.
    """

    def __init__(self, widget: object, text: str, delay_ms: int = 450) -> None:
        """Bind the tooltip to *widget*.

        Args:
            widget: Tk widget providing ``bind``, ``after`` and ``after_cancel``.
            text: Text displayed inside the tooltip.
            delay_ms: Hover time in milliseconds before the tooltip is shown.
        """
        self.widget = widget
        self.text = text
        self.delay_ms = delay_ms
        self._after_id = None  # id of the pending `after` callback, if any
        self._tip_window = None  # the Toplevel currently displayed, if any
        try:
            # add="+" keeps handlers that are already bound to the widget.
            self.widget.bind("<Enter>", self._on_enter, add="+")
            self.widget.bind("<Leave>", self._on_leave, add="+")
            self.widget.bind("<ButtonPress>", self._on_leave, add="+")
        except Exception:
            # Deliberate best effort: a widget that cannot be bound just
            # gets no tooltip.
            pass

    def _on_enter(self, _event=None) -> None:
        """Schedule the tooltip, replacing any callback already pending."""
        self._cancel_scheduled()
        try:
            self._after_id = self.widget.after(self.delay_ms, self._show)
        except Exception:
            self._after_id = None

    def _on_leave(self, _event=None) -> None:
        """Cancel a pending tooltip and hide a visible one."""
        self._cancel_scheduled()
        self._hide()

    def _cancel_scheduled(self) -> None:
        """Cancel the pending `after` callback, if there is one."""
        if self._after_id is None:
            return
        try:
            self.widget.after_cancel(self._after_id)
        except Exception:
            pass
        self._after_id = None

    def _show(self) -> None:
        """Create and display the tooltip window just below the widget."""
        self._after_id = None
        if self._tip_window is not None:
            return
        tip = None
        try:
            if not self.widget.winfo_exists():
                return
            x = self.widget.winfo_rootx() + 18
            y = self.widget.winfo_rooty() + self.widget.winfo_height() + 6
            tip = tk.Toplevel(self.widget)
            tip.wm_overrideredirect(True)
            tip.wm_geometry(f"+{x}+{y}")
            tip.attributes("-topmost", True)
            label = tk.Label(
                tip,
                text=self.text,
                justify="left",
                background="#111827",
                foreground="#f9fafb",
                relief="solid",
                borderwidth=1,
                padx=8,
                pady=4,
                font=("Segoe UI", 9),
            )
            label.pack()
        except Exception:
            # If the window was created but configuring it failed, destroy
            # it; otherwise an orphaned window would stay on screen with
            # nothing left that could ever close it.
            if tip is not None:
                try:
                    tip.destroy()
                except Exception:
                    pass
            self._tip_window = None
            return
        self._tip_window = tip

    def _hide(self) -> None:
        """Destroy the tooltip window if it is currently displayed."""
        if self._tip_window is None:
            return
        try:
            self._tip_window.destroy()
        except Exception:
            pass
        self._tip_window = None
|
|
|
|
|
|
class ExportManifest:
    """JSON manifest that records which emails have already been exported.

    The manifest lives in ``root_dir / EXPORT_MANIFEST_NAME`` and holds two
    mappings: ``records`` (manifest key -> compact record dict, see
    ``update_record``) and ``exports`` (folder key -> information about the
    last export of that folder).
    """

    def __init__(self, root_dir: Path) -> None:
        """Load the manifest stored under *root_dir* (empty if none exists)."""
        self.root_dir = root_dir
        self.path = root_dir / EXPORT_MANIFEST_NAME
        self.data = self._load()

    @staticmethod
    def _empty_data() -> dict:
        """Return the structure of a brand-new, empty manifest."""
        return {"version": 2, "records": {}, "exports": {}}

    def _load(self) -> dict:
        """Read the manifest file, falling back to an empty manifest.

        A missing, unreadable or malformed file yields an empty manifest;
        missing or malformed ``records`` / ``exports`` sections are replaced
        by empty mappings. The version is always normalised to 2.
        """
        if not self.path.exists():
            return self._empty_data()
        try:
            with self.path.open("r", encoding="utf-8") as fh:
                data = json.load(fh)
        except Exception:
            # Deliberately broad: any read/parse problem means "start over".
            return self._empty_data()
        if not isinstance(data, dict):
            return self._empty_data()
        if not isinstance(data.get("records"), dict):
            data["records"] = {}
        if not isinstance(data.get("exports"), dict):
            data["exports"] = {}
        data["version"] = 2
        return data

    def has_record(self, key: str) -> bool:
        """Return True when a non-empty record is stored under *key*."""
        return bool(self.data.get("records", {}).get(key))

    def get_record(self, key: str) -> Optional[dict]:
        """Return the raw record stored under *key*, or None."""
        return self.data.get("records", {}).get(key)

    def record_file_exists(self, key: str) -> bool:
        """Return True when the .txt file referenced by the record still exists."""
        record = self.get_record(key)
        if not isinstance(record, dict):
            return False
        rel_path = str(record.get("p") or "").strip()
        if not rel_path:
            return False
        try:
            return (self.root_dir / Path(rel_path)).exists()
        except Exception:
            return False

    def remove_record(self, key: str) -> None:
        """Remove the record stored under *key*; unknown keys are ignored."""
        self.data.get("records", {}).pop(key, None)

    def update_record(self, record: "MailRecord") -> None:
        """Insert or overwrite the manifest entry for *record*.

        Fields are stored under short keys to keep the file small; ``u`` is
        the local timestamp of this update.
        """
        self.data.setdefault("records", {})[record.key] = {
            "eid": record.entry_id,
            "sid": record.store_id,
            "f": record.folder_key,
            "r": record.received_time,
            "p": record.txt_relative_path,
            "a": record.attachment_count,
            "ea": record.exported_attachment_count,
            "m": record.export_mode,
            "subject": record.subject,
            "imid": record.internet_message_id,
            "s": record.status,
            "u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    def get_export_scope(self, folder_key: str) -> Optional[dict]:
        """Return the stored export information for *folder_key*, or None."""
        return self.data.get("exports", {}).get(folder_key)

    def update_export_scope(self, folder_key: str, relative_path: str, result_path: Path, zip_mode: bool) -> None:
        """Record where and how *folder_key* was last exported.

        Stored keys: ``rp`` (relative path), ``lp`` (result path as string),
        ``z`` (zip mode flag) and ``u`` (local timestamp of this update).
        """
        self.data.setdefault("exports", {})[folder_key] = {
            "rp": relative_path,
            "lp": str(result_path),
            "z": bool(zip_mode),
            "u": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    def folder_record_count(self, folder_key: str) -> int:
        """Return how many records belong to *folder_key*."""
        return sum(
            1
            for record in self.data.get("records", {}).values()
            if isinstance(record, dict) and record.get("f") == folder_key
        )

    def total_record_count(self) -> int:
        """Return the total number of records in the manifest."""
        return len(self.data.get("records", {}))

    def clear_folder(self, folder_key: str) -> int:
        """Delete all records and the export scope of *folder_key*.

        Returns:
            The number of records removed.
        """
        records = self.data.get("records", {})
        to_delete = [key for key, value in records.items() if isinstance(value, dict) and value.get("f") == folder_key]
        for key in to_delete:
            records.pop(key, None)
        self.data.get("exports", {}).pop(folder_key, None)
        return len(to_delete)

    def save(self) -> None:
        """Write the manifest to disk without ever leaving a truncated file.

        The data is serialised to a sibling temporary file which then
        replaces the real manifest in a single rename. If serialisation or
        writing fails, the previous manifest stays untouched; a half-written
        manifest would be read back as empty by ``_load`` and cause every
        email to be exported again.

        Raises:
            OSError: the directory or file cannot be written.
            TypeError / ValueError: the data cannot be serialised to JSON.
        """
        self.root_dir.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        try:
            with tmp_path.open("w", encoding="utf-8") as fh:
                json.dump(self.data, fh, ensure_ascii=False, separators=(",", ":"))
            os.replace(tmp_path, self.path)
        except BaseException:
            # Remove the partial temp file, then let the caller see the error.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise
|
|
|
|
|
|
class PreferenceStore:
    """Small JSON-backed store for user preferences.

    The only preference handled here is the last used output directory.
    The file lives in ``%APPDATA%/<PREFS_APP_DIR>/<PREFS_FILE_NAME>`` (or
    under ``~/.config`` when ``APPDATA`` is not set).
    """

    def __init__(self, default_output_dir: Path) -> None:
        """Load the preferences, creating the file with defaults if missing.

        Args:
            default_output_dir: Output directory used when none is stored.
        """
        self.default_output_dir = default_output_dir
        self.path = self._prefs_path()
        self.data = self._load_or_create()

    def get_output_dir(self) -> str:
        """Return the stored output directory, or the default when unset."""
        value = str(self.data.get("output_dir") or "").strip()
        return value or str(self.default_output_dir)

    def set_output_dir(self, output_dir: str) -> None:
        """Store *output_dir* and persist it; blank values are ignored.

        Raises:
            OSError: the preferences file cannot be written.
        """
        output_dir = str(output_dir or "").strip()
        if not output_dir:
            return
        self.data["output_dir"] = output_dir
        self._save()

    @classmethod
    def _prefs_path(cls) -> Path:
        """Return the full path of the preferences file."""
        appdata = os.environ.get("APPDATA")
        base_dir = Path(appdata) if appdata else Path.home() / ".config"
        return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME

    @classmethod
    def app_data_dir(cls) -> Path:
        """Return the directory that contains the preferences file."""
        return cls._prefs_path().parent

    def _load_or_create(self) -> dict:
        """Return the stored preferences, or defaults when unavailable.

        An unreadable, malformed or non-dict file yields the defaults. When
        no file exists yet, one is created with the defaults; if that first
        write fails (for example a read-only profile directory) the defaults
        are still returned so that constructing the store never fails just
        because preferences cannot be persisted.
        """
        defaults = {"output_dir": str(self.default_output_dir)}
        if self.path.exists():
            try:
                with self.path.open("r", encoding="utf-8") as fh:
                    data = json.load(fh)
            except Exception:
                # Deliberately broad: any read/parse problem means "use defaults".
                return defaults
            return data if isinstance(data, dict) else defaults

        # _save() reads self.data, so it has to be set before the first write.
        self.data = defaults
        try:
            self._save()
        except OSError:
            # Not being able to persist the defaults is not fatal.
            pass
        return defaults

    def _save(self) -> None:
        """Write the current preferences to disk, creating the directory."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with self.path.open("w", encoding="utf-8") as fh:
            json.dump(self.data, fh, ensure_ascii=False, indent=2)
|
|
|
|
|
|
class OutlookService:
    """Access to a running Outlook instance through COM (pywin32)."""

    def __init__(self) -> None:
        self.outlook = None
        self.namespace = None
        self.root_folders: List["FolderNode"] = []
        # True while this object holds a COM initialisation that
        # disconnect() still has to release.
        self._com_initialized = False

    def connect(self) -> None:
        """Attach to the running Outlook instance and list its stores.

        COM is initialised at most once per service object, so calling
        ``connect`` repeatedly does not stack initialisations. The
        connection state (``outlook``, ``namespace``, ``root_folders``) is
        replaced only when the whole operation succeeded.

        NOTE(review): COM initialisation applies to the calling thread, so
        ``connect`` and ``disconnect`` are expected to run on the same
        thread - confirm against the callers.

        Raises:
            RuntimeError: pywin32 is missing or Outlook is not running.
        """
        if win32com is None or pythoncom is None:
            raise RuntimeError(
                "pywin32 non è installato. Esegui: pip install pywin32"
            )

        initialized_here = False
        if not getattr(self, "_com_initialized", False):
            pythoncom.CoInitialize()
            self._com_initialized = True
            initialized_here = True
        try:
            try:
                outlook = win32com.client.GetActiveObject("Outlook.Application")
            except Exception as exc:
                raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc

            namespace = outlook.GetNamespace("MAPI")
            stores = namespace.Folders
            root_folders = []
            # Outlook collections are 1-based.
            for i in range(1, stores.Count + 1):
                store = stores.Item(i)
                store_name = str(store.Name)
                display_name = NameCodec.store_display_name(store_name)
                root_folders.append(
                    FolderNode(
                        store_name=store_name,
                        path=display_name,
                        folder_obj=store,
                        display_name=display_name,
                    )
                )
        except Exception:
            # Release only the initialisation made by this call, so that an
            # earlier successful connection keeps its COM state.
            if initialized_here:
                pythoncom.CoUninitialize()
                self._com_initialized = False
            raise

        self.outlook = outlook
        self.namespace = namespace
        self.root_folders = root_folders

    def disconnect(self) -> None:
        """Drop all Outlook references and release COM if it was initialised.

        Safe to call when ``connect`` was never called or failed: COM is
        uninitialised only when this object initialised it.
        """
        # Release the COM object references before uninitialising COM.
        self.root_folders = []
        self.namespace = None
        self.outlook = None
        if getattr(self, "_com_initialized", False) and pythoncom is not None:
            self._com_initialized = False
            try:
                pythoncom.CoUninitialize()
            except Exception:
                pass

    def list_root_folders(self) -> List["FolderNode"]:
        """Return a copy of the store root nodes found by ``connect``."""
        return list(self.root_folders)

    def list_children(self, folder_obj: object, current_path: str) -> List["FolderNode"]:
        """Return the direct subfolders of *folder_obj* as nodes.

        Args:
            folder_obj: Outlook COM folder whose children are listed.
            current_path: " / "-separated display path of *folder_obj*; its
                first component becomes the children's ``store_name``.
        """
        store_name = current_path.split(" / ")[0]
        folders = folder_obj.Folders
        children = []
        for i in range(1, folders.Count + 1):
            child = folders.Item(i)
            child_name = str(child.Name)
            children.append(
                FolderNode(
                    store_name=store_name,
                    path=f"{current_path} / {child_name}",
                    folder_obj=child,
                    display_name=child_name,
                    mail_count=self.count_mail_items(child),
                )
            )
        return children

    @staticmethod
    def count_mail_items(folder_obj: object) -> Optional[int]:
        """Return the best available item count for *folder_obj*.

        The count restricted to message class ``IPM.Note`` is preferred. The
        raw item count is used when the restricted count is unavailable, or
        when it is 0 although the folder does contain items. Returns None
        when neither count could be read.
        """
        diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
        restricted_count = diagnostics.get("restricted_count")
        raw_count = diagnostics.get("raw_count")
        if restricted_count is None:
            return raw_count
        if restricted_count == 0 and raw_count:
            return raw_count
        return restricted_count

    @staticmethod
    def folder_item_diagnostics(folder_obj: object) -> dict:
        """Collect counts and error texts for *folder_obj* without raising.

        Every COM access is guarded individually; a value that cannot be
        read stays None and, for the item collection and the two counts, the
        error text is stored in ``items_error`` / ``restrict_error``.

        Returns:
            Dict with keys ``folder_name``, ``raw_count``,
            ``restricted_count``, ``items_error``, ``restrict_error``,
            ``folder_path`` and ``store_id``.
        """
        diagnostics = {
            "folder_name": None,
            "raw_count": None,
            "restricted_count": None,
            "items_error": None,
            "restrict_error": None,
            "folder_path": None,
            "store_id": None,
        }

        try:
            diagnostics["folder_name"] = str(folder_obj.Name)
        except Exception:
            pass

        try:
            diagnostics["folder_path"] = str(folder_obj.FolderPath)
        except Exception:
            pass

        try:
            items = folder_obj.Items
        except Exception as exc:
            # Without the item collection nothing else can be counted.
            diagnostics["items_error"] = str(exc)
            return diagnostics

        try:
            diagnostics["raw_count"] = items.Count
        except Exception as exc:
            diagnostics["items_error"] = str(exc)

        try:
            diagnostics["restricted_count"] = items.Restrict("[MessageClass] = 'IPM.Note'").Count
        except Exception as exc:
            diagnostics["restrict_error"] = str(exc)

        try:
            diagnostics["store_id"] = str(folder_obj.StoreID)
        except Exception:
            pass

        return diagnostics

    @staticmethod
    def get_namespace_for_worker() -> object:
        """Return a MAPI namespace obtained from the running Outlook instance.

        NOTE(review): judging by the name this is meant for worker threads;
        COM initialisation is not done here and is presumably handled by the
        caller - confirm.

        Raises:
            RuntimeError: pywin32 is missing or Outlook is not running.
        """
        if win32com is None:
            raise RuntimeError("pywin32 non disponibile.")
        try:
            outlook = win32com.client.GetActiveObject("Outlook.Application")
        except Exception as exc:
            raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc
        return outlook.GetNamespace("MAPI")
|
|
|
|
|
|
class NameCodec:
    """Helpers that turn Outlook names into display labels and safe file names."""

    # Characters that are not allowed in Windows file names, plus control characters.
    INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    SPACE_RE = re.compile(r'\s+')

    @classmethod
    def compact_text(cls, value: str, max_len: int = 32) -> str:
        """Collapse whitespace and shorten *value* to *max_len* with "..."."""
        value = cls.SPACE_RE.sub(" ", value or "").strip()
        if len(value) <= max_len:
            return value
        return value[: max_len - 3].rstrip() + "..."

    @classmethod
    def store_display_name(cls, store_name: str) -> str:
        """Return a short label for a store.

        For names that look like an email address only the part before the
        "@" is used; an empty name becomes "Account".
        """
        store_name = store_name or "Account"
        if "@" in store_name:
            local_part = store_name.split("@", 1)[0]
            return cls.compact_text(local_part, max_len=34)
        return cls.compact_text(store_name, max_len=34)

    @classmethod
    def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str:
        """Return the folder-tree label, with "(count)" appended when known."""
        display_name = cls.compact_text(display_name or "Folder", max_len=46)
        if mail_count is None:
            return display_name
        return f"{display_name} ({mail_count})"

    @classmethod
    def sanitize(cls, value: str, max_len: int = 80) -> str:
        """Turn *value* into a single safe path component.

        Invalid characters and dots become "_", whitespace runs collapse to
        one space, and the result is cut to *max_len* characters with
        trailing spaces/underscores removed. The result is never empty for a
        positive *max_len*: "empty" is used when nothing usable remains,
        including when truncation strips everything (for example a subject
        made only of "?"). An empty component would otherwise make a
        subfolder path collapse into its parent.
        """
        value = value or ""
        value = cls.INVALID_CHARS_RE.sub("_", value)
        value = cls.SPACE_RE.sub(" ", value).strip()
        value = value.replace(".", "_")
        if not value:
            value = "empty"
        if len(value) > max_len:
            value = value[:max_len].rstrip(" _") or "empty"[:max_len]
        return value

    @classmethod
    def folder_name_from_outlook_path(cls, folder_path: str) -> str:
        """Convert a "Store / Folder / Sub" tree path into a relative directory."""
        # Drop the leading store when the path comes from the "Store / Folder" tree.
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        export_parts = parts[1:] if len(parts) > 1 else parts
        safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts]
        return os.path.join(*safe_parts) if safe_parts else "folder"

    @classmethod
    def archive_base_name_from_outlook_path(cls, folder_path: str) -> str:
        """Return a safe archive base name from the last path component."""
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        folder_name = parts[-1] if parts else "export"
        return cls.sanitize(folder_name, max_len=60)

    @classmethod
    def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str:
        """Return "mail_<timestamp>_<seq>" with a placeholder for unknown dates."""
        if received_time is None:
            stamp = "data_sconosciuta"
        else:
            stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S")
        return f"mail_{stamp}_{seq:05d}"

    @classmethod
    def mail_folder_name(cls, base_mail_name: str, subject: str, attachment_count: int = 0) -> str:
        """Return the per-email directory name: base name, attachment marker, subject."""
        safe_subject = cls.sanitize(subject or "no_subject", max_len=50)
        attachment_marker = f"CON_ALLEGATI_{attachment_count:02d}" if attachment_count else "SOLO_MAIL"
        return f"{base_mail_name}__{attachment_marker}__{safe_subject}"

    @classmethod
    def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str:
        """Return the export file name for an attachment.

        Only the base name of *original_filename* is used; its stem is
        sanitised and its extension normalised.
        """
        original_filename = original_filename or f"attachment_{att_index}"
        original_filename = os.path.basename(original_filename)
        stem, ext = os.path.splitext(original_filename)
        stem = cls.sanitize(stem, max_len=80)
        ext = cls.sanitize_extension(ext)
        return f"attachment__{base_mail_name}__{att_index:02d}__{stem}{ext}"

    @classmethod
    def sanitize_extension(cls, ext: str, max_len: int = 20) -> str:
        """Normalise a file extension to lower case with a leading dot.

        Returns "" when there is no extension. A bare "." (what
        ``os.path.splitext`` yields for "name.") also counts as no
        extension, so generated file names never end in a dot.
        """
        ext = (ext or "").strip().lower()
        if not ext or ext == ".":
            return ""
        if not ext.startswith("."):
            ext = f".{ext}"
        ext = cls.INVALID_CHARS_RE.sub("_", ext)
        ext = ext.replace(" ", "_")
        if len(ext) > max_len:
            ext = ext[:max_len]
        return ext

    @classmethod
    def is_allowed_attachment(cls, filename: str) -> bool:
        """Return True when the extension of *filename* is in the export allow-list."""
        ext = cls.sanitize_extension(os.path.splitext(filename or "")[1])
        return ext in ALLOWED_ATTACHMENT_EXTENSIONS
|
|
|
|
|
|
class Exporter:
|
|
MANIFEST_FLUSH_INTERVAL_SECONDS = 20.0
|
|
|
|
def __init__(self, app: "OutlookExporterApp") -> None:
    """Create an exporter bound to *app*, with all counters at zero.

    *app* is used for logging, status updates and UI pumping during export.
    """
    self.app = app
    self.cancel_requested = False

    # Running sequence number that goes into exported mail names.
    self.global_seq = 0

    # Totals accumulated over the whole export run.
    self.exported_mail_count = self.exported_attachment_count = 0
    self.skipped_mail_count = self.skipped_manifest_count = 0
    self.skipped_no_attachments_count = 0
    self.non_mail_item_count = 0

    # Budget for date/time diagnostics (count so far and upper limit).
    self.datetime_diagnostic_count = 0
    self.datetime_diagnostic_limit = 20

    # Manifest bookkeeping; a real manifest is supplied through configure().
    self.manifest: Optional[ExportManifest] = None
    self.manifest_dirty = False
    self.last_manifest_flush_monotonic = time.monotonic()

    self.options = ExportOptions()
|
|
|
|
def configure(self, manifest: Optional[ExportManifest], options: ExportOptions) -> None:
    """Install the manifest and options for the next export run.

    The manifest starts out clean and the flush timer restarts from now.
    """
    self.options = options
    self.manifest = manifest
    self.manifest_dirty = False
    self.last_manifest_flush_monotonic = time.monotonic()
|
|
|
|
def cancel(self) -> None:
    """Request cancellation; the export stops at its next check_cancel() call."""
    self.cancel_requested = True
|
|
|
|
def check_cancel(self) -> None:
    """Abort the export when cancellation has been requested.

    Raises:
        ExportCancelled: cancel() was called earlier.
    """
    if not self.cancel_requested:
        return
    raise ExportCancelled("Export cancelled by the user.")
|
|
|
|
def _mark_manifest_dirty(self) -> None:
|
|
if self.manifest is None:
|
|
return
|
|
self.manifest_dirty = True
|
|
|
|
def _flush_manifest_if_due(self, force: bool = False) -> None:
|
|
if self.manifest is None or not self.manifest_dirty:
|
|
return
|
|
now = time.monotonic()
|
|
if not force and (now - self.last_manifest_flush_monotonic) < self.MANIFEST_FLUSH_INTERVAL_SECONDS:
|
|
return
|
|
self.manifest.save()
|
|
self.last_manifest_flush_monotonic = now
|
|
self.manifest_dirty = False
|
|
|
|
def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int, int]:
    """Export the mail items of an Outlook folder and, recursively, its subfolders.

    Items are visited in ReceivedTime order when sorting is possible.
    Non-mail items are counted and skipped. When ``options.selected_keys``
    is set, only items whose manifest key is in that set are exported and
    subfolders are not visited.

    Args:
        folder_obj: Outlook COM folder to export.
        folder_out_path: Destination directory (created if missing).

    Returns:
        ``(exported mails, exported attachments, skipped mails)`` for this
        folder including its subfolders.

    Raises:
        ExportCancelled: cancellation was requested. This now also
            propagates when the cancellation is detected while a single
            email is being exported, instead of being logged as an export
            error of that email.
    """
    self.check_cancel()
    folder_out_path.mkdir(parents=True, exist_ok=True)
    folder_name = self._safe_get(folder_obj, "Name") or "(unnamed folder)"
    self.app.log(f"Outlook folder: {folder_name}")
    self.app.log(f"Destination folder: {folder_out_path}")
    self._log_folder_count_diagnostics(folder_obj)

    items = self._safe_get(folder_obj, "Items")
    if items is None:
        self.app.log("Warning: unable to read folder items, continuing with subfolders.")
        count = 0
    else:
        try:
            items.Sort("[ReceivedTime]", False)
        except Exception:
            self.app.log("Warning: unable to sort by ReceivedTime, continuing without sorting.")

        try:
            count = items.Count
        except Exception as exc:
            self.app.log(f"Warning: unable to count folder items: {exc}")
            count = 0

    local_mail_count = 0
    local_attachment_count = 0
    local_skipped_count = 0

    self.app.log(f"Detected items: {count}")

    # Outlook collections are 1-based.
    for idx in range(1, count + 1):
        self.check_cancel()
        try:
            item = items.Item(idx)
        except Exception as exc:
            self.app.log(f"Error accessing item {idx}: {exc}")
            continue

        message_class = self._safe_get(item, "Class")
        if message_class != MAIL_CLASS:
            self.non_mail_item_count += 1
            continue

        if self.options.selected_keys is not None:
            key = self._build_manifest_key_for_item(item)
            if key not in self.options.selected_keys:
                continue

        try:
            # The sequence number is consumed even when the export fails.
            self.global_seq += 1
            mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path)
            local_mail_count += mail_count
            local_attachment_count += att_count
            local_skipped_count += skipped_count
            self.exported_mail_count += mail_count
            self.exported_attachment_count += att_count
            self.skipped_mail_count += skipped_count
        except ExportCancelled:
            # ExportCancelled derives from Exception: without this clause a
            # cancellation raised inside _export_mail would be reported as
            # a failed email and only take effect one item later.
            raise
        except Exception as exc:
            subject = self._safe_get(item, "Subject") or "(no subject)"
            self.app.log(f"Error exporting email '{subject}': {exc}")

        if idx % 10 == 0:
            self.app.set_status(
                "Export in progress... "
                f"item {idx}/{count} | emails {self.exported_mail_count} | "
                f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}"
            )
            self.app.pump_ui()

    if self.options.selected_keys is not None:
        # A key-based selection applies to this folder only.
        return local_mail_count, local_attachment_count, local_skipped_count

    subfolders = self._safe_get(folder_obj, "Folders")
    subfolder_count = 0
    try:
        subfolder_count = subfolders.Count if subfolders is not None else 0
    except Exception as exc:
        self.app.log(f"Warning: unable to read subfolders: {exc}")

    for i in range(1, subfolder_count + 1):
        self.check_cancel()
        try:
            child = subfolders.Item(i)
        except Exception as exc:
            self.app.log(f"Error accessing subfolder {i}: {exc}")
            continue
        child_name = self._safe_get(child, "Name") or f"subfolder_{i}"
        child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80)
        m, a, s = self.export_folder(child, child_dir)
        local_mail_count += m
        local_attachment_count += a
        local_skipped_count += s

    return local_mail_count, local_attachment_count, local_skipped_count
|
|
|
|
def export_selected_items(
    self,
    namespace: object,
    selected_entry_refs: list[tuple[str, str]],
    folder_out_path: Path,
) -> Tuple[int, int, int]:
    """Export individually selected emails fetched directly by their IDs.

    Args:
        namespace: Outlook MAPI namespace used to resolve the items.
        selected_entry_refs: ``(store_id, entry_id)`` pairs; an empty
            store id makes the lookup use the entry id alone.
        folder_out_path: Destination directory (created if missing).

    Returns:
        ``(exported mails, exported attachments, skipped mails)``.

    Raises:
        ExportCancelled: cancellation was requested. This now also
            propagates when the cancellation is detected while a single
            email is being exported, instead of being logged as an export
            error of that email.
    """
    self.check_cancel()
    folder_out_path.mkdir(parents=True, exist_ok=True)
    local_mail_count = 0
    local_attachment_count = 0
    local_skipped_count = 0
    total = len(selected_entry_refs)
    self.app.log(f"Direct Outlook selective export: {total} selected emails")

    for idx, (store_id, entry_id) in enumerate(selected_entry_refs, start=1):
        self.check_cancel()
        try:
            if store_id:
                item = namespace.GetItemFromID(entry_id, store_id)
            else:
                item = namespace.GetItemFromID(entry_id)
        except Exception as exc:
            self.app.log(f"Error retrieving selected email {idx}/{total}: {exc}")
            continue

        message_class = self._safe_get(item, "Class")
        if message_class != MAIL_CLASS:
            self.non_mail_item_count += 1
            continue

        try:
            # The sequence number is consumed even when the export fails.
            self.global_seq += 1
            mail_count, att_count, skipped_count = self._export_mail(item, folder_out_path)
            local_mail_count += mail_count
            local_attachment_count += att_count
            local_skipped_count += skipped_count
            self.exported_mail_count += mail_count
            self.exported_attachment_count += att_count
            self.skipped_mail_count += skipped_count
        except ExportCancelled:
            # ExportCancelled derives from Exception: without this clause a
            # cancellation raised inside _export_mail would be reported as
            # a failed email and only take effect one item later.
            raise
        except Exception as exc:
            subject = self._safe_get(item, "Subject") or "(no subject)"
            self.app.log(f"Error exporting selected email '{subject}': {exc}")

        self.app.set_status(
            "Selective export in progress... "
            f"{idx}/{total} | emails {self.exported_mail_count} | "
            f"attachments {self.exported_attachment_count} | skipped {self.skipped_mail_count}"
        )
        self.app.pump_ui()

    return local_mail_count, local_attachment_count, local_skipped_count
|
|
|
|
def _log_folder_count_diagnostics(self, folder_obj: object) -> None:
    """Write the item-count diagnostics of *folder_obj* to the application log."""
    diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
    log = self.app.log
    log("Folder count diagnostics:")
    # (label printed in the log, key in the diagnostics dict), in output order.
    rows = (
        ("folder_name", "folder_name"),
        ("folder_path", "folder_path"),
        ("raw Items.Count", "raw_count"),
        ("Restrict IPM.Note Count", "restricted_count"),
        ("items_error", "items_error"),
        ("restrict_error", "restrict_error"),
    )
    for label, field in rows:
        log(f" {label}: {diagnostics.get(field)}")
|
|
|
|
def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int, int]:
    """Export one email (text file plus allowed attachments) below *folder_out_path*.

    The email is skipped when the manifest already has a record whose file
    still exists, when only emails with attachments are wanted and it has
    none that are exportable, or when its canonical .txt file is already on
    disk.

    Returns:
        ``(1, saved attachments, 0)`` when the email was exported, or
        ``(0, 0, 1)`` when it was skipped.

    Raises:
        ExportCancelled: cancellation was requested while attachments were
            being collected or saved.
        OSError: the mail directory or text file cannot be written.
    """
    received_dt = self._get_best_mail_datetime(item)
    subject = self._safe_get(item, "Subject") or ""
    # Built with an attachment count of 0; the real count is filled in below.
    manifest_record = self._build_mail_record(item, subject, received_dt, 0)

    if self.manifest is not None and self.manifest.has_record(manifest_record.key):
        if self.manifest.record_file_exists(manifest_record.key):
            self.app.log(f"Email already present in the manifest, skipping: {subject or '(no subject)'}")
            self.skipped_manifest_count += 1
            return 0, 0, 1
        # The record points to a file that no longer exists: drop it and export again.
        self.app.log(
            f"Obsolete manifest record, re-exporting the email: {subject or '(no subject)'}"
        )
        self.manifest.remove_record(manifest_record.key)
        self._mark_manifest_dirty()

    body = self._safe_get(item, "Body") or ""
    html_body = self._safe_get(item, "HTMLBody") or ""
    # The HTML body is only used to recognise inline/signature images.
    exportable_attachments = self._collect_exportable_attachments(item, html_body)
    manifest_record.attachment_count = len(exportable_attachments)

    if self.options.only_with_attachments and not exportable_attachments:
        self.app.log(f"Email without exportable attachments, skipping: {subject or '(no subject)'}")
        self.skipped_no_attachments_count += 1
        return 0, 0, 1

    # The base name contains the current global sequence number.
    base_name = NameCodec.email_base_name(received_dt, self.global_seq)
    canonical_mail_dir = folder_out_path / NameCodec.mail_folder_name(base_name, subject, len(exportable_attachments))
    canonical_txt_path = canonical_mail_dir / f"{base_name}.txt"
    if canonical_txt_path.exists():
        # Already on disk but missing from the manifest: skip the export and
        # re-create the manifest record from what is found on disk.
        self.app.log(f"Email already present on the file system, skipping: {subject or '(no subject)'}")
        self.skipped_manifest_count += 1
        if self.manifest is not None:
            try:
                manifest_record.txt_relative_path = canonical_txt_path.relative_to(self.manifest.root_dir).as_posix()
            except Exception:
                # The file is not located below the manifest root.
                manifest_record.txt_relative_path = ""
            attachments_dir = canonical_mail_dir / "attachments"
            if attachments_dir.exists():
                try:
                    manifest_record.exported_attachment_count = len([p for p in attachments_dir.iterdir() if p.is_file()])
                except Exception:
                    manifest_record.exported_attachment_count = 0
            manifest_record.status = "exported"
            self.manifest.update_record(manifest_record)
            self._mark_manifest_dirty()
            self._flush_manifest_if_due()
        return 0, 0, 1

    # NOTE(review): _unique_path is defined outside this chunk; presumably it
    # returns a path that does not exist yet - confirm.
    mail_dir = self._unique_path(canonical_mail_dir)
    mail_dir.mkdir(parents=True, exist_ok=True)
    txt_path = mail_dir / f"{base_name}.txt"

    header_lines = [
        f"Subject: {subject}",
        f"Received: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}",
        "",
        "--- EMAIL TEXT ---",
        body,
    ]
    txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace")
    self.app.log(f"Mail salvata: {txt_path.name}")
    if self.manifest is not None:
        try:
            manifest_record.txt_relative_path = txt_path.relative_to(self.manifest.root_dir).as_posix()
        except Exception:
            # The file is not located below the manifest root.
            manifest_record.txt_relative_path = ""

    attachment_count = 0
    if exportable_attachments:
        attachments_dir = mail_dir / "attachments"
        for att_index, attachment, original_filename in exportable_attachments:
            # Raised outside the try below, so a cancellation leaves this
            # function before the manifest record is stored.
            self.check_cancel()
            try:
                export_name = NameCodec.attachment_name(base_name, original_filename, att_index)
                attachments_dir.mkdir(parents=True, exist_ok=True)
                export_path = self._unique_path(attachments_dir / export_name)
                attachment.SaveAsFile(str(export_path))
                attachment_count += 1
                self.app.log(f" Attachment saved: {export_path.name}")
            except Exception as exc:
                # One failing attachment does not abort the email.
                self.app.log(f" Error saving attachment {att_index}: {exc}")

    manifest_record.exported_attachment_count = attachment_count
    manifest_record.status = "exported"
    if self.manifest is not None:
        self.manifest.update_record(manifest_record)
        self._mark_manifest_dirty()
        self._flush_manifest_if_due()

    return 1, attachment_count, 0
|
|
|
|
def _collect_exportable_attachments(self, item: object, html_body: str) -> List[Tuple[int, object, str]]:
|
|
exportable = []
|
|
attachments = self._safe_get(item, "Attachments")
|
|
if attachments is None:
|
|
return exportable
|
|
|
|
try:
|
|
attachment_total = attachments.Count
|
|
except Exception as exc:
|
|
self.app.log(f" Error reading the number of attachments: {exc}")
|
|
return exportable
|
|
|
|
for att_index in range(1, attachment_total + 1):
|
|
self.check_cancel()
|
|
try:
|
|
attachment = attachments.Item(att_index)
|
|
original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}"
|
|
inline_score, inline_reasons = self._signature_image_score(
|
|
attachment, original_filename, html_body
|
|
)
|
|
if inline_score >= 3:
|
|
self.app.log(
|
|
f" Ignored inline/signature attachment: {original_filename} | "
|
|
f"score={inline_score} | reasons={', '.join(inline_reasons)}"
|
|
)
|
|
elif NameCodec.is_allowed_attachment(original_filename):
|
|
exportable.append((att_index, attachment, original_filename))
|
|
else:
|
|
self.app.log(f" Ignored attachment: {original_filename}")
|
|
except Exception as exc:
|
|
self.app.log(f" Error reading attachment {att_index}: {exc}")
|
|
|
|
return exportable
|
|
|
|
def _signature_image_score(self, attachment: object, filename: str, html_body: str) -> Tuple[int, List[str]]:
    """Score how likely an image attachment is an inline/signature picture.

    Returns ``(score, reasons)``. Files whose sanitized extension is not in
    ``IMAGE_ATTACHMENT_EXTENSIONS`` always score ``(0, [])``. Every other
    file accumulates points from independent hints (MAPI hidden flag,
    rendering position, content-id, content-location, content-disposition,
    file name found in the HTML body, typical inline file-name pattern);
    ``reasons`` lists the hints that fired, in evaluation order.
    """
    extension = NameCodec.sanitize_extension(os.path.splitext(filename or "")[1])
    if extension not in IMAGE_ATTACHMENT_EXTENSIONS:
        return 0, []

    # Each hit is (points, reason); totals are derived at the end.
    hits: List[Tuple[int, str]] = []
    body = (html_body or "").lower()
    base_name = os.path.basename(filename or "").lower()

    if self._get_attachment_mapi_property(attachment, MAPI_ATTACHMENT_HIDDEN) is True:
        hits.append((4, "hidden"))

    position = self._safe_int(
        self._get_attachment_mapi_property(attachment, MAPI_RENDERING_POSITION)
    )
    if position is not None:
        if position in (MAPI_RENDERING_POSITION_NOT_RENDERED, 0xFFFFFFFF):
            # Recorded for diagnostics only: contributes no points.
            hits.append((0, "rendering-position=-1"))
        else:
            hits.append((1, f"rendering-position={position}"))

    raw_cid = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_ID)
    if self._has_value(raw_cid):
        cid = str(raw_cid).strip()
        if self._html_references_cid(body, cid):
            hits.append((3, f"cid nel corpo HTML:{self._short_diag(cid)}"))
        else:
            hits.append((1, f"cid presente:{self._short_diag(cid)}"))

    raw_location = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_LOCATION)
    if self._has_value(raw_location):
        location = str(raw_location).strip()
        if location.lower() in body:
            hits.append((2, f"content-location nel corpo HTML:{self._short_diag(location)}"))
        else:
            hits.append((1, f"content-location presente:{self._short_diag(location)}"))

    disposition = self._get_first_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_DISPOSITION)
    if self._has_value(disposition) and "inline" in str(disposition).lower():
        hits.append((3, f"disposition inline:{self._short_diag(disposition)}"))

    if base_name and base_name in body:
        hits.append((2, "nome file nel corpo HTML"))

    stem = os.path.splitext(base_name)[0]
    if INLINE_ATTACHMENT_NAME_RE.match(stem or "") is not None:
        hits.append((2, "nome tipico inline/firma"))

    total = sum(points for points, _ in hits)
    return total, [reason for _, reason in hits]
|
|
|
|
@staticmethod
|
|
def _html_references_cid(html_lower: str, content_id: str) -> bool:
|
|
if not content_id:
|
|
return False
|
|
cid = content_id.strip().strip("<>").lower()
|
|
if not cid:
|
|
return False
|
|
return f"cid:{cid}" in html_lower or cid in html_lower
|
|
|
|
def _get_attachment_mapi_property(self, attachment: object, schema: str):
|
|
try:
|
|
accessor = attachment.PropertyAccessor
|
|
return accessor.GetProperty(schema)
|
|
except Exception:
|
|
return None
|
|
|
|
def _get_first_attachment_mapi_property(self, attachment: object, schemas):
|
|
for schema in schemas:
|
|
value = self._get_attachment_mapi_property(attachment, schema)
|
|
if self._has_value(value):
|
|
return value
|
|
return None
|
|
|
|
@staticmethod
|
|
def _safe_int(value: object) -> Optional[int]:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _short_diag(value: object, max_len: int = 60) -> str:
|
|
try:
|
|
text = str(value).strip()
|
|
except Exception:
|
|
text = repr(value)
|
|
text = text.replace("\r", " ").replace("\n", " ")
|
|
if len(text) > max_len:
|
|
return text[:max_len] + "..."
|
|
return text
|
|
|
|
@staticmethod
|
|
def _has_value(value: object) -> bool:
|
|
if value is None:
|
|
return False
|
|
try:
|
|
return bool(str(value).strip())
|
|
except Exception:
|
|
return True
|
|
|
|
def _get_best_mail_datetime(self, item: object) -> Optional[datetime]:
|
|
for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
|
|
value = self._safe_get(item, attr)
|
|
converted = self._convert_outlook_datetime(value)
|
|
if converted is not None:
|
|
if attr != "ReceivedTime":
|
|
self.app.log(f" ReceivedTime not available, using {attr}.")
|
|
return converted
|
|
self._log_datetime_diagnostics(item)
|
|
return None
|
|
|
|
def _log_datetime_diagnostics(self, item: object) -> None:
|
|
if self.datetime_diagnostic_count >= self.datetime_diagnostic_limit:
|
|
if self.datetime_diagnostic_count == self.datetime_diagnostic_limit:
|
|
self.app.log(
|
|
" Date diagnostics: limit reached, further emails without a date will not be logged in detail."
|
|
)
|
|
self.datetime_diagnostic_count += 1
|
|
return
|
|
|
|
self.datetime_diagnostic_count += 1
|
|
subject = self._safe_get(item, "Subject") or "(no subject)"
|
|
self.app.log(f" Email date diagnostics #{self.global_seq}: {subject}")
|
|
for label, value in self._mail_identity_details(item):
|
|
self.app.log(f" {label}: {value}")
|
|
for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
|
|
ok, value_or_error = self._safe_get_with_error(item, attr)
|
|
if not ok:
|
|
self.app.log(f" {attr}: ACCESS ERROR: {value_or_error}")
|
|
continue
|
|
|
|
value = value_or_error
|
|
self.app.log(
|
|
f" {attr}: tipo={type(value).__name__}; repr={repr(value)}; str={self._safe_str(value)}"
|
|
)
|
|
|
|
def _mail_identity_details(self, item: object) -> List[Tuple[str, str]]:
|
|
parent = self._safe_get(item, "Parent")
|
|
folder_name = self._safe_get(parent, "Name") if parent is not None else None
|
|
store_id = self._safe_get(parent, "StoreID") if parent is not None else None
|
|
|
|
details = [
|
|
("EntryID", self._safe_str(self._safe_get(item, "EntryID"))),
|
|
("ConversationID", self._safe_str(self._safe_get(item, "ConversationID"))),
|
|
("InternetMessageID", self._safe_str(self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F"))),
|
|
("Outlook folder", self._safe_str(folder_name)),
|
|
("StoreID", self._safe_str(store_id)),
|
|
]
|
|
return details
|
|
|
|
def _build_mail_record(
    self,
    item: object,
    subject: str,
    received_dt: Optional[datetime],
    attachment_count: int,
) -> MailRecord:
    """Create the manifest record describing one mail item.

    Args:
        item: Outlook mail item being exported.
        subject: Subject text stored (length-limited by ``_safe_str``).
        received_dt: Timestamp chosen for the mail, or ``None``; stored as
            ``YYYY-MM-DD HH:MM:SS`` or as an empty string.
        attachment_count: Number of attachments expected for the mail.

    Returns:
        A ``MailRecord`` with an empty ``txt_relative_path`` and
        ``exported_attachment_count`` set to 0; the folder key and export
        mode are taken from ``self.options``.
    """
    received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else ""
    entry_id = self._safe_full_str(self._safe_get(item, "EntryID"))
    parent = self._safe_get(item, "Parent")
    store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None)
    internet_message_id = self._safe_full_str(
        self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F")
    )
    # Build the key from the ids already read above instead of calling
    # _build_manifest_key_for_item, which would fetch EntryID, Parent and
    # StoreID from the item a second time and format them the same way.
    key = self._build_manifest_key_from_ids(store_id, entry_id)
    return MailRecord(
        key=key,
        entry_id=entry_id,
        store_id=store_id,
        folder_key=self.options.folder_key,
        received_time=received_text,
        txt_relative_path="",
        attachment_count=attachment_count,
        exported_attachment_count=0,
        export_mode=self.options.mode,
        subject=self._safe_str(subject),
        internet_message_id=internet_message_id,
    )
|
|
|
|
def _build_manifest_key_for_item(
|
|
self,
|
|
item: object,
|
|
subject: Optional[str] = None,
|
|
received_text: Optional[str] = None,
|
|
) -> str:
|
|
entry_id = self._safe_full_str(self._safe_get(item, "EntryID"))
|
|
parent = self._safe_get(item, "Parent")
|
|
store_id = self._safe_full_str(self._safe_get(parent, "StoreID") if parent is not None else None)
|
|
return self._build_manifest_key_from_ids(store_id, entry_id)
|
|
|
|
@staticmethod
def _build_manifest_key_from_ids(store_id: object, entry_id: object) -> str:
    """Join the string forms of ``store_id`` and ``entry_id`` with ``|``."""
    store_text = Exporter._safe_full_str(store_id)
    entry_text = Exporter._safe_full_str(entry_id)
    return f"{store_text}|{entry_text}"
|
|
|
|
def _get_mail_mapi_property(self, item: object, schema: str):
|
|
try:
|
|
accessor = item.PropertyAccessor
|
|
return accessor.GetProperty(schema)
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _unique_path(path: Path) -> Path:
|
|
if not path.exists():
|
|
return path
|
|
|
|
stem = path.stem
|
|
suffix = path.suffix
|
|
parent = path.parent
|
|
counter = 2
|
|
while True:
|
|
candidate = parent / f"{stem}__{counter}{suffix}"
|
|
if not candidate.exists():
|
|
return candidate
|
|
counter += 1
|
|
|
|
@staticmethod
|
|
def _safe_get(obj: object, attr: str):
|
|
try:
|
|
return getattr(obj, attr)
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _safe_get_with_error(obj: object, attr: str) -> Tuple[bool, object]:
|
|
try:
|
|
return True, getattr(obj, attr)
|
|
except Exception as exc:
|
|
return False, exc
|
|
|
|
@staticmethod
|
|
def _safe_str(value: object, max_len: int = 300) -> str:
|
|
try:
|
|
text = str(value)
|
|
except Exception as exc:
|
|
text = f"<errore str: {exc}>"
|
|
if len(text) > max_len:
|
|
return text[:max_len] + "..."
|
|
return text
|
|
|
|
@staticmethod
|
|
def _safe_full_str(value: object) -> str:
|
|
try:
|
|
return str(value)
|
|
except Exception:
|
|
return ""
|
|
|
|
@staticmethod
|
|
def _convert_outlook_datetime(value) -> Optional[datetime]:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value
|
|
try:
|
|
return datetime(
|
|
value.year,
|
|
value.month,
|
|
value.day,
|
|
value.hour,
|
|
value.minute,
|
|
value.second,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
for converter in (getattr(value, "Format", None), getattr(value, "strftime", None)):
|
|
if converter is None:
|
|
continue
|
|
try:
|
|
text = str(converter("%Y-%m-%d %H:%M:%S"))
|
|
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
|
except Exception:
|
|
continue
|
|
|
|
try:
|
|
return datetime.fromisoformat(str(value))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
class OutlookExporterApp(ctk.CTk):
|
|
def __init__(self) -> None:
    """Create the main window, its state, and the widget tree.

    The window is withdrawn while a splash is shown, then state attributes
    and Tk variables are created, the UI and tooltips are built, and the
    window is revealed. Statement order is significant: the splash is
    shown before the services are created, and the UI is built only after
    every variable it binds to exists.
    """
    super().__init__()

    # --- Window shell (hidden until the UI is ready) ---------------------
    self.title(APP_TITLE)
    self.geometry(APP_GEOMETRY)
    self.minsize(1080, 680)
    self.withdraw()

    ctk.set_appearance_mode("system")
    ctk.set_default_color_theme("blue")
    self.splash = None

    self.show_startup_splash()

    # --- Services and worker/thread bookkeeping --------------------------
    self.outlook_service = OutlookService()
    self.exporter = Exporter(self)
    self.export_thread = None
    self.main_thread_id = threading.get_ident()
    self.ui_queue = queue.Queue()
    self.connected = False

    # --- Folder tree state -----------------------------------------------
    self.tree_folder_map = {}
    self.tree_path_map = {}
    self.empty_folder_alert_shown = set()
    self.selected_folder_obj = None
    self.filter_thread = None
    self.shutting_down = False
    self.ui_queue_after_id = None

    # --- Preview / filter state ------------------------------------------
    self.loaded_email_headers: List[EmailHeader] = []
    self.filtered_email_headers: List[EmailHeader] = []
    self.filter_tree_key_map = {}
    self.filter_tree = None
    self.filter_loaded_folder_text = ""
    self.filter_live_after_id = None
    self.filter_live_delay_ms = 300
    self.filter_status_var = ctk.StringVar(value="No emails loaded")
    self.filter_loaded_folder_var = ctk.StringVar(value="No folder loaded")
    self.filter_subject_var = ctk.StringVar(value="")
    self.filter_sender_var = ctk.StringVar(value="")
    self.filter_date_range_var = ctk.StringVar(value="")
    self.filter_export_new_var = ctk.IntVar(value=0)
    self.filter_export_with_attachments_var = ctk.IntVar(value=0)
    self.filter_export_selected_var = ctk.IntVar(value=0)

    # --- Logging, tooltips and auxiliary windows -------------------------
    self.log_lines: List[str] = []
    self.pending_file_log_lines: List[str] = []
    self.tooltips: List[ToolTip] = []
    self.log_window = None
    self.log_window_box = None
    self.help_window = None
    self.help_window_box = None

    # --- Preferences and log file ----------------------------------------
    fallback_output_dir = Path.home() / "Desktop" / "outlook_export"
    self.prefs = PreferenceStore(fallback_output_dir)
    self.log_file_path = PreferenceStore.app_data_dir() / LOG_FILE_NAME
    self._init_file_log()

    # --- Variables bound to the main window widgets ----------------------
    self.selected_folder_display = ctk.StringVar(value="No folder selected")
    self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir())
    self.status_var = ctk.StringVar(value="Ready")
    self.busy_title_var = ctk.StringVar(value="")
    self.busy_detail_var = ctk.StringVar(value="")
    self.busy_overlay_visible = False
    self.last_export_duration_seconds = 0.0
    self.last_filter_load_duration_seconds = 0.0
    self.create_zip_var = ctk.IntVar(value=0)
    self.create_hierarchy_var = ctk.IntVar(value=1)

    # --- Build the UI, then swap the splash for the real window ----------
    self._build_ui()
    self._install_tooltips()
    self.hide_startup_splash()
    self.deiconify()
    self.close_pyinstaller_splash()
    self.log(f"Log file: {self.log_file_path}")
    self.ui_queue_after_id = self.after(100, self.process_ui_queue)
    self.protocol("WM_DELETE_WINDOW", self.on_close)
|
|
|
|
def show_startup_splash(self) -> None:
    """Show a borderless, always-on-top splash centred on the screen.

    The splash holds the application name, a "Starting..." label and an
    indeterminate progress bar. It is stored in ``self.splash`` and the
    window is updated so the splash is drawn right away.
    """
    window = ctk.CTkToplevel(self)
    window.title(APP_TITLE)
    window.overrideredirect(True)
    window.resizable(False, False)
    window.attributes("-topmost", True)

    # Centre a fixed-size window, clamped so it never starts off-screen.
    width, height = 420, 180
    left = max(0, int((window.winfo_screenwidth() - width) / 2))
    top = max(0, int((window.winfo_screenheight() - height) / 2))
    window.geometry(f"{width}x{height}+{left}+{top}")
    window.grid_columnconfigure(0, weight=1)
    window.grid_rowconfigure(0, weight=1)

    panel = ctk.CTkFrame(window)
    panel.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
    panel.grid_columnconfigure(0, weight=1)

    headline = ctk.CTkLabel(
        panel,
        text="Outlook Email Exporter",
        font=ctk.CTkFont(size=26, weight="bold"),
    )
    headline.grid(row=0, column=0, pady=(24, 8))

    caption = ctk.CTkLabel(panel, text="Starting...")
    caption.grid(row=1, column=0, pady=(0, 18))

    bar = ctk.CTkProgressBar(panel, mode="indeterminate")
    bar.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18))
    bar.start()

    self.splash = window
    self.update_idletasks()
    self.update()
|
|
|
|
def hide_startup_splash(self) -> None:
    """Destroy the startup splash, if any, and clear ``self.splash``.

    Errors raised while destroying the splash are ignored.
    """
    if self.splash is None:
        return
    try:
        self.splash.destroy()
    except Exception:
        pass
    self.splash = None
|
|
|
|
@staticmethod
def close_pyinstaller_splash() -> None:
    """Close the ``pyi_splash`` screen when that module is available.

    Any failure (including the module being absent) is ignored.
    """
    try:
        import pyi_splash as bootloader_splash

        bootloader_splash.close()
    except Exception:
        return
|
|
|
|
def _build_ui(self) -> None:
    """Build every widget of the main window.

    Layout: row 0 is the toolbar, row 1 a horizontal paned window (folder
    tree on the left, export and preview panels on the right), row 2 the
    status bar. A busy overlay frame is created but left unplaced. Each
    region is built by a private ``_build_ui_*`` helper; helpers are called
    in the same order in which the widgets were originally created.
    """
    self.grid_columnconfigure(0, weight=1)
    self.grid_rowconfigure(1, weight=1)

    self._build_ui_toolbar()

    main_pane = ttk.Panedwindow(self, orient="horizontal")
    main_pane.grid(row=1, column=0, sticky="nsew", padx=8, pady=(4, 8))

    left = self._build_ui_folder_tree(main_pane)

    right = ctk.CTkFrame(main_pane)
    right.grid_rowconfigure(1, weight=1)
    right.grid_columnconfigure(0, weight=1)

    main_pane.add(left, weight=7)
    main_pane.add(right, weight=13)

    self._build_ui_immediate_export(right)
    self._build_ui_preview(right)
    self._build_ui_status_bar()
    self._build_ui_busy_overlay()
    self._update_filter_controls_state()


def _build_ui_toolbar(self) -> None:
    """Build the top bar: connect/reload/expand buttons, selection label, help."""
    top = ctk.CTkFrame(self)
    top.grid(row=0, column=0, sticky="nsew", padx=8, pady=(8, 4))
    top.grid_columnconfigure(3, weight=1)
    top.grid_columnconfigure(4, weight=0)

    self.btn_connect = ctk.CTkButton(top, text="Connect Outlook", command=self.connect_outlook)
    self.btn_connect.grid(row=0, column=0, padx=6, pady=6)

    self.btn_refresh = ctk.CTkButton(top, text="Reload folders", command=self.reload_tree, state="disabled")
    self.btn_refresh.grid(row=0, column=1, padx=6, pady=6)

    self.btn_expand = ctk.CTkButton(top, text="Expand tree", command=self.expand_all, state="disabled")
    self.btn_expand.grid(row=0, column=2, padx=6, pady=6)

    lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w")
    lbl_selected.grid(row=0, column=3, padx=6, pady=6, sticky="ew")
    self.btn_help = ctk.CTkButton(top, text="?", width=34, command=self.open_help_window)
    self.btn_help.grid(row=0, column=4, padx=(6, 10), pady=6, sticky="e")


def _build_ui_folder_tree(self, main_pane):
    """Build the left pane with the Outlook folder tree and return its frame."""
    left = ctk.CTkFrame(main_pane)
    left.grid_rowconfigure(1, weight=1)
    left.grid_columnconfigure(0, weight=1)

    ctk.CTkLabel(left, text="Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, sticky="w", padx=8, pady=(8, 2)
    )

    tree_container = ctk.CTkFrame(left)
    tree_container.grid(row=1, column=0, sticky="nsew", padx=8, pady=(2, 8))
    tree_container.grid_rowconfigure(0, weight=1)
    tree_container.grid_columnconfigure(0, weight=1)

    # The "Treeview" style configured here is the shared ttk style name,
    # so it is set up before any Treeview widget is created.
    style = ttk.Style()
    try:
        style.theme_use("default")
    except Exception:
        pass
    style.configure("Treeview", rowheight=24)
    self.tree_font_bold = ("Segoe UI", 10, "bold")
    style.map(
        "Treeview",
        background=[("selected", "#93c5fd")],
        foreground=[("selected", "#111827")],
    )

    self.tree = ttk.Treeview(tree_container, show="tree")
    self.tree.grid(row=0, column=0, sticky="nsew")
    self.tree.column("#0", width=280, minwidth=220, stretch=True)
    self.tree.tag_configure("root", background="#d9d9d9", font=self.tree_font_bold)
    self.tree.tag_configure("odd", background="#f3f4f6")
    self.tree.tag_configure("even", background="#e5e7eb")
    self.tree.tag_configure("branch_odd", background="#f3f4f6", font=self.tree_font_bold)
    self.tree.tag_configure("branch_even", background="#e5e7eb", font=self.tree_font_bold)
    self.tree.bind("<<TreeviewOpen>>", self.on_tree_open)
    self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)

    yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview)
    yscroll.grid(row=0, column=1, sticky="ns")
    self.tree.configure(yscrollcommand=yscroll.set)
    return left


def _build_ui_immediate_export(self, right) -> None:
    """Build the "Immediate export" panel (output folder, options, export button)."""
    right_top = ctk.CTkFrame(right)
    right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 4))
    right_top.grid_columnconfigure(1, weight=1)

    ctk.CTkLabel(right_top, text="Immediate export", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, columnspan=3, sticky="w", padx=8, pady=(8, 4)
    )

    ctk.CTkLabel(right_top, text="Output folder:").grid(row=1, column=0, padx=8, pady=4, sticky="w")
    self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var)
    self.entry_output.grid(row=1, column=1, padx=8, pady=4, sticky="ew")
    self.btn_browse = ctk.CTkButton(right_top, text="Browse", width=110, command=self.choose_output_dir)
    self.btn_browse.grid(row=1, column=2, padx=8, pady=4)

    help_text = (
        "The selected folder will be exported together with all its subfolders.\n"
        "Each email is saved as a TXT file and attachments are saved in their original format."
    )
    ctk.CTkLabel(right_top, text=help_text, justify="left").grid(
        row=2, column=0, columnspan=3, padx=8, pady=(2, 4), sticky="w"
    )

    options = ctk.CTkFrame(right_top, fg_color="transparent")
    options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=8, pady=(2, 4))
    options.grid_columnconfigure(0, weight=1)
    options.grid_columnconfigure(1, weight=1)

    self.chk_zip = ctk.CTkCheckBox(
        options,
        text="Create zip",
        variable=self.create_zip_var,
        command=lambda: self.on_export_mode_change("zip"),
    )
    self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12))

    self.chk_hierarchy = ctk.CTkCheckBox(
        options,
        text="Create folders on file system",
        variable=self.create_hierarchy_var,
        command=lambda: self.on_export_mode_change("hierarchy"),
    )
    self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0))

    self.btn_export = ctk.CTkButton(
        right_top,
        text="Export selected folder without preview",
        command=lambda: self.start_export(EXPORT_MODE_ALL),
        state="disabled",
        height=32,
    )
    self.btn_export.grid(row=4, column=0, columnspan=2, padx=8, pady=(4, 6), sticky="ew")


def _build_ui_preview(self, right) -> None:
    """Build the preview panel: filters, actions, result table and footer."""
    right_bottom = ctk.CTkFrame(right)
    right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(4, 0))
    right_bottom.grid_rowconfigure(4, weight=1)
    right_bottom.grid_columnconfigure(1, weight=1)
    right_bottom.grid_columnconfigure(3, weight=1)

    self._build_ui_preview_filters(right_bottom)
    self._build_ui_preview_actions(right_bottom)
    self._build_ui_preview_table(right_bottom)
    self._build_ui_preview_footer(right_bottom)


def _build_ui_preview_filters(self, right_bottom) -> None:
    """Build the sender/subject/period inputs and the filter check boxes."""
    filter_group = ctk.CTkFrame(right_bottom)
    filter_group.grid(row=0, column=0, columnspan=4, sticky="nsew", padx=(8, 4), pady=(8, 4))
    filter_group.grid_columnconfigure(1, weight=1)
    filter_group.grid_columnconfigure(3, weight=1)
    ctk.CTkLabel(filter_group, text="Preview and filters", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, columnspan=4, sticky="w", padx=8, pady=(8, 2)
    )
    ctk.CTkLabel(filter_group, textvariable=self.filter_loaded_folder_var, anchor="w").grid(
        row=1, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 4)
    )

    ctk.CTkLabel(filter_group, text="Sender contains").grid(row=2, column=0, sticky="w", padx=8, pady=(0, 4))
    self.entry_filter_sender = ctk.CTkEntry(filter_group, textvariable=self.filter_sender_var, state="disabled")
    self.entry_filter_sender.grid(row=2, column=1, sticky="ew", padx=(0, 8), pady=(0, 4))
    self.entry_filter_sender.bind("<KeyRelease>", self._on_live_filter_input)

    ctk.CTkLabel(filter_group, text="Subject contains").grid(row=2, column=2, sticky="w", padx=8, pady=(0, 4))
    self.entry_filter_subject = ctk.CTkEntry(filter_group, textvariable=self.filter_subject_var, state="disabled")
    self.entry_filter_subject.grid(row=2, column=3, sticky="ew", padx=(0, 8), pady=(0, 4))
    self.entry_filter_subject.bind("<KeyRelease>", self._on_live_filter_input)

    ctk.CTkLabel(filter_group, text="Period").grid(row=3, column=0, sticky="w", padx=8, pady=(0, 4))
    self.combo_filter_date_range = ctk.CTkComboBox(
        filter_group,
        variable=self.filter_date_range_var,
        values=["", "Last week", "Last month", "Last 6 months", "Last year"],
        command=lambda _value: self.schedule_live_filter(),
        state="disabled",
    )
    self.combo_filter_date_range.grid(row=3, column=1, sticky="w", padx=(0, 8), pady=(0, 4))

    filter_options = ctk.CTkFrame(filter_group, fg_color="transparent")
    filter_options.grid(row=4, column=0, columnspan=4, sticky="ew", padx=8, pady=(0, 8))
    filter_options.grid_columnconfigure(3, weight=1)

    self.chk_filter_export_new = ctk.CTkCheckBox(
        filter_options,
        text="New",
        variable=self.filter_export_new_var,
        command=self.schedule_live_filter,
        state="disabled",
    )
    self.chk_filter_export_new.grid(row=0, column=0, sticky="w", padx=(0, 12))

    self.chk_filter_export_with_attachments = ctk.CTkCheckBox(
        filter_options,
        text="With attachments",
        variable=self.filter_export_with_attachments_var,
        command=self.schedule_live_filter,
        state="disabled",
    )
    self.chk_filter_export_with_attachments.grid(row=0, column=1, sticky="w", padx=(0, 12))

    self.chk_filter_export_selected = ctk.CTkCheckBox(
        filter_options,
        text="Selected",
        variable=self.filter_export_selected_var,
        command=self.schedule_live_filter,
        state="disabled",
    )
    self.chk_filter_export_selected.grid(row=0, column=2, sticky="w", padx=(0, 12))


def _build_ui_preview_actions(self, right_bottom) -> None:
    """Build the "Actions" column: load preview, apply filters, reset filters."""
    actions_group = ctk.CTkFrame(right_bottom)
    actions_group.grid(row=0, column=4, sticky="nsew", padx=(5, 10), pady=(10, 8))
    actions_group.grid_columnconfigure(0, weight=1)
    actions_group.grid_rowconfigure(4, weight=1)
    ctk.CTkLabel(actions_group, text="Actions", font=ctk.CTkFont(size=15, weight="bold")).grid(
        row=0, column=0, sticky="w", padx=8, pady=(8, 4)
    )

    self.btn_load_headers = ctk.CTkButton(
        actions_group,
        text="Load preview",
        width=130,
        command=self.load_folder_for_filtering,
        state="disabled",
    )
    self.btn_load_headers.grid(row=1, column=0, sticky="ew", padx=8, pady=(0, 6))

    self.btn_apply_filters = ctk.CTkButton(
        actions_group,
        text="Apply filters",
        command=self.apply_email_filters,
        width=120,
        state="disabled",
    )
    self.btn_apply_filters.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 6))

    self.btn_reset_filters = ctk.CTkButton(
        actions_group,
        text="Reset filters",
        command=self.reset_email_filters,
        width=110,
        state="disabled",
    )
    self.btn_reset_filters.grid(row=3, column=0, sticky="ew", padx=8, pady=(0, 8))


def _build_ui_preview_table(self, right_bottom) -> None:
    """Build the preview table (``self.filter_tree``) with both scrollbars."""
    table_frame = ctk.CTkFrame(right_bottom)
    table_frame.grid(row=4, column=0, columnspan=5, sticky="nsew", padx=8, pady=(0, 8))
    table_frame.grid_columnconfigure(0, weight=1)
    table_frame.grid_rowconfigure(0, weight=1)

    columns = ("date", "sender", "subject", "attachments", "types", "state")
    self.filter_tree = ttk.Treeview(table_frame, columns=columns, show="headings", selectmode="extended")
    self.filter_tree.grid(row=0, column=0, sticky="nsew")
    self.filter_tree.tag_configure("odd", background="#f8fafc")
    self.filter_tree.tag_configure("even", background="#e5e7eb")

    heading_titles = (
        ("date", "Date"),
        ("sender", "Sender"),
        ("subject", "Subject"),
        ("attachments", "Att."),
        ("types", "Type"),
        ("state", "State"),
    )
    for column_id, title in heading_titles:
        self.filter_tree.heading(column_id, text=title)

    column_options = (
        ("date", {"width": 150, "anchor": "w"}),
        ("sender", {"width": 220, "anchor": "w"}),
        ("subject", {"width": 430, "anchor": "w"}),
        ("attachments", {"width": 36, "minwidth": 32, "anchor": "center", "stretch": False}),
        ("types", {"width": 120, "minwidth": 90, "anchor": "center"}),
        ("state", {"width": 135, "minwidth": 120, "anchor": "center"}),
    )
    for column_id, options in column_options:
        self.filter_tree.column(column_id, **options)

    filter_yscroll = ttk.Scrollbar(table_frame, orient="vertical", command=self.filter_tree.yview)
    filter_yscroll.grid(row=0, column=1, sticky="ns")
    filter_xscroll = ttk.Scrollbar(table_frame, orient="horizontal", command=self.filter_tree.xview)
    filter_xscroll.grid(row=1, column=0, sticky="ew")
    self.filter_tree.configure(yscrollcommand=filter_yscroll.set, xscrollcommand=filter_xscroll.set)
    self.filter_tree.bind("<<TreeviewSelect>>", self._on_filter_tree_select)


def _build_ui_preview_footer(self, right_bottom) -> None:
    """Build the row under the table: filter status text and "Export preview"."""
    preview_actions = ctk.CTkFrame(right_bottom, fg_color="transparent")
    preview_actions.grid(row=5, column=0, columnspan=5, sticky="ew", padx=8, pady=(0, 8))
    preview_actions.grid_columnconfigure(0, weight=1)
    ctk.CTkLabel(preview_actions, textvariable=self.filter_status_var, anchor="w").grid(
        row=0, column=0, sticky="ew", padx=(0, 10), pady=0
    )
    self.btn_export_preview = ctk.CTkButton(
        preview_actions,
        text="Export preview",
        width=150,
        command=self.export_preview_subset,
        state="disabled",
    )
    self.btn_export_preview.grid(row=0, column=1, sticky="e", pady=0)


def _build_ui_status_bar(self) -> None:
    """Build the bottom bar: status text, log, cancel and exit buttons."""
    status = ctk.CTkFrame(self)
    status.grid(row=2, column=0, sticky="ew", padx=8, pady=(0, 8))
    status.grid_columnconfigure(0, weight=1)
    for fixed_column in (1, 2, 3):
        status.grid_columnconfigure(fixed_column, weight=0)

    ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid(
        row=0, column=0, sticky="ew", padx=8, pady=6
    )
    self.btn_status_log = ctk.CTkButton(status, text="Open log", width=90, command=self.toggle_log_window)
    self.btn_status_log.grid(row=0, column=1, sticky="e", padx=(0, 8), pady=6)
    self.btn_cancel = ctk.CTkButton(
        status,
        text="Cancel export",
        command=self.cancel_export,
        state="disabled",
        width=150,
        fg_color="#9b2c2c",
        hover_color="#7f1d1d",
    )
    self.btn_cancel.grid(row=0, column=2, sticky="e", padx=(0, 8), pady=6)
    self.btn_exit = ctk.CTkButton(status, text="Exit", width=100, command=self.on_close)
    self.btn_exit.grid(row=0, column=3, sticky="e", padx=8, pady=6)


def _build_ui_busy_overlay(self) -> None:
    """Create the busy overlay (title, detail, progress bar) and leave it unplaced."""
    self.busy_overlay = ctk.CTkFrame(self, corner_radius=10, border_width=1, height=150)
    self.busy_overlay.grid_columnconfigure(0, weight=1)
    ctk.CTkLabel(
        self.busy_overlay,
        textvariable=self.busy_title_var,
        font=ctk.CTkFont(size=18, weight="bold"),
    ).grid(row=0, column=0, sticky="ew", padx=18, pady=(18, 8))
    ctk.CTkLabel(
        self.busy_overlay,
        textvariable=self.busy_detail_var,
        justify="center",
    ).grid(row=1, column=0, sticky="ew", padx=18, pady=(0, 10))
    self.busy_progress = ctk.CTkProgressBar(self.busy_overlay, mode="indeterminate")
    self.busy_progress.grid(row=2, column=0, sticky="ew", padx=18, pady=(0, 18))
    self.busy_overlay.place_forget()
|
|
|
|
def _install_tooltips(self) -> None:
    """Create one ``ToolTip`` per main-window control and keep them in ``self.tooltips``."""
    hints = (
        (self.btn_connect, "Connect to the already running Outlook session and load the available accounts."),
        (self.btn_refresh, "Reload the Outlook folder tree from the current session."),
        (self.btn_expand, "Expand all folders that are already loaded in the tree."),
        (self.btn_help, "Open the quick help window."),
        (self.btn_browse, "Choose the destination folder for export files and the manifest."),
        (self.btn_export, "Export the selected folder and all subfolders immediately, without using the preview."),
        (self.btn_load_headers, "Load a lightweight preview of the emails in the selected folder."),
        (self.btn_apply_filters, "Apply the current filters to the loaded preview."),
        (self.btn_reset_filters, "Clear preview filters and show all loaded emails again."),
        (self.btn_export_preview, "Export the subset of emails currently visible in the filtered preview."),
        (self.btn_status_log, "Open or close the application log window."),
        (self.btn_cancel, "Request cancellation of the long-running operation in progress."),
        (self.btn_exit, "Close the application and try to shut it down cleanly."),
        (self.entry_output, "Destination folder used for exported files and the export manifest."),
        (self.entry_filter_sender, "Filter the preview by sender name or sender address."),
        (self.entry_filter_subject, "Filter the preview by words contained in the email subject."),
        (self.combo_filter_date_range, "Limit the preview to a recent time range."),
        (self.chk_zip, "Create a ZIP archive as export output."),
        (self.chk_hierarchy, "Create folders and files directly on the file system."),
        (self.chk_filter_export_new, "Show only emails that are not yet exported."),
        (self.chk_filter_export_with_attachments, "Show only emails that have Outlook attachments."),
        (self.chk_filter_export_selected, "Limit the preview and export to the rows currently selected."),
    )

    installed = []
    for widget, hint in hints:
        installed.append(ToolTip(widget, hint))
    # Assigned only after every tooltip was created successfully.
    self.tooltips = installed
|
|
|
|
def connect_outlook(self) -> None:
    """Connect to Outlook, load the folder tree and enable tree buttons.

    On any failure the app is marked as disconnected, the error is logged,
    the status bar shows "Connection error" and an error dialog is shown.
    """
    progress_message = "Connecting to Outlook..."
    try:
        self.set_status(progress_message)
        self.log(progress_message)
        self.outlook_service.connect()
        self.connected = True
        self.log("Connected to Outlook.")
        self.reload_tree()
        self.btn_refresh.configure(state="normal")
        self.btn_expand.configure(state="normal")
        self.set_status("Connected to Outlook")
    except Exception as exc:
        self.connected = False
        self.log(f"Outlook connection error: {exc}")
        self.set_status("Connection error")
        messagebox.showerror(APP_TITLE, f"Unable to connect to Outlook.\n\n{exc}")
|
|
|
|
def reload_tree(self) -> None:
    """Reset folder/preview state and repopulate the tree with root folders.

    Does nothing while not connected. A pending live-filter callback is
    cancelled, the tree and all preview state are cleared, and every root
    folder is inserted with a placeholder child so it can be expanded
    lazily.
    """
    if not self.connected:
        return

    pending_filter = self.filter_live_after_id
    if pending_filter is not None:
        try:
            self.after_cancel(pending_filter)
        except Exception:
            pass
        self.filter_live_after_id = None

    # Forget everything tied to the previous tree.
    self.tree.delete(*self.tree.get_children())
    self.tree_folder_map.clear()
    self.tree_path_map.clear()
    self.empty_folder_alert_shown.clear()
    self.selected_folder_obj = None

    # Forget the loaded preview.
    self.loaded_email_headers = []
    self.filtered_email_headers = []
    self.filter_tree_key_map = {}
    self.filter_loaded_folder_text = ""
    self.filter_loaded_folder_var.set("No folder loaded")
    self.filter_status_var.set("No emails loaded")
    self.selected_folder_display.set("No folder selected")
    self.btn_export.configure(state="disabled")
    self.refresh_filter_tree()
    self._update_filter_controls_state()

    for root in self.outlook_service.list_root_folders():
        label = NameCodec.tree_label(root.display_name or root.path, root.mail_count)
        row_id = self.tree.insert("", "end", text=label, open=False, tags=("root",))
        self.tree_folder_map[row_id] = root.folder_obj
        self.tree_path_map[row_id] = root.path
        self._insert_dummy(row_id)

    self.log("Folder tree reloaded.")
|
|
|
|
def _insert_dummy(self, node_id: str) -> None:
|
|
self.tree.insert(node_id, "end", text="__dummy__")
|
|
|
|
def on_tree_open(self, _event=None) -> None:
    """Load the subfolders of the focused tree node the first time it is opened.

    A node counts as "not loaded yet" when its only child is the
    "__dummy__" placeholder. The placeholder is removed and replaced by one
    row per subfolder; subfolders that themselves contain folders get their
    own placeholder.

    If loading fails before any child row was added, the placeholder is put
    back and the node is collapsed, so that expanding it again retries the
    load instead of leaving a permanently empty node.

    Args:
        _event: Unused Tk event object.
    """
    selected = self.tree.focus()
    if not selected:
        return

    children = self.tree.get_children(selected)
    is_unloaded = len(children) == 1 and self.tree.item(children[0], "text") == "__dummy__"
    if not is_unloaded:
        return

    self.tree.delete(children[0])
    folder_obj = self.tree_folder_map.get(selected)
    current_path = self.tree_path_map.get(selected, self.tree.item(selected, "text"))
    try:
        child_nodes = self.outlook_service.list_children(folder_obj, current_path)
        for idx, node in enumerate(child_nodes, start=1):
            node_text = NameCodec.tree_label(node.display_name or node.path, node.mail_count)
            try:
                has_children = bool(node.folder_obj.Folders.Count > 0)
            except Exception:
                # Folder count unavailable: treat the folder as a leaf.
                has_children = False

            # Alternating row tags, with a separate pair for expandable rows.
            parity = "even" if idx % 2 == 0 else "odd"
            row_tag = f"branch_{parity}" if has_children else parity

            child_id = self.tree.insert(selected, "end", text=node_text, open=False, tags=(row_tag,))
            self.tree_folder_map[child_id] = node.folder_obj
            self.tree_path_map[child_id] = node.path
            if has_children:
                try:
                    self._insert_dummy(child_id)
                except Exception as exc:
                    # Keep loading the remaining siblings, but leave a trace.
                    self.log(f"Unable to add the subfolder placeholder for '{node.path}': {exc}")
    except Exception as exc:
        self.log(f"Error loading subfolders: {exc}")
        try:
            if not self.tree.get_children(selected):
                self._insert_dummy(selected)

                def _collapse(node_id: str = selected) -> None:
                    try:
                        self.tree.item(node_id, open=False)
                    except Exception:
                        pass  # the row may have been removed in the meantime

                # Deferred: when this runs as the <<TreeviewOpen>> handler the
                # widget presumably opens the node right after the handler
                # returns, which would undo an immediate collapse.
                self.after_idle(_collapse)
        except Exception as restore_exc:
            self.log(f"Unable to restore the subfolder placeholder: {restore_exc}")
|
|
|
|
def on_tree_select(self, _event=None) -> None:
    """React to a selection change in the folder tree.

    Stores the folder object of the first selected row, updates the
    "Selected: ..." label and the export/load buttons, discards a preview
    that was loaded for a different folder, and finally runs the
    empty-folder check for the selected row.

    Args:
        _event: Unused Tk event object.
    """
    selection = self.tree.selection()
    if not selection:
        return

    node_id = selection[0]
    folder_obj = self.tree_folder_map.get(node_id)
    self.selected_folder_obj = folder_obj
    folder_path = self.tree_path_map.get(node_id, self.tree.item(node_id, "text"))

    if folder_obj is None:
        # Row without a mapped Outlook folder: nothing can be exported.
        self.selected_folder_display.set("No folder selected")
        self.btn_export.configure(state="disabled")
        return

    self.selected_folder_display.set(f"Selected: {folder_path}")
    for button in (self.btn_export, self.btn_load_headers):
        button.configure(state="normal")

    preview_is_stale = folder_path != self.filter_loaded_folder_text
    if preview_is_stale:
        self.loaded_email_headers = []
        self.filtered_email_headers = []
        self.filter_tree_key_map = {}
        self.filter_loaded_folder_text = ""
        self.filter_loaded_folder_var.set(f"Selected folder: {folder_path}")
        self.filter_status_var.set("No emails loaded for the selected folder")
        self.refresh_filter_tree()

    self._update_filter_controls_state()
    self.maybe_show_empty_folder_server_alert(node_id, folder_obj)
|
|
|
|
def maybe_show_empty_folder_server_alert(self, node_id: str, folder_obj: object) -> None:
    """Warn once per tree row when a non-root folder contains no mail items.

    Root rows and rows that already triggered the alert are ignored. The
    diagnostics are only collected once the folder is known to be empty, so
    selecting a non-empty folder costs a single count instead of a count
    plus a diagnostics pass whose result would be thrown away.

    Args:
        node_id: Tree item id of the selected row.
        folder_obj: Outlook folder object mapped to that row.
    """
    if self.tree.parent(node_id) == "":
        return  # top-level rows never trigger the alert
    if node_id in self.empty_folder_alert_shown:
        return
    if OutlookService.count_mail_items(folder_obj) != 0:
        return

    diagnostics = OutlookService.folder_item_diagnostics(folder_obj)
    self.empty_folder_alert_shown.add(node_id)
    self.log_empty_folder_diagnostics(diagnostics)
    self.show_empty_folder_server_alert()
|
|
|
|
def log_empty_folder_diagnostics(self, diagnostics: dict) -> None:
    """Write the empty-folder diagnostics to the log, one line per field.

    Fields missing from ``diagnostics`` are logged as ``None``.
    """
    self.log("Empty folder diagnostics via Outlook COM:")
    # (label shown in the log, key read from the diagnostics dict)
    fields = (
        ("folder_name", "folder_name"),
        ("folder_path", "folder_path"),
        ("raw Items.Count", "raw_count"),
        ("Restrict IPM.Note Count", "restricted_count"),
        ("items_error", "items_error"),
        ("restrict_error", "restrict_error"),
        ("store_id", "store_id"),
    )
    for label, key in fields:
        self.log(f" {label}: {diagnostics.get(key)}")
|
|
|
|
def show_empty_folder_server_alert(self) -> None:
    """Show a modal dialog explaining that the selected folder looks empty.

    The dialog describes how to synchronize the folder in Outlook and offers
    an "Open Outlook" button next to "Close".
    """
    dialog = ctk.CTkToplevel(self)
    dialog.title("Empty Outlook folder")
    dialog.geometry("720x430")
    dialog.minsize(640, 380)
    dialog.transient(self)
    dialog.grab_set()

    # Column 0 holds the badge; column 1 and row 1 take the extra space.
    for column, weight in ((0, 0), (1, 1)):
        dialog.grid_columnconfigure(column, weight=weight)
    dialog.grid_rowconfigure(1, weight=1)

    badge = ctk.CTkLabel(
        dialog,
        text="!",
        width=72,
        height=72,
        corner_radius=36,
        fg_color="#b45309",
        text_color="white",
        font=ctk.CTkFont(size=42, weight="bold"),
    )
    badge.grid(row=0, column=0, rowspan=2, sticky="n", padx=(24, 16), pady=(24, 12))

    heading = ctk.CTkLabel(
        dialog,
        text="The selected folder appears to be empty",
        font=ctk.CTkFont(size=22, weight="bold"),
        anchor="w",
    )
    heading.grid(row=0, column=1, sticky="ew", padx=(0, 24), pady=(26, 8))

    paragraphs = [
        "The selected folder looks empty, but there may still be emails on the Outlook server "
        "that are not yet synchronized locally.",
        "To synchronize, go to:",
        "File -> Account settings -> Sync settings and Account -> All",
        "Then wait for synchronization to complete and try the export again.",
        "Note: if Outlook shows a link such as 'there are more items on the server', the program can "
        "only see emails that are already synchronized locally.",
    ]
    message = "\n\n".join(paragraphs)
    body = ctk.CTkLabel(dialog, text=message, justify="left", anchor="nw", wraplength=560)
    body.grid(row=1, column=1, sticky="nsew", padx=(0, 24), pady=(0, 16))

    buttons = ctk.CTkFrame(dialog, fg_color="transparent")
    buttons.grid(row=2, column=0, columnspan=2, sticky="ew", padx=24, pady=(0, 24))
    # The stretching first column pushes both buttons to the right edge.
    for column, weight in ((0, 1), (1, 0), (2, 0)):
        buttons.grid_columnconfigure(column, weight=weight)

    def open_outlook() -> None:
        try:
            os.startfile("outlook")
        except Exception as exc:
            messagebox.showerror(APP_TITLE, f"Unable to open Outlook.\n\n{exc}")

    open_button = ctk.CTkButton(buttons, text="Open Outlook", width=140, command=open_outlook)
    open_button.grid(row=0, column=1, sticky="e", padx=(0, 12))

    close_button = ctk.CTkButton(
        buttons,
        text="Close",
        width=120,
        command=dialog.destroy,
        fg_color="#5f6368",
        hover_color="#4b4f52",
    )
    close_button.grid(row=0, column=2, sticky="e")

    dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
    dialog.after(100, dialog.focus_force)
|
|
|
|
def expand_all(self) -> None:
    """Open every node of the folder tree, loading subfolders on the way.

    Nodes are visited depth-first in display order. Each one is opened and
    given the focus before ``on_tree_open`` is called, because that handler
    works on the focused node. Placeholder rows are skipped.
    """
    # Explicit stack instead of recursion; entries are (node id, is_child).
    pending = [(root_id, False) for root_id in reversed(self.tree.get_children(""))]
    while pending:
        node_id, is_child = pending.pop()
        if is_child and self.tree.item(node_id, "text") == "__dummy__":
            continue
        self.tree.item(node_id, open=True)
        self.tree.focus(node_id)
        self.on_tree_open()
        # Pushed in reverse so the first child is handled next.
        pending.extend((child_id, True) for child_id in reversed(self.tree.get_children(node_id)))
    self.log("Tree expansion completed.")
|
|
|
|
def choose_output_dir(self) -> None:
    """Ask for an output directory and remember the choice.

    The dialog starts in the current output directory, or in the user's
    home directory when none is set. Cancelling the dialog changes nothing.
    """
    start_dir = self.output_dir_var.get() or str(Path.home())
    chosen = filedialog.askdirectory(initialdir=start_dir)
    if not chosen:
        return
    self.output_dir_var.set(chosen)
    self.prefs.set_output_dir(chosen)
|
|
|
|
def load_folder_for_filtering(self) -> None:
    """Start a background thread that loads the selected folder's email headers.

    Warns and returns when no folder is selected or when a previous load is
    still running. Otherwise the busy overlay is shown, the controls are
    disabled and ``_run_filter_load_worker`` is started in a daemon thread.
    """
    if self.selected_folder_obj is None:
        messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.")
        return
    if self.filter_thread is not None and self.filter_thread.is_alive():
        messagebox.showwarning(APP_TITLE, "An email loading task is already in progress.")
        return

    # Strip only the label prefix: str.replace() would also remove the text
    # "Selected: " from inside a folder path that happens to contain it.
    folder_text = self.selected_folder_display.get().removeprefix("Selected: ")
    self.set_busy_overlay(
        True,
        "Please wait... loading many emails may take a few minutes",
        f"Loading email headers for:\n{folder_text}",
    )
    self._set_controls_enabled(False)
    self.filter_status_var.set("Loading in progress...")
    output_dir = self.output_dir_var.get().strip()
    self.filter_thread = threading.Thread(
        target=self._run_filter_load_worker,
        args=(self._marshal_folder_for_worker(self.selected_folder_obj), folder_text, output_dir),
        daemon=True,
    )
    self.filter_thread.start()
|
|
|
|
@staticmethod
|
|
def _format_elapsed(seconds: float) -> str:
|
|
total_seconds = max(0, int(round(seconds)))
|
|
minutes, secs = divmod(total_seconds, 60)
|
|
return f"{minutes}m {secs:02d}s"
|
|
|
|
def on_export_mode_change(self, selected_mode: str) -> None:
    """Keep the ZIP and folder-hierarchy output flags mutually exclusive.

    Args:
        selected_mode: ``"zip"`` selects ZIP output; any other value selects
            the folder hierarchy.
    """
    zip_selected = selected_mode == "zip"
    # Exactly one flag is always set, so no "neither selected" fallback is needed.
    self.create_zip_var.set(1 if zip_selected else 0)
    self.create_hierarchy_var.set(0 if zip_selected else 1)
|
|
|
|
def _build_export_options(self, export_mode: str) -> ExportOptions:
    """Build the ExportOptions for ``export_mode``.

    Unknown modes fall back to EXPORT_MODE_ALL; only the "with attachments"
    mode sets ``only_with_attachments``.
    """
    if export_mode in (EXPORT_MODE_WITH_ATTACHMENTS, EXPORT_MODE_FILTER):
        resolved_mode = export_mode
    else:
        resolved_mode = EXPORT_MODE_ALL
    attachments_only = export_mode == EXPORT_MODE_WITH_ATTACHMENTS
    return ExportOptions(mode=resolved_mode, only_with_attachments=attachments_only)
|
|
|
|
def _prepare_manifest_for_export(
    self,
    output_root: Path,
    relative_path: str,
    folder_text: str,
    zip_mode: bool,
) -> bool:
    """Check the manifest against the disk before exporting a folder.

    When the manifest knows about a previous export of this folder whose
    path no longer exists, the user is asked whether to recreate it; on
    "yes" the folder's records are removed from the manifest and saved.

    Args:
        output_root: Export root directory containing the manifest.
        relative_path: Export sub-path derived from the Outlook folder path.
        folder_text: Outlook folder path, used as the manifest folder key.
        zip_mode: Not used by this check.

    Returns:
        True when the export may proceed, False when the user declined.
    """
    manifest = ExportManifest(output_root)
    folder_key = folder_text
    scope = manifest.get_export_scope(folder_key)
    expected_path = output_root / relative_path
    record_count = manifest.folder_record_count(folder_key)

    self.log(
        "Pre-export manifest check: "
        f"folder_key={folder_key} | scope={'yes' if scope is not None else 'no'} | "
        f"record_count={record_count} | expected_path={expected_path}"
    )

    if scope is not None:
        # "lp" is read as the previous export path, "rp" as its label.
        last_path_text = str(scope.get("lp") or "").strip()
        last_path = Path(last_path_text) if last_path_text else expected_path
        target_label = scope.get("rp") or relative_path or folder_text
    elif record_count > 0:
        # Records without a stored scope: fall back to the expected location.
        last_path = expected_path
        target_label = relative_path or folder_text
    else:
        return True

    self.log(f"Checking previous export path: {last_path}")
    if last_path.exists():
        return True

    question = (
        f"The previous export for the folder\n\n"
        f'"{target_label}"\n\n'
        "is no longer present on disk.\n\n"
        "Do you want to recreate it?"
    )
    if not messagebox.askyesno(APP_TITLE, question):
        return False

    removed = manifest.clear_folder(folder_key)
    manifest.save()
    self.log(
        f"Manifest cleaned to recreate the export for '{target_label}'. "
        f"Removed records: {removed}"
    )
    return True
|
|
|
|
def _export_mode_label(self, export_mode: str) -> str:
    """Return the display label for an export mode.

    Unknown modes are returned unchanged.
    """
    if export_mode == EXPORT_MODE_ALL:
        return "Export all"
    if export_mode == EXPORT_MODE_WITH_ATTACHMENTS:
        return "Export emails with attachments only"
    if export_mode == EXPORT_MODE_FILTER:
        return "Export preview"
    return export_mode
|
|
|
|
def start_export(
    self,
    export_mode: str = EXPORT_MODE_ALL,
    selected_keys: Optional[set[str]] = None,
    selected_entry_refs: Optional[list[tuple[str, str]]] = None,
) -> None:
    """Validate the request, confirm with the user and start the export thread.

    The export is refused with a warning when another export is running, no
    folder is selected, no output folder is set, or an explicit selection is
    empty. After the manifest check and the confirmation dialog the controls
    are switched to "running" and ``_run_export_worker`` is started in a
    daemon thread.

    Args:
        export_mode: One of the EXPORT_MODE_* constants.
        selected_keys: Optional manifest keys restricting the export; an
            empty set is rejected.
        selected_entry_refs: Optional list of entry references restricting
            the export; an empty list is rejected.
    """
    if self.export_thread is not None and self.export_thread.is_alive():
        messagebox.showwarning(APP_TITLE, "An export is already in progress.")
        return

    if self.selected_folder_obj is None:
        messagebox.showwarning(APP_TITLE, "Select an Outlook folder first.")
        return

    output_dir = self.output_dir_var.get().strip()
    if not output_dir:
        messagebox.showwarning(APP_TITLE, "Choose a valid output folder.")
        return
    output_root = Path(output_dir)

    try:
        output_root.mkdir(parents=True, exist_ok=True)
    except Exception as exc:
        messagebox.showerror(APP_TITLE, f"Unable to create the output folder.\n\n{exc}")
        return
    self.prefs.set_output_dir(str(output_root))

    options = self._build_export_options(export_mode)
    if selected_keys is not None:
        if not selected_keys:
            messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.")
            return
        options.selected_keys = set(selected_keys)
    if selected_entry_refs is not None:
        if not selected_entry_refs:
            messagebox.showwarning(APP_TITLE, "Select at least one email from the filtered list.")
            return
        options.selected_entry_refs = list(selected_entry_refs)

    # Strip only the label prefix: str.replace() would also remove the text
    # "Selected: " from inside a folder path, corrupting the manifest key and
    # the destination path derived from it.
    folder_text = self.selected_folder_display.get().removeprefix("Selected: ")
    options.folder_key = folder_text
    relative_path = NameCodec.folder_name_from_outlook_path(folder_text)
    zip_mode = bool(self.create_zip_var.get())
    mode_label = self._export_mode_label(export_mode)
    if not self._prepare_manifest_for_export(output_root, relative_path, folder_text, zip_mode):
        return

    if zip_mode:
        confirm_message = (
            f"Mode: {mode_label}\n\n"
            "A ZIP archive will be created inside the output folder.\n"
            "The temporary folder structure used to prepare the archive will be removed at the end.\n\n"
            "Do you want to continue?"
        )
    else:
        confirm_message = (
            f"Mode: {mode_label}\n\n"
            "A folder hierarchy will be created in the output folder.\n\n"
            "Do you want to continue?"
        )
    if not messagebox.askyesno(APP_TITLE, confirm_message):
        return

    self._set_export_controls_running()
    self.exporter = Exporter(self)

    self.log("=" * 80)
    self.log(f"Starting export: {folder_text}")
    self.log(f"Action: {mode_label}")
    self.log(f"Output mode: {'ZIP' if zip_mode else 'Folder hierarchy'}")
    self.set_status("Export in progress...")

    self.export_thread = threading.Thread(
        target=self._run_export_worker,
        args=(
            self._marshal_folder_for_worker(self.selected_folder_obj),
            output_root,
            relative_path,
            folder_text,
            zip_mode,
            options,
        ),
        daemon=True,
    )
    self.export_thread.start()
|
|
|
|
def cancel_export(self) -> None:
    """Ask the current exporter to cancel and report the request to the user."""
    self.exporter.cancel()
    status_text = "Cancellation requested..."
    log_text = "Cancellation request sent."
    self.set_status(status_text)
    self.log(log_text)
|
|
|
|
def _set_controls_enabled(self, enabled: bool) -> None:
|
|
state = "normal" if enabled else "disabled"
|
|
export_state = state if enabled and self.selected_folder_obj else "disabled"
|
|
self.btn_export.configure(state=export_state)
|
|
self.btn_refresh.configure(state=state if self.connected else "disabled")
|
|
self.btn_connect.configure(state=state)
|
|
self.btn_expand.configure(state=state if self.connected else "disabled")
|
|
self.btn_browse.configure(state=state)
|
|
self.btn_exit.configure(state=state)
|
|
self.chk_zip.configure(state=state)
|
|
self.chk_hierarchy.configure(state=state)
|
|
self.btn_status_log.configure(state=state)
|
|
if enabled:
|
|
self._update_filter_controls_state()
|
|
else:
|
|
self.btn_load_headers.configure(state="disabled")
|
|
self.entry_filter_subject.configure(state="disabled")
|
|
self.entry_filter_sender.configure(state="disabled")
|
|
self.combo_filter_date_range.configure(state="disabled")
|
|
self.chk_filter_export_new.configure(state="disabled")
|
|
self.chk_filter_export_with_attachments.configure(state="disabled")
|
|
self.chk_filter_export_selected.configure(state="disabled")
|
|
self.btn_apply_filters.configure(state="disabled")
|
|
self.btn_reset_filters.configure(state="disabled")
|
|
self.btn_export_preview.configure(state="disabled")
|
|
|
|
def _set_export_controls_running(self) -> None:
|
|
self._set_controls_enabled(False)
|
|
self.btn_cancel.configure(state="normal")
|
|
self.set_busy_overlay(
|
|
True,
|
|
"Please wait... exporting many emails may take a few minutes",
|
|
"Export in progress...",
|
|
)
|
|
|
|
def _restore_export_controls(self) -> None:
|
|
self._set_controls_enabled(True)
|
|
self.btn_cancel.configure(state="disabled")
|
|
self.set_busy_overlay(False)
|
|
|
|
def _run_export_worker(
    self,
    folder_ref: object,
    output_root: Path,
    relative_path: str,
    folder_text: str,
    zip_mode: bool,
    options: ExportOptions,
) -> None:
    """Thread target that performs the export and reports the outcome.

    COM is initialized for this thread when pythoncom is available. In ZIP
    mode the export goes into a temporary directory that is then archived
    into ``output_root``; otherwise it goes to ``output_root / relative_path``.
    The outcome is posted to ``ui_queue`` as a "complete", "cancelled" or
    "error" event.

    Args:
        folder_ref: Value produced by ``_marshal_folder_for_worker``.
        output_root: Export root directory.
        relative_path: Export sub-path for the folder.
        folder_text: Outlook folder path, used to name the ZIP archive.
        zip_mode: True to produce a ZIP archive instead of a folder tree.
        options: Export options; ``selected_entry_refs`` selects item-based
            export instead of whole-folder export.
    """
    com_initialized = False
    started_at = datetime.now()

    def export_into(target: Path):
        # Export either the explicitly selected items or the whole folder.
        if options.selected_entry_refs is not None:
            namespace = OutlookService.get_namespace_for_worker()
            return self.exporter.export_selected_items(
                namespace,
                options.selected_entry_refs,
                target,
            )
        folder_obj = self._resolve_worker_folder(folder_ref)
        return self.exporter.export_folder(folder_obj, target)

    try:
        if pythoncom is not None:
            pythoncom.CoInitialize()
            com_initialized = True
        manifest = ExportManifest(output_root)
        self.exporter.configure(manifest, options)
        self.log(f"Manifest export: {manifest.path}")

        if not zip_mode:
            export_target = output_root / relative_path
            self.log(f"Destinazione: {export_target}")
            mails, attachments, skipped = export_into(export_target)
            result_path = export_target
        else:
            with tempfile.TemporaryDirectory(prefix="outlook_export_") as temp_dir:
                temp_root = Path(temp_dir)
                temp_export_target = temp_root / relative_path
                self.log(f"Destinazione temporanea: {temp_export_target}")
                mails, attachments, skipped = export_into(temp_export_target)
                # Do not build the archive for an export that was cancelled.
                self.exporter.check_cancel()
                result_path = self._create_zip_archive(temp_root, output_root, folder_text)

        manifest.update_export_scope(options.folder_key, relative_path, result_path, zip_mode)
        manifest.save()

        elapsed_seconds = (datetime.now() - started_at).total_seconds()
        completion_event = (
            "complete",
            mails,
            attachments,
            skipped,
            result_path,
            zip_mode,
            options.mode,
            elapsed_seconds,
        )
        self.ui_queue.put(completion_event)
    except ExportCancelled:
        self.ui_queue.put(("cancelled",))
    except Exception as exc:
        self.ui_queue.put(("error", str(exc), traceback.format_exc()))
    finally:
        if com_initialized:
            try:
                pythoncom.CoUninitialize()
            except Exception:
                pass
|
|
|
|
def _run_filter_load_worker(self, folder_ref: object, folder_text: str, output_dir: str) -> None:
    """Thread target that loads the preview headers for one folder.

    COM is initialized for this thread when pythoncom is available. The
    result is posted to ``ui_queue`` as a "filter_loaded" event, or as a
    "filter_error" event carrying the message and traceback.

    Args:
        folder_ref: Value produced by ``_marshal_folder_for_worker``.
        folder_text: Outlook folder path echoed back in the result event.
        output_dir: Output directory text passed on to ``_load_email_headers``.
    """
    com_initialized = False
    started_at = datetime.now()
    try:
        if pythoncom is not None:
            pythoncom.CoInitialize()
            com_initialized = True
        headers = self._load_email_headers(self._resolve_worker_folder(folder_ref), output_dir)
        elapsed = datetime.now() - started_at
        self.ui_queue.put(("filter_loaded", folder_text, headers, elapsed.total_seconds()))
    except Exception as exc:
        self.ui_queue.put(("filter_error", str(exc), traceback.format_exc()))
    finally:
        if com_initialized:
            try:
                pythoncom.CoUninitialize()
            except Exception:
                pass
|
|
|
|
def _load_email_headers(self, folder_obj: object, output_dir: str) -> List[EmailHeader]:
    """Read preview headers for every mail item in ``folder_obj``.

    Items are sorted by received time (newest first) on a best-effort basis.
    Non-mail items and items that cannot be retrieved are skipped. A "status"
    event is posted to ``ui_queue`` after every 100 scanned items, whether or
    not the 100th item was itself a mail item.

    Args:
        folder_obj: Outlook folder object whose ``Items`` are scanned.
        output_dir: Output directory text; when it names an existing
            directory its manifest is used to flag already exported emails.

    Returns:
        The headers in the order the items were scanned.
    """
    headers: List[EmailHeader] = []
    output_root = Path(output_dir) if output_dir else None
    manifest = ExportManifest(output_root) if output_root and output_root.exists() else None
    items = getattr(folder_obj, "Items", None)
    if items is None:
        return headers

    try:
        items.Sort("[ReceivedTime]", True)
    except Exception:
        pass  # best effort: an unsorted preview is still usable

    try:
        count = items.Count
    except Exception:
        count = 0

    for idx in range(1, count + 1):
        header = self._read_email_header(items, idx, manifest)
        if header is not None:
            headers.append(header)
        if idx % 100 == 0:
            self.ui_queue.put(
                (
                    "status",
                    f"Loading emails... {idx}/{count} | found {len(headers)}",
                )
            )
    return headers


def _read_email_header(self, items: object, idx: int, manifest: object) -> Optional[EmailHeader]:
    """Build the preview header for item ``idx``; None for skipped items."""
    try:
        item = items.Item(idx)
    except Exception:
        return None
    try:
        if getattr(item, "Class", None) != MAIL_CLASS:
            return None
    except Exception:
        return None

    subject_raw = getattr(item, "Subject", "") or ""
    subject = Exporter._safe_str(subject_raw, max_len=180)
    sender = Exporter._safe_str(
        getattr(item, "SenderName", None) or getattr(item, "SenderEmailAddress", None) or "",
        max_len=120,
    )
    entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None))
    parent = getattr(item, "Parent", None)
    store_id = Exporter._safe_full_str(getattr(parent, "StoreID", None) if parent is not None else None)
    received_dt = Exporter._convert_outlook_datetime(getattr(item, "ReceivedTime", None))
    if received_dt is None:
        # Fall back to the sent time when no received time is available.
        received_dt = Exporter._convert_outlook_datetime(getattr(item, "SentOn", None))
    received_text = received_dt.strftime("%Y-%m-%d %H:%M:%S") if received_dt else ""
    attachment_count, attachment_types = self._collect_preview_attachment_info_minimal(item)

    # Same key as _build_manifest_key_for_item, built from the ids read above
    # rather than fetching EntryID, Parent and StoreID from the item again.
    key = Exporter._build_manifest_key_from_ids(store_id, entry_id)
    exported = bool(manifest and manifest.has_record(key) and manifest.record_file_exists(key))
    return EmailHeader(
        key=key,
        entry_id=entry_id,
        store_id=store_id,
        received_time=received_text,
        sender=sender,
        subject=subject,
        attachment_count=attachment_count,
        attachment_types=attachment_types,
        exported=exported,
    )
|
|
|
|
def _refresh_loaded_header_export_flags(self) -> None:
|
|
if not self.loaded_email_headers:
|
|
return
|
|
output_dir = self.output_dir_var.get().strip()
|
|
if not output_dir:
|
|
return
|
|
output_root = Path(output_dir)
|
|
if not output_root.exists():
|
|
return
|
|
manifest = ExportManifest(output_root)
|
|
changed = False
|
|
for header in self.loaded_email_headers:
|
|
exported = bool(manifest.has_record(header.key) and manifest.record_file_exists(header.key))
|
|
if header.exported != exported:
|
|
header.exported = exported
|
|
changed = True
|
|
if changed:
|
|
self.apply_email_filters()
|
|
|
|
def _collect_preview_attachment_info(self, item: object, html_body: str) -> Tuple[int, str]:
    """Summarize the attachments of ``item`` that count for the preview.

    Attachments are skipped when their signature-image score is 3 or more,
    when their file name is not an allowed attachment, or when they have no
    extension after sanitizing.

    Args:
        item: Outlook mail item.
        html_body: HTML body passed to the signature-image scoring.

    Returns:
        ``(count, summary)`` where ``summary`` lists each lower-cased
        extension with its number of occurrences in first-seen order, for
        example ``"pdf(2), docx(1)"``; ``(0, "")`` when nothing counts.
    """
    attachments = getattr(item, "Attachments", None)
    if attachments is None:
        return 0, ""

    try:
        attachment_total = attachments.Count
    except Exception:
        return 0, ""

    # Insertion-ordered dict: extension -> occurrences, in first-seen order.
    per_extension: dict[str, int] = {}
    counted = 0
    for att_index in range(1, attachment_total + 1):
        try:
            attachment = attachments.Item(att_index)
        except Exception:
            continue
        original_filename = Exporter._safe_str(getattr(attachment, "FileName", None) or f"attachment_{att_index}")
        inline_score, _ = self.exporter._signature_image_score(attachment, original_filename, html_body)
        if inline_score >= 3:
            continue
        if not NameCodec.is_allowed_attachment(original_filename):
            continue
        ext = NameCodec.sanitize_extension(os.path.splitext(original_filename or "")[1]).lstrip(".")
        if not ext:
            continue
        ext = ext.lower()
        per_extension[ext] = per_extension.get(ext, 0) + 1
        counted += 1

    if not counted:
        return 0, ""
    summary = ", ".join(f"{ext}({occurrences})" for ext, occurrences in per_extension.items())
    return counted, summary
|
|
|
|
@staticmethod
|
|
def _collect_preview_attachment_info_minimal(item: object) -> Tuple[int, str]:
|
|
attachments = getattr(item, "Attachments", None)
|
|
if attachments is None:
|
|
return 0, ""
|
|
try:
|
|
attachment_total = int(attachments.Count or 0)
|
|
except Exception:
|
|
return 0, ""
|
|
if attachment_total <= 0:
|
|
return 0, ""
|
|
return attachment_total, ""
|
|
|
|
def _build_manifest_key_for_item(self, item: object, subject: str, received_text: str) -> str:
    """Build the manifest key of ``item`` from its store id and entry id.

    ``subject`` and ``received_text`` are accepted but not used.
    """
    entry_id = Exporter._safe_full_str(getattr(item, "EntryID", None))
    parent = getattr(item, "Parent", None)
    raw_store_id = None if parent is None else getattr(parent, "StoreID", None)
    store_id = Exporter._safe_full_str(raw_store_id)
    return Exporter._build_manifest_key_from_ids(store_id, entry_id)
|
|
|
|
@staticmethod
|
|
def _get_mail_mapi_property(item: object) -> object:
|
|
try:
|
|
accessor = item.PropertyAccessor
|
|
return accessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x1035001F")
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
def _marshal_folder_for_worker(folder_obj: object) -> object:
    """Prepare ``folder_obj`` for use from a worker thread.

    Returns an inter-thread marshalling stream for the folder's IDispatch
    interface. The folder object itself is returned unchanged when pythoncom
    is unavailable, the object exposes no ``_oleobj_``, or marshalling fails.
    """
    if pythoncom is None:
        return folder_obj
    try:
        ole_obj = getattr(folder_obj, "_oleobj_", None)
        if ole_obj is not None:
            return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, ole_obj)
    except Exception:
        pass
    return folder_obj
|
|
|
|
@staticmethod
def _resolve_worker_folder(folder_ref: object) -> object:
    """Turn the value from ``_marshal_folder_for_worker`` back into a folder.

    Unmarshals the stream into a Dispatch object. ``folder_ref`` is returned
    unchanged when the COM modules are unavailable or unmarshalling fails
    (for example when it already is a plain folder object).
    """
    com_available = pythoncom is not None and win32com is not None
    if com_available:
        try:
            dispatch = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch)
            return win32com.client.Dispatch(dispatch)
        except Exception:
            pass
    return folder_ref
|
|
|
|
def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path:
    """Archive every file under ``source_root`` into a new ZIP in ``output_root``.

    The archive name is built from the Outlook folder path plus a timestamp
    and made unique. Cancellation is checked before each entry. If archiving
    is cancelled or fails, the partially written archive is removed so that
    no truncated ZIP is left in the output folder, and the original
    exception is re-raised.

    Args:
        source_root: Directory whose files are archived, with paths stored
            relative to it.
        output_root: Directory that receives the ZIP file.
        folder_text: Outlook folder path used to derive the archive name.

    Returns:
        Path of the created ZIP archive.
    """
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    base_name = NameCodec.archive_base_name_from_outlook_path(folder_text)
    zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip")
    self.log(f"Creating ZIP archive: {zip_path}")

    try:
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for path in source_root.rglob("*"):
                self.exporter.check_cancel()
                if path.is_file():
                    archive.write(path, path.relative_to(source_root).as_posix())
    except BaseException:
        # The archive is already closed here (the "with" block has exited),
        # so the incomplete file can be deleted.
        try:
            zip_path.unlink(missing_ok=True)
        except OSError as cleanup_exc:
            self.log(f"Unable to remove the incomplete ZIP archive {zip_path}: {cleanup_exc}")
        raise

    return zip_path
|
|
|
|
def process_ui_queue(self) -> None:
|
|
if self.shutting_down:
|
|
self.ui_queue_after_id = None
|
|
return
|
|
try:
|
|
while True:
|
|
event = self.ui_queue.get_nowait()
|
|
event_type = event[0]
|
|
if event_type == "log":
|
|
self._write_log(event[1])
|
|
elif event_type == "status":
|
|
self.status_var.set(event[1])
|
|
if self.busy_overlay_visible:
|
|
self.busy_detail_var.set(event[1])
|
|
elif event_type == "complete":
|
|
_, mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_seconds = event
|
|
self.last_export_duration_seconds = elapsed_seconds
|
|
elapsed_text = self._format_elapsed(elapsed_seconds)
|
|
self.set_status(
|
|
f"Export completed | emails: {mails} | attachments: {attachments} | skipped: {skipped} | time: {elapsed_text}"
|
|
)
|
|
self.log(
|
|
f"Export completed ({self._export_mode_label(export_mode)}). "
|
|
f"Emails: {mails}, Attachments: {attachments}, Skipped: {skipped}, Time: {elapsed_text}"
|
|
)
|
|
self.log(
|
|
"Export diagnostics summary: "
|
|
f"skipped_from_manifest={self.exporter.skipped_manifest_count}, "
|
|
f"skipped_without_attachments={self.exporter.skipped_no_attachments_count}, "
|
|
f"non_mail_items={self.exporter.non_mail_item_count}"
|
|
)
|
|
self._refresh_loaded_header_export_flags()
|
|
self._restore_export_controls()
|
|
self._flush_file_log()
|
|
self.show_export_complete_dialog(mails, attachments, skipped, result_path, zip_mode, export_mode, elapsed_text)
|
|
elif event_type == "cancelled":
|
|
self.set_status("Export cancelled")
|
|
self.log("Export cancelled by the user.")
|
|
self._restore_export_controls()
|
|
self._flush_file_log()
|
|
messagebox.showinfo(APP_TITLE, "Export cancelled.")
|
|
elif event_type == "filter_loaded":
|
|
_, folder_text, headers, elapsed_seconds = event
|
|
self.filter_thread = None
|
|
self.last_filter_load_duration_seconds = elapsed_seconds
|
|
elapsed_text = self._format_elapsed(elapsed_seconds)
|
|
self.loaded_email_headers = headers
|
|
self.filtered_email_headers = list(headers)
|
|
self.filter_loaded_folder_text = folder_text
|
|
self.filter_loaded_folder_var.set(f"Loaded folder: {folder_text}")
|
|
self.filter_status_var.set(
|
|
f"Loaded emails: {len(headers)} | time: {elapsed_text} | folder: {folder_text}"
|
|
)
|
|
self.set_status(f"Loading completed | emails: {len(headers)} | time: {elapsed_text}")
|
|
self.log(f"Preview loaded: {len(headers)} emails | folder: {folder_text} | time: {elapsed_text}")
|
|
self.set_busy_overlay(False)
|
|
self._set_controls_enabled(True)
|
|
self.apply_email_filters()
|
|
self._flush_file_log()
|
|
elif event_type == "filter_error":
|
|
_, message, details = event
|
|
self.filter_thread = None
|
|
self.set_status("Error while loading emails")
|
|
self.log(f"Email loading error: {message}")
|
|
self.log(details)
|
|
self.set_busy_overlay(False)
|
|
self._set_controls_enabled(True)
|
|
self._flush_file_log()
|
|
messagebox.showerror(APP_TITLE, f"Error while loading emails.\n\n{message}")
|
|
elif event_type == "error":
|
|
_, message, details = event
|
|
self.set_status("Error during export")
|
|
self.log(f"Export error: {message}")
|
|
self.log(details)
|
|
self._restore_export_controls()
|
|
self._flush_file_log()
|
|
messagebox.showerror(APP_TITLE, f"Error during export.\n\n{message}")
|
|
except queue.Empty:
|
|
pass
|
|
finally:
|
|
if not self.shutting_down:
|
|
self.ui_queue_after_id = self.after(100, self.process_ui_queue)
|
|
else:
|
|
self.ui_queue_after_id = None
|
|
|
|
def show_export_complete_dialog(
    self,
    mails: int,
    attachments: int,
    skipped: int,
    export_target: Path,
    zip_mode: bool,
    export_mode: str,
    elapsed_text: str,
) -> None:
    """Show the modal "Export completed" dialog with a copyable report.

    Args:
        mails: Number of exported emails.
        attachments: Number of exported attachments.
        skipped: Number of skipped emails.
        export_target: ZIP file (ZIP mode) or export folder.
        zip_mode: True when the export produced a ZIP archive.
        export_mode: Export mode, shown through ``_export_mode_label``.
        elapsed_text: Formatted elapsed time.
    """
    dialog = ctk.CTkToplevel(self)
    dialog.title("Export completed")
    dialog.geometry("520x260")
    dialog.resizable(False, False)
    dialog.transient(self)
    dialog.grab_set()

    dialog.grid_columnconfigure(0, weight=1)
    dialog.grid_rowconfigure(1, weight=1)

    heading = ctk.CTkLabel(
        dialog,
        text="Export completed",
        font=ctk.CTkFont(size=20, weight="bold"),
    )
    heading.grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8))

    # Report sections are separated by blank lines.
    target_kind = "ZIP archive" if zip_mode else "Folder"
    sections = [
        [
            f"Action: {self._export_mode_label(export_mode)}",
            f"Exported emails: {mails}",
            f"Exported attachments: {attachments}",
        ],
        [f"Skipped emails: {skipped}"],
        [f"Elapsed time: {elapsed_text}"],
        [
            f"Skipped from manifest: {self.exporter.skipped_manifest_count}",
            f"Skipped without attachments: {self.exporter.skipped_no_attachments_count}",
            f"Non-mail items detected: {self.exporter.non_mail_item_count}",
        ],
        [f"{target_kind}:", f"{export_target}"],
    ]
    summary = "\n\n".join("\n".join(section) for section in sections)

    summary_box = ctk.CTkTextbox(dialog, wrap="word")
    summary_box.grid(row=1, column=0, sticky="nsew", padx=20, pady=(0, 16))
    summary_box.insert("1.0", summary)
    summary_box.configure(state="disabled")

    buttons = ctk.CTkFrame(dialog, fg_color="transparent")
    buttons.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20))
    for column in (0, 1, 2):
        buttons.grid_columnconfigure(column, weight=1)

    def copy_summary() -> None:
        try:
            self.clipboard_clear()
            self.clipboard_append(summary)
        except Exception:
            pass

    def open_export_folder() -> None:
        # In ZIP mode the folder containing the archive is opened; the dialog
        # is closed whether or not opening succeeds.
        try:
            target = export_target.parent if zip_mode else export_target
            os.startfile(str(target))
        except Exception as exc:
            messagebox.showerror(APP_TITLE, f"Unable to open the export destination.\n\n{exc}")
        finally:
            dialog.destroy()

    copy_button = ctk.CTkButton(
        buttons,
        text="Copy report",
        command=copy_summary,
        height=38,
    )
    copy_button.grid(row=0, column=0, sticky="ew", padx=(0, 8))

    open_button = ctk.CTkButton(
        buttons,
        text="Open zip folder" if zip_mode else "Open export folder",
        command=open_export_folder,
        height=38,
    )
    open_button.grid(row=0, column=1, sticky="ew", padx=8)

    close_button = ctk.CTkButton(
        buttons,
        text="Close",
        command=dialog.destroy,
        height=38,
        fg_color="#5f6368",
        hover_color="#4b4f52",
    )
    close_button.grid(row=0, column=2, sticky="ew", padx=(8, 0))

    dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
    dialog.after(100, dialog.focus_force)
|
|
|
|
def set_busy_overlay(self, visible: bool, title: str = "", detail: str = "") -> None:
    """Show or hide the busy overlay and its progress animation.

    Args:
        visible: True to show the overlay, False to hide it.
        title: Overlay title; "Please wait..." is used when empty.
        detail: Secondary text; cleared when empty.
    """
    self.busy_overlay_visible = visible

    if not visible:
        try:
            self.busy_progress.stop()
        except Exception:
            pass
        self.busy_overlay.place_forget()
        return

    self.busy_title_var.set(title or "Please wait...")
    self.busy_detail_var.set(detail or "")
    self.busy_overlay.place(relx=0.5, rely=0.52, anchor="center", relwidth=0.42)
    self.busy_overlay.lift()
    try:
        self.busy_progress.start()
    except Exception:
        pass
|
|
|
|
def apply_email_filters(self, autosize: bool = True) -> None:
    """Recompute ``filtered_email_headers`` from the filter controls and redraw the preview.

    A loaded header stays visible only when it passes every active filter:
    case-insensitive substring match on subject and on sender, the date
    range check, "not yet exported", "has attachments", and "currently
    selected in the preview tree".

    Args:
        autosize: Forwarded to ``refresh_filter_tree``.
    """
    if not self.loaded_email_headers:
        self.filtered_email_headers = []
        self.refresh_filter_tree(autosize=autosize)
        return

    wanted_subject = self.filter_subject_var.get().strip().lower()
    wanted_sender = self.filter_sender_var.get().strip().lower()
    period = self.filter_date_range_var.get().strip()
    new_only = bool(self.filter_export_new_var.get())
    attachments_only = bool(self.filter_export_with_attachments_var.get())
    selection_only = bool(self.filter_export_selected_var.get())

    # Translate the selected tree rows into header keys (only needed for "selected only").
    chosen_keys: set[str] = set()
    if selection_only and self.filter_tree is not None:
        key_map = self.filter_tree_key_map
        chosen_keys = {key_map[iid] for iid in self.filter_tree.selection() if iid in key_map}

    def is_visible(header) -> bool:
        # Checks run in a fixed order and stop at the first failing filter.
        if wanted_subject and wanted_subject not in header.subject.lower():
            return False
        if wanted_sender and wanted_sender not in header.sender.lower():
            return False
        if period and not self._header_matches_date_range(header, period):
            return False
        if new_only and header.exported:
            return False
        if attachments_only and header.attachment_count <= 0:
            return False
        if selection_only and header.key not in chosen_keys:
            return False
        return True

    self.filtered_email_headers = [
        header for header in self.loaded_email_headers if is_visible(header)
    ]
    self.refresh_filter_tree(autosize=autosize)
|
|
|
|
def reset_email_filters(self) -> None:
    """Clear every preview filter control and re-apply the (now empty) filters.

    Any pending live-filter timer is cancelled first.
    """
    pending_timer = self.filter_live_after_id
    if pending_timer is not None:
        # Cancelling is best-effort; the id is forgotten either way.
        try:
            self.after_cancel(pending_timer)
        except Exception:
            pass
        self.filter_live_after_id = None

    # Text-like controls go back to the empty string...
    for text_var in (
        self.filter_subject_var,
        self.filter_sender_var,
        self.filter_date_range_var,
    ):
        text_var.set("")

    # ...and the checkbox-style flags back to 0.
    for flag_var in (
        self.filter_export_new_var,
        self.filter_export_with_attachments_var,
        self.filter_export_selected_var,
    ):
        flag_var.set(0)

    self.apply_email_filters()
|
|
|
|
def _on_live_filter_input(self, _event=None) -> None:
    """Event-handler adapter: ignore the event object and schedule a live filter pass."""
    self.schedule_live_filter()
|
|
|
|
def _on_filter_tree_select(self, _event=None) -> None:
    """React to a selection change in the preview tree.

    Control states are always refreshed; the live filter is re-scheduled only
    while the "selected only" option is switched on.
    """
    self._update_filter_controls_state()
    if not self.filter_export_selected_var.get():
        return
    self.schedule_live_filter()
|
|
|
|
def schedule_live_filter(self) -> None:
    """(Re)start the delayed live-filter timer.

    Does nothing when no headers are loaded. A timer that is already pending
    is cancelled first, so only the most recent request fires after
    ``filter_live_delay_ms`` milliseconds.
    """
    if not self.loaded_email_headers:
        return

    previous_timer = self.filter_live_after_id
    if previous_timer is not None:
        # Cancelling is best-effort; the stale id is dropped either way.
        try:
            self.after_cancel(previous_timer)
        except Exception:
            pass
        self.filter_live_after_id = None

    delay_ms = self.filter_live_delay_ms
    self.filter_live_after_id = self.after(delay_ms, self._apply_live_filters)
|
|
|
|
def _apply_live_filters(self) -> None:
    """Timer callback: forget the fired timer id, then re-filter without resizing columns."""
    # The timer that invoked this callback has fired, so its id is no longer pending.
    self.filter_live_after_id = None
    self.apply_email_filters(autosize=False)
|
|
|
|
@staticmethod
def _header_matches_date_range(header: EmailHeader, date_range: str) -> bool:
    """Tell whether ``header`` was received within the named recent period.

    Args:
        header: Object whose ``received_time`` is a ``"%Y-%m-%d %H:%M:%S"``
            string (empty/None means unknown).
        date_range: One of "Last week", "Last month", "Last 6 months",
            "Last year". Any other value means "no date restriction".

    Returns:
        True when no restriction applies, or when the received time lies
        within the last N days of the local clock. False when a restriction
        applies and the received time is missing or cannot be parsed.
    """
    range_days = {
        "Last week": 7,
        "Last month": 30,
        "Last 6 months": 183,
        "Last year": 365,
    }
    days = range_days.get(date_range)
    if days is None:
        # Resolve the range first: an unrecognised range must not filter
        # anything, including emails whose timestamp is missing or malformed.
        return True

    received_text = (header.received_time or "").strip()
    if not received_text:
        return False
    try:
        received_dt = datetime.strptime(received_text, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return False

    return received_dt >= datetime.now() - timedelta(days=days)
|
|
|
|
def _get_export_headers_from_preview(self) -> List[EmailHeader]:
    """Return a new list holding the headers currently visible in the preview."""
    # A shallow copy, so callers cannot mutate the preview list itself.
    snapshot = [*self.filtered_email_headers]
    return snapshot
|
|
|
|
def refresh_filter_tree(self, autosize: bool = True) -> None:
    """Rebuild the preview tree rows from ``filtered_email_headers``.

    Existing rows are removed, one row per header is inserted with the id
    ``mail_<n>`` (1-based) and an alternating "odd"/"even" tag, and
    ``filter_tree_key_map`` is rebuilt to map row ids to header keys. The
    status line and the filter controls are refreshed afterwards. Nothing
    happens when the tree widget does not exist.

    Args:
        autosize: When True, column widths are recomputed after the rebuild.
    """
    tree = self.filter_tree
    if tree is None:
        return

    tree.delete(*tree.get_children())
    self.filter_tree_key_map = {}

    for position, header in enumerate(self.filtered_email_headers, start=1):
        row_id = f"mail_{position}"
        zebra_tag = "odd" if position % 2 else "even"
        row_values = (
            header.received_time,
            header.sender,
            header.subject,
            header.attachment_count,
            header.attachment_types,
            "Exported" if header.exported else "New",
        )
        tree.insert("", "end", iid=row_id, tags=(zebra_tag,), values=row_values)
        self.filter_tree_key_map[row_id] = header.key

    if autosize:
        self._autosize_preview_columns()

    total = len(self.filtered_email_headers)
    exported = sum(bool(header.exported) for header in self.filtered_email_headers)
    self.filter_status_var.set(
        f"Visible emails: {total} | already exported: {exported} | new: {total - exported}"
    )
    self._update_filter_controls_state()
|
|
|
|
def _autosize_preview_columns(self) -> None:
    """Fit each preview column to its heading and to a sample of its rows.

    For every column the widest text among the heading and the first
    ``sample_size`` rows is measured, padded by 24 pixels, and clamped to
    that column's (min, max) limits. When the named Tk fonts cannot be
    obtained, widths are estimated at 8 pixels per character. Nothing
    happens when the tree widget does not exist.
    """
    if self.filter_tree is None:
        return

    # Font lookups are best-effort; the character-count estimate is the fallback.
    try:
        body_font = tkfont.nametofont("TkDefaultFont")
    except Exception:
        body_font = None
    try:
        heading_font = tkfont.nametofont("TkHeadingFont")
    except Exception:
        heading_font = body_font

    # column id -> (minimum width, maximum width); also fixes the processing order.
    column_limits = {
        "date": (110, 190),
        "sender": (140, 360),
        "subject": (180, 900),
        "attachments": (32, 60),
        "types": (90, 260),
        "state": (120, 180),
    }
    sample_size = 400

    # Query the widget once instead of once per column / per cell: the sampled
    # rows and the column order do not change while widths are being computed.
    sampled_ids = self.filter_tree.get_children("")[:sample_size]
    sampled_rows = [self.filter_tree.item(item_id, "values") for item_id in sampled_ids]
    tree_columns = self.filter_tree["columns"] if sampled_rows else ()

    for column_id, (min_width, max_allowed) in column_limits.items():
        header_text = str(self.filter_tree.heading(column_id, "text") or "")
        if heading_font is not None:
            max_width = heading_font.measure(header_text)
        else:
            max_width = max(40, len(header_text) * 8)

        if sampled_rows:
            column_index = tree_columns.index(column_id)
            for values in sampled_rows:
                if column_index >= len(values):
                    continue  # row has fewer cells than the tree has columns
                value_text = str(values[column_index])
                if body_font is not None:
                    width = body_font.measure(value_text)
                else:
                    width = max(20, len(value_text) * 8)
                if width > max_width:
                    max_width = width

        final_width = max(min_width, min(max_allowed, max_width + 24))
        self.filter_tree.column(column_id, width=final_width, minwidth=min_width)
|
|
|
|
def export_preview_subset(self) -> None:
    """Start a filter-mode export of the emails currently shown in the preview.

    Warns and returns when the preview is empty. Otherwise every previewed
    header contributes its key, while only headers with a non-empty
    ``entry_id`` contribute a ``(store_id, entry_id)`` reference.
    """
    preview_headers = self._get_export_headers_from_preview()
    if not preview_headers:
        messagebox.showwarning(APP_TITLE, "There are no emails in the current preview to export.")
        return

    keys = {header.key for header in preview_headers}
    entry_refs = [
        (header.store_id, header.entry_id)
        for header in preview_headers
        if header.entry_id
    ]
    self.start_export(EXPORT_MODE_FILTER, selected_keys=keys, selected_entry_refs=entry_refs)
|
|
|
|
def open_help_window(self) -> None:
    """Show the help window, re-using the existing one when it is still alive.

    An existing window is restored, raised (briefly forced topmost) and
    focused. Otherwise a new top-level window is built, filled with
    ``HELP_TEXT`` in a read-only text box, and shown the same way.
    """
    existing = self.help_window
    if existing is not None:
        try:
            if existing.winfo_exists():
                existing.deiconify()
                existing.lift()
                existing.attributes("-topmost", True)
                # Drop the topmost flag shortly after so the window behaves normally again.
                existing.after(200, lambda: self.help_window.attributes("-topmost", False))
                existing.update_idletasks()
                existing.focus_force()
                return
        except Exception:
            # Any failure while reviving the old window falls through to building a new one.
            pass

    window = ctk.CTkToplevel(self)
    window.withdraw()  # keep it hidden while it is being built
    window.title(HELP_WINDOW_TITLE)
    window.geometry(self._suggest_log_window_geometry())
    window.minsize(560, 440)
    self.help_window = window

    window.grid_columnconfigure(0, weight=1)
    window.grid_rowconfigure(1, weight=1)

    heading = ctk.CTkLabel(
        window,
        text="Quick help",
        font=ctk.CTkFont(size=20, weight="bold"),
        anchor="w",
    )
    heading.grid(row=0, column=0, sticky="ew", padx=14, pady=(14, 8))

    help_box = ctk.CTkTextbox(window, wrap="word")
    self.help_window_box = help_box
    help_box.grid(row=1, column=0, sticky="nsew", padx=14, pady=(0, 14))
    help_box.insert("1.0", HELP_TEXT)
    help_box.configure(state="disabled")  # read-only once populated
    help_box.see("1.0")

    window.protocol("WM_DELETE_WINDOW", self.close_help_window)
    window.deiconify()
    window.lift()
    window.attributes("-topmost", True)
    window.after(200, lambda: window.attributes("-topmost", False))
    window.after(100, window.focus_force)
|
|
|
|
def close_help_window(self) -> None:
    """Destroy the help window (if any) and forget the references to it and its text box."""
    window = self.help_window
    if window is None:
        return
    # Destruction is best-effort; the references are cleared regardless.
    try:
        window.destroy()
    except Exception:
        pass
    self.help_window = self.help_window_box = None
|
|
|
|
def log(self, message: str) -> None:
    """Record a log message from any thread.

    On the main thread the message is written immediately; from any other
    thread it is queued on ``ui_queue`` as ``("log", message)``.
    """
    on_main_thread = threading.get_ident() == self.main_thread_id
    if on_main_thread:
        self._write_log(message)
    else:
        self.ui_queue.put(("log", message))
|
|
|
|
def _init_file_log(self) -> None:
    """Start a fresh log file containing a three-line header.

    The header holds the start timestamp, the log file path and an
    80-character ``=`` rule. The parent directory is created when missing and
    any previous file content is overwritten. Failures are ignored, so a log
    that cannot be written does not raise to the caller.
    """
    try:
        self.log_file_path.parent.mkdir(parents=True, exist_ok=True)
        started = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Keep the rule as its own element: adjacent string literals are
        # concatenated *before* ``*`` applies, so writing the literals next
        # to ``"=" * 80`` would repeat the whole header 80 times.
        header_lines = [
            f"MailExporter log started: {started}",
            f"Log file: {self.log_file_path}",
            "=" * 80,
        ]
        header = "\n".join(header_lines) + "\n"
        self.log_file_path.write_text(header, encoding="utf-8", errors="replace")
    except Exception:
        # Deliberately best-effort: file logging is optional.
        pass
|
|
|
|
def _write_log(self, message: str) -> None:
    """Timestamp ``message`` and record it in the file buffer, memory and the log window.

    The in-memory history is capped at ``MAX_LOG_LINES`` entries. When the
    log window's text box exists, its whole content is replaced with the
    current history and scrolled to the end; if that fails, the text box
    reference is dropped. The UI is pumped afterwards.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    entry = f"[{stamp}] {message}\n"

    self._append_file_log(entry)
    self.log_lines.append(entry)
    if len(self.log_lines) > MAX_LOG_LINES:
        # Keep only the newest entries.
        self.log_lines = self.log_lines[-MAX_LOG_LINES:]

    box = self.log_window_box
    if box is not None:
        try:
            box.configure(state="normal")  # the box is read-only between updates
            box.delete("1.0", "end")
            box.insert("1.0", "".join(self.log_lines))
            box.see("end")
            box.configure(state="disabled")
        except Exception:
            # The widget is unusable; stop mirroring into it.
            self.log_window_box = None

    self.pump_ui()
|
|
|
|
def open_log_window(self) -> None:
    """Show the export log window, re-using the existing one when it is still alive.

    An existing window is restored, raised (briefly forced topmost) and
    focused. Otherwise a new top-level window is built with a read-only text
    box holding the in-memory log. In both cases the status-bar log button,
    when present, is relabelled "Close log".
    """
    existing = self.log_window
    if existing is not None:
        try:
            if existing.winfo_exists():
                existing.deiconify()
                existing.lift()
                existing.attributes("-topmost", True)
                # Drop the topmost flag shortly after so the window behaves normally again.
                existing.after(200, lambda: self.log_window.attributes("-topmost", False))
                existing.update_idletasks()
                existing.focus_force()
                if hasattr(self, "btn_status_log"):
                    self.btn_status_log.configure(text="Close log")
                return
        except Exception:
            # Any failure while reviving the old window falls through to building a new one.
            pass

    window = ctk.CTkToplevel(self)
    window.withdraw()  # keep it hidden while it is being built
    window.title("Export log")
    window.geometry(self._suggest_log_window_geometry())
    window.minsize(520, 400)
    self.log_window = window
    if hasattr(self, "btn_status_log"):
        self.btn_status_log.configure(text="Close log")

    window.grid_columnconfigure(0, weight=1)
    window.grid_rowconfigure(0, weight=1)

    log_box = ctk.CTkTextbox(window, wrap="word")
    self.log_window_box = log_box
    log_box.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
    log_box.insert("1.0", "".join(self.log_lines))
    log_box.configure(state="disabled")  # read-only once populated
    log_box.see("end")

    window.protocol("WM_DELETE_WINDOW", self.close_log_window)
    window.deiconify()
    window.lift()
    window.attributes("-topmost", True)
    window.after(200, lambda: window.attributes("-topmost", False))
    # Repeat show / raise / focus on short timers after the immediate calls above.
    window.after(50, window.deiconify)
    window.after(100, window.lift)
    window.after(150, window.focus_force)
|
|
|
|
def toggle_log_window(self) -> None:
    """Close the log window when it is currently open, otherwise open it."""
    window = self.log_window
    if window is not None:
        try:
            still_open = window.winfo_exists()
            if still_open:
                self.close_log_window()
                return
        except Exception:
            # A window that cannot be queried or closed is treated as not open.
            pass
    self.open_log_window()
|
|
|
|
def close_log_window(self) -> None:
    """Destroy the log window (if any), forget it, and relabel the log button "Open log"."""
    window = self.log_window
    if window is None:
        return

    # Destruction is best-effort; the references are cleared regardless.
    try:
        window.destroy()
    except Exception:
        pass
    self.log_window = self.log_window_box = None

    if not hasattr(self, "btn_status_log"):
        return
    try:
        self.btn_status_log.configure(text="Open log")
    except Exception:
        pass
|
|
|
|
def _suggest_log_window_geometry(self) -> str:
    """Return a Tk geometry string placing a 620x760 window beside the main window.

    The preferred x position is 20 pixels right of the main window, limited
    so the window stays at least 20 pixels inside the right screen edge. The
    y position follows the main window, clamped between 0 and the lowest
    position that leaves 60 pixels below the window. Screen size and main
    width are treated as at least 800x600 / 800. If anything fails, the bare
    size ``"620x760"`` is returned.
    """
    try:
        self.update_idletasks()
        width, height = 620, 760

        screen_w = max(800, int(self.winfo_screenwidth() or 0))
        screen_h = max(600, int(self.winfo_screenheight() or 0))
        origin_x = int(self.winfo_x())
        origin_y = int(self.winfo_y())
        main_w = max(800, int(self.winfo_width() or 0))

        right_of_main = origin_x + main_w + 20
        rightmost_allowed = max(0, screen_w - width - 20)
        x = right_of_main if right_of_main < rightmost_allowed else rightmost_allowed

        lowest_allowed = screen_h - height - 60
        y = max(0, min(origin_y, lowest_allowed))
    except Exception:
        return "620x760"
    return f"{width}x{height}+{x}+{y}"
|
|
|
|
def _update_filter_controls_state(self) -> None:
    """Enable or disable the preview/filter widgets to match the current state.

    * "Load headers" (when that button exists) needs a selected folder, a
      connection, and no filter thread running.
    * Filter fields, checkboxes and the apply/reset buttons need loaded headers.
    * "Export preview" needs both of the above plus at least one header in
      the current preview.
    """
    folder_chosen = self.selected_folder_obj is not None
    preview_loaded = bool(self.loaded_email_headers)
    ready_to_run = folder_chosen and self.connected and self.filter_thread is None

    if hasattr(self, "btn_load_headers"):
        self.btn_load_headers.configure(state="normal" if ready_to_run else "disabled")

    control_state = "normal" if preview_loaded else "disabled"
    for widget in (self.entry_filter_subject, self.entry_filter_sender):
        widget.configure(state=control_state)
    # The combo box is selectable but not editable while enabled.
    self.combo_filter_date_range.configure(state="readonly" if preview_loaded else "disabled")
    for widget in (
        self.chk_filter_export_new,
        self.chk_filter_export_with_attachments,
        self.chk_filter_export_selected,
        self.btn_apply_filters,
        self.btn_reset_filters,
    ):
        widget.configure(state=control_state)

    candidates = self._get_export_headers_from_preview() if preview_loaded else []
    can_export = bool(candidates) and bool(ready_to_run)
    self.btn_export_preview.configure(state="normal" if can_export else "disabled")
|
|
|
|
def _append_file_log(self, line: str) -> None:
    """Buffer ``line`` for the log file; this method itself writes nothing to disk."""
    buffered = self.pending_file_log_lines
    buffered.append(line)
|
|
|
|
def _flush_file_log(self) -> None:
    """Append all buffered log lines to the log file and empty the buffer.

    The parent directory is created when missing. Any failure is ignored, in
    which case lines not yet cleared stay buffered.
    """
    buffered = self.pending_file_log_lines
    if not buffered:
        return
    try:
        log_path = self.log_file_path
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with log_path.open("a", encoding="utf-8", errors="replace") as stream:
            stream.writelines(buffered)
            buffered.clear()
    except Exception:
        # Best-effort: file logging must never break the application.
        pass
|
|
|
|
def set_status(self, message: str) -> None:
    """Show ``message`` in the status bar (and in the busy overlay while it is visible).

    Ignored during shutdown. From a thread other than the main one the
    message is queued on ``ui_queue`` as ``("status", message)`` instead of
    touching the widgets directly.
    """
    if self.shutting_down:
        return

    on_main_thread = threading.get_ident() == self.main_thread_id
    if not on_main_thread:
        self.ui_queue.put(("status", message))
        return

    self.status_var.set(message)
    if self.busy_overlay_visible:
        self.busy_detail_var.set(message)
    self.pump_ui()
|
|
|
|
def pump_ui(self) -> None:
    """Process pending UI work right now; errors from the toolkit are ignored.

    A no-op during shutdown and when called from a thread other than the
    main one.
    """
    if self.shutting_down or threading.get_ident() != self.main_thread_id:
        return
    try:
        self.update_idletasks()
        self.update()
    except Exception:
        pass
|
|
|
|
def on_close(self) -> None:
    """Shut the application down once; later calls return immediately.

    Order: cancel the pending live-filter and UI-queue timers, cancel the
    exporter, stop and hide the busy overlay (each best-effort), flush the
    file log, close the help and log windows, disconnect from Outlook, and
    finally destroy the main window even if an earlier step raised.
    """
    if self.shutting_down:
        return
    self.shutting_down = True

    try:
        # Both timers are handled identically: cancel if pending, then forget the id.
        for timer_attr in ("filter_live_after_id", "ui_queue_after_id"):
            timer_id = getattr(self, timer_attr)
            if timer_id is None:
                continue
            try:
                self.after_cancel(timer_id)
            except Exception:
                pass
            setattr(self, timer_attr, None)

        # Steps whose failure must not stop the rest of the shutdown.
        best_effort_steps = (
            lambda: self.exporter.cancel(),
            lambda: self.busy_progress.stop(),
            lambda: self.set_busy_overlay(False),
        )
        for step in best_effort_steps:
            try:
                step()
            except Exception:
                pass

        self._flush_file_log()
        self.close_help_window()
        self.close_log_window()
        self.outlook_service.disconnect()
    finally:
        try:
            self.destroy()
        except Exception:
            pass
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the GUI on Windows, refuse to start anywhere else.
    if sys.platform == "win32":
        app = OutlookExporterApp()
        try:
            app.mainloop()
        except KeyboardInterrupt:
            # Ctrl+C in the console: attempt the normal shutdown path, ignoring failures.
            try:
                app.on_close()
            except Exception:
                pass
    else:
        raise SystemExit("This program runs on Windows only.")
|