# Listing metadata: 1105 lines, 40 KiB, Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import random
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import Dict, List
|
|
|
|
from build_babelnet_enrichment import BABELNET_ENV_KEY, BABELNET_OUTPUT_PATH, BABELNET_LOCAL_KEY_PATH, load_babelnet_api_key
|
|
from build_enriched_lexicon import ENRICHED_LEXICON_OUTPUT_PATH
|
|
from build_vocabulary import (
|
|
FILTERED_OUTPUT_PATH,
|
|
METADATA_OUTPUT_PATH,
|
|
OUTPUT_PATH,
|
|
build_vocabulary,
|
|
)
|
|
from build_lexicon import LEXICON_OUTPUT_PATH, build_lexicon
|
|
from build_semantic_lexicon import SEMANTIC_LEXICON_OUTPUT_PATH, build_semantic_lexicon
|
|
from clue_generator import generate_clues, load_enriched_entries
|
|
from crossword_filler import CrosswordFiller, load_vocabulary, load_vocabulary_metadata
|
|
from crossword_generator import CrosswordGenerator, WORDS, render_grid
|
|
|
|
|
|
# Textual aliases accepted by --difficulty and the numeric level each maps to.
# parse_difficulty() also accepts a plain integer between 1 and 5; note that
# no alias maps to level 3.
DIFFICULTY_ALIASES: Dict[str, int] = {
    "easy": 1,
    "medium": 2,
    "hard": 4,
    "expert": 5,
}

# Topic name meaning "no specific theme"; also the default of --topic.
DEFAULT_TOPIC = "general"
# Default of --initial-word-count: the size of the WORDS list imported from
# crossword_generator.
DEFAULT_INITIAL_WORD_COUNT = len(WORDS)
# File names probed in this order (relative to this module's directory) by
# resolve_runtime_lexicon_path() when --lexicon is not given; the first one
# that exists is used.
DEFAULT_RUNTIME_LEXICON_CANDIDATES = (
    "lexicon_it_curated_llm_aggressive.json",
    "lexicon_it_curated_llm.json",
    "lexicon_it_curated.json",
    "lexicon_it_refined_plus_wiktextract.json",
    ENRICHED_LEXICON_OUTPUT_PATH.name,
    SEMANTIC_LEXICON_OUTPUT_PATH.name,
)
# Word endings checked with str.endswith(); words ending this way are
# penalised or excluded by the scoring/filter helpers below.
ABSTRACTISH_SUFFIXES = ("zione", "zioni", "mento", "menti", "ita", "ezza", "anza", "enza", "ismo")
# Values of an entry's "pos" field that load_filtered_entries() accepts.
FILL_ALLOWED_POS = {"NOUN", "VERB", "ADJ", "ADV", "PREP", "CONJ"}
# Minimum "quality_score" and maximum word length for general/soft fill words.
GENERAL_FILL_MIN_QUALITY = 6
GENERAL_FILL_MAX_LENGTH = 10
# Maximum number of soft-related entries kept by load_filtered_entries().
SOFT_RELATED_FILL_LIMIT = 120
# Default of --themed-fill-count.
DEFAULT_THEMED_FILL_WORD_COUNT = 10
# Topics for which topic_relevance() applies the abstract-suffix penalty.
CONCRETE_TOPICS = {
    "animals",
    "plants",
    "nature",
    "ecology",
    "geography",
    "weather",
    "sea",
    "mountain",
    "health",
    "science",
    "sport",
    "history",
    "school",
    "cinema",
    "literature",
    "food",
    "city",
    "transport",
    "work",
    "home",
}
|
|
|
|
# Per-topic substrings; matches_topic_roots() requires a word to contain at
# least one of them (plain `in` substring test) to count as a root match.
# Topics without an entry here never produce a root match.
TOPIC_SEED_REQUIRED_SUBSTRINGS: Dict[str, tuple[str, ...]] = {
    "transport": (
        "auto", "mot", "tren", "nav", "barc", "port", "pist", "vol", "aer",
        "bici", "cicl", "rimorch", "reattor", "vettur", "ambul", "imbarc",
        "trattor", "carr", "vap", "rota", "ruot",
    ),
    "animals": (
        "can", "gatt", "lup", "ors", "pesc", "aquil", "anatr", "cavall",
        "serpent", "tig", "leon", "volp", "cerv", "capr", "pecor",
    ),
    "nature": (
        "mar", "lag", "fium", "vent", "bosch", "mont", "collin", "isol",
        "rocc", "terra", "acqu", "fiore", "fogli", "radic", "affluent",
        "litoral", "piogg", "nev", "onda", "clim",
    ),
    "cinema": (
        "film", "cin", "teatr", "attor", "scen", "reg", "doppi", "dialog",
        "comic", "div", "docu", "pellic", "spettacol",
    ),
}

# Per-topic substrings that veto a word: matches_topic_roots() returns False
# and topic_relevance() subtracts 80 when a word contains any of them.
TOPIC_SEED_BLOCKED_SUBSTRINGS: Dict[str, tuple[str, ...]] = {
    "transport": (
        "intervist", "intratten", "speriment", "stermin", "investig",
        "intervent", "centometr", "sintetizz", "erot", "adoraz", "esalt",
        "eccit", "traduz", "fluttu", "sollecit",
    ),
    "animals": (
        "assicur", "finanz", "coediz", "camerier", "servitor", "indic",
        "estens", "diffus", "difensor", "spessor", "maggior",
    ),
    "cinema": (
        "manifest", "riediz", "dissimul", "diffus", "difensor", "estens",
        "malumor", "eversor",
    ),
}

# Module-level cache written by resolve_runtime_lexicon_path(): the lexicon
# path chosen for this process, or None until the first resolution.
ACTIVE_LEXICON_PATH: Path | None = None
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Declare every command-line option and return the parsed arguments.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``. Option
        names map to attributes with dashes replaced by underscores
        (e.g. ``--build-vocabulary`` -> ``args.build_vocabulary``).
    """
    parser = argparse.ArgumentParser(description="Generatore e filler di cruciverba.")
    # --- Flags that (re)build intermediate lexical resources ---
    parser.add_argument(
        "--build-vocabulary",
        action="store_true",
        help="Rigenera i file lessicali intermedi: vocabolario esteso, filtrato e metadati.",
    )
    parser.add_argument(
        "--build-lexicon",
        action="store_true",
        help="Rigenera `lexicon_it.json` prima dell'esecuzione.",
    )
    parser.add_argument(
        "--skip-fill",
        action="store_true",
        help="Genera solo la griglia iniziale e salta il riempimento con il filler.",
    )
    parser.add_argument(
        "--build-semantic-lexicon",
        action="store_true",
        help="Rigenera `lexicon_it_semantic.json` arricchendo il lessico con IWN-OMW/ItalWordNet.",
    )
    # --- BabelNet enrichment options ---
    parser.add_argument(
        "--babelnet-enrich",
        action="store_true",
        help="Prima di generare il cruciverba arricchisce incrementalmente il lessico con BabelNet.",
    )
    parser.add_argument(
        "--babelnet-limit",
        type=int,
        default=20,
        help="Numero massimo di parole da interrogare su BabelNet in questa esecuzione.",
    )
    parser.add_argument(
        "--babelnet-sleep",
        type=float,
        default=0.2,
        help="Pausa in secondi tra richieste BabelNet consecutive.",
    )
    # --- Grid generation and filling options ---
    parser.add_argument(
        "--vocabulary",
        type=Path,
        default=None,
        help="Percorso opzionale a un vocabolario testuale personalizzato da usare al posto di quello di default.",
    )
    parser.add_argument(
        "--target-empty-ratio",
        type=float,
        default=1 / 6,
        help="Rapporto target di celle vuote residue dopo il filler. Esempio: 0.1667 lascia circa un sesto di celle vuote.",
    )
    parser.add_argument(
        "--time-limit",
        type=float,
        default=8.0,
        help="Tempo massimo in secondi per la fase di generazione iniziale della griglia.",
    )
    parser.add_argument(
        "--max-candidates",
        type=int,
        default=12,
        help="Numero massimo di candidati esplorati per parola nella generazione iniziale.",
    )
    parser.add_argument(
        "--diffxy",
        type=int,
        default=7,
        help="Differenza massima preferita tra larghezza e altezza della griglia iniziale.",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=None,
        help="Seed casuale per ottenere varianti riproducibili del cruciverba: stesso seed, stesso risultato.",
    )
    # --difficulty is kept as a raw string here; parse_difficulty() converts it.
    parser.add_argument(
        "--difficulty",
        default="medium",
        help="Difficolta lessicale del filler. Alias testuali: easy, medium, hard, expert. Internamente mappati a livelli numerici 1-5.",
    )
    # --- Topic selection options ---
    parser.add_argument(
        "--topic",
        default=DEFAULT_TOPIC,
        help="Tema del cruciverba. Puoi indicare un topic o una lista separata da virgole, es. transport,nature,ecology. Se lasci general, i topic possono essere scelti dal lessico con --max-topics.",
    )
    parser.add_argument(
        "--max-topics",
        type=int,
        default=1,
        help="Numero massimo di topic casuali da scegliere dal lessico arricchito quando --topic e' general. Massimo consigliato: 3.",
    )
    parser.add_argument(
        "--initial-word-count",
        type=int,
        default=DEFAULT_INITIAL_WORD_COUNT,
        help="Numero di parole-seme usate per costruire la griglia iniziale prima del filler.",
    )
    parser.add_argument(
        "--themed-fill-count",
        type=int,
        default=DEFAULT_THEMED_FILL_WORD_COUNT,
        help="Numero massimo indicativo di parole aggiunte dal filler da mantenere fortemente legate al tema.",
    )
    # --- Clue (definition) output options ---
    parser.add_argument(
        "--definitions",
        action="store_true",
        help="Genera e stampa le definizioni per le parole inserite nel cruciverba.",
    )
    parser.add_argument(
        "--lexicon",
        type=Path,
        default=None,
        help=(
            "File lessicale da usare durante l'esecuzione. Se omesso, il programma usa il lessico "
            "piu avanzato disponibile, preferendo lexicon_it_curated_llm_aggressive.json."
        ),
    )
    parser.add_argument(
        "--definition-babelnet-limit",
        type=int,
        default=20,
        help="Numero massimo di parole del cruciverba da arricchire al volo con BabelNet per generare definizioni.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def ensure_vocabulary(args: argparse.Namespace) -> None:
    """Rebuild the vocabulary files when requested or when any is missing.

    Nothing happens if ``--build-vocabulary`` was not passed and both the
    filtered vocabulary and its metadata file already exist. Otherwise
    ``build_vocabulary()`` runs and a short report is printed.
    """
    if (
        not args.build_vocabulary
        and FILTERED_OUTPUT_PATH.exists()
        and METADATA_OUTPUT_PATH.exists()
    ):
        return

    summary = build_vocabulary()

    print("Vocabolario rigenerato")
    # Locations of the generated artefacts.
    print(f"- esteso: {OUTPUT_PATH}")
    print(f"- filtrato: {FILTERED_OUTPUT_PATH}")
    print(f"- metadati: {METADATA_OUTPUT_PATH}")
    # Word counts reported by the builder.
    print(f"- parole estese: {summary['extended_words']}")
    print(f"- parole filtrate: {summary['filtered_words']}")
|
|
|
|
|
|
def ensure_lexicon(args: argparse.Namespace) -> None:
    """Regenerate the base lexicon file when requested or when it is missing.

    The lexicon returned by ``build_lexicon()`` is serialised as indented,
    non-ASCII-escaped JSON to ``LEXICON_OUTPUT_PATH`` and a report is printed.
    """
    if not args.build_lexicon and LEXICON_OUTPUT_PATH.exists():
        return

    built = build_lexicon()
    serialized = json.dumps(built, ensure_ascii=False, indent=2)
    LEXICON_OUTPUT_PATH.write_text(serialized, encoding="utf-8")

    print("Lessico rigenerato")
    print(f"- file: {LEXICON_OUTPUT_PATH}")
    print(f"- voci: {built['meta']['entry_count']}")
|
|
|
|
|
|
def ensure_semantic_lexicon(args: argparse.Namespace) -> None:
    """Regenerate the semantic lexicon when requested or when it is missing.

    The result of ``build_semantic_lexicon()`` is written as JSON to
    ``SEMANTIC_LEXICON_OUTPUT_PATH``; the printed report includes how many
    entries carry a truthy ``semantic.matched`` flag.
    """
    if not args.build_semantic_lexicon and SEMANTIC_LEXICON_OUTPUT_PATH.exists():
        return

    built = build_semantic_lexicon()
    serialized = json.dumps(built, ensure_ascii=False, indent=2)
    SEMANTIC_LEXICON_OUTPUT_PATH.write_text(serialized, encoding="utf-8")

    # Count the entries flagged as semantically matched.
    matched_total = 0
    for item in built["entries"]:
        if item.get("semantic", {}).get("matched"):
            matched_total += 1

    print("Lessico semantico rigenerato")
    print(f"- file: {SEMANTIC_LEXICON_OUTPUT_PATH}")
    print(f"- voci: {built['meta']['entry_count']}")
    print(f"- match semantici: {matched_total}")
|
|
|
|
|
|
def ensure_babelnet_enrichment(args: argparse.Namespace) -> None:
    """Run the incremental BabelNet enrichment when ``--babelnet-enrich`` is set.

    The step is skipped (with a message) when ``--babelnet-limit`` is not
    positive. Otherwise the enricher is invoked with a settings namespace
    built from the CLI arguments, and a summary plus one line per queried
    word is printed.
    """
    if not args.babelnet_enrich:
        return
    if args.babelnet_limit <= 0:
        print("BabelNet enrichment saltato: --babelnet-limit <= 0")
        return

    # Imported lazily so the enricher module is only loaded when needed.
    from babelnet_incremental_enricher import run_incremental_enrichment

    settings = {
        "api_key": load_babelnet_api_key(),
        "topic": primary_topic(args.topic),
        "difficulty": args.difficulty,
        "limit": args.babelnet_limit,
        "sleep": args.babelnet_sleep,
        "semantic": SEMANTIC_LEXICON_OUTPUT_PATH,
        "babelnet": BABELNET_OUTPUT_PATH,
        "enriched": ENRICHED_LEXICON_OUTPUT_PATH,
        "dry_run": False,
        "retry_no_match": False,
    }
    namespace = SimpleNamespace(**settings)

    print("Arricchimento BabelNet incrementale")
    print(f"- tema guida: {primary_topic(args.topic)}")
    print(f"- topic attivi: {args.topic}")
    print(f"- limite parole: {args.babelnet_limit}")
    print(f"- chiave: {BABELNET_ENV_KEY} oppure {BABELNET_LOCAL_KEY_PATH.name}")

    outcome = run_incremental_enrichment(namespace)

    print("Riepilogo BabelNet")
    print(f"- parole interrogate: {outcome['selected_count']}")
    print(f"- chiamate API reali: {outcome['api_call_count']}")
    print(f"- risposte da cache: {outcome['cache_hit_count']}")
    print(f"- match: {outcome['matched_count']}")
    for log in outcome["word_logs"]:
        print(
            f" {log['word']}: api_calls={log['api_calls']}, "
            f"cache_hits={log['cache_hits']}, risposta={log['responses'] > 0}, "
            f"match={log['matched']}, synsets={log['synsets']}"
        )
    print()
|
|
|
|
|
|
def enrich_words_for_definitions(args: argparse.Namespace, words: List[str]) -> None:
    """Enrich the given crossword words through BabelNet before clue generation.

    Only runs when ``--definitions`` is set and ``--definition-babelnet-limit``
    is positive. The word list is handed to the incremental enricher via the
    ``words`` attribute of the settings namespace; a summary and one line per
    queried word are printed afterwards.
    """
    if not args.definitions:
        return
    if args.definition_babelnet_limit <= 0:
        print("Arricchimento BabelNet per definizioni saltato: --definition-babelnet-limit <= 0")
        return

    # Imported lazily so the enricher module is only loaded when needed.
    from babelnet_incremental_enricher import run_incremental_enrichment

    settings = {
        "api_key": load_babelnet_api_key(),
        "topic": primary_topic(args.topic),
        "difficulty": args.difficulty,
        "limit": args.definition_babelnet_limit,
        "sleep": args.babelnet_sleep,
        "semantic": SEMANTIC_LEXICON_OUTPUT_PATH,
        "babelnet": BABELNET_OUTPUT_PATH,
        "enriched": ENRICHED_LEXICON_OUTPUT_PATH,
        "dry_run": False,
        "retry_no_match": False,
        "words": words,
    }
    namespace = SimpleNamespace(**settings)

    print()
    print("Arricchimento BabelNet per definizioni")
    print(f"- parole nel cruciverba: {len(set(words))}")
    print(f"- limite parole: {args.definition_babelnet_limit}")

    outcome = run_incremental_enrichment(namespace)

    print("Riepilogo BabelNet definizioni")
    print(f"- parole interrogate: {outcome['selected_count']}")
    print(f"- chiamate API reali: {outcome['api_call_count']}")
    print(f"- risposte da cache: {outcome['cache_hit_count']}")
    print(f"- match: {outcome['matched_count']}")
    for log in outcome["word_logs"]:
        print(
            f" {log['word']}: api_calls={log['api_calls']}, "
            f"cache_hits={log['cache_hits']}, risposta={log['responses'] > 0}, "
            f"match={log['matched']}, synsets={log['synsets']}"
        )
|
|
|
|
|
|
def placement_words(placements) -> List[str]:
    """Return the ``word`` attribute of every placement, in iteration order."""
    collected: List[str] = []
    for item in placements:
        collected.append(item.word)
    return collected
|
|
|
|
|
|
def print_definitions(args: argparse.Namespace, state) -> None:
    """Print one clue line per placement, followed by the alpha diagnostics.

    Does nothing unless ``--definitions`` is set. Entries are loaded from the
    runtime lexicon resolved from ``args.lexicon``; clues are generated for
    ``state.placements`` using the primary topic and the raw difficulty value.
    """
    if not args.definitions:
        return

    lexicon_path = resolve_runtime_lexicon_path(args.lexicon)
    entries = load_enriched_entries(lexicon_path)
    guide_topic = primary_topic(args.topic)
    clues = generate_clues(state.placements, entries, guide_topic, args.difficulty)

    print()
    print("Definizioni:")
    for clue in clues:
        position = f"{clue.number:>2}. {clue.direction} ({clue.x}, {clue.y}) "
        detail = f"[{clue.source}] {clue.text} -> {clue.word.upper()}"
        print(position + detail)

    print_alpha_diagnostics(args, state, entries)
|
|
|
|
|
|
def word_is_on_topic(entry: Dict[str, object], topic: str) -> bool:
    """Tell whether a lexicon entry belongs to the requested topic(s).

    With several comma-separated topics the entry is on topic if it matches
    any of them. For a single topic the checks are, in order: the default
    topic always matches; the entry's ``topics``; its
    ``semantic.semantic_topics``; a BabelNet ``best_synset`` with the same
    topic and a ``topic_score`` of at least 40; finally
    ``strong_topic_relevance`` (any error there counts as "not on topic").
    """
    requested = parse_topics(topic)
    if len(requested) > 1:
        for single in requested:
            if word_is_on_topic(entry, single):
                return True
        return False

    wanted = requested[0]
    if wanted == DEFAULT_TOPIC:
        return True

    # 1) Lexical topics attached directly to the entry (falsy items ignored).
    lexical = {str(item).lower() for item in entry.get("topics", []) if item}
    if wanted in lexical:
        return True

    # 2) Topics coming from the semantic annotation, when it is a dict.
    semantic_block = entry.get("semantic", {})
    if isinstance(semantic_block, dict):
        raw_semantic = semantic_block.get("semantic_topics", [])
        if wanted in {str(item).lower() for item in raw_semantic if item}:
            return True

    # 3) BabelNet best synset with a sufficiently high topic score.
    babelnet_block = entry.get("babelnet", {})
    best = babelnet_block.get("best_synset", {}) if isinstance(babelnet_block, dict) else None
    if isinstance(best, dict):
        try:
            score = int(best.get("topic_score", 0))
        except (TypeError, ValueError):
            score = 0
        if best.get("topic") == wanted and score >= 40:
            return True

    # 4) Last resort: best-effort strong relevance check.
    try:
        return strong_topic_relevance(entry, wanted) > 0
    except Exception:
        return False
|
|
|
|
|
|
def pos_label(pos: str) -> str:
    """Map a part-of-speech tag (case-insensitive) to its Italian report label.

    Unknown tags, including the empty string, map to ``"altri"``.
    """
    tag = str(pos).upper()
    known = (
        ("NOUN", "sostantivi"),
        ("ADJ", "aggettivi"),
        ("VERB", "verbi"),
        ("ADV", "avverbi"),
        ("PREP", "preposizioni"),
        ("CONJ", "congiunzioni"),
    )
    for code, label in known:
        if tag == code:
            return label
    return "altri"
|
|
|
|
|
|
def print_alpha_diagnostics(args: argparse.Namespace, state, entries: Dict[str, Dict[str, object]]) -> None:
    """Print grid-fill, topic-coverage and part-of-speech statistics.

    Reads ``state.placements``, ``state.area()`` and ``len(state.grid)``;
    words are lower-cased and de-duplicated (first occurrence kept) before
    being looked up in ``entries``. A word without an entry is reported as
    off topic / unclassified.
    """
    report_labels = ("sostantivi", "aggettivi", "verbi", "avverbi", "preposizioni", "congiunzioni", "altri")

    # Unique lower-cased words, preserving first-seen order.
    unique_words: List[str] = []
    seen_words = set()
    for raw_word in placement_words(state.placements):
        lowered = raw_word.lower()
        if lowered not in seen_words:
            seen_words.add(lowered)
            unique_words.append(lowered)

    active_topics = parse_topics(args.topic)

    # Grid occupancy figures.
    total_cells = state.area()
    filled_cells = len(state.grid)
    empty_cells = total_cells - filled_cells
    if total_cells:
        empty_ratio = empty_cells / total_cells
        filled_ratio = filled_cells / total_cells
    else:
        empty_ratio = 0.0
        filled_ratio = 0.0
    target_empty_cells = round(total_cells * args.target_empty_ratio)
    target_delta = empty_cells - target_empty_cells

    on_topic: List[str] = []
    off_topic: List[str] = []
    per_topic = {name: 0 for name in active_topics if name != DEFAULT_TOPIC}
    pos_counts = dict.fromkeys(report_labels, 0)

    for word in unique_words:
        entry = entries.get(word, {})
        label = pos_label(str(entry.get("pos", "")))
        pos_counts[label] = pos_counts.get(label, 0) + 1
        if not entry or not word_is_on_topic(entry, args.topic):
            off_topic.append(word)
            continue
        on_topic.append(word)
        for name in per_topic:
            if word_is_on_topic(entry, name):
                per_topic[name] += 1

    print()
    print("Diagnostica alpha:")
    print(f"- parole uniche nello schema: {len(unique_words)}")
    print(f"- celle totali: {total_cells}")
    print(f"- celle riempite: {filled_cells} ({filled_ratio * 100:.1f}%)")
    print(f"- celle vuote: {empty_cells} ({empty_ratio * 100:.1f}%)")
    print(f"- target celle vuote: {target_empty_cells} ({args.target_empty_ratio * 100:.1f}%)")
    if target_delta == 0:
        print("- distanza dal target: centrato")
    elif target_delta > 0:
        print(f"- distanza dal target: {target_delta} celle vuote in piu del target")
    else:
        print(f"- distanza dal target: {-target_delta} celle vuote in meno del target")
    print(f"- topic richiesti: {', '.join(active_topics)}")
    print(f"- parole in tema: {len(on_topic)}")
    print(f"- parole fuori tema o non classificate: {len(off_topic)}")
    if per_topic:
        print("- distribuzione topic:")
        for name, count in per_topic.items():
            print(f" {name}: {count}")
    if on_topic:
        print(f"- elenco in tema: {', '.join(on_topic)}")
    if off_topic:
        print(f"- elenco fuori tema/non classificate: {', '.join(off_topic)}")
    print("- parti del discorso:")
    for label in report_labels:
        print(f" {label}: {pos_counts.get(label, 0)}")
|
|
|
|
|
|
def parse_difficulty(value: str) -> int:
    """Convert a ``--difficulty`` value into a numeric level.

    Accepts one of the aliases in ``DIFFICULTY_ALIASES`` (case-insensitive,
    surrounding whitespace ignored) or an integer between 1 and 5.

    Raises:
        SystemExit: if the value is neither an alias nor an integer, or if
            the integer is outside 1..5.
    """
    token = str(value).strip().lower()

    alias_level = DIFFICULTY_ALIASES.get(token)
    if alias_level is not None:
        return alias_level

    try:
        numeric = int(token)
    except ValueError as exc:
        raise SystemExit(
            "Valore non valido per --difficulty. Usa easy, medium, hard, expert oppure un intero tra 1 e 5."
        ) from exc

    if numeric < 1 or numeric > 5:
        raise SystemExit("Il valore numerico di --difficulty deve essere compreso tra 1 e 5.")
    return numeric
|
|
|
|
|
|
def load_selected_vocabulary(path: Path | None) -> List[str]:
    """Load the word list used by the filler.

    Args:
        path: Optional custom vocabulary file with one word per line. When
            ``None`` the default vocabulary from ``load_vocabulary()`` is used.

    Returns:
        The list of words. For a custom file, surrounding whitespace is
        stripped from every line and blank lines are dropped, so stray empty
        lines or trailing spaces do not become unusable "words".
    """
    if path is None:
        return load_vocabulary()

    # "utf-8-sig" decodes plain UTF-8 as well, but also removes a leading BOM
    # that would otherwise be glued to the first word of the file.
    text = path.read_text(encoding="utf-8-sig")
    return [line.strip() for line in text.splitlines() if line.strip()]
|
|
|
|
|
|
def resolve_runtime_lexicon_path(requested: Path | None) -> Path:
    """Decide which lexicon file this run uses and remember the choice.

    An explicit ``requested`` path wins; a relative one is resolved against
    the directory containing this module. Without a request, the previously
    remembered path is reused, otherwise the first existing file from
    ``DEFAULT_RUNTIME_LEXICON_CANDIDATES`` is chosen, falling back to
    ``ENRICHED_LEXICON_OUTPUT_PATH`` (which may not exist).

    Raises:
        SystemExit: if an explicitly requested lexicon does not exist.
    """
    global ACTIVE_LEXICON_PATH

    if requested is None:
        if ACTIVE_LEXICON_PATH is None:
            module_dir = Path(__file__).resolve().parent
            probes = (module_dir / name for name in DEFAULT_RUNTIME_LEXICON_CANDIDATES)
            ACTIVE_LEXICON_PATH = next(
                (probe for probe in probes if probe.exists()),
                ENRICHED_LEXICON_OUTPUT_PATH,
            )
        return ACTIVE_LEXICON_PATH

    if requested.is_absolute():
        resolved = requested
    else:
        resolved = Path(__file__).resolve().parent / requested
    if not resolved.exists():
        raise SystemExit(f"Il lessico specificato con --lexicon non esiste: {resolved}")
    ACTIVE_LEXICON_PATH = resolved
    return resolved
|
|
|
|
|
|
def load_semantic_payload(path: Path | None = None) -> Dict[str, object]:
    """Load the JSON payload of the runtime lexicon.

    The file is chosen by ``resolve_runtime_lexicon_path(path)``. If that file
    does not exist, the semantic lexicon is used instead, being built and
    written to ``SEMANTIC_LEXICON_OUTPUT_PATH`` first when it is missing too.
    """
    lexicon_file = resolve_runtime_lexicon_path(path)
    if not lexicon_file.exists():
        if not SEMANTIC_LEXICON_OUTPUT_PATH.exists():
            built = build_semantic_lexicon()
            serialized = json.dumps(built, ensure_ascii=False, indent=2)
            SEMANTIC_LEXICON_OUTPUT_PATH.write_text(serialized, encoding="utf-8")
        lexicon_file = SEMANTIC_LEXICON_OUTPUT_PATH
    return json.loads(lexicon_file.read_text(encoding="utf-8"))
|
|
|
|
|
|
def parse_topics(value: str) -> List[str]:
    """Split a comma-separated topic string into normalised topic names.

    Each name is stripped and lower-cased; empty names and repeats are
    dropped while keeping first-seen order. A falsy ``value`` or a result
    with no names yields ``[DEFAULT_TOPIC]``.
    """
    # A dict keeps insertion order, so its keys double as an ordered set.
    ordered: Dict[str, None] = {}
    for chunk in str(value or DEFAULT_TOPIC).split(","):
        name = chunk.strip().lower()
        if name:
            ordered.setdefault(name, None)
    return list(ordered) or [DEFAULT_TOPIC]
|
|
|
|
|
|
def primary_topic(value: str) -> str:
    """Return the first topic of a comma-separated topic string."""
    first, *_rest = parse_topics(value)
    return first
|
|
|
|
|
|
def available_topics_from_lexicon(payload: Dict[str, object], *, min_words: int = 5) -> List[str]:
    """List, alphabetically, the topics that have enough crossword-ready words.

    Only dict entries with a truthy ``allowed_in_crossword`` are counted. The
    default topic, ``"abstract"`` and ``"actions"`` are never returned.

    Args:
        payload: Lexicon payload; its ``entries`` list is scanned.
        min_words: Minimum number of entries a topic needs to be listed.
    """
    skipped = {DEFAULT_TOPIC, "abstract", "actions"}
    tally: Dict[str, int] = {}

    usable_entries = (
        item
        for item in (payload.get("entries", []) or [])
        if isinstance(item, dict) and item.get("allowed_in_crossword", False)
    )
    for item in usable_entries:
        for raw_topic in item.get("topics", []) or []:
            name = str(raw_topic).strip().lower()
            if name and name not in skipped:
                tally[name] = tally.get(name, 0) + 1

    frequent = [name for name, occurrences in tally.items() if occurrences >= min_words]
    frequent.sort()
    return frequent
|
|
|
|
|
|
def resolve_topics(args: argparse.Namespace, difficulty_level: int) -> List[str]:
    """Choose the active topics and record them on ``args``.

    ``args.topic`` is rewritten to the comma-joined selection and
    ``args.topic_seed_counts`` is set to the number of seed words available
    per selected topic (empty for the general fallback).

    Explicitly requested topics are kept (truncated to the ``--max-topics``
    limit, itself clamped to 1..3). With the default topic and a limit above
    one, topics are drawn at random (seeded by ``args.seed``) among the
    lexicon topics that yield at least one seed word.
    """

    def use_general() -> List[str]:
        # Fallback shared by the "single topic" and "no candidates" cases.
        args.topic = DEFAULT_TOPIC
        args.topic_seed_counts = {}
        return [DEFAULT_TOPIC]

    requested = parse_topics(args.topic)
    limit = max(1, min(3, int(args.max_topics)))

    if requested != [DEFAULT_TOPIC]:
        chosen = requested[:limit]
        args.topic = ",".join(chosen)
        seed_counts = {}
        for name in chosen:
            seeds = select_initial_words(difficulty_level, name, args.initial_word_count)
            seed_counts[name] = len(seeds)
        args.topic_seed_counts = seed_counts
        return chosen

    if limit <= 1:
        return use_general()

    # Keep only the lexicon topics that can provide at least one seed word.
    scored = []
    for name in available_topics_from_lexicon(load_semantic_payload(), min_words=1):
        seed_total = len(select_initial_words(difficulty_level, name, args.initial_word_count))
        if seed_total > 0:
            scored.append((name, seed_total))
    if not scored:
        return use_general()

    random.Random(args.seed).shuffle(scored)
    picked = scored[:limit]
    chosen = [name for name, _ in picked]
    args.topic = ",".join(chosen)
    args.topic_seed_counts = dict(picked)
    return chosen
|
|
|
|
|
|
def entry_topics(entry: Dict[str, object]) -> tuple[set[str], set[str]]:
    """Return the lower-cased lexical and semantic topic sets of an entry.

    Args:
        entry: A lexicon entry. ``entry["topics"]`` and
            ``entry["semantic"]["semantic_topics"]`` are read when present.

    Returns:
        ``(topics, semantic_topics)``. A missing or ``None`` value, or a
        ``semantic`` field that is not a dict, yields an empty set instead of
        raising, matching the defensive handling used by
        ``available_topics_from_lexicon`` and ``word_is_on_topic``.
    """
    topics = {str(item).lower() for item in entry.get("topics") or []}

    semantic = entry.get("semantic")
    raw_semantic_topics = semantic.get("semantic_topics") if isinstance(semantic, dict) else None
    semantic_topics = {str(item).lower() for item in raw_semantic_topics or []}

    return topics, semantic_topics
|
|
|
|
|
|
def matches_topic_roots(word: str, selected_topic: str) -> bool:
    """Check a word against the topic's blocked and required substrings.

    Returns ``False`` as soon as the word contains a blocked substring;
    otherwise ``True`` only if it contains at least one required substring.
    Topics without required substrings never match.
    """
    for fragment in TOPIC_SEED_BLOCKED_SUBSTRINGS.get(selected_topic, ()):
        if fragment in word:
            return False
    for fragment in TOPIC_SEED_REQUIRED_SUBSTRINGS.get(selected_topic, ()):
        if fragment in word:
            return True
    return False
|
|
|
|
|
|
def topic_relevance(entry: Dict[str, object], topic: str) -> int:
    """Score how well an entry fits a topic (higher is better, may be negative).

    Args:
        entry: Lexicon entry; its ``form`` and topic sets are inspected.
        topic: A topic name or a comma-separated list of topics. With several
            topics the best single-topic score is returned.

    Returns:
        20 for the default topic. Otherwise the sum of: +100 if the topic is
        among the entry's topics, +45 if among its semantic topics, +35 if
        the word matches the topic roots, +5 if the entry is also "general",
        -80 if the word contains a blocked substring for the topic, and -15
        for an abstract-looking suffix on a concrete topic.
    """
    active_topics = parse_topics(topic)
    if len(active_topics) > 1:
        return max(topic_relevance(entry, item) for item in active_topics)

    # Use the name normalised by parse_topics rather than the raw string:
    # inputs such as "transport,", "transport,transport" or "" collapse to a
    # single topic there, and the raw text would never match any entry topic.
    selected_topic = active_topics[0]
    if selected_topic == DEFAULT_TOPIC:
        return 20

    word = str(entry.get("form", ""))
    topics, semantic_topics = entry_topics(entry)
    score = 0
    if selected_topic in topics:
        score += 100
    if selected_topic in semantic_topics:
        score += 45
    if matches_topic_roots(word, selected_topic):
        score += 35
    if "general" in topics:
        score += 5

    if any(part in word for part in TOPIC_SEED_BLOCKED_SUBSTRINGS.get(selected_topic, ())):
        score -= 80
    if selected_topic in CONCRETE_TOPICS and word.endswith(ABSTRACTISH_SUFFIXES):
        score -= 15
    return score
|
|
|
|
|
|
def strong_topic_relevance(entry: Dict[str, object], topic: str) -> int:
    """Score an entry using only its explicit lexical topics.

    Args:
        entry: Lexicon entry whose ``topics`` are inspected.
        topic: A topic name or a comma-separated list of topics. With several
            topics the best single-topic score is returned.

    Returns:
        20 for the default topic, 100 when the topic is among the entry's
        topics, 0 otherwise.
    """
    active_topics = parse_topics(topic)
    if len(active_topics) > 1:
        return max(strong_topic_relevance(entry, item) for item in active_topics)

    # Use the name normalised by parse_topics rather than the raw string, so
    # inputs like "transport," or "transport,transport" (which collapse to a
    # single topic) still match the entry's topics.
    selected_topic = active_topics[0]
    if selected_topic == DEFAULT_TOPIC:
        return 20
    topics, _ = entry_topics(entry)
    return 100 if selected_topic in topics else 0
|
|
|
|
|
|
def lexical_fill_score(entry: Dict[str, object], topic: str) -> tuple[int, int, int, int, int, str]:
    """Build the sort key used to rank fill candidates for a topic.

    The tuple is ``(topic relevance, quality score, part-of-speech bonus,
    semantic-match bonus, length bonus, word)``; callers sort it in
    descending order.
    """
    form = str(entry.get("form", ""))
    quality = int(entry.get("quality_score", 0))
    tag = str(entry.get("pos", ""))
    semantic_block = entry.get("semantic", {})

    pos_weights = {"NOUN": 12, "VERB": 8, "ADJ": 6, "ADV": 4, "PREP": 2, "CONJ": 2}
    pos_bonus = pos_weights.get(tag, 0)

    if semantic_block.get("matched"):
        semantic_bonus = 3
    else:
        semantic_bonus = 0

    # Favour mid-length words, tolerate short/long ones, penalise extremes.
    size = len(form)
    if 4 <= size <= 10:
        length_bonus = 3
    elif 2 <= size <= 13:
        length_bonus = 1
    else:
        length_bonus = -4

    relevance = topic_relevance(entry, topic)
    return (relevance, quality, pos_bonus, semantic_bonus, length_bonus, form)
|
|
|
|
|
|
def is_general_fill_support(entry: Dict[str, object]) -> bool:
    """Tell whether an entry may serve as generic (non-themed) filler.

    The entry needs a quality score of at least ``GENERAL_FILL_MIN_QUALITY``,
    a form no longer than ``GENERAL_FILL_MAX_LENGTH`` that does not end with
    an abstract-looking suffix, and the default topic among its topics.
    """
    form = str(entry.get("form", ""))
    quality = int(entry.get("quality_score", 0))
    if quality < GENERAL_FILL_MIN_QUALITY:
        return False
    if len(form) > GENERAL_FILL_MAX_LENGTH or form.endswith(ABSTRACTISH_SUFFIXES):
        return False
    lowered_topics = {str(item).lower() for item in entry.get("topics", [])}
    return DEFAULT_TOPIC in lowered_topics
|
|
|
|
|
|
def load_filtered_entries(level: int, topic: str) -> List[Dict[str, object]]:
    """Select and rank the lexicon entries usable by the filler.

    Args:
        level: Maximum ``difficulty_word`` accepted (entries without the
            field count as 5).
        topic: A topic name or a comma-separated list of topics.

    Returns:
        Entries allowed in crosswords, within the difficulty level and with
        a part of speech in ``FILL_ALLOWED_POS``, sorted by descending
        ``lexical_fill_score``. For the default topic every eligible entry
        is returned. Otherwise the result is made of: the strongly on-topic
        entries, up to ``SOFT_RELATED_FILL_LIMIT`` soft-related ones, and the
        general-support entries not already selected.
    """
    payload = load_semantic_payload()
    normalized_topic = ",".join(parse_topics(topic))

    eligible = [
        entry
        for entry in payload.get("entries", [])
        if entry.get("allowed_in_crossword", False)
        and int(entry.get("difficulty_word", 5)) <= level
        and str(entry.get("pos", "")) in FILL_ALLOWED_POS
    ]

    if normalized_topic == DEFAULT_TOPIC:
        selected = eligible
    else:
        strong_topic = [entry for entry in eligible if strong_topic_relevance(entry, normalized_topic) > 0]
        # Every list below holds the same dict objects taken from `eligible`,
        # so membership is tracked by object identity in a set. This avoids
        # a linear scan with deep dict comparison for each entry, which made
        # the selection quadratic in the lexicon size.
        strong_ids = {id(entry) for entry in strong_topic}

        soft_related = [
            entry
            for entry in eligible
            if id(entry) not in strong_ids
            and topic_relevance(entry, normalized_topic) > 0
            and int(entry.get("quality_score", 0)) >= GENERAL_FILL_MIN_QUALITY
            and len(str(entry.get("form", ""))) <= GENERAL_FILL_MAX_LENGTH
            and not str(entry.get("form", "")).endswith(ABSTRACTISH_SUFFIXES)
        ]
        soft_related.sort(key=lambda entry: lexical_fill_score(entry, normalized_topic), reverse=True)

        general_support = [
            entry
            for entry in eligible
            if id(entry) not in strong_ids
            and is_general_fill_support(entry)
        ]
        general_support.sort(key=lambda entry: lexical_fill_score(entry, DEFAULT_TOPIC), reverse=True)

        selected = strong_topic + soft_related[:SOFT_RELATED_FILL_LIMIT]
        selected_ids = {id(entry) for entry in selected}
        selected += [entry for entry in general_support if id(entry) not in selected_ids]

    selected.sort(key=lambda entry: lexical_fill_score(entry, normalized_topic), reverse=True)
    return selected
|
|
|
|
|
|
def load_filtered_vocabulary(level: int, topic: str) -> List[str]:
    """Return the ``form`` of every entry chosen by ``load_filtered_entries``."""
    forms: List[str] = []
    for entry in load_filtered_entries(level, topic):
        forms.append(str(entry["form"]))
    return forms
|
|
|
|
|
|
def load_semantic_metadata_for_vocabulary(words: List[str], topic: str) -> Dict[str, Dict[str, object]]:
    """Collect lexicon metadata for the given words, annotated with topic scores.

    Each returned value is a shallow copy of the lexicon entry with two extra
    keys, ``_topic_relevance`` and ``_strong_topic_relevance``, computed for
    ``topic``. Words absent from the lexicon are simply missing from the
    result; if a form appears more than once, the last entry wins.
    """
    payload = load_semantic_payload()
    wanted = set(words)
    result: Dict[str, Dict[str, object]] = {}

    for raw_entry in payload.get("entries", []):
        form = str(raw_entry.get("form", ""))
        if form in wanted:
            annotated = dict(raw_entry)
            annotated["_topic_relevance"] = topic_relevance(annotated, topic)
            annotated["_strong_topic_relevance"] = strong_topic_relevance(annotated, topic)
            result[form] = annotated

    return result
|
|
|
|
|
|
def select_initial_words(level: int, topic: str, count: int) -> List[str]:
|
|
active_topics = parse_topics(topic)
|
|
if len(active_topics) > 1:
|
|
topic_pools = {
|
|
selected_topic: select_initial_words(level, selected_topic, count)
|
|
for selected_topic in active_topics
|
|
}
|
|
selected: List[str] = []
|
|
indexes = {selected_topic: 0 for selected_topic in active_topics}
|
|
|
|
while len(selected) < count:
|
|
progressed = False
|
|
for selected_topic in active_topics:
|
|
pool = topic_pools.get(selected_topic, [])
|
|
while indexes[selected_topic] < len(pool) and pool[indexes[selected_topic]] in selected:
|
|
indexes[selected_topic] += 1
|
|
if indexes[selected_topic] >= len(pool):
|
|
continue
|
|
selected.append(pool[indexes[selected_topic]])
|
|
indexes[selected_topic] += 1
|
|
progressed = True
|
|
if len(selected) >= count:
|
|
break
|
|
if not progressed:
|
|
break
|
|
|
|
if len(selected) < count:
|
|
fallback = select_initial_words(level, DEFAULT_TOPIC, count)
|
|
for word in fallback:
|
|
if word not in selected:
|
|
selected.append(word)
|
|
if len(selected) >= count:
|
|
break
|
|
return selected[:count]
|
|
|
|
payload = load_semantic_payload()
|
|
normalized_topic = topic.strip().lower()
|
|
abstract_like_topics = {"abstract", "actions"}
|
|
|
|
def matches(entry: Dict[str, object], selected_topic: str) -> bool:
|
|
topics, semantic_topics = entry_topics(entry)
|
|
return selected_topic in topics
|
|
|
|
def semantic_matches(entry: Dict[str, object], selected_topic: str) -> bool:
|
|
topics, semantic_topics = entry_topics(entry)
|
|
return selected_topic in semantic_topics and selected_topic not in topics
|
|
|
|
def word_score(entry: Dict[str, object], selected_topic: str) -> tuple[int, int, int, int, int, int, str]:
|
|
topics, semantic_topics = entry_topics(entry)
|
|
quality = int(entry.get("quality_score", 0))
|
|
semantic = entry.get("semantic", {})
|
|
semantic_match = 1 if semantic.get("matched") else 0
|
|
glossary_bonus = min(3, len(semantic.get("glosses", [])))
|
|
word = str(entry.get("form", ""))
|
|
length = len(word)
|
|
topical_concreteness_penalty = 0
|
|
topic_bonus = 0
|
|
pos_bonus = 0
|
|
if selected_topic in topics:
|
|
topic_bonus += 4
|
|
if "general" in topics:
|
|
topic_bonus += 1
|
|
if str(entry.get("pos", "")) == "NOUN":
|
|
pos_bonus += 4
|
|
elif str(entry.get("pos", "")) == "ADJ":
|
|
pos_bonus += 1
|
|
if selected_topic not in abstract_like_topics and selected_topic != DEFAULT_TOPIC:
|
|
if "abstract" in topics and selected_topic not in topics:
|
|
topical_concreteness_penalty -= 3
|
|
if "actions" in topics and selected_topic not in topics:
|
|
topical_concreteness_penalty -= 2
|
|
if word.endswith(ABSTRACTISH_SUFFIXES):
|
|
topical_concreteness_penalty -= 4
|
|
if str(entry.get("pos", "")) != "NOUN":
|
|
topical_concreteness_penalty -= 3
|
|
if 5 <= length <= 10:
|
|
length_bonus = 3
|
|
elif 4 <= length <= 12:
|
|
length_bonus = 1
|
|
else:
|
|
length_bonus = -2
|
|
return (
|
|
topic_bonus,
|
|
pos_bonus,
|
|
topical_concreteness_penalty,
|
|
quality,
|
|
semantic_match,
|
|
glossary_bonus,
|
|
length_bonus,
|
|
word,
|
|
)
|
|
|
|
def is_seed_friendly(entry: Dict[str, object], selected_topic: str) -> bool:
|
|
word = str(entry.get("form", ""))
|
|
pos = str(entry.get("pos", ""))
|
|
topics, semantic_topics = entry_topics(entry)
|
|
topic_hit = selected_topic in topics
|
|
if len(word) < 4 or len(word) > 13:
|
|
return False
|
|
if selected_topic in CONCRETE_TOPICS and pos != "NOUN":
|
|
return False
|
|
if selected_topic in CONCRETE_TOPICS and word.endswith(ABSTRACTISH_SUFFIXES):
|
|
return False
|
|
blocked_substrings = TOPIC_SEED_BLOCKED_SUBSTRINGS.get(selected_topic, ())
|
|
if any(part in word for part in blocked_substrings):
|
|
return False
|
|
required_substrings = TOPIC_SEED_REQUIRED_SUBSTRINGS.get(selected_topic)
|
|
if (
|
|
selected_topic in CONCRETE_TOPICS
|
|
and required_substrings
|
|
and selected_topic != DEFAULT_TOPIC
|
|
and not any(part in word for part in required_substrings)
|
|
):
|
|
return False
|
|
if selected_topic != DEFAULT_TOPIC and not topic_hit:
|
|
return False
|
|
return True
|
|
|
|
def is_semantic_seed_friendly(entry: Dict[str, object], selected_topic: str) -> bool:
|
|
word = str(entry.get("form", ""))
|
|
pos = str(entry.get("pos", ""))
|
|
topics, semantic_topics = entry_topics(entry)
|
|
if selected_topic not in semantic_topics:
|
|
return False
|
|
if len(word) < 4 or len(word) > 13:
|
|
return False
|
|
if pos not in {"NOUN", "ADJ", "VERB"}:
|
|
return False
|
|
if word.endswith(ABSTRACTISH_SUFFIXES):
|
|
return False
|
|
if "abstract" in topics:
|
|
return False
|
|
blocked_substrings = TOPIC_SEED_BLOCKED_SUBSTRINGS.get(selected_topic, ())
|
|
if any(part in word for part in blocked_substrings):
|
|
return False
|
|
required_substrings = TOPIC_SEED_REQUIRED_SUBSTRINGS.get(selected_topic)
|
|
if (
|
|
selected_topic in CONCRETE_TOPICS
|
|
and required_substrings
|
|
and selected_topic != DEFAULT_TOPIC
|
|
and not any(part in word for part in required_substrings)
|
|
):
|
|
return False
|
|
return True
|
|
|
|
def overlap_score(left: str, right: str) -> int:
    """Count the letters two words have in common, respecting multiplicity.

    Each character present in both words contributes the smaller of its two
    occurrence counts, so ``"casa"`` and ``"sala"`` score 3 (two ``a`` plus
    one ``s``).
    """
    total = 0
    for letter in set(left).intersection(right):
        total += min(left.count(letter), right.count(letter))
    return total
|
|
|
|
def pick_seed_set(entries: List[Dict[str, object]], selected_topic: str, target_count: int) -> List[str]:
    """Greedily pick up to ``target_count`` distinct seed words from ``entries``.

    The best-scoring entry (by ``word_score``) is taken first. Every further
    pick maximises, in this order of priority:

    1. whether the candidate shares at least two letters with some word
       already chosen;
    2. the summed ``overlap_score`` against all chosen words;
    3. the largest single ``overlap_score`` against a chosen word;
    4. having fewer already-chosen words of the same length;
    5. the number of distinct letters in the candidate;
    6. the candidate's ``word_score``.

    Ties keep the candidate that ranks earlier by ``word_score``.

    Args:
        entries: Lexicon entries; the word is read from the ``"form"`` key.
            Entries with a missing or empty form are ignored.
        selected_topic: Topic forwarded to ``word_score``.
        target_count: Maximum number of words to return. Values ``<= 0``
            yield an empty list.

    Returns:
        The chosen word forms in pick order; fewer than ``target_count``
        when there are not enough distinct forms.
    """
    # Guard the degenerate requests up front: previously a non-positive
    # target still returned the top-ranked word.
    if not entries or target_count <= 0:
        return []

    # Score each entry once instead of once per candidate per round.
    # NOTE(review): assumes word_score is a pure function of (entry, topic).
    candidates = []
    for entry in entries:
        word = str(entry.get("form", ""))
        if not word:
            # An empty form can never be placed in a grid; skip it rather
            # than failing later or emitting "" as a seed.
            continue
        candidates.append((word_score(entry, selected_topic), word))
    if not candidates:
        return []
    # list.sort is stable, so equal scores keep their input order exactly as
    # sorted(..., reverse=True) did.
    candidates.sort(key=lambda pair: pair[0], reverse=True)

    chosen: List[str] = [candidates[0][1]]
    chosen_lookup = {chosen[0]}

    while len(chosen) < target_count:
        best_word = None
        best_key = None
        for score, word in candidates:
            if word in chosen_lookup:
                continue
            overlaps = [overlap_score(word, existing) for existing in chosen]
            max_overlap = max(overlaps, default=0)
            # Negative so that fewer same-length words compares as "greater".
            same_length_penalty = -sum(1 for existing in chosen if len(existing) == len(word))
            key = (
                1 if max_overlap >= 2 else 0,
                sum(overlaps),
                max_overlap,
                same_length_penalty,
                len(set(word)),
                score,
            )
            # Strict comparison: on a tie the earlier-ranked candidate wins.
            if best_key is None or key > best_key:
                best_key = key
                best_word = word
        if best_word is None:
            # Every distinct form has already been taken.
            break
        chosen.append(best_word)
        chosen_lookup.add(best_word)

    return chosen
|
|
|
|
eligible = [
|
|
entry
|
|
for entry in payload.get("entries", [])
|
|
if entry.get("allowed_in_crossword", False)
|
|
and int(entry.get("difficulty_word", 5)) <= level
|
|
]
|
|
|
|
lexical_topical = []
|
|
for entry in eligible:
|
|
topics, semantic_topics = entry_topics(entry)
|
|
if normalized_topic in topics:
|
|
lexical_topical.append(entry)
|
|
fallback = [entry for entry in eligible if matches(entry, DEFAULT_TOPIC)]
|
|
if normalized_topic == DEFAULT_TOPIC:
|
|
pool = fallback
|
|
else:
|
|
pool = list(lexical_topical)
|
|
if not pool:
|
|
pool = fallback
|
|
|
|
strict_pool = [entry for entry in pool if is_seed_friendly(entry, normalized_topic)]
|
|
relaxed_pool = sorted(pool, key=lambda entry: word_score(entry, normalized_topic), reverse=True)
|
|
|
|
selected = pick_seed_set(strict_pool, normalized_topic, count)
|
|
if len(selected) < count and normalized_topic != DEFAULT_TOPIC:
|
|
semantic_pool = [
|
|
entry
|
|
for entry in eligible
|
|
if semantic_matches(entry, normalized_topic)
|
|
and is_semantic_seed_friendly(entry, normalized_topic)
|
|
]
|
|
semantic_selected = pick_seed_set(semantic_pool, normalized_topic, count)
|
|
for word in semantic_selected:
|
|
if word not in selected:
|
|
selected.append(word)
|
|
if len(selected) >= count:
|
|
break
|
|
|
|
if len(selected) < count and normalized_topic == DEFAULT_TOPIC:
|
|
relaxed_selected = pick_seed_set(relaxed_pool, normalized_topic, count)
|
|
for word in relaxed_selected:
|
|
if word not in selected:
|
|
selected.append(word)
|
|
if len(selected) >= count:
|
|
break
|
|
|
|
if len(selected) < count and normalized_topic == DEFAULT_TOPIC:
|
|
for word in WORDS:
|
|
if word in selected:
|
|
continue
|
|
selected.append(word)
|
|
if len(selected) >= count:
|
|
break
|
|
|
|
return selected[:count]
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: build a seed crossword and optionally fill it.

    Parses the arguments, resolves the runtime lexicon path, runs the
    ``ensure_*`` preparation helpers, selects the seed words and solves the
    initial grid with ``CrosswordGenerator``. A summary and the rendered grid
    are printed. With ``args.skip_fill`` the definitions for the seed grid
    are printed and the function returns; otherwise the grid is completed
    with ``CrosswordFiller`` and the final grid, the added words and the
    definitions are printed.
    """
    args = parse_args()
    args.lexicon = resolve_runtime_lexicon_path(args.lexicon)

    # NOTE(review): the helpers below all receive the mutable ``args``
    # namespace, so the call order is kept exactly as it was — confirm
    # before reordering.
    for prepare in (ensure_vocabulary, ensure_lexicon, ensure_semantic_lexicon):
        prepare(args)
    level = parse_difficulty(args.difficulty)
    topics_in_use = resolve_topics(args, level)
    ensure_babelnet_enrichment(args)
    seed_words = select_initial_words(level, args.topic, args.initial_word_count)

    generator = CrosswordGenerator(
        seed_words,
        diffxy=args.diffxy,
        time_limit_seconds=args.time_limit,
        max_candidates_per_word=args.max_candidates,
        seed=args.seed,
    )
    seed_state = generator.solve()

    # --- Report on the seed grid -------------------------------------------
    print("Griglia iniziale")
    print(f"Parole-seme richieste: {len(seed_words)}")
    print(f"Parole inserite: {seed_state.placed_words}/{len(generator.words)}")
    print(f"Intersezioni: {seed_state.intersections}")
    print(f"Dimensioni: {seed_state.width()} x {seed_state.height()} (diff={seed_state.shape_difference()})")
    print(f"Difficolta filler: {args.difficulty} -> livello {level}")
    print(f"Topic attivi: {', '.join(topics_in_use)}")
    print(f"Lessico runtime: {args.lexicon.name}")
    # The attribute is optional on the namespace, hence getattr with a default.
    seed_counts = getattr(args, "topic_seed_counts", None)
    if seed_counts:
        per_topic = ", ".join(f"{topic}={count}" for topic, count in seed_counts.items())
        print("Parole-seme disponibili per topic: " + per_topic)
    if args.seed is not None:
        print(f"Seed: {args.seed}")
    print()
    print(render_grid(seed_state.grid, seed_state.placements))
    print()
    print("Parole-seme selezionate:")
    print(", ".join(seed_words))

    if args.skip_fill:
        # Only the seed grid is wanted: print its definitions and stop.
        placed_seed_words = [placement.word for placement in seed_state.placements]
        enrich_words_for_definitions(args, placed_seed_words)
        print_definitions(args, seed_state)
        return

    # --- Fill the remaining cells ------------------------------------------
    if args.vocabulary:
        vocabulary = load_selected_vocabulary(args.vocabulary)
    else:
        vocabulary = load_filtered_vocabulary(level, args.topic)
    metadata = load_vocabulary_metadata()
    # Semantic metadata is only loaded when no explicit vocabulary was given.
    if args.vocabulary:
        semantic_metadata = {}
    else:
        semantic_metadata = load_semantic_metadata_for_vocabulary(vocabulary, args.topic)

    filler = CrosswordFiller(
        seed_state,
        vocabulary,
        target_empty_ratio=args.target_empty_ratio,
        vocabulary_metadata=metadata,
        semantic_metadata=semantic_metadata,
        selected_topic=args.topic,
        max_themed_fill_words=args.themed_fill_count,
        seed=args.seed,
    )
    filled_state = filler.fill()

    print()
    print("Griglia riempita")
    print(f"Parole totali: {filled_state.placed_words}")
    print(f"Intersezioni totali: {filled_state.intersections}")
    print(f"Dimensioni: {filled_state.width()} x {filled_state.height()} (diff={filled_state.shape_difference()})")
    print()
    print(render_grid(filled_state.grid, filled_state.placements))

    if filler.added_words:
        print()
        print("Parole aggiunte dal filler:")
        for position, added in enumerate(filler.added_words, 1):
            if added.direction == "H":
                orientation = "orizzontale"
            else:
                orientation = "verticale"
            print(f"{position:>2}. {added.word} ({added.x}, {added.y}) {orientation}")

    all_words = [placement.word for placement in filled_state.placements]
    enrich_words_for_definitions(args, all_words)
    print_definitions(args, filled_state)
|
|
|
|
# Run the pipeline only when the file is executed as a script, so importing
# this module has no side effects.
if __name__ == "__main__":
    main()
|