# Listing metadata (file-viewer residue): 386 lines, 12 KiB, Python.
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Optional
|
|
|
|
from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH
|
|
|
|
|
|
# Directory containing this script; every generated or local file sits next to it.
_MODULE_DIR = Path(__file__).parent

# Default destination of the enriched lexicon (overridable with --output).
BABELNET_OUTPUT_PATH = _MODULE_DIR / "lexicon_it_babelnet.json"
# JSON file in which API responses are kept between runs.
BABELNET_CACHE_PATH = _MODULE_DIR / ".babelnet_cache.json"
# Optional local file from which API keys are read when the environment has none.
BABELNET_LOCAL_KEY_PATH = _MODULE_DIR / ".babelnet_api_key.local"
# Prefix to which endpoint names are appended when building request URLs.
BABELNET_API_BASE = "https://babelnet.io/v9"
# Name of the environment variable checked first for API keys.
BABELNET_ENV_KEY = "BABELNET_API_KEY"

# Lexicon part-of-speech tags -> value sent as the ``pos`` request parameter.
# Entries whose tag is not listed here are never queried.
POS_TO_BABELNET = dict(
    NOUN="NOUN",
    VERB="VERB",
    ADJ="ADJECTIVE",
    ADV="ADVERB",
)
|
|
|
|
|
|
class BabelNetApiCallLimitReached(RuntimeError):
    """Signal that the configured budget of API calls has been used up.

    Raised by ``request_json`` when a response is not cached and
    ``stats["api_calls"]`` has already reached ``stats["api_call_limit"]``.
    """
|
|
|
|
|
|
class BabelNetKeyUnavailable(RuntimeError):
    """Signal that the API key was rejected by the service.

    Raised by ``request_json`` when the server answers with HTTP 403.
    """
|
|
|
|
# Named presets accepted by --difficulty and the numeric level each one stands for.
DIFFICULTY_ALIASES: Dict[str, int] = dict(easy=1, medium=2, hard=4, expert=5)


def parse_difficulty(value: str) -> int:
    """Turn a ``--difficulty`` argument into a numeric level.

    The value is stringified, trimmed and lower-cased. A known alias is
    resolved through ``DIFFICULTY_ALIASES``; anything else must be an integer
    between 1 and 5 (inclusive).

    Args:
        value: Alias name or integer, as text.

    Returns:
        The numeric difficulty level.

    Raises:
        SystemExit: If the value is neither an alias nor an integer, or the
            integer falls outside 1..5.
    """
    normalized = str(value).strip().lower()

    alias_level = DIFFICULTY_ALIASES.get(normalized)
    if alias_level is not None:
        return alias_level

    try:
        numeric = int(normalized)
    except ValueError as exc:
        raise SystemExit(
            "Valore non valido per --difficulty. Usa easy, medium, hard, expert oppure un intero tra 1 e 5."
        ) from exc

    if numeric < 1 or numeric > 5:
        raise SystemExit("Il valore numerico di --difficulty deve essere compreso tra 1 e 5.")
    return numeric
|
|
|
|
|
|
def _split_api_keys(text: str) -> List[str]:
    """Extract the distinct API keys contained in a block of text.

    Keys may be separated by newlines, commas or semicolons. Surrounding
    whitespace is removed; empty pieces, pieces starting with ``#`` and
    repeated keys are dropped.

    Args:
        text: Raw text holding zero or more keys.

    Returns:
        The keys in order of first appearance.
    """
    # A dict is used as an insertion-ordered set.
    unique_keys: Dict[str, None] = {}
    for piece in text.replace(";", "\n").replace(",", "\n").splitlines():
        candidate = piece.strip()
        if candidate and not candidate.startswith("#"):
            unique_keys.setdefault(candidate, None)
    return list(unique_keys)
|
|
|
|
|
|
def load_babelnet_api_keys() -> List[str]:
    """Collect the configured API keys.

    A non-empty ``BABELNET_ENV_KEY`` environment variable wins; otherwise the
    file at ``BABELNET_LOCAL_KEY_PATH`` is read if it exists. Either source is
    parsed with ``_split_api_keys``.

    Returns:
        The keys found, or an empty list when neither source is available.
    """
    from_environment = os.environ.get(BABELNET_ENV_KEY)
    if from_environment:
        return _split_api_keys(from_environment)

    if not BABELNET_LOCAL_KEY_PATH.exists():
        return []

    file_text = BABELNET_LOCAL_KEY_PATH.read_text(encoding="utf-8")
    return _split_api_keys(file_text)
|
|
|
|
|
|
def load_babelnet_api_key() -> Optional[str]:
    """Return the first key reported by ``load_babelnet_api_keys``.

    Returns:
        The first configured key, or ``None`` when no key is configured.
    """
    available = load_babelnet_api_keys()
    return available[0] if available else None
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    The default of ``--api-key`` is obtained from ``load_babelnet_api_key``
    while the parser is being built.

    Returns:
        The parsed namespace, exposing ``api_key``, ``topic``, ``difficulty``,
        ``limit``, ``sleep`` and ``output``.
    """
    api_key_help = (
        f"Chiave API BabelNet. In alternativa imposta {BABELNET_ENV_KEY} "
        f"o crea {BABELNET_LOCAL_KEY_PATH.name}."
    )
    # One (flag, keyword arguments) pair per option, registered in this order.
    option_specs = [
        ("--api-key", {"default": load_babelnet_api_key(), "help": api_key_help}),
        (
            "--topic",
            {
                "default": None,
                "help": "Topic opzionale da usare per limitare le voci da arricchire.",
            },
        ),
        (
            "--difficulty",
            {
                "default": "medium",
                "help": "Difficolta massima delle voci da arricchire: easy, medium, hard, expert oppure 1-5.",
            },
        ),
        (
            "--limit",
            {
                "type": int,
                "default": 100,
                "help": "Numero massimo di lemmi da interrogare in questa esecuzione.",
            },
        ),
        (
            "--sleep",
            {
                "type": float,
                "default": 0.2,
                "help": "Pausa tra richieste API, utile per non stressare il servizio.",
            },
        ),
        (
            "--output",
            {
                "type": Path,
                "default": BABELNET_OUTPUT_PATH,
                "help": "File JSON di output.",
            },
        ),
    ]

    parser = argparse.ArgumentParser(
        description="Arricchisce lexicon_it_semantic.json usando BabelNet, se disponibile una API key."
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
|
|
|
|
|
|
def load_json(path: Path, default: object) -> object:
    """Read a UTF-8 JSON file, falling back to a default when it is absent.

    Args:
        path: File to read.
        default: Object returned unchanged when ``path`` does not exist.

    Returns:
        The decoded JSON content, or ``default``.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return default
|
|
|
|
|
|
def write_json(path: Path, payload: object) -> None:
    """Serialize ``payload`` as indented UTF-8 JSON and store it at ``path``.

    The text is first written to a sibling ``<name>.tmp`` file and then moved
    over the destination with ``os.replace``. An interruption in the middle of
    a write therefore cannot leave a truncated file at ``path``; this matters
    because the same file is read back as JSON on the next run.

    Args:
        path: Destination file.
        payload: JSON-serializable object.

    Raises:
        TypeError: If ``payload`` cannot be serialized (nothing is written).
        OSError: If the staging file cannot be written or moved.
    """
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    staging = path.with_name(f"{path.name}.tmp")
    try:
        staging.write_text(serialized, encoding="utf-8")
        os.replace(staging, path)
    finally:
        # After a successful replace the staging file is gone; anything still
        # there is the leftover of a failed attempt.
        if staging.exists():
            staging.unlink()
|
|
|
|
|
|
def cache_key(endpoint: str, params: Dict[str, str]) -> str:
    """Build the cache identifier of a request.

    The ``key`` parameter is left out, so the identifier does not depend on
    which API key was used and the key itself is not stored in the cache. The
    remaining parameters are sorted by name and URL-encoded.

    Args:
        endpoint: Endpoint name.
        params: Query parameters of the request.

    Returns:
        ``"<endpoint>?<encoded sorted parameters>"``.
    """
    public_pairs = sorted(
        (name, value) for name, value in params.items() if name != "key"
    )
    query = urllib.parse.urlencode(public_pairs)
    return f"{endpoint}?{query}"
|
|
|
|
|
|
def request_json(
    endpoint: str,
    params: Dict[str, str],
    cache: Dict[str, object],
    stats: Optional[Dict[str, int]] = None,
) -> object:
    """Return the JSON answer of an API endpoint, consulting the cache first.

    Args:
        endpoint: Endpoint name appended to ``BABELNET_API_BASE``.
        params: Query parameters, including the API key under ``"key"``.
        cache: Mapping from ``cache_key`` identifiers to decoded responses;
            updated in place after a successful request.
        stats: Optional counters. ``cache_hits`` is incremented on a cache
            hit; ``api_calls`` and ``responses`` after a successful request.
            When ``api_call_limit`` is present, no request is sent once
            ``api_calls`` has reached it.

    Returns:
        The decoded JSON payload.

    Raises:
        BabelNetApiCallLimitReached: Uncached request with the call budget
            exhausted.
        BabelNetKeyUnavailable: The server answered with HTTP 403.
        RuntimeError: The server answered with any other HTTP error status.
    """
    query_string = urllib.parse.urlencode(params)
    url = f"{BABELNET_API_BASE}/{endpoint}?{query_string}"
    lookup = cache_key(endpoint, params)
    tracking = stats is not None

    if lookup in cache:
        if tracking:
            stats["cache_hits"] = stats.get("cache_hits", 0) + 1
        return cache[lookup]

    if tracking:
        call_budget = stats.get("api_call_limit")
        calls_so_far = stats.get("api_calls", 0)
        if call_budget is not None and calls_so_far >= call_budget:
            raise BabelNetApiCallLimitReached("Limite chiamate API BabelNet raggiunto")

    http_request = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(http_request, timeout=30) as response:
            body = response.read()
            payload = json.loads(body.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        if exc.code != 403:
            raise RuntimeError(f"Errore BabelNet HTTP {exc.code}: {detail}") from exc
        raise BabelNetKeyUnavailable(f"Chiave BabelNet non valida o limite giornaliero raggiunto: {detail}") from exc

    cache[lookup] = payload
    if tracking:
        for counter in ("api_calls", "responses"):
            stats[counter] = stats.get(counter, 0) + 1
    return payload
|
|
|
|
|
|
def entry_topics(entry: Dict[str, object]) -> set[str]:
    """Return the lower-cased topics attached to a lexicon entry.

    A missing or ``None`` ``"topics"`` value yields an empty set, and a bare
    string is treated as a single topic instead of being split into
    characters.

    Args:
        entry: Lexicon entry; its ``"topics"`` value is read.

    Returns:
        The set of topic names, stringified and lower-cased.
    """
    topics = entry.get("topics") or []
    if isinstance(topics, str):
        topics = [topics]
    return {str(item).lower() for item in topics}
|
|
|
|
|
|
def select_entries(payload: Dict[str, object], topic: Optional[str], difficulty_level: int, limit: int) -> List[Dict[str, object]]:
    """Pick the lexicon entries that should be sent to the API.

    An entry is kept when its ``form`` is purely alphabetic and 3 to 16
    characters long, its ``difficulty_word`` (5 when absent) does not exceed
    ``difficulty_level``, its ``pos`` is a key of ``POS_TO_BABELNET`` and,
    if a topic is given, that topic is among the entry's topics.

    Args:
        payload: Lexicon document; its ``"entries"`` list is scanned in order.
        topic: Optional topic filter, compared trimmed and lower-cased.
        difficulty_level: Highest accepted ``difficulty_word``.
        limit: Maximum number of entries to return; zero or a negative value
            selects nothing.

    Returns:
        At most ``limit`` matching entries, in their original order.
    """
    # Checked up front: testing the limit only after an append would still
    # let one entry through when the limit is zero or negative.
    if limit <= 0:
        return []

    normalized_topic = topic.strip().lower() if topic else None
    selected = []

    # ``or []`` also covers an explicit ``"entries": null``.
    for entry in payload.get("entries") or []:
        word = str(entry.get("form", ""))
        if not word.isalpha():  # also rejects the empty string
            continue
        if not 3 <= len(word) <= 16:
            continue
        if int(entry.get("difficulty_word", 5)) > difficulty_level:
            continue
        if str(entry.get("pos", "")) not in POS_TO_BABELNET:
            continue
        if normalized_topic and normalized_topic not in entry_topics(entry):
            continue
        selected.append(entry)
        if len(selected) >= limit:
            break

    return selected
|
|
|
|
|
|
def compact_synset_id(payload: Dict[str, object]) -> Dict[str, object]:
    """Reduce a synset reference to its ``id``, ``pos`` and ``source`` fields.

    Args:
        payload: Synset reference as a mapping.

    Returns:
        A new dict with exactly those three keys; absent ones map to ``None``.
    """
    return {field: payload.get(field) for field in ("id", "pos", "source")}
|
|
|
|
|
|
def extract_glosses(payload: Dict[str, object]) -> List[str]:
    """Collect up to five distinct glosses from a synset payload.

    A gloss is kept when its text is non-empty after trimming and its
    ``language`` is ``IT``, ``ITA`` (case-insensitive) or absent.

    Args:
        payload: Synset payload; its ``"glosses"`` list is read.

    Returns:
        The first five distinct gloss texts, in payload order.
    """
    accepted_languages = {"IT", "ITA", ""}
    candidates = (
        (str(item.get("language", "")).upper(), str(item.get("gloss", "")).strip())
        for item in payload.get("glosses", []) or []
    )
    kept = [text for language, text in candidates if text and language in accepted_languages]
    return dedupe(kept)[:5]
|
|
|
|
|
|
def extract_senses(payload: Dict[str, object]) -> List[str]:
    """Collect up to twenty distinct Italian lemmas from a synset payload.

    Each sense may carry its data in a nested ``"properties"`` object. The
    lemma (``simpleLemma``, then ``fullLemma``) and the ``language`` are
    looked up there first and on the sense itself second, so the language
    filter is applied to nested senses as well. Senses that are not mappings,
    or whose ``properties`` is not a mapping, are tolerated.

    Args:
        payload: Synset payload; its ``"senses"`` list is read.

    Returns:
        The first twenty distinct lemmas with underscores replaced by
        spaces, restricted to senses whose language is ``IT``, ``ITA``
        (case-insensitive) or absent.
    """
    accepted_languages = {"IT", "ITA", ""}
    senses = []
    for item in payload.get("senses", []) or []:
        if not isinstance(item, dict):
            continue
        properties = item.get("properties")
        if not isinstance(properties, dict):
            properties = {}

        language = str(properties.get("language") or item.get("language") or "").upper()
        lemma = str(
            properties.get("simpleLemma")
            or properties.get("fullLemma")
            or item.get("fullLemma")
            or ""
        ).strip()
        if lemma and language in accepted_languages:
            senses.append(lemma.replace("_", " "))
    return dedupe(senses)[:20]
|
|
|
|
|
|
def extract_categories(payload: Dict[str, object]) -> List[str]:
    """Collect up to twenty distinct category names from a synset payload.

    Args:
        payload: Synset payload; its ``"categories"`` list is read.

    Returns:
        The first twenty distinct, non-empty ``category`` values (trimmed),
        in payload order.
    """
    names = (
        str(item.get("category", "")).strip()
        for item in payload.get("categories", []) or []
    )
    return dedupe([name for name in names if name])[:20]
|
|
|
|
|
|
def extract_domains(payload: Dict[str, object]) -> List[str]:
    """Return the domain names found in a synset payload.

    ``"domains"`` may be a mapping or a list:

    * mapping: the keys whose value is truthy, stringified and sorted;
    * list: the first twenty distinct truthy items, stringified;
    * anything else: an empty list.

    Args:
        payload: Synset payload.

    Returns:
        The domain names as described above.
    """
    raw_domains = payload.get("domains", [])

    if isinstance(raw_domains, dict):
        active = [str(name) for name, weight in raw_domains.items() if weight]
        active.sort()
        return active

    if not isinstance(raw_domains, list):
        return []

    return dedupe(str(entry) for entry in raw_domains if entry)[:20]
|
|
|
|
|
|
def dedupe(items: Iterable[str]) -> List[str]:
    """Remove blanks and repetitions while keeping first-seen order.

    Every item is stringified and trimmed before being compared.

    Args:
        items: Values to filter.

    Returns:
        The distinct non-empty trimmed strings, in order of first appearance.
    """
    # A dict is used as an insertion-ordered set.
    ordered: Dict[str, None] = {}
    for raw in items:
        cleaned = str(raw).strip()
        if cleaned:
            ordered.setdefault(cleaned, None)
    return list(ordered)
|
|
|
|
|
|
def _request_with_pause(
    endpoint: str,
    params: Dict[str, str],
    cache: Dict[str, object],
    sleep_seconds: float,
    stats: Optional[Dict[str, int]] = None,
) -> object:
    """Call ``request_json`` and pause only when the answer was not cached."""
    # Must be decided before the call: request_json stores the new response
    # in ``cache`` before returning.
    served_from_cache = cache_key(endpoint, params) in cache
    result = request_json(endpoint, params, cache, stats)
    if sleep_seconds and not served_from_cache:
        time.sleep(sleep_seconds)
    return result


def enrich_entry(
    entry: Dict[str, object],
    api_key: str,
    cache: Dict[str, object],
    sleep_seconds: float,
    stats: Optional[Dict[str, int]] = None,
) -> Dict[str, object]:
    """Look up one lexicon entry and summarize its first synsets.

    ``getSynsetIds`` is queried for the entry's ``form`` and part of speech,
    then ``getSynset`` for each of the first three ids returned. The pause of
    ``sleep_seconds`` is applied only after requests that were not answered
    from ``cache``, so a cached re-run is not slowed down.

    Args:
        entry: Lexicon entry; ``form`` and ``pos`` are read.
        api_key: Key sent with every request.
        cache: Response cache shared with ``request_json`` (updated in place).
        sleep_seconds: Pause after each real request; a falsy value disables it.
        stats: Optional counters forwarded to ``request_json``.

    Returns:
        A dict with ``matched`` and ``synsets``. When nothing can be looked up
        it also has ``reason`` (``"unsupported_pos"`` or ``"no_synsets"``);
        otherwise it has ``synset_refs``, the compact form of up to five of
        the returned references.

    Raises:
        Whatever ``request_json`` raises.
    """
    word = str(entry.get("form", ""))
    pos = POS_TO_BABELNET.get(str(entry.get("pos", "")))
    if not pos:
        return {"matched": False, "reason": "unsupported_pos", "synsets": []}

    synset_ids = _request_with_pause(
        "getSynsetIds",
        {
            "lemma": word,
            "searchLang": "IT",
            "pos": pos,
            "key": api_key,
        },
        cache,
        sleep_seconds,
        stats,
    )

    if not isinstance(synset_ids, list) or not synset_ids:
        return {"matched": False, "reason": "no_synsets", "synsets": []}

    synsets = []
    for synset_ref in synset_ids[:3]:
        # References are normally mappings with an "id"; any other value is
        # used as the id itself.
        synset_id = synset_ref.get("id") if isinstance(synset_ref, dict) else str(synset_ref)
        if not synset_id:
            continue
        synset_payload = _request_with_pause(
            "getSynset",
            {
                "id": synset_id,
                "targetLang": "IT",
                "key": api_key,
            },
            cache,
            sleep_seconds,
            stats,
        )
        if not isinstance(synset_payload, dict):
            continue
        synsets.append(
            {
                "id": synset_id,
                "senses": extract_senses(synset_payload),
                "glosses": extract_glosses(synset_payload),
                "categories": extract_categories(synset_payload),
                "domains": extract_domains(synset_payload),
            }
        )

    return {
        "matched": bool(synsets),
        "synset_refs": [compact_synset_id(item) for item in synset_ids[:5] if isinstance(item, dict)],
        "synsets": synsets,
    }
|
|
|
|
|
|
def build_babelnet_enrichment(args: argparse.Namespace) -> Dict[str, object]:
    """Enrich the selected lexicon entries and assemble the output document.

    The semantic lexicon and the response cache are loaded, entries are
    filtered with ``select_entries`` and each one is passed to
    ``enrich_entry``. The cache file is rewritten whenever an entry added
    responses to it, including when that entry then fails, so answers already
    fetched are not requested again on the next run.

    Args:
        args: Parsed options; ``api_key``, ``topic``, ``difficulty``,
            ``limit`` and ``sleep`` are read.

    Returns:
        A dict with ``meta`` (run description) and ``entries`` (copies of the
        selected entries, each with an added ``"babelnet"`` key).

    Raises:
        SystemExit: No API key is available, or ``args.difficulty`` is invalid.
        FileNotFoundError: The semantic lexicon file does not exist.
    """
    if not args.api_key:
        raise SystemExit(
            f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure usa --api-key <chiave>."
        )
    if not SEMANTIC_LEXICON_OUTPUT_PATH.exists():
        raise FileNotFoundError(f"Lessico semantico non trovato: {SEMANTIC_LEXICON_OUTPUT_PATH}")

    payload = load_json(SEMANTIC_LEXICON_OUTPUT_PATH, {})
    cache = load_json(BABELNET_CACHE_PATH, {})
    if not isinstance(cache, dict):
        cache = {}

    difficulty_level = parse_difficulty(str(args.difficulty))
    selected_entries = select_entries(payload, args.topic, difficulty_level, args.limit)
    total = len(selected_entries)
    enriched_entries = []

    for index, entry in enumerate(selected_entries, start=1):
        enriched = dict(entry)
        # request_json only ever adds keys, so a size change means there are
        # new responses worth persisting.
        cached_before = len(cache)
        try:
            enriched["babelnet"] = enrich_entry(enriched, args.api_key, cache, args.sleep)
        finally:
            if len(cache) != cached_before:
                write_json(BABELNET_CACHE_PATH, cache)
        enriched_entries.append(enriched)
        print(f"[{index}/{total}] {entry['form']}: {enriched['babelnet'].get('matched')}")

    return {
        "meta": {
            "language": "it",
            "version": 1,
            "base_lexicon": SEMANTIC_LEXICON_OUTPUT_PATH.name,
            "source": "BabelNet API",
            "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "topic": args.topic,
            "difficulty": args.difficulty,
            "requested_limit": args.limit,
            "entry_count": len(enriched_entries),
        },
        "entries": enriched_entries,
    }
|
|
|
|
|
|
def main() -> None:
    """Run the enrichment from the command line.

    Parses the options, builds the enriched document, writes it to
    ``args.output`` and prints a three-line summary.
    """
    args = parse_args()
    payload = build_babelnet_enrichment(args)
    write_json(args.output, payload)

    matched = len(
        [entry for entry in payload["entries"] if entry.get("babelnet", {}).get("matched")]
    )
    summary_lines = (
        f"Lessico BabelNet generato: {args.output}",
        f"Voci arricchite: {payload['meta']['entry_count']}",
        f"Voci con match BabelNet: {matched}",
    )
    for line in summary_lines:
        print(line)


if __name__ == "__main__":
    main()
|