from __future__ import annotations import argparse import json from copy import deepcopy from datetime import datetime from pathlib import Path from types import SimpleNamespace from typing import Dict, Iterable, List, Optional, Tuple from babelnet_incremental_enricher import ( DEFAULT_TOPIC, merge_babelnet_entries, rebuild_enriched, ) from build_babelnet_enrichment import ( BABELNET_CACHE_PATH, BABELNET_ENV_KEY, BABELNET_OUTPUT_PATH, BabelNetApiCallLimitReached, BabelNetKeyUnavailable, POS_TO_BABELNET, enrich_entry, load_babelnet_api_keys, load_json, write_json, ) from build_enriched_lexicon import ENRICHED_LEXICON_OUTPUT_PATH from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH LOG_DIR = Path(__file__).with_name("logs") DEFAULT_API_CALL_LIMIT = 950 DEFAULT_PER_KEY_API_CALL_LIMIT = 950 DEFAULT_WORD_LIMIT = 10_000 MIN_WORD_LENGTH = 3 MAX_WORD_LENGTH = 16 USEFUL_POS_PRIORITY = { "NOUN": 6, "VERB": 5, "ADJ": 4, "ADV": 3, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Batch giornaliero per fondere progressivamente ItalWordNet e BabelNet: " "arricchisce parole mancanti, aggiorna lexicon_it_babelnet.json e rigenera lexicon_it_enriched.json." ) ) parser.add_argument( "--api-call-limit", type=int, default=DEFAULT_API_CALL_LIMIT, help="Numero massimo complessivo di chiamate API BabelNet reali consentite in questa esecuzione.", ) parser.add_argument( "--per-key-api-call-limit", type=int, default=DEFAULT_PER_KEY_API_CALL_LIMIT, help="Numero massimo di chiamate API reali consentite per ciascuna chiave caricata.", ) parser.add_argument( "--token-index", default=None, help="Usa una o piu chiavi locali, contando da 1. Esempi: --token-index 2 oppure --token-index 1,2,3.", ) parser.add_argument( "--token-indexes", default=None, help="Alias esplicito per una lista di chiavi locali. Esempio: --token-indexes 1,2,3.", ) parser.add_argument( "--word-limit", type=int, default=DEFAULT_WORD_LIMIT, help="Numero massimo di parole candidate da tentare in questa esecuzione.", ) parser.add_argument( "--sleep", type=float, default=0.2, help="Pausa tra richieste API.", ) parser.add_argument( "--topic", default=None, help="Topic opzionale per concentrare il batch su una parte del lessico.", ) parser.add_argument( "--include-not-crossword", action="store_true", help="Include anche voci non marcate allowed_in_crossword.", ) parser.add_argument( "--retry-no-match", action="store_true", help="Riprova anche parole gia marcate come no_match.", ) parser.add_argument( "--dry-run", action="store_true", help="Mostra le prossime parole candidate senza chiamare BabelNet e senza scrivere file.", ) parser.add_argument( "--ignore-cache", action="store_true", help="Ignora la cache in questa esecuzione diagnostica, utile per testare un token specifico.", ) parser.add_argument( "--semantic", type=Path, default=SEMANTIC_LEXICON_OUTPUT_PATH, help="Lessico semantico completo di partenza.", ) parser.add_argument( "--babelnet", type=Path, default=BABELNET_OUTPUT_PATH, help="Archivio incrementale degli arricchimenti BabelNet.", ) parser.add_argument( "--enriched", type=Path, default=ENRICHED_LEXICON_OUTPUT_PATH, help="Lessico fuso da rigenerare dopo il batch.", ) return parser.parse_args() def entry_key(entry: Dict[str, object]) -> Tuple[str, str]: form = str(entry.get("normalized_form") or entry.get("form") or "").strip().lower() pos = str(entry.get("pos") or "").strip().upper() return form, pos def load_source_payload(enriched_path: Path, semantic_path: Path) -> Dict[str, object]: if enriched_path.exists(): payload = load_json(enriched_path, {}) if isinstance(payload, dict) and "entries" in payload: return payload payload = load_json(semantic_path, {}) if isinstance(payload, dict) and "entries" in payload: return payload raise ValueError(f"Nessun lessico valido trovato: {enriched_path} / {semantic_path}") def babelnet_status(entry: Dict[str, object]) -> str: babelnet = entry.get("babelnet", {}) if isinstance(babelnet, dict): return str(babelnet.get("status", "not_requested")) return "not_requested" def entry_topics(entry: Dict[str, object]) -> set[str]: topics = {str(item).lower() for item in entry.get("topics", []) or [] if item} semantic = entry.get("semantic", {}) if isinstance(semantic, dict): topics.update(str(item).lower() for item in semantic.get("semantic_topics", []) or [] if item) return topics def eligible_entry(entry: Dict[str, object], args: argparse.Namespace) -> bool: word = str(entry.get("form", "")).strip().lower() pos = str(entry.get("pos", "")).strip().upper() status = babelnet_status(entry) allowed_statuses = {"not_requested", "api_error"} if args.retry_no_match: allowed_statuses.add("no_match") if status not in allowed_statuses: return False if pos not in POS_TO_BABELNET: return False if not word.isalpha() or not MIN_WORD_LENGTH <= len(word) <= MAX_WORD_LENGTH: return False if not args.include_not_crossword and not entry.get("allowed_in_crossword", False): return False if args.topic and args.topic.strip().lower() not in entry_topics(entry): return False return True def candidate_priority(entry: Dict[str, object]) -> Tuple[int, int, int, int, int, str]: word = str(entry.get("form", "")) pos = str(entry.get("pos", "")).upper() topics = {str(item).lower() for item in entry.get("topics", []) or []} semantic = entry.get("semantic", {}) semantic_topics = set() if isinstance(semantic, dict): semantic_topics = {str(item).lower() for item in semantic.get("semantic_topics", []) or []} useful_topic_bonus = 2 if topics - {DEFAULT_TOPIC, "abstract", "actions"} else 0 semantic_topic_bonus = 1 if semantic_topics else 0 length_bonus = 3 if 4 <= len(word) <= 11 else 1 return ( useful_topic_bonus, semantic_topic_bonus, int(entry.get("quality_score", 0)), USEFUL_POS_PRIORITY.get(pos, 0), length_bonus, word, ) def select_candidates(payload: Dict[str, object], args: argparse.Namespace) -> List[Dict[str, object]]: candidates = [ entry for entry in payload.get("entries", []) or [] if isinstance(entry, dict) and eligible_entry(entry, args) ] candidates.sort(key=candidate_priority, reverse=True) return candidates[: max(0, args.word_limit)] def progress_counts(payload: Dict[str, object]) -> Dict[str, int]: counts: Dict[str, int] = {} for entry in payload.get("entries", []) or []: if not isinstance(entry, dict): continue status = babelnet_status(entry) counts[status] = counts.get(status, 0) + 1 return counts def parse_token_indexes(value: Optional[str], key_count: int, option_name: str) -> Optional[List[int]]: if value is None: return None selected: List[int] = [] seen = set() for raw_part in str(value).replace(";", ",").split(","): part = raw_part.strip() if not part: continue try: index = int(part) except ValueError as exc: raise SystemExit(f"{option_name} deve contenere solo numeri separati da virgola.") from exc if not 1 <= index <= key_count: raise SystemExit( f"{option_name} contiene {index}, ma deve essere tra 1 e {key_count}. Chiavi caricate: {key_count}." ) if index in seen: continue selected.append(index) seen.add(index) if not selected: raise SystemExit(f"{option_name} non contiene nessun indice valido.") return selected def write_batch_log(payload: Dict[str, object]) -> Path: LOG_DIR.mkdir(exist_ok=True) timestamp = datetime.now().astimezone().strftime("%Y%m%d_%H%M%S") path = LOG_DIR / f"babelnet_batch_{timestamp}.json" path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return path def run_batch(args: argparse.Namespace) -> Dict[str, object]: source_payload = load_source_payload(args.enriched, args.semantic) candidates = select_candidates(source_payload, args) before_counts = progress_counts(source_payload) if args.dry_run: return { "mode": "dry-run", "candidate_count": len(candidates), "selected_words": [entry.get("form") for entry in candidates[:50]], "before_counts": before_counts, } api_keys = load_babelnet_api_keys() if not api_keys: raise SystemExit( f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure crea .babelnet_api_key.local." ) token_indexes = parse_token_indexes(args.token_index, len(api_keys), "--token-index") token_indexes_alias = parse_token_indexes(args.token_indexes, len(api_keys), "--token-indexes") if token_indexes and token_indexes_alias: raise SystemExit("Usa solo uno tra --token-index e --token-indexes.") selected_token_indexes = token_indexes or token_indexes_alias if selected_token_indexes: api_keys = [api_keys[index - 1] for index in selected_token_indexes] cache = {} if args.ignore_cache else load_json(BABELNET_CACHE_PATH, {}) if not isinstance(cache, dict): cache = {} babelnet_payload = load_json(args.babelnet, {"entries": []}) if not isinstance(babelnet_payload, dict): babelnet_payload = {"entries": []} global_stats = { "api_calls": 0, "cache_hits": 0, "responses": 0, "api_call_limit": max(0, args.api_call_limit), } per_key_limit = max(0, args.per_key_api_call_limit) key_stats = [ { "key_index": selected_token_indexes[index] if selected_token_indexes else index + 1, "local_key_index": index + 1, "api_calls": 0, "cache_hits": 0, "responses": 0, "api_call_limit": per_key_limit, } for index, _ in enumerate(api_keys) ] enriched_entries: List[Dict[str, object]] = [] word_logs = [] stopped_reason = "completed" def select_key_index() -> Optional[int]: available = [ (stats["api_calls"], index) for index, stats in enumerate(key_stats) if stats["api_calls"] < stats["api_call_limit"] ] if not available: return None available.sort() return available[0][1] for index, entry in enumerate(candidates, start=1): if global_stats["api_calls"] >= global_stats["api_call_limit"]: stopped_reason = "api_call_limit" break key_index = select_key_index() if key_index is None: stopped_reason = "per_key_api_call_limit" break before_api_calls = global_stats["api_calls"] before_cache_hits = global_stats["cache_hits"] before_responses = global_stats["responses"] before_key_api_calls = key_stats[key_index]["api_calls"] before_key_cache_hits = key_stats[key_index]["cache_hits"] before_key_responses = key_stats[key_index]["responses"] updated = deepcopy(entry) updated.pop("babelnet", None) try: updated["babelnet"] = enrich_entry(updated, api_keys[key_index], cache, args.sleep, key_stats[key_index]) except BabelNetApiCallLimitReached: global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses stopped_reason = "per_key_api_call_limit" break except BabelNetKeyUnavailable as exc: global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses key_stats[key_index]["api_calls"] = key_stats[key_index]["api_call_limit"] word_logs.append( { "index": index, "word": updated.get("form"), "pos": updated.get("pos"), "key_index": key_stats[key_index]["key_index"], "api_calls": global_stats["api_calls"] - before_api_calls, "cache_hits": global_stats["cache_hits"] - before_cache_hits, "responses": global_stats["responses"] - before_responses, "matched": False, "synsets": 0, "reason": "key_unavailable_or_daily_limit", "error": str(exc), } ) print( f"[{index}/{len(candidates)}] {updated.get('form')}: " f"token={key_stats[key_index]['key_index']} non disponibile o limite giornaliero raggiunto" ) if select_key_index() is None: stopped_reason = "all_keys_unavailable_or_daily_limit" break continue global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses enriched_entries.append(updated) write_json(BABELNET_CACHE_PATH, cache) word_log = { "index": index, "word": updated.get("form"), "pos": updated.get("pos"), "key_index": key_stats[key_index]["key_index"], "api_calls": global_stats["api_calls"] - before_api_calls, "cache_hits": global_stats["cache_hits"] - before_cache_hits, "responses": global_stats["responses"] - before_responses, "matched": bool(updated.get("babelnet", {}).get("matched")), "synsets": len(updated.get("babelnet", {}).get("synsets", []) or []), "reason": updated.get("babelnet", {}).get("reason"), } word_logs.append(word_log) print( f"[{index}/{len(candidates)}] {word_log['word']}: " f"token={word_log['key_index']} api_calls={word_log['api_calls']} cache_hits={word_log['cache_hits']} " f"match={word_log['matched']} tot_api={global_stats['api_calls']}/{global_stats['api_call_limit']}" ) merged_babelnet = merge_babelnet_entries( babelnet_payload, enriched_entries, args.topic or "all", "all", ) write_json(args.babelnet, merged_babelnet) enriched_payload = rebuild_enriched( args.semantic, args.babelnet, args.enriched, args.topic or DEFAULT_TOPIC, ) after_counts = progress_counts(enriched_payload) total_entries = int(enriched_payload.get("meta", {}).get("entry_count", 0)) covered = total_entries - after_counts.get("not_requested", 0) coverage = covered / total_entries if total_entries else 0.0 result = { "mode": "batch", "started_topic": args.topic, "stopped_reason": stopped_reason, "candidate_count": len(candidates), "attempted_words": len(enriched_entries), "matched_words": sum(1 for entry in enriched_entries if entry.get("babelnet", {}).get("matched")), "api_calls": global_stats["api_calls"], "cache_hits": global_stats["cache_hits"], "responses": global_stats["responses"], "api_call_limit": global_stats["api_call_limit"], "api_key_count": len(api_keys), "forced_token_indexes": selected_token_indexes, "per_key_api_call_limit": per_key_limit, "per_key_stats": key_stats, "before_counts": before_counts, "after_counts": after_counts, "total_entries": total_entries, "covered_entries": covered, "coverage_ratio": coverage, "word_logs": word_logs, } log_path = write_batch_log(result) result["log_path"] = str(log_path) return result def print_result(result: Dict[str, object]) -> None: if result["mode"] == "dry-run": print("Dry-run batch BabelNet") print(f"Candidate selezionate: {result['candidate_count']}") print(f"Stati iniziali: {result['before_counts']}") print("Prime parole:") for index, word in enumerate(result["selected_words"], start=1): print(f"{index:>2}. {word}") return print("Batch BabelNet completato") print(f"- motivo stop: {result['stopped_reason']}") print(f"- parole tentate: {result['attempted_words']}/{result['candidate_count']}") print(f"- parole con match: {result['matched_words']}") print(f"- chiamate API reali: {result['api_calls']}/{result['api_call_limit']}") print(f"- chiavi caricate: {result['api_key_count']} (limite per chiave: {result['per_key_api_call_limit']})") if result.get("forced_token_indexes"): print(f"- token forzati: {', '.join('#' + str(index) for index in result['forced_token_indexes'])}") for item in result["per_key_stats"]: print(f" chiave #{item['key_index']}: {item['api_calls']}/{item['api_call_limit']} chiamate API") print(f"- cache hit: {result['cache_hits']}") print(f"- copertura lessico: {result['covered_entries']}/{result['total_entries']} ({result['coverage_ratio'] * 100:.1f}%)") print(f"- stati dopo: {result['after_counts']}") print(f"- log: {result['log_path']}") def main() -> None: args = parse_args() result = run_batch(args) print_result(result) if __name__ == "__main__": main()