# Source: mail_exp/mail_exporter.py (Python; listed as 1134 lines, 42 KiB)
import json
import os
import queue
import re
import sys
import tempfile
import threading
import traceback
import zipfile
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple
import customtkinter as ctk
from tkinter import filedialog, messagebox
from tkinter import ttk
# pywin32 is optional at import time: when it is missing both names are set to
# None, and OutlookService.connect() reports the missing dependency instead.
try:
    import pythoncom
    import win32com.client
except ImportError:
    pythoncom = None
    win32com = None
# --- Main window -----------------------------------------------------------
APP_TITLE = "Outlook Exporter"
APP_GEOMETRY = "1220x760"

# Value compared with each Outlook item's ``Class`` so that only mail items
# are exported (OlObjectClass.olMail).
MAIL_CLASS = 43

# Maximum number of lines kept in the on-screen log.
MAX_LOG_LINES = 3000

# Folder and file name of the preferences file inside the per-user config dir.
PREFS_APP_DIR = "OutlookExporter"
PREFS_FILE_NAME = "prefs.json"

# Shown when no running Outlook instance can be reached.
OUTLOOK_NOT_RUNNING_MESSAGE = "".join(
    [
        "Outlook non risulta aperto.\n\n",
        "Apri Microsoft Outlook con il profilo email da esportare, attendi che abbia completato il caricamento, ",
        "poi riducilo a icona e torna qui per premere di nuovo 'Connetti a Outlook'.",
    ]
)

# Lower-case extensions (with leading dot) of the attachments that get saved;
# attachments with any other extension are skipped by the exporter.
ALLOWED_ATTACHMENT_EXTENSIONS = {
    # PDF
    ".pdf",
    # Word
    ".doc", ".docx", ".docm", ".dot", ".dotx", ".dotm",
    # Excel / tabular
    ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlt", ".xltx", ".xltm", ".csv",
    # PowerPoint
    ".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".ppsm", ".pot", ".potx", ".potm",
    # Rich text / OpenDocument
    ".rtf", ".odt", ".ods", ".odp",
    # Images
    ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tif", ".tiff", ".webp",
}
@dataclass
class FolderNode:
    """An Outlook folder (or store root) as listed in the folder tree.

    The first three fields are required; ``display_name`` and ``mail_count``
    default to ``None`` when they are not supplied.
    """

    # Label of the store (account) the folder belongs to.
    store_name: str
    # " / "-separated path used to identify the folder in the tree.
    path: str
    # Outlook COM folder object behind this node.
    folder_obj: object
    # Short label for the tree entry, if one was computed.
    display_name: Optional[str] = None
    # Number of mail items in the folder; None when it was not counted.
    mail_count: Optional[int] = None
class ExportCancelled(Exception):
    """Raised to stop an export after the user asked to cancel it."""
class PreferenceStore:
    """Persist user preferences (currently only the output directory) as JSON.

    The file is stored under ``%APPDATA%`` when that variable is set and under
    ``~/.config`` otherwise. Persistence is best-effort: if the file cannot be
    read or written the in-memory values keep working, so a read-only profile
    never prevents the application from starting or exporting.
    """

    def __init__(self, default_output_dir: Path) -> None:
        """Load the stored preferences, creating the file when it is missing.

        Args:
            default_output_dir: Directory returned by ``get_output_dir`` when
                no usable value has been stored.
        """
        self.default_output_dir = default_output_dir
        self.path = self._prefs_path()
        self.data = self._load_or_create()

    def get_output_dir(self) -> str:
        """Return the stored output directory, or the default when unset/blank."""
        value = str(self.data.get("output_dir") or "").strip()
        return value or str(self.default_output_dir)

    def set_output_dir(self, output_dir: str) -> None:
        """Remember ``output_dir`` and try to persist it; blank values are ignored."""
        output_dir = str(output_dir or "").strip()
        if not output_dir:
            return
        self.data["output_dir"] = output_dir
        self._save()

    @classmethod
    def _prefs_path(cls) -> Path:
        """Return the full path of the preferences file."""
        appdata = os.environ.get("APPDATA")
        base_dir = Path(appdata) if appdata else Path.home() / ".config"
        return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME

    def _load_or_create(self) -> dict:
        """Read the preferences file, falling back to defaults.

        A missing file is created with the defaults. An unreadable, malformed
        or non-object JSON file yields the defaults without touching the file.
        """
        defaults = {"output_dir": str(self.default_output_dir)}
        if self.path.exists():
            try:
                with self.path.open("r", encoding="utf-8") as fh:
                    data = json.load(fh)
            except (OSError, ValueError):
                # ValueError covers both invalid JSON and undecodable bytes.
                return defaults
            return data if isinstance(data, dict) else defaults
        # _save() serialises self.data, so it has to be set before saving.
        self.data = defaults
        self._save()
        return defaults

    def _save(self) -> bool:
        """Write the preferences to disk.

        Returns:
            ``True`` when the file was written, ``False`` when the location is
            not writable (the failure is deliberately non-fatal).
        """
        try:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("w", encoding="utf-8") as fh:
                json.dump(self.data, fh, ensure_ascii=False, indent=2)
        except OSError:
            return False
        return True
class OutlookService:
    """Access to an already running Outlook instance through COM (pywin32).

    The object keeps track of whether it holds a COM initialisation so that
    every ``CoInitialize`` is matched by exactly one ``CoUninitialize``, even
    when ``connect`` is called repeatedly or ``disconnect`` is called without
    a prior connection.
    """

    def __init__(self) -> None:
        self.outlook = None
        self.namespace = None
        self.root_folders: "List[FolderNode]" = []
        # True while this object owns a COM initialisation it must release.
        self._com_initialized = False

    def connect(self) -> None:
        """Attach to the running Outlook and list its top-level stores.

        Raises:
            RuntimeError: pywin32 is not installed, or Outlook is not running.
            Exception: any COM error raised while reading the stores.
        """
        if win32com is None or pythoncom is None:
            raise RuntimeError(
                "pywin32 non è installato. Esegui: pip install pywin32"
            )
        # Only initialise COM once; a reconnect reuses the existing initialisation.
        initialized_here = not self._com_initialized
        if initialized_here:
            pythoncom.CoInitialize()
            self._com_initialized = True
        try:
            try:
                outlook = win32com.client.GetActiveObject("Outlook.Application")
            except Exception as exc:
                raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc
            namespace = outlook.GetNamespace("MAPI")
            roots = []
            # Outlook collections are 1-based.
            for i in range(1, namespace.Folders.Count + 1):
                store = namespace.Folders.Item(i)
                store_name = str(store.Name)
                label = NameCodec.store_display_name(store_name)
                roots.append(
                    FolderNode(
                        store_name=store_name,
                        path=label,
                        folder_obj=store,
                        display_name=label,
                    )
                )
        except Exception:
            # Undo only the initialisation made by this call, so objects from
            # an earlier successful connection stay usable.
            if initialized_here:
                self._release_com()
            raise
        # Commit the new state only after everything succeeded.
        self.outlook = outlook
        self.namespace = namespace
        self.root_folders = roots

    def disconnect(self) -> None:
        """Drop all Outlook references and release COM if it was initialised."""
        self.root_folders = []
        self.namespace = None
        self.outlook = None
        self._release_com()

    def _release_com(self) -> None:
        """Uninitialise COM once, and only if this object initialised it."""
        if not self._com_initialized:
            return
        self._com_initialized = False
        try:
            pythoncom.CoUninitialize()
        except Exception:
            # Best effort: failing to release COM must not block shutdown.
            pass

    def list_root_folders(self) -> "List[FolderNode]":
        """Return a copy of the store nodes collected by ``connect``."""
        return list(self.root_folders)

    def list_children(self, folder_obj: object, current_path: str) -> "List[FolderNode]":
        """Return one node per direct subfolder of ``folder_obj``.

        Args:
            folder_obj: Outlook folder whose ``Folders`` collection is read.
            current_path: " / "-separated tree path of ``folder_obj``; child
                paths are built by appending the child name to it.
        """
        # The first path segment is the label of the store the folders belong to.
        store_label = current_path.split(" / ")[0]
        folders = folder_obj.Folders
        children = []
        for i in range(1, folders.Count + 1):
            child = folders.Item(i)
            child_name = str(child.Name)
            children.append(
                FolderNode(
                    store_name=store_label,
                    path=f"{current_path} / {child_name}",
                    folder_obj=child,
                    display_name=child_name,
                    mail_count=self.count_mail_items(child),
                )
            )
        return children

    @staticmethod
    def count_mail_items(folder_obj: object) -> Optional[int]:
        """Count the mail items of a folder.

        Returns the number of ``IPM.Note`` items; if that filter fails the
        total item count is used instead, and ``None`` when nothing can be read.
        """
        try:
            items = folder_obj.Items
        except Exception:
            return None
        try:
            return items.Restrict("[MessageClass] = 'IPM.Note'").Count
        except Exception:
            try:
                return items.Count
            except Exception:
                return None
class NameCodec:
    """Helpers that turn Outlook names into display labels and safe file names."""

    # Characters that are not allowed in Windows file names.
    INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    SPACE_RE = re.compile(r'\s+')
    # Device names Windows refuses as file or directory names (any letter case).
    WINDOWS_RESERVED_NAMES = frozenset(
        {"CON", "PRN", "AUX", "NUL"}
        | {f"COM{i}" for i in range(1, 10)}
        | {f"LPT{i}" for i in range(1, 10)}
    )

    @classmethod
    def compact_text(cls, value: str, max_len: int = 32) -> str:
        """Collapse whitespace and shorten ``value`` to ``max_len`` characters.

        Longer values are cut and suffixed with ``"..."``.
        """
        value = cls.SPACE_RE.sub(" ", value or "").strip()
        if len(value) <= max_len:
            return value
        if max_len <= 3:
            # No room for the ellipsis: a plain cut keeps the length bound.
            return value[: max(max_len, 0)]
        return value[: max_len - 3].rstrip() + "..."

    @classmethod
    def store_display_name(cls, store_name: str) -> str:
        """Return a short label for a store; e-mail addresses keep only the local part."""
        store_name = store_name or "Account"
        if "@" in store_name:
            local_part = store_name.split("@", 1)[0]
            return cls.compact_text(local_part, max_len=34)
        return cls.compact_text(store_name, max_len=34)

    @classmethod
    def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str:
        """Return the tree entry text, with the mail count appended when known."""
        display_name = cls.compact_text(display_name or "Cartella", max_len=46)
        if mail_count is None:
            return display_name
        return f"{display_name} ({mail_count})"

    @classmethod
    def sanitize(cls, value: str, max_len: int = 80) -> str:
        """Return ``value`` as a non-empty name usable as a single path component.

        Invalid characters and dots become ``_``, whitespace is collapsed and
        the result is limited to ``max_len`` characters. Empty results become
        ``"vuoto"`` and Windows reserved device names get a trailing ``_``.
        """
        value = cls.INVALID_CHARS_RE.sub("_", value or "")
        value = cls.SPACE_RE.sub(" ", value).strip()
        value = value.replace(".", "_")
        if not value:
            value = "vuoto"
        if len(value) > max_len:
            # Stripping after the cut can remove everything (e.g. a name made
            # only of replaced characters), so fall back again.
            value = value[:max_len].rstrip(" _") or "vuoto"
        if value.upper() in cls.WINDOWS_RESERVED_NAMES:
            value = f"{value}_"
        return value

    @classmethod
    def folder_name_from_outlook_path(cls, folder_path: str) -> str:
        """Convert a "Store / Folder / Sub" tree path into a relative directory.

        The leading store segment is dropped when the path has more than one
        segment; every remaining segment is sanitized.
        """
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        export_parts = parts[1:] if len(parts) > 1 else parts
        safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts]
        return os.path.join(*safe_parts) if safe_parts else "cartella"

    @classmethod
    def archive_base_name_from_outlook_path(cls, folder_path: str) -> str:
        """Return the sanitized last path segment, used as the archive base name."""
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        folder_name = parts[-1] if parts else "export"
        return cls.sanitize(folder_name, max_len=60)

    @classmethod
    def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str:
        """Return ``mail_<timestamp>_<seq>``; a placeholder replaces a missing time."""
        if received_time is None:
            stamp = "data_sconosciuta"
        else:
            stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S")
        return f"mail_{stamp}_{seq:05d}"

    @classmethod
    def mail_folder_name(cls, base_mail_name: str, subject: str) -> str:
        """Return the per-mail directory name: base name plus sanitized subject."""
        safe_subject = cls.sanitize(subject or "senza oggetto", max_len=50)
        return f"{base_mail_name}__{safe_subject}"

    @classmethod
    def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str:
        """Return the export file name of an attachment, keeping its extension."""
        original_filename = original_filename or f"attachment_{att_index}"
        # Drop any directory part carried by the attachment name.
        original_filename = os.path.basename(original_filename)
        stem, ext = os.path.splitext(original_filename)
        stem = cls.sanitize(stem, max_len=80)
        ext = cls.sanitize_extension(ext)
        return f"allegato__{base_mail_name}__{att_index:02d}__{stem}{ext}"

    @classmethod
    def sanitize_extension(cls, ext: str, max_len: int = 20) -> str:
        """Normalise an extension to lower case with a leading dot ("" if empty)."""
        ext = (ext or "").strip().lower()
        if not ext:
            return ""
        if not ext.startswith("."):
            ext = f".{ext}"
        ext = cls.INVALID_CHARS_RE.sub("_", ext)
        ext = ext.replace(" ", "_")
        if len(ext) > max_len:
            ext = ext[:max_len]
        return ext

    @classmethod
    def is_allowed_attachment(cls, filename: str) -> bool:
        """Tell whether the file's extension is in ALLOWED_ATTACHMENT_EXTENSIONS."""
        ext = cls.sanitize_extension(os.path.splitext(filename or "")[1])
        return ext in ALLOWED_ATTACHMENT_EXTENSIONS
class Exporter:
    """Write the mails of an Outlook folder tree to disk.

    Each mail gets its own directory containing a TXT file and, when it has
    allowed attachments, an ``allegati`` subdirectory. Progress and errors are
    reported through the ``app`` object (``log``, ``set_status``, ``pump_ui``).
    """

    def __init__(self, app: "OutlookExporterApp") -> None:
        self.app = app
        self.cancel_requested = False
        # Running number used in mail file names, shared by all folders.
        self.global_seq = 0
        self.exported_mail_count = 0
        self.exported_attachment_count = 0

    def cancel(self) -> None:
        """Ask the running export to stop at the next cancellation check."""
        self.cancel_requested = True

    def check_cancel(self) -> None:
        """Raise ``ExportCancelled`` if cancellation has been requested."""
        if self.cancel_requested:
            raise ExportCancelled("Esportazione annullata dall'utente.")

    def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int]:
        """Export ``folder_obj`` and, recursively, all of its subfolders.

        Args:
            folder_obj: Outlook folder to export.
            folder_out_path: Directory that receives the export (created if needed).

        Returns:
            ``(mails, attachments)`` exported from this folder and its subfolders.

        Raises:
            ExportCancelled: cancellation was requested during the export.
        """
        self.check_cancel()
        folder_out_path.mkdir(parents=True, exist_ok=True)
        folder_name = self._safe_get(folder_obj, "Name") or "(cartella senza nome)"
        self.app.log(f"Cartella Outlook: {folder_name}")
        self.app.log(f"Cartella destinazione: {folder_out_path}")
        items, count = self._open_items(folder_obj)
        self.app.log(f"Elementi rilevati: {count}")
        mails, attachments = self._export_items(items, count, folder_out_path)
        sub_mails, sub_attachments = self._export_subfolders(folder_obj, folder_out_path)
        return mails + sub_mails, attachments + sub_attachments

    def _open_items(self, folder_obj: object) -> Tuple[object, int]:
        """Return the folder's item collection (sorted when possible) and its size."""
        items = self._safe_get(folder_obj, "Items")
        if items is None:
            self.app.log("Attenzione: impossibile leggere gli elementi della cartella, continuo con le sottocartelle.")
            return None, 0
        try:
            items.Sort("[ReceivedTime]", False)
        except Exception:
            self.app.log("Attenzione: impossibile ordinare per ReceivedTime, continuo senza ordinamento.")
        try:
            count = items.Count
        except Exception as exc:
            self.app.log(f"Attenzione: impossibile contare gli elementi della cartella: {exc}")
            count = 0
        return items, count

    def _export_items(self, items: object, count: int, folder_out_path: Path) -> Tuple[int, int]:
        """Export every mail item of one folder; non-mail items are skipped."""
        mail_total = 0
        attachment_total = 0
        # Outlook collections are 1-based.
        for idx in range(1, count + 1):
            self.check_cancel()
            try:
                item = items.Item(idx)
            except Exception as exc:
                self.app.log(f"Errore accesso elemento {idx}: {exc}")
                continue
            if self._safe_get(item, "Class") != MAIL_CLASS:
                continue
            try:
                self.global_seq += 1
                mail_count, att_count = self._export_mail(item, folder_out_path)
            except ExportCancelled:
                # Cancellation is not a per-mail failure: let it stop the export.
                raise
            except Exception as exc:
                subject = self._safe_get(item, "Subject") or "(senza oggetto)"
                self.app.log(f"Errore esportando mail '{subject}': {exc}")
            else:
                mail_total += mail_count
                attachment_total += att_count
                self.exported_mail_count += mail_count
                self.exported_attachment_count += att_count
            if idx % 10 == 0:
                self.app.set_status(
                    f"Esportazione in corso... elemento {idx}/{count} | mail {self.exported_mail_count} | allegati {self.exported_attachment_count}"
                )
                self.app.pump_ui()
        return mail_total, attachment_total

    def _export_subfolders(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int]:
        """Export each subfolder into a sanitized child directory."""
        subfolders = self._safe_get(folder_obj, "Folders")
        subfolder_count = 0
        try:
            subfolder_count = subfolders.Count if subfolders is not None else 0
        except Exception as exc:
            self.app.log(f"Attenzione: impossibile leggere le sottocartelle: {exc}")
        mail_total = 0
        attachment_total = 0
        for i in range(1, subfolder_count + 1):
            self.check_cancel()
            try:
                child = subfolders.Item(i)
            except Exception as exc:
                self.app.log(f"Errore accesso sottocartella {i}: {exc}")
                continue
            child_name = self._safe_get(child, "Name") or f"sottocartella_{i}"
            child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80)
            mails, attachments = self.export_folder(child, child_dir)
            mail_total += mails
            attachment_total += attachments
        return mail_total, attachment_total

    def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int]:
        """Write one mail as TXT plus its allowed attachments.

        Returns:
            ``(1, saved_attachments)``.
        """
        received_dt = self._convert_received_time(self._safe_get(item, "ReceivedTime"))
        subject = self._safe_get(item, "Subject") or ""
        body = self._safe_get(item, "Body") or ""
        base_name = NameCodec.email_base_name(received_dt, self.global_seq)
        mail_dir = self._unique_path(folder_out_path / NameCodec.mail_folder_name(base_name, subject))
        mail_dir.mkdir(parents=True, exist_ok=True)
        txt_path = mail_dir / f"{base_name}.txt"
        header_lines = [
            f"Oggetto: {subject}",
            f"Ricevuta: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}",
            "",
            "--- TESTO EMAIL ---",
            body,
        ]
        txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace")
        self.app.log(f"Mail salvata: {txt_path.name}")
        return 1, self._save_attachments(item, base_name, mail_dir)

    def _save_attachments(self, item: object, base_name: str, mail_dir: Path) -> int:
        """Save the allowed attachments of ``item``; return how many were written."""
        attachments = self._safe_get(item, "Attachments")
        if attachments is None:
            return 0
        try:
            attachment_total = attachments.Count
        except Exception as exc:
            self.app.log(f" Errore leggendo il numero di allegati: {exc}")
            attachment_total = 0
        attachments_dir = mail_dir / "allegati"
        saved = 0
        for att_index in range(1, attachment_total + 1):
            self.check_cancel()
            try:
                attachment = attachments.Item(att_index)
                original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}"
                if not NameCodec.is_allowed_attachment(original_filename):
                    self.app.log(f" Allegato ignorato: {original_filename}")
                    continue
                export_name = NameCodec.attachment_name(base_name, original_filename, att_index)
                # Created lazily so mails without saved attachments get no empty folder.
                attachments_dir.mkdir(parents=True, exist_ok=True)
                export_path = self._unique_path(attachments_dir / export_name)
                attachment.SaveAsFile(str(export_path))
                saved += 1
                self.app.log(f" Allegato salvato: {export_path.name}")
            except Exception as exc:
                self.app.log(f" Errore salvando allegato {att_index}: {exc}")
        return saved

    @staticmethod
    def _unique_path(path: Path) -> Path:
        """Return ``path`` or, if it exists, the first free ``<stem>__<n><suffix>`` (n >= 2)."""
        if not path.exists():
            return path
        stem = path.stem
        suffix = path.suffix
        parent = path.parent
        counter = 2
        while True:
            candidate = parent / f"{stem}__{counter}{suffix}"
            if not candidate.exists():
                return candidate
            counter += 1

    @staticmethod
    def _safe_get(obj: object, attr: str):
        """Return ``getattr(obj, attr)``, or ``None`` if reading it raises."""
        try:
            return getattr(obj, attr)
        except Exception:
            return None

    @staticmethod
    def _convert_received_time(value) -> Optional[datetime]:
        """Convert a received-time value to ``datetime``; ``None`` when not possible."""
        if value is None:
            return None
        if isinstance(value, datetime):
            return value
        try:
            # Accept any object exposing the usual date/time attributes.
            return datetime(
                value.year,
                value.month,
                value.day,
                value.hour,
                value.minute,
                value.second,
            )
        except Exception:
            return None
class OutlookExporterApp(ctk.CTk):
    """Main window: Outlook folder tree on the left, export controls and log on the right.

    Exports run on a worker thread. The worker never touches widgets: it posts
    events to ``ui_queue`` and ``process_ui_queue`` applies them on the thread
    that created the window.
    """

    def __init__(self) -> None:
        super().__init__()
        self.title(APP_TITLE)
        self.geometry(APP_GEOMETRY)
        self.minsize(1080, 680)
        # Keep the main window hidden while the splash is shown and the UI is built.
        self.withdraw()
        ctk.set_appearance_mode("system")
        ctk.set_default_color_theme("blue")
        self.splash = None
        self.show_startup_splash()
        self.outlook_service = OutlookService()
        self.exporter = Exporter(self)
        self.export_thread = None
        # Used by log/set_status/pump_ui to detect calls from the worker thread.
        self.main_thread_id = threading.get_ident()
        self.ui_queue = queue.Queue()
        self.connected = False
        # Tree item id -> Outlook folder object / " / "-separated tree path.
        self.tree_folder_map = {}
        self.tree_path_map = {}
        self.selected_folder_obj = None
        # Tree path of the selected folder, kept separately from the label text.
        self.selected_folder_path = None
        default_output_dir = Path.home() / "Desktop" / "outlook_export"
        self.prefs = PreferenceStore(default_output_dir)
        self.selected_folder_display = ctk.StringVar(value="Nessuna cartella selezionata")
        self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir())
        self.status_var = ctk.StringVar(value="Pronto")
        self.create_zip_var = ctk.IntVar(value=1)
        self.create_hierarchy_var = ctk.IntVar(value=0)
        self._build_ui()
        self.hide_startup_splash()
        self.deiconify()
        self.after(100, self.process_ui_queue)
        self.protocol("WM_DELETE_WINDOW", self.on_close)

    def show_startup_splash(self) -> None:
        """Show a small borderless, centred splash window with a progress bar."""
        splash = ctk.CTkToplevel(self)
        splash.title(APP_TITLE)
        splash.overrideredirect(True)
        splash.resizable(False, False)
        splash.attributes("-topmost", True)
        width = 420
        height = 180
        x = max(0, int((splash.winfo_screenwidth() - width) / 2))
        y = max(0, int((splash.winfo_screenheight() - height) / 2))
        splash.geometry(f"{width}x{height}+{x}+{y}")
        splash.grid_columnconfigure(0, weight=1)
        splash.grid_rowconfigure(0, weight=1)
        content = ctk.CTkFrame(splash)
        content.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
        content.grid_columnconfigure(0, weight=1)
        ctk.CTkLabel(
            content,
            text="MailExporter",
            font=ctk.CTkFont(size=26, weight="bold"),
        ).grid(row=0, column=0, pady=(24, 8))
        ctk.CTkLabel(content, text="Avvio in corso...").grid(row=1, column=0, pady=(0, 18))
        progress = ctk.CTkProgressBar(content, mode="indeterminate")
        progress.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18))
        progress.start()
        self.splash = splash
        # Force a redraw so the splash is visible before the UI is built.
        self.update_idletasks()
        self.update()

    def hide_startup_splash(self) -> None:
        """Destroy the splash window, if any."""
        if self.splash is not None:
            try:
                self.splash.destroy()
            except Exception:
                pass
            self.splash = None

    def _build_ui(self) -> None:
        """Create all widgets of the main window."""
        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure(1, weight=1)
        self._build_toolbar()
        main_pane = ttk.Panedwindow(self, orient="horizontal")
        main_pane.grid(row=1, column=0, sticky="nsew", padx=12, pady=(6, 12))
        left = self._build_tree_pane(main_pane)
        right = ctk.CTkFrame(main_pane)
        right.grid_rowconfigure(1, weight=1)
        right.grid_columnconfigure(0, weight=1)
        main_pane.add(left, weight=3)
        main_pane.add(right, weight=2)
        self._build_export_panel(right)
        self._build_log_panel(right)
        self._build_status_bar()

    def _build_toolbar(self) -> None:
        """Top row: connect / reload / expand buttons and the selection label."""
        top = ctk.CTkFrame(self)
        top.grid(row=0, column=0, sticky="nsew", padx=12, pady=(12, 6))
        top.grid_columnconfigure(3, weight=1)
        self.btn_connect = ctk.CTkButton(top, text="Connetti a Outlook gia aperto", command=self.connect_outlook)
        self.btn_connect.grid(row=0, column=0, padx=8, pady=8)
        self.btn_refresh = ctk.CTkButton(top, text="Ricarica cartelle", command=self.reload_tree, state="disabled")
        self.btn_refresh.grid(row=0, column=1, padx=8, pady=8)
        self.btn_expand = ctk.CTkButton(top, text="Espandi tutto", command=self.expand_all, state="disabled")
        self.btn_expand.grid(row=0, column=2, padx=8, pady=8)
        lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w")
        lbl_selected.grid(row=0, column=3, padx=8, pady=8, sticky="ew")

    def _build_tree_pane(self, main_pane):
        """Left pane: the Outlook folder tree with its scrollbar; returns the frame."""
        left = ctk.CTkFrame(main_pane)
        left.grid_rowconfigure(1, weight=1)
        left.grid_columnconfigure(0, weight=1)
        ctk.CTkLabel(left, text="Albero cartelle Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, sticky="w", padx=10, pady=(10, 4)
        )
        tree_container = ctk.CTkFrame(left)
        tree_container.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
        tree_container.grid_rowconfigure(0, weight=1)
        tree_container.grid_columnconfigure(0, weight=1)
        style = ttk.Style()
        try:
            style.theme_use("default")
        except Exception:
            pass
        style.configure("Treeview", rowheight=24)
        self.tree = ttk.Treeview(tree_container, show="tree")
        self.tree.grid(row=0, column=0, sticky="nsew")
        self.tree.bind("<<TreeviewOpen>>", self.on_tree_open)
        self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)
        yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview)
        yscroll.grid(row=0, column=1, sticky="ns")
        self.tree.configure(yscrollcommand=yscroll.set)
        return left

    def _build_export_panel(self, right) -> None:
        """Right pane, upper part: output directory, export mode and action buttons."""
        right_top = ctk.CTkFrame(right)
        right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 6))
        right_top.grid_columnconfigure(1, weight=1)
        ctk.CTkLabel(right_top, text="Esportazione", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, columnspan=3, sticky="w", padx=10, pady=(10, 8)
        )
        ctk.CTkLabel(right_top, text="Cartella di output:").grid(row=1, column=0, padx=10, pady=6, sticky="w")
        self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var)
        self.entry_output.grid(row=1, column=1, padx=10, pady=6, sticky="ew")
        self.btn_browse = ctk.CTkButton(right_top, text="Sfoglia", width=110, command=self.choose_output_dir)
        self.btn_browse.grid(row=1, column=2, padx=10, pady=6)
        help_text = (
            "La cartella selezionata verrà esportata insieme a tutte le sue sottocartelle.\n"
            "Ogni email verrà salvata in TXT e gli allegati verranno salvati nel formato originale."
        )
        ctk.CTkLabel(right_top, text=help_text, justify="left").grid(
            row=2, column=0, columnspan=3, padx=10, pady=(4, 8), sticky="w"
        )
        options = ctk.CTkFrame(right_top, fg_color="transparent")
        options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=10, pady=(4, 8))
        options.grid_columnconfigure(0, weight=1)
        options.grid_columnconfigure(1, weight=1)
        self.chk_zip = ctk.CTkCheckBox(
            options,
            text="Crea zip",
            variable=self.create_zip_var,
            command=lambda: self.on_export_mode_change("zip"),
        )
        self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12))
        self.chk_hierarchy = ctk.CTkCheckBox(
            options,
            text="Crea gerarchia",
            variable=self.create_hierarchy_var,
            command=lambda: self.on_export_mode_change("hierarchy"),
        )
        self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0))
        self.btn_export = ctk.CTkButton(
            right_top,
            text="Esporta cartella selezionata",
            command=self.start_export,
            state="disabled",
            height=36,
        )
        self.btn_export.grid(row=4, column=0, columnspan=2, padx=10, pady=(8, 10), sticky="ew")
        self.btn_cancel = ctk.CTkButton(
            right_top,
            text="Annulla esportazione",
            command=self.cancel_export,
            state="disabled",
            fg_color="#9b2c2c",
            hover_color="#7f1d1d",
        )
        self.btn_cancel.grid(row=4, column=2, padx=10, pady=(8, 10), sticky="ew")

    def _build_log_panel(self, right) -> None:
        """Right pane, lower part: the read-only log box."""
        right_bottom = ctk.CTkFrame(right)
        right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(6, 0))
        right_bottom.grid_rowconfigure(1, weight=1)
        right_bottom.grid_columnconfigure(0, weight=1)
        ctk.CTkLabel(right_bottom, text="Log", font=ctk.CTkFont(size=18, weight="bold")).grid(
            row=0, column=0, sticky="w", padx=10, pady=(10, 4)
        )
        self.log_box = ctk.CTkTextbox(right_bottom)
        self.log_box.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
        self.log_box.configure(state="disabled")

    def _build_status_bar(self) -> None:
        """Bottom row: status text and the exit button."""
        status = ctk.CTkFrame(self)
        status.grid(row=2, column=0, sticky="ew", padx=12, pady=(0, 12))
        status.grid_columnconfigure(0, weight=1)
        status.grid_columnconfigure(1, weight=0)
        ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid(
            row=0, column=0, sticky="ew", padx=10, pady=8
        )
        self.btn_exit = ctk.CTkButton(status, text="Esci", width=100, command=self.on_close)
        self.btn_exit.grid(row=0, column=1, sticky="e", padx=10, pady=8)

    def connect_outlook(self) -> None:
        """Connect to the running Outlook and fill the folder tree; errors are shown in a dialog."""
        try:
            self.set_status("Connessione a Outlook in corso...")
            self.log("Connessione a Outlook...")
            self.outlook_service.connect()
            self.connected = True
            self.log("Connesso a Outlook.")
            self.reload_tree()
            self.btn_refresh.configure(state="normal")
            self.btn_expand.configure(state="normal")
            self.set_status("Connesso a Outlook")
        except Exception as exc:
            self.connected = False
            self.log(f"Errore connessione Outlook: {exc}")
            self.set_status("Errore di connessione")
            messagebox.showerror(APP_TITLE, f"Impossibile collegarsi a Outlook.\n\n{exc}")

    def reload_tree(self) -> None:
        """Rebuild the tree from the store roots and clear the current selection."""
        if not self.connected:
            return
        self.tree.delete(*self.tree.get_children())
        self.tree_folder_map.clear()
        self.tree_path_map.clear()
        self.selected_folder_obj = None
        self.selected_folder_path = None
        self.selected_folder_display.set("Nessuna cartella selezionata")
        self.btn_export.configure(state="disabled")
        for root_node in self.outlook_service.list_root_folders():
            node_text = NameCodec.tree_label(root_node.display_name or root_node.path, root_node.mail_count)
            node_id = self.tree.insert("", "end", text=node_text, open=False)
            self.tree_folder_map[node_id] = root_node.folder_obj
            self.tree_path_map[node_id] = root_node.path
            self._insert_dummy(node_id)
        self.log("Albero cartelle ricaricato.")

    def _insert_dummy(self, node_id: str) -> None:
        """Add a placeholder child so the node shows an expander before its children are loaded."""
        self.tree.insert(node_id, "end", text="__dummy__")

    def on_tree_open(self, _event=None) -> None:
        """Load the children of the focused node the first time it is opened."""
        selected = self.tree.focus()
        if not selected:
            return
        children = self.tree.get_children(selected)
        # Only a node that still holds the single placeholder needs loading.
        if len(children) != 1 or self.tree.item(children[0], "text") != "__dummy__":
            return
        self.tree.delete(children[0])
        folder_obj = self.tree_folder_map.get(selected)
        current_path = self.tree_path_map.get(selected, self.tree.item(selected, "text"))
        try:
            child_nodes = self.outlook_service.list_children(folder_obj, current_path)
            for node in child_nodes:
                node_text = NameCodec.tree_label(node.display_name or node.path, node.mail_count)
                child_id = self.tree.insert(selected, "end", text=node_text, open=False)
                self.tree_folder_map[child_id] = node.folder_obj
                self.tree_path_map[child_id] = node.path
                try:
                    if node.folder_obj.Folders.Count > 0:
                        self._insert_dummy(child_id)
                except Exception:
                    pass
        except Exception as exc:
            self.log(f"Errore caricando sottocartelle: {exc}")

    def on_tree_select(self, _event=None) -> None:
        """Track the selected folder and enable the export button accordingly."""
        selected = self.tree.selection()
        if not selected:
            return
        node_id = selected[0]
        folder_obj = self.tree_folder_map.get(node_id)
        self.selected_folder_obj = folder_obj
        if folder_obj is None:
            self.selected_folder_path = None
            self.selected_folder_display.set("Nessuna cartella selezionata")
            self.btn_export.configure(state="disabled")
            return
        folder_path = self.tree_path_map.get(node_id, self.tree.item(node_id, "text"))
        self.selected_folder_path = folder_path
        self.selected_folder_display.set(f"Selezionata: {folder_path}")
        self.btn_export.configure(state="normal")

    def expand_all(self) -> None:
        """Open every node of the tree, loading children on the way."""
        def _expand(node_id: str) -> None:
            self.tree.item(node_id, open=True)
            # on_tree_open works on the focused node.
            self.tree.focus(node_id)
            self.on_tree_open()
            for child in self.tree.get_children(node_id):
                if self.tree.item(child, "text") != "__dummy__":
                    _expand(child)

        for root in self.tree.get_children(""):
            _expand(root)
        self.log("Espansione albero completata.")

    def choose_output_dir(self) -> None:
        """Let the user pick the output directory and remember the choice."""
        selected = filedialog.askdirectory(initialdir=self.output_dir_var.get() or str(Path.home()))
        if selected:
            self.output_dir_var.set(selected)
            self.prefs.set_output_dir(selected)

    def on_export_mode_change(self, selected_mode: str) -> None:
        """Keep the two mode checkboxes mutually exclusive, with exactly one ticked."""
        zip_selected = selected_mode == "zip"
        self.create_zip_var.set(1 if zip_selected else 0)
        self.create_hierarchy_var.set(0 if zip_selected else 1)

    def start_export(self) -> None:
        """Validate the inputs, ask for confirmation and start the export worker."""
        if self.export_thread is not None and self.export_thread.is_alive():
            messagebox.showwarning(APP_TITLE, "Una esportazione e gia in corso.")
            return
        if self.selected_folder_obj is None:
            messagebox.showwarning(APP_TITLE, "Seleziona prima una cartella Outlook.")
            return
        output_dir = self.output_dir_var.get().strip()
        if not output_dir:
            messagebox.showwarning(APP_TITLE, "Scegli una cartella di output valida.")
            return
        output_root = Path(output_dir)
        try:
            output_root.mkdir(parents=True, exist_ok=True)
        except Exception as exc:
            messagebox.showerror(APP_TITLE, f"Impossibile creare la cartella di output.\n\n{exc}")
            return
        self.prefs.set_output_dir(str(output_root))
        # Use the stored tree path instead of parsing it back out of the label,
        # which breaks for folder names that contain the label prefix.
        folder_text = self.selected_folder_path or ""
        relative_path = NameCodec.folder_name_from_outlook_path(folder_text)
        zip_mode = bool(self.create_zip_var.get())
        if zip_mode:
            confirm_message = (
                "Verra creato un archivio ZIP nella cartella di output.\n"
                "La gerarchia temporanea usata per preparare l'archivio verra rimossa al termine.\n\n"
                "Vuoi continuare?"
            )
        else:
            confirm_message = (
                "Verra creata una gerarchia di cartelle nella cartella di output.\n\n"
                "Vuoi continuare?"
            )
        if not messagebox.askyesno(APP_TITLE, confirm_message):
            return
        self._set_export_controls_running()
        # A fresh exporter resets counters and the cancellation flag.
        self.exporter = Exporter(self)
        self.log("=" * 80)
        self.log(f"Avvio esportazione: {folder_text}")
        self.log(f"Modalita: {'ZIP' if zip_mode else 'Gerarchia'}")
        self.set_status("Esportazione in corso...")
        self.export_thread = threading.Thread(
            target=self._run_export_worker,
            args=(
                self._marshal_folder_for_worker(self.selected_folder_obj),
                output_root,
                relative_path,
                folder_text,
                zip_mode,
            ),
            daemon=True,
        )
        self.export_thread.start()

    def cancel_export(self) -> None:
        """Request cancellation of the running export."""
        self.exporter.cancel()
        self.set_status("Richiesta annullamento in corso...")
        self.log("Richiesta di annullamento inviata.")

    def _set_export_controls_running(self) -> None:
        """Disable everything except the cancel button while an export runs."""
        self.btn_export.configure(state="disabled")
        self.btn_cancel.configure(state="normal")
        self.btn_refresh.configure(state="disabled")
        self.btn_connect.configure(state="disabled")
        self.btn_expand.configure(state="disabled")
        self.chk_zip.configure(state="disabled")
        self.chk_hierarchy.configure(state="disabled")

    def _restore_export_controls(self) -> None:
        """Re-enable the controls after an export ended."""
        connected_state = "normal" if self.connected else "disabled"
        self.btn_export.configure(state="normal" if self.selected_folder_obj is not None else "disabled")
        self.btn_cancel.configure(state="disabled")
        self.btn_refresh.configure(state=connected_state)
        self.btn_connect.configure(state="normal")
        self.btn_expand.configure(state=connected_state)
        self.chk_zip.configure(state="normal")
        self.chk_hierarchy.configure(state="normal")

    def _run_export_worker(
        self,
        folder_ref: object,
        output_root: Path,
        relative_path: str,
        folder_text: str,
        zip_mode: bool,
    ) -> None:
        """Thread target: run the export and post the outcome to ``ui_queue``.

        Args:
            folder_ref: Value returned by ``_marshal_folder_for_worker``.
            output_root: Directory chosen by the user.
            relative_path: Directory, relative to the export root, for the folder.
            folder_text: Tree path of the folder (used for the archive name).
            zip_mode: Export into a temporary directory and zip it when true.
        """
        com_initialized = False
        try:
            if pythoncom is not None:
                pythoncom.CoInitialize()
                com_initialized = True
            folder_obj = self._resolve_worker_folder(folder_ref)
            if zip_mode:
                with tempfile.TemporaryDirectory(prefix="outlook_export_") as temp_dir:
                    temp_root = Path(temp_dir)
                    temp_export_target = temp_root / relative_path
                    self.log(f"Destinazione temporanea: {temp_export_target}")
                    mails, attachments = self.exporter.export_folder(folder_obj, temp_export_target)
                    self.exporter.check_cancel()
                    zip_path = self._create_zip_archive(temp_root, output_root, folder_text)
                result_path = zip_path
            else:
                export_target = output_root / relative_path
                self.log(f"Destinazione: {export_target}")
                mails, attachments = self.exporter.export_folder(folder_obj, export_target)
                result_path = export_target
            self.ui_queue.put(("complete", mails, attachments, result_path, zip_mode))
        except ExportCancelled:
            self.ui_queue.put(("cancelled",))
        except Exception as exc:
            self.ui_queue.put(("error", str(exc), traceback.format_exc()))
        finally:
            if com_initialized:
                try:
                    pythoncom.CoUninitialize()
                except Exception:
                    pass

    @staticmethod
    def _marshal_folder_for_worker(folder_obj: object) -> object:
        """Marshal the folder's COM interface into a stream for the worker thread.

        Falls back to returning ``folder_obj`` itself when marshalling is not
        possible.
        """
        if pythoncom is None:
            return folder_obj
        try:
            ole_obj = getattr(folder_obj, "_oleobj_", None)
            if ole_obj is None:
                return folder_obj
            return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, ole_obj)
        except Exception:
            return folder_obj

    @staticmethod
    def _resolve_worker_folder(folder_ref: object) -> object:
        """Turn the marshalled stream back into a folder object on the worker thread.

        Returns ``folder_ref`` unchanged when it cannot be unmarshalled.
        """
        if pythoncom is None or win32com is None:
            return folder_ref
        try:
            dispatch = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch)
            return win32com.client.Dispatch(dispatch)
        except Exception:
            return folder_ref

    def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path:
        """Zip everything under ``source_root`` into a new archive in ``output_root``.

        Returns:
            Path of the archive, named after the folder plus a timestamp.

        Raises:
            ExportCancelled: cancellation was requested while archiving; the
                incomplete archive is removed.
        """
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        base_name = NameCodec.archive_base_name_from_outlook_path(folder_text)
        zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip")
        self.log(f"Creazione archivio ZIP: {zip_path}")
        try:
            with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
                for path in source_root.rglob("*"):
                    self.exporter.check_cancel()
                    if path.is_file():
                        archive.write(path, path.relative_to(source_root).as_posix())
        except BaseException:
            # Do not leave a truncated archive behind on cancel or failure.
            try:
                zip_path.unlink()
            except OSError:
                pass
            raise
        return zip_path

    def process_ui_queue(self) -> None:
        """Apply all queued worker events, then reschedule itself in 100 ms."""
        try:
            while True:
                event = self.ui_queue.get_nowait()
                event_type = event[0]
                if event_type == "log":
                    self._write_log(event[1])
                elif event_type == "status":
                    self.status_var.set(event[1])
                elif event_type == "complete":
                    _, mails, attachments, result_path, zip_mode = event
                    self.set_status(f"Esportazione completata | mail: {mails} | allegati: {attachments}")
                    self.log(f"Esportazione completata. Mail: {mails}, Allegati: {attachments}")
                    self._restore_export_controls()
                    self.show_export_complete_dialog(mails, attachments, result_path, zip_mode)
                elif event_type == "cancelled":
                    self.set_status("Esportazione annullata")
                    self.log("Esportazione annullata dall'utente.")
                    self._restore_export_controls()
                    messagebox.showinfo(APP_TITLE, "Esportazione annullata.")
                elif event_type == "error":
                    _, message, details = event
                    self.set_status("Errore durante l'esportazione")
                    self.log(f"Errore esportazione: {message}")
                    self.log(details)
                    self._restore_export_controls()
                    messagebox.showerror(APP_TITLE, f"Errore durante l'esportazione.\n\n{message}")
        except queue.Empty:
            pass
        finally:
            self.after(100, self.process_ui_queue)

    def show_export_complete_dialog(self, mails: int, attachments: int, export_target: Path, zip_mode: bool) -> None:
        """Show a modal summary with a button that opens the export location."""
        dialog = ctk.CTkToplevel(self)
        dialog.title("Esportazione completata")
        dialog.geometry("520x260")
        dialog.resizable(False, False)
        dialog.transient(self)
        dialog.grab_set()
        dialog.grid_columnconfigure(0, weight=1)
        dialog.grid_rowconfigure(1, weight=1)
        ctk.CTkLabel(
            dialog,
            text="Esportazione completata",
            font=ctk.CTkFont(size=20, weight="bold"),
        ).grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8))
        summary = (
            f"Mail esportate: {mails}\n"
            f"Allegati esportati: {attachments}\n\n"
            f"{'Archivio ZIP' if zip_mode else 'Cartella'}:\n{export_target}"
        )
        ctk.CTkLabel(dialog, text=summary, justify="left", anchor="w").grid(
            row=1, column=0, sticky="nsew", padx=20, pady=(0, 16)
        )
        buttons = ctk.CTkFrame(dialog, fg_color="transparent")
        buttons.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20))
        buttons.grid_columnconfigure(0, weight=1)
        buttons.grid_columnconfigure(1, weight=1)

        def open_export_folder() -> None:
            try:
                # For a ZIP, open the directory that contains the archive.
                target = export_target.parent if zip_mode else export_target
                os.startfile(str(target))
            except Exception as exc:
                messagebox.showerror(APP_TITLE, f"Impossibile aprire la destinazione di export.\n\n{exc}")
            finally:
                dialog.destroy()

        ctk.CTkButton(
            buttons,
            text="Vai alla cartella di zip" if zip_mode else "Vai alla cartella di export",
            command=open_export_folder,
            height=38,
        ).grid(row=0, column=0, sticky="ew", padx=(0, 8))
        ctk.CTkButton(
            buttons,
            text="Chiudi",
            command=dialog.destroy,
            height=38,
            fg_color="#5f6368",
            hover_color="#4b4f52",
        ).grid(row=0, column=1, sticky="ew", padx=(8, 0))
        dialog.protocol("WM_DELETE_WINDOW", dialog.destroy)
        dialog.after(100, dialog.focus_force)

    def log(self, message: str) -> None:
        """Append a line to the log; calls from other threads are queued."""
        if threading.get_ident() != self.main_thread_id:
            self.ui_queue.put(("log", message))
            return
        self._write_log(message)

    def _write_log(self, message: str) -> None:
        """Write a timestamped line to the log box, keeping at most MAX_LOG_LINES lines."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{timestamp}] {message}\n"
        self.log_box.configure(state="normal")
        self.log_box.insert("end", line)
        # "end-1c" lies on the empty line after the last newline, so its line
        # number is one more than the number of stored lines.
        stored_lines = int(self.log_box.index("end-1c").split(".")[0]) - 1
        excess = stored_lines - MAX_LOG_LINES
        if excess > 0:
            # Drop only the oldest lines instead of rewriting the whole text,
            # which also avoids re-inserting the widget's implicit last newline.
            self.log_box.delete("1.0", f"{excess + 1}.0")
        self.log_box.see("end")
        self.log_box.configure(state="disabled")
        self.pump_ui()

    def set_status(self, message: str) -> None:
        """Update the status bar; calls from other threads are queued."""
        if threading.get_ident() != self.main_thread_id:
            self.ui_queue.put(("status", message))
            return
        self.status_var.set(message)
        self.pump_ui()

    def pump_ui(self) -> None:
        """Process pending UI events; does nothing when called from another thread."""
        if threading.get_ident() != self.main_thread_id:
            return
        try:
            self.update_idletasks()
            self.update()
        except Exception:
            pass

    def on_close(self) -> None:
        """Stop a running export, release Outlook and close the window."""
        try:
            if self.export_thread is not None and self.export_thread.is_alive():
                self.exporter.cancel()
            self.outlook_service.disconnect()
        finally:
            self.destroy()
if __name__ == "__main__":
    # The exporter talks to Outlook through pywin32, so it only runs on Windows.
    running_on_windows = sys.platform == "win32"
    if not running_on_windows:
        raise SystemExit("Questo programma funziona solo su Windows.")
    main_window = OutlookExporterApp()
    main_window.mainloop()