"""Incrementally enrich the lexicon with BabelNet data.

The script selects lexicon entries that still lack BabelNet data (by topic
and difficulty, or from an explicit word list), queries BabelNet for at most
``--limit`` of them, merges the results into the BabelNet archive and then
regenerates the enriched lexicon.
"""

from __future__ import annotations

import argparse
import os
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from typing import Dict, Iterable, List, Optional, Tuple

from build_babelnet_enrichment import (
    BABELNET_CACHE_PATH,
    BABELNET_ENV_KEY,
    BABELNET_OUTPUT_PATH,
    POS_TO_BABELNET,
    enrich_entry,
    load_json,
    write_json,
)
from build_enriched_lexicon import (
    ENRICHED_LEXICON_OUTPUT_PATH,
    build_enriched_lexicon,
    write_json as write_enriched_json,
)
from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH

# Named difficulty levels accepted by --difficulty, mapped to numeric levels.
DIFFICULTY_ALIASES: Dict[str, int] = {
    "easy": 1,
    "medium": 2,
    "hard": 4,
    "expert": 5,
}

# Topic that disables every topic-specific filter.
DEFAULT_TOPIC = "general"

# Word endings penalised (or rejected) when the topic is a concrete one.
ABSTRACTISH_SUFFIXES = ("zione", "zioni", "mento", "menti", "ita", "ezza", "anza", "enza", "ismo")

FILL_ALLOWED_POS = {"NOUN", "VERB", "ADJ", "ADV", "PREP", "CONJ"}
GENERAL_FILL_MIN_QUALITY = 6
GENERAL_FILL_MAX_LENGTH = 10
SOFT_RELATED_FILL_LIMIT = 120

CONCRETE_TOPICS = {
    "animals",
    "plants",
    "nature",
    "ecology",
    "geography",
    "weather",
    "sea",
    "mountain",
    "health",
    "science",
    "sport",
    "history",
    "school",
    "cinema",
    "literature",
    "food",
    "city",
    "transport",
    "work",
    "home",
}

# Per-topic substrings; a word containing any of them gets a score bonus.
TOPIC_SEED_REQUIRED_SUBSTRINGS: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "auto",
        "mot",
        "tren",
        "nav",
        "barc",
        "port",
        "pist",
        "vol",
        "aer",
        "bici",
        "cicl",
        "rimorch",
        "reattor",
        "vettur",
        "ambul",
        "imbarc",
        "trattor",
        "carr",
        "vap",
        "rota",
        "ruot",
    ),
    "animals": (
        "can",
        "gatt",
        "lup",
        "ors",
        "pesc",
        "aquil",
        "anatr",
        "cavall",
        "serpent",
        "tig",
        "leon",
        "volp",
        "cerv",
        "capr",
        "pecor",
    ),
    "nature": (
        "mar",
        "lag",
        "fium",
        "vent",
        "bosch",
        "mont",
        "collin",
        "isol",
        "rocc",
        "terra",
        "acqu",
        "fiore",
        "fogli",
        "radic",
        "affluent",
        "litoral",
        "piogg",
        "nev",
        "onda",
        "clim",
    ),
    "cinema": (
        "film",
        "cin",
        "teatr",
        "attor",
        "scen",
        "reg",
        "doppi",
        "dialog",
        "comic",
        "div",
        "docu",
        "pellic",
        "spettacol",
    ),
}

# Per-topic substrings; a word containing any of them gets a heavy penalty.
TOPIC_SEED_BLOCKED_SUBSTRINGS: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "intervist",
        "intratten",
        "speriment",
        "stermin",
        "investig",
        "intervent",
        "centometr",
        "sintetizz",
        "erot",
        "adoraz",
        "esalt",
        "eccit",
        "traduz",
        "fluttu",
        "sollecit",
    ),
    "animals": (
        "assicur",
        "finanz",
        "coediz",
        "camerier",
        "servitor",
        "indic",
        "estens",
        "diffus",
        "difensor",
        "spessor",
        "maggior",
    ),
    "cinema": (
        "manifest",
        "riediz",
        "dissimul",
        "diffus",
        "difensor",
        "estens",
        "malumor",
        "eversor",
    ),
}

# BabelNet statuses for which an entry may be (re)queried by default.
ENRICHABLE_STATUSES = {"not_requested", "api_error"}

# Per-topic word prefixes that make an entry eligible even without a topic tag.
BABELNET_TOPIC_SAFE_PREFIXES: Dict[str, Tuple[str, ...]] = {
    "transport": (
        "ambul",
        "aer",
        "autobus",
        "autocar",
        "automob",
        "autostrad",
        "autoveic",
        "autovett",
        "bicicl",
        "ciclo",
        "imbarc",
        "locom",
        "motoc",
        "motr",
        "navig",
        "rimorch",
        "trattor",
        "tren",
        "veicol",
        "vettur",
    ),
}


def _int_field(entry: Dict[str, object], key: str, default: int) -> int:
    """Read ``entry[key]`` as an int, falling back to *default*.

    A missing key, ``None`` or a value that ``int()`` cannot convert all
    yield *default* instead of raising.
    """
    try:
        return int(entry.get(key, default))  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        return default


def parse_difficulty(value: str) -> int:
    """Convert a ``--difficulty`` value into a numeric level between 1 and 5.

    Accepts one of the names in ``DIFFICULTY_ALIASES`` (case-insensitive) or
    an integer literal.

    Raises:
        SystemExit: if the value is neither a known alias nor an integer, or
            if the integer is outside ``1..5``.
    """
    text = str(value).strip().lower()
    if text in DIFFICULTY_ALIASES:
        return DIFFICULTY_ALIASES[text]
    try:
        level = int(text)
    except ValueError as exc:
        raise SystemExit(
            "Valore non valido per --difficulty. Usa easy, medium, hard, expert oppure un intero tra 1 e 5."
        ) from exc
    if not 1 <= level <= 5:
        raise SystemExit("Il valore numerico di --difficulty deve essere compreso tra 1 e 5.")
    return level


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description=(
            "Arricchisce incrementalmente il lessico: seleziona parole mancanti, "
            "chiama BabelNet entro un limite e rigenera lexicon_it_enriched.json."
        )
    )
    parser.add_argument(
        "--api-key",
        default=os.environ.get(BABELNET_ENV_KEY),
        help=f"Chiave API BabelNet. In alternativa imposta la variabile ambiente {BABELNET_ENV_KEY}.",
    )
    parser.add_argument(
        "--topic",
        default=DEFAULT_TOPIC,
        help="Topic per cui scegliere le prossime parole da arricchire.",
    )
    parser.add_argument(
        "--difficulty",
        default="medium",
        help="Difficolta massima: easy, medium, hard, expert oppure 1-5.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="Numero massimo di parole da arricchire in questa esecuzione.",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.2,
        help="Pausa tra richieste API.",
    )
    parser.add_argument(
        "--semantic",
        type=Path,
        default=SEMANTIC_LEXICON_OUTPUT_PATH,
        help="Lessico semantico completo di partenza.",
    )
    parser.add_argument(
        "--babelnet",
        type=Path,
        default=BABELNET_OUTPUT_PATH,
        help="Archivio degli arricchimenti BabelNet parziali.",
    )
    parser.add_argument(
        "--enriched",
        type=Path,
        default=ENRICHED_LEXICON_OUTPUT_PATH,
        help="Lessico arricchito da aggiornare.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Mostra le parole candidate senza chiamare BabelNet e senza scrivere file.",
    )
    parser.add_argument(
        "--retry-no-match",
        action="store_true",
        help="Riprova anche parole gia marcate come no_match.",
    )
    parser.add_argument(
        "--words",
        nargs="*",
        default=None,
        help="Parole specifiche da arricchire, utile per generare definizioni sul cruciverba finale.",
    )
    return parser.parse_args()


def entry_key(entry: Dict[str, object]) -> Tuple[str, str]:
    """Return the ``(form, pos)`` identity of an entry.

    The form is ``normalized_form`` when present, otherwise ``form``,
    stripped and lower-cased; the part of speech is stripped and upper-cased.
    """
    form = str(entry.get("normalized_form") or entry.get("form") or "").strip().lower()
    pos = str(entry.get("pos") or "").strip().upper()
    return form, pos


def dedupe(items: Iterable[Dict[str, object]]) -> List[Dict[str, object]]:
    """Drop entries whose ``entry_key`` was already seen, keeping the first."""
    seen = set()
    result = []
    for item in items:
        key = entry_key(item)
        if key in seen:
            continue
        seen.add(key)
        result.append(item)
    return result


def entry_topics(entry: Dict[str, object]) -> Tuple[set[str], set[str]]:
    """Return ``(topics, semantic_topics)`` of an entry as lower-cased sets.

    ``topics`` comes from ``entry["topics"]`` and ``semantic_topics`` from
    ``entry["semantic"]["semantic_topics"]`` (only when ``semantic`` is a
    dict). Falsy items are ignored; a missing or ``None`` list counts as
    empty.
    """
    # ``or []`` guards against an explicit null in the source JSON.
    topics = {str(item).lower() for item in entry.get("topics", []) or [] if item}
    semantic = entry.get("semantic", {})
    semantic_topics: set[str] = set()
    if isinstance(semantic, dict):
        semantic_topics = {
            str(item).lower() for item in semantic.get("semantic_topics", []) or [] if item
        }
    return topics, semantic_topics


def current_babelnet_status(entry: Dict[str, object]) -> str:
    """Return the entry's BabelNet status, ``"not_requested"`` when absent."""
    babelnet = entry.get("babelnet", {})
    if isinstance(babelnet, dict):
        return str(babelnet.get("status", "not_requested"))
    return "not_requested"


def matches_topic_roots(word: str, topic: str) -> bool:
    """Tell whether *word* contains one of the topic's seed substrings.

    Topics without an entry in ``TOPIC_SEED_REQUIRED_SUBSTRINGS`` always match.
    """
    roots = TOPIC_SEED_REQUIRED_SUBSTRINGS.get(topic, ())
    return not roots or any(root in word for root in roots)


def matches_safe_babelnet_roots(word: str, topic: str) -> bool:
    """Tell whether *word* starts with one of the topic's safe prefixes.

    Topics without an entry in ``BABELNET_TOPIC_SAFE_PREFIXES`` never match.
    """
    prefixes = BABELNET_TOPIC_SAFE_PREFIXES.get(topic)
    if prefixes is None:
        return False
    return any(word.startswith(prefix) for prefix in prefixes)


def is_blocked_for_topic(word: str, topic: str) -> bool:
    """Tell whether *word* contains a substring blocked for *topic*."""
    return any(part in word for part in TOPIC_SEED_BLOCKED_SUBSTRINGS.get(topic, ()))


def topic_score(entry: Dict[str, object], topic: str) -> int:
    """Score how well an entry fits *topic* (higher is better).

    The default topic yields a flat 20. Otherwise the score adds bonuses for
    an explicit topic tag (+100), a semantic topic tag (+45), a seed-substring
    match (+35) and a default-topic tag (+5), and subtracts penalties for a
    blocked substring (-100) and for an abstract-looking suffix on a concrete
    topic (-30).
    """
    if topic == DEFAULT_TOPIC:
        return 20

    word = str(entry.get("form", "")).lower()
    topics, semantic_topics = entry_topics(entry)

    score = 0
    if topic in topics:
        score += 100
    if topic in semantic_topics:
        score += 45
    if matches_topic_roots(word, topic):
        score += 35
    if DEFAULT_TOPIC in topics:
        score += 5
    if is_blocked_for_topic(word, topic):
        score -= 100
    if topic in CONCRETE_TOPICS and word.endswith(ABSTRACTISH_SUFFIXES):
        score -= 30
    return score


def candidate_score(entry: Dict[str, object], topic: str) -> Tuple[int, int, int, int, int, str]:
    """Return the sort key used to rank enrichment candidates.

    The tuple compares, in order: topic score, quality score, a
    part-of-speech bonus, a bonus for a matched semantic block, a length
    bonus and finally the word itself as a tie-breaker.
    """
    word = str(entry.get("form", ""))
    pos = str(entry.get("pos", ""))
    pos_bonus = {
        "NOUN": 12,
        "VERB": 8,
        "ADJ": 6,
        "ADV": 4,
    }.get(pos, 0)
    semantic = entry.get("semantic", {})
    semantic_bonus = 3 if isinstance(semantic, dict) and semantic.get("matched") else 0
    # 4..10 letters score best; anything else up to 14 letters scores 1.
    length_bonus = 4 if 4 <= len(word) <= 10 else (1 if len(word) <= 14 else -3)
    return (
        topic_score(entry, topic),
        _int_field(entry, "quality_score", 0),
        pos_bonus,
        semantic_bonus,
        length_bonus,
        word,
    )


def eligible_for_babelnet(
    entry: Dict[str, object],
    topic: str,
    difficulty_level: int,
    retry_no_match: bool,
) -> bool:
    """Decide whether an entry may be sent to BabelNet for *topic*.

    An entry is eligible when all of the following hold:

    * its BabelNet status is in ``ENRICHABLE_STATUSES`` (plus ``"no_match"``
      when *retry_no_match* is set);
    * its form is purely alphabetic and 3 to 16 characters long;
    * its part of speech is in both ``POS_TO_BABELNET`` and
      ``FILL_ALLOWED_POS``;
    * its ``difficulty_word`` does not exceed *difficulty_level* (a missing
      or unreadable value counts as 5);
    * ``allowed_in_crossword`` is truthy;
    * for a non-default topic: the word has no abstract-looking suffix when
      the topic is concrete, and it is tagged with the topic, or starts with
      a safe prefix, or carries the topic as a semantic topic while the topic
      is not a concrete one.
    """
    word = str(entry.get("form", "")).lower()
    pos = str(entry.get("pos", ""))
    topics, semantic_topics = entry_topics(entry)
    status = current_babelnet_status(entry)

    allowed_statuses = set(ENRICHABLE_STATUSES)
    if retry_no_match:
        allowed_statuses.add("no_match")

    if status not in allowed_statuses:
        return False
    if not word.isalpha() or len(word) < 3 or len(word) > 16:
        return False
    if pos not in POS_TO_BABELNET or pos not in FILL_ALLOWED_POS:
        return False
    if _int_field(entry, "difficulty_word", 5) > difficulty_level:
        return False
    if not entry.get("allowed_in_crossword", False):
        return False

    if topic != DEFAULT_TOPIC:
        if topic in CONCRETE_TOPICS and word.endswith(ABSTRACTISH_SUFFIXES):
            return False
        conservative_match = topic in topics
        safe_root_match = matches_safe_babelnet_roots(word, topic)
        semantic_only_match = topic in semantic_topics and topic not in CONCRETE_TOPICS
        if not (conservative_match or safe_root_match or semantic_only_match):
            return False
    return True


def select_candidates(
    payload: Dict[str, object],
    topic: str,
    difficulty_level: int,
    limit: int,
    retry_no_match: bool,
) -> List[Dict[str, object]]:
    """Pick up to *limit* eligible entries for *topic*, best ranked first.

    For a non-default topic the eligible entries are split into three
    groups: ``strong`` (tagged with the topic), ``soft`` (good quality and at
    most ``GENERAL_FILL_MAX_LENGTH`` characters, capped at
    ``SOFT_RELATED_FILL_LIMIT`` after ranking) and ``support`` (good quality,
    longer, without an abstract-looking suffix). The groups are concatenated
    in that order, de-duplicated by ``entry_key`` and ranked by
    ``candidate_score``.
    """

    def rank(item: Dict[str, object]) -> Tuple[int, int, int, int, int, str]:
        return candidate_score(item, topic)

    entries = [
        entry
        for entry in payload.get("entries", []) or []
        if isinstance(entry, dict)
        and eligible_for_babelnet(entry, topic, difficulty_level, retry_no_match)
    ]

    if topic != DEFAULT_TOPIC:
        strong: List[Dict[str, object]] = []
        soft: List[Dict[str, object]] = []
        support: List[Dict[str, object]] = []
        # Single-pass partition: the groups are mutually exclusive by
        # construction, so no list-membership tests are needed.
        for entry in entries:
            if topic in entry_topics(entry)[0]:
                strong.append(entry)
                continue
            if _int_field(entry, "quality_score", 0) < GENERAL_FILL_MIN_QUALITY:
                continue
            form = str(entry.get("form", ""))
            if len(form) <= GENERAL_FILL_MAX_LENGTH:
                soft.append(entry)
            elif not form.endswith(ABSTRACTISH_SUFFIXES):
                support.append(entry)

        soft.sort(key=rank, reverse=True)
        # Sorted before de-duplication so the best-ranked duplicate survives.
        support.sort(key=rank, reverse=True)
        entries = strong + soft[:SOFT_RELATED_FILL_LIMIT] + support

    entries = dedupe(entries)
    entries.sort(key=rank, reverse=True)
    return entries[:limit]


def select_word_candidates(
    payload: Dict[str, object],
    words: Iterable[str],
    limit: int,
    retry_no_match: bool,
) -> List[Dict[str, object]]:
    """Pick the entries matching an explicit word list, in request order.

    Words are stripped, lower-cased and de-duplicated. For each word one
    entry is selected: the last entry in the payload with that form whose
    BabelNet status is enrichable (``"no_match"`` included when
    *retry_no_match* is set), whose part of speech is in ``POS_TO_BABELNET``
    and whose form is alphabetic. At most *limit* entries are returned; a
    limit of 0 returns an empty list.
    """
    requested: List[str] = []
    seen_words = set()
    for word in words:
        normalized = str(word).strip().lower()
        if normalized and normalized not in seen_words:
            requested.append(normalized)
            seen_words.add(normalized)

    # Several entries can share a form (one per part of speech), so keep them
    # all instead of letting the last one silently replace the others.
    by_word: Dict[str, List[Dict[str, object]]] = {}
    for entry in payload.get("entries", []) or []:
        if isinstance(entry, dict):
            by_word.setdefault(str(entry.get("form", "")).lower(), []).append(entry)

    allowed_statuses = set(ENRICHABLE_STATUSES)
    if retry_no_match:
        allowed_statuses.add("no_match")

    def is_selectable(entry: Dict[str, object]) -> bool:
        return (
            current_babelnet_status(entry) in allowed_statuses
            and str(entry.get("pos", "")) in POS_TO_BABELNET
            and str(entry.get("form", "")).isalpha()
        )

    selected: List[Dict[str, object]] = []
    for word in requested:
        if len(selected) >= limit:
            break
        # Walk backwards so the last entry still wins whenever it qualifies.
        match = next(
            (entry for entry in reversed(by_word.get(word, [])) if is_selectable(entry)),
            None,
        )
        if match is not None:
            selected.append(match)
    return selected


def load_source_payload(enriched_path: Path, semantic_path: Path) -> Dict[str, object]:
    """Load the lexicon to select candidates from.

    The enriched lexicon is preferred when the file exists and holds a dict
    with an ``"entries"`` key; otherwise the semantic lexicon is used.

    Raises:
        ValueError: if neither file yields a dict with an ``"entries"`` key.
    """
    if enriched_path.exists():
        payload = load_json(enriched_path, {})
        if isinstance(payload, dict) and "entries" in payload:
            return payload
    payload = load_json(semantic_path, {})
    if isinstance(payload, dict) and "entries" in payload:
        return payload
    raise ValueError(f"Nessun lessico valido trovato: {enriched_path} / {semantic_path}")


def merge_babelnet_entries(
    existing_payload: Dict[str, object],
    new_entries: List[Dict[str, object]],
    topic: str,
    difficulty: str,
) -> Dict[str, object]:
    """Merge freshly enriched entries into the BabelNet archive payload.

    Entries are indexed by ``entry_key``; a new entry replaces an existing
    one with the same key and is stamped with ``babelnet_generated_at``. The
    inputs are not mutated (entries are deep-copied). The result holds the
    entries sorted by ``(form, pos)`` and an updated ``meta`` block.
    """
    existing_entries = [
        entry
        for entry in existing_payload.get("entries", []) or []
        if isinstance(entry, dict)
    ]
    index = {entry_key(entry): deepcopy(entry) for entry in existing_entries}
    generated_at = datetime.now().astimezone().isoformat(timespec="seconds")

    for entry in new_entries:
        updated = deepcopy(entry)
        updated["babelnet_generated_at"] = generated_at
        index[entry_key(updated)] = updated

    entries = sorted(
        index.values(),
        key=lambda item: (str(item.get("form", "")), str(item.get("pos", ""))),
    )

    existing_meta = existing_payload.get("meta", {})
    meta = dict(existing_meta) if isinstance(existing_meta, dict) else {}
    meta.update(
        {
            "language": meta.get("language", "it"),
            "version": max(1, _int_field(meta, "version", 1)),
            "source": "BabelNet API",
            "updated_at": generated_at,
            "last_topic": topic,
            "last_difficulty": difficulty,
            "entry_count": len(entries),
        }
    )
    return {"meta": meta, "entries": entries}


def rebuild_enriched(
    semantic_path: Path,
    babelnet_path: Path,
    enriched_path: Path,
    topic: str,
) -> Dict[str, object]:
    """Regenerate the enriched lexicon and write it to *enriched_path*.

    ``build_enriched_lexicon`` is called with a namespace carrying the
    ``semantic``, ``babelnet``, ``output`` and ``topic`` attributes; its
    result is written with ``write_enriched_json`` and returned.
    """
    namespace = SimpleNamespace(
        semantic=semantic_path,
        babelnet=babelnet_path,
        output=enriched_path,
        topic=topic,
    )
    payload = build_enriched_lexicon(namespace)
    write_enriched_json(enriched_path, payload)
    return payload


def run_incremental_enrichment(args: argparse.Namespace) -> Dict[str, object]:
    """Run one enrichment pass and return a summary dict.

    Candidates come from ``args.words`` when given, otherwise from the
    topic/difficulty selection. With ``args.dry_run`` the selection is
    returned without calling BabelNet or writing files (``mode`` is
    ``"dry-run"``). Otherwise every candidate is enriched, the BabelNet
    archive is updated, the enriched lexicon is rebuilt and a summary with
    ``mode`` ``"enriched"`` is returned.

    Raises:
        SystemExit: if the difficulty is invalid, or if no API key is
            available outside dry-run mode.
    """
    normalized_topic = args.topic.strip().lower()
    difficulty_level = parse_difficulty(str(args.difficulty))
    source_payload = load_source_payload(args.enriched, args.semantic)

    # ``words`` is read with getattr so namespaces without it still work.
    target_words = getattr(args, "words", None)
    if target_words:
        candidates = select_word_candidates(
            source_payload,
            target_words,
            max(0, args.limit),
            args.retry_no_match,
        )
    else:
        candidates = select_candidates(
            source_payload,
            normalized_topic,
            difficulty_level,
            max(0, args.limit),
            args.retry_no_match,
        )

    if args.dry_run:
        return {
            "mode": "dry-run",
            "topic": normalized_topic,
            "difficulty": args.difficulty,
            "selected_count": len(candidates),
            "selected_words": [entry.get("form") for entry in candidates],
        }

    if not args.api_key:
        raise SystemExit(
            f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure usa --api-key ."
        )

    cache = load_json(BABELNET_CACHE_PATH, {})
    if not isinstance(cache, dict):
        cache = {}
    babelnet_payload = load_json(args.babelnet, {"entries": []})
    if not isinstance(babelnet_payload, dict):
        babelnet_payload = {"entries": []}

    enriched_candidates = []
    word_logs = []
    for index, entry in enumerate(candidates, start=1):
        updated = deepcopy(entry)
        # Discard any previous BabelNet block before asking for a new one.
        updated.pop("babelnet", None)
        stats = {"api_calls": 0, "cache_hits": 0, "responses": 0}
        updated["babelnet"] = enrich_entry(updated, args.api_key, cache, args.sleep, stats)
        enriched_candidates.append(updated)
        # The cache is written after every word; presumably so that responses
        # already obtained survive a failure on a later word.
        write_json(BABELNET_CACHE_PATH, cache)
        word_logs.append(
            {
                "word": updated["form"],
                "api_calls": stats["api_calls"],
                "cache_hits": stats["cache_hits"],
                "responses": stats["responses"],
                "matched": bool(updated["babelnet"].get("matched")),
                "synsets": len(updated["babelnet"].get("synsets", []) or []),
                "reason": updated["babelnet"].get("reason"),
            }
        )
        print(
            f"[{index}/{len(candidates)}] {updated['form']}: "
            f"api_calls={stats['api_calls']} cache_hits={stats['cache_hits']} "
            f"risposta={stats['responses'] > 0} match={updated['babelnet'].get('matched')}"
        )

    merged_babelnet = merge_babelnet_entries(
        babelnet_payload,
        enriched_candidates,
        normalized_topic,
        str(args.difficulty),
    )
    write_json(args.babelnet, merged_babelnet)
    enriched_payload = rebuild_enriched(args.semantic, args.babelnet, args.enriched, normalized_topic)

    return {
        "mode": "enriched",
        "topic": normalized_topic,
        "difficulty": args.difficulty,
        "selected_count": len(candidates),
        "matched_count": sum(
            1 for entry in enriched_candidates if entry.get("babelnet", {}).get("matched")
        ),
        "api_call_count": sum(item["api_calls"] for item in word_logs),
        "cache_hit_count": sum(item["cache_hits"] for item in word_logs),
        "word_logs": word_logs,
        "babelnet_entry_count": merged_babelnet["meta"]["entry_count"],
        "enriched_status_counts": enriched_payload["meta"]["babelnet_status_counts"],
    }


def main() -> None:
    """Command-line entry point: run the enrichment and print a report."""
    args = parse_args()
    result = run_incremental_enrichment(args)

    if result["mode"] == "dry-run":
        print("Dry-run BabelNet incrementale")
        print(f"Topic: {result['topic']}")
        print(f"Difficolta: {result['difficulty']}")
        print(f"Parole selezionate: {result['selected_count']}")
        for index, word in enumerate(result["selected_words"], start=1):
            print(f"{index:2d}. {word}")
        return

    print("Arricchimento BabelNet completato")
    print(f"Topic: {result['topic']}")
    print(f"Parole interrogate: {result['selected_count']}")
    print(f"Chiamate API BabelNet reali: {result['api_call_count']}")
    print(f"Risposte da cache: {result['cache_hit_count']}")
    print(f"Match BabelNet: {result['matched_count']}")
    for item in result["word_logs"]:
        print(
            f"- {item['word']}: api_calls={item['api_calls']}, "
            f"cache_hits={item['cache_hits']}, risposta={item['responses'] > 0}, "
            f"match={item['matched']}, synsets={item['synsets']}"
        )
    print(f"Voci BabelNet archiviate: {result['babelnet_entry_count']}")
    print(f"Stati lessico arricchito: {result['enriched_status_counts']}")


if __name__ == "__main__":
    main()