# File listing metadata: 584 lines, 19 KiB, Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
|
|
from build_babelnet_enrichment import (
|
|
BABELNET_CACHE_PATH,
|
|
BABELNET_ENV_KEY,
|
|
BABELNET_OUTPUT_PATH,
|
|
POS_TO_BABELNET,
|
|
enrich_entry,
|
|
load_json,
|
|
write_json,
|
|
)
|
|
from build_enriched_lexicon import (
|
|
ENRICHED_LEXICON_OUTPUT_PATH,
|
|
build_enriched_lexicon,
|
|
write_json as write_enriched_json,
|
|
)
|
|
from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH
|
|
|
|
|
|
# Named difficulty levels accepted by --difficulty, mapped to their numeric level.
DIFFICULTY_ALIASES: Dict[str, int] = {"easy": 1, "medium": 2, "hard": 4, "expert": 5}

# Topic used when --topic is not given; it disables the topic-specific filters.
DEFAULT_TOPIC = "general"

# Word endings that are penalised or rejected when the topic is one of CONCRETE_TOPICS.
ABSTRACTISH_SUFFIXES = (
    "zione",
    "zioni",
    "mento",
    "menti",
    "ita",
    "ezza",
    "anza",
    "enza",
    "ismo",
)

# Parts of speech a candidate must have (in addition to being known to POS_TO_BABELNET).
FILL_ALLOWED_POS = {"NOUN", "VERB", "ADJ", "ADV", "PREP", "CONJ"}

# Thresholds applied by select_candidates to entries not tagged with the requested topic.
GENERAL_FILL_MIN_QUALITY = 6
GENERAL_FILL_MAX_LENGTH = 10
SOFT_RELATED_FILL_LIMIT = 120

# Topics for which words ending in ABSTRACTISH_SUFFIXES are filtered out.
CONCRETE_TOPICS = {
    "animals", "plants", "nature", "ecology", "geography",
    "weather", "sea", "mountain", "health", "science",
    "sport", "history", "school", "cinema", "literature",
    "food", "city", "transport", "work", "home",
}
|
|
|
|
# Per-topic substrings: a word containing any of them gets the "topic root" bonus
# in topic_score. Topics without an entry here always match.
TOPIC_SEED_REQUIRED_SUBSTRINGS: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "auto", "mot", "tren", "nav", "barc", "port", "pist",
        "vol", "aer", "bici", "cicl", "rimorch", "reattor", "vettur",
        "ambul", "imbarc", "trattor", "carr", "vap", "rota", "ruot",
    ),
    "animals": (
        "can", "gatt", "lup", "ors", "pesc",
        "aquil", "anatr", "cavall", "serpent", "tig",
        "leon", "volp", "cerv", "capr", "pecor",
    ),
    "nature": (
        "mar", "lag", "fium", "vent", "bosch",
        "mont", "collin", "isol", "rocc", "terra",
        "acqu", "fiore", "fogli", "radic", "affluent",
        "litoral", "piogg", "nev", "onda", "clim",
    ),
    "cinema": (
        "film", "cin", "teatr", "attor", "scen", "reg", "doppi",
        "dialog", "comic", "div", "docu", "pellic", "spettacol",
    ),
}

# Per-topic substrings: a word containing any of them is penalised in topic_score.
TOPIC_SEED_BLOCKED_SUBSTRINGS: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "intervist", "intratten", "speriment", "stermin", "investig",
        "intervent", "centometr", "sintetizz", "erot", "adoraz",
        "esalt", "eccit", "traduz", "fluttu", "sollecit",
    ),
    "animals": (
        "assicur", "finanz", "coediz", "camerier", "servitor", "indic",
        "estens", "diffus", "difensor", "spessor", "maggior",
    ),
    "cinema": (
        "manifest", "riediz", "dissimul", "diffus",
        "difensor", "estens", "malumor", "eversor",
    ),
}
|
|
|
|
# BabelNet statuses for which an entry may be submitted; "no_match" is added on
# demand when --retry-no-match is given.
ENRICHABLE_STATUSES = {"not_requested", "api_error"}

# Per-topic word prefixes that make an entry eligible for that topic even when
# the entry is not tagged with it (see matches_safe_babelnet_roots).
BABELNET_TOPIC_SAFE_PREFIXES: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "ambul", "aer", "autobus", "autocar", "automob",
        "autostrad", "autoveic", "autovett", "bicicl", "ciclo",
        "imbarc", "locom", "motoc", "motr", "navig",
        "rimorch", "trattor", "tren", "veicol", "vettur",
    ),
}
|
|
|
|
|
|
def parse_difficulty(value: str) -> int:
    """Convert a --difficulty argument into a numeric level.

    The value is stripped and lower-cased, then resolved either through
    DIFFICULTY_ALIASES or as an integer between 1 and 5.

    Raises:
        SystemExit: if the value is neither a known alias nor an integer,
            or if the integer is outside 1..5.
    """
    token = str(value).strip().lower()

    alias_level = DIFFICULTY_ALIASES.get(token)
    if alias_level is not None:
        return alias_level

    try:
        numeric = int(token)
    except ValueError as exc:
        raise SystemExit(
            "Valore non valido per --difficulty. Usa easy, medium, hard, expert oppure un intero tra 1 e 5."
        ) from exc

    if numeric < 1 or numeric > 5:
        raise SystemExit("Il valore numerico di --difficulty deve essere compreso tra 1 e 5.")
    return numeric
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments."""
    description = (
        "Arricchisce incrementalmente il lessico: seleziona parole mancanti, "
        "chiama BabelNet entro un limite e rigenera lexicon_it_enriched.json."
    )
    parser = argparse.ArgumentParser(description=description)
    add = parser.add_argument

    # Credentials and selection criteria.
    add(
        "--api-key",
        default=os.environ.get(BABELNET_ENV_KEY),
        help=f"Chiave API BabelNet. In alternativa imposta la variabile ambiente {BABELNET_ENV_KEY}.",
    )
    add("--topic", default=DEFAULT_TOPIC, help="Topic per cui scegliere le prossime parole da arricchire.")
    add("--difficulty", default="medium", help="Difficolta massima: easy, medium, hard, expert oppure 1-5.")
    add("--limit", type=int, default=50, help="Numero massimo di parole da arricchire in questa esecuzione.")
    add("--sleep", type=float, default=0.2, help="Pausa tra richieste API.")

    # Input and output files.
    add(
        "--semantic",
        type=Path,
        default=SEMANTIC_LEXICON_OUTPUT_PATH,
        help="Lessico semantico completo di partenza.",
    )
    add(
        "--babelnet",
        type=Path,
        default=BABELNET_OUTPUT_PATH,
        help="Archivio degli arricchimenti BabelNet parziali.",
    )
    add(
        "--enriched",
        type=Path,
        default=ENRICHED_LEXICON_OUTPUT_PATH,
        help="Lessico arricchito da aggiornare.",
    )

    # Behaviour switches.
    add(
        "--dry-run",
        action="store_true",
        help="Mostra le parole candidate senza chiamare BabelNet e senza scrivere file.",
    )
    add(
        "--retry-no-match",
        action="store_true",
        help="Riprova anche parole gia marcate come no_match.",
    )
    add(
        "--words",
        nargs="*",
        default=None,
        help="Parole specifiche da arricchire, utile per generare definizioni sul cruciverba finale.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def entry_key(entry: Dict[str, object]) -> Tuple[str, str]:
    """Return the (form, pos) identity of a lexicon entry.

    The form is taken from "normalized_form", falling back to "form", and is
    stripped and lower-cased; the part of speech is stripped and upper-cased.
    Missing or falsy values become empty strings.
    """
    raw_form = entry.get("normalized_form") or entry.get("form") or ""
    raw_pos = entry.get("pos") or ""
    return str(raw_form).strip().lower(), str(raw_pos).strip().upper()
|
|
|
|
|
|
def dedupe(items: Iterable[Dict[str, object]]) -> List[Dict[str, object]]:
    """Drop entries whose entry_key was already seen, keeping the first one.

    The relative order of the surviving entries is the input order.
    """
    first_by_key: Dict[Tuple[str, str], Dict[str, object]] = {}
    for candidate in items:
        # setdefault keeps the earliest entry; dicts preserve insertion order.
        first_by_key.setdefault(entry_key(candidate), candidate)
    return list(first_by_key.values())
|
|
|
|
|
|
def entry_topics(entry: Dict[str, object]) -> Tuple[set[str], set[str]]:
    """Return the entry's direct topics and its semantic topics, lower-cased.

    Args:
        entry: lexicon entry; "topics" is read from the entry itself and
            "semantic_topics" from its "semantic" sub-dictionary.

    Returns:
        A pair ``(topics, semantic_topics)`` of sets of lower-cased strings.
        Falsy items are skipped. A missing or ``None`` collection, or a
        "semantic" value that is not a dict, yields an empty set.
    """

    def _lowered(raw: object) -> set[str]:
        # An explicit JSON null is treated like a missing key, not iterated.
        return {str(item).lower() for item in (raw or []) if item}

    topics = _lowered(entry.get("topics"))
    semantic = entry.get("semantic")
    if isinstance(semantic, dict):
        semantic_topics = _lowered(semantic.get("semantic_topics"))
    else:
        semantic_topics = set()
    return topics, semantic_topics
|
|
|
|
|
|
def current_babelnet_status(entry: Dict[str, object]) -> str:
    """Return the entry's BabelNet status as a string.

    "not_requested" is returned when the entry has no "babelnet" dictionary
    or that dictionary has no "status" key.
    """
    info = entry.get("babelnet", {})
    if not isinstance(info, dict):
        return "not_requested"
    status = info.get("status", "not_requested")
    return str(status)
|
|
|
|
|
|
def matches_topic_roots(word: str, topic: str) -> bool:
    """Tell whether ``word`` contains one of the topic's required substrings.

    Topics with no (or an empty) entry in TOPIC_SEED_REQUIRED_SUBSTRINGS
    match every word.
    """
    required = TOPIC_SEED_REQUIRED_SUBSTRINGS.get(topic, ())
    if not required:
        return True
    for fragment in required:
        if fragment in word:
            return True
    return False
|
|
|
|
|
|
def matches_safe_babelnet_roots(word: str, topic: str) -> bool:
    """Tell whether ``word`` starts with one of the topic's safe prefixes.

    Topics absent from BABELNET_TOPIC_SAFE_PREFIXES never match.
    """
    safe_prefixes = BABELNET_TOPIC_SAFE_PREFIXES.get(topic)
    if safe_prefixes is None:
        return False
    # str.startswith accepts a tuple and is True when any prefix matches.
    return word.startswith(tuple(safe_prefixes))
|
|
|
|
|
|
def is_blocked_for_topic(word: str, topic: str) -> bool:
    """Tell whether ``word`` contains a substring blocked for ``topic``."""
    for fragment in TOPIC_SEED_BLOCKED_SUBSTRINGS.get(topic, ()):
        if fragment in word:
            return True
    return False
|
|
|
|
|
|
def topic_score(entry: Dict[str, object], topic: str) -> int:
    """Score how well an entry fits ``topic``.

    For DEFAULT_TOPIC every entry scores a flat 20. Otherwise the score is
    the sum of the bonuses and penalties whose condition holds for the
    entry's lower-cased "form".
    """
    if topic == DEFAULT_TOPIC:
        return 20

    lowered = str(entry.get("form", "")).lower()
    direct_topics, semantic_topics = entry_topics(entry)

    # (condition, points) pairs; all applicable points are added together.
    adjustments = (
        (topic in direct_topics, 100),
        (topic in semantic_topics, 45),
        (matches_topic_roots(lowered, topic), 35),
        (DEFAULT_TOPIC in direct_topics, 5),
        (is_blocked_for_topic(lowered, topic), -100),
        (topic in CONCRETE_TOPICS and lowered.endswith(ABSTRACTISH_SUFFIXES), -30),
    )
    return sum(points for applies, points in adjustments if applies)
|
|
|
|
|
|
def candidate_score(entry: Dict[str, object], topic: str) -> Tuple[int, int, int, int, int, str]:
    """Build the sort key used to rank candidate entries.

    The tuple is, in order: topic score, quality score, part-of-speech bonus,
    semantic-match bonus, length bonus and the word form itself.
    """
    form = str(entry.get("form", ""))
    part_of_speech = str(entry.get("pos", ""))

    if part_of_speech == "NOUN":
        pos_bonus = 12
    elif part_of_speech == "VERB":
        pos_bonus = 8
    elif part_of_speech == "ADJ":
        pos_bonus = 6
    elif part_of_speech == "ADV":
        pos_bonus = 4
    else:
        pos_bonus = 0

    semantic = entry.get("semantic", {})
    has_semantic_match = isinstance(semantic, dict) and semantic.get("matched")
    semantic_bonus = 3 if has_semantic_match else 0

    size = len(form)
    if 4 <= size <= 10:
        length_bonus = 4
    elif size <= 14:
        length_bonus = 1
    else:
        length_bonus = -3

    relevance = topic_score(entry, topic)
    quality = int(entry.get("quality_score", 0))
    return relevance, quality, pos_bonus, semantic_bonus, length_bonus, form
|
|
|
|
|
|
def eligible_for_babelnet(entry: Dict[str, object], topic: str, difficulty_level: int, retry_no_match: bool) -> bool:
    """Decide whether an entry may be sent to BabelNet for ``topic``.

    An entry qualifies when its BabelNet status is enrichable, its form is
    alphabetic and 3-16 characters long, its part of speech is supported,
    its "difficulty_word" does not exceed ``difficulty_level`` and it is
    allowed in crosswords. For a non-default topic it must additionally be
    linked to the topic and, for concrete topics, must not end with one of
    ABSTRACTISH_SUFFIXES.
    """
    lowered = str(entry.get("form", "")).lower()
    part_of_speech = str(entry.get("pos", ""))
    direct_topics, semantic_topics = entry_topics(entry)
    status = current_babelnet_status(entry)

    accepted_statuses = set(ENRICHABLE_STATUSES)
    if retry_no_match:
        accepted_statuses.add("no_match")

    if status not in accepted_statuses:
        return False
    if not (lowered.isalpha() and 3 <= len(lowered) <= 16):
        return False
    if not (part_of_speech in POS_TO_BABELNET and part_of_speech in FILL_ALLOWED_POS):
        return False
    if int(entry.get("difficulty_word", 5)) > difficulty_level:
        return False
    if not entry.get("allowed_in_crossword", False):
        return False

    if topic == DEFAULT_TOPIC:
        return True

    is_concrete_topic = topic in CONCRETE_TOPICS
    if is_concrete_topic and lowered.endswith(ABSTRACTISH_SUFFIXES):
        return False

    # A semantic-only link is accepted for non-concrete topics only.
    return (
        topic in direct_topics
        or matches_safe_babelnet_roots(lowered, topic)
        or (topic in semantic_topics and not is_concrete_topic)
    )
|
|
|
|
|
|
def select_candidates(payload: Dict[str, object], topic: str, difficulty_level: int, limit: int, retry_no_match: bool) -> List[Dict[str, object]]:
    """Pick the best entries to enrich for ``topic``.

    Args:
        payload: lexicon payload; its "entries" list is scanned.
        topic: normalized topic name (DEFAULT_TOPIC disables topic tiers).
        difficulty_level: maximum "difficulty_word" accepted.
        limit: maximum number of entries returned; values below zero are
            treated as zero.
        retry_no_match: also accept entries whose status is "no_match".

    Returns:
        Up to ``limit`` eligible entries, de-duplicated by entry_key and
        sorted by candidate_score, best first.
    """
    eligible = [
        entry
        for entry in payload.get("entries", []) or []
        if isinstance(entry, dict) and eligible_for_babelnet(entry, topic, difficulty_level, retry_no_match)
    ]

    def rank(item: Dict[str, object]) -> Tuple[int, int, int, int, int, str]:
        return candidate_score(item, topic)

    if topic != DEFAULT_TOPIC:
        # Single-pass partition. Each tier depends only on the entry's own
        # content, so this equals the former "entry not in strong/soft" list
        # scans without their quadratic dict-equality comparisons.
        strong: List[Dict[str, object]] = []
        soft: List[Dict[str, object]] = []
        support: List[Dict[str, object]] = []
        for entry in eligible:
            if topic in entry_topics(entry)[0]:
                strong.append(entry)
                continue
            if int(entry.get("quality_score", 0)) < GENERAL_FILL_MIN_QUALITY:
                # Low-quality entries not tagged with the topic are dropped.
                continue
            form = str(entry.get("form", ""))
            if len(form) <= GENERAL_FILL_MAX_LENGTH:
                soft.append(entry)
            elif not form.endswith(ABSTRACTISH_SUFFIXES):
                support.append(entry)

        soft.sort(key=rank, reverse=True)
        support.sort(key=rank, reverse=True)
        eligible = strong + soft[:SOFT_RELATED_FILL_LIMIT] + support

    eligible = dedupe(eligible)
    eligible.sort(key=rank, reverse=True)
    # A negative slice bound would trim from the end instead of returning nothing.
    return eligible[: max(0, limit)]
|
|
|
|
|
|
def select_word_candidates(
    payload: Dict[str, object],
    words: Iterable[str],
    limit: int,
    retry_no_match: bool,
) -> List[Dict[str, object]]:
    """Pick the lexicon entries for an explicit list of words.

    Args:
        payload: lexicon payload; its "entries" list is scanned.
        words: requested words; they are stripped, lower-cased and
            de-duplicated, keeping the order of first appearance.
        limit: maximum number of entries returned; zero or less returns an
            empty list.
        retry_no_match: also accept entries whose status is "no_match".

    Returns:
        At most one entry per requested word, in request order. When several
        entries share a form, the last one in the payload is preferred and
        earlier ones are used only if it is not usable. An entry is usable
        when its status is enrichable, its "pos" is in POS_TO_BABELNET and
        its "form" is alphabetic.
    """
    # Checking the limit only after appending used to return one entry for limit=0.
    if limit <= 0:
        return []

    requested = list(
        dict.fromkeys(
            normalized
            for normalized in (str(word).strip().lower() for word in words)
            if normalized
        )
    )

    # Keep every entry per form so one unusable homograph cannot hide a usable one.
    variants_by_word: Dict[str, List[Dict[str, object]]] = {}
    for entry in payload.get("entries", []) or []:
        if isinstance(entry, dict):
            variants_by_word.setdefault(str(entry.get("form", "")).lower(), []).append(entry)

    allowed_statuses = set(ENRICHABLE_STATUSES)
    if retry_no_match:
        allowed_statuses.add("no_match")

    def _usable(entry: Dict[str, object]) -> bool:
        return (
            current_babelnet_status(entry) in allowed_statuses
            and str(entry.get("pos", "")) in POS_TO_BABELNET
            and str(entry.get("form", "")).isalpha()
        )

    selected: List[Dict[str, object]] = []
    for word in requested:
        # Reversed so the last payload entry wins, as it did previously.
        chosen = next(
            (entry for entry in reversed(variants_by_word.get(word, [])) if _usable(entry)),
            None,
        )
        if chosen is None:
            continue
        selected.append(chosen)
        if len(selected) >= limit:
            break
    return selected
|
|
|
|
|
|
def load_source_payload(enriched_path: Path, semantic_path: Path) -> Dict[str, object]:
    """Load the lexicon to select candidates from.

    The enriched lexicon is used when its file exists and holds a dict with
    an "entries" key; otherwise the semantic lexicon is tried.

    Raises:
        ValueError: if neither file yields a dict containing "entries".
    """

    def _read_valid(path: Path) -> Optional[Dict[str, object]]:
        data = load_json(path, {})
        if isinstance(data, dict) and "entries" in data:
            return data
        return None

    loaded = _read_valid(enriched_path) if enriched_path.exists() else None
    if loaded is None:
        loaded = _read_valid(semantic_path)
    if loaded is None:
        raise ValueError(f"Nessun lessico valido trovato: {enriched_path} / {semantic_path}")
    return loaded
|
|
|
|
|
|
def merge_babelnet_entries(existing_payload: Dict[str, object], new_entries: List[Dict[str, object]], topic: str, difficulty: str) -> Dict[str, object]:
    """Merge freshly enriched entries into the BabelNet archive payload.

    Entries are keyed by entry_key; a new entry replaces an archived one with
    the same key and is stamped with "babelnet_generated_at". Inputs are
    deep-copied, not modified. The result holds the entries sorted by
    (form, pos) and a refreshed "meta" dictionary.
    """
    merged: Dict[Tuple[str, str], Dict[str, object]] = {}
    for archived in existing_payload.get("entries", []) or []:
        if isinstance(archived, dict):
            merged[entry_key(archived)] = deepcopy(archived)

    # Local time with UTC offset, to the second.
    timestamp = datetime.now().astimezone().isoformat(timespec="seconds")

    for incoming in new_entries:
        fresh = deepcopy(incoming)
        fresh["babelnet_generated_at"] = timestamp
        merged[entry_key(fresh)] = fresh

    def _form_then_pos(item: Dict[str, object]) -> Tuple[str, str]:
        return str(item.get("form", "")), str(item.get("pos", ""))

    ordered = sorted(merged.values(), key=_form_then_pos)

    previous_meta = existing_payload.get("meta", {})
    meta = dict(previous_meta) if isinstance(previous_meta, dict) else {}
    meta["language"] = meta.get("language", "it")
    meta["version"] = max(1, int(meta.get("version", 1)))
    meta["source"] = "BabelNet API"
    meta["updated_at"] = timestamp
    meta["last_topic"] = topic
    meta["last_difficulty"] = difficulty
    meta["entry_count"] = len(ordered)

    return {"meta": meta, "entries": ordered}
|
|
|
|
|
|
def rebuild_enriched(semantic_path: Path, babelnet_path: Path, enriched_path: Path, topic: str) -> Dict[str, object]:
    """Regenerate the enriched lexicon and write it to ``enriched_path``.

    build_enriched_lexicon is called with an argparse-like namespace carrying
    the paths and the topic; the payload it returns is written and returned.
    """
    builder_options = {
        "semantic": semantic_path,
        "babelnet": babelnet_path,
        "output": enriched_path,
        "topic": topic,
    }
    rebuilt = build_enriched_lexicon(SimpleNamespace(**builder_options))
    write_enriched_json(enriched_path, rebuilt)
    return rebuilt
|
|
|
|
|
|
def run_incremental_enrichment(args: argparse.Namespace) -> Dict[str, object]:
    """Run one incremental enrichment pass and return a summary.

    Candidates come from ``args.words`` when given, otherwise from the
    topic/difficulty selection. In dry-run mode only the selection is
    returned. Otherwise each candidate is enriched through enrich_entry, the
    cache file is rewritten after every word, the BabelNet archive is merged
    and written, and the enriched lexicon is rebuilt.

    Raises:
        SystemExit: if no API key is available outside dry-run mode, or if
            ``args.difficulty`` is invalid.
    """
    topic = args.topic.strip().lower()
    level = parse_difficulty(str(args.difficulty))
    source = load_source_payload(args.enriched, args.semantic)
    cap = max(0, args.limit)

    explicit_words = getattr(args, "words", None)
    if explicit_words:
        candidates = select_word_candidates(source, explicit_words, cap, args.retry_no_match)
    else:
        candidates = select_candidates(source, topic, level, cap, args.retry_no_match)

    if args.dry_run:
        return {
            "mode": "dry-run",
            "topic": topic,
            "difficulty": args.difficulty,
            "selected_count": len(candidates),
            "selected_words": [entry.get("form") for entry in candidates],
        }

    if not args.api_key:
        raise SystemExit(
            f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure usa --api-key <chiave>."
        )

    cache = load_json(BABELNET_CACHE_PATH, {})
    if not isinstance(cache, dict):
        cache = {}
    archive = load_json(args.babelnet, {"entries": []})
    if not isinstance(archive, dict):
        archive = {"entries": []}

    total = len(candidates)
    enriched_candidates = []
    word_logs = []
    for position, candidate in enumerate(candidates, start=1):
        updated = deepcopy(candidate)
        updated.pop("babelnet", None)
        stats = {"api_calls": 0, "cache_hits": 0, "responses": 0}
        updated["babelnet"] = enrich_entry(updated, args.api_key, cache, args.sleep, stats)
        enriched_candidates.append(updated)

        # Rewritten after every word, before the log entry is built.
        write_json(BABELNET_CACHE_PATH, cache)

        outcome = updated["babelnet"]
        word_logs.append(
            {
                "word": updated["form"],
                "api_calls": stats["api_calls"],
                "cache_hits": stats["cache_hits"],
                "responses": stats["responses"],
                "matched": bool(outcome.get("matched")),
                "synsets": len(outcome.get("synsets", []) or []),
                "reason": outcome.get("reason"),
            }
        )
        print(
            f"[{position}/{total}] {updated['form']}: "
            f"api_calls={stats['api_calls']} cache_hits={stats['cache_hits']} "
            f"risposta={stats['responses'] > 0} match={outcome.get('matched')}"
        )

    merged_babelnet = merge_babelnet_entries(archive, enriched_candidates, topic, str(args.difficulty))
    write_json(args.babelnet, merged_babelnet)
    enriched_payload = rebuild_enriched(args.semantic, args.babelnet, args.enriched, topic)

    matched_count = sum(1 for entry in enriched_candidates if entry.get("babelnet", {}).get("matched"))
    api_call_count = sum(log["api_calls"] for log in word_logs)
    cache_hit_count = sum(log["cache_hits"] for log in word_logs)

    return {
        "mode": "enriched",
        "topic": topic,
        "difficulty": args.difficulty,
        "selected_count": total,
        "matched_count": matched_count,
        "api_call_count": api_call_count,
        "cache_hit_count": cache_hit_count,
        "word_logs": word_logs,
        "babelnet_entry_count": merged_babelnet["meta"]["entry_count"],
        "enriched_status_counts": enriched_payload["meta"]["babelnet_status_counts"],
    }
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, run the enrichment and print a report."""
    summary = run_incremental_enrichment(parse_args())

    if summary["mode"] == "dry-run":
        print("Dry-run BabelNet incrementale")
        print(f"Topic: {summary['topic']}")
        print(f"Difficolta: {summary['difficulty']}")
        print(f"Parole selezionate: {summary['selected_count']}")
        for position, word in enumerate(summary["selected_words"], start=1):
            print(f"{position:2d}. {word}")
        return

    print("Arricchimento BabelNet completato")
    print(f"Topic: {summary['topic']}")
    print(f"Parole interrogate: {summary['selected_count']}")
    print(f"Chiamate API BabelNet reali: {summary['api_call_count']}")
    print(f"Risposte da cache: {summary['cache_hit_count']}")
    print(f"Match BabelNet: {summary['matched_count']}")
    for log in summary["word_logs"]:
        got_response = log["responses"] > 0
        print(
            f"- {log['word']}: api_calls={log['api_calls']}, "
            f"cache_hits={log['cache_hits']}, risposta={got_response}, "
            f"match={log['matched']}, synsets={log['synsets']}"
        )
    print(f"Voci BabelNet archiviate: {summary['babelnet_entry_count']}")
    print(f"Stati lessico arricchito: {summary['enriched_status_counts']}")


# Run the CLI only when the file is executed as a script.
if __name__ == "__main__":
    main()
|