"""Enrich the Italian semantic lexicon with data from the BabelNet HTTP API.

The script reads the lexicon at ``SEMANTIC_LEXICON_OUTPUT_PATH``, selects a
bounded number of entries (filtered by topic, difficulty and part of speech),
queries the ``getSynsetIds`` and ``getSynset`` endpoints for each of them and
writes the enriched entries to a JSON file.  API responses are cached in
``.babelnet_cache.json`` next to this module so repeated runs do not spend
API quota on lemmas that were already fetched.
"""

from __future__ import annotations

import argparse
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional

from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH
from main import parse_difficulty

BABELNET_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_babelnet.json")
BABELNET_CACHE_PATH = Path(__file__).with_name(".babelnet_cache.json")
BABELNET_API_BASE = "https://babelnet.io/v9"
BABELNET_ENV_KEY = "BABELNET_API_KEY"

# Maps the part-of-speech tags used by the lexicon to the BabelNet names.
POS_TO_BABELNET = {
    "NOUN": "NOUN",
    "VERB": "VERB",
    "ADJ": "ADJECTIVE",
    "ADV": "ADVERB",
}

# Language codes accepted as Italian; the empty string keeps items that carry
# no language information at all.
_ACCEPTED_LANGUAGES = frozenset({"IT", "ITA", ""})
# Name of the query parameter that carries the secret API key.
_API_KEY_PARAM = "key"
# How many synsets are fetched in full / listed as references per entry.
_MAX_SYNSETS_PER_ENTRY = 3
_MAX_SYNSET_REFS_PER_ENTRY = 5


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the enrichment script."""
    parser = argparse.ArgumentParser(
        description="Arricchisce lexicon_it_semantic.json usando BabelNet, se disponibile una API key."
    )
    parser.add_argument(
        "--api-key",
        default=os.environ.get(BABELNET_ENV_KEY),
        help=f"Chiave API BabelNet. In alternativa imposta la variabile ambiente {BABELNET_ENV_KEY}.",
    )
    parser.add_argument(
        "--topic",
        default=None,
        help="Topic opzionale da usare per limitare le voci da arricchire.",
    )
    parser.add_argument(
        "--difficulty",
        default="medium",
        help="Difficolta massima delle voci da arricchire: easy, medium, hard, expert oppure 1-5.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=100,
        help="Numero massimo di lemmi da interrogare in questa esecuzione.",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.2,
        help="Pausa tra richieste API, utile per non stressare il servizio.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=BABELNET_OUTPUT_PATH,
        help="File JSON di output.",
    )
    return parser.parse_args()


def load_json(path: Path, default: object) -> object:
    """Return the decoded JSON content of *path*, or *default* if it is missing."""
    if not path.exists():
        return default
    return json.loads(path.read_text(encoding="utf-8"))


def write_json(path: Path, payload: object) -> None:
    """Serialize *payload* to *path* as indented UTF-8 JSON."""
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def request_json(endpoint: str, params: Dict[str, str], cache: Dict[str, object]) -> object:
    """Fetch a BabelNet endpoint and return its decoded JSON payload.

    Responses are memoized in *cache* (mutated in place).  The cache key is
    the request URL *without* the API key, so the secret is never persisted
    when the cache is written to disk and cached entries stay valid when the
    key is rotated.  Entries stored under the older full-URL key are still
    honoured and are moved to the new key the first time they are read.

    Args:
        endpoint: Endpoint name appended to ``BABELNET_API_BASE``.
        params: Query parameters, including the API key under ``"key"``.
        cache: Mapping from cache key to previously decoded payloads.

    Returns:
        The decoded JSON payload, from the cache or from the network.

    Raises:
        RuntimeError: If BabelNet answers with an HTTP error status.
    """
    url = f"{BABELNET_API_BASE}/{endpoint}?{urllib.parse.urlencode(params)}"
    public_params = {name: value for name, value in params.items() if name != _API_KEY_PARAM}
    cache_key = f"{BABELNET_API_BASE}/{endpoint}?{urllib.parse.urlencode(public_params)}"

    if cache_key in cache:
        return cache[cache_key]
    if url in cache:
        # Legacy entry keyed by the full URL: re-key it so the API key is
        # dropped from the cache the next time it is saved.
        payload = cache.pop(url)
        cache[cache_key] = payload
        return payload

    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Errore BabelNet HTTP {exc.code}: {detail}") from exc

    cache[cache_key] = payload
    return payload


def entry_topics(entry: Dict[str, object]) -> set[str]:
    """Return the lower-cased topics of a lexicon entry (empty if none)."""
    return {str(item).lower() for item in entry.get("topics") or []}


def select_entries(
    payload: Dict[str, object],
    topic: Optional[str],
    difficulty_level: int,
    limit: int,
) -> List[Dict[str, object]]:
    """Pick the lexicon entries that should be sent to BabelNet.

    An entry is kept when its ``form`` is purely alphabetic and 3-16
    characters long, its ``difficulty_word`` does not exceed
    *difficulty_level*, its ``pos`` is a key of ``POS_TO_BABELNET`` and, when
    *topic* is given, the topic is among the entry topics.

    Args:
        payload: Decoded lexicon with an ``"entries"`` list.
        topic: Optional topic filter (case-insensitive, surrounding
            whitespace ignored).
        difficulty_level: Maximum accepted ``difficulty_word`` value.
        limit: Maximum number of entries to return; values <= 0 select
            nothing.

    Returns:
        At most *limit* entries, in lexicon order.
    """
    selected: List[Dict[str, object]] = []
    if limit <= 0:
        # The size check below runs after appending, so without this guard a
        # non-positive limit would still let one entry through.
        return selected

    normalized_topic = topic.strip().lower() if topic else None
    for entry in payload.get("entries", []) or []:
        if not isinstance(entry, dict):
            continue
        word = str(entry.get("form", ""))
        if not word or not word.isalpha():
            continue
        if len(word) < 3 or len(word) > 16:
            continue
        if int(entry.get("difficulty_word", 5)) > difficulty_level:
            continue
        if str(entry.get("pos", "")) not in POS_TO_BABELNET:
            continue
        if normalized_topic and normalized_topic not in entry_topics(entry):
            continue
        selected.append(entry)
        if len(selected) >= limit:
            break
    return selected


def compact_synset_id(payload: Dict[str, object]) -> Dict[str, object]:
    """Reduce a synset reference to its ``id``, ``pos`` and ``source`` fields."""
    return {
        "id": payload.get("id"),
        "pos": payload.get("pos"),
        "source": payload.get("source"),
    }


def extract_glosses(payload: Dict[str, object]) -> List[str]:
    """Return up to 5 distinct Italian (or language-less) glosses of a synset."""
    glosses = []
    for item in payload.get("glosses", []) or []:
        if not isinstance(item, dict):
            continue
        language = str(item.get("language", "")).upper()
        gloss = str(item.get("gloss", "")).strip()
        if gloss and language in _ACCEPTED_LANGUAGES:
            glosses.append(gloss)
    return dedupe(glosses)[:5]


def extract_senses(payload: Dict[str, object]) -> List[str]:
    """Return up to 20 distinct Italian lemmas among the senses of a synset.

    Sense attributes are looked up both at the top level of each sense and
    inside its ``properties`` object, so the language filter also applies to
    payloads that nest ``language`` / ``fullLemma`` under ``properties``.
    Underscores in lemmas are replaced by spaces.
    """
    senses = []
    for item in payload.get("senses", []) or []:
        if not isinstance(item, dict):
            continue
        properties = item.get("properties")
        if not isinstance(properties, dict):
            # Covers both a missing key and an explicit null.
            properties = {}
        language = str(item.get("language") or properties.get("language") or "").upper()
        lemma = str(
            properties.get("simpleLemma")
            or properties.get("fullLemma")
            or item.get("fullLemma")
            or ""
        ).strip()
        if lemma and language in _ACCEPTED_LANGUAGES:
            senses.append(lemma.replace("_", " "))
    return dedupe(senses)[:20]


def extract_categories(payload: Dict[str, object]) -> List[str]:
    """Return up to 20 distinct category names of a synset."""
    categories = []
    for item in payload.get("categories", []) or []:
        if not isinstance(item, dict):
            continue
        category = str(item.get("category", "")).strip()
        if category:
            categories.append(category)
    return dedupe(categories)[:20]


def extract_domains(payload: Dict[str, object]) -> List[str]:
    """Return the domain names of a synset.

    A dict value yields the sorted keys whose value is truthy; a list value
    yields up to 20 distinct truthy items; anything else yields ``[]``.
    """
    domains = payload.get("domains", [])
    if isinstance(domains, dict):
        return sorted(str(key) for key, value in domains.items() if value)
    if isinstance(domains, list):
        return dedupe(str(item) for item in domains if item)[:20]
    return []


def dedupe(items: Iterable[str]) -> List[str]:
    """Strip *items*, drop blanks and duplicates, and keep first-seen order."""
    seen = set()
    result = []
    for item in items:
        text = str(item).strip()
        if not text or text in seen:
            continue
        seen.add(text)
        result.append(text)
    return result


def _request_throttled(
    endpoint: str,
    params: Dict[str, str],
    cache: Dict[str, object],
    sleep_seconds: float,
) -> object:
    """Call ``request_json`` and pause only if a real network request was made."""
    size_before = len(cache)
    payload = request_json(endpoint, params, cache)
    # A cache hit leaves the cache size unchanged; only a fresh response adds
    # an entry, and only then is the pause needed to spare the service.
    if sleep_seconds > 0 and len(cache) > size_before:
        time.sleep(sleep_seconds)
    return payload


def enrich_entry(
    entry: Dict[str, object],
    api_key: str,
    cache: Dict[str, object],
    sleep_seconds: float,
) -> Dict[str, object]:
    """Look up one lexicon entry on BabelNet and summarize its synsets.

    Args:
        entry: Lexicon entry providing ``form`` and ``pos``.
        api_key: BabelNet API key.
        cache: Response cache shared across calls (mutated in place).
        sleep_seconds: Pause applied after each uncached API request.

    Returns:
        A dict with ``matched`` and ``synsets``; unmatched results also carry
        a ``reason``, matched lookups also carry ``synset_refs``.

    Raises:
        RuntimeError: If BabelNet answers with an HTTP error status.
    """
    word = str(entry.get("form", ""))
    pos = POS_TO_BABELNET.get(str(entry.get("pos", "")))
    if not pos:
        return {"matched": False, "reason": "unsupported_pos", "synsets": []}

    synset_ids = _request_throttled(
        "getSynsetIds",
        {
            "lemma": word,
            "searchLang": "IT",
            "pos": pos,
            "key": api_key,
        },
        cache,
        sleep_seconds,
    )
    if not isinstance(synset_ids, list) or not synset_ids:
        return {"matched": False, "reason": "no_synsets", "synsets": []}

    synsets = []
    for synset_ref in synset_ids[:_MAX_SYNSETS_PER_ENTRY]:
        synset_id = synset_ref.get("id") if isinstance(synset_ref, dict) else str(synset_ref)
        if not synset_id:
            continue
        synset_payload = _request_throttled(
            "getSynset",
            {
                "id": synset_id,
                "targetLang": "IT",
                "key": api_key,
            },
            cache,
            sleep_seconds,
        )
        if not isinstance(synset_payload, dict):
            continue
        synsets.append(
            {
                "id": synset_id,
                "senses": extract_senses(synset_payload),
                "glosses": extract_glosses(synset_payload),
                "categories": extract_categories(synset_payload),
                "domains": extract_domains(synset_payload),
            }
        )

    return {
        "matched": bool(synsets),
        "synset_refs": [
            compact_synset_id(item)
            for item in synset_ids[:_MAX_SYNSET_REFS_PER_ENTRY]
            if isinstance(item, dict)
        ],
        "synsets": synsets,
    }


def build_babelnet_enrichment(args: argparse.Namespace) -> Dict[str, object]:
    """Build the enriched lexicon payload described by the parsed *args*.

    The response cache is written back to ``BABELNET_CACHE_PATH`` even when
    the run is interrupted by an error, so responses already fetched are not
    requested again on the next run.

    Raises:
        SystemExit: If no API key is available.
        FileNotFoundError: If the base semantic lexicon does not exist.
        RuntimeError: If BabelNet answers with an HTTP error status.
    """
    if not args.api_key:
        raise SystemExit(
            f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure usa --api-key ."
        )
    if not SEMANTIC_LEXICON_OUTPUT_PATH.exists():
        raise FileNotFoundError(f"Lessico semantico non trovato: {SEMANTIC_LEXICON_OUTPUT_PATH}")

    payload = load_json(SEMANTIC_LEXICON_OUTPUT_PATH, {})
    cache = load_json(BABELNET_CACHE_PATH, {})
    if not isinstance(cache, dict):
        cache = {}

    difficulty_level = parse_difficulty(str(args.difficulty))
    selected_entries = select_entries(payload, args.topic, difficulty_level, args.limit)

    enriched_entries = []
    try:
        for index, entry in enumerate(selected_entries, start=1):
            enriched = dict(entry)
            enriched["babelnet"] = enrich_entry(enriched, args.api_key, cache, args.sleep)
            enriched_entries.append(enriched)
            print(f"[{index}/{len(selected_entries)}] {entry['form']}: {enriched['babelnet'].get('matched')}")
    finally:
        # Persist whatever was fetched, including on failure (e.g. an HTTP
        # error part-way through the run).
        write_json(BABELNET_CACHE_PATH, cache)

    return {
        "meta": {
            "language": "it",
            "version": 1,
            "base_lexicon": SEMANTIC_LEXICON_OUTPUT_PATH.name,
            "source": "BabelNet API",
            "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "topic": args.topic,
            "difficulty": args.difficulty,
            "requested_limit": args.limit,
            "entry_count": len(enriched_entries),
        },
        "entries": enriched_entries,
    }


def main() -> None:
    """Run the enrichment from the command line and print a short summary."""
    args = parse_args()
    payload = build_babelnet_enrichment(args)
    write_json(args.output, payload)
    matched = sum(1 for entry in payload["entries"] if entry.get("babelnet", {}).get("matched"))
    print(f"Lessico BabelNet generato: {args.output}")
    print(f"Voci arricchite: {payload['meta']['entry_count']}")
    print(f"Voci con match BabelNet: {matched}")


if __name__ == "__main__":
    main()