474 lines
15 KiB
Python
474 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Tuple
|
|
|
|
from build_enriched_lexicon import ENRICHED_LEXICON_OUTPUT_PATH
|
|
|
|
|
|
# Default destination of the refined lexicon: a sibling of this script.
REFINED_LEXICON_OUTPUT_PATH = Path(__file__).parent / "lexicon_it_refined.json"
|
|
|
|
# Keyword table used to score topics. Each topic maps to lowercase Italian
# words or word stems (e.g. "ecclesiast", "medic") that are searched as plain
# substrings of an entry's flattened text. Topic order is significant: it is
# the iteration order of the scoring loop and therefore of its result dict.
TOPIC_KEYWORDS: Dict[str, Tuple[str, ...]] = {
    "religion": (
        "abbazia", "abate", "arcivescovo", "cappella", "cardinale",
        "chiesa", "clero", "convento", "diocesi", "ecclesiast",
        "fede", "frate", "mistica", "monaco", "monastero",
        "parrocchia", "prete", "religion", "sacerdot", "santo",
        "vescovo",
    ),
    "ecclesiastical_hierarchy": (
        "abate", "arcivescovo", "carica ecclesiastica", "cardinale", "clero",
        "dignità ecclesiastica", "ecclesiast", "ordinazione", "parroco", "patriarca",
        "pontefice", "prete", "priore", "superiore del monastero", "vescovo",
    ),
    "honorific_title": (
        "carica", "epiteto", "nobile", "onore", "onorific", "titolo",
    ),
    "mysticism": (
        "asceta", "contemplazione", "estasi", "mistica", "mistico",
        "monachesimo", "spiritual",
    ),
    "geography": (
        "borgo", "città", "comune", "frazione", "geografia",
        "isola", "località", "paese", "provincia", "regione",
        "stato", "toponimo", "valle",
    ),
    "transport": (
        "aereo", "aeroplano", "auto", "autobus", "autocarro",
        "barca", "bicicletta", "imbarcazione", "locomotiva", "motore",
        "nave", "pista", "porto", "stazione", "traghetto",
        "treno", "trasport", "veicolo", "viaggio",
    ),
    "nature": (
        "acqua", "albero", "animale", "bosco", "fiore",
        "fiume", "foresta", "mare", "montagna", "natura",
        "pianta", "terra",
    ),
    "health": (
        "ambulanza", "anemia", "cura", "farmaco", "malattia",
        "medic", "ospedale", "paziente", "salute", "soccorso",
        "terapia",
    ),
    "war": (
        "arma", "artiglieria", "assalto", "battaglia", "bombard",
        "esercito", "fortezza", "guerra", "militare", "soldato",
        "trincea",
    ),
}
|
|
|
|
# Slugs that carry no meaning as a semantic tag and are dropped by
# cleanup_tag(): English and Italian function words.
TAG_STOPWORDS = {
    # English
    "and", "the",
    # Italian conjunction and articles
    "e", "il", "la", "le", "lo", "un", "una",
    # Italian prepositions (simple and articulated)
    "con", "da", "di", "in", "per", "su",
    "dei", "del", "della", "delle", "dello", "nel", "nella",
}
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the refinement script.

    Returns:
        A namespace with ``input`` and ``output`` (``Path``),
        ``replace_general`` (``bool``) and ``min_topic_score`` (``int``).
    """
    cli = argparse.ArgumentParser(
        description="Genera un lessico raffinato con campi aggiuntivi per topic, tag semantici e sensi."
    )
    # One (flag, add_argument keyword arguments) pair per option, in the
    # order they are registered and shown by --help.
    option_table = (
        (
            "--input",
            {
                "type": Path,
                "default": ENRICHED_LEXICON_OUTPUT_PATH,
                "help": "File lessicale di partenza, tipicamente lexicon_it_enriched.json.",
            },
        ),
        (
            "--output",
            {
                "type": Path,
                "default": REFINED_LEXICON_OUTPUT_PATH,
                "help": "Nuovo file lessicale raffinato da generare.",
            },
        ),
        (
            "--replace-general",
            {
                "action": "store_true",
                "help": "Se attivo, sostituisce topic=['general'] con i topic suggeriti quando la confidenza e alta.",
            },
        ),
        (
            "--min-topic-score",
            {
                "type": int,
                "default": 40,
                "help": "Punteggio minimo per promuovere un topic suggerito nei topics finali.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def load_json(path: Path) -> Dict[str, object]:
    """Decode the UTF-8 JSON document stored at *path* and return it."""
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
|
|
def write_json(path: Path, payload: Dict[str, object]) -> None:
    """Serialize *payload* to *path* as UTF-8 JSON.

    The output is indented by two spaces and non-ASCII characters are
    written as-is rather than as ``\\uXXXX`` escapes.
    """
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
|
|
|
|
|
|
def dedupe(items: Iterable[str]) -> List[str]:
    """Return the distinct, non-blank items of *items* in first-seen order.

    Each item is converted with ``str`` and stripped of surrounding
    whitespace. Items that are empty after stripping are dropped. Two items
    are considered equal when their lowercase forms match; the spelling of
    the first occurrence is the one kept.
    """
    # Insertion-ordered mapping: lowercase key -> first spelling seen.
    first_spelling: Dict[str, str] = {}
    for raw in items:
        cleaned = str(raw).strip()
        if cleaned:
            first_spelling.setdefault(cleaned.lower(), cleaned)
    return list(first_spelling.values())
|
|
|
|
|
|
def slugify_tag(text: str) -> str:
    """Turn free text into a lowercase ASCII slug made of ``[a-z0-9_]``.

    Accented letters are transliterated to their base letter first
    ("città" -> "citta"), so Italian words keep all their letters instead of
    being truncated at the accent. Every remaining run of characters outside
    ``a-z0-9`` collapses into a single underscore, and leading/trailing
    underscores are removed.

    Args:
        text: Raw tag text.

    Returns:
        The slug, or an empty string when nothing sluggable remains.
    """
    import unicodedata  # local import: not part of the module-level imports

    # NFKD splits "à" into "a" + combining grave accent; dropping the
    # combining marks leaves the plain base letters.
    decomposed = unicodedata.normalize("NFKD", text.strip().lower())
    base_letters = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return re.sub(r"[^a-z0-9]+", "_", base_letters).strip("_")
|
|
|
|
|
|
def cleanup_tag(tag: str) -> str:
    """Slugify *tag* and discard it when it is not a useful tag.

    Returns:
        The slug produced by ``slugify_tag``, or ``""`` when that slug is
        empty, a single character, or listed in ``TAG_STOPWORDS``.
    """
    slug = slugify_tag(tag)
    # len(slug) > 1 also rules out the empty slug.
    is_usable = len(slug) > 1 and slug not in TAG_STOPWORDS
    return slug if is_usable else ""
|
|
|
|
|
|
def flatten_text(entry: Dict[str, object]) -> str:
    """Concatenate the textual fields of *entry* into one lowercase string.

    Fields are read in this order and joined with single spaces:

    * ``entry["topics"]``
    * ``entry["semantic"]``: ``semantic_topics``, ``glosses``, then for each
      dict in ``synsets`` its ``definition`` followed by its ``lemmas``
    * ``entry["babelnet"]``: ``synset_refs``, then for ``best_synset`` and
      each dict in ``synsets`` the ``glosses``, ``categories``, ``domains``
      and ``senses`` lists

    Missing or null lists count as empty; ``semantic``, ``babelnet`` and
    individual synsets that are not dicts are skipped.
    """

    def strings(container: Dict[str, object], key: str) -> List[str]:
        # A missing key and an explicit null both read as "no values".
        return [str(value) for value in container.get(key, []) or []]

    parts: List[str] = strings(entry, "topics")

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        parts += strings(semantic, "semantic_topics")
        parts += strings(semantic, "glosses")
        for synset in semantic.get("synsets", []) or []:
            if not isinstance(synset, dict):
                continue
            parts.append(str(synset.get("definition", "")))
            parts += strings(synset, "lemmas")

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        parts += strings(babelnet, "synset_refs")
        # The best synset is read first, then every listed synset, all with
        # the same four list fields.
        candidates: List[object] = [babelnet.get("best_synset", {})]
        candidates.extend(babelnet.get("synsets", []) or [])
        for synset in candidates:
            if not isinstance(synset, dict):
                continue
            for field in ("glosses", "categories", "domains", "senses"):
                parts += strings(synset, field)

    return " ".join(parts).lower()
|
|
|
|
|
|
def infer_topic_scores(entry: Dict[str, object]) -> Dict[str, int]:
    """Score each topic of ``TOPIC_KEYWORDS`` against the entry's text.

    The entry is flattened with ``flatten_text`` and every keyword is counted
    as a plain substring (so stems such as "medic" also match longer words).
    Each occurrence is worth 12 points and a topic's total is capped at 100.

    Returns:
        A dict of topic -> score holding only topics with at least one hit,
        in ``TOPIC_KEYWORDS`` order.
    """
    haystack = flatten_text(entry)
    totals: Dict[str, int] = {}
    for topic, keywords in TOPIC_KEYWORDS.items():
        hits = sum(haystack.count(keyword.lower()) for keyword in keywords)
        if hits:
            totals[topic] = min(12 * hits, 100)
    return totals
|
|
|
|
|
|
def collect_semantic_tags(entry: Dict[str, object]) -> List[str]:
    """Gather candidate tags from *entry* and return them as clean slugs.

    Sources, in order: ``entry["topics"]``; ``semantic_topics`` and every
    term list under ``raw_relation_terms`` of ``entry["semantic"]``; the
    ``categories`` and ``domains`` of the BabelNet ``best_synset`` and of
    each dict in BabelNet ``synsets``. Each candidate goes through
    ``cleanup_tag``; rejected candidates and duplicates are dropped.
    """
    raw: List[str] = [str(topic) for topic in entry.get("topics", []) or []]

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        raw += [str(topic) for topic in semantic.get("semantic_topics", []) or []]
        relation_terms = semantic.get("raw_relation_terms", {}) or {}
        for terms in relation_terms.values():
            raw += [str(term) for term in terms or []]

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        # Best synset first, then the full synset list; non-dicts are skipped.
        candidates: List[object] = [babelnet.get("best_synset", {})]
        candidates.extend(babelnet.get("synsets", []) or [])
        for synset in candidates:
            if not isinstance(synset, dict):
                continue
            for field in ("categories", "domains"):
                raw += [str(item) for item in synset.get(field, []) or []]

    slugs = [cleanup_tag(candidate) for candidate in raw]
    return [slug for slug in dedupe(slugs) if slug]
|
|
|
|
|
|
def _score_to_confidence(raw_score: object) -> float:
    """Map a 0-100 topic score to a confidence clamped to [0.4, 0.95]."""
    try:
        score = float(raw_score or 0)
    except (TypeError, ValueError):
        # Non-numeric scores are treated as "no evidence" rather than
        # aborting the whole run; null/empty values are already mapped to 0.
        score = 0.0
    return round(min(max(score / 100.0, 0.4), 0.95), 2)


def collect_senses(entry: Dict[str, object], topic_scores: Dict[str, int]) -> List[Dict[str, object]]:
    """Build the list of word senses for *entry*.

    One sense is emitted per dict in ``entry["semantic"]["synsets"]`` that
    has a non-blank ``definition`` (source ``"semantic"``, confidence 0.7),
    followed by at most one sense for ``entry["babelnet"]["best_synset"]``
    when it has an ``id`` and at least one non-blank gloss (source
    ``"babelnet"``, definition = first gloss, confidence derived from
    ``topic_score``).

    Null values coming from JSON are treated as absent: a null
    ``definition`` or ``topic`` does not become the literal text "None", and
    a null or non-numeric ``topic_score`` yields the minimum confidence
    instead of raising.

    Args:
        entry: Lexicon entry to read.
        topic_scores: Topic -> score mapping; topics scoring 50 or more are
            appended to every sense's ``topics``.

    Returns:
        A list of dicts with the keys ``source``, ``id``, ``definition``,
        ``lemmas``, ``topics`` and ``confidence``.
    """
    # Same for every sense of the entry, so computed once.
    strong_topics = [topic for topic, score in topic_scores.items() if score >= 50]
    senses: List[Dict[str, object]] = []

    semantic = entry.get("semantic", {})
    if isinstance(semantic, dict):
        semantic_topics = list(semantic.get("semantic_topics", []) or [])
        for synset in semantic.get("synsets", []) or []:
            if not isinstance(synset, dict):
                continue
            definition = str(synset.get("definition") or "").strip()
            if not definition:
                continue
            senses.append(
                {
                    "source": "semantic",
                    "id": synset.get("id"),
                    "definition": definition,
                    "lemmas": dedupe(str(item) for item in synset.get("lemmas", []) or []),
                    "topics": dedupe(semantic_topics + strong_topics),
                    "confidence": 0.7,
                }
            )

    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        best_synset = babelnet.get("best_synset", {})
        if isinstance(best_synset, dict) and best_synset.get("id"):
            stripped = (str(item).strip() for item in best_synset.get("glosses", []) or [])
            glosses = [gloss for gloss in stripped if gloss]
            if glosses:
                senses.append(
                    {
                        "source": "babelnet",
                        "id": best_synset.get("id"),
                        "definition": glosses[0],
                        "lemmas": dedupe(str(item) for item in best_synset.get("senses", []) or []),
                        # A blank topic is discarded by dedupe().
                        "topics": dedupe([str(best_synset.get("topic") or "").strip()] + strong_topics),
                        "confidence": _score_to_confidence(best_synset.get("topic_score")),
                    }
                )

    return senses
|
|
|
|
|
|
def collect_geo_tags(entry: Dict[str, object]) -> List[str]:
    """Return ``["toponym_possible"]`` when a BabelNet category looks geographic.

    Every category of every dict in ``entry["babelnet"]["synsets"]`` is
    lowercased and searched for the place-related markers below. The result
    is an empty list when none matches or when the BabelNet data is missing.
    """
    markers = ("comuni_", "province_", "regioni_", "città", "paesi", "località")
    babelnet = entry.get("babelnet", {})
    looks_like_place = False
    if isinstance(babelnet, dict):
        for synset in babelnet.get("synsets", []) or []:
            if not isinstance(synset, dict):
                continue
            for category in synset.get("categories", []) or []:
                lowered = str(category).lower()
                if any(marker in lowered for marker in markers):
                    looks_like_place = True
    # However many categories match, the tag is reported once.
    return ["toponym_possible"] if looks_like_place else []
|
|
|
|
|
|
def collect_name_tags(entry: Dict[str, object]) -> List[str]:
    """Return ``["capitalized_form"]`` when the entry's form starts uppercase.

    A missing or null ``form`` is treated as empty text, so it never yields
    the tag (a null must not be read as the capitalized word "None").

    Returns:
        ``["capitalized_form"]`` or an empty list.
    """
    form = entry.get("form")
    text = "" if form is None else str(form)
    return ["capitalized_form"] if text[:1].isupper() else []
|
|
|
|
|
|
def should_review(entry: Dict[str, object], topic_scores: Dict[str, int], senses: List[Dict[str, object]]) -> bool:
    """Decide whether *entry* should be flagged for manual review.

    The entry is flagged when any of these holds:

    * its only topic is "general" (case-insensitive) and no inferred topic
      scores 50 or more;
    * its BabelNet status is "ambiguous" and the best inferred score is
      below 50;
    * it has three or more senses and two or more topics scoring 50 or more.

    Args:
        entry: Lexicon entry; ``topics`` and ``babelnet.status`` are read.
        topic_scores: Topic -> score mapping for the entry.
        senses: Senses collected for the entry.

    Returns:
        ``True`` when the entry needs review, ``False`` otherwise.
    """
    existing_topics = [str(topic).lower() for topic in entry.get("topics", []) or []]
    best_score = max(topic_scores.values(), default=0)
    strong_topics = [topic for topic, score in topic_scores.items() if score >= 50]

    # Anything other than a dict (null, list, string) means "no status",
    # matching how the rest of the module reads the babelnet field.
    babelnet = entry.get("babelnet", {})
    babelnet_status = str(babelnet.get("status", "")) if isinstance(babelnet, dict) else ""

    if existing_topics == ["general"] and not strong_topics:
        return True
    if babelnet_status == "ambiguous" and best_score < 50:
        return True
    return len(senses) >= 3 and len(strong_topics) >= 2
|
|
|
|
|
|
def promoted_topics(
    existing_topics: List[str], topic_scores: Dict[str, int], replace_general: bool, min_topic_score: int
) -> List[str]:
    """Merge an entry's topics with the inferred topics that score high enough.

    Inferred topics are those with a score of at least *min_topic_score*,
    ordered by descending score and then by name.

    Args:
        existing_topics: Topics already attached to the entry.
        topic_scores: Topic -> score mapping.
        replace_general: When true and the entry's only topic is exactly
            ``"general"``, the inferred topics replace it (if there are any).
        min_topic_score: Minimum score for an inferred topic to be promoted.

    Returns:
        The de-duplicated existing topics followed by the promoted ones, or
        the promoted ones alone in the replacement case.
    """
    ranked = sorted(topic_scores, key=lambda name: (-topic_scores[name], name))
    promoted = [name for name in ranked if topic_scores[name] >= min_topic_score]
    kept = dedupe(existing_topics)

    # NOTE(review): this comparison is case-sensitive, whereas should_review()
    # lowercases topics before comparing with "general" - confirm intended.
    only_general = kept == ["general"]
    if replace_general and only_general and promoted:
        return promoted

    return dedupe(kept + promoted)
|
|
|
|
|
|
def refine_entry(entry: Dict[str, object], replace_general: bool, min_topic_score: int) -> Dict[str, object]:
    """Return a deep copy of *entry* enriched with the refinement fields.

    The copy gets an updated ``topics`` list plus ``semantic_tags``,
    ``senses``, ``topic_confidence`` (topic -> score), ``topic_suggestions``
    (topics by descending score, then name), ``geo_tags``, ``name_tags`` and
    ``needs_review``. The input entry is not modified.

    Args:
        entry: Source lexicon entry.
        replace_general: Forwarded to ``promoted_topics``.
        min_topic_score: Forwarded to ``promoted_topics``.
    """
    refined = deepcopy(entry)

    # All derived data is computed before any field is written.
    scores = infer_topic_scores(refined)
    tags = collect_semantic_tags(refined)
    senses = collect_senses(refined, scores)
    geo = collect_geo_tags(refined)
    names = collect_name_tags(refined)
    original_topics = [str(topic) for topic in refined.get("topics", []) or []]

    refined.update(
        {
            "topics": promoted_topics(original_topics, scores, replace_general, min_topic_score),
            "semantic_tags": tags,
            "senses": senses,
            "topic_confidence": scores,
            "topic_suggestions": sorted(scores, key=lambda name: (-scores[name], name)),
            "geo_tags": geo,
            "name_tags": names,
        }
    )
    # NOTE(review): should_review() sees the already promoted topics here,
    # not the entry's original ones - confirm this is intended.
    refined["needs_review"] = should_review(refined, scores, senses)
    return refined
|
|
|
|
|
|
def build_refined_lexicon(args: argparse.Namespace) -> Dict[str, object]:
    """Load the input lexicon and return the refined lexicon document.

    Args:
        args: Parsed options; ``input``, ``replace_general`` and
            ``min_topic_score`` are read.

    Returns:
        A dict with ``meta`` (run parameters, timestamp and counters) and
        ``entries`` (the refined entries). Input entries that are not dicts
        are skipped.

    Raises:
        ValueError: If the input document is not a dict or has no
            ``entries`` key.
    """
    payload = load_json(args.input)
    if not (isinstance(payload, dict) and "entries" in payload):
        raise ValueError(f"Lessico di input non valido: {args.input}")

    refined_entries = []
    for entry in payload.get("entries", []) or []:
        if isinstance(entry, dict):
            refined_entries.append(refine_entry(entry, args.replace_general, args.min_topic_score))

    flagged = [entry for entry in refined_entries if entry.get("needs_review")]
    with_suggestions = [
        entry for entry in refined_entries if len(entry.get("topic_suggestions") or ()) > 0
    ]

    meta = {
        "language": "it",
        "version": 1,
        "base_lexicon": args.input.name,
        # Local time with its UTC offset, to the second.
        "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "entry_count": len(refined_entries),
        "replace_general": args.replace_general,
        "min_topic_score": args.min_topic_score,
        "review_count": len(flagged),
        "topicful_count": len(with_suggestions),
    }
    return {"meta": meta, "entries": refined_entries}
|
|
|
|
|
|
def main() -> None:
    """Run the script: parse options, refine the lexicon, write it, report."""
    args = parse_args()
    payload = build_refined_lexicon(args)
    write_json(args.output, payload)

    meta = payload["meta"]
    summary = (
        f"Lessico raffinato generato: {args.output}",
        f"Voci totali: {meta['entry_count']}",
        f"Voci con suggerimenti di topic: {meta['topicful_count']}",
        f"Voci marcate needs_review: {meta['review_count']}",
    )
    for line in summary:
        print(line)


if __name__ == "__main__":
    main()
|