1481 lines
55 KiB
Python
1481 lines
55 KiB
Python
import json
|
|
import os
|
|
import queue
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import traceback
|
|
import zipfile
|
|
from pathlib import Path
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import List, Optional, Tuple
|
|
|
|
import customtkinter as ctk
|
|
from tkinter import filedialog, messagebox
|
|
from tkinter import ttk
|
|
|
|
try:
|
|
import pythoncom
|
|
import win32timezone # required by pywin32 when Outlook returns timezone-aware COM dates
|
|
import win32com.client
|
|
except ImportError:
|
|
pythoncom = None
|
|
win32com = None
|
|
|
|
|
|
# --- Window / application identity -------------------------------------
APP_TITLE = "Outlook Exporter"
APP_GEOMETRY = "1220x760"

# Numeric value of OlObjectClass.olMail in the Outlook object model.
MAIL_CLASS = 43

# Upper bound on the number of lines kept in the on-screen log.
MAX_LOG_LINES = 3000

# --- Per-user files (resolved under the application data directory) ----
PREFS_APP_DIR = "OutlookExporter"
PREFS_FILE_NAME = "prefs.json"
LOG_FILE_NAME = "mail_exporter.log"

# Message raised to the user when no running Outlook instance is found.
OUTLOOK_NOT_RUNNING_MESSAGE = "".join(
    (
        "Outlook non risulta aperto.\n\n",
        "Apri Microsoft Outlook con il profilo email da esportare, attendi che abbia completato il caricamento, ",
        "poi riducilo a icona e torna qui per premere di nuovo 'Connetti a Outlook'.",
    )
)

# Image extensions; only these are considered by the inline/signature
# heuristic, and they are also part of the exportable whitelist below.
IMAGE_ATTACHMENT_EXTENSIONS = {
    ".jpg", ".jpeg", ".png", ".gif",
    ".bmp", ".tif", ".tiff", ".webp",
}

# Whitelist of attachment extensions (lower-case, with leading dot) that
# are saved to disk; anything else is logged and skipped.
ALLOWED_ATTACHMENT_EXTENSIONS = {
    # PDF / rich text / CSV
    ".pdf", ".rtf", ".csv",
    # Microsoft Word
    ".doc", ".docx", ".docm", ".dot", ".dotx", ".dotm",
    # Microsoft Excel
    ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlt", ".xltx", ".xltm",
    # Microsoft PowerPoint
    ".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".ppsm",
    ".pot", ".potx", ".potm",
    # OpenDocument / StarOffice families
    ".fodp", ".fods", ".fodt",
    ".odb", ".odf", ".odg", ".odt", ".ods", ".odp",
    ".otg", ".oth", ".otp", ".ots", ".ott",
    ".sxc",
} | IMAGE_ATTACHMENT_EXTENSIONS

# MAPI property schema URLs read through Attachment.PropertyAccessor.
_MAPI_PROPTAG = "http://schemas.microsoft.com/mapi/proptag/"
MAPI_ATTACHMENT_HIDDEN = _MAPI_PROPTAG + "0x7FFE000B"
MAPI_ATTACH_CONTENT_ID = _MAPI_PROPTAG + "0x3712001F"
MAPI_ATTACH_CONTENT_LOCATION = _MAPI_PROPTAG + "0x3713001F"
MAPI_ATTACH_CONTENT_DISPOSITION = _MAPI_PROPTAG + "0x3716001F"

# File-name stems that typically belong to inline images or signature
# artwork (matched against the stem, case-insensitively, at its start).
INLINE_ATTACHMENT_NAME_RE = re.compile(
    r"^(image\d{3,}|att\d{5}|oledata|logo|spacer|facebook|linkedin|twitter|instagram|youtube)\b",
    re.IGNORECASE,
)
|
|
|
|
|
|
@dataclass
class FolderNode:
    """One entry of the Outlook folder tree as shown in the UI.

    Root nodes describe a store (mailbox); child nodes describe a folder
    inside it.
    """

    # Name of the store (top-level mailbox) this folder belongs to.
    store_name: str
    # Human-readable path, components joined with " / ".
    path: str
    # Underlying Outlook COM folder object.
    folder_obj: object
    # Label to show in the tree; callers fall back to ``path`` when None.
    display_name: Optional[str] = None
    # Number of mail items in the folder, or None when unknown.
    mail_count: Optional[int] = None
|
|
|
|
|
|
class ExportCancelled(Exception):
    """Raised to unwind an export in progress after the user asked to cancel."""
|
|
|
|
|
|
class PreferenceStore:
    """Persist user preferences (currently the output directory) as JSON.

    The file lives at ``<APPDATA>/<PREFS_APP_DIR>/<PREFS_FILE_NAME>``, or
    under ``~/.config`` when the ``APPDATA`` environment variable is unset.
    """

    def __init__(self, default_output_dir: Path) -> None:
        """Load existing preferences, creating the file on first run.

        Args:
            default_output_dir: Directory reported by ``get_output_dir`` when
                no usable value has been stored.
        """
        self.default_output_dir = default_output_dir
        self.path = self._prefs_path()
        self.data = self._load_or_create()

    def get_output_dir(self) -> str:
        """Return the stored output directory, or the default when blank."""
        value = str(self.data.get("output_dir") or "").strip()
        return value or str(self.default_output_dir)

    def set_output_dir(self, output_dir: str) -> None:
        """Store ``output_dir`` and write the preferences file.

        Blank/None values are ignored.

        Raises:
            OSError: If the preferences file cannot be written.
        """
        output_dir = str(output_dir or "").strip()
        if not output_dir:
            return
        self.data["output_dir"] = output_dir
        self._save()

    @classmethod
    def _prefs_path(cls) -> Path:
        """Return the full path of the preferences file."""
        appdata = os.environ.get("APPDATA")
        base_dir = Path(appdata) if appdata else Path.home() / ".config"
        return base_dir / PREFS_APP_DIR / PREFS_FILE_NAME

    @classmethod
    def app_data_dir(cls) -> Path:
        """Return the directory that contains the preferences file."""
        return cls._prefs_path().parent

    def _load_or_create(self) -> dict:
        """Read the preferences file, or create it with default values.

        Returns:
            The parsed dict; ``{}`` when the file holds valid JSON that is not
            an object; the defaults when the file is missing or unreadable.
        """
        defaults = {"output_dir": str(self.default_output_dir)}

        if self.path.exists():
            try:
                with self.path.open("r", encoding="utf-8") as fh:
                    data = json.load(fh)
            except (OSError, ValueError, RecursionError):
                # Unreadable or malformed file (JSONDecodeError and
                # UnicodeDecodeError are ValueError subclasses).
                return defaults
            return data if isinstance(data, dict) else {}

        # _save() serializes self.data, so it must be set before saving.
        self.data = defaults
        try:
            self._save()
        except OSError:
            # Preferences are best-effort: a read-only or missing profile
            # directory must not prevent the application from starting.
            pass
        return defaults

    def _save(self) -> None:
        """Write ``self.data`` to disk, replacing the file atomically.

        The JSON is written to a sibling temporary file first and then moved
        over the target, so an interrupted write cannot leave a truncated
        preferences file behind.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        try:
            with tmp_path.open("w", encoding="utf-8") as fh:
                json.dump(self.data, fh, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.path)
        except BaseException:
            # Do not leave the partial temporary file around.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise
|
|
|
|
|
|
class OutlookService:
    """Thin wrapper around a running Outlook instance reached through COM.

    The service tracks whether *it* initialized COM so that every
    ``CoInitialize`` it issues is matched by exactly one ``CoUninitialize``.
    ``connect`` and ``disconnect`` are expected to run on the same thread
    (NOTE(review): callers are not visible here - confirm).
    """

    def __init__(self) -> None:
        self.outlook = None
        self.namespace = None
        self.root_folders: "List[FolderNode]" = []
        # True only between a successful CoInitialize and its CoUninitialize.
        self._com_initialized = False

    def connect(self) -> None:
        """Attach to the already-running Outlook and list its stores.

        Raises:
            RuntimeError: If pywin32 is missing or Outlook is not running.
            Exception: Any COM error raised while enumerating the stores.
        """
        if win32com is None or pythoncom is None:
            raise RuntimeError(
                "pywin32 non è installato. Esegui: pip install pywin32"
            )

        # Initialize COM once; a second connect() reuses the existing
        # initialization instead of stacking another CoInitialize.
        if not self._com_initialized:
            pythoncom.CoInitialize()
            self._com_initialized = True

        try:
            try:
                self.outlook = win32com.client.GetActiveObject("Outlook.Application")
            except Exception as exc:
                raise RuntimeError(OUTLOOK_NOT_RUNNING_MESSAGE) from exc

            self.namespace = self.outlook.GetNamespace("MAPI")

            # Build the list locally so a failure half-way through never
            # leaves a partial store list behind.
            roots = []
            stores = self.namespace.Folders
            for i in range(1, stores.Count + 1):  # COM collections are 1-based
                store = stores.Item(i)
                store_name = str(store.Name)
                label = NameCodec.store_display_name(store_name)
                roots.append(
                    FolderNode(
                        store_name=store_name,
                        path=label,
                        folder_obj=store,
                        display_name=label,
                    )
                )
            self.root_folders = roots
        except Exception:
            # Drop COM references before uninitializing COM, then re-raise.
            self._release()
            raise

    def disconnect(self) -> None:
        """Drop all COM references and undo this service's COM initialization.

        Safe to call when ``connect`` was never called or has failed: COM is
        only uninitialized if this service initialized it.
        """
        self._release()

    def _release(self) -> None:
        """Clear cached COM objects, then balance a pending CoInitialize."""
        self.root_folders = []
        self.namespace = None
        self.outlook = None
        if not getattr(self, "_com_initialized", False):
            return
        self._com_initialized = False
        try:
            pythoncom.CoUninitialize()
        except Exception:
            # Best-effort teardown; nothing useful can be done on failure.
            pass

    def list_root_folders(self) -> "List[FolderNode]":
        """Return a copy of the store nodes collected by ``connect``."""
        return list(self.root_folders)

    def list_children(self, folder_obj: object, current_path: str) -> "List[FolderNode]":
        """Return the direct sub-folders of ``folder_obj`` as nodes.

        Args:
            folder_obj: Outlook COM folder whose ``Folders`` are enumerated.
            current_path: " / "-joined path of ``folder_obj``; its first
                component is used as the store name of every child.
        """
        store_name = current_path.split(" / ")[0]
        children = []
        folders = folder_obj.Folders
        for i in range(1, folders.Count + 1):  # COM collections are 1-based
            child = folders.Item(i)
            child_name = str(child.Name)
            children.append(
                FolderNode(
                    store_name=store_name,
                    path=f"{current_path} / {child_name}",
                    folder_obj=child,
                    display_name=child_name,
                    mail_count=self.count_mail_items(child),
                )
            )
        return children

    @staticmethod
    def count_mail_items(folder_obj: object) -> Optional[int]:
        """Count the mail items of a folder.

        Tries a ``Restrict`` on message class ``IPM.Note`` first and falls
        back to the total item count.

        Returns:
            The count, or None when the items cannot be read at all.
        """
        try:
            items = folder_obj.Items
        except Exception:
            return None

        try:
            return items.Restrict("[MessageClass] = 'IPM.Note'").Count
        except Exception:
            try:
                return items.Count
            except Exception:
                return None
|
|
|
|
|
|
class NameCodec:
    """Build display labels and filesystem-safe names for exported items."""

    # Characters that are not allowed in Windows file names.
    INVALID_CHARS_RE = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    SPACE_RE = re.compile(r'\s+')
    # Device names Windows refuses as file or directory names.
    RESERVED_NAMES_RE = re.compile(r'^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])$', re.IGNORECASE)

    @classmethod
    def compact_text(cls, value: str, max_len: int = 32) -> str:
        """Collapse whitespace and shorten ``value`` to at most ``max_len``.

        Longer text is cut and suffixed with "..."; when ``max_len`` leaves no
        room for the ellipsis the text is simply cut.
        """
        value = cls.SPACE_RE.sub(" ", value or "").strip()
        if len(value) <= max_len:
            return value
        if max_len <= 3:
            # No room for "..." without exceeding max_len.
            return value[: max(max_len, 0)]
        return value[: max_len - 3].rstrip() + "..."

    @classmethod
    def store_display_name(cls, store_name: str) -> str:
        """Return a short label for a store, preferring an e-mail local part."""
        store_name = store_name or "Account"
        if "@" in store_name:
            local_part = store_name.split("@", 1)[0]
            label = cls.compact_text(local_part, max_len=34)
            if label:
                return label
            # Nothing before the "@": fall back to the full store name.
        return cls.compact_text(store_name, max_len=34)

    @classmethod
    def tree_label(cls, display_name: str, mail_count: Optional[int]) -> str:
        """Return the tree text for a folder, with the mail count if known."""
        display_name = cls.compact_text(display_name or "Cartella", max_len=46)
        if mail_count is None:
            return display_name
        return f"{display_name} ({mail_count})"

    @classmethod
    def sanitize(cls, value: str, max_len: int = 80) -> str:
        """Turn arbitrary text into a safe single path component.

        Invalid characters and dots become "_", whitespace runs are collapsed,
        the result is limited to ``max_len`` characters, never comes back
        empty ("vuoto" is used instead) and never equals a Windows reserved
        device name (a trailing "_" is appended).
        """
        value = cls.INVALID_CHARS_RE.sub("_", value or "")
        value = cls.SPACE_RE.sub(" ", value).strip()
        value = value.replace(".", "_")
        if not value:
            value = "vuoto"
        if len(value) > max_len:
            # Stripping after the cut can consume everything (e.g. a long
            # name made only of invalid characters); keep a usable name.
            value = value[:max_len].rstrip(" _") or "vuoto"
        if cls.RESERVED_NAMES_RE.match(value):
            value += "_"
        return value

    @classmethod
    def folder_name_from_outlook_path(cls, folder_path: str) -> str:
        """Convert a "Store / Folder / ..." tree path to a relative OS path."""
        # Drop the leading store when the path comes from the
        # "Store / Folder" tree.
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        export_parts = parts[1:] if len(parts) > 1 else parts
        safe_parts = [cls.sanitize(p, max_len=50) for p in export_parts]
        return os.path.join(*safe_parts) if safe_parts else "cartella"

    @classmethod
    def archive_base_name_from_outlook_path(cls, folder_path: str) -> str:
        """Return a safe archive base name from the last path component."""
        parts = [p.strip() for p in folder_path.split(" / ")]
        parts = [p for p in parts if p]
        folder_name = parts[-1] if parts else "export"
        return cls.sanitize(folder_name, max_len=60)

    @classmethod
    def email_base_name(cls, received_time: Optional[datetime], seq: int) -> str:
        """Return ``mail_<timestamp>_<seq>``; unknown dates get a placeholder."""
        if received_time is None:
            stamp = "data_sconosciuta"
        else:
            stamp = received_time.strftime("%Y-%m-%d_%H-%M-%S")
        return f"mail_{stamp}_{seq:05d}"

    @classmethod
    def mail_folder_name(cls, base_mail_name: str, subject: str, attachment_count: int = 0) -> str:
        """Return the per-mail directory name (base, attachment marker, subject)."""
        safe_subject = cls.sanitize(subject or "senza oggetto", max_len=50)
        attachment_marker = f"CON_ALLEGATI_{attachment_count:02d}" if attachment_count else "SOLO_MAIL"
        return f"{base_mail_name}__{attachment_marker}__{safe_subject}"

    @classmethod
    def attachment_name(cls, base_mail_name: str, original_filename: str, att_index: int) -> str:
        """Return the export file name for an attachment.

        Only the base name of ``original_filename`` is used; stem and
        extension are sanitized separately so the extension is preserved.
        """
        original_filename = original_filename or f"attachment_{att_index}"
        original_filename = os.path.basename(original_filename)
        stem, ext = os.path.splitext(original_filename)
        stem = cls.sanitize(stem, max_len=80)
        ext = cls.sanitize_extension(ext)
        return f"allegato__{base_mail_name}__{att_index:02d}__{stem}{ext}"

    @classmethod
    def sanitize_extension(cls, ext: str, max_len: int = 20) -> str:
        """Normalize an extension: lower-case, leading dot, safe characters."""
        ext = (ext or "").strip().lower()
        if not ext:
            return ""
        if not ext.startswith("."):
            ext = f".{ext}"
        ext = cls.INVALID_CHARS_RE.sub("_", ext)
        ext = ext.replace(" ", "_")
        if len(ext) > max_len:
            ext = ext[:max_len]
        return ext

    @classmethod
    def is_allowed_attachment(cls, filename: str) -> bool:
        """Return True when the file extension is in the export whitelist."""
        ext = cls.sanitize_extension(os.path.splitext(filename or "")[1])
        return ext in ALLOWED_ATTACHMENT_EXTENSIONS
|
|
|
|
|
|
class Exporter:
    """Export the mails of an Outlook folder tree to text files on disk.

    Each mail gets its own directory holding a ``.txt`` file and, when it has
    exportable attachments, an ``allegati`` sub-directory. Progress and
    errors are reported through the owning application object, which must
    provide ``log``, ``set_status`` and ``pump_ui``.
    """

    def __init__(self, app: "OutlookExporterApp") -> None:
        self.app = app
        self.cancel_requested = False
        # Sequence number of the mail being exported (across all folders).
        self.global_seq = 0
        self.exported_mail_count = 0
        self.exported_attachment_count = 0
        # Detailed "no usable date" diagnostics are capped to keep the log small.
        self.datetime_diagnostic_count = 0
        self.datetime_diagnostic_limit = 20

    def cancel(self) -> None:
        """Request cancellation; honoured at the next ``check_cancel``."""
        self.cancel_requested = True

    def check_cancel(self) -> None:
        """Raise ``ExportCancelled`` if cancellation has been requested."""
        if self.cancel_requested:
            raise ExportCancelled("Esportazione annullata dall'utente.")

    def export_folder(self, folder_obj: object, folder_out_path: Path) -> Tuple[int, int]:
        """Export ``folder_obj`` and, recursively, all of its sub-folders.

        Args:
            folder_obj: Outlook COM folder to export.
            folder_out_path: Destination directory (created if missing).

        Returns:
            ``(mail_count, attachment_count)`` exported from this folder and
            its descendants.

        Raises:
            ExportCancelled: As soon as a cancellation request is observed,
                including one noticed while a single mail is being written.
        """
        self.check_cancel()
        folder_out_path.mkdir(parents=True, exist_ok=True)
        folder_name = self._safe_get(folder_obj, "Name") or "(cartella senza nome)"
        self.app.log(f"Cartella Outlook: {folder_name}")
        self.app.log(f"Cartella destinazione: {folder_out_path}")

        items = self._safe_get(folder_obj, "Items")
        if items is None:
            self.app.log("Attenzione: impossibile leggere gli elementi della cartella, continuo con le sottocartelle.")
            count = 0
        else:
            try:
                items.Sort("[ReceivedTime]", False)
            except Exception:
                self.app.log("Attenzione: impossibile ordinare per ReceivedTime, continuo senza ordinamento.")

            try:
                count = items.Count
            except Exception as exc:
                self.app.log(f"Attenzione: impossibile contare gli elementi della cartella: {exc}")
                count = 0

        local_mail_count = 0
        local_attachment_count = 0

        self.app.log(f"Elementi rilevati: {count}")

        for idx in range(1, count + 1):  # COM collections are 1-based
            self.check_cancel()
            try:
                item = items.Item(idx)
            except Exception as exc:
                self.app.log(f"Errore accesso elemento {idx}: {exc}")
                continue

            # Skip anything that is not a mail item (meetings, reports, ...).
            message_class = self._safe_get(item, "Class")
            if message_class != MAIL_CLASS:
                continue

            try:
                self.global_seq += 1
                mail_count, att_count = self._export_mail(item, folder_out_path)
                local_mail_count += mail_count
                local_attachment_count += att_count
                self.exported_mail_count += mail_count
                self.exported_attachment_count += att_count
            except ExportCancelled:
                # A cancellation noticed while exporting a mail is not a mail
                # error: let it unwind instead of logging it and carrying on.
                raise
            except Exception as exc:
                subject = self._safe_get(item, "Subject") or "(senza oggetto)"
                self.app.log(f"Errore esportando mail '{subject}': {exc}")

            # NOTE(review): indentation was lost in the reviewed source; both
            # calls are assumed to be throttled to every 10th element.
            if idx % 10 == 0:
                self.app.set_status(
                    f"Esportazione in corso... elemento {idx}/{count} | mail {self.exported_mail_count} | allegati {self.exported_attachment_count}"
                )
                self.app.pump_ui()

        subfolders = self._safe_get(folder_obj, "Folders")
        subfolder_count = 0
        try:
            subfolder_count = subfolders.Count if subfolders is not None else 0
        except Exception as exc:
            self.app.log(f"Attenzione: impossibile leggere le sottocartelle: {exc}")

        for i in range(1, subfolder_count + 1):
            self.check_cancel()
            try:
                child = subfolders.Item(i)
            except Exception as exc:
                self.app.log(f"Errore accesso sottocartella {i}: {exc}")
                continue
            child_name = self._safe_get(child, "Name") or f"sottocartella_{i}"
            child_dir = folder_out_path / NameCodec.sanitize(str(child_name), max_len=80)
            m, a = self.export_folder(child, child_dir)
            local_mail_count += m
            local_attachment_count += a

        return local_mail_count, local_attachment_count

    def _export_mail(self, item: object, folder_out_path: Path) -> Tuple[int, int]:
        """Write one mail (text plus exportable attachments) to its own directory.

        Returns:
            ``(1, saved_attachment_count)``.

        Raises:
            ExportCancelled: If cancellation is requested between attachments.
        """
        received_dt = self._get_best_mail_datetime(item)
        subject = self._safe_get(item, "Subject") or ""
        body = self._safe_get(item, "Body") or ""
        # The HTML body is only used to recognise inline/signature images.
        html_body = self._safe_get(item, "HTMLBody") or ""
        exportable_attachments = self._collect_exportable_attachments(item, html_body)

        base_name = NameCodec.email_base_name(received_dt, self.global_seq)
        mail_dir = self._unique_path(
            folder_out_path / NameCodec.mail_folder_name(base_name, subject, len(exportable_attachments))
        )
        mail_dir.mkdir(parents=True, exist_ok=True)
        txt_path = mail_dir / f"{base_name}.txt"

        header_lines = [
            f"Oggetto: {subject}",
            f"Ricevuta: {received_dt.strftime('%Y-%m-%d %H:%M:%S') if received_dt else ''}",
            "",
            "--- TESTO EMAIL ---",
            body,
        ]
        txt_path.write_text("\n".join(header_lines), encoding="utf-8", errors="replace")
        self.app.log(f"Mail salvata: {txt_path.name}")

        attachment_count = 0
        if exportable_attachments:
            attachments_dir = mail_dir / "allegati"
            for att_index, attachment, original_filename in exportable_attachments:
                self.check_cancel()
                try:
                    export_name = NameCodec.attachment_name(base_name, original_filename, att_index)
                    attachments_dir.mkdir(parents=True, exist_ok=True)
                    export_path = self._unique_path(attachments_dir / export_name)
                    attachment.SaveAsFile(str(export_path))
                    attachment_count += 1
                    self.app.log(f" Allegato salvato: {export_path.name}")
                except Exception as exc:
                    # One failing attachment must not abort the whole mail.
                    self.app.log(f" Errore salvando allegato {att_index}: {exc}")

        return 1, attachment_count

    def _collect_exportable_attachments(self, item: object, html_body: str) -> List[Tuple[int, object, str]]:
        """Return the attachments of ``item`` that should be saved.

        Inline/signature images (score >= 3) and files whose extension is not
        whitelisted are logged and skipped.

        Returns:
            A list of ``(1-based index, attachment COM object, file name)``.
        """
        exportable = []
        attachments = self._safe_get(item, "Attachments")
        if attachments is None:
            return exportable

        try:
            attachment_total = attachments.Count
        except Exception as exc:
            self.app.log(f" Errore leggendo il numero di allegati: {exc}")
            return exportable

        for att_index in range(1, attachment_total + 1):
            self.check_cancel()
            try:
                attachment = attachments.Item(att_index)
                original_filename = self._safe_get(attachment, "FileName") or f"attachment_{att_index}"
                inline_score, inline_reasons = self._inline_signature_score(
                    attachment, original_filename, html_body
                )
                if inline_score >= 3:
                    self.app.log(
                        f" Allegato inline/firma ignorato: {original_filename} | "
                        f"score={inline_score} | motivi={', '.join(inline_reasons)}"
                    )
                elif NameCodec.is_allowed_attachment(original_filename):
                    exportable.append((att_index, attachment, original_filename))
                else:
                    self.app.log(f" Allegato ignorato: {original_filename}")
            except Exception as exc:
                self.app.log(f" Errore leggendo allegato {att_index}: {exc}")

        return exportable

    def _inline_signature_score(self, attachment: object, filename: str, html_body: str) -> Tuple[int, List[str]]:
        """Score how likely an image attachment is inline or signature artwork.

        Non-image attachments always score 0. Each matching signal (hidden
        flag, content-id, content-location, inline disposition, file name in
        the HTML, typical inline name) adds to the score.

        Returns:
            ``(score, reasons)`` where ``reasons`` names the signals that fired.
        """
        ext = NameCodec.sanitize_extension(os.path.splitext(filename or "")[1])
        if ext not in IMAGE_ATTACHMENT_EXTENSIONS:
            return 0, []

        score = 0
        reasons = []
        html_lower = (html_body or "").lower()
        filename_lower = os.path.basename(filename or "").lower()

        hidden = self._get_attachment_mapi_property(attachment, MAPI_ATTACHMENT_HIDDEN)
        if hidden is True:
            score += 4
            reasons.append("hidden")

        content_id = self._get_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_ID)
        if self._has_value(content_id):
            cid = str(content_id).strip()
            if self._html_references_cid(html_lower, cid):
                score += 3
                reasons.append("cid nel corpo HTML")
            else:
                score += 1
                reasons.append("cid presente")

        content_location = self._get_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_LOCATION)
        if self._has_value(content_location):
            location = str(content_location).strip()
            if location.lower() in html_lower:
                score += 2
                reasons.append("content-location nel corpo HTML")
            else:
                score += 1
                reasons.append("content-location presente")

        disposition = self._get_attachment_mapi_property(attachment, MAPI_ATTACH_CONTENT_DISPOSITION)
        if self._has_value(disposition) and "inline" in str(disposition).lower():
            score += 3
            reasons.append("disposition inline")

        if filename_lower and filename_lower in html_lower:
            score += 2
            reasons.append("nome file nel corpo HTML")

        stem = os.path.splitext(filename_lower)[0]
        if INLINE_ATTACHMENT_NAME_RE.match(stem or "") is not None:
            score += 2
            reasons.append("nome tipico inline/firma")

        return score, reasons

    @staticmethod
    def _html_references_cid(html_lower: str, content_id: str) -> bool:
        """Return True when the lower-cased HTML mentions ``content_id``.

        Surrounding angle brackets and case are ignored.
        """
        if not content_id:
            return False
        cid = content_id.strip().strip("<>").lower()
        if not cid:
            return False
        return f"cid:{cid}" in html_lower or cid in html_lower

    def _get_attachment_mapi_property(self, attachment: object, schema: str):
        """Read a MAPI property of an attachment; None when unavailable."""
        try:
            accessor = attachment.PropertyAccessor
            return accessor.GetProperty(schema)
        except Exception:
            return None

    @staticmethod
    def _has_value(value: object) -> bool:
        """Return True unless ``value`` is None or renders as blank text."""
        if value is None:
            return False
        try:
            return bool(str(value).strip())
        except Exception:
            # A value that cannot be rendered still counts as present.
            return True

    def _get_best_mail_datetime(self, item: object) -> Optional[datetime]:
        """Return the first usable timestamp of a mail.

        Tries ReceivedTime, SentOn, CreationTime and LastModificationTime in
        that order; logs diagnostics and returns None when none is usable.
        """
        for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
            value = self._safe_get(item, attr)
            converted = self._convert_outlook_datetime(value)
            if converted is not None:
                if attr != "ReceivedTime":
                    self.app.log(f" Data ReceivedTime non disponibile, uso {attr}.")
                return converted
        self._log_datetime_diagnostics(item)
        return None

    def _log_datetime_diagnostics(self, item: object) -> None:
        """Log identity and raw date fields of a mail without a usable date.

        Only the first ``datetime_diagnostic_limit`` mails are detailed; a
        single notice is logged when the limit is reached.
        """
        if self.datetime_diagnostic_count >= self.datetime_diagnostic_limit:
            if self.datetime_diagnostic_count == self.datetime_diagnostic_limit:
                self.app.log(
                    " Diagnostica date: limite raggiunto, ulteriori mail senza data non verranno dettagliate."
                )
                # Move past the limit so the notice is logged only once.
                self.datetime_diagnostic_count += 1
            return

        self.datetime_diagnostic_count += 1
        subject = self._safe_get(item, "Subject") or "(senza oggetto)"
        self.app.log(f" Diagnostica date mail #{self.global_seq}: {subject}")
        for label, value in self._mail_identity_details(item):
            self.app.log(f" {label}: {value}")
        for attr in ("ReceivedTime", "SentOn", "CreationTime", "LastModificationTime"):
            ok, value_or_error = self._safe_get_with_error(item, attr)
            if not ok:
                self.app.log(f" {attr}: ERRORE accesso: {value_or_error}")
                continue

            value = value_or_error
            self.app.log(
                f" {attr}: tipo={type(value).__name__}; repr={repr(value)}; str={self._safe_str(value)}"
            )

    def _mail_identity_details(self, item: object) -> List[Tuple[str, str]]:
        """Return ``(label, text)`` pairs that identify a mail in the log."""
        parent = self._safe_get(item, "Parent")
        folder_name = self._safe_get(parent, "Name") if parent is not None else None
        store_id = self._safe_get(parent, "StoreID") if parent is not None else None

        details = [
            ("EntryID", self._safe_str(self._safe_get(item, "EntryID"))),
            ("ConversationID", self._safe_str(self._safe_get(item, "ConversationID"))),
            ("InternetMessageID", self._safe_str(self._get_mail_mapi_property(item, "http://schemas.microsoft.com/mapi/proptag/0x1035001F"))),
            ("Cartella Outlook", self._safe_str(folder_name)),
            ("StoreID", self._safe_str(store_id)),
        ]
        return details

    def _get_mail_mapi_property(self, item: object, schema: str):
        """Read a MAPI property of a mail item; None when unavailable."""
        try:
            accessor = item.PropertyAccessor
            return accessor.GetProperty(schema)
        except Exception:
            return None

    @staticmethod
    def _unique_path(path: Path) -> Path:
        """Return ``path``, or ``<stem>__<n><suffix>`` (n >= 2) if it exists."""
        if not path.exists():
            return path

        stem = path.stem
        suffix = path.suffix
        parent = path.parent
        counter = 2
        while True:
            candidate = parent / f"{stem}__{counter}{suffix}"
            if not candidate.exists():
                return candidate
            counter += 1

    @staticmethod
    def _safe_get(obj: object, attr: str):
        """Return ``getattr(obj, attr)``, or None if the access raises."""
        try:
            return getattr(obj, attr)
        except Exception:
            return None

    @staticmethod
    def _safe_get_with_error(obj: object, attr: str) -> Tuple[bool, object]:
        """Return ``(True, value)`` or ``(False, exception)`` for an attribute read."""
        try:
            return True, getattr(obj, attr)
        except Exception as exc:
            return False, exc

    @staticmethod
    def _safe_str(value: object, max_len: int = 300) -> str:
        """Render ``value`` as text, never raising, capped at ``max_len`` (+ "...")."""
        try:
            text = str(value)
        except Exception as exc:
            text = f"<errore str: {exc}>"
        if len(text) > max_len:
            return text[:max_len] + "..."
        return text

    @staticmethod
    def _convert_outlook_datetime(value) -> Optional[datetime]:
        """Convert a date value coming from Outlook to ``datetime``.

        ``datetime`` instances are returned unchanged. Other values are tried,
        in order, through their date/time attributes, a ``Format``/``strftime``
        method and finally ISO parsing of ``str(value)``.

        Returns:
            The converted value, or None when every strategy fails.
        """
        if value is None:
            return None
        if isinstance(value, datetime):
            return value
        try:
            return datetime(
                value.year,
                value.month,
                value.day,
                value.hour,
                value.minute,
                value.second,
            )
        except Exception:
            pass

        for method_name in ("Format", "strftime"):
            try:
                # Attribute access on COM-backed values may raise errors
                # other than AttributeError, which getattr() would not mask.
                converter = getattr(value, method_name, None)
            except Exception:
                converter = None
            if converter is None:
                continue
            try:
                text = str(converter("%Y-%m-%d %H:%M:%S"))
                return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
            except Exception:
                continue

        try:
            return datetime.fromisoformat(str(value))
        except Exception:
            return None
|
|
|
|
|
|
class OutlookExporterApp(ctk.CTk):
|
|
def __init__(self) -> None:
    """Create the main window, its services and state, then show the UI.

    The window is withdrawn while a splash is displayed and the widgets
    are built; it is shown again once construction is complete.
    """
    super().__init__()
    self.title(APP_TITLE)
    self.geometry(APP_GEOMETRY)
    self.minsize(1080, 680)
    # Keep the main window hidden until the UI has been built.
    self.withdraw()

    ctk.set_appearance_mode("system")
    ctk.set_default_color_theme("blue")
    # Must exist before show_startup_splash() assigns the real window.
    self.splash = None

    self.show_startup_splash()
    self.outlook_service = OutlookService()
    self.exporter = Exporter(self)
    self.export_thread = None
    # Identifier of the thread that created the window.
    self.main_thread_id = threading.get_ident()
    # NOTE(review): presumably carries UI updates posted from the export
    # thread; the consumer (process_ui_queue) is not visible here.
    self.ui_queue = queue.Queue()
    self.connected = False
    # Tree item id -> Outlook folder COM object / " / "-joined path.
    self.tree_folder_map = {}
    self.tree_path_map = {}
    self.empty_folder_alert_shown = set()
    self.selected_folder_obj = None
    default_output_dir = Path.home() / "Desktop" / "outlook_export"
    self.prefs = PreferenceStore(default_output_dir)
    # The log file lives next to the preferences file.
    self.log_file_path = PreferenceStore.app_data_dir() / LOG_FILE_NAME
    self._init_file_log()
    # Tk variables bound to widgets created in _build_ui().
    self.selected_folder_display = ctk.StringVar(value="Nessuna cartella selezionata")
    self.output_dir_var = ctk.StringVar(value=self.prefs.get_output_dir())
    self.status_var = ctk.StringVar(value="Pronto")
    self.create_zip_var = ctk.IntVar(value=1)
    self.create_hierarchy_var = ctk.IntVar(value=0)

    self._build_ui()
    self.hide_startup_splash()
    # Show the main window now that it is fully built.
    self.deiconify()
    self.close_pyinstaller_splash()
    # First run of process_ui_queue is scheduled 100 ms from now.
    self.after(100, self.process_ui_queue)
    self.protocol("WM_DELETE_WINDOW", self.on_close)
|
|
|
|
def show_startup_splash(self) -> None:
    """Show a borderless, always-on-top splash centered on the screen.

    The created window is stored in ``self.splash`` and the event loop
    is pumped once so it is drawn before the rest of start-up runs.
    """
    splash = ctk.CTkToplevel(self)
    splash.title(APP_TITLE)
    # No title bar or borders.
    splash.overrideredirect(True)
    splash.resizable(False, False)
    splash.attributes("-topmost", True)

    # Center the fixed-size window, clamping to the screen origin.
    width = 420
    height = 180
    x = max(0, int((splash.winfo_screenwidth() - width) / 2))
    y = max(0, int((splash.winfo_screenheight() - height) / 2))
    splash.geometry(f"{width}x{height}+{x}+{y}")
    splash.grid_columnconfigure(0, weight=1)
    splash.grid_rowconfigure(0, weight=1)

    content = ctk.CTkFrame(splash)
    content.grid(row=0, column=0, sticky="nsew", padx=12, pady=12)
    content.grid_columnconfigure(0, weight=1)

    ctk.CTkLabel(
        content,
        text="MailExporter",
        font=ctk.CTkFont(size=26, weight="bold"),
    ).grid(row=0, column=0, pady=(24, 8))
    ctk.CTkLabel(content, text="Avvio in corso...").grid(row=1, column=0, pady=(0, 18))

    progress = ctk.CTkProgressBar(content, mode="indeterminate")
    progress.grid(row=2, column=0, sticky="ew", padx=34, pady=(0, 18))
    progress.start()

    self.splash = splash
    # Process pending geometry and draw events so the splash is visible
    # immediately.
    self.update_idletasks()
    self.update()
|
|
|
|
def hide_startup_splash(self) -> None:
    """Destroy the start-up splash, if one is shown, and forget it."""
    splash = self.splash
    if splash is None:
        return
    try:
        splash.destroy()
    except Exception:
        # Best-effort: a splash that is already gone is not an error.
        pass
    self.splash = None
|
|
|
|
@staticmethod
def close_pyinstaller_splash() -> None:
    """Close the PyInstaller boot splash when the app runs as a bundle.

    ``pyi_splash`` only exists inside a PyInstaller build; any failure to
    import or close it is ignored.
    """
    try:
        import pyi_splash
    except Exception:
        return

    try:
        pyi_splash.close()
    except Exception:
        pass
|
|
|
|
def _build_ui(self) -> None:
    """Create and lay out every widget of the main window.

    Layout: a toolbar (row 0), a horizontal paned area with the folder
    tree on the left and the export controls plus log on the right
    (row 1), and a status bar (row 2).
    """
    # Only the paned area (row 1) grows with the window.
    self.grid_columnconfigure(0, weight=1)
    self.grid_rowconfigure(1, weight=1)

    # --- Toolbar ---------------------------------------------------
    top = ctk.CTkFrame(self)
    top.grid(row=0, column=0, sticky="nsew", padx=12, pady=(12, 6))
    top.grid_columnconfigure(3, weight=1)

    self.btn_connect = ctk.CTkButton(top, text="Connetti a Outlook gia aperto", command=self.connect_outlook)
    self.btn_connect.grid(row=0, column=0, padx=8, pady=8)

    # Created disabled; connect_outlook() enables these on success.
    self.btn_refresh = ctk.CTkButton(top, text="Ricarica cartelle", command=self.reload_tree, state="disabled")
    self.btn_refresh.grid(row=0, column=1, padx=8, pady=8)

    self.btn_expand = ctk.CTkButton(top, text="Espandi tutto", command=self.expand_all, state="disabled")
    self.btn_expand.grid(row=0, column=2, padx=8, pady=8)

    lbl_selected = ctk.CTkLabel(top, textvariable=self.selected_folder_display, anchor="w")
    lbl_selected.grid(row=0, column=3, padx=8, pady=8, sticky="ew")

    # --- Split area: folder tree | export panel ----------------------
    main_pane = ttk.Panedwindow(self, orient="horizontal")
    main_pane.grid(row=1, column=0, sticky="nsew", padx=12, pady=(6, 12))

    left = ctk.CTkFrame(main_pane)
    left.grid_rowconfigure(1, weight=1)
    left.grid_columnconfigure(0, weight=1)

    ctk.CTkLabel(left, text="Albero cartelle Outlook", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, sticky="w", padx=10, pady=(10, 4)
    )

    tree_container = ctk.CTkFrame(left)
    tree_container.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
    tree_container.grid_rowconfigure(0, weight=1)
    tree_container.grid_columnconfigure(0, weight=1)

    style = ttk.Style()
    try:
        style.theme_use("default")
    except Exception:
        # Keep whatever theme is active if "default" cannot be selected.
        pass
    style.configure("Treeview", rowheight=24)

    self.tree = ttk.Treeview(tree_container, show="tree")
    self.tree.grid(row=0, column=0, sticky="nsew")
    # on_tree_open loads the children of a node the first time it opens.
    self.tree.bind("<<TreeviewOpen>>", self.on_tree_open)
    self.tree.bind("<<TreeviewSelect>>", self.on_tree_select)

    yscroll = ttk.Scrollbar(tree_container, orient="vertical", command=self.tree.yview)
    yscroll.grid(row=0, column=1, sticky="ns")
    self.tree.configure(yscrollcommand=yscroll.set)

    right = ctk.CTkFrame(main_pane)
    right.grid_rowconfigure(1, weight=1)
    right.grid_columnconfigure(0, weight=1)

    # The tree pane gets the larger share of extra width (3:2).
    main_pane.add(left, weight=3)
    main_pane.add(right, weight=2)

    # --- Export controls -------------------------------------------
    right_top = ctk.CTkFrame(right)
    right_top.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 6))
    right_top.grid_columnconfigure(1, weight=1)

    ctk.CTkLabel(right_top, text="Esportazione", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, columnspan=3, sticky="w", padx=10, pady=(10, 8)
    )

    ctk.CTkLabel(right_top, text="Cartella di output:").grid(row=1, column=0, padx=10, pady=6, sticky="w")
    self.entry_output = ctk.CTkEntry(right_top, textvariable=self.output_dir_var)
    self.entry_output.grid(row=1, column=1, padx=10, pady=6, sticky="ew")
    self.btn_browse = ctk.CTkButton(right_top, text="Sfoglia", width=110, command=self.choose_output_dir)
    self.btn_browse.grid(row=1, column=2, padx=10, pady=6)

    help_text = (
        "La cartella selezionata verrà esportata insieme a tutte le sue sottocartelle.\n"
        "Ogni email verrà salvata in TXT e gli allegati verranno salvati nel formato originale."
    )
    ctk.CTkLabel(right_top, text=help_text, justify="left").grid(
        row=2, column=0, columnspan=3, padx=10, pady=(4, 8), sticky="w"
    )

    options = ctk.CTkFrame(right_top, fg_color="transparent")
    options.grid(row=3, column=0, columnspan=3, sticky="ew", padx=10, pady=(4, 8))
    options.grid_columnconfigure(0, weight=1)
    options.grid_columnconfigure(1, weight=1)

    # Both check boxes report changes to on_export_mode_change, passing
    # which option was toggled.
    self.chk_zip = ctk.CTkCheckBox(
        options,
        text="Crea zip",
        variable=self.create_zip_var,
        command=lambda: self.on_export_mode_change("zip"),
    )
    self.chk_zip.grid(row=0, column=0, sticky="w", padx=(0, 12))

    self.chk_hierarchy = ctk.CTkCheckBox(
        options,
        text="Crea gerarchia",
        variable=self.create_hierarchy_var,
        command=lambda: self.on_export_mode_change("hierarchy"),
    )
    self.chk_hierarchy.grid(row=0, column=1, sticky="w", padx=(12, 0))

    # Created disabled; reload_tree() also resets it to disabled.
    self.btn_export = ctk.CTkButton(
        right_top,
        text="Esporta cartella selezionata",
        command=self.start_export,
        state="disabled",
        height=36,
    )
    self.btn_export.grid(row=4, column=0, columnspan=2, padx=10, pady=(8, 10), sticky="ew")

    self.btn_cancel = ctk.CTkButton(
        right_top,
        text="Annulla esportazione",
        command=self.cancel_export,
        state="disabled",
        fg_color="#9b2c2c",
        hover_color="#7f1d1d",
    )
    self.btn_cancel.grid(row=4, column=2, padx=10, pady=(8, 10), sticky="ew")

    # --- Log panel -------------------------------------------------
    right_bottom = ctk.CTkFrame(right)
    right_bottom.grid(row=1, column=0, sticky="nsew", padx=0, pady=(6, 0))
    right_bottom.grid_rowconfigure(1, weight=1)
    right_bottom.grid_columnconfigure(0, weight=1)

    ctk.CTkLabel(right_bottom, text="Log", font=ctk.CTkFont(size=18, weight="bold")).grid(
        row=0, column=0, sticky="w", padx=10, pady=(10, 4)
    )

    self.log_box = ctk.CTkTextbox(right_bottom)
    self.log_box.grid(row=1, column=0, sticky="nsew", padx=10, pady=(4, 10))
    # The text box starts in the disabled state.
    self.log_box.configure(state="disabled")

    # --- Status bar ------------------------------------------------
    status = ctk.CTkFrame(self)
    status.grid(row=2, column=0, sticky="ew", padx=12, pady=(0, 12))
    status.grid_columnconfigure(0, weight=1)
    status.grid_columnconfigure(1, weight=0)
    ctk.CTkLabel(status, textvariable=self.status_var, anchor="w").grid(
        row=0, column=0, sticky="ew", padx=10, pady=8
    )
    self.btn_exit = ctk.CTkButton(status, text="Esci", width=100, command=self.on_close)
    self.btn_exit.grid(row=0, column=1, sticky="e", padx=10, pady=8)
|
|
|
|
def connect_outlook(self) -> None:
    """Connect to the running Outlook and populate the folder tree.

    On success the refresh/expand buttons are enabled; on any failure
    the error is logged, shown in the status bar and in a dialog.
    """
    try:
        self.set_status("Connessione a Outlook in corso...")
        self.log("Connessione a Outlook...")
        self.outlook_service.connect()
        self.connected = True
        self.log("Connesso a Outlook.")
        self.reload_tree()
        for button in (self.btn_refresh, self.btn_expand):
            button.configure(state="normal")
        self.set_status("Connesso a Outlook")
    except Exception as exc:
        self.connected = False
        self.log(f"Errore connessione Outlook: {exc}")
        self.set_status("Errore di connessione")
        dialog_text = f"Impossibile collegarsi a Outlook.\n\n{exc}"
        messagebox.showerror(APP_TITLE, dialog_text)
|
|
|
|
def reload_tree(self) -> None:
    """Rebuild the folder tree from the Outlook root folders.

    Does nothing when not connected. Otherwise clears the tree, all per-node
    bookkeeping and the current selection, then inserts one collapsed node
    per root folder with a placeholder child so it can be expanded lazily.
    """
    if not self.connected:
        return

    # Wipe the widget and every structure keyed by its node ids.
    self.tree.delete(*self.tree.get_children())
    for registry in (self.tree_folder_map, self.tree_path_map, self.empty_folder_alert_shown):
        registry.clear()

    # The previous selection refers to a node that no longer exists.
    self.selected_folder_obj = None
    self.selected_folder_display.set("Nessuna cartella selezionata")
    self.btn_export.configure(state="disabled")

    for root in self.outlook_service.list_root_folders():
        label = NameCodec.tree_label(root.display_name or root.path, root.mail_count)
        item_id = self.tree.insert("", "end", text=label, open=False)
        self.tree_folder_map[item_id] = root.folder_obj
        self.tree_path_map[item_id] = root.path
        self._insert_dummy(item_id)

    self.log("Albero cartelle ricaricato.")
|
|
|
|
def _insert_dummy(self, node_id: str) -> None:
    """Append a placeholder child to *node_id* so the node shows as expandable."""
    # on_tree_open recognises this exact label and swaps it for real children.
    placeholder_label = "__dummy__"
    self.tree.insert(node_id, "end", text=placeholder_label)
|
|
|
|
def on_tree_open(self, _event=None) -> None:
    """Lazily load the sub-folders of the focused tree node.

    Only acts when the node's sole child is the "__dummy__" placeholder:
    the placeholder is removed and replaced with the real child folders.
    Errors while listing children are logged, not raised.
    """
    parent_id = self.tree.focus()
    if not parent_id:
        return

    existing = self.tree.get_children(parent_id)
    only_placeholder = len(existing) == 1 and self.tree.item(existing[0], "text") == "__dummy__"
    if not only_placeholder:
        return

    self.tree.delete(existing[0])
    parent_folder = self.tree_folder_map.get(parent_id)
    parent_path = self.tree_path_map.get(parent_id, self.tree.item(parent_id, "text"))
    try:
        for child in self.outlook_service.list_children(parent_folder, parent_path):
            label = NameCodec.tree_label(child.display_name or child.path, child.mail_count)
            item_id = self.tree.insert(parent_id, "end", text=label, open=False)
            self.tree_folder_map[item_id] = child.folder_obj
            self.tree_path_map[item_id] = child.path
            try:
                # Only folders with sub-folders get a placeholder (expand arrow).
                if child.folder_obj.Folders.Count > 0:
                    self._insert_dummy(item_id)
            except Exception:
                # Best effort: a failing sub-folder probe leaves the node as a leaf.
                pass
    except Exception as error:
        self.log(f"Errore caricando sottocartelle: {error}")
|
|
|
|
def on_tree_select(self, _event=None) -> None:
    """Track the newly selected tree node as the export source.

    Stores the folder object mapped to the first selected node, updates the
    selection label and the export button state, and, for a known folder,
    triggers the empty-folder hint check.
    """
    selection = self.tree.selection()
    if not selection:
        return

    item_id = selection[0]
    folder = self.tree_folder_map.get(item_id)
    self.selected_folder_obj = folder
    # Fall back to the visible label when no path was recorded for the node.
    path = self.tree_path_map.get(item_id, self.tree.item(item_id, "text"))

    has_folder = folder is not None
    self.selected_folder_display.set(
        f"Selezionata: {path}" if has_folder else "Nessuna cartella selezionata"
    )
    self.btn_export.configure(state="normal" if has_folder else "disabled")
    if has_folder:
        self.maybe_show_empty_folder_server_alert(item_id, folder)
|
|
|
|
def maybe_show_empty_folder_server_alert(self, node_id: str, folder_obj: object) -> None:
    """Show the "empty folder" hint once per tree node.

    The hint is displayed only when the folder's mail count is exactly 0 and
    the node has not triggered the hint before.
    """
    already_warned = node_id in self.empty_folder_alert_shown
    if already_warned:
        return

    if OutlookService.count_mail_items(folder_obj) == 0:
        # Record first so the dialog is never repeated for this node.
        self.empty_folder_alert_shown.add(node_id)
        self.show_empty_folder_server_alert()
|
|
|
|
def show_empty_folder_server_alert(self) -> None:
    """Open a modal dialog explaining why a folder may look empty.

    The dialog tells the user how to synchronise all mail in Outlook and
    offers a button to launch Outlook plus a close button.
    """
    window = ctk.CTkToplevel(self)
    window.title("Cartella Outlook vuota")
    window.geometry("720x430")
    window.minsize(640, 380)
    window.transient(self)
    window.grab_set()

    # Column 0 holds the badge; column 1 and row 1 absorb extra space.
    for column, weight in ((0, 0), (1, 1)):
        window.grid_columnconfigure(column, weight=weight)
    window.grid_rowconfigure(1, weight=1)

    badge = ctk.CTkLabel(
        window,
        text="!",
        width=72,
        height=72,
        corner_radius=36,
        fg_color="#b45309",
        text_color="white",
        font=ctk.CTkFont(size=42, weight="bold"),
    )
    badge.grid(row=0, column=0, rowspan=2, sticky="n", padx=(24, 16), pady=(24, 12))

    heading = ctk.CTkLabel(
        window,
        text="La cartella selezionata risulta vuota",
        font=ctk.CTkFont(size=22, weight="bold"),
        anchor="w",
    )
    heading.grid(row=0, column=1, sticky="ew", padx=(0, 24), pady=(26, 8))

    paragraphs = (
        "La cartella selezionata risulta vuota, ma potrebbero esserci email sul server di Outlook "
        "che non hai sincronizzato.",
        "Per sincronizzare vai su:",
        "File -> Impostazioni account -> Impostazioni di sincronizzazione e Account -> Tutto",
        "Poi attendi la sincronizzazione completa e riprova l'esportazione.",
        "Nota: se Outlook mostra un link tipo 'ci sono altri elementi sul server', il programma puo "
        "vedere solo le email gia sincronizzate localmente.",
    )
    body_text = "\n\n".join(paragraphs)
    body = ctk.CTkLabel(window, text=body_text, justify="left", anchor="nw", wraplength=560)
    body.grid(row=1, column=1, sticky="nsew", padx=(0, 24), pady=(0, 16))

    button_bar = ctk.CTkFrame(window, fg_color="transparent")
    button_bar.grid(row=2, column=0, columnspan=2, sticky="ew", padx=24, pady=(0, 24))
    # Column 0 is an elastic spacer that pushes both buttons to the right.
    for column, weight in ((0, 1), (1, 0), (2, 0)):
        button_bar.grid_columnconfigure(column, weight=weight)

    def launch_outlook() -> None:
        """Ask the shell to start Outlook; report failures in a dialog."""
        try:
            os.startfile("outlook")
        except Exception as error:
            messagebox.showerror(APP_TITLE, f"Impossibile aprire Outlook.\n\n{error}")

    open_button = ctk.CTkButton(button_bar, text="Apri Outlook", width=140, command=launch_outlook)
    open_button.grid(row=0, column=1, sticky="e", padx=(0, 12))

    close_button = ctk.CTkButton(
        button_bar,
        text="Chiudi",
        width=120,
        command=window.destroy,
        fg_color="#5f6368",
        hover_color="#4b4f52",
    )
    close_button.grid(row=0, column=2, sticky="e")

    window.protocol("WM_DELETE_WINDOW", window.destroy)
    window.after(100, window.focus_force)
|
|
|
|
def expand_all(self) -> None:
    """Open every node of the folder tree, loading children on the way.

    Each node is opened, given focus and passed through on_tree_open (which
    works on the focused node) before its real children are visited.
    """

    def _open_subtree(item_id: str) -> None:
        self.tree.item(item_id, open=True)
        # on_tree_open reads the focused node, so focus must move first.
        self.tree.focus(item_id)
        self.on_tree_open()
        for child_id in self.tree.get_children(item_id):
            if self.tree.item(child_id, "text") == "__dummy__":
                continue
            _open_subtree(child_id)

    for top_level_id in self.tree.get_children(""):
        _open_subtree(top_level_id)
    self.log("Espansione albero completata.")
|
|
|
|
def choose_output_dir(self) -> None:
    """Let the user pick the output directory and remember the choice.

    The picker starts at the current output directory, or the user's home
    directory when none is set. Cancelling the dialog changes nothing.
    """
    start_dir = self.output_dir_var.get() or str(Path.home())
    chosen = filedialog.askdirectory(initialdir=start_dir)
    if not chosen:
        return
    self.output_dir_var.set(chosen)
    self.prefs.set_output_dir(chosen)
|
|
|
|
def on_export_mode_change(self, selected_mode: str) -> None:
    """Synchronise the two export-mode flags with the selected mode.

    Args:
        selected_mode: "zip" selects ZIP output; any other value selects the
            folder-hierarchy output.

    Exactly one of ``create_zip_var`` / ``create_hierarchy_var`` is set to 1
    and the other to 0. The former "neither flag set" fallback was removed:
    it could never run because one flag had always just been set to 1.
    """
    zip_selected = selected_mode == "zip"
    self.create_zip_var.set(1 if zip_selected else 0)
    self.create_hierarchy_var.set(0 if zip_selected else 1)
|
|
|
|
def start_export(self) -> None:
    """Validate the current selection and start the export worker thread.

    The method returns early (after informing the user) when an export is
    already running, no folder is selected, no output directory is set, the
    output directory cannot be created, or the user declines the
    confirmation prompt. Otherwise it stores the output directory in the
    preferences, switches the controls to "running", creates a fresh
    Exporter and launches ``_run_export_worker`` on a daemon thread.
    """
    if self.export_thread is not None and self.export_thread.is_alive():
        messagebox.showwarning(APP_TITLE, "Una esportazione e gia in corso.")
        return

    if self.selected_folder_obj is None:
        messagebox.showwarning(APP_TITLE, "Seleziona prima una cartella Outlook.")
        return

    output_dir = self.output_dir_var.get().strip()
    if not output_dir:
        messagebox.showwarning(APP_TITLE, "Scegli una cartella di output valida.")
        return
    output_root = Path(output_dir)

    try:
        output_root.mkdir(parents=True, exist_ok=True)
    except Exception as exc:
        messagebox.showerror(APP_TITLE, f"Impossibile creare la cartella di output.\n\n{exc}")
        return
    self.prefs.set_output_dir(str(output_root))

    # Strip only the leading display label. str.replace would also remove the
    # same text if it happened to occur inside the folder path itself.
    display_prefix = "Selezionata: "
    folder_text = self.selected_folder_display.get()
    if folder_text.startswith(display_prefix):
        folder_text = folder_text[len(display_prefix):]
    relative_path = NameCodec.folder_name_from_outlook_path(folder_text)
    zip_mode = bool(self.create_zip_var.get())
    if zip_mode:
        confirm_message = (
            "Verra creato un archivio ZIP nella cartella di output.\n"
            "La gerarchia temporanea usata per preparare l'archivio verra rimossa al termine.\n\n"
            "Vuoi continuare?"
        )
    else:
        confirm_message = (
            "Verra creata una gerarchia di cartelle nella cartella di output.\n\n"
            "Vuoi continuare?"
        )
    if not messagebox.askyesno(APP_TITLE, confirm_message):
        return

    self._set_export_controls_running()
    self.exporter = Exporter(self)

    self.log("=" * 80)
    self.log(f"Avvio esportazione: {folder_text}")
    self.log(f"Modalita: {'ZIP' if zip_mode else 'Gerarchia'}")
    self.set_status("Esportazione in corso...")

    self.export_thread = threading.Thread(
        target=self._run_export_worker,
        args=(
            # The folder reference is prepared on this thread for use by the worker.
            self._marshal_folder_for_worker(self.selected_folder_obj),
            output_root,
            relative_path,
            folder_text,
            zip_mode,
        ),
        daemon=True,
    )
    self.export_thread.start()
|
|
|
|
def cancel_export(self) -> None:
    """Ask the running exporter to stop and report that the request was sent."""
    active_exporter = self.exporter
    active_exporter.cancel()
    # Status bar first, then the log, to mirror the order users see on screen.
    self.set_status("Richiesta annullamento in corso...")
    self.log("Richiesta di annullamento inviata.")
|
|
|
|
def _set_export_controls_running(self) -> None:
    """Put the controls in "export running" mode.

    Only the cancel button stays enabled; every other export-related control
    is disabled.
    """
    running_states = (
        (self.btn_export, "disabled"),
        (self.btn_cancel, "normal"),
        (self.btn_refresh, "disabled"),
        (self.btn_connect, "disabled"),
        (self.btn_expand, "disabled"),
        (self.chk_zip, "disabled"),
        (self.chk_hierarchy, "disabled"),
    )
    for widget, state in running_states:
        widget.configure(state=state)
|
|
|
|
def _restore_export_controls(self) -> None:
    """Return the controls to their idle state after an export ends.

    Export is re-enabled only while a folder is selected; refresh and expand
    only while connected. Cancel is disabled, everything else is enabled.
    """
    export_state = "normal" if self.selected_folder_obj else "disabled"
    self.btn_export.configure(state=export_state)
    self.btn_cancel.configure(state="disabled")

    tree_state = "normal" if self.connected else "disabled"
    self.btn_refresh.configure(state=tree_state)
    self.btn_connect.configure(state="normal")
    self.btn_expand.configure(state=tree_state)

    for mode_toggle in (self.chk_zip, self.chk_hierarchy):
        mode_toggle.configure(state="normal")
|
|
|
|
def _run_export_worker(
    self,
    folder_ref: object,
    output_root: Path,
    relative_path: str,
    folder_text: str,
    zip_mode: bool,
) -> None:
    """Thread target: run one export and post the outcome to the UI queue.

    Exactly one of ("complete", ...), ("cancelled",) or ("error", ...) is
    queued. In ZIP mode the folder is exported into a temporary directory
    that is archived into *output_root* and then discarded; otherwise the
    export is written directly under *output_root*. COM is initialised for
    this thread when pywin32 is available and released on exit.
    """
    needs_com_release = False
    try:
        if pythoncom is not None:
            pythoncom.CoInitialize()
            needs_com_release = True
        folder = self._resolve_worker_folder(folder_ref)

        if not zip_mode:
            result_path = output_root / relative_path
            self.log(f"Destinazione: {result_path}")
            mails, attachments = self.exporter.export_folder(folder, result_path)
        else:
            with tempfile.TemporaryDirectory(prefix="outlook_export_") as scratch_dir:
                scratch_root = Path(scratch_dir)
                staging_target = scratch_root / relative_path
                self.log(f"Destinazione temporanea: {staging_target}")
                mails, attachments = self.exporter.export_folder(folder, staging_target)
                # Skip the archive step if cancellation arrived during export.
                self.exporter.check_cancel()
                result_path = self._create_zip_archive(scratch_root, output_root, folder_text)

        self.ui_queue.put(("complete", mails, attachments, result_path, zip_mode))
    except ExportCancelled:
        self.ui_queue.put(("cancelled",))
    except Exception as error:
        self.ui_queue.put(("error", str(error), traceback.format_exc()))
    finally:
        if needs_com_release:
            try:
                pythoncom.CoUninitialize()
            except Exception:
                # Best effort: the thread is ending either way.
                pass
|
|
|
|
@staticmethod
def _marshal_folder_for_worker(folder_obj: object) -> object:
    """Prepare a folder reference that can be handed to the worker thread.

    Returns a marshalled interface stream when pywin32 is available and the
    object exposes ``_oleobj_``; in every other case (including any failure)
    the original object is returned unchanged.
    """
    if pythoncom is None:
        return folder_obj

    try:
        raw_dispatch = getattr(folder_obj, "_oleobj_", None)
        if raw_dispatch is not None:
            return pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, raw_dispatch)
    except Exception:
        # Fall through to the unmarshalled object on any failure.
        pass
    return folder_obj
|
|
|
|
@staticmethod
def _resolve_worker_folder(folder_ref: object) -> object:
    """Turn the reference produced for the worker back into a folder object.

    When pywin32 is unavailable, or the reference cannot be unmarshalled
    (for example because it is already a plain folder object), *folder_ref*
    is returned as-is.
    """
    com_available = pythoncom is not None and win32com is not None
    if not com_available:
        return folder_ref

    try:
        unmarshalled = pythoncom.CoGetInterfaceAndReleaseStream(folder_ref, pythoncom.IID_IDispatch)
        resolved = win32com.client.Dispatch(unmarshalled)
    except Exception:
        return folder_ref
    return resolved
|
|
|
|
def _create_zip_archive(self, source_root: Path, output_root: Path, folder_text: str) -> Path:
    """Pack every file under *source_root* into a new ZIP in *output_root*.

    Args:
        source_root: Directory whose files are archived; entry names are
            stored relative to it, with POSIX separators.
        output_root: Directory that receives the archive.
        folder_text: Outlook folder path used to derive the archive name.

    Returns:
        Path of the archive that was written.

    Raises:
        Whatever ``self.exporter.check_cancel()`` raises on cancellation, or
        any I/O error from writing the archive. In both cases the partially
        written archive is deleted first, so no truncated ZIP is left in the
        user's output folder.
    """
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    base_name = NameCodec.archive_base_name_from_outlook_path(folder_text)
    zip_path = Exporter._unique_path(output_root / f"{base_name}_{stamp}.zip")
    self.log(f"Creazione archivio ZIP: {zip_path}")

    try:
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for path in source_root.rglob("*"):
                # Checked per entry so large archives stay cancellable.
                self.exporter.check_cancel()
                if path.is_file():
                    archive.write(path, path.relative_to(source_root).as_posix())
    except BaseException:
        # The archive handle is closed by now; remove the incomplete file and
        # let the original exception propagate to the caller.
        try:
            zip_path.unlink()
        except OSError:
            pass
        raise

    return zip_path
|
|
|
|
def process_ui_queue(self) -> None:
    """Drain pending worker events and reschedule itself in 100 ms.

    Handles "log", "status", "complete", "cancelled" and "error" events.
    The reschedule happens in ``finally`` so polling continues even if a
    handler raises.
    """
    try:
        while True:
            try:
                event = self.ui_queue.get_nowait()
            except queue.Empty:
                break

            kind = event[0]
            if kind == "log":
                self._write_log(event[1])
            elif kind == "status":
                self.status_var.set(event[1])
            elif kind == "complete":
                _, mails, attachments, result_path, zip_mode = event
                self.set_status(f"Esportazione completata | mail: {mails} | allegati: {attachments}")
                self.log(f"Esportazione completata. Mail: {mails}, Allegati: {attachments}")
                self._restore_export_controls()
                self.show_export_complete_dialog(mails, attachments, result_path, zip_mode)
            elif kind == "cancelled":
                self.set_status("Esportazione annullata")
                self.log("Esportazione annullata dall'utente.")
                self._restore_export_controls()
                messagebox.showinfo(APP_TITLE, "Esportazione annullata.")
            elif kind == "error":
                _, summary, details = event
                self.set_status("Errore durante l'esportazione")
                self.log(f"Errore esportazione: {summary}")
                self.log(details)
                self._restore_export_controls()
                messagebox.showerror(APP_TITLE, f"Errore durante l'esportazione.\n\n{summary}")
    finally:
        self.after(100, self.process_ui_queue)
|
|
|
|
def show_export_complete_dialog(self, mails: int, attachments: int, export_target: Path, zip_mode: bool) -> None:
    """Show a modal summary of a finished export.

    The dialog lists the exported mail/attachment counts and the result
    path, with one button to open the destination (the archive's parent
    directory in ZIP mode, the export folder otherwise) and one to close.
    """
    window = ctk.CTkToplevel(self)
    window.title("Esportazione completata")
    window.geometry("520x260")
    window.resizable(False, False)
    window.transient(self)
    window.grab_set()

    window.grid_columnconfigure(0, weight=1)
    window.grid_rowconfigure(1, weight=1)

    heading = ctk.CTkLabel(
        window,
        text="Esportazione completata",
        font=ctk.CTkFont(size=20, weight="bold"),
    )
    heading.grid(row=0, column=0, sticky="w", padx=20, pady=(20, 8))

    target_kind = "Archivio ZIP" if zip_mode else "Cartella"
    summary_text = (
        f"Mail esportate: {mails}\n"
        f"Allegati esportati: {attachments}\n\n"
        f"{target_kind}:\n{export_target}"
    )
    summary_label = ctk.CTkLabel(window, text=summary_text, justify="left", anchor="w")
    summary_label.grid(row=1, column=0, sticky="nsew", padx=20, pady=(0, 16))

    button_bar = ctk.CTkFrame(window, fg_color="transparent")
    button_bar.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 20))
    # Both buttons share the row width equally.
    for column in (0, 1):
        button_bar.grid_columnconfigure(column, weight=1)

    def open_destination() -> None:
        """Open the export destination, then close the dialog in any case."""
        try:
            destination = export_target.parent if zip_mode else export_target
            os.startfile(str(destination))
        except Exception as error:
            messagebox.showerror(APP_TITLE, f"Impossibile aprire la destinazione di export.\n\n{error}")
        finally:
            window.destroy()

    open_label = "Vai alla cartella di zip" if zip_mode else "Vai alla cartella di export"
    open_button = ctk.CTkButton(button_bar, text=open_label, command=open_destination, height=38)
    open_button.grid(row=0, column=0, sticky="ew", padx=(0, 8))

    close_button = ctk.CTkButton(
        button_bar,
        text="Chiudi",
        command=window.destroy,
        height=38,
        fg_color="#5f6368",
        hover_color="#4b4f52",
    )
    close_button.grid(row=0, column=1, sticky="ew", padx=(8, 0))

    window.protocol("WM_DELETE_WINDOW", window.destroy)
    window.after(100, window.focus_force)
|
|
|
|
def log(self, message: str) -> None:
    """Record *message* in the application log from any thread.

    On the main thread the message is written immediately; from any other
    thread it is queued as a ("log", message) event for the UI poller.
    """
    on_main_thread = threading.get_ident() == self.main_thread_id
    if on_main_thread:
        self._write_log(message)
    else:
        self.ui_queue.put(("log", message))
|
|
|
|
def _init_file_log(self) -> None:
    """Create (or overwrite) the log file with a three-line header.

    The header holds the start timestamp, the log file path and a separator
    of 80 "=" characters. Failures are ignored on purpose: file logging is
    best effort and must not prevent the application from starting.
    """
    try:
        self.log_file_path.parent.mkdir(parents=True, exist_ok=True)
        started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Built separately: adjacent string literals are joined before "*"
        # is applied, so writing `"..." "=" * 80` inline would repeat the
        # entire header 80 times instead of just the "=" character.
        separator = "=" * 80
        header = (
            f"MailExporter log avviato: {started_at}\n"
            f"File log: {self.log_file_path}\n"
            f"{separator}\n"
        )
        self.log_file_path.write_text(header, encoding="utf-8", errors="replace")
    except Exception:
        pass
|
|
|
|
def _write_log(self, message: str) -> None:
    """Append a timestamped line to the log file and the on-screen log box.

    The log box is kept to at most MAX_LOG_LINES lines by rewriting it with
    only the most recent lines, then scrolled to the end and made read-only
    again before the UI is refreshed.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    entry = f"[{stamp}] {message}\n"
    self._append_file_log(entry)

    box = self.log_box
    # The textbox is normally read-only; unlock it just for this update.
    box.configure(state="normal")
    box.insert("end", entry)

    buffered_lines = box.get("1.0", "end").splitlines()
    if len(buffered_lines) > MAX_LOG_LINES:
        newest = "\n".join(buffered_lines[-MAX_LOG_LINES:]) + "\n"
        box.delete("1.0", "end")
        box.insert("1.0", newest)

    box.see("end")
    box.configure(state="disabled")
    self.pump_ui()
|
|
|
|
def _append_file_log(self, line: str) -> None:
    """Append *line* to the log file, creating its directory if needed.

    Any failure is swallowed: file logging is best effort.
    """
    try:
        log_path = self.log_file_path
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with log_path.open("a", encoding="utf-8", errors="replace") as handle:
            handle.write(line)
    except Exception:
        pass
|
|
|
|
def set_status(self, message: str) -> None:
    """Update the status bar text from any thread.

    On the main thread the status variable is set and the UI refreshed;
    from any other thread a ("status", message) event is queued instead.
    """
    on_main_thread = threading.get_ident() == self.main_thread_id
    if not on_main_thread:
        self.ui_queue.put(("status", message))
        return

    self.status_var.set(message)
    self.pump_ui()
|
|
|
|
def pump_ui(self) -> None:
    """Process pending UI work so the window repaints during long operations.

    Only acts on the main thread; calls from other threads return at once.
    Errors raised while updating are ignored.
    """
    if threading.get_ident() == self.main_thread_id:
        try:
            self.update_idletasks()
            self.update()
        except Exception:
            # Best effort: a failed refresh must not break the caller.
            pass
|
|
|
|
def on_close(self) -> None:
    """Disconnect from Outlook and close the main window.

    The window is destroyed even when disconnecting raises; the exception
    then propagates after the window is gone.
    """
    try:
        service = self.outlook_service
        service.disconnect()
    finally:
        # Always tear the window down, whatever happened above.
        self.destroy()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: refuse to start anywhere but Windows, then run the GUI.
    running_on_windows = sys.platform == "win32"
    if not running_on_windows:
        raise SystemExit("Questo programma funziona solo su Windows.")

    app = OutlookExporterApp()
    app.mainloop()
|