from __future__ import annotations import argparse import json import re from copy import deepcopy from datetime import datetime from pathlib import Path from typing import Dict, Iterable, List, Optional, Sequence, Tuple from refine_lexicon_topics import REFINED_LEXICON_OUTPUT_PATH REVIEW_INPUT_PATH = Path(__file__).with_name("to_be_review.json") WIKTEXTRACT_INPUT_PATH = Path(__file__).with_name("raw-wiktextract-data.jsonl") WIKTEXTRACT_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_refined_plus_wiktextract.json") WIKTEXTRACT_INDEX_CACHE_PATH = Path(__file__).with_name(".wiktextract_it_index.json") DEFAULT_REVIEW_REASONS = {"no_viable_definition", "only_general_topics", "babelnet_ambiguous"} POS_MAP = { "noun": "NOUN", "adj": "ADJ", "adj": "ADJ", "verb": "VERB", "adv": "ADV", "prep": "PREP", "conj": "CONJ", "pron": "PRON", "intj": "INTJ", } TOPIC_MAP = { "christianity": "religion", "religion": "religion", "history": "history", "agriculture": "agriculture", "engineering": "technology", "mechanics": "technology", "technology": "technology", "medicine": "health", "geography": "geography", "biology": "nature", "aeronautics": "transport", } CATEGORY_TOPIC_HINTS = { "religione-it": "religion", "cristianesimo-it": "religion", "storia-it": "history", "agricoltura-it": "agriculture", "medicina-it": "health", "ingegneria-it": "technology", "meccanica-it": "technology", "tecnologia-it": "technology", "geografia-it": "geography", "biologia-it": "nature", "aeronautica-it": "transport", } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Arricchisce il lessico refined leggendo offline il file raw-wiktextract-data.jsonl, " "senza effettuare richieste di rete." ) ) parser.add_argument( "--input", type=Path, default=REFINED_LEXICON_OUTPUT_PATH, help="Lessico refined di partenza.", ) parser.add_argument( "--review", type=Path, default=REVIEW_INPUT_PATH, help="File to_be_review.json da usare per selezionare i lemmi prioritari.", ) parser.add_argument( "--wiktextract", type=Path, default=WIKTEXTRACT_INPUT_PATH, help="File JSONL raw estratto da Wiktionary.", ) parser.add_argument( "--output", type=Path, default=WIKTEXTRACT_OUTPUT_PATH, help="Lessico refined con blocco wiktextract aggiunto.", ) parser.add_argument( "--index-cache", type=Path, default=WIKTEXTRACT_INDEX_CACHE_PATH, help="Cache dell'indice lemmi->righe del JSONL per velocizzare i rilanci.", ) parser.add_argument( "--word-limit", type=int, default=0, help="Limite massimo di parole da elaborare. 0 = tutte le candidate.", ) parser.add_argument( "--words", default="", help="Lista separata da virgole di lemmi specifici da arricchire.", ) parser.add_argument( "--review-reasons", default=",".join(sorted(DEFAULT_REVIEW_REASONS)), help="Motivi del file review da trattare con priorita, separati da virgole.", ) parser.add_argument( "--skip-existing", action="store_true", help="Salta le voci che nel lessico di input hanno gia un blocco wiktextract utile.", ) return parser.parse_args() def load_json(path: Path, default: object) -> object: if not path.exists(): return default return json.loads(path.read_text(encoding="utf-8")) def write_json(path: Path, payload: object) -> None: path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def parse_csv_set(value: str) -> set[str]: return {item.strip().lower() for item in str(value or "").split(",") if item.strip()} def entry_key(entry: Dict[str, object]) -> Tuple[str, str]: form = str(entry.get("normalized_form") or entry.get("form") or "").strip().lower() pos = str(entry.get("pos") or "").strip().upper() return form, pos def load_or_build_index(jsonl_path: Path, index_cache_path: Path) -> Dict[str, List[int]]: cached = load_json(index_cache_path, {}) expected_meta = { "source": str(jsonl_path.resolve()), "size": jsonl_path.stat().st_size if jsonl_path.exists() else 0, "mtime": jsonl_path.stat().st_mtime if jsonl_path.exists() else 0, } if ( isinstance(cached, dict) and cached.get("meta") == expected_meta and isinstance(cached.get("index"), dict) ): return {str(key): list(value) for key, value in cached["index"].items()} index: Dict[str, List[int]] = {} with jsonl_path.open("r", encoding="utf-8") as handle: while True: offset = handle.tell() line = handle.readline() if not line: break raw = line.rstrip("\n") if not raw: continue obj = json.loads(raw) if obj.get("lang_code") != "it": continue word = str(obj.get("word", "")).strip().lower() if word: index.setdefault(word, []).append(offset) write_json(index_cache_path, {"meta": expected_meta, "index": index}) return index def read_jsonl_objects_at_offsets(jsonl_path: Path, offsets: Sequence[int]) -> List[Dict[str, object]]: objects: List[Dict[str, object]] = [] with jsonl_path.open("r", encoding="utf-8") as handle: for offset in offsets: handle.seek(offset) line = handle.readline() if not line: continue objects.append(json.loads(line)) return objects def map_pos(value: str) -> str: normalized = str(value or "").strip().lower() return POS_MAP.get(normalized, normalized.upper() if normalized else "") def normalize_text(text: str) -> str: value = str(text or "").strip() value = re.sub(r"\s+", " ", value) return value def sense_topics(sense: Dict[str, object], categories: Sequence[str]) -> List[str]: topics = set() for topic in sense.get("topics", []) or []: normalized = TOPIC_MAP.get(str(topic).strip().lower()) if normalized: topics.add(normalized) for category in categories: normalized = CATEGORY_TOPIC_HINTS.get(str(category).strip().lower()) if normalized: topics.add(normalized) return sorted(topics) def word_level_topics(entries: Sequence[Dict[str, object]], categories: Sequence[str]) -> List[str]: topics = set() for entry in entries: for sense in entry.get("senses", []) or []: if isinstance(sense, dict): topics.update(sense_topics(sense, categories)) return sorted(topics) def grammar_hints(entries: Sequence[Dict[str, object]]) -> List[str]: hints = set() for entry in entries: pos = str(entry.get("pos", "")).lower() tags = [str(tag).lower() for tag in entry.get("tags", []) or []] if pos == "verb" and "form-of" in tags: hints.add("voce_verbale") if pos == "noun": for sense in entry.get("senses", []) or []: if not isinstance(sense, dict): continue for gloss in sense.get("glosses", []) or []: gloss_text = str(gloss).lower() if "diminutivo" in gloss_text: hints.add("diminutivo") if "accrescitivo" in gloss_text: hints.add("accrescitivo") if "peggiorativo" in gloss_text: hints.add("peggiorativo") for sense in entry.get("senses", []) or []: if not isinstance(sense, dict): continue for gloss in sense.get("glosses", []) or []: gloss_text = str(gloss).lower() if "congiuntivo" in gloss_text: hints.add("congiuntivo") if "imperativo" in gloss_text: hints.add("imperativo") if "plurale" in gloss_text: hints.add("plurale") return sorted(hints) def simplify_entry(obj: Dict[str, object]) -> Dict[str, object]: categories = [normalize_text(item) for item in obj.get("categories", []) or [] if item] senses = [] for sense in obj.get("senses", []) or []: if not isinstance(sense, dict): continue glosses = [normalize_text(item) for item in sense.get("glosses", []) or [] if normalize_text(item)] if not glosses: continue senses.append( { "glosses": glosses, "examples": [ normalize_text(example.get("text", "")) for example in sense.get("examples", []) or [] if isinstance(example, dict) and normalize_text(example.get("text", "")) ], "topics": sense_topics(sense, categories), "tags": [str(tag) for tag in sense.get("tags", []) or [] if tag], "categories": [normalize_text(item) for item in sense.get("categories", []) or [] if item], } ) return { "word": obj.get("word"), "lang": obj.get("lang"), "lang_code": obj.get("lang_code"), "pos": map_pos(str(obj.get("pos", ""))), "pos_title": obj.get("pos_title"), "tags": [str(tag) for tag in obj.get("tags", []) or [] if tag], "categories": categories, "senses": senses, "synonyms": [item for item in obj.get("synonyms", []) or [] if isinstance(item, dict) and item.get("word")], "related": [item for item in obj.get("related", []) or [] if isinstance(item, dict) and item.get("word")], } def choose_best_entries(refined_entry: Dict[str, object], candidates: Sequence[Dict[str, object]]) -> List[Dict[str, object]]: target_pos = str(refined_entry.get("pos", "")).upper() exact = [candidate for candidate in candidates if str(candidate.get("pos", "")).upper() == target_pos] if exact: return exact return list(candidates) def wiktextract_already_useful(entry: Dict[str, object]) -> bool: payload = entry.get("wiktextract", {}) if not isinstance(payload, dict): return False status = str(payload.get("status", "")).lower() if status == "enriched" and (payload.get("definitions") or payload.get("entries")): return True if status in {"missing", "no_match"}: return True return False def select_targets( refined_payload: Dict[str, object], review_payload: Dict[str, object], review_reasons: set[str], explicit_words: set[str], word_limit: int, skip_existing: bool, ) -> Tuple[List[Dict[str, object]], int]: refined_entries = [entry for entry in refined_payload.get("entries", []) or [] if isinstance(entry, dict)] refined_by_word = {str(entry.get("form", "")).lower(): entry for entry in refined_entries if entry.get("form")} if explicit_words: selected = [] skipped_existing_count = 0 for word in explicit_words: entry = refined_by_word.get(word) if entry is None: continue if skip_existing and wiktextract_already_useful(entry): skipped_existing_count += 1 continue selected.append(entry) selected = selected[:word_limit] if word_limit > 0 else selected return selected, skipped_existing_count review_entries = [entry for entry in review_payload.get("entries", []) or [] if isinstance(entry, dict)] selected_words: List[str] = [] seen = set() skipped_existing_count = 0 for review_entry in review_entries: word = str(review_entry.get("form", "")).strip().lower() if not word or word in seen: continue reasons = {str(item).lower() for item in review_entry.get("review_reasons", []) or []} refined = refined_by_word.get(word) if refined is None: continue if skip_existing and wiktextract_already_useful(refined): skipped_existing_count += 1 continue babelnet_status = str((refined.get("babelnet") or {}).get("status", "")).lower() if reasons.intersection(review_reasons) or babelnet_status == "no_match": selected_words.append(word) seen.add(word) if word_limit > 0 and len(selected_words) >= word_limit: break return [refined_by_word[word] for word in selected_words if word in refined_by_word], skipped_existing_count def wiktextract_payload_for_entry(refined_entry: Dict[str, object], matches: Sequence[Dict[str, object]]) -> Dict[str, object]: if not matches: return { "status": "missing", "matched": False, "definitions": [], "entries": [], "topic_hints": [], "grammar_hints": [], } selected_entries = choose_best_entries(refined_entry, matches) definitions = [] for item in selected_entries: for sense in item.get("senses", []) or []: if not isinstance(sense, dict): continue definitions.extend(sense.get("glosses", []) or []) definitions = [normalize_text(item) for item in definitions if normalize_text(item)] all_categories = [] for item in selected_entries: all_categories.extend(item.get("categories", []) or []) return { "status": "enriched" if definitions else "entries_without_definitions", "matched": bool(definitions), "definitions": definitions, "entries": selected_entries, "topic_hints": word_level_topics(selected_entries, all_categories), "grammar_hints": grammar_hints(selected_entries), "categories": sorted(set(normalize_text(item) for item in all_categories if normalize_text(item))), } def enrich_from_wiktextract(args: argparse.Namespace) -> Dict[str, object]: refined_payload = load_json(args.input, {"entries": []}) if not isinstance(refined_payload, dict) or "entries" not in refined_payload: raise ValueError(f"Lessico refined non valido: {args.input}") review_payload = load_json(args.review, {"entries": []}) if not isinstance(review_payload, dict): review_payload = {"entries": []} targets, skipped_existing_count = select_targets( refined_payload, review_payload, parse_csv_set(args.review_reasons), parse_csv_set(args.words), args.word_limit, args.skip_existing, ) print( f"Target selezionati: {len(targets)}" + (f" | già saltati per wiktextract esistente: {skipped_existing_count}" if args.skip_existing else "") ) index = load_or_build_index(args.wiktextract, args.index_cache) refined_index = { entry_key(entry): deepcopy(entry) for entry in refined_payload.get("entries", []) or [] if isinstance(entry, dict) } matched_count = 0 missing_count = 0 for idx, entry in enumerate(targets, start=1): updated = deepcopy(entry) word = str(entry.get("form", "")).strip().lower() offsets = index.get(word, []) objects = [simplify_entry(obj) for obj in read_jsonl_objects_at_offsets(args.wiktextract, offsets)] payload = wiktextract_payload_for_entry(updated, objects) updated["wiktextract"] = payload updated["wiktextract_generated_at"] = datetime.now().astimezone().isoformat(timespec="seconds") refined_index[entry_key(updated)] = updated if payload.get("matched"): matched_count += 1 else: missing_count += 1 print( f"[{idx}/{len(targets)}] {word}: " f"status={payload.get('status')} " f"def={len(payload.get('definitions', []))} " f"topics={len(payload.get('topic_hints', []))} " f"entries={len(payload.get('entries', []))}" ) merged_entries = list(refined_index.values()) merged_entries.sort(key=lambda item: (str(item.get("normalized_form", "")), str(item.get("pos", "")))) merged_payload = { "meta": { **(refined_payload.get("meta", {}) if isinstance(refined_payload.get("meta"), dict) else {}), "wiktextract_source": str(args.wiktextract), "wiktextract_generated_at": datetime.now().astimezone().isoformat(timespec="seconds"), "wiktextract_target_count": len(targets), "wiktextract_skipped_existing_count": skipped_existing_count, "wiktextract_matched_count": matched_count, "wiktextract_missing_count": missing_count, }, "entries": merged_entries, } write_json(args.output, merged_payload) return { "target_count": len(targets), "skipped_existing_count": skipped_existing_count, "matched_count": matched_count, "missing_count": missing_count, "output": str(args.output), } def main() -> None: args = parse_args() result = enrich_from_wiktextract(args) print(f"Lessico con Wiktextract generato: {result['output']}") print(f"Voci trattate: {result['target_count']}") print(f"Voci già saltate: {result['skipped_existing_count']}") print(f"Match Wiktextract: {result['matched_count']}") print(f"Senza match Wiktextract: {result['missing_count']}") if __name__ == "__main__": main()