Files
cruciverba_1/build_enriched_lexicon.py

325 lines
11 KiB
Python

from __future__ import annotations
import argparse
import json
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
from build_babelnet_enrichment import BABELNET_OUTPUT_PATH
from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH
# Default output file for the enriched lexicon: "lexicon_it_enriched.json" in
# the same directory as this script. Used as the default of the --output option.
ENRICHED_LEXICON_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_enriched.json")
# Per-topic domain rules, keyed by lowercase topic name. Each tier lists the
# upper-case domain labels that score_synset_for_topic() looks for in a
# synset's "domains" list: every "strong" hit adds 60 points, every "weak" hit
# adds 25 points and every "negative" hit subtracts 35 points.
TOPIC_DOMAIN_RULES: Dict[str, Dict[str, Tuple[str, ...]]] = {
    "transport": {
        "strong": (
            "TRANSPORT_AND_TRAVEL",
            "NAVIGATION_AND_AVIATION",
        ),
        "weak": (
            "CRAFT_ENGINEERING_AND_TECHNOLOGY",
            "FARMING_FISHING_AND_HUNTING",
        ),
        "negative": (
            "MEDIA_AND_PRESS",
            "PHILOSOPHY_PSYCHOLOGY_AND_BEHAVIOR",
            "RELIGION_MYSTICISM_AND_MYTHOLOGY",
            "CHEMISTRY_AND_MINERALOGY",
        ),
    },
    "health": {
        "strong": ("HEALTH_AND_MEDICINE",),
        "weak": ("BIOLOGY",),
        "negative": ("MEDIA_AND_PRESS",),
    },
    "cinema": {
        "strong": ("MEDIA_AND_PRESS",),
        "weak": ("ART_ARCHITECTURE_AND_ARCHAEOLOGY",),
        "negative": ("HEALTH_AND_MEDICINE", "CHEMISTRY_AND_MINERALOGY"),
    },
    "nature": {
        "strong": (
            "BIOLOGY",
            "ANIMALS",
            "PLANTS",
            "EARTH",
            "METEOROLOGY",
        ),
        "weak": ("GEOGRAPHY_AND_PLACES",),
        "negative": ("MEDIA_AND_PRESS",),
    },
    "ecology": {
        "strong": ("BIOLOGY", "EARTH", "METEOROLOGY"),
        "weak": ("GEOGRAPHY_AND_PLACES",),
        "negative": ("MEDIA_AND_PRESS",),
    },
}
# Per-topic keywords, keyed by lowercase topic name. score_synset_for_topic()
# adds 12 points for each keyword found as a substring of the synset's
# lower-cased glosses, categories and senses. Because the test is a substring
# match, truncated entries such as "trasport" or "medic" also match longer
# words that contain them.
TOPIC_TEXT_KEYWORDS: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "aereo",
        "auto",
        "autobus",
        "barca",
        "bicicletta",
        "imbarcazione",
        "motore",
        "nave",
        "pista",
        "trasport",
        "treno",
        "veicolo",
        "viaggio",
    ),
    "health": ("cura", "malato", "medic", "ospedale", "paziente", "salute", "soccorso"),
    "cinema": ("attore", "cinema", "film", "pellicola", "regia", "spettacolo"),
    "nature": ("acqua", "animale", "bosco", "fiore", "mare", "montagna", "pianta", "terra"),
    "ecology": ("ambiente", "ecologia", "inquinamento", "natura", "sostenibile"),
}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the enrichment script.

    Returns:
        Namespace with the attributes ``semantic``, ``babelnet`` and
        ``output`` (declared with ``type=Path``) and ``topic`` (``None``
        unless given on the command line).
    """
    cli = argparse.ArgumentParser(
        description="Fonde lexicon_it_semantic.json con gli arricchimenti BabelNet gia disponibili."
    )
    # The three file options share one shape: (flag, default path, help text).
    path_options = (
        ("--semantic", SEMANTIC_LEXICON_OUTPUT_PATH, "Lessico semantico completo di partenza."),
        ("--babelnet", BABELNET_OUTPUT_PATH, "File con arricchimenti BabelNet parziali."),
        ("--output", ENRICHED_LEXICON_OUTPUT_PATH, "Lessico arricchito da generare."),
    )
    for flag, default_path, help_text in path_options:
        cli.add_argument(flag, type=Path, default=default_path, help=help_text)
    cli.add_argument(
        "--topic",
        default=None,
        help="Topic opzionale da usare per scegliere il synset BabelNet migliore.",
    )
    return cli.parse_args()
def load_json(path: Path, default: object) -> object:
    """Read and decode a UTF-8 JSON file, falling back to *default*.

    Args:
        path: File to read.
        default: Value returned unchanged when *path* does not exist.

    Returns:
        The decoded JSON document, or *default* if the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return default
def write_json(path: Path, payload: object) -> None:
    """Serialize *payload* as JSON and write it to *path* as UTF-8 text.

    The output is indented by two spaces and keeps non-ASCII characters
    verbatim instead of escaping them.

    Args:
        path: Destination file; an existing file is overwritten.
        payload: JSON-serializable object to store.
    """
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")
def entry_key(entry: Dict[str, object]) -> Tuple[str, str]:
    """Return the ``(form, pos)`` lookup key of a lexicon entry.

    The form comes from ``normalized_form``, falling back to ``form`` when
    that is missing or empty; it is stripped and lower-cased. The part of
    speech comes from ``pos`` and is stripped and upper-cased. Missing or
    empty values become empty strings.
    """
    raw_form = entry.get("normalized_form") or entry.get("form") or ""
    raw_pos = entry.get("pos") or ""
    return str(raw_form).strip().lower(), str(raw_pos).strip().upper()
def dedupe(items: Iterable[str]) -> List[str]:
    """Return the distinct non-empty items, in order of first appearance.

    Each item is converted with ``str()`` and stripped of surrounding
    whitespace before comparison; items that end up empty are dropped.
    """
    cleaned = (str(item).strip() for item in items)
    # dict keys are unique and keep insertion order, so this retains the
    # first occurrence of every text.
    return list(dict.fromkeys(text for text in cleaned if text))
def topic_candidates(entry: Dict[str, object], requested_topic: Optional[str]) -> List[str]:
    """List the topics to try for an entry, most preferred first.

    Args:
        entry: Lexicon entry; its optional ``topics`` list is read.
        requested_topic: Topic chosen by the caller. When given it is placed
            ahead of the entry's own topics.

    Returns:
        Lower-cased, de-duplicated topic names, excluding ``"general"``.
    """
    # "topics" may be present but null; treat that like a missing key instead
    # of failing on iteration.
    entry_topics = entry.get("topics") or []
    topics = [str(topic).lower() for topic in entry_topics if topic]
    if requested_topic:
        topics.insert(0, requested_topic.lower())
    return [topic for topic in dedupe(topics) if topic != "general"]
def synset_text(synset: Dict[str, object]) -> str:
    """Return the synset's glosses, categories and senses as one lower-case string.

    The values of the three lists are converted with ``str()`` and joined by
    single spaces, in that field order. Missing or null lists contribute
    nothing.
    """
    parts = [
        str(value)
        for field in ("glosses", "categories", "senses")
        for value in (synset.get(field, []) or [])
    ]
    return " ".join(parts).lower()
def score_synset_for_topic(synset: Dict[str, object], topic: str) -> int:
    """Score how well a synset fits a topic.

    The score is the sum of:
      * +60 for each synset domain listed as "strong" for the topic,
      * +25 for each domain listed as "weak",
      * -35 for each domain listed as "negative",
      * +12 for each topic keyword contained in the synset text.

    Topics without rules or keywords simply contribute nothing, so an
    unknown topic scores 0.

    Args:
        synset: Synset record; its ``domains`` list and text fields are read.
        topic: Topic name used to look up the rules and keywords.

    Returns:
        The integer score; it can be negative.
    """
    synset_domains = {str(domain).upper() for domain in synset.get("domains", []) or []}
    tiers = TOPIC_DOMAIN_RULES.get(topic, {})

    def hits(tier: str) -> int:
        # Number of the synset's domains that appear in the given tier.
        return len(synset_domains.intersection(tiers.get(tier, ())))

    domain_points = 60 * hits("strong") + 25 * hits("weak") - 35 * hits("negative")

    haystack = synset_text(synset)
    # Substring test: a keyword counts once, wherever it occurs in the text.
    keyword_hits = sum(1 for keyword in TOPIC_TEXT_KEYWORDS.get(topic, ()) if keyword in haystack)
    return domain_points + 12 * keyword_hits
def _summarize_synset(
    synset: Dict[str, object], topic: Optional[str], score: int
) -> Dict[str, object]:
    """Build the compact description of a selected synset."""
    return {
        "id": synset.get("id"),
        "topic": topic,
        "topic_score": score,
        # A topic match counts as strong from 40 points upwards.
        "strong_topic": score >= 40,
        "senses": synset.get("senses", []),
        "glosses": synset.get("glosses", []),
        "categories": synset.get("categories", []),
        "domains": synset.get("domains", []),
    }


def choose_best_synset(
    babelnet: Dict[str, object], entry: Dict[str, object], requested_topic: Optional[str]
) -> Tuple[Optional[Dict[str, object]], Dict[str, int]]:
    """Pick the synset that best matches the entry's candidate topics.

    Args:
        babelnet: BabelNet payload; only the dict items of its ``synsets``
            list are considered.
        entry: Lexicon entry, used to derive the candidate topics.
        requested_topic: Optional topic that takes precedence over the
            entry's own topics.

    Returns:
        A pair ``(summary, topic_scores)``:

        * ``summary`` describes the selected synset (id, topic, topic_score,
          strong_topic, senses, glosses, categories, domains), or is ``None``
          when there is nothing to select.
        * ``topic_scores`` maps each candidate topic to the best score any
          synset reached for it; it is empty when there are no synsets or no
          candidate topics.

        Without candidate topics the first synset is returned with
        ``topic=None`` and a score of 0. On equal scores the earlier topic,
        then the earlier synset, wins.
    """
    synsets = [item for item in babelnet.get("synsets", []) or [] if isinstance(item, dict)]
    # Nothing to choose from: return before looking at the entry's topics.
    if not synsets:
        return None, {}

    topics = topic_candidates(entry, requested_topic)
    if not topics:
        return _summarize_synset(synsets[0], None, 0), {}

    topic_scores: Dict[str, int] = {}
    best_synset: Optional[Dict[str, object]] = None
    best_topic: Optional[str] = None
    best_score: Optional[int] = None
    for topic in topics:
        # Score every synset once per topic and reuse the values for both the
        # per-topic maximum and the overall winner.
        scores = [score_synset_for_topic(synset, topic) for synset in synsets]
        topic_scores[topic] = max(scores)
        for synset, score in zip(synsets, scores):
            # Strict comparison keeps the first candidate on ties.
            if best_score is None or score > best_score:
                best_score = score
                best_topic = topic
                best_synset = synset

    # An empty synset dict is falsy and is reported as "no selection".
    if not best_synset:
        return None, topic_scores
    return _summarize_synset(best_synset, best_topic, best_score), topic_scores
def normalize_babelnet_status(
    entry: Dict[str, object], babelnet_entry: Optional[Dict[str, object]], requested_topic: Optional[str]
) -> Dict[str, object]:
    """Turn a raw BabelNet record into the ``babelnet`` section of an enriched entry.

    The ``status`` field of the result is one of:

    * ``"not_requested"`` - no BabelNet record (``None`` or empty) for the entry;
    * ``"api_error"`` - the record's ``babelnet`` value is not a dict;
    * ``"no_match"`` - the record's ``matched`` flag is missing or falsy;
    * ``"ambiguous"`` - a synset was selected but its topic score is 0 or less;
    * ``"enriched"`` - every other matched case, including when no synset
      could be selected.

    Args:
        entry: Lexicon entry being enriched.
        babelnet_entry: Record from the BabelNet file for the same key, if any.
        requested_topic: Optional topic passed on to the synset selection.

    Returns:
        The normalized status dictionary.
    """
    if not babelnet_entry:
        return {"status": "not_requested"}

    raw_babelnet = babelnet_entry.get("babelnet", {})
    if not isinstance(raw_babelnet, dict):
        return {"status": "api_error", "reason": "invalid_babelnet_payload"}

    if not raw_babelnet.get("matched"):
        unmatched: Dict[str, object] = {"status": "no_match", "matched": False}
        unmatched["reason"] = raw_babelnet.get("reason", "no_synsets")
        unmatched["synsets"] = []
        return unmatched

    best_synset, topic_scores = choose_best_synset(raw_babelnet, entry, requested_topic)
    if best_synset:
        topic_score = int(best_synset.get("topic_score", 0))
        # A selected synset that earns no positive score is only a guess.
        status = "ambiguous" if topic_score <= 0 else "enriched"
        selected_synset_id = best_synset.get("id")
        selected_topic = best_synset.get("topic")
        strong_topic = bool(best_synset.get("strong_topic", False))
    else:
        status = "enriched"
        topic_score = 0
        selected_synset_id = None
        selected_topic = None
        strong_topic = False

    return {
        "status": status,
        "matched": True,
        "selected_synset_id": selected_synset_id,
        "selected_topic": selected_topic,
        "topic_score": topic_score,
        "strong_topic": strong_topic,
        "synset_refs": raw_babelnet.get("synset_refs", []),
        "synsets": raw_babelnet.get("synsets", []),
        "topic_scores": topic_scores,
        "best_synset": best_synset,
        "source_generated_at": babelnet_entry.get("babelnet_generated_at"),
    }
def build_babelnet_index(payload: Dict[str, object]) -> Dict[Tuple[str, str], Dict[str, object]]:
    """Index the BabelNet records of *payload* by their entry key.

    Args:
        payload: Parsed BabelNet file; its ``entries`` list is read and
            non-dict items are ignored.

    Returns:
        Mapping from ``entry_key(record)`` to the record. When several
        records share a key, the last one wins.
    """
    records = payload.get("entries", []) or []
    return {entry_key(record): record for record in records if isinstance(record, dict)}
def build_enriched_lexicon(args: argparse.Namespace) -> Dict[str, object]:
    """Merge the semantic lexicon with the available BabelNet enrichments.

    Args:
        args: Parsed options; ``semantic`` and ``babelnet`` are the input
            paths and ``topic`` is the optional topic used to select synsets.

    Returns:
        A payload with two keys: ``meta`` (language, version, source file
        names, generation time, requested topic, entry count and per-status
        counts) and ``entries`` (a deep copy of every dict entry of the
        semantic lexicon with an added ``babelnet`` section).

    Raises:
        ValueError: If the semantic lexicon is missing, is not a JSON object
            or has no ``entries`` key.
    """
    semantic_payload = load_json(args.semantic, {})
    if not isinstance(semantic_payload, dict) or "entries" not in semantic_payload:
        raise ValueError(f"Lessico semantico non valido: {args.semantic}")

    # The BabelNet file is optional: a missing or malformed one means that no
    # entry has enrichment data.
    babelnet_payload = load_json(args.babelnet, {"entries": []})
    if not isinstance(babelnet_payload, dict):
        babelnet_payload = {"entries": []}
    babelnet_index = build_babelnet_index(babelnet_payload)

    enriched_entries = []
    status_counts: Dict[str, int] = {}
    for entry in semantic_payload.get("entries", []) or []:
        if not isinstance(entry, dict):
            continue
        # Work on a copy so the loaded semantic payload is left untouched.
        enriched = deepcopy(entry)
        babelnet_entry = babelnet_index.get(entry_key(enriched))
        enriched["babelnet"] = normalize_babelnet_status(enriched, babelnet_entry, args.topic)
        status = str(enriched["babelnet"].get("status", "unknown"))
        status_counts[status] = status_counts.get(status, 0) + 1
        enriched_entries.append(enriched)

    # "meta" may be absent, null or otherwise not a dict; fall back to the
    # default language instead of failing on attribute access.
    source_meta = semantic_payload.get("meta")
    language = source_meta.get("language", "it") if isinstance(source_meta, dict) else "it"

    return {
        "meta": {
            "language": language,
            "version": 1,
            "base_lexicon": args.semantic.name,
            "babelnet_source": args.babelnet.name if args.babelnet.exists() else None,
            # Local time with UTC offset, truncated to whole seconds.
            "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "requested_topic": args.topic,
            "entry_count": len(enriched_entries),
            "babelnet_status_counts": status_counts,
        },
        "entries": enriched_entries,
    }
def main() -> None:
    """Build the enriched lexicon, write it to the output path and print a summary."""
    args = parse_args()
    payload = build_enriched_lexicon(args)
    write_json(args.output, payload)

    meta = payload["meta"]
    print(f"Lessico arricchito generato: {args.output}")
    print(f"Voci totali: {meta['entry_count']}")
    print(f"Stati BabelNet: {meta['babelnet_status_counts']}")
# Run the build only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()