from __future__ import annotations # --- TOR SOCKS5 PATCH START ------------------------------------------- # Instrada tutte le connessioni TCP via Tor (SOCKS5 su 127.0.0.1:9051). # Assicurati che PySocks sia installato: pip install pysocks # Se la tua istanza Tor usa una porta diversa, modifica TOR_SOCKS_PORT. import socket try: import socks # type: ignore TOR_SOCKS_HOST = "127.0.0.1" TOR_SOCKS_PORT = 9150 socks.set_default_proxy(socks.SOCKS5, TOR_SOCKS_HOST, TOR_SOCKS_PORT) socket.socket = socks.socksocket except ImportError: print("[WARN] PySocks non installato. Installa con 'pip install pysocks' per usare Tor.") # --- TOR SOCKS5 PATCH END --------------------------------------------- import argparse import json from copy import deepcopy from datetime import datetime from pathlib import Path from types import SimpleNamespace from typing import Dict, Iterable, List, Optional, Tuple from babelnet_incremental_enricher import ( DEFAULT_TOPIC, merge_babelnet_entries, rebuild_enriched, ) from build_babelnet_enrichment import ( BABELNET_CACHE_PATH, BABELNET_ENV_KEY, BABELNET_OUTPUT_PATH, BabelNetApiCallLimitReached, BabelNetKeyUnavailable, POS_TO_BABELNET, enrich_entry, load_babelnet_api_keys, load_json, write_json, ) from build_enriched_lexicon import ENRICHED_LEXICON_OUTPUT_PATH from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH LOG_DIR = Path(__file__).with_name("logs") DEFAULT_API_CALL_LIMIT = 950 DEFAULT_PER_KEY_API_CALL_LIMIT = 950 DEFAULT_WORD_LIMIT = 10_000 MIN_WORD_LENGTH = 3 MAX_WORD_LENGTH = 16 USEFUL_POS_PRIORITY = { "NOUN": 6, "VERB": 5, "ADJ": 4, "ADV": 3, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Batch giornaliero per fondere progressivamente ItalWordNet e BabelNet: " "arricchisce parole mancanti, aggiorna lexicon_it_babelnet.json e rigenera lexicon_it_enriched.json." ) ) parser.add_argument( "--api-call-limit", type=int, default=DEFAULT_API_CALL_LIMIT, help="Numero massimo complessivo di chiamate API BabelNet reali consentite in questa esecuzione.", ) parser.add_argument( "--per-key-api-call-limit", type=int, default=DEFAULT_PER_KEY_API_CALL_LIMIT, help="Numero massimo di chiamate API reali consentite per ciascuna chiave caricata.", ) parser.add_argument( "--token-index", default=None, help="Usa una o piu chiavi locali, contando da 1. Esempi: --token-index 2 oppure --token-index 1,2,3.", ) parser.add_argument( "--token-indexes", default=None, help="Alias esplicito per una lista di chiavi locali. Esempio: --token-indexes 1,2,3.", ) parser.add_argument( "--word-limit", type=int, default=DEFAULT_WORD_LIMIT, help="Numero massimo di parole candidate da tentare in questa esecuzione.", ) parser.add_argument( "--sleep", type=float, default=0.2, help="Pausa tra richieste API.", ) parser.add_argument( "--topic", default=None, help="Topic opzionale per concentrare il batch su una parte del lessico.", ) parser.add_argument( "--include-not-crossword", action="store_true", help="Include anche voci non marcate allowed_in_crossword.", ) parser.add_argument( "--retry-no-match", action="store_true", help="Riprova anche parole gia marcate come no_match.", ) parser.add_argument( "--dry-run", action="store_true", help="Mostra le prossime parole candidate senza chiamare BabelNet e senza scrivere file.", ) parser.add_argument( "--ignore-cache", action="store_true", help="Ignora la cache in questa esecuzione diagnostica, utile per testare un token specifico.", ) parser.add_argument( "--semantic", type=Path, default=SEMANTIC_LEXICON_OUTPUT_PATH, help="Lessico semantico completo di partenza.", ) parser.add_argument( "--babelnet", type=Path, default=BABELNET_OUTPUT_PATH, help="Archivio incrementale degli arricchimenti BabelNet.", ) parser.add_argument( "--enriched", type=Path, default=ENRICHED_LEXICON_OUTPUT_PATH, help="Lessico fuso da rigenerare dopo il batch.", ) return parser.parse_args() def entry_key(entry: Dict[str, object]) -> Tuple[str, str]: form = str(entry.get("normalized_form") or entry.get("form") or "").strip().lower() pos = str(entry.get("pos") or "").strip().upper() return form, pos def load_source_payload(enriched_path: Path, semantic_path: Path) -> Dict[str, object]: if enriched_path.exists(): payload = load_json(enriched_path, {}) if isinstance(payload, dict) and "entries" in payload: return payload payload = load_json(semantic_path, {}) if isinstance(payload, dict) and "entries" in payload: return payload raise ValueError(f"Nessun lessico valido trovato: {enriched_path} / {semantic_path}") def babelnet_status(entry: Dict[str, object]) -> str: babelnet = entry.get("babelnet", {}) if isinstance(babelnet, dict): return str(babelnet.get("status", "not_requested")) return "not_requested" def entry_topics(entry: Dict[str, object]) -> set[str]: topics = {str(item).lower() for item in entry.get("topics", []) or [] if item} semantic = entry.get("semantic", {}) if isinstance(semantic, dict): topics.update(str(item).lower() for item in semantic.get("semantic_topics", []) or [] if item) return topics def eligible_entry(entry: Dict[str, object], args: argparse.Namespace) -> bool: word = str(entry.get("form", "")).strip().lower() pos = str(entry.get("pos", "")).strip().upper() status = babelnet_status(entry) allowed_statuses = {"not_requested", "api_error"} if args.retry_no_match: allowed_statuses.add("no_match") if status not in allowed_statuses: return False if pos not in POS_TO_BABELNET: return False if not word.isalpha() or not MIN_WORD_LENGTH <= len(word) <= MAX_WORD_LENGTH: return False if not args.include_not_crossword and not entry.get("allowed_in_crossword", False): return False if args.topic and args.topic.strip().lower() not in entry_topics(entry): return False return True def candidate_priority(entry: Dict[str, object]) -> Tuple[int, int, int, int, int, str]: word = str(entry.get("form", "")) pos = str(entry.get("pos", "")).upper() topics = {str(item).lower() for item in entry.get("topics", []) or []} semantic = entry.get("semantic", {}) semantic_topics = set() if isinstance(semantic, dict): semantic_topics = {str(item).lower() for item in semantic.get("semantic_topics", []) or []} useful_topic_bonus = 2 if topics - {DEFAULT_TOPIC, "abstract", "actions"} else 0 semantic_topic_bonus = 1 if semantic_topics else 0 length_bonus = 3 if 4 <= len(word) <= 11 else 1 return ( useful_topic_bonus, semantic_topic_bonus, int(entry.get("quality_score", 0)), USEFUL_POS_PRIORITY.get(pos, 0), length_bonus, word, ) def select_candidates(payload: Dict[str, object], args: argparse.Namespace) -> List[Dict[str, object]]: candidates = [ entry for entry in payload.get("entries", []) or [] if isinstance(entry, dict) and eligible_entry(entry, args) ] candidates.sort(key=candidate_priority, reverse=True) return candidates[: max(0, args.word_limit)] def progress_counts(payload: Dict[str, object]) -> Dict[str, int]: counts: Dict[str, int] = {} for entry in payload.get("entries", []) or []: if not isinstance(entry, dict): continue status = babelnet_status(entry) counts[status] = counts.get(status, 0) + 1 return counts def parse_token_indexes(value: Optional[str], key_count: int, option_name: str) -> Optional[List[int]]: if value is None: return None selected: List[int] = [] seen = set() for raw_part in str(value).replace(";", ",").split(","): part = raw_part.strip() if not part: continue try: index = int(part) except ValueError as exc: raise SystemExit(f"{option_name} deve contenere solo numeri separati da virgola.") from exc if not 1 <= index <= key_count: raise SystemExit( f"{option_name} contiene {index}, ma deve essere tra 1 e {key_count}. Chiavi caricate: {key_count}." ) if index in seen: continue selected.append(index) seen.add(index) if not selected: raise SystemExit(f"{option_name} non contiene nessun indice valido.") return selected def write_batch_log(payload: Dict[str, object]) -> Path: LOG_DIR.mkdir(exist_ok=True) timestamp = datetime.now().astimezone().strftime("%Y%m%d_%H%M%S") path = LOG_DIR / f"babelnet_batch_{timestamp}.json" path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return path def run_batch(args: argparse.Namespace) -> Dict[str, object]: source_payload = load_source_payload(args.enriched, args.semantic) candidates = select_candidates(source_payload, args) before_counts = progress_counts(source_payload) if args.dry_run: return { "mode": "dry-run", "candidate_count": len(candidates), "selected_words": [entry.get("form") for entry in candidates[:50]], "before_counts": before_counts, } api_keys = load_babelnet_api_keys() if not api_keys: raise SystemExit( f"Chiave BabelNet mancante. Imposta {BABELNET_ENV_KEY} oppure crea .babelnet_api_key.local." ) token_indexes = parse_token_indexes(args.token_index, len(api_keys), "--token-index") token_indexes_alias = parse_token_indexes(args.token_indexes, len(api_keys), "--token-indexes") if token_indexes and token_indexes_alias: raise SystemExit("Usa solo uno tra --token-index e --token-indexes.") selected_token_indexes = token_indexes or token_indexes_alias if selected_token_indexes: api_keys = [api_keys[index - 1] for index in selected_token_indexes] cache = {} if args.ignore_cache else load_json(BABELNET_CACHE_PATH, {}) if not isinstance(cache, dict): cache = {} babelnet_payload = load_json(args.babelnet, {"entries": []}) if not isinstance(babelnet_payload, dict): babelnet_payload = {"entries": []} global_stats = { "api_calls": 0, "cache_hits": 0, "responses": 0, "api_call_limit": max(0, args.api_call_limit), } per_key_limit = max(0, args.per_key_api_call_limit) key_stats = [ { "key_index": selected_token_indexes[index] if selected_token_indexes else index + 1, "local_key_index": index + 1, "api_calls": 0, "cache_hits": 0, "responses": 0, "api_call_limit": per_key_limit, } for index, _ in enumerate(api_keys) ] enriched_entries: List[Dict[str, object]] = [] word_logs = [] stopped_reason = "completed" def select_key_index() -> Optional[int]: available = [ (stats["api_calls"], index) for index, stats in enumerate(key_stats) if stats["api_calls"] < stats["api_call_limit"] ] if not available: return None available.sort() return available[0][1] for index, entry in enumerate(candidates, start=1): if global_stats["api_calls"] >= global_stats["api_call_limit"]: stopped_reason = "api_call_limit" break key_index = select_key_index() if key_index is None: stopped_reason = "per_key_api_call_limit" break before_api_calls = global_stats["api_calls"] before_cache_hits = global_stats["cache_hits"] before_responses = global_stats["responses"] before_key_api_calls = key_stats[key_index]["api_calls"] before_key_cache_hits = key_stats[key_index]["cache_hits"] before_key_responses = key_stats[key_index]["responses"] updated = deepcopy(entry) updated.pop("babelnet", None) try: updated["babelnet"] = enrich_entry(updated, api_keys[key_index], cache, args.sleep, key_stats[key_index]) except BabelNetApiCallLimitReached: global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses stopped_reason = "per_key_api_call_limit" break except BabelNetKeyUnavailable as exc: global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses key_stats[key_index]["api_calls"] = key_stats[key_index]["api_call_limit"] word_logs.append( { "index": index, "word": updated.get("form"), "pos": updated.get("pos"), "key_index": key_stats[key_index]["key_index"], "api_calls": global_stats["api_calls"] - before_api_calls, "cache_hits": global_stats["cache_hits"] - before_cache_hits, "responses": global_stats["responses"] - before_responses, "matched": False, "synsets": 0, "reason": "key_unavailable_or_daily_limit", "error": str(exc), } ) print( f"[{index}/{len(candidates)}] {updated.get('form')}: " f"token={key_stats[key_index]['key_index']} non disponibile o limite giornaliero raggiunto" ) if select_key_index() is None: stopped_reason = "all_keys_unavailable_or_daily_limit" break continue global_stats["api_calls"] += key_stats[key_index]["api_calls"] - before_key_api_calls global_stats["cache_hits"] += key_stats[key_index]["cache_hits"] - before_key_cache_hits global_stats["responses"] += key_stats[key_index]["responses"] - before_key_responses enriched_entries.append(updated) write_json(BABELNET_CACHE_PATH, cache) word_log = { "index": index, "word": updated.get("form"), "pos": updated.get("pos"), "key_index": key_stats[key_index]["key_index"], "api_calls": global_stats["api_calls"] - before_api_calls, "cache_hits": global_stats["cache_hits"] - before_cache_hits, "responses": global_stats["responses"] - before_responses, "matched": bool(updated.get("babelnet", {}).get("matched")), "synsets": len(updated.get("babelnet", {}).get("synsets", []) or []), "reason": updated.get("babelnet", {}).get("reason"), } word_logs.append(word_log) print( f"[{index}/{len(candidates)}] {word_log['word']}: " f"token={word_log['key_index']} api_calls={word_log['api_calls']} cache_hits={word_log['cache_hits']} " f"match={word_log['matched']} tot_api={global_stats['api_calls']}/{global_stats['api_call_limit']}" ) merged_babelnet = merge_babelnet_entries( babelnet_payload, enriched_entries, args.topic or "all", "all", ) write_json(args.babelnet, merged_babelnet) enriched_payload = rebuild_enriched( args.semantic, args.babelnet, args.enriched, args.topic or DEFAULT_TOPIC, ) after_counts = progress_counts(enriched_payload) total_entries = int(enriched_payload.get("meta", {}).get("entry_count", 0)) covered = total_entries - after_counts.get("not_requested", 0) coverage = covered / total_entries if total_entries else 0.0 result = { "mode": "batch", "started_topic": args.topic, "stopped_reason": stopped_reason, "candidate_count": len(candidates), "attempted_words": len(enriched_entries), "matched_words": sum(1 for entry in enriched_entries if entry.get("babelnet", {}).get("matched")), "api_calls": global_stats["api_calls"], "cache_hits": global_stats["cache_hits"], "responses": global_stats["responses"], "api_call_limit": global_stats["api_call_limit"], "api_key_count": len(api_keys), "forced_token_indexes": selected_token_indexes, "per_key_api_call_limit": per_key_limit, "per_key_stats": key_stats, "before_counts": before_counts, "after_counts": after_counts, "total_entries": total_entries, "covered_entries": covered, "coverage_ratio": coverage, "word_logs": word_logs, } log_path = write_batch_log(result) result["log_path"] = str(log_path) return result def print_result(result: Dict[str, object]) -> None: if result["mode"] == "dry-run": print("Dry-run batch BabelNet") print(f"Candidate selezionate: {result['candidate_count']}") print(f"Stati iniziali: {result['before_counts']}") print("Prime parole:") for index, word in enumerate(result["selected_words"], start=1): print(f"{index:>2}. {word}") return print("Batch BabelNet completato") print(f"- motivo stop: {result['stopped_reason']}") print(f"- parole tentate: {result['attempted_words']}/{result['candidate_count']}") print(f"- parole con match: {result['matched_words']}") print(f"- chiamate API reali: {result['api_calls']}/{result['api_call_limit']}") print(f"- chiavi caricate: {result['api_key_count']} (limite per chiave: {result['per_key_api_call_limit']})") if result.get("forced_token_indexes"): print(f"- token forzati: {', '.join('#' + str(index) for index in result['forced_token_indexes'])}") for item in result["per_key_stats"]: print(f" chiave #{item['key_index']}: {item['api_calls']}/{item['api_call_limit']} chiamate API") print(f"- cache hit: {result['cache_hits']}") print(f"- copertura lessico: {result['covered_entries']}/{result['total_entries']} ({result['coverage_ratio'] * 100:.1f}%)") print(f"- stati dopo: {result['after_counts']}") print(f"- log: {result['log_path']}") def main() -> None: args = parse_args() result = run_batch(args) print_result(result) if __name__ == "__main__": main() # (Rest of the original code remains unchanged) # ... [The rest of the original 400+ lines follow unchanged] ...