"""Apply an LLM rescue patch to a curated lexicon.

The script reads a lexicon JSON file and a patch JSON file, both shaped as
``{"entries": [...]}``.  Patch entries are matched to lexicon entries by
their normalized ``(form, pos)`` pair.  Every matching patch entry that
passes the definition, confidence and review filters is written into the
lexicon entry; the updated lexicon is then saved together with refreshed
``meta`` counters.
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

DEFAULT_LEXICON_PATH = Path(__file__).with_name("lexicon_it_curated.json")
DEFAULT_PATCH_PATH = Path(__file__).with_name("llm_rescue_patch.json")
DEFAULT_OUTPUT_PATH = Path(__file__).with_name("lexicon_it_curated_llm.json")

# Clue levels that are overwritten with the rescue definition.
_CLUE_LEVELS = ("easy", "medium", "hard", "expert")
# Source label used when the patch entry does not provide one.
_DEFAULT_RESCUE_SOURCE = "llm_rescue"


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the script.

    Returns:
        A namespace with ``lexicon``, ``patch`` and ``output`` (``Path``),
        ``min_confidence`` (``float``, default 0.6) and
        ``include_needs_review`` (``bool``, default ``False``).
    """
    parser = argparse.ArgumentParser(
        description="Applica una patch LLM rescue al lessico curato per produrre un lessico operativo aggiornato."
    )
    parser.add_argument(
        "--lexicon",
        type=Path,
        default=DEFAULT_LEXICON_PATH,
        help="Lessico curato di partenza.",
    )
    parser.add_argument(
        "--patch",
        type=Path,
        default=DEFAULT_PATCH_PATH,
        help="Patch LLM rescue da applicare.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=DEFAULT_OUTPUT_PATH,
        help="Lessico aggiornato in uscita.",
    )
    parser.add_argument(
        "--min-confidence",
        type=float,
        default=0.6,
        help="Confidenza minima per applicare automaticamente una definizione rescue.",
    )
    parser.add_argument(
        "--include-needs-review",
        action="store_true",
        help="Applica anche voci marcate needs_human_review=true se superano la soglia di confidenza.",
    )
    return parser.parse_args()


def load_json(path: Path, default: object) -> object:
    """Return the parsed JSON content of *path*, or *default* if it is missing.

    Args:
        path: File to read (decoded as UTF-8).
        default: Value returned unchanged when *path* does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if not path.exists():
        return default
    return json.loads(path.read_text(encoding="utf-8"))


def write_json(path: Path, payload: object) -> None:
    """Serialize *payload* to *path* as UTF-8 JSON with a 2-space indent.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def normalize_key(form: str, pos: str) -> Tuple[str, str]:
    """Build the lookup key used to match patch entries to lexicon entries.

    The form is stripped and lower-cased, the part of speech is stripped and
    upper-cased.  Falsy values (``None``, ``""``) become empty strings.
    """
    return (str(form or "").strip().lower(), str(pos or "").strip().upper())


def _as_items(value: object) -> List[Any]:
    """Return *value* as a list, treating a bare string as one single item."""
    if not value:
        return []
    if isinstance(value, str):
        # list("abc") would explode the string into single characters.
        return [value]
    return list(value)  # type: ignore[call-overload]


def merge_topics(existing: List[str], incoming: List[str]) -> List[str]:
    """Merge two label lists, dropping blanks and case-insensitive duplicates.

    Order is preserved (*existing* first, then *incoming*) and the first
    spelling of each label wins.  Items are converted with ``str`` and
    stripped.  ``None`` is accepted for either argument, and a bare string is
    treated as a single label rather than being split into characters.
    """
    merged: List[str] = []
    seen = set()
    for item in _as_items(existing) + _as_items(incoming):
        value = str(item).strip()
        if not value:
            continue
        key = value.lower()
        if key in seen:
            continue
        seen.add(key)
        merged.append(value)
    return merged


def _index_patch_entries(patch_entries: object) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """Index the dict items of *patch_entries* by their normalized key.

    Non-dict items are ignored; when two items share a key the later one wins.
    """
    indexed: Dict[Tuple[str, str], Dict[str, Any]] = {}
    for entry in patch_entries or []:  # type: ignore[attr-defined]
        if isinstance(entry, dict):
            indexed[normalize_key(entry.get("form", ""), entry.get("pos", ""))] = entry
    return indexed


def _parse_confidence(raw: object) -> Optional[float]:
    """Return *raw* as a float (falsy values count as 0.0), or ``None`` if unusable."""
    try:
        return float(raw or 0.0)  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        return None


def _apply_rescue(
    entry: Dict[str, Any],
    patch: Dict[str, Any],
    *,
    min_confidence: float,
    include_needs_review: bool,
) -> bool:
    """Apply *patch* to *entry* in place if it passes every filter.

    Returns ``True`` when the entry was updated and ``False`` when the patch
    was rejected (empty definition, unusable or too low confidence, or
    flagged for human review while *include_needs_review* is off).
    """
    # ``or ""`` keeps a JSON null from being stringified into "None".
    definition = str(patch.get("rescue_definition") or "").strip()
    if not definition:
        return False

    confidence = _parse_confidence(patch.get("confidence"))
    # ``not >=`` (rather than ``<``) also rejects NaN.
    if confidence is None or not confidence >= min_confidence:
        return False

    # A missing flag is treated as "needs review".
    needs_review = bool(patch.get("needs_human_review", True))
    if needs_review and not include_needs_review:
        return False

    source = patch.get("rescue_source") or _DEFAULT_RESCUE_SOURCE
    rescue_topics = patch.get("rescue_topics", [])
    rescue_tags = patch.get("rescue_semantic_tags", [])

    entry["preferred_definition"] = definition
    entry["preferred_source"] = source

    clue_defs = entry.get("clue_definitions")
    if not isinstance(clue_defs, dict):
        clue_defs = {}
    for level in _CLUE_LEVELS:
        clue_defs[level] = definition
    entry["clue_definitions"] = clue_defs

    entry["topics"] = merge_topics(entry.get("topics", []), rescue_topics)
    entry["semantic_tags"] = merge_topics(entry.get("semantic_tags", []), rescue_tags)
    entry["alpha_ready"] = True

    # "no_viable_definition" is always cleared; "flagged_by_refined_stage" is
    # cleared only when needs-review patches are excluded from this run.
    cleared: Tuple[str, ...] = ("no_viable_definition",)
    if not include_needs_review:
        cleared += ("flagged_by_refined_stage",)
    entry["review_reasons"] = [
        reason for reason in (entry.get("review_reasons") or []) if reason not in cleared
    ]

    entry["llm_rescue"] = {
        "definition": definition,
        "source": source,
        "topics": rescue_topics,
        "semantic_tags": rescue_tags,
        "notes": patch.get("rescue_notes", ""),
        "confidence": confidence,
        "needs_human_review": needs_review,
        "status": patch.get("status", ""),
    }
    return True


def _build_meta(
    lexicon_payload: Dict[str, Any],
    lexicon: List[Any],
    args: argparse.Namespace,
    applied: int,
    skipped: int,
) -> Dict[str, Any]:
    """Return a copy of the lexicon ``meta`` refreshed with this run's counters."""
    meta = dict(lexicon_payload.get("meta") or {})
    meta["base_lexicon"] = args.lexicon.name
    meta["generated_from_patch"] = args.patch.name
    meta["generated_by"] = "apply_llm_rescue_patch.py"
    meta["entry_count"] = len(lexicon)
    meta["llm_rescue_applied"] = applied
    meta["llm_rescue_skipped"] = skipped
    meta["alpha_ready_count"] = sum(
        1 for item in lexicon if isinstance(item, dict) and item.get("alpha_ready")
    )
    meta["review_count"] = sum(
        1
        for item in lexicon
        if isinstance(item, dict) and (item.get("review_reasons") or item.get("needs_review"))
    )
    return meta


def apply_patch(args: argparse.Namespace) -> Dict[str, Any]:
    """Apply the rescue patch to the lexicon and write the result.

    Args:
        args: Namespace providing ``lexicon``, ``patch`` and ``output``
            (paths), ``min_confidence`` and ``include_needs_review``.

    Returns:
        ``{"applied": int, "skipped": int, "output": str}``.  ``skipped``
        counts patch entries that matched a lexicon entry but were rejected;
        patch entries without a matching lexicon entry are not counted.

    Raises:
        ValueError: If the lexicon is not a JSON object with a list under
            ``"entries"``, or the patch is not a JSON object.
    """
    # A missing file is treated as an empty entry list by ``load_json``.
    lexicon_payload = load_json(args.lexicon, {"entries": []})
    patch_payload = load_json(args.patch, {"entries": []})

    if not isinstance(lexicon_payload, dict):
        raise ValueError(f"Lessico non valido: {args.lexicon}")
    lexicon = lexicon_payload.get("entries")
    if not isinstance(lexicon, list):
        raise ValueError(f"Lessico non valido: {args.lexicon}")
    if not isinstance(patch_payload, dict):
        raise ValueError(f"Patch non valida: {args.patch}")

    patch_by_key = _index_patch_entries(patch_payload.get("entries"))
    min_confidence = float(args.min_confidence)

    applied = 0
    skipped = 0
    for entry in lexicon:
        if not isinstance(entry, dict):
            continue
        patch = patch_by_key.get(normalize_key(entry.get("form", ""), entry.get("pos", "")))
        if not patch:
            continue
        was_applied = _apply_rescue(
            entry,
            patch,
            min_confidence=min_confidence,
            include_needs_review=args.include_needs_review,
        )
        if was_applied:
            applied += 1
        else:
            skipped += 1

    meta = _build_meta(lexicon_payload, lexicon, args, applied, skipped)
    write_json(args.output, {"meta": meta, "entries": lexicon})
    return {
        "applied": applied,
        "skipped": skipped,
        "output": str(args.output),
    }


def main() -> None:
    """Run the script from the command line and print a short summary."""
    args = parse_args()
    result = apply_patch(args)
    print(f"Lessico aggiornato generato: {result['output']}")
    print(f"Patch applicate: {result['applied']}")
    print(f"Voci saltate: {result['skipped']}")


if __name__ == "__main__":
    main()