Files
cruciverba_1/refine_lexicon_topics.py

474 lines
15 KiB
Python

from __future__ import annotations

import argparse
import json
import re
import unicodedata
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

from build_enriched_lexicon import ENRICHED_LEXICON_OUTPUT_PATH

# Default output file: written next to this module.
REFINED_LEXICON_OUTPUT_PATH = Path(__file__).parent / "lexicon_it_refined.json"
# Topic name -> lowercase keywords looked up in an entry's flattened text by
# infer_topic_scores. Several values (e.g. "ecclesiast", "medic") appear to be
# deliberate stems rather than full words.
TOPIC_KEYWORDS: Dict[str, Tuple[str, ...]] = {
    "religion": (
        "abbazia", "abate", "arcivescovo", "cappella", "cardinale", "chiesa",
        "clero", "convento", "diocesi", "ecclesiast", "fede", "frate",
        "mistica", "monaco", "monastero", "parrocchia", "prete", "religion",
        "sacerdot", "santo", "vescovo",
    ),
    "ecclesiastical_hierarchy": (
        "abate", "arcivescovo", "carica ecclesiastica", "cardinale", "clero",
        "dignità ecclesiastica", "ecclesiast", "ordinazione", "parroco",
        "patriarca", "pontefice", "prete", "priore",
        "superiore del monastero", "vescovo",
    ),
    "honorific_title": (
        "carica", "epiteto", "nobile", "onore", "onorific", "titolo",
    ),
    "mysticism": (
        "asceta", "contemplazione", "estasi", "mistica", "mistico",
        "monachesimo", "spiritual",
    ),
    "geography": (
        "borgo", "città", "comune", "frazione", "geografia", "isola",
        "località", "paese", "provincia", "regione", "stato", "toponimo",
        "valle",
    ),
    "transport": (
        "aereo", "aeroplano", "auto", "autobus", "autocarro", "barca",
        "bicicletta", "imbarcazione", "locomotiva", "motore", "nave", "pista",
        "porto", "stazione", "traghetto", "treno", "trasport", "veicolo",
        "viaggio",
    ),
    "nature": (
        "acqua", "albero", "animale", "bosco", "fiore", "fiume", "foresta",
        "mare", "montagna", "natura", "pianta", "terra",
    ),
    "health": (
        "ambulanza", "anemia", "cura", "farmaco", "malattia", "medic",
        "ospedale", "paziente", "salute", "soccorso", "terapia",
    ),
    "war": (
        "arma", "artiglieria", "assalto", "battaglia", "bombard", "esercito",
        "fortezza", "guerra", "militare", "soldato", "trincea",
    ),
}
# Slugified tags equal to one of these function words are discarded by
# cleanup_tag.
TAG_STOPWORDS = {
    # English
    "and", "the",
    # Italian articles
    "il", "la", "le", "lo", "un", "una",
    # Italian prepositions (plain and articulated) and conjunctions
    "con", "da", "dei", "del", "della", "delle", "dello", "di", "e", "in",
    "nel", "nella", "per", "su",
}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the refinement script.

    Returns:
        Namespace with ``input``, ``output``, ``replace_general`` and
        ``min_topic_score`` attributes.
    """
    cli = argparse.ArgumentParser(
        description="Genera un lessico raffinato con campi aggiuntivi per topic, tag semantici e sensi."
    )
    # Options are registered in this order, which is the order shown in --help.
    option_table = (
        (
            "--input",
            {
                "type": Path,
                "default": ENRICHED_LEXICON_OUTPUT_PATH,
                "help": "File lessicale di partenza, tipicamente lexicon_it_enriched.json.",
            },
        ),
        (
            "--output",
            {
                "type": Path,
                "default": REFINED_LEXICON_OUTPUT_PATH,
                "help": "Nuovo file lessicale raffinato da generare.",
            },
        ),
        (
            "--replace-general",
            {
                "action": "store_true",
                "help": "Se attivo, sostituisce topic=['general'] con i topic suggeriti quando la confidenza e alta.",
            },
        ),
        (
            "--min-topic-score",
            {
                "type": int,
                "default": 40,
                "help": "Punteggio minimo per promuovere un topic suggerito nei topics finali.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def load_json(path: Path) -> Dict[str, object]:
    """Read ``path`` as UTF-8 text and return the decoded JSON document."""
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
def write_json(path: Path, payload: Dict[str, object]) -> None:
    """Serialize ``payload`` as 2-space-indented, non-ASCII-escaped JSON into ``path`` (UTF-8)."""
    # Serialize first so a failing dump never truncates an existing file.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
def dedupe(items: Iterable[str]) -> List[str]:
    """Return the stripped, non-empty items without case-insensitive duplicates.

    The first spelling seen for each lowercase key wins and the original
    encounter order is kept.
    """
    first_seen: Dict[str, str] = {}
    for raw in items:
        candidate = str(raw).strip()
        if candidate:
            # setdefault keeps the earliest spelling for a given lowercase key.
            first_seen.setdefault(candidate.lower(), candidate)
    return list(first_seen.values())
def slugify_tag(text: str) -> str:
    """Turn free text into a lowercase ASCII slug joined by underscores.

    Accented letters are transliterated to their base letter before the
    non-alphanumeric runs are collapsed, so ``"Città"`` becomes ``"citta"``
    instead of being truncated to ``"citt"``.

    Args:
        text: Arbitrary tag text.

    Returns:
        The slug, or an empty string when ``text`` holds no ASCII letter or
        digit after transliteration.
    """
    # NFKD splits "à" into "a" + combining grave accent; dropping the
    # combining marks leaves the plain base letters.
    decomposed = unicodedata.normalize("NFKD", text.strip())
    base_letters = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    # Lowercase after normalization: some compatibility characters decompose
    # into uppercase ASCII letters.
    return re.sub(r"[^a-z0-9]+", "_", base_letters.lower()).strip("_")
def cleanup_tag(tag: str) -> str:
    """Slugify ``tag`` and return it, or ``""`` when it is not worth keeping.

    A slug is rejected when it is empty, a single character, or listed in
    ``TAG_STOPWORDS``.
    """
    slug = slugify_tag(tag)
    worth_keeping = len(slug) > 1 and slug not in TAG_STOPWORDS
    return slug if worth_keeping else ""
def flatten_text(entry: Dict[str, object]) -> str:
    """Join every textual field of ``entry`` into one lowercase string.

    Fields are visited in a fixed order: entry topics; the ``semantic``
    topics, glosses and per-synset definition and lemmas; then the
    ``babelnet`` synset refs, the best synset and every listed synset
    (glosses, categories, domains and senses for each). Sections or synsets
    that are not dicts are skipped.
    """

    def texts(mapping, *keys):
        # Yield str(item) for each item under each key; missing or null
        # values count as empty.
        for key in keys:
            for value in mapping.get(key, []) or []:
                yield str(value)

    synset_fields = ("glosses", "categories", "domains", "senses")
    pieces: List[str] = list(texts(entry, "topics"))

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        pieces += texts(semantic, "semantic_topics", "glosses")
        for synset in semantic.get("synsets", []) or []:
            if not isinstance(synset, dict):
                continue
            pieces.append(str(synset.get("definition", "")))
            pieces += texts(synset, "lemmas")

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        pieces += texts(babelnet, "synset_refs")
        # The best synset is handled first, then the full synset list.
        candidates = [babelnet.get("best_synset", {})]
        candidates += babelnet.get("synsets", []) or []
        for synset in candidates:
            if isinstance(synset, dict):
                pieces += texts(synset, *synset_fields)

    return " ".join(pieces).lower()
def infer_topic_scores(entry: Dict[str, object]) -> Dict[str, int]:
    """Score each topic by counting keyword hits in the entry's flattened text.

    A keyword only matches where a word starts. Stems such as ``"medic"``
    therefore still cover "medico" and "medicina", but ``"arma"`` no longer
    fires inside "farmaco", nor ``"mare"`` inside "amare", as a raw substring
    count would.

    Args:
        entry: Lexicon entry; its text is gathered with ``flatten_text``.

    Returns:
        Mapping of topic name to score: 12 points per keyword occurrence,
        capped at 100. Topics with no hit are omitted.
    """
    separators = re.compile(r"[\W_]+")
    # Collapse punctuation, underscores and whitespace into single spaces and
    # prepend one, so every word start is preceded by exactly one " ".
    haystack = " " + separators.sub(" ", flatten_text(entry))

    scores: Dict[str, int] = {}
    for topic, keywords in TOPIC_KEYWORDS.items():
        hits = 0
        for keyword in keywords:
            # Normalize the keyword the same way as the text so multi-word
            # keywords ("carica ecclesiastica") keep matching.
            needle = separators.sub(" ", keyword.lower()).strip()
            if needle:
                hits += haystack.count(" " + needle)
        if hits:
            scores[topic] = min(12 * hits, 100)
    return scores
def collect_semantic_tags(entry: Dict[str, object]) -> List[str]:
    """Gather normalized, de-duplicated tags from the entry's semantic fields.

    Sources, in order: entry topics; ``semantic.semantic_topics`` and every
    group of ``semantic.raw_relation_terms``; categories and domains of
    ``babelnet.best_synset`` and of each ``babelnet.synsets`` item.

    Args:
        entry: Lexicon entry. Sections that are not dicts are ignored,
            including a non-dict ``raw_relation_terms``.

    Returns:
        Tags cleaned with ``cleanup_tag``, without empties or duplicates, in
        first-seen order.
    """
    raw_tags: List[str] = [str(topic) for topic in entry.get("topics", []) or []]

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        raw_tags.extend(str(topic) for topic in semantic.get("semantic_topics", []) or [])
        relation_terms = semantic.get("raw_relation_terms", {})
        # Guarded like every other section: a list or string here used to
        # raise AttributeError on .values().
        if isinstance(relation_terms, dict):
            for relation_group in relation_terms.values():
                raw_tags.extend(str(item) for item in relation_group or [])

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        synsets = [babelnet.get("best_synset", {})]
        synsets += babelnet.get("synsets", []) or []
        for synset in synsets:
            if not isinstance(synset, dict):
                continue
            raw_tags.extend(str(item) for item in synset.get("categories", []) or [])
            raw_tags.extend(str(item) for item in synset.get("domains", []) or [])

    cleaned = (cleanup_tag(tag) for tag in raw_tags)
    return dedupe(tag for tag in cleaned if tag)
def _sense_confidence(raw_score: object) -> float:
    """Convert a raw topic score to a confidence clamped to [0.4, 0.95]."""
    try:
        score = float(raw_score or 0)
    except (TypeError, ValueError):
        # A missing or malformed score falls back to the minimum confidence.
        score = 0.0
    return round(min(max(score / 100.0, 0.4), 0.95), 2)


def collect_senses(entry: Dict[str, object], topic_scores: Dict[str, int]) -> List[Dict[str, object]]:
    """Build the list of sense records for an entry.

    One record is produced for each ``semantic.synsets`` item that has a
    non-empty definition, plus one for ``babelnet.best_synset`` when it has
    an id and at least one non-empty gloss. JSON nulls are treated as missing
    values instead of being stringified to ``"None"``.

    Args:
        entry: Lexicon entry.
        topic_scores: Inferred topic scores; topics scoring 50 or more are
            attached to every sense.

    Returns:
        Sense dicts with ``source``, ``id``, ``definition``, ``lemmas``,
        ``topics`` and ``confidence`` keys.
    """
    strong_topics = [topic for topic, score in topic_scores.items() if score >= 50]
    senses: List[Dict[str, object]] = []

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        semantic_topics = [
            str(topic)
            for topic in semantic.get("semantic_topics", []) or []
            if topic is not None
        ]
        for synset in semantic.get("synsets", []) or []:
            if not isinstance(synset, dict):
                continue
            definition = str(synset.get("definition") or "").strip()
            if not definition:
                continue
            senses.append(
                {
                    "source": "semantic",
                    "id": synset.get("id"),
                    "definition": definition,
                    "lemmas": dedupe(str(item) for item in synset.get("lemmas", []) or []),
                    "topics": dedupe(semantic_topics + strong_topics),
                    "confidence": 0.7,
                }
            )

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        best_synset = babelnet.get("best_synset", {})
        if isinstance(best_synset, dict) and best_synset.get("id"):
            glosses = [
                str(item).strip()
                for item in best_synset.get("glosses", []) or []
                if item is not None and str(item).strip()
            ]
            if glosses:
                # "or ''" keeps a null topic from becoming the string "None".
                own_topic = str(best_synset.get("topic") or "").strip()
                senses.append(
                    {
                        "source": "babelnet",
                        "id": best_synset.get("id"),
                        # Only the first gloss is used as the definition.
                        "definition": glosses[0],
                        "lemmas": dedupe(str(item) for item in best_synset.get("senses", []) or []),
                        "topics": dedupe([own_topic] + strong_topics),
                        "confidence": _sense_confidence(best_synset.get("topic_score")),
                    }
                )
    return senses
def collect_geo_tags(entry: Dict[str, object]) -> List[str]:
    """Return ``["toponym_possible"]`` when a BabelNet category looks geographic.

    Only the categories of the items in ``babelnet.synsets`` are inspected;
    the result is an empty list when none of them contains a geographic
    marker.
    """
    babelnet = entry.get("babelnet", {})
    if not isinstance(babelnet, dict):
        return []

    geo_markers = ("comuni_", "province_", "regioni_", "città", "paesi", "località")
    lowered_categories = (
        str(category).lower()
        for synset in babelnet.get("synsets", []) or []
        if isinstance(synset, dict)
        for category in synset.get("categories", []) or []
    )
    # One marker per matching category; dedupe collapses them into one tag.
    hits = [
        "toponym_possible"
        for text in lowered_categories
        if any(marker in text for marker in geo_markers)
    ]
    return dedupe(hits)
def collect_name_tags(entry: Dict[str, object]) -> List[str]:
    """Return ``["capitalized_form"]`` when the entry's form starts with an uppercase letter."""
    first_char = str(entry.get("form", ""))[:1]
    return dedupe(["capitalized_form"] if first_char.isupper() else [])
def should_review(entry: Dict[str, object], topic_scores: Dict[str, int], senses: List[Dict[str, object]]) -> bool:
    """Decide whether an entry should be flagged for manual review.

    An entry is flagged when any of these holds:

    * its only topic is "general" (case-insensitive) and no inferred topic
      scores 50 or more;
    * its BabelNet status is "ambiguous" and no inferred topic scores 50 or
      more;
    * it has at least three senses and at least two topics scoring 50 or more.

    Args:
        entry: Lexicon entry; ``topics`` and ``babelnet.status`` are read.
            A ``babelnet`` value that is not a dict is treated as absent.
        topic_scores: Inferred topic scores.
        senses: Sense records collected for the entry.

    Returns:
        True when the entry needs review.
    """
    existing_topics = [str(topic).lower() for topic in entry.get("topics", []) or []]
    strong_count = sum(1 for score in topic_scores.values() if score >= 50)

    babelnet = entry.get("babelnet")
    # Same isinstance guard as the other accessors: a non-dict value used to
    # raise AttributeError here.
    status = str(babelnet.get("status", "")) if isinstance(babelnet, dict) else ""

    if strong_count == 0 and (existing_topics == ["general"] or status == "ambiguous"):
        return True
    return len(senses) >= 3 and strong_count >= 2
def promoted_topics(
    existing_topics: List[str], topic_scores: Dict[str, int], replace_general: bool, min_topic_score: int
) -> List[str]:
    """Merge the existing topics with the inferred topics that score high enough.

    Args:
        existing_topics: Topics already present on the entry.
        topic_scores: Inferred topic scores.
        replace_general: When true and the only existing topic is "general"
            (compared case-insensitively, as the review check does), the
            inferred topics replace it instead of being appended.
        min_topic_score: Minimum score an inferred topic needs to be promoted.

    Returns:
        De-duplicated topics: existing ones first, then inferred ones ordered
        by descending score and then by name.
    """
    ranked = sorted(topic_scores.items(), key=lambda item: (-item[1], item[0]))
    inferred = [topic for topic, score in ranked if score >= min_topic_score]
    existing_clean = dedupe(existing_topics)

    only_general = [topic.lower() for topic in existing_clean] == ["general"]
    if replace_general and only_general and inferred:
        return inferred
    return dedupe(existing_clean + inferred)
def refine_entry(entry: Dict[str, object], replace_general: bool, min_topic_score: int) -> Dict[str, object]:
    """Return a deep copy of ``entry`` enriched with the refinement fields.

    The copy gains ``semantic_tags``, ``senses``, ``topic_confidence``,
    ``topic_suggestions``, ``geo_tags``, ``name_tags`` and ``needs_review``,
    and its ``topics`` are replaced by the promoted topics. The input entry
    is not modified.

    Args:
        entry: Source lexicon entry.
        replace_general: Forwarded to ``promoted_topics``.
        min_topic_score: Forwarded to ``promoted_topics``.

    Returns:
        The refined entry.
    """
    refined = deepcopy(entry)
    topic_scores = infer_topic_scores(refined)
    semantic_tags = collect_semantic_tags(refined)
    senses = collect_senses(refined, topic_scores)
    geo_tags = collect_geo_tags(refined)
    name_tags = collect_name_tags(refined)

    # Decide on review while refined["topics"] still holds the input topics.
    # Once the promoted topics are written, an entry that started as
    # ["general"] and only received a weak (< 50) topic would no longer be
    # recognised as general-only and would silently skip review.
    needs_review = should_review(refined, topic_scores, senses)

    current_topics = [str(topic) for topic in refined.get("topics", []) or []]
    refined["topics"] = promoted_topics(current_topics, topic_scores, replace_general, min_topic_score)
    refined["semantic_tags"] = semantic_tags
    refined["senses"] = senses
    refined["topic_confidence"] = topic_scores
    # Highest score first; ties are broken alphabetically.
    refined["topic_suggestions"] = sorted(topic_scores, key=lambda topic: (-topic_scores[topic], topic))
    refined["geo_tags"] = geo_tags
    refined["name_tags"] = name_tags
    refined["needs_review"] = needs_review
    return refined
def build_refined_lexicon(args: argparse.Namespace) -> Dict[str, object]:
    """Load the input lexicon and build the refined payload.

    Args:
        args: Parsed options; ``input``, ``replace_general`` and
            ``min_topic_score`` are read.

    Returns:
        Dict with a ``meta`` section (counts, options, generation timestamp)
        and the refined ``entries``. Entries that are not dicts are dropped.

    Raises:
        ValueError: If the loaded document is not a dict with an ``entries`` key.
    """
    source = load_json(args.input)
    if not (isinstance(source, dict) and "entries" in source):
        raise ValueError(f"Lessico di input non valido: {args.input}")

    refined_entries = []
    for raw_entry in source.get("entries", []) or []:
        if isinstance(raw_entry, dict):
            refined_entries.append(
                refine_entry(raw_entry, args.replace_general, args.min_topic_score)
            )

    flagged = [item for item in refined_entries if item.get("needs_review")]
    with_suggestions = [
        item for item in refined_entries if len(item.get("topic_suggestions", []) or []) > 0
    ]

    meta = {
        "language": "it",
        "version": 1,
        "base_lexicon": args.input.name,
        "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "entry_count": len(refined_entries),
        "replace_general": args.replace_general,
        "min_topic_score": args.min_topic_score,
        "review_count": len(flagged),
        "topicful_count": len(with_suggestions),
    }
    return {"meta": meta, "entries": refined_entries}
def main() -> None:
    """Run the refinement end to end: parse options, build, write, print a summary."""
    args = parse_args()
    payload = build_refined_lexicon(args)
    write_json(args.output, payload)

    meta = payload["meta"]
    summary = (
        f"Lessico raffinato generato: {args.output}",
        f"Voci totali: {meta['entry_count']}",
        f"Voci con suggerimenti di topic: {meta['topicful_count']}",
        f"Voci marcate needs_review: {meta['review_count']}",
    )
    for line in summary:
        print(line)


if __name__ == "__main__":
    main()