430 lines
15 KiB
Python
430 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
# Default input and output files, both resolved next to this script.
PRIORITY_INPUT_PATH = Path(__file__).parent / "to_be_review_priority.json"
PATCH_OUTPUT_PATH = Path(__file__).parent / "llm_rescue_patch.json"
|
|
|
|
|
|
# System prompt sent verbatim to the model with every request. It is runtime
# text, so it stays in Italian. It pins the reply contract: one JSON object with
# the keys definition, topics, semantic_tags, confidence, needs_human_review, notes.
SYSTEM_PROMPT = """Sei un lessicografo italiano che prepara definizioni sintetiche per cruciverba.
Ricevi un lemma con parte del discorso e contesto semantico parziale.
Devi proporre una definizione breve in italiano, topic plausibili e tag semantici.

Regole:
- Rispondi solo con JSON valido.
- La definizione deve essere concisa, naturale e utile per un cruciverba.
- Evita di includere il lemma o derivati ovvi del lemma nella definizione.
- Se il termine sembra raro, ambiguo, refuso o poco affidabile, abbassa la confidenza e segnala needs_human_review=true.
- I topic devono essere pochi, in inglese semplice minuscolo con underscore se serve.
- I semantic_tags devono essere pochi, descrittivi e in italiano o inglese semplice.
- Non inventare dettagli enciclopedici troppo specifici se non supportati dal contesto.

Formato JSON obbligatorio:
{
  "definition": "...",
  "topics": ["topic1", "topic2"],
  "semantic_tags": ["tag1", "tag2"],
  "confidence": 0.0,
  "needs_human_review": true,
  "notes": "..."
}
"""
|
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the LLM rescue patch builder.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing an explicit list allows programmatic invocation.

    Returns:
        The populated namespace with ``input``, ``output``, ``limit``,
        ``bucket``, ``provider``, ``api_base``, ``api_key_env``, ``model``,
        ``temperature``, ``sleep``, ``skip_existing`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Costruisce una patch di rescue lessicale usando un LLM su un lotto di voci "
            "prioritarie tratte da to_be_review_priority.json."
        )
    )

    # Input/output files.
    parser.add_argument(
        "--input",
        type=Path,
        default=PRIORITY_INPUT_PATH,
        help="File to_be_review_priority.json di partenza.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=PATCH_OUTPUT_PATH,
        help="Patch JSON da generare o aggiornare.",
    )

    # Batch selection.
    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="Numero massimo di voci da processare nel lotto. Usa 0 per tutte le voci selezionate.",
    )
    parser.add_argument(
        "--bucket",
        default="red",
        help="Bucket di priorita da considerare: red, orange, yellow oppure all.",
    )

    # LLM endpoint and request parameters.
    parser.add_argument(
        "--provider",
        choices=("openai_compatible", "ollama"),
        default="openai_compatible",
        help="Tipo di endpoint LLM da usare.",
    )
    parser.add_argument(
        "--api-base",
        default="",
        help=(
            "Endpoint API. Per openai_compatible: .../v1/chat/completions. "
            "Per ollama: .../api/chat."
        ),
    )
    parser.add_argument(
        "--api-key-env",
        default="OPENAI_API_KEY",
        help="Nome della variabile d'ambiente che contiene la API key.",
    )
    parser.add_argument(
        "--model",
        default="gpt-4.1-mini",
        help="Nome del modello da interrogare.",
    )
    parser.add_argument(
        "--temperature",
        type=float,
        default=0.2,
        help="Temperatura della richiesta LLM.",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.5,
        help="Pausa tra una richiesta e la successiva.",
    )

    # Run-mode switches.
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="Salta le voci gia presenti nell'output con status drafted/reviewed/done.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Non chiama alcun LLM: prepara solo il lotto e marca le voci come selected.",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def load_json(path: Path, default: object) -> object:
    """Return the JSON document stored at *path*, or *default* if it is missing.

    Args:
        path: File to read; decoded as UTF-8.
        default: Value handed back unchanged when *path* does not exist.

    Returns:
        The parsed JSON value, or *default*.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8")
        return json.loads(raw_text)
    return default
|
|
|
|
|
|
def write_json(path: Path, payload: object) -> None:
    """Serialize *payload* as pretty-printed UTF-8 JSON and store it at *path*.

    The document is written to a sibling temporary file and then moved over
    *path* with ``os.replace``, so an interruption while writing cannot leave a
    truncated patch in place of the previous one.

    Args:
        path: Destination file.
        payload: JSON-serializable value.

    Raises:
        TypeError: If *payload* is not JSON-serializable (nothing is written).
        OSError: If the temporary file cannot be written or moved.
    """
    # Serialize first: a failure here must not touch the file system at all.
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(serialized, encoding="utf-8")
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; on failure remove
        # the partial leftover.
        if tmp_path.exists():
            tmp_path.unlink()
|
|
|
|
|
|
def build_record(entry: Dict[str, Any]) -> Dict[str, Any]:
    """Create a fresh, still-pending patch record from one priority entry.

    Args:
        entry: A source entry; every field is read with ``.get`` so missing
            keys are tolerated.

    Returns:
        A dict holding the identifying fields, the current definition/topics,
        a ``context`` sub-dict with the available semantic hints, and empty
        ``rescue_*`` slots with ``status`` set to ``"pending"``.
    """
    # Anything that is not a dict is treated as absent so lookups stay safe.
    wikt = entry.get("wiktextract") or {}
    if not isinstance(wikt, dict):
        wikt = {}
    synset = entry.get("babelnet_best_synset") or {}
    if not isinstance(synset, dict):
        synset = {}

    context = {
        "topic_suggestions": entry.get("topic_suggestions", []),
        "semantic_glosses": entry.get("semantic_glosses", []),
        "senses": entry.get("senses", []),
        "wiktextract_definitions": wikt.get("definitions") or [],
        "wiktextract_topic_hints": wikt.get("topic_hints", []),
        "babelnet_glosses": synset.get("glosses") or [],
    }

    # Fields copied verbatim from the source entry.
    copied = ("form", "lemma", "pos", "priority_bucket", "priority_score")
    record: Dict[str, Any] = {name: entry.get(name) for name in copied}
    record["review_reasons"] = entry.get("review_reasons", [])
    record["current_topics"] = entry.get("topics", [])
    record["current_definition"] = entry.get("preferred_definition", "")
    record["current_source"] = entry.get("preferred_source", "")
    record["context"] = context

    # Empty rescue slots, filled later from the model reply.
    record.update(
        rescue_definition="",
        rescue_source="",
        rescue_topics=[],
        rescue_semantic_tags=[],
        rescue_notes="",
        confidence=0.0,
        needs_human_review=True,
        status="pending",
    )
    return record
|
|
|
|
|
|
def build_user_prompt(entry: Dict[str, Any]) -> str:
    """Render the user message for one patch record.

    Args:
        entry: A patch record; only the identifying fields, the current
            topics/definition, the review reasons and ``context`` are forwarded.

    Returns:
        A fixed Italian instruction header followed by the selected fields as
        indented JSON.
    """
    fields: Dict[str, Any] = {name: entry.get(name) for name in ("form", "lemma", "pos")}
    fields["current_topics"] = entry.get("current_topics", [])
    fields["review_reasons"] = entry.get("review_reasons", [])
    fields["current_definition"] = entry.get("current_definition", "")
    fields["context"] = entry.get("context") or {}

    header = (
        "Genera una proposta di rescue lessicale per questa voce italiana.\n"
        "Se il termine sembra un refuso o una variante dubbia, segnalalo nelle notes.\n"
        "Payload:\n"
    )
    return header + json.dumps(fields, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def resolve_api_base(args: argparse.Namespace) -> str:
    """Pick the endpoint URL: the explicit ``--api-base`` or the provider default.

    Args:
        args: Parsed options; ``api_base`` and ``provider`` are read.

    Returns:
        ``args.api_base`` when non-empty, otherwise the local Ollama chat URL
        for the ``ollama`` provider and the OpenAI chat-completions URL for
        any other provider.
    """
    explicit = args.api_base
    if explicit:
        return explicit
    return (
        "http://localhost:11434/api/chat"
        if args.provider == "ollama"
        else "https://api.openai.com/v1/chat/completions"
    )
|
|
|
|
|
|
def request_openai_compatible(
    api_base: str,
    api_key: str,
    model: str,
    temperature: float,
    user_prompt: str,
) -> str:
    """POST one chat-completion request and return the assistant message text.

    Args:
        api_base: Full URL of the chat-completions endpoint.
        api_key: Bearer token sent in the ``Authorization`` header.
        model: Model name placed in the request body.
        temperature: Sampling temperature placed in the request body.
        user_prompt: Content of the user message; the system message is
            ``SYSTEM_PROMPT``.

    Returns:
        The stripped ``choices[0].message.content`` string.

    Raises:
        RuntimeError: On an HTTP error status, or when the response body does
            not contain a non-null ``choices[0].message.content``.
        urllib.error.URLError: On connection-level failures.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    payload = {
        "model": model,
        "temperature": temperature,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
    }
    request = urllib.request.Request(
        api_base,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=90) as response:
            body = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"OpenAI-compatible HTTP {exc.code}: {detail}") from exc

    # A 200 reply can still have an unexpected shape (error object, empty
    # choices). Surface that as RuntimeError, which callers already handle,
    # instead of letting KeyError/IndexError/TypeError escape.
    try:
        content = body["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError(
            f"Risposta OpenAI-compatible priva di choices[0].message.content: {str(body)[:500]}"
        ) from exc
    if content is None:
        # str(None) would otherwise leak the literal text "None" downstream.
        raise RuntimeError("Risposta OpenAI-compatible con message.content nullo.")
    return str(content).strip()
|
|
|
|
|
|
def request_ollama(
    api_base: str,
    model: str,
    temperature: float,
    user_prompt: str,
) -> str:
    """POST one non-streaming chat request to an Ollama endpoint.

    Args:
        api_base: Full URL of the chat endpoint.
        model: Model name placed in the request body.
        temperature: Sampling temperature, sent under ``options``.
        user_prompt: Content of the user message; the system message is
            ``SYSTEM_PROMPT``.

    Returns:
        The stripped ``message.content`` string, or ``""`` when the reply has
        no usable message content.

    Raises:
        RuntimeError: On an HTTP error status, or when the response body is
            not a JSON object.
        urllib.error.URLError: On connection-level failures.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    payload = {
        "model": model,
        "stream": False,
        "options": {"temperature": temperature},
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
    }
    request = urllib.request.Request(
        api_base,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=90) as response:
            body = json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Ollama HTTP {exc.code}: {detail}") from exc

    # Guard the reply shape so an unexpected body raises RuntimeError (handled
    # by callers) instead of AttributeError.
    if not isinstance(body, dict):
        raise RuntimeError(f"Risposta Ollama non valida: {str(body)[:500]}")
    message = body.get("message")
    if not isinstance(message, dict):
        return ""
    content = message.get("content")
    # A null content must not turn into the literal text "None".
    return "" if content is None else str(content).strip()
|
|
|
|
|
|
def extract_json_object(text: str) -> Dict[str, Any]:
    """Extract the first JSON object embedded in a model reply.

    Decoding starts at the first ``{`` and stops where that object ends, so
    leading text without braces (for example a code fence) and any trailing
    text, even text containing braces, are ignored.

    Args:
        text: Raw reply text.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the text has no ``{`` followed by a ``}``.
        json.JSONDecodeError: (a ``ValueError`` subclass) if the text starting
            at the first ``{`` is not a valid JSON object.
    """
    start = text.find("{")
    if start == -1 or text.rfind("}") <= start:
        raise ValueError("Risposta LLM senza oggetto JSON riconoscibile.")
    # raw_decode parses exactly one JSON value and ignores what follows it.
    parsed, _end = json.JSONDecoder().raw_decode(text, start)
    return parsed
|
|
|
|
|
|
def _clean_str_list(value: Any) -> List[str]:
    """Coerce a model-supplied list-like value into a list of non-empty strings."""
    if isinstance(value, str):
        # Models sometimes return "a, b" instead of ["a", "b"]; iterating the
        # string directly would yield single characters.
        candidates = value.split(",")
    elif isinstance(value, (list, tuple)):
        candidates = [item for item in value if item is not None]
    else:
        # None, numbers, objects: nothing usable.
        candidates = []
    stripped = (str(item).strip() for item in candidates)
    return [item for item in stripped if item]


def _clean_text(value: Any) -> str:
    """Return *value* as stripped text, mapping JSON null to an empty string."""
    return "" if value is None else str(value).strip()


def normalize_llm_payload(payload: Dict[str, Any], model: str) -> Dict[str, Any]:
    """Convert the decoded model reply into the ``rescue_*`` record fields.

    Args:
        payload: JSON object returned by the model; the keys ``definition``,
            ``topics``, ``semantic_tags``, ``confidence``,
            ``needs_human_review`` and ``notes`` are read, all optional.
        model: Model name, recorded in ``rescue_source``.

    Returns:
        A dict with ``rescue_definition``, ``rescue_source``,
        ``rescue_topics`` (lowercased), ``rescue_semantic_tags``,
        ``rescue_notes``, ``confidence`` clamped to ``[0.0, 1.0]``,
        ``needs_human_review`` and ``status`` set to ``"drafted"``.
        A confidence that is not a finite-or-infinite number is replaced by
        ``0.0`` and forces ``needs_human_review`` to ``True``.
    """
    needs_review = bool(payload.get("needs_human_review", True))

    try:
        confidence = float(payload.get("confidence", 0.0) or 0.0)
    except (TypeError, ValueError):
        confidence = 0.0
        needs_review = True
    if confidence != confidence:
        # NaN compares unequal to itself; min()/max() would clamp it to 1.0.
        confidence = 0.0
        needs_review = True
    confidence = max(0.0, min(1.0, confidence))

    return {
        "rescue_definition": _clean_text(payload.get("definition", "")),
        "rescue_source": f"llm_rescue:{model}",
        "rescue_topics": [topic.lower() for topic in _clean_str_list(payload.get("topics"))],
        "rescue_semantic_tags": _clean_str_list(payload.get("semantic_tags")),
        "rescue_notes": _clean_text(payload.get("notes", "")),
        "confidence": confidence,
        "needs_human_review": needs_review,
        "status": "drafted",
    }
|
|
|
|
|
|
def should_skip_existing(entry: Dict[str, Any]) -> bool:
    """Tell whether a patch record already has a usable result.

    Args:
        entry: A patch record; only ``status`` is read.

    Returns:
        ``True`` when the case-insensitive status is ``drafted``,
        ``reviewed`` or ``done``.
    """
    status = str(entry.get("status", "")).lower()
    return status in ("drafted", "reviewed", "done")
|
|
|
|
|
|
# Rank of each record status in the written patch; a lower rank sorts first.
_STATUS_ORDER = {"pending": 0, "selected": 1, "error": 2, "drafted": 3, "reviewed": 4, "done": 5}


def _form_key(entry: Dict[str, Any]) -> str:
    """Return the stripped, lowercase form used to match source and patch entries."""
    return str(entry.get("form") or "").strip().lower()


def _record_sort_key(record: Dict[str, Any]) -> tuple:
    """Order records by status rank, then descending priority score, then form."""
    try:
        score = int(record.get("priority_score", 0) or 0)
    except (TypeError, ValueError, OverflowError):
        # A malformed score must not abort the run once the LLM calls are done.
        score = 0
    status_rank = _STATUS_ORDER.get(str(record.get("status", "pending")), 9)
    return (status_rank, -score, str(record.get("form", "")))


def generate_patch(args: argparse.Namespace) -> Dict[str, Any]:
    """Select a batch of priority entries, query the LLM and build the patch.

    Entries are read from ``args.input`` (``practical_entries`` or
    ``entries``), filtered by bucket, de-duplicated by normalized form and
    capped at ``args.limit``. Each selected entry is merged with the record
    already present in ``args.output`` (if any), then either marked
    ``selected`` (dry run) or sent to the configured provider. Records in the
    existing output that were not part of this batch are carried over.

    Args:
        args: Parsed CLI options (see ``parse_args``).

    Returns:
        A dict with a ``meta`` section describing the run and the sorted
        ``entries`` list. Nothing is written to disk here.

    Raises:
        ValueError: If the input file does not contain a JSON object.
        RuntimeError: If the provider is ``openai_compatible``, this is not a
            dry run and the API key environment variable is empty.
    """
    source_payload = load_json(args.input, {"entries": []})
    if not isinstance(source_payload, dict):
        raise ValueError(f"File priority non valido: {args.input}")

    output_payload = load_json(args.output, {"entries": []})
    if not isinstance(output_payload, dict):
        output_payload = {"entries": []}

    # Index the existing patch by the same normalized key used for lookups.
    existing_by_form: Dict[str, Dict[str, Any]] = {}
    for entry in output_payload.get("entries", []) or []:
        if isinstance(entry, dict):
            key = _form_key(entry)
            if key:
                existing_by_form[key] = entry

    bucket = str(args.bucket or "red").strip().lower()
    source_entries = source_payload.get("practical_entries") or source_payload.get("entries") or []

    max_items = int(args.limit)
    unlimited = max_items <= 0
    selected: List[Dict[str, Any]] = []
    selected_keys = set()
    skipped_preselection = 0
    for entry in source_entries:
        if not isinstance(entry, dict):
            continue
        if bucket != "all" and str(entry.get("priority_bucket", "")).lower() != bucket:
            continue
        key = _form_key(entry)
        # Skip form-less entries and repeated forms: the patch is keyed by
        # form, so a duplicate would only cost a second LLM call.
        if not key or key in selected_keys:
            continue
        existing = existing_by_form.get(key)
        if args.skip_existing and existing and should_skip_existing(existing):
            skipped_preselection += 1
            continue
        selected.append(entry)
        selected_keys.add(key)
        if not unlimited and len(selected) >= max_items:
            break

    api_base = resolve_api_base(args)
    api_key = os.environ.get(args.api_key_env, "") if args.provider == "openai_compatible" else ""
    if not args.dry_run and args.provider == "openai_compatible" and not api_key:
        raise RuntimeError(
            f"Variabile d'ambiente {args.api_key_env} non valorizzata per provider openai_compatible."
        )

    merged_records: List[Dict[str, Any]] = []
    processed = 0
    skipped_existing = 0
    total = len(selected)
    for position, source_entry in enumerate(selected, start=1):
        existing = existing_by_form.get(_form_key(source_entry))
        record = dict(existing) if isinstance(existing, dict) else build_record(source_entry)

        # Defensive: preselection already filters these out when
        # --skip-existing is set, so this branch is normally not taken.
        if args.skip_existing and existing and should_skip_existing(existing):
            skipped_existing += 1
            merged_records.append(record)
            continue

        if args.dry_run:
            record["status"] = "selected"
            record["rescue_source"] = f"llm_rescue:{args.model}"
            merged_records.append(record)
            processed += 1
            continue

        user_prompt = build_user_prompt(record)
        try:
            if args.provider == "ollama":
                raw_text = request_ollama(api_base, args.model, args.temperature, user_prompt)
            else:
                raw_text = request_openai_compatible(
                    api_base,
                    api_key,
                    args.model,
                    args.temperature,
                    user_prompt,
                )
            llm_payload = extract_json_object(raw_text)
            record.update(normalize_llm_payload(llm_payload, args.model))
        except Exception as exc:
            # Per-record boundary: the patch is only written after this
            # function returns, so any failure for one entry (network, reply
            # shape, parsing) is recorded on that entry instead of aborting
            # the batch and discarding every result gathered so far.
            record["rescue_source"] = f"llm_rescue:{args.model}"
            record["rescue_notes"] = f"errore_llm: {exc}"
            record["status"] = "error"
            record["needs_human_review"] = True
        merged_records.append(record)
        processed += 1
        print(
            f"[{processed}/{len(selected)}] {record.get('form')}: "
            f"status={record.get('status')} conf={record.get('confidence', 0.0)}"
        )
        if record.get("status") == "error" and record.get("rescue_notes"):
            print(f"  dettaglio: {record.get('rescue_notes')}")
        # Throttle between requests only; no pause is needed after the last one.
        if args.sleep > 0 and position < total:
            time.sleep(args.sleep)

    # Carry over records of the previous patch that were not part of this batch.
    seen_forms = {_form_key(item) for item in merged_records}
    for key, existing in existing_by_form.items():
        if key not in seen_forms:
            merged_records.append(existing)

    merged_records.sort(key=_record_sort_key)

    return {
        "meta": {
            "language": "it",
            "version": 1,
            "base_priority": args.input.name,
            "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "batch_bucket": bucket,
            "batch_limit": int(args.limit),
            "provider": args.provider,
            "api_base": api_base,
            "model": args.model,
            "dry_run": bool(args.dry_run),
            "entry_count": len(merged_records),
            "processed_count": processed,
            "skipped_existing": skipped_existing,
            "skipped_preselection": skipped_preselection,
        },
        "entries": merged_records,
    }
|
|
|
|
|
|
def main() -> None:
    """Run the CLI: parse options, build the patch, write it and print a summary."""
    cli_args = parse_args()
    patch = generate_patch(cli_args)
    write_json(cli_args.output, patch)

    # Summary counters come straight from the patch metadata.
    meta = patch["meta"]
    print(f"Patch LLM rescue generata: {cli_args.output}")
    print(f"Voci nel file: {meta['entry_count']}")
    print(f"Voci processate in questo run: {meta['processed_count']}")
    print(f"Voci saltate per skip-existing: {meta['skipped_existing']}")
    print(f"Voci escluse gia in pre-selezione: {meta['skipped_preselection']}")
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|