from __future__ import annotations from dataclasses import dataclass import json from pathlib import Path import random import sys import time from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple from crossword_generator import ( DIFFXY, HORIZONTAL, VERTICAL, CrosswordGenerator, CrosswordState, Placement, WORDS, render_grid, ) TARGET_EMPTY_RATIO = 1 / 6 MIN_WORD_LENGTH = 2 MAX_NO_PROGRESS_STEPS = 150 MAX_SLOT_CANDIDATES = 8 EXTENDED_VOCAB_PATH = Path(__file__).with_name("vocaboli_it_esteso.txt") FILTERED_VOCAB_PATH = Path(__file__).with_name("vocaboli_it_filtrato.txt") METADATA_VOCAB_PATH = Path(__file__).with_name("vocaboli_it_metadata.json") VOCAB_PATH = ( FILTERED_VOCAB_PATH if FILTERED_VOCAB_PATH.exists() else EXTENDED_VOCAB_PATH if EXTENDED_VOCAB_PATH.exists() else Path(__file__).with_name("vocaboli_it.txt") ) Coordinate = Tuple[int, int] @dataclass(frozen=True) class FillSlot: x: int y: int direction: str length: int pattern: str fixed_letters: int empty_cells: int candidate_count: int @property def cells(self) -> List[Coordinate]: if self.direction == HORIZONTAL: return [(self.x + offset, self.y) for offset in range(self.length)] return [(self.x, self.y + offset) for offset in range(self.length)] @dataclass(frozen=True) class FillCandidate: word: str slot: FillSlot new_letters: int reused_letters: int local_score: Tuple[int, ...] class CrosswordFiller: def __init__( self, state: CrosswordState, vocabulary: Sequence[str], *, target_empty_ratio: float = TARGET_EMPTY_RATIO, vocabulary_metadata: Optional[Dict[str, Dict[str, object]]] = None, semantic_metadata: Optional[Dict[str, Dict[str, object]]] = None, selected_topic: str = "general", max_themed_fill_words: int = 10, seed: Optional[int] = None, ) -> None: self.state = state.copy() self.initial_state = state.copy() self.target_empty_ratio = target_empty_ratio self.used_words: Set[str] = {placement.word for placement in self.state.placements} self.added_words: List[Placement] = [] self.vocabulary = self._normalize_vocabulary(vocabulary) self.words_by_length = self._index_vocabulary(self.vocabulary) self.vocabulary_metadata = vocabulary_metadata or {} self.semantic_metadata = semantic_metadata or {} self.selected_topics = [ topic.strip().lower() for topic in selected_topic.split(",") if topic.strip() ] or ["general"] self.selected_topic = self.selected_topics[0] self.max_themed_fill_words = max(0, max_themed_fill_words) self.seed = seed self.rng = random.Random(seed) self.bounds = self._compute_bounds(self.state.grid) self.total_cells = self._area(self.bounds) self.target_empty_cells = max(0, int(round(self.total_cells * self.target_empty_ratio))) self.nodes_visited = 0 self.last_spinner_update = 0.0 self.spinner_frames = ["-", "/", "|", "\\"] self.spinner_index = 0 self.started_at = 0.0 self.last_word = "-" @staticmethod def _normalize_vocabulary(words: Sequence[str]) -> List[str]: normalized: List[str] = [] seen: Set[str] = set() for word in words: clean = word.strip().lower() if len(clean) < MIN_WORD_LENGTH or not clean.isalpha() or clean in seen: continue normalized.append(clean) seen.add(clean) return normalized @staticmethod def _index_vocabulary(words: Sequence[str]) -> Dict[int, List[str]]: index: Dict[int, List[str]] = {} for word in words: index.setdefault(len(word), []).append(word) return index @staticmethod def _compute_bounds(grid: Dict[Coordinate, str]) -> Tuple[int, int, int, int]: xs = [x for x, _ in grid] ys = [y for _, y in grid] return min(xs), min(ys), max(xs), max(ys) @staticmethod def _area(bounds: Tuple[int, int, int, int]) -> int: x_min, y_min, x_max, y_max = bounds return (x_max - x_min + 1) * (y_max - y_min + 1) def fill(self) -> CrosswordState: self.started_at = time.perf_counter() self.last_spinner_update = self.started_at no_progress_steps = 0 while self.empty_cells_count() > self.target_empty_cells and no_progress_steps < MAX_NO_PROGRESS_STEPS: self.nodes_visited += 1 slots = self._collect_slots() self._tick_spinner(slots_count=len(slots)) if not slots: break progress = False for slot in slots[:MAX_SLOT_CANDIDATES]: candidate = self._best_candidate_for_slot(slot) if candidate is None: continue self._apply_candidate(candidate) progress = True no_progress_steps = 0 break if not progress: no_progress_steps += 1 self._clear_spinner() return self.state def empty_cells_count(self) -> int: x_min, y_min, x_max, y_max = self.bounds empty = 0 for y in range(y_min, y_max + 1): for x in range(x_min, x_max + 1): if (x, y) not in self.state.grid: empty += 1 return empty def coverage_ratio(self) -> float: return 1.0 - (self.empty_cells_count() / self.total_cells) def _collect_slots(self) -> List[FillSlot]: slots: List[FillSlot] = [] x_min, y_min, x_max, y_max = self.bounds for y in range(y_min, y_max + 1): for x in range(x_min, x_max + 1): if (x, y) in self.state.grid: continue for direction in (HORIZONTAL, VERTICAL): slots.extend(self._slots_from_start(x, y, direction)) unique: Dict[Tuple[int, int, str, int], FillSlot] = {} for slot in slots: key = (slot.x, slot.y, slot.direction, slot.length) current = unique.get(key) if current is None or self._slot_priority(slot) > self._slot_priority(current): unique[key] = slot collected = list(unique.values()) collected.sort(key=self._slot_priority, reverse=True) if len(collected) > 1: top_slice = collected[: min(MAX_SLOT_CANDIDATES, len(collected))] self.rng.shuffle(top_slice) collected = top_slice + collected[min(MAX_SLOT_CANDIDATES, len(collected)) :] return collected def _slots_from_start(self, x: int, y: int, direction: str) -> Iterable[FillSlot]: dx, dy = (1, 0) if direction == HORIZONTAL else (0, 1) x_min, y_min, x_max, y_max = self.bounds prev_cell = (x - dx, y - dy) if self._inside_bounds(prev_cell) and prev_cell in self.state.grid: return [] max_length = 0 cursor_x = x cursor_y = y while x_min <= cursor_x <= x_max and y_min <= cursor_y <= y_max: max_length += 1 cursor_x += dx cursor_y += dy slots: List[FillSlot] = [] for length in range(max_length, MIN_WORD_LENGTH - 1, -1): end_cell = (x + dx * length, y + dy * length) if self._inside_bounds(end_cell) and end_cell in self.state.grid: continue pattern_chars: List[str] = [] fixed_letters = 0 empty_cells = 0 for offset in range(length): cell = (x + dx * offset, y + dy * offset) letter = self.state.grid.get(cell) if letter is None: pattern_chars.append(".") empty_cells += 1 else: pattern_chars.append(letter) fixed_letters += 1 if empty_cells == 0: continue pattern = "".join(pattern_chars) candidate_count = self._count_candidates(pattern) if candidate_count == 0: continue slots.append( FillSlot( x=x, y=y, direction=direction, length=length, pattern=pattern, fixed_letters=fixed_letters, empty_cells=empty_cells, candidate_count=candidate_count, ) ) return slots def _slot_priority(self, slot: FillSlot) -> Tuple[int, int, int, int, int]: return ( slot.fixed_letters, -slot.candidate_count, slot.length, -slot.empty_cells, 1 if slot.direction == HORIZONTAL else 0, ) def _count_candidates(self, pattern: str) -> int: count = 0 for word in self.words_by_length.get(len(pattern), []): if word in self.used_words: continue if self._matches_pattern(word, pattern): count += 1 return count @staticmethod def _matches_pattern(word: str, pattern: str) -> bool: return all(p == "." or p == w for w, p in zip(word, pattern)) def _best_candidate_for_slot(self, slot: FillSlot) -> Optional[FillCandidate]: candidates: List[FillCandidate] = [] for word in self.words_by_length.get(slot.length, []): if word in self.used_words or not self._matches_pattern(word, slot.pattern): continue if not self._placement_is_valid(slot, word): continue new_letters = sum(1 for cell in slot.cells if cell not in self.state.grid) reused_letters = slot.fixed_letters local_score = ( self._semantic_topic_score(word), reused_letters, new_letters, self._word_quality(word), self._semantic_quality(word), len(set(word)), ) candidates.append( FillCandidate( word=word, slot=slot, new_letters=new_letters, reused_letters=reused_letters, local_score=local_score, ) ) if not candidates: return None candidates.sort(key=lambda item: item.local_score, reverse=True) return self.rng.choice(candidates[: min(3, len(candidates))]) def _word_quality(self, word: str) -> int: metadata = self.vocabulary_metadata.get(word) if not metadata: return 0 try: return int(metadata.get("quality", 0)) except (TypeError, ValueError): return 0 def _semantic_entry(self, word: str) -> Dict[str, object]: return self.semantic_metadata.get(word, {}) def _semantic_quality(self, word: str) -> int: entry = self._semantic_entry(word) semantic = entry.get("semantic", {}) score = 0 if semantic.get("matched"): score += 2 score += min(3, len(semantic.get("glosses", []))) score += min(2, len(semantic.get("synonyms", []))) return score def _semantic_topic_score(self, word: str) -> int: if not self.selected_topics or self.selected_topics == ["general"]: return 0 entry = self._semantic_entry(word) try: relevance = int(entry.get("_topic_relevance", 0)) except (TypeError, ValueError): relevance = 0 if relevance: if self._themed_added_count() < self.max_themed_fill_words: return relevance return min(relevance, 10) topics = {str(item).lower() for item in entry.get("topics", [])} semantic = entry.get("semantic", {}) semantic_topics = {str(item).lower() for item in semantic.get("semantic_topics", [])} score = 0 if any(topic in topics for topic in self.selected_topics): score += 4 if any(topic in semantic_topics for topic in self.selected_topics): score += 6 if "general" in topics: score += 1 return score def _themed_added_count(self) -> int: total = 0 for placement in self.added_words: entry = self._semantic_entry(placement.word) try: if int(entry.get("_strong_topic_relevance", 0)) > 0: total += 1 except (TypeError, ValueError): continue return total def _placement_is_valid(self, slot: FillSlot, word: str) -> bool: dx, dy = (1, 0) if slot.direction == HORIZONTAL else (0, 1) before = (slot.x - dx, slot.y - dy) after = (slot.x + dx * slot.length, slot.y + dy * slot.length) if self._inside_bounds(before) and before in self.state.grid: return False if self._inside_bounds(after) and after in self.state.grid: return False intersects_existing = False for offset, cell in enumerate(slot.cells): current = self.state.grid.get(cell) letter = word[offset] if current is not None and current != letter: return False if current == letter: intersects_existing = True return intersects_existing or slot.fixed_letters == 0 def _apply_candidate(self, candidate: FillCandidate) -> None: slot = candidate.slot dx, dy = (1, 0) if slot.direction == HORIZONTAL else (0, 1) intersections = 0 for offset, letter in enumerate(candidate.word): cell = (slot.x + dx * offset, slot.y + dy * offset) if cell in self.state.grid: intersections += 1 self.state.grid[cell] = letter placement = Placement( word=candidate.word, x=slot.x, y=slot.y, direction=slot.direction, intersections=intersections, ) self.state.placements.append(placement) self.state.intersections += intersections self.used_words.add(candidate.word) self.added_words.append(placement) self.last_word = candidate.word print( f"\n[fill] inserita '{candidate.word}' " f"in {slot.direction} da ({slot.x}, {slot.y}), " f"nuove={candidate.new_letters}, intersezioni={intersections}, " f"copertura={self.coverage_ratio() * 100:.1f}%", file=sys.stderr, flush=True, ) def _inside_bounds(self, cell: Coordinate) -> bool: x_min, y_min, x_max, y_max = self.bounds x, y = cell return x_min <= x <= x_max and y_min <= y <= y_max def _tick_spinner(self, *, slots_count: int) -> None: now = time.perf_counter() if now - self.last_spinner_update < 0.08: return frame = self.spinner_frames[self.spinner_index] elapsed = now - self.started_at message = ( f"\r{frame} fill... " f"slot={slots_count} " f"vuote={self.empty_cells_count()}/{self.total_cells} " f"target={self.target_empty_cells} " f"aggiunte={len(self.added_words)} " f"tema={self._themed_added_count()}/{self.max_themed_fill_words} " f"ultima={self.last_word} " f"t={elapsed:0.1f}s" ) print(message, end="", file=sys.stderr, flush=True) self.spinner_index = (self.spinner_index + 1) % len(self.spinner_frames) self.last_spinner_update = now @staticmethod def _clear_spinner() -> None: print("\r" + " " * 140 + "\r", end="", file=sys.stderr, flush=True) def load_vocabulary(path: Path = VOCAB_PATH) -> List[str]: if not path.exists(): raise FileNotFoundError(f"Vocabolario non trovato: {path}") return path.read_text(encoding="utf-8").splitlines() def load_vocabulary_metadata(path: Path = METADATA_VOCAB_PATH) -> Dict[str, Dict[str, object]]: if not path.exists(): return {} return json.loads(path.read_text(encoding="utf-8")) def summarize_fill(initial_state: CrosswordState, final_state: CrosswordState) -> str: initial_bounds = ( initial_state.width(), initial_state.height(), initial_state.shape_difference(), ) final_bounds = ( final_state.width(), final_state.height(), final_state.shape_difference(), ) return ( f"Riempimento completato\n" f"Parole iniziali: {initial_state.placed_words}\n" f"Parole finali: {final_state.placed_words}\n" f"Intersezioni finali: {final_state.intersections}\n" f"Dimensioni iniziali: {initial_bounds[0]}x{initial_bounds[1]} (diff={initial_bounds[2]})\n" f"Dimensioni finali: {final_bounds[0]}x{final_bounds[1]} (diff={final_bounds[2]})\n" f"Celle vuote residue: {sum(1 for _ in iter_empty_cells(final_state))}" ) def iter_empty_cells(state: CrosswordState) -> Iterable[Coordinate]: x_min, y_min, x_max, y_max = state.bounds() for y in range(y_min, y_max + 1): for x in range(x_min, x_max + 1): if (x, y) not in state.grid: yield (x, y) def main() -> None: vocabulary = load_vocabulary() vocabulary_metadata = load_vocabulary_metadata() generator = CrosswordGenerator(WORDS, diffxy=DIFFXY) initial_state = generator.solve() filler = CrosswordFiller(initial_state, vocabulary, vocabulary_metadata=vocabulary_metadata) final_state = filler.fill() print(summarize_fill(initial_state, final_state)) print() print(render_grid(final_state.grid, final_state.placements)) if filler.added_words: print() print("Parole aggiunte dal filler:") for index, placement in enumerate(filler.added_words, start=1): direction = "orizzontale" if placement.direction == HORIZONTAL else "verticale" print(f"{index:>2}. {placement.word} ({placement.x}, {placement.y}) {direction}") if __name__ == "__main__": main()