"""Enrich the semantic lexicon with synset data fetched from the BabelNet HTTP API.

The script reads the lexicon at ``SEMANTIC_LEXICON_OUTPUT_PATH``, selects a
bounded set of entries, queries BabelNet (``getSynsetIds`` / ``getSynset``) for
each of them and writes the enriched entries to a JSON file. Every API response
is memoised in ``BABELNET_CACHE_PATH`` so repeated runs do not spend quota on
lemmas that were already looked up.
"""

from __future__ import annotations

import argparse
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional

from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH

BABELNET_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_babelnet.json")
BABELNET_CACHE_PATH = Path(__file__).with_name(".babelnet_cache.json")
BABELNET_LOCAL_KEY_PATH = Path(__file__).with_name(".babelnet_api_key.local")
BABELNET_API_BASE = "https://babelnet.io/v9"
BABELNET_ENV_KEY = "BABELNET_API_KEY"

# Lexicon part-of-speech tags mapped to the names sent as the ``pos`` parameter.
POS_TO_BABELNET = {
    "NOUN": "NOUN",
    "VERB": "VERB",
    "ADJ": "ADJECTIVE",
    "ADV": "ADVERB",
}


class BabelNetApiCallLimitReached(RuntimeError):
    """Raised when the caller-supplied ``api_call_limit`` budget is exhausted."""


class BabelNetKeyUnavailable(RuntimeError):
    """Raised when BabelNet answers HTTP 403 (invalid key or daily quota spent)."""


# Named difficulty levels accepted by ``--difficulty`` in addition to 1-5.
DIFFICULTY_ALIASES: Dict[str, int] = {
    "easy": 1,
    "medium": 2,
    "hard": 4,
    "expert": 5,
}


def parse_difficulty(value: str) -> int:
    """Convert a ``--difficulty`` value into an integer level between 1 and 5.

    Args:
        value: An alias from ``DIFFICULTY_ALIASES`` (case-insensitive) or the
            decimal representation of an integer in ``1..5``.

    Returns:
        The numeric difficulty level.

    Raises:
        SystemExit: If the value is neither a known alias nor an integer in range.
    """
    text = str(value).strip().lower()
    if text in DIFFICULTY_ALIASES:
        return DIFFICULTY_ALIASES[text]
    try:
        level = int(text)
    except ValueError as exc:
        raise SystemExit(
            "Valore non valido per --difficulty. Usa easy, medium, hard, expert oppure un intero tra 1 e 5."
        ) from exc
    if not 1 <= level <= 5:
        raise SystemExit("Il valore numerico di --difficulty deve essere compreso tra 1 e 5.")
    return level


def _split_api_keys(text: str) -> List[str]:
    """Split *text* into unique API keys, preserving first-seen order.

    Keys may be separated by newlines, commas or semicolons. Blank items and
    items starting with ``#`` are ignored.
    """
    keys: List[str] = []
    seen = set()
    normalized = text.replace(";", "\n").replace(",", "\n")
    for line in normalized.splitlines():
        key = line.strip()
        if not key or key.startswith("#") or key in seen:
            continue
        keys.append(key)
        seen.add(key)
    return keys


def load_babelnet_api_keys() -> List[str]:
    """Return the configured API keys.

    The ``BABELNET_ENV_KEY`` environment variable takes precedence; when it is
    unset or empty the local key file is read if it exists. Returns an empty
    list when neither source is available.
    """
    env_key = os.environ.get(BABELNET_ENV_KEY)
    if env_key:
        return _split_api_keys(env_key)
    if BABELNET_LOCAL_KEY_PATH.exists():
        return _split_api_keys(BABELNET_LOCAL_KEY_PATH.read_text(encoding="utf-8"))
    return []


def load_babelnet_api_key() -> Optional[str]:
    """Return the first configured API key, or ``None`` when none is configured."""
    keys = load_babelnet_api_keys()
    if keys:
        return keys[0]
    return None


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the enrichment script."""
    parser = argparse.ArgumentParser(
        description="Arricchisce lexicon_it_semantic.json usando BabelNet, se disponibile una API key."
    )
    parser.add_argument(
        "--api-key",
        default=load_babelnet_api_key(),
        help=(
            f"Chiave API BabelNet. In alternativa imposta {BABELNET_ENV_KEY} "
            f"o crea {BABELNET_LOCAL_KEY_PATH.name}."
        ),
    )
    parser.add_argument(
        "--topic",
        default=None,
        help="Topic opzionale da usare per limitare le voci da arricchire.",
    )
    parser.add_argument(
        "--difficulty",
        default="medium",
        help="Difficolta massima delle voci da arricchire: easy, medium, hard, expert oppure 1-5.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="Numero massimo di lemmi da interrogare in questa esecuzione.",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.2,
        help="Pausa tra richieste API, utile per non stressare il servizio.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=BABELNET_OUTPUT_PATH,
        help="File JSON di output.",
    )
    return parser.parse_args()


def load_json(path: Path, default: object) -> object:
    """Return the parsed JSON content of *path*, or *default* if it does not exist."""
    if not path.exists():
        return default
    return json.loads(path.read_text(encoding="utf-8"))


def write_json(path: Path, payload: object) -> None:
    """Serialise *payload* to *path* as indented UTF-8 JSON (non-ASCII kept as-is)."""
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def cache_key(endpoint: str, params: Dict[str, str]) -> str:
    """Build the cache key for a request.

    The ``key`` parameter (the API key) is left out, so the key is neither
    stored in the cache file nor part of what distinguishes two requests.
    """
    safe_params = {key: value for key, value in params.items() if key != "key"}
    return f"{endpoint}?{urllib.parse.urlencode(sorted(safe_params.items()))}"


def request_json(
    endpoint: str,
    params: Dict[str, str],
    cache: Dict[str, object],
    stats: Optional[Dict[str, int]] = None,
) -> object:
    """Call a BabelNet endpoint and return the decoded JSON, using *cache* first.

    Args:
        endpoint: Endpoint name appended to ``BABELNET_API_BASE``.
        params: Query parameters, including the API key under ``"key"``.
        cache: Mutable mapping from ``cache_key(...)`` to decoded responses;
            updated in place on every successful network call.
        stats: Optional counters updated in place: ``cache_hits``,
            ``api_calls`` and ``responses`` are incremented, and
            ``api_call_limit`` (if present) caps the number of network calls.

    Returns:
        The decoded JSON payload (from the cache or from the network).

    Raises:
        BabelNetApiCallLimitReached: If ``stats["api_call_limit"]`` is reached.
        BabelNetKeyUnavailable: If the service answers HTTP 403.
        RuntimeError: For any other HTTP error status.
    """
    key = cache_key(endpoint, params)
    if key in cache:
        if stats is not None:
            stats["cache_hits"] = stats.get("cache_hits", 0) + 1
        return cache[key]

    if stats is not None:
        limit = stats.get("api_call_limit")
        current = stats.get("api_calls", 0)
        if limit is not None and current >= limit:
            raise BabelNetApiCallLimitReached("Limite chiamate API BabelNet raggiunto")

    # The URL is only needed on a cache miss, so it is built after the lookup.
    url = f"{BABELNET_API_BASE}/{endpoint}?{urllib.parse.urlencode(params)}"
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        if exc.code == 403:
            raise BabelNetKeyUnavailable(
                f"Chiave BabelNet non valida o limite giornaliero raggiunto: {detail}"
            ) from exc
        raise RuntimeError(f"Errore BabelNet HTTP {exc.code}: {detail}") from exc

    cache[key] = payload
    if stats is not None:
        stats["api_calls"] = stats.get("api_calls", 0) + 1
        stats["responses"] = stats.get("responses", 0) + 1
    return payload


def entry_topics(entry: Dict[str, object]) -> set[str]:
    """Return the lower-cased topics of *entry* (empty when missing or null)."""
    return {str(item).lower() for item in entry.get("topics") or []}


def select_entries(
    payload: Dict[str, object],
    topic: Optional[str],
    difficulty_level: int,
    limit: int,
) -> List[Dict[str, object]]:
    """Pick at most *limit* lexicon entries that are worth querying.

    An entry is kept when its ``form`` is purely alphabetic and 3 to 16
    characters long, its ``difficulty_word`` (default 5) does not exceed
    *difficulty_level*, its ``pos`` is a key of ``POS_TO_BABELNET`` and, if
    *topic* is given, the topic appears in its topics (case-insensitive).

    Args:
        payload: Lexicon document holding the candidates under ``"entries"``.
        topic: Optional topic filter.
        difficulty_level: Maximum accepted ``difficulty_word``.
        limit: Maximum number of entries to return; zero or negative values
            yield an empty list.

    Returns:
        The selected entries in lexicon order.
    """
    # Without this guard the limit was only checked after appending, so a
    # non-positive limit still returned one entry.
    if limit <= 0:
        return []

    selected: List[Dict[str, object]] = []
    normalized_topic = topic.strip().lower() if topic else None
    for entry in payload.get("entries", []) or []:
        word = str(entry.get("form", ""))
        if not word or not word.isalpha():
            continue
        if len(word) < 3 or len(word) > 16:
            continue
        if int(entry.get("difficulty_word", 5)) > difficulty_level:
            continue
        if str(entry.get("pos", "")) not in POS_TO_BABELNET:
            continue
        if normalized_topic and normalized_topic not in entry_topics(entry):
            continue
        selected.append(entry)
        if len(selected) >= limit:
            break
    return selected


def compact_synset_id(payload: Dict[str, object]) -> Dict[str, object]:
    """Return only the ``id``, ``pos`` and ``source`` fields of a synset reference."""
    return {
        "id": payload.get("id"),
        "pos": payload.get("pos"),
        "source": payload.get("source"),
    }


def extract_glosses(payload: Dict[str, object]) -> List[str]:
    """Return up to 5 distinct glosses whose language is Italian or unspecified."""
    glosses = []
    for item in payload.get("glosses", []) or []:
        if not isinstance(item, dict):
            continue  # tolerate malformed list items instead of crashing
        language = str(item.get("language", "")).upper()
        gloss = str(item.get("gloss", "")).strip()
        if gloss and language in {"IT", "ITA", ""}:
            glosses.append(gloss)
    return dedupe(glosses)[:5]


def extract_senses(payload: Dict[str, object]) -> List[str]:
    """Return up to 20 distinct lemmas whose language is Italian or unspecified.

    Underscores in lemmas are replaced with spaces.
    """
    senses = []
    for item in payload.get("senses", []) or []:
        if not isinstance(item, dict):
            continue  # tolerate malformed list items instead of crashing
        properties = item.get("properties")
        if not isinstance(properties, dict):
            # "properties" may be absent or null; treat both as empty.
            properties = {}
        # NOTE(review): BabelNet appears to nest "language" and "fullLemma"
        # under "properties"; the top-level lookups are kept as a fallback so
        # payloads in the previously assumed shape behave as before.
        language = str(properties.get("language") or item.get("language") or "").upper()
        lemma = str(
            properties.get("simpleLemma")
            or properties.get("fullLemma")
            or item.get("fullLemma")
            or ""
        ).strip()
        if lemma and language in {"IT", "ITA", ""}:
            senses.append(lemma.replace("_", " "))
    return dedupe(senses)[:20]


def extract_categories(payload: Dict[str, object]) -> List[str]:
    """Return up to 20 distinct, non-empty category names."""
    categories = []
    for item in payload.get("categories", []) or []:
        if not isinstance(item, dict):
            continue  # tolerate malformed list items instead of crashing
        category = str(item.get("category", "")).strip()
        if category:
            categories.append(category)
    return dedupe(categories)[:20]


def extract_domains(payload: Dict[str, object]) -> List[str]:
    """Return the domain names of a synset.

    A dict yields its keys with truthy values, sorted; a list yields up to 20
    distinct truthy items in order; anything else yields an empty list.
    """
    domains = payload.get("domains", [])
    if isinstance(domains, dict):
        return sorted(str(key) for key, value in domains.items() if value)
    if isinstance(domains, list):
        return dedupe(str(item) for item in domains if item)[:20]
    return []


def dedupe(items: Iterable[str]) -> List[str]:
    """Return the stripped, non-empty items of *items* without duplicates, in order."""
    seen = set()
    result = []
    for item in items:
        text = str(item).strip()
        if not text or text in seen:
            continue
        seen.add(text)
        result.append(text)
    return result


def enrich_entry(
    entry: Dict[str, object],
    api_key: str,
    cache: Dict[str, object],
    sleep_seconds: float,
    stats: Optional[Dict[str, int]] = None,
) -> Dict[str, object]:
    """Look up one lexicon entry on BabelNet and summarise its first synsets.

    Args:
        entry: Lexicon entry; ``form`` and ``pos`` are read.
        api_key: BabelNet API key sent with every request.
        cache: Response cache passed through to ``request_json``.
        sleep_seconds: Pause after each ``request_json`` call (skipped when falsy).
        stats: Optional counters passed through to ``request_json``.

    Returns:
        A dict with ``matched`` and ``synsets``; unmatched results also carry a
        ``reason``, matched lookups also carry ``synset_refs``. At most 3
        synsets are fetched and at most 5 references are reported.

    Raises:
        Whatever ``request_json`` raises.
    """
    word = str(entry.get("form", ""))
    pos = POS_TO_BABELNET.get(str(entry.get("pos", "")))
    if not pos:
        return {"matched": False, "reason": "unsupported_pos", "synsets": []}

    synset_ids = request_json(
        "getSynsetIds",
        {
            "lemma": word,
            "searchLang": "IT",
            "pos": pos,
            "key": api_key,
        },
        cache,
        stats,
    )
    if sleep_seconds:
        time.sleep(sleep_seconds)

    if not isinstance(synset_ids, list) or not synset_ids:
        return {"matched": False, "reason": "no_synsets", "synsets": []}

    synsets = []
    for synset_ref in synset_ids[:3]:
        synset_id = synset_ref.get("id") if isinstance(synset_ref, dict) else str(synset_ref)
        if not synset_id:
            continue
        synset_payload = request_json(
            "getSynset",
            {
                "id": synset_id,
                "targetLang": "IT",
                "key": api_key,
            },
            cache,
            stats,
        )
        if sleep_seconds:
            time.sleep(sleep_seconds)
        if not isinstance(synset_payload, dict):
            continue
        synsets.append(
            {
                "id": synset_id,
                "senses": extract_senses(synset_payload),
                "glosses": extract_glosses(synset_payload),
                "categories": extract_categories(synset_payload),
                "domains": extract_domains(synset_payload),
            }
        )

    return {
        "matched": bool(synsets),
        "synset_refs": [compact_synset_id(item) for item in synset_ids[:5] if isinstance(item, dict)],
        "synsets": synsets,
    }


def build_babelnet_enrichment(args: argparse.Namespace) -> Dict[str, object]:
    """Build the enriched lexicon document for the parsed command-line *args*.

    Reads ``api_key``, ``difficulty``, ``topic``, ``limit`` and ``sleep`` from
    *args*, enriches the selected entries and saves the response cache to
    ``BABELNET_CACHE_PATH``.

    Returns:
        A dict with a ``meta`` section and the enriched ``entries``.

    Raises:
        SystemExit: If no API key is available or ``--difficulty`` is invalid.
        FileNotFoundError: If the base semantic lexicon does not exist.
    """
    if not args.api_key:
        raise SystemExit(
            f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure usa --api-key ."
        )
    if not SEMANTIC_LEXICON_OUTPUT_PATH.exists():
        raise FileNotFoundError(f"Lessico semantico non trovato: {SEMANTIC_LEXICON_OUTPUT_PATH}")

    payload = load_json(SEMANTIC_LEXICON_OUTPUT_PATH, {})
    cache = load_json(BABELNET_CACHE_PATH, {})
    if not isinstance(cache, dict):
        cache = {}

    difficulty_level = parse_difficulty(str(args.difficulty))
    selected_entries = select_entries(payload, args.topic, difficulty_level, args.limit)
    total = len(selected_entries)

    enriched_entries = []
    try:
        for index, entry in enumerate(selected_entries, start=1):
            enriched = dict(entry)
            enriched["babelnet"] = enrich_entry(enriched, args.api_key, cache, args.sleep)
            enriched_entries.append(enriched)
            print(f"[{index}/{total}] {entry['form']}: {enriched['babelnet'].get('matched')}")
    finally:
        # Persist the cache even when a request fails or the run is
        # interrupted, so responses already obtained are not fetched again.
        write_json(BABELNET_CACHE_PATH, cache)

    return {
        "meta": {
            "language": "it",
            "version": 1,
            "base_lexicon": SEMANTIC_LEXICON_OUTPUT_PATH.name,
            "source": "BabelNet API",
            "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "topic": args.topic,
            "difficulty": args.difficulty,
            "requested_limit": args.limit,
            "entry_count": len(enriched_entries),
        },
        "entries": enriched_entries,
    }


def main() -> None:
    """Run the enrichment from the command line and print a short summary."""
    args = parse_args()
    payload = build_babelnet_enrichment(args)
    write_json(args.output, payload)
    matched = sum(1 for entry in payload["entries"] if entry.get("babelnet", {}).get("matched"))
    print(f"Lessico BabelNet generato: {args.output}")
    print(f"Voci arricchite: {payload['meta']['entry_count']}")
    print(f"Voci con match BabelNet: {matched}")


if __name__ == "__main__":
    main()