341 lines
11 KiB
Python
341 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import site
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
# Default size threshold (640 x 360) used by cuda_resize / cuda_cvt_color:
# inputs below it skip the GPU round-trip and are processed on the CPU.
CUDA_MIN_PIXELS = 640 * 360
|
|
|
|
|
|
def opencv_cuda_available() -> bool:
    """Report whether this OpenCV build can see at least one CUDA device.

    Returns:
        True when ``cv2`` exposes a ``cuda`` attribute and
        ``cv2.cuda.getCudaEnabledDeviceCount()`` is positive. False when the
        attribute is missing, no device is counted, or the query raises
        ``cv2.error``.
    """
    if not hasattr(cv2, "cuda"):
        return False
    try:
        device_count = cv2.cuda.getCudaEnabledDeviceCount()
    except cv2.error:
        return False
    return device_count > 0
|
|
|
|
|
|
# Probed once at import time; the CUDA helpers below only consult this flag.
OPENCV_CUDA_AVAILABLE = opencv_cuda_available()
|
|
|
|
|
|
def cuda_resize(
    image: np.ndarray,
    size: tuple[int, int],
    interpolation: int = cv2.INTER_LINEAR,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    """Resize *image*, using the OpenCV CUDA backend for large inputs.

    Args:
        image: Source image array.
        size: Target size, forwarded unchanged to ``cv2.resize`` /
            ``cv2.cuda.resize``.
        interpolation: OpenCV interpolation flag.
        min_pixels: Images with fewer pixels (rows x columns) than this are
            resized on the CPU, as are all images when CUDA is unavailable.

    Returns:
        The resized image as a host ``np.ndarray``. If the CUDA path raises
        ``cv2.error`` the CPU result is returned instead.
    """
    # ndarray.size counts every element, so a 3-channel image would reach the
    # threshold at a third of the intended pixel count; use rows x columns.
    pixel_count = int(np.prod(image.shape[:2]))
    if not OPENCV_CUDA_AVAILABLE or pixel_count < min_pixels:
        return cv2.resize(image, size, interpolation=interpolation)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.resize(gpu, size, interpolation=interpolation).download()
    except cv2.error:
        # Best-effort acceleration: any CUDA failure falls back to the CPU.
        return cv2.resize(image, size, interpolation=interpolation)
|
|
|
|
|
|
def cuda_cvt_color(
    image: np.ndarray,
    code: int,
    min_pixels: int = CUDA_MIN_PIXELS,
) -> np.ndarray:
    """Convert *image* between colour spaces, on the GPU for large inputs.

    Args:
        image: Source image array.
        code: OpenCV colour-conversion code (a ``cv2.COLOR_*`` constant).
        min_pixels: Images with fewer pixels (rows x columns) than this are
            converted on the CPU, as are all images when CUDA is unavailable.

    Returns:
        The converted image as a host ``np.ndarray``. If the CUDA path raises
        ``cv2.error`` the CPU result is returned instead.
    """
    # ndarray.size counts every element, so a 3-channel image would reach the
    # threshold at a third of the intended pixel count; use rows x columns.
    pixel_count = int(np.prod(image.shape[:2]))
    if not OPENCV_CUDA_AVAILABLE or pixel_count < min_pixels:
        return cv2.cvtColor(image, code)
    try:
        gpu = cv2.cuda_GpuMat()
        gpu.upload(image)
        return cv2.cuda.cvtColor(gpu, code).download()
    except cv2.error:
        # Best-effort acceleration: any CUDA failure falls back to the CPU.
        return cv2.cvtColor(image, code)
|
|
|
|
|
|
@dataclass(frozen=True)
class Candidate:
    """One OCR reading together with the image variant that produced it."""

    # Recognised text for this reading.
    text: str
    # Confidence score reported for the reading.
    score: float
    # Name of the preprocessing variant the reading came from.
    variant: str
|
|
|
|
|
|
def add_nvidia_dll_dirs() -> None:
    """Register ``nvidia/*/bin`` folders from site-packages as DLL directories.

    Does nothing unless ``os.name`` is ``"nt"``. For each directory returned
    by ``site.getsitepackages()``, every existing ``nvidia/<package>/bin``
    directory is passed to ``os.add_dll_directory``.
    """
    if os.name != "nt":
        return
    nvidia_roots = (Path(entry) / "nvidia" for entry in site.getsitepackages())
    for root in nvidia_roots:
        if not root.exists():
            continue
        for candidate_dir in root.glob("*/bin"):
            if candidate_dir.exists():
                os.add_dll_directory(str(candidate_dir))
|
|
|
|
|
|
def resize_to_height(image: np.ndarray, target_height: int) -> np.ndarray:
    """Scale *image* to *target_height* rows, keeping the aspect ratio.

    Args:
        image: Source image; only its first two shape entries are read.
        target_height: Desired number of rows.

    Returns:
        *image* itself when it already has the requested height, otherwise a
        cubic-interpolated copy whose width is scaled by the same factor
        (never below one column).
    """
    current_height, current_width = image.shape[:2]
    if current_height == target_height:
        return image
    # max(1, ...) guards the division against a zero-row image.
    factor = target_height / max(1, current_height)
    new_width = max(1, int(current_width * factor))
    return cuda_resize(image, (new_width, target_height), interpolation=cv2.INTER_CUBIC)
|
|
|
|
|
|
def add_border(image: np.ndarray, ratio: float = 0.08) -> np.ndarray:
    """Pad *image* on all four sides with a constant (255, 255, 255) border.

    Args:
        image: Source image.
        ratio: Fraction of the width/height used as padding on each side.

    Returns:
        The padded image; each side's padding is at least two pixels.
    """
    rows, cols = image.shape[:2]
    horizontal = max(2, int(cols * ratio))
    vertical = max(2, int(rows * ratio))
    white = (255, 255, 255)
    # Argument order is top, bottom, left, right.
    return cv2.copyMakeBorder(
        image, vertical, vertical, horizontal, horizontal, cv2.BORDER_CONSTANT, value=white
    )
|
|
|
|
|
|
def clahe_bgr(image: np.ndarray) -> np.ndarray:
    """Apply CLAHE to the first LAB channel of a BGR image.

    The image is converted BGR -> LAB, CLAHE (clip limit 2.0, 4x4 tiles) is
    applied to the first channel only, and the result is converted back to
    BGR.
    """
    lab_image = cuda_cvt_color(image, cv2.COLOR_BGR2LAB)
    lightness, chroma_a, chroma_b = cv2.split(lab_image)
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4))
    equalized = equalizer.apply(lightness)
    recombined = cv2.merge([equalized, chroma_a, chroma_b])
    return cuda_cvt_color(recombined, cv2.COLOR_LAB2BGR)
|
|
|
|
|
|
def sharpen(image: np.ndarray, strength: float = 0.7) -> np.ndarray:
    """Sharpen *image* by subtracting a weighted Gaussian blur of itself.

    Computes ``image * (1 + strength) - blurred * strength`` with a
    sigma-1.0 Gaussian blur, via ``cv2.addWeighted``.
    """
    soft = cv2.GaussianBlur(image, (0, 0), sigmaX=1.0)
    image_weight = 1.0 + strength
    blur_weight = -strength
    return cv2.addWeighted(image, image_weight, soft, blur_weight, 0)
|
|
|
|
|
|
def variants_for_height(image: np.ndarray, target_height: int) -> dict[str, np.ndarray]:
    """Build every preprocessing variant of *image* at one target height.

    Args:
        image: Source image; it is converted with ``COLOR_BGR2GRAY`` and
            ``COLOR_BGR2LAB``, so a BGR layout is expected.
        target_height: Height the image is resized to before processing.

    Returns:
        A dict mapping variant name to image, in this order: ``orig``,
        ``orig_border``, ``gray``, ``clahe``, ``clahe_sharp``, ``adaptive``,
        ``otsu``, ``otsu_close``, ``denoise``. Single-channel results are
        converted back with ``COLOR_GRAY2BGR``.
    """

    def to_bgr(single_channel: np.ndarray) -> np.ndarray:
        return cuda_cvt_color(single_channel, cv2.COLOR_GRAY2BGR)

    resized = resize_to_height(image, target_height)
    grayscale = cuda_cvt_color(resized, cv2.COLOR_BGR2GRAY)
    contrast = clahe_bgr(resized)

    # Binarised variants derived from the grayscale image.
    adaptive_mask = cv2.adaptiveThreshold(
        grayscale, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 17, 5
    )
    _, otsu_mask = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    square = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    closed_mask = cv2.morphologyEx(otsu_mask, cv2.MORPH_CLOSE, square, iterations=1)

    smoothed = cv2.fastNlMeansDenoising(
        grayscale, h=7, templateWindowSize=7, searchWindowSize=21
    )

    # Key order is significant: callers iterate this dict in insertion order.
    return {
        "orig": resized,
        "orig_border": add_border(resized),
        "gray": to_bgr(grayscale),
        "clahe": contrast,
        "clahe_sharp": sharpen(contrast),
        "adaptive": to_bgr(adaptive_mask),
        "otsu": to_bgr(otsu_mask),
        "otsu_close": to_bgr(closed_mask),
        "denoise": to_bgr(smoothed),
    }
|
|
|
|
|
|
def build_variants(image: np.ndarray, target_heights: list[int]) -> dict[str, np.ndarray]:
    """Build all preprocessing variants of *image* for each target height.

    Returns:
        A dict keyed ``h<height>_<variant name>``, ordered by height and then
        by the order ``variants_for_height`` yields its entries. A repeated
        height overwrites the earlier entries with the same key.
    """
    return {
        f"h{height}_{name}": variant
        for height in target_heights
        for name, variant in variants_for_height(image, height).items()
    }
|
|
|
|
|
|
def filter_variants(all_variants: dict[str, np.ndarray], variant_set: str) -> dict[str, np.ndarray]:
    """Select the subset of variants named by *variant_set*.

    Args:
        all_variants: Mapping of ``<prefix>_<variant name>`` to image.
        variant_set: ``"full"`` returns *all_variants* unchanged,
            ``"balanced"`` keeps five variant kinds, and any other value is
            treated as the three-kind "fast" set.

    Returns:
        *all_variants* itself for ``"full"``, otherwise a new dict with the
        entries whose key ends in ``_<kind>`` for an allowed kind, in their
        original order.
    """
    if variant_set == "full":
        return all_variants
    if variant_set == "balanced":
        keep = ("orig", "orig_border", "clahe", "clahe_sharp", "adaptive")
    else:
        keep = ("orig", "clahe", "clahe_sharp")
    # str.endswith accepts a tuple, so one call tests every allowed suffix.
    suffixes = tuple(f"_{label}" for label in keep)
    return {key: img for key, img in all_variants.items() if key.endswith(suffixes)}
|
|
|
|
|
|
def extract_candidates(result: Any) -> list[tuple[str, float]]:
    """Collect digit-only OCR candidates from an arbitrarily nested result.

    The result is walked recursively and text/score pairs are picked up from
    these shapes:

    * dicts with parallel ``rec_texts`` / ``rec_scores`` lists (a missing
      score counts as 0.0);
    * dicts with ``rec_text`` or ``text`` and ``rec_score`` or ``score``;
    * sequences shaped ``[box, (text, score)]``;
    * flat ``(text, score)`` / ``[text, score]`` pairs, as produced by
      recognition-only calls such as ``ocr.ocr(image, det=False)``.

    Args:
        result: Raw OCR output (nested dicts, lists and tuples).

    Returns:
        ``(digits, score)`` tuples sorted by descending score, where
        ``digits`` is the text with every non-digit removed. Texts without
        digits are dropped and duplicates keep their highest score.
    """
    candidates: list[tuple[str, float]] = []

    def is_number(value: Any) -> bool:
        # Requiring a real numeric type keeps a list of strings such as
        # ["123", "456"] from being misread as a (text, score) pair.
        # bool is an int subclass but is never a confidence score.
        if isinstance(value, bool):
            return False
        return isinstance(value, (int, float, np.integer, np.floating))

    def walk(value: Any) -> None:
        if value is None:
            return
        if isinstance(value, dict):
            rec_texts = value.get("rec_texts")
            rec_scores = value.get("rec_scores")
            if isinstance(rec_texts, list):
                for idx, text in enumerate(rec_texts):
                    score = rec_scores[idx] if isinstance(rec_scores, list) and idx < len(rec_scores) else 0.0
                    if isinstance(text, str):
                        candidates.append((text, float(score)))
            text = value.get("rec_text") or value.get("text")
            score = value.get("rec_score") or value.get("score")
            if isinstance(text, str):
                candidates.append((text, float(score) if score is not None else 0.0))
            for child in value.values():
                walk(child)
            return
        if isinstance(value, (list, tuple)):
            if len(value) >= 2:
                first, second = value[0], value[1]
                if isinstance(second, tuple) and len(second) >= 2:
                    # Detection + recognition entry: [box, (text, score)].
                    text, score = second[0], second[1]
                    if isinstance(text, str):
                        candidates.append((text, float(score)))
                elif isinstance(first, str) and is_number(second):
                    # Recognition-only entry: (text, score).
                    candidates.append((first, float(second)))
            # A pair nested in a box entry is visited again here; the
            # de-duplication below collapses the repeat.
            for child in value:
                walk(child)

    walk(result)

    dedup: dict[str, float] = {}
    for text, score in candidates:
        digits = re.sub(r"\D+", "", text)
        if not digits:
            continue
        dedup[digits] = max(score, dedup.get(digits, 0.0))
    return sorted(dedup.items(), key=lambda item: item[1], reverse=True)
|
|
|
|
|
|
def choose_best(candidates: list[Candidate], expected_digits: int) -> tuple[str, float, int, str, float]:
    """Pick the most credible text among all OCR candidates.

    Candidates are grouped by text. Each group's rank is its highest score,
    plus 0.35 when the text has exactly *expected_digits* characters, plus a
    consensus bonus (0.035 per vote capped at 0.30, and 0.025 per distinct
    variant capped at 0.20), minus 0.35 per character of length mismatch.

    Args:
        candidates: Every reading collected across image variants.
        expected_digits: Length the final text is expected to have.

    Returns:
        ``(text, best score, votes, variant of the best-scoring reading,
        rank)`` for the highest-ranked group; the first group seen wins ties.
        ``("", 0.0, 0, "", 0.0)`` is returned for an empty candidate list.
    """
    if not candidates:
        return "", 0.0, 0, "", 0.0

    by_text: dict[str, list[Candidate]] = {}
    for entry in candidates:
        if entry.text not in by_text:
            by_text[entry.text] = []
        by_text[entry.text].append(entry)

    # Starting values; the rank doubles as the floor a group must exceed.
    winner: tuple[str, float, int, str, float] = ("", 0.0, 0, "", -999.0)
    for digits, members in by_text.items():
        strongest = max(members, key=lambda member: member.score)
        vote_count = len(members)
        distinct_variants = len({member.variant for member in members})
        length_gap = abs(len(digits) - expected_digits)

        exact_bonus = 0.35 if len(digits) == expected_digits else 0.0
        consensus_bonus = min(0.30, vote_count * 0.035) + min(0.20, distinct_variants * 0.025)
        length_penalty = length_gap * 0.35
        rank = strongest.score + exact_bonus + consensus_bonus - length_penalty

        # Strict comparison keeps the earliest group on equal rank.
        if rank > winner[4]:
            winner = (digits, strongest.score, vote_count, strongest.variant, rank)
    return winner
|
|
|
|
|
|
def parse_target_heights(raw: str) -> list[int]:
    """Parse a comma-separated list of integer heights.

    Args:
        raw: Text such as ``"96,128"``; whitespace around items and empty
            items are ignored.

    Returns:
        The parsed integers in order, or ``[96]`` when none are present.

    Raises:
        ValueError: If a non-empty item is not a valid integer.
    """
    tokens = (piece.strip() for piece in raw.split(","))
    heights = [int(token) for token in tokens if token]
    return heights if heights else [96]
|
|
|
|
|
|
def make_ocr():
    """Create the PaddleOCR engine used by the worker.

    DLL directories are registered first and ``paddleocr`` is imported only
    afterwards, inside the function. The engine is configured for English
    with document orientation classification, unwarping and text-line
    orientation switched off, and a recognition score threshold of 0.0.
    """
    add_nvidia_dll_dirs()
    # Imported here so it happens after the DLL directories are registered.
    from paddleocr import PaddleOCR

    options = {
        "lang": "en",
        "use_doc_orientation_classify": False,
        "use_doc_unwarping": False,
        "use_textline_orientation": False,
        "text_rec_score_thresh": 0.0,
    }
    return PaddleOCR(**options)
|
|
|
|
|
|
def predict_one(ocr: Any, image: np.ndarray) -> list[tuple[str, float]]:
    """Run OCR on one image and return its digit candidates.

    Uses ``ocr.predict(image)`` when the engine has a ``predict`` attribute,
    otherwise ``ocr.ocr(image, det=False, cls=False)``. The raw output is
    passed through ``extract_candidates``.
    """
    raw = ocr.predict(image) if hasattr(ocr, "predict") else ocr.ocr(image, det=False, cls=False)
    return extract_candidates(raw)
|
|
|
|
|
|
def handle_request(ocr: Any, request: dict[str, Any]) -> dict[str, Any]:
    """Process one OCR request and build its JSON-serialisable response.

    Args:
        ocr: OCR engine handed to ``predict_one``.
        request: Must contain ``image_path``. Optional keys and defaults:
            ``target_heights`` ("96"), ``variant_set`` ("fast"),
            ``expected_digits`` (6), ``min_votes`` (2),
            ``min_confidence`` (0.70).

    Returns:
        A response dict. ``ok`` is True only when the best text is non-empty,
        has exactly ``expected_digits`` characters, and meets both the vote
        and confidence minimums; ``text`` is empty otherwise while
        ``best_text`` always carries the top choice. At most 18 candidates
        are reported. An image that cannot be read yields an
        ``unreadable_image:<path>`` failure response.

    Raises:
        KeyError: If ``image_path`` is missing.
        ValueError: If a numeric option cannot be converted.
    """
    image_path = Path(str(request["image_path"]))
    target_heights = parse_target_heights(str(request.get("target_heights", "96")))
    variant_set = str(request.get("variant_set", "fast")).strip().lower()
    expected_digits = int(request.get("expected_digits", 6))
    min_votes = int(request.get("min_votes", 2))
    min_confidence = float(request.get("min_confidence", 0.70))

    image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
    if image is None:
        return dict(
            ok=False,
            text="",
            raw_text="",
            confidence=0.0,
            votes=0,
            variant="",
            reason=f"unreadable_image:{image_path}",
            candidates=[],
        )

    # Every variant is built first; the requested subset is selected after.
    selected = filter_variants(build_variants(image, target_heights), variant_set)
    candidates: list[Candidate] = [
        Candidate(text=found_text, score=found_score, variant=variant_name)
        for variant_name, variant_image in selected.items()
        for found_text, found_score in predict_one(ocr, variant_image)
    ]

    best_text, confidence, votes, variant, rank = choose_best(candidates, expected_digits)
    accepted = (
        bool(best_text)
        and len(best_text) == expected_digits
        and votes >= min_votes
        and confidence >= min_confidence
    )

    # Only the 18 highest-scoring readings are reported back.
    top = sorted(candidates, key=lambda item: item.score, reverse=True)[:18]
    raw_text = " | ".join(f"{item.text}:{item.score:.3f}:{item.variant}" for item in top)
    reported = [{"text": item.text, "score": item.score, "variant": item.variant} for item in top]

    return dict(
        ok=accepted,
        text=best_text if accepted else "",
        best_text=best_text,
        raw_text=raw_text,
        confidence=confidence,
        votes=votes,
        variant=variant,
        rank=rank,
        reason="ok" if accepted else "low_consensus_or_invalid_length",
        candidates=reported,
    )
|
|
|
|
|
|
def main() -> int:
    """Run the worker loop: one JSON request per stdin line, one JSON reply.

    After the OCR engine is created a ``{"ready": true}`` line is printed.
    Blank lines are skipped and the literal line ``__quit__`` ends the loop.
    Any exception raised while parsing or handling a request is reported as
    a ``worker_error:<message>`` response rather than stopping the worker.

    Returns:
        Always 0.
    """
    ocr = make_ocr()
    print(json.dumps({"ready": True}), flush=True)

    for raw_line in sys.stdin:
        message = raw_line.strip()
        if not message:
            continue
        if message == "__quit__":
            break
        try:
            response = handle_request(ocr, json.loads(message))
        except Exception as exc:
            # Top-level boundary: keep serving after a bad request.
            response = dict(
                ok=False,
                text="",
                raw_text="",
                confidence=0.0,
                votes=0,
                variant="",
                reason=f"worker_error:{exc}",
                candidates=[],
            )
        print(json.dumps(response, ensure_ascii=True), flush=True)

    return 0
|
|
|
|
|
|
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    sys.exit(main())
|