Files
cruciverba_1/enrich_review_from_wiktionary.py

679 lines
24 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import time
import urllib.parse
import urllib.request
import urllib.error
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from refine_lexicon_topics import REFINED_LEXICON_OUTPUT_PATH
# Default review file (default of --review), looked up next to this script.
REVIEW_INPUT_PATH = Path(__file__).with_name("to_be_review.json")
# Default on-disk cache of Wiktionary API responses (default of --cache).
WIKTIONARY_CACHE_PATH = Path(__file__).with_name(".wiktionary_cache.json")
# Default output lexicon carrying the added "wiktionary" block (default of --output).
WIKTIONARY_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_refined_plus_wiktionary.json")
# Default MediaWiki Action API endpoint (default of --api-url).
WIKTIONARY_API_URL = "https://it.wiktionary.org/w/api.php"
# Review reasons selected when --review-reasons is not given.
DEFAULT_REVIEW_REASONS = {"no_viable_definition", "only_general_topics", "babelnet_ambiguous"}
# Maps lowercase Italian part-of-speech labels (full words and abbreviations)
# to the POS codes stored in parsed entries. The lookups in this file try the
# labels longest-first, so a longer label wins over an abbreviation it contains.
POS_ALIASES = {
    "sostantivo": "NOUN",
    "nome": "NOUN",
    "sost": "NOUN",
    "aggettivo": "ADJ",
    "agg": "ADJ",
    "verbo": "VERB",
    "verb": "VERB",
    "verb form": "VERB_FORM",
    "avverbio": "ADV",
    "avv": "ADV",
    "preposizione": "PREP",
    "prep": "PREP",
    "congiunzione": "CONJ",
    "cong": "CONJ",
    "pronome": "PRON",
    "pron": "PRON",
    "articolo": "ART",
    "interiezione": "INTJ",
    "inter": "INTJ",
    "locuzione": "PHRASE",
    "loc": "PHRASE",
}
# Topic name -> lowercase keyword fragments. infer_topics() reports a topic when
# any of its fragments occurs as a substring of the lowercased definitions and
# categories. The fragments are written without accents ("citta", "localita").
TOPIC_KEYWORDS = {
    "religion": ("religione", "cattolic", "sacro", "sacra", "devozion", "scapolare", "abbazia", "monastero"),
    "clothing": ("abito", "vestito", "vestit", "abbigliamento", "indumento", "stoffa"),
    "grammar": ("diminutivo", "voce verbale", "congiuntivo", "plurale", "singolare", "grammatica", "verbo"),
    "geography": ("comune", "paese", "regione", "provincia", "citta", "localita", "frazione"),
    "transport": ("veicolo", "motore", "treno", "aereo", "trasporto", "nave", "imbarcazione"),
    "health": ("medicina", "ospedale", "malattia", "cura", "feriti", "ammalati", "sanitario"),
}
# Lowercase grammatical terms that infer_grammar_hints() searches for, as plain
# substrings, in the definitions and in the raw section text.
GRAMMAR_KEYWORDS = (
    "diminutivo",
    "accrescitivo",
    "peggiorativo",
    "alterato",
    "voce verbale",
    "congiuntivo",
    "participio",
    "plurale",
    "singolare",
    "maschile",
    "femminile",
)
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, exactly as the previous zero-argument
            form did; passing a list allows programmatic invocation.

    Returns:
        The parsed namespace with attributes ``input``, ``review``,
        ``output``, ``cache``, ``word_limit``, ``sleep``, ``save_every``,
        ``retry_429``, ``backoff_429``, ``stop_on_429``, ``words``,
        ``review_reasons``, ``api_url`` and ``skip_existing``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Arricchisce le voci problematiche del lessico refined con definizioni e metadati "
            "estratti da it.wiktionary.org."
        )
    )

    # --- input / output locations -------------------------------------
    parser.add_argument(
        "--input",
        type=Path,
        default=REFINED_LEXICON_OUTPUT_PATH,
        help="Lessico refined di partenza.",
    )
    parser.add_argument(
        "--review",
        type=Path,
        default=REVIEW_INPUT_PATH,
        help="File to_be_review.json da usare per selezionare le voci prioritarie.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=WIKTIONARY_OUTPUT_PATH,
        help="Nuovo lessico con blocco wiktionary aggiunto.",
    )
    parser.add_argument(
        "--cache",
        type=Path,
        default=WIKTIONARY_CACHE_PATH,
        help="Cache locale delle risposte Wiktionary.",
    )

    # --- batch pacing ---------------------------------------------------
    parser.add_argument(
        "--word-limit",
        type=int,
        default=0,
        help="Limite massimo di parole da elaborare. 0 = tutte le candidate.",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=1.0,
        help="Pausa tra le richieste HTTP a Wiktionary.",
    )
    parser.add_argument(
        "--save-every",
        type=int,
        default=25,
        help="Salva cache e output ogni N parole elaborate per non perdere progresso.",
    )

    # --- HTTP 429 (rate limit) handling -------------------------------
    parser.add_argument(
        "--retry-429",
        type=int,
        default=3,
        help="Numero massimo di tentativi aggiuntivi se Wiktionary risponde HTTP 429.",
    )
    parser.add_argument(
        "--backoff-429",
        type=float,
        default=30.0,
        help="Secondi di attesa iniziali dopo un HTTP 429; raddoppiano a ogni nuovo tentativo.",
    )
    parser.add_argument(
        "--stop-on-429",
        action="store_true",
        help="Se attivo, al primo HTTP 429 salva lo stato e interrompe il batch senza altri tentativi.",
    )

    # --- target selection -----------------------------------------------
    parser.add_argument(
        "--words",
        default="",
        help="Lista separata da virgole di lemmi specifici da arricchire.",
    )
    parser.add_argument(
        "--review-reasons",
        default=",".join(sorted(DEFAULT_REVIEW_REASONS)),
        help="Motivi del file review da trattare con priorita, separati da virgole.",
    )
    parser.add_argument(
        "--api-url",
        default=WIKTIONARY_API_URL,
        help="Endpoint MediaWiki Action API di Wiktionary.",
    )
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="Salta le voci che nel lessico di input hanno già un blocco wiktionary con stato utile.",
    )
    return parser.parse_args(argv)
def load_json(path: Path, default: object) -> object:
    """Return the JSON document stored at *path*, or *default* if *path* does not exist.

    The file is read as UTF-8. Malformed JSON is not handled here and
    propagates as the exception raised by ``json.loads``.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return default
def write_json(path: Path, payload: object) -> None:
    """Serialize *payload* as pretty-printed UTF-8 JSON and store it at *path*.

    The document is first written to a sibling ``<name>.tmp`` file and then
    renamed over *path*, so a crash or interruption in the middle of a save
    cannot leave a truncated file behind (this function is used for the
    periodic cache/output saves of a long-running batch).

    Raises:
        TypeError / ValueError: if *payload* is not JSON-serializable; in
            that case nothing is written.
        OSError: if the temporary file cannot be written or renamed.
    """
    # Serialize before touching the file system: a serialization error must
    # not disturb the existing file.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    scratch = path.with_name(f"{path.name}.tmp")
    try:
        scratch.write_text(serialized, encoding="utf-8")
        # Rename within the same directory replaces the target in one step.
        scratch.replace(path)
    except BaseException:
        # Do not leave a half-written temporary file behind.
        if scratch.exists():
            scratch.unlink()
        raise
def parse_csv_set(value: str) -> set[str]:
    """Split a comma-separated string into a set of trimmed, lowercased items.

    Empty items are dropped; a falsy *value* (``""`` or ``None``) yields an
    empty set.
    """
    trimmed = (chunk.strip() for chunk in str(value or "").split(","))
    return {token.lower() for token in trimmed if token}
def entry_key(entry: Dict[str, object]) -> Tuple[str, str]:
    """Return the ``(form, pos)`` identity of a lexicon entry.

    The form is ``normalized_form`` when truthy, otherwise ``form``; it is
    trimmed and lowercased. The part of speech is trimmed and uppercased.
    Missing values become empty strings.
    """
    raw_form = entry.get("normalized_form") or entry.get("form") or ""
    raw_pos = entry.get("pos") or ""
    return str(raw_form).strip().lower(), str(raw_pos).strip().upper()
def fetch_wikitext(title: str, api_url: str) -> Dict[str, object]:
    """Fetch the current wikitext of page *title* through the MediaWiki Action API.

    Returns:
        ``{"status": "missing"}`` (plus ``"title"`` when the API returned a
        page record flagged as missing) if the page does not exist, otherwise
        a dict with ``status`` (``"ok"`` or ``"empty"`` when no content was
        returned), ``title``, ``pageid`` and ``wikitext``.

    Raises:
        urllib.error.HTTPError / urllib.error.URLError: on transport errors;
            they are not handled here.
    """
    query_string = urllib.parse.urlencode(
        {
            "action": "query",
            "prop": "revisions",
            "titles": title,
            "rvprop": "content",
            "rvslots": "main",
            "formatversion": "2",
            "format": "json",
        }
    )
    request = urllib.request.Request(
        f"{api_url}?{query_string}",
        headers={
            "User-Agent": "cruciverba-alpha/0.1 (local lexical enrichment)",
            "Accept": "application/json",
        },
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        body = response.read().decode("utf-8")
    payload = json.loads(body)

    pages = (payload.get("query") or {}).get("pages") or []
    if not pages:
        return {"status": "missing"}

    # Only one title is requested, so only the first page record is read.
    page = pages[0]
    if page.get("missing"):
        return {"status": "missing", "title": page.get("title", title)}

    wikitext = ""
    revisions = page.get("revisions") or []
    if revisions:
        main_slot = (revisions[0].get("slots") or {}).get("main") or {}
        wikitext = str(main_slot.get("content") or "")

    return {
        "status": "ok" if wikitext else "empty",
        "title": page.get("title", title),
        "pageid": page.get("pageid"),
        "wikitext": wikitext,
    }
def fetch_wikitext_with_retry(title: str, args: argparse.Namespace) -> Dict[str, object]:
    """Call ``fetch_wikitext`` and retry with exponential backoff on HTTP 429.

    The first wait is ``args.backoff_429`` seconds (at least 1.0) and doubles
    after every retry. At most ``args.retry_429`` retries are made.

    Raises:
        urllib.error.HTTPError: immediately for any status other than 429,
            immediately for a 429 when ``args.stop_on_429`` is set, or once
            the retries are exhausted.
    """
    wait_seconds = max(1.0, float(args.backoff_429))
    retries_done = 0
    while True:
        try:
            return fetch_wikitext(title, args.api_url)
        except urllib.error.HTTPError as exc:
            give_up = (
                exc.code != 429
                or args.stop_on_429
                or retries_done >= max(0, int(args.retry_429))
            )
            if give_up:
                raise
            retries_done += 1
            print(f"[429] {title}: attendo {wait_seconds:.1f}s prima del tentativo {retries_done}/{args.retry_429}")
            time.sleep(wait_seconds)
            wait_seconds *= 2
def normalize_heading(text: str) -> str:
    """Return a lowercase, markup-free version of a section heading.

    The Italian language marker ``{{-it-}}`` (ignoring case and spaces) is
    returned verbatim, because stripping wikicode would erase it entirely.
    """
    compact = str(text or "").strip().lower().replace(" ", "")
    if compact == "{{-it-}}":
        return compact
    return strip_wikicode(text).strip().lower()
def extract_italian_section(wikitext: str) -> str:
    """Return the body of the Italian language section of a Wiktionary page.

    The section starts right after the level-2 heading ``== {{-it-}} ==``
    (or ``== Italiano ==`` / ``== it ==``) and runs up to the next level-2
    heading, or to the end of the page. An empty string is returned when no
    Italian heading is found.
    """
    # Match headings with EXACTLY two '=' on each side. Without the
    # lookarounds the pattern also matches '=== ... ===' sub-headings, which
    # would cut the Italian section at its first sub-heading.
    level2_heading = re.compile(r"^==(?!=)\s*(.*?)\s*(?<!=)==\s*$", re.MULTILINE)
    headings = list(level2_heading.finditer(wikitext))
    for position, heading in enumerate(headings):
        title = str(heading.group(1) or "")
        compact_title = title.strip().lower().replace(" ", "")
        if compact_title == "{{-it-}}" or normalize_heading(title) in {"italiano", "it"}:
            start = heading.end()
            has_next = position + 1 < len(headings)
            end = headings[position + 1].start() if has_next else len(wikitext)
            return wikitext[start:end]
    return ""
def strip_templates(text: str) -> str:
    """Remove ``{{...}}`` templates from *text*, innermost first.

    Each pass replaces a brace-free ``{{name|args}}`` template with its
    arguments (everything after the first ``|``, remaining pipes included)
    and then deletes every remaining brace-free template. Passes repeat
    until the text stops changing, which unwinds nested templates.
    """
    with_arguments = re.compile(r"\{\{([^{}|]+)\|([^{}]+?)\}\}")
    any_template = re.compile(r"\{\{[^{}]+\}\}")
    result = text
    while True:
        reduced = any_template.sub("", with_arguments.sub(r"\2", result))
        if reduced == result:
            return result
        result = reduced
def strip_wikicode(text: str) -> str:
    """Reduce a wikitext fragment to plain text.

    In order: drops HTML comments, ``<ref>`` blocks and remaining tags;
    removes templates via ``strip_templates``; replaces ``[[target|label]]``
    with the label and ``[[target]]`` with the target; removes bold/italic
    quote markup; converts ``&nbsp;`` to a space; collapses whitespace; and
    trims spaces and the characters ``. ; : -`` from both ends.
    """
    cleaned = str(text or "")

    # Comments and references may span several lines, hence DOTALL.
    for pattern in (r"<!--.*?-->", r"<ref[^>]*>.*?</ref>"):
        cleaned = re.sub(pattern, " ", cleaned, flags=re.DOTALL)
    cleaned = re.sub(r"<[^>]+>", " ", cleaned)

    cleaned = strip_templates(cleaned)

    # Piped links first, otherwise the plain-link pattern would keep "target|label".
    link_rules = (
        (r"\[\[([^|\]]+)\|([^\]]+)\]\]", r"\2"),
        (r"\[\[([^\]]+)\]\]", r"\1"),
    )
    for pattern, replacement in link_rules:
        cleaned = re.sub(pattern, replacement, cleaned)

    # Bold (''') must be removed before italic ('').
    for literal, replacement in (("'''", ""), ("''", ""), ("&nbsp;", " ")):
        cleaned = cleaned.replace(literal, replacement)

    return re.sub(r"\s+", " ", cleaned).strip(" .;:-")
def infer_topics(definitions: Sequence[str], categories: Sequence[str]) -> List[str]:
    """Return the sorted topics whose keywords occur in the definitions or categories.

    Matching is a case-insensitive and accent-insensitive substring search
    of every ``TOPIC_KEYWORDS`` fragment in the joined text. Accents are
    folded because the keyword table is written without them ("citta",
    "localita") while Wiktionary text is accented ("città", "località").

    Args:
        definitions: Plain-text definitions; any sequence of strings.
        categories: Plain-text category names; any sequence of strings.
    """
    import unicodedata

    def fold(value: str) -> str:
        # Lowercase, decompose, then drop combining marks: "Città" -> "citta".
        decomposed = unicodedata.normalize("NFD", value.lower())
        return "".join(ch for ch in decomposed if not unicodedata.combining(ch))

    # Unpacking accepts any pair of sequences (list + tuple would raise with '+').
    haystack = fold(" ".join([*definitions, *categories]))
    return sorted(
        topic
        for topic, keywords in TOPIC_KEYWORDS.items()
        if any(fold(keyword) in haystack for keyword in keywords)
    )
def infer_grammar_hints(definitions: Sequence[str], raw_section: str) -> List[str]:
    """Return the sorted ``GRAMMAR_KEYWORDS`` found in the definitions or the raw section.

    The search is a case-insensitive substring match over the definitions
    joined with spaces, followed by a space and the raw section text.
    """
    haystack = "{} {}".format(" ".join(definitions), raw_section).lower()
    found = {keyword for keyword in GRAMMAR_KEYWORDS if keyword in haystack}
    return sorted(found)
def detect_pos_from_heading(heading: str) -> Optional[str]:
    """Map a textual section heading to a POS code, or ``None`` if it is not a POS heading.

    ``POS_ALIASES`` labels are tried longest-first and must start at the
    beginning of a word of the normalized heading. A plain substring test
    would turn "Proverbi" into VERB (it contains "verb"), and pronunciation
    headings ("Pronuncia") would be taken for the pronoun abbreviation
    "pron"; both are rejected here.
    """
    normalized = normalize_heading(heading)
    if not normalized:
        return None
    # "Pronuncia" begins with the pronoun abbreviation "pron" but is the
    # pronunciation section, not a part of speech.
    if normalized.startswith("pronunc"):
        return None
    labels_longest_first = sorted(POS_ALIASES.items(), key=lambda item: len(item[0]), reverse=True)
    for label, pos in labels_longest_first:
        # (?<!\w) anchors the label to the start of a word; endings stay
        # free so inflected forms such as "verbale" are still recognised.
        if re.search(rf"(?<!\w){re.escape(label)}", normalized):
            return pos
    return None
def parse_template_marker(line: str) -> Tuple[Optional[str], Optional[str]]:
    """Classify a line that consists solely of a ``{{-name-}}`` section template.

    Returns:
        ``(None, None)`` if the line is not such a template, otherwise one of
        ``("language", "it")``, ``("pos", <POS code>)`` or
        ``("subsection", <name>)``. Synonym markers (``{{-sin-}}`` and names
        starting with "sinon") are reported under the name ``"sinonimi"``.
    """
    found = re.match(r"^\{\{-([^{}|]+?)-?(?:\|.*)?\}\}$", line.strip(), flags=re.IGNORECASE)
    if not found:
        return None, None
    marker = found.group(1).strip().lower()
    if marker == "it":
        return "language", "it"

    # The bare marker "pron" is the pronunciation section. It must be checked
    # before the POS aliases, whose abbreviation "pron" would otherwise open
    # a spurious PRON entry. Longer markers ("pronome", "pron pers", ...)
    # still reach the POS lookup below.
    # NOTE(review): relies on it.wiktionary using {{-pron-}} for "Pronuncia".
    if marker == "pron":
        return "subsection", marker

    # it.wiktionary's synonyms header is {{-sin-}}; keep the longer spelling too.
    if marker == "sin" or marker.startswith("sinon"):
        return "subsection", "sinonimi"

    labels_longest_first = sorted(POS_ALIASES.items(), key=lambda item: len(item[0]), reverse=True)
    for label, pos in labels_longest_first:
        if marker.startswith(label):
            return "pos", pos

    # Every other marker (etim, trad, sill, var, note, ...) is a subsection.
    return "subsection", marker
def parse_wiktionary_section(section_text: str) -> Dict[str, object]:
    """Parse the wikitext of one language section into structured entries.

    The section is scanned line by line. A POS marker (a ``{{-...-}}``
    template classified as "pos", or a level-3 heading recognised by
    ``detect_pos_from_heading``) opens a new entry; other markers and
    headings only change the current subsection. While an entry is open:

    * lines starting with ``#`` (but not ``#:``, ``#*`` or ``#;``) are definitions;
    * lines starting with ``#:`` or ``#*`` are examples;
    * lines starting with ``*`` inside a subsection whose name starts with
      "sinonim" are synonyms.

    ``[[Categoria:...]]`` links are collected from every non-empty line.

    Returns:
        A dict with ``entries``, ``categories`` (sorted, unique, non-empty),
        ``definitions`` (all entries flattened, in order), ``topic_hints``
        and ``grammar_hints``.
    """

    def open_entry(pos: object, heading: object) -> Dict[str, object]:
        # Key order matters: it is the order written to the output JSON.
        return {
            "pos": pos,
            "heading": heading,
            "definitions": [],
            "examples": [],
            "synonyms": [],
        }

    heading_re = re.compile(r"^(={3,4})\s*(.*?)\s*\1\s*$")
    entries: List[Dict[str, object]] = []
    categories: List[str] = []
    active: Optional[Dict[str, object]] = None
    subsection = ""

    for raw_line in section_text.splitlines():
        line = raw_line.rstrip()
        if not line:
            continue

        categories.extend(
            strip_wikicode(name) for name in re.findall(r"\[\[Categoria:([^\]]+)\]\]", line)
        )

        kind, value = parse_template_marker(line)
        if kind == "pos":
            active = open_entry(value, value)
            entries.append(active)
            subsection = ""
            continue
        if kind == "subsection":
            subsection = str(value or "")
            continue

        heading = heading_re.match(line)
        if heading:
            title = heading.group(2)
            # Only level-3 headings may open an entry; level 4 is always a subsection.
            pos = detect_pos_from_heading(title) if len(heading.group(1)) == 3 else None
            if pos:
                active = open_entry(pos, strip_wikicode(title))
                entries.append(active)
                subsection = ""
            else:
                subsection = normalize_heading(title)
            continue

        if active is None:
            # Content before the first POS marker belongs to no entry.
            continue

        body = line.lstrip()
        if body.startswith(("#:", "#*")):
            example = strip_wikicode(body[2:].strip())
            if example:
                active["examples"].append(example)
        elif body.startswith("#"):
            # "#;" lines are neither definitions nor examples and are ignored.
            if not body.startswith("#;"):
                definition = strip_wikicode(body.lstrip("#").strip())
                if definition:
                    active["definitions"].append(definition)
        elif subsection.startswith("sinonim") and body.startswith("*"):
            synonym = strip_wikicode(body.lstrip("*").strip())
            if synonym:
                active["synonyms"].append(synonym)

    flat_definitions = [text for item in entries for text in item["definitions"]]
    return {
        "entries": entries,
        "categories": sorted(set(filter(None, categories))),
        "definitions": flat_definitions,
        "topic_hints": infer_topics(flat_definitions, categories),
        "grammar_hints": infer_grammar_hints(flat_definitions, section_text),
    }
def wiktionary_payload_for_entry(entry: Dict[str, object], api_response: Dict[str, object]) -> Dict[str, object]:
    """Build the ``wiktionary`` block stored on a lexicon entry from a fetch result.

    The resulting ``status`` is:

    * the API status itself when it is not ``"ok"`` (e.g. "missing", "empty");
    * ``"no_italian_section"`` when the page has no Italian section;
    * ``"enriched"`` when at least one definition was parsed, otherwise
      ``"section_without_definitions"``.

    Only the last two cases carry ``pageid`` and ``raw_excerpt`` (the first
    4000 characters of the Italian section).
    """
    page_title = api_response.get("title") or entry.get("form")

    def page_url(title: object) -> str:
        return "https://it.wiktionary.org/wiki/" + urllib.parse.quote(str(title))

    def unmatched(status: str, url_title: object) -> Dict[str, object]:
        return {
            "status": status,
            "matched": False,
            "page_title": page_title,
            "source_url": page_url(url_title),
            "definitions": [],
            "entries": [],
            "topic_hints": [],
            "grammar_hints": [],
            "categories": [],
        }

    status = str(api_response.get("status", "missing"))
    if status != "ok":
        # This branch builds the URL from the entry form, not the API title.
        return unmatched(status, entry.get("form", ""))

    resolved_title = api_response.get("title") or entry.get("form", "")
    italian_section = extract_italian_section(str(api_response.get("wikitext") or ""))
    if not italian_section:
        return unmatched("no_italian_section", resolved_title)

    parsed = parse_wiktionary_section(italian_section)
    has_definitions = bool(parsed["definitions"])
    return {
        "status": "enriched" if has_definitions else "section_without_definitions",
        "matched": has_definitions,
        "page_title": page_title,
        "pageid": api_response.get("pageid"),
        "source_url": page_url(resolved_title),
        "definitions": parsed["definitions"],
        "entries": parsed["entries"],
        "topic_hints": parsed["topic_hints"],
        "grammar_hints": parsed["grammar_hints"],
        "categories": parsed["categories"],
        "raw_excerpt": italian_section[:4000],
    }
def select_targets(
    refined_payload: Dict[str, object],
    review_payload: Dict[str, object],
    review_reasons: set[str],
    explicit_words: set[str],
    word_limit: int,
    skip_existing: bool,
) -> Tuple[List[Dict[str, object]], int]:
    """Choose the lexicon entries to enrich.

    Entries are looked up by their lowercased ``form``; when several refined
    entries share a form, the last one wins.

    * If *explicit_words* is non-empty, exactly those words are selected (in
      alphabetical order, so the result and the effect of *word_limit* are
      reproducible) and the review file is ignored.
    * Otherwise the review entries are scanned in file order and a word is
      selected when one of its ``review_reasons`` is in *review_reasons* or
      when the refined entry's ``babelnet.status`` is ``"no_match"``.

    Args:
        word_limit: Maximum number of selected entries; ``0`` or less means
            no limit.
        skip_existing: When true, entries for which
            ``wiktionary_already_useful`` returns true are skipped and counted.

    Returns:
        ``(selected_entries, skipped_existing_count)``.
    """
    refined_entries = [entry for entry in refined_payload.get("entries", []) or [] if isinstance(entry, dict)]
    refined_by_word = {str(entry.get("form", "")).lower(): entry for entry in refined_entries if entry.get("form")}

    if explicit_words:
        selected: List[Dict[str, object]] = []
        skipped_existing_count = 0
        # A set has no stable iteration order across runs; sort it so that
        # the output order and the subset kept by word_limit are deterministic.
        for word in sorted(explicit_words):
            entry = refined_by_word.get(word)
            if entry is None:
                continue
            if skip_existing and wiktionary_already_useful(entry):
                skipped_existing_count += 1
                continue
            selected.append(entry)
        if word_limit > 0:
            selected = selected[:word_limit]
        return selected, skipped_existing_count

    review_entries = [entry for entry in review_payload.get("entries", []) or [] if isinstance(entry, dict)]
    selected_words: List[str] = []
    seen: set[str] = set()
    skipped_existing_count = 0
    for review_entry in review_entries:
        word = str(review_entry.get("form", "")).strip().lower()
        if not word or word in seen:
            continue
        refined = refined_by_word.get(word)
        if refined is None:
            continue
        if skip_existing and wiktionary_already_useful(refined):
            skipped_existing_count += 1
            continue
        reasons = {str(item).lower() for item in review_entry.get("review_reasons", []) or []}
        babelnet_status = str((refined.get("babelnet") or {}).get("status", "")).lower()
        if reasons.intersection(review_reasons) or babelnet_status == "no_match":
            selected_words.append(word)
            # Only selected words are remembered: a later review entry for the
            # same word may still qualify with different reasons.
            seen.add(word)
            if word_limit > 0 and len(selected_words) >= word_limit:
                break
    return [refined_by_word[word] for word in selected_words], skipped_existing_count
def wiktionary_already_useful(entry: Dict[str, object]) -> bool:
    """Tell whether *entry* already carries a conclusive ``wiktionary`` block.

    A block counts as conclusive when its status is ``"enriched"`` and it has
    definitions or entries, or when its status records a definitive negative
    outcome ("missing", "no_italian_section", "section_without_definitions",
    "empty"). Status comparison is case-insensitive.
    """
    block = entry.get("wiktionary", {})
    if not isinstance(block, dict):
        return False
    status = str(block.get("status", "")).lower()
    negative_outcomes = {"missing", "no_italian_section", "section_without_definitions", "empty"}
    if status in negative_outcomes:
        return True
    return status == "enriched" and bool(block.get("definitions") or block.get("entries"))
def enrich_from_wiktionary(args: argparse.Namespace) -> Dict[str, object]:
    """Run the enrichment batch described by *args* and write cache and output.

    For every selected entry the Wiktionary response is taken from the cache
    or fetched over the network (then cached), converted into a
    ``wiktionary`` block and attached to a copy of the entry. The merged
    lexicon and the cache are saved every ``args.save_every`` processed
    words and once more when the batch ends.

    The final save happens in a ``finally`` clause, so it also runs when the
    batch is aborted by an unexpected error or by Ctrl-C: responses already
    fetched are never lost. An HTTP 429 that survives the retries stops the
    batch cleanly and is reported through ``stopped_reason`` / ``stop_word``.

    Returns:
        A summary dict with ``target_count``, ``processed_count``,
        ``skipped_existing_count``, ``cache_hits``, ``network_calls``,
        ``network_attempts``, ``output``, ``stopped_reason`` and ``stop_word``.

    Raises:
        ValueError: if the input lexicon is not a dict with an ``entries`` key.
        urllib.error.HTTPError: for HTTP errors other than 429 (after
            progress has been saved).
    """
    refined_payload = load_json(args.input, {"entries": []})
    if not isinstance(refined_payload, dict) or "entries" not in refined_payload:
        raise ValueError(f"Lessico refined non valido: {args.input}")
    # A malformed review file or cache is tolerated and treated as empty.
    review_payload = load_json(args.review, {"entries": []})
    if not isinstance(review_payload, dict):
        review_payload = {"entries": []}
    cache = load_json(args.cache, {})
    if not isinstance(cache, dict):
        cache = {}

    targets, skipped_existing_count = select_targets(
        refined_payload,
        review_payload,
        parse_csv_set(args.review_reasons),
        parse_csv_set(args.words),
        args.word_limit,
        args.skip_existing,
    )

    enriched_entries: List[Dict[str, object]] = []
    cache_hits = 0
    network_calls = 0
    network_attempts = 0
    processed_count = 0
    stopped_reason: Optional[str] = None
    stop_word: Optional[str] = None

    print(
        f"Target selezionati: {len(targets)}"
        + (f" | già saltati per wiktionary esistente: {skipped_existing_count}" if args.skip_existing else "")
    )

    def persist_progress() -> None:
        # Overlay the enriched copies on the input lexicon, keyed by (form, pos),
        # then write both the cache and the merged lexicon.
        refined_index = {
            entry_key(entry): entry
            for entry in refined_payload.get("entries", []) or []
            if isinstance(entry, dict)
        }
        for item in enriched_entries:
            refined_index[entry_key(item)] = item
        merged_entries = list(refined_index.values())
        merged_entries.sort(key=lambda item: (str(item.get("normalized_form", "")), str(item.get("pos", ""))))
        merged_payload = {
            "meta": {
                **(refined_payload.get("meta", {}) if isinstance(refined_payload.get("meta"), dict) else {}),
                "wiktionary_source": args.api_url,
                "wiktionary_generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
                "wiktionary_target_count": len(targets),
                "wiktionary_processed_count": processed_count,
                "wiktionary_skipped_existing_count": skipped_existing_count,
                "wiktionary_cache_hits": cache_hits,
                "wiktionary_network_calls": network_calls,
                "wiktionary_network_attempts": network_attempts,
                "wiktionary_stopped_reason": stopped_reason,
                "wiktionary_stop_word": stop_word,
            },
            "entries": merged_entries,
        }
        write_json(args.cache, cache)
        write_json(args.output, merged_payload)

    def run_summary() -> Dict[str, object]:
        # Snapshot of the counters at the time of the call.
        return {
            "target_count": len(targets),
            "processed_count": processed_count,
            "skipped_existing_count": skipped_existing_count,
            "cache_hits": cache_hits,
            "network_calls": network_calls,
            "network_attempts": network_attempts,
            "output": str(args.output),
            "stopped_reason": stopped_reason,
            "stop_word": stop_word,
        }

    try:
        for index, entry in enumerate(targets, start=1):
            updated = deepcopy(entry)
            word = str(entry.get("form", "")).strip()
            cache_key = word.lower()
            if cache_key in cache:
                api_response = cache[cache_key]
                cache_hits += 1
            else:
                network_attempts += 1
                try:
                    api_response = fetch_wikitext_with_retry(word, args)
                except urllib.error.HTTPError as exc:
                    if exc.code != 429:
                        raise
                    # Rate limited beyond the retry budget: stop the batch
                    # cleanly; the finally clause below saves the progress.
                    stop_word = word
                    stopped_reason = f"http_429_after_{processed_count}_words"
                    print(f"[STOP] Wiktionary ha risposto 429 su '{word}'. Salvo il progresso e interrompo il batch.")
                    break
                cache[cache_key] = api_response
                network_calls += 1
                # Throttle only after real network calls, never after cache hits.
                if args.sleep > 0:
                    time.sleep(args.sleep)

            updated["wiktionary"] = wiktionary_payload_for_entry(updated, api_response)
            updated["wiktionary_generated_at"] = datetime.now().astimezone().isoformat(timespec="seconds")
            enriched_entries.append(updated)
            processed_count += 1
            print(
                f"[{index}/{len(targets)}] {word}: "
                f"status={updated['wiktionary'].get('status')} "
                f"def={len(updated['wiktionary'].get('definitions', []))} "
                f"topics={len(updated['wiktionary'].get('topic_hints', []))}"
            )
            if args.save_every > 0 and processed_count % int(args.save_every) == 0:
                persist_progress()
                print(f"[save] progresso salvato dopo {processed_count} parole")
    finally:
        # Runs on normal completion, on the 429 stop and on any error.
        persist_progress()

    return run_summary()
def main() -> None:
    """Command-line entry point: run the enrichment and print a summary."""
    summary = enrich_from_wiktionary(parse_args())

    total = summary["target_count"]
    treated = summary.get("processed_count", total)
    print(f"Lessico con Wiktionary generato: {summary['output']}")
    print(f"Voci trattate: {treated}/{total}")
    if "skipped_existing_count" in summary:
        print(f"Voci già saltate: {summary['skipped_existing_count']}")
    print(f"Cache hit: {summary['cache_hits']}")
    print(f"Chiamate rete: {summary['network_calls']}")
    if "network_attempts" in summary:
        print(f"Tentativi di rete: {summary['network_attempts']}")

    # The stop details are reported only when they carry a truthy value.
    stop_reason = summary.get("stopped_reason")
    if stop_reason:
        print(f"Batch interrotto: {stop_reason}")
    blocking_word = summary.get("stop_word")
    if blocking_word:
        print(f"Ultima parola bloccante: {blocking_word}")


# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()