From: Aleš Mrázek Date: Tue, 13 Jan 2026 00:28:00 +0000 (+0100) Subject: WIP: data_modeling refactored X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5eed93fec9fb552fe9f9be191af1e17f752fe639;p=thirdparty%2Fknot-resolver.git WIP: data_modeling refactored --- diff --git a/python/knot_resolver/config/__init__.py b/python/knot_resolver/config/__init__.py new file mode 100644 index 000000000..75cedc5fe --- /dev/null +++ b/python/knot_resolver/config/__init__.py @@ -0,0 +1,5 @@ +from .model.config_model import KresConfigModel + +__all__ = [ + "KresConfigModel", +] diff --git a/python/knot_resolver/config/model/__init__.py b/python/knot_resolver/config/model/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/knot_resolver/config/model/config_model.py b/python/knot_resolver/config/model/config_model.py new file mode 100644 index 000000000..f0637ae2c --- /dev/null +++ b/python/knot_resolver/config/model/config_model.py @@ -0,0 +1,5 @@ +from knot_resolver.utils.modeling import ModelNode + + +class KresConfigModel(ModelNode): + pass diff --git a/python/knot_resolver/config/model/types/__init__.py b/python/knot_resolver/config/model/types/__init__.py new file mode 100644 index 000000000..3a86656ad --- /dev/null +++ b/python/knot_resolver/config/model/types/__init__.py @@ -0,0 +1,22 @@ +from .float_types import FloatNonNegative +from .integer_types import ( + Integer0_32, + Integer0_512, + Integer0_65535, + IntegerNonNegative, + IntegerPositive, + Percent, + PortNumber, +) +# from .string_types import + +__all__ = [ + "FloatNonNegative", + "Integer0_32", + "Integer0_512", + "Integer0_65535", + "IntegerNonNegative", + "IntegerPositive", + "Percent", + "PortNumber", +] diff --git a/python/knot_resolver/config/model/types/float_types.py b/python/knot_resolver/config/model/types/float_types.py new file mode 100644 index 000000000..576a0a7b8 --- /dev/null +++ b/python/knot_resolver/config/model/types/float_types.py @@ -0,0 +1,5 @@ +from knot_resolver.utils.modeling.types import BaseFloatRange + + +class FloatNonNegative(BaseFloatRange): + _min: float = 0.0 diff --git a/python/knot_resolver/config/model/types/integer_types.py b/python/knot_resolver/config/model/types/integer_types.py new file mode 100644 index 000000000..6f8ffff65 --- /dev/null +++ b/python/knot_resolver/config/model/types/integer_types.py @@ -0,0 +1,36 @@ +# ruff: noqa: N801 + +from knot_resolver.utils.modeling.types import BaseIntegerRange + + +class Integer0_32(BaseIntegerRange): + _min: int = 0 + _max: int = 32 + + +class Integer0_512(BaseIntegerRange): + _min: int = 0 + _max: int = 512 + + +class Integer0_65535(BaseIntegerRange): + _min: int = 0 + _max: int = 65_535 + + +class IntegerNonNegative(BaseIntegerRange): + _min: int = 0 + + +class IntegerPositive(BaseIntegerRange): + _min: int = 1 + + +class Percent(BaseIntegerRange): + _min: int = 0 + _max: int = 100 + + +class PortNumber(BaseIntegerRange): + _min: int = 1 + _max: int = 65_535 diff --git a/python/knot_resolver/config/model/types/string_types.py b/python/knot_resolver/config/model/types/string_types.py new file mode 100644 index 000000000..277c5bc32 --- /dev/null +++ b/python/knot_resolver/config/model/types/string_types.py @@ -0,0 +1 @@ +from knot_resolver.utils.modeling.types import BaseString diff --git a/python/knot_resolver/config/templates/__init__.py b/python/knot_resolver/config/templates/__init__.py new file mode 100644 index 000000000..9c5d5ed74 --- /dev/null +++ b/python/knot_resolver/config/templates/__init__.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from jinja2 import Environment, FileSystemLoader, StrictUndefined, Template + + +def _get_templates_path() -> Path: + templates_path = Path(__file__).resolve().parent + if not templates_path.exists(): + raise FileNotFoundError(templates_path) + if not templates_path.is_dir(): + raise NotADirectoryError(templates_path) + return templates_path + + +_TEMPLATES_PATH: Path = _get_templates_path() + + +def _load_template_from_str(template: str) -> Template: + loader = FileSystemLoader(_TEMPLATES_PATH) + env = Environment(trim_blocks=True, lstrip_blocks=True, loader=loader, undefined=StrictUndefined) # noqa: S701 + return env.from_string(template) + + +def _import_template(template: str) -> Template: + template_file = _TEMPLATES_PATH / template + with template_file.open() as file: + template = file.read() + return _load_template_from_str(template) + + +WORKER_TEMPLATE: Template = _import_template("worker.lua.j2") + +POLICY_LOADER_TEMPLATE: Template = _import_template("policy-loader.lua.j2") diff --git a/python/knot_resolver/config/templates/policy-loader.lua.j2 b/python/knot_resolver/config/templates/policy-loader.lua.j2 new file mode 100644 index 000000000..4f8d1adec --- /dev/null +++ b/python/knot_resolver/config/templates/policy-loader.lua.j2 @@ -0,0 +1,23 @@ +{% if not cfg.lua.policy_script_only %} + +ffi = require('ffi') +local C = ffi.C + + + +{% endif %} + +-- LUA section -------------------------------------- +-- Custom Lua code cannot be validated + +{% if cfg.lua.policy_script_file %} +{% import cfg.lua.policy_script_file as policy_script_file %} +{{ policy_script_file }} +{% endif %} + +{% if cfg.lua.policy_script %} +{{ cfg.lua.policy_script }} +{% endif %} + +-- exit properly +quit() diff --git a/python/knot_resolver/config/templates/worker.lua.j2 b/python/knot_resolver/config/templates/worker.lua.j2 new file mode 100644 index 000000000..6084c2345 --- /dev/null +++ b/python/knot_resolver/config/templates/worker.lua.j2 @@ -0,0 +1,20 @@ +{% if not cfg.lua.script_only %} + +ffi = require('ffi') +local C = ffi.C + + + +{% endif %} + +-- LUA section -------------------------------------- +-- Custom Lua code cannot be validated + +{% if cfg.lua.script_file %} +{% import cfg.lua.script_file as script_file %} +{{ script_file }} +{% endif %} + +{% if cfg.lua.script %} +{{ cfg.lua.script }} +{% endif %} diff --git a/python/knot_resolver/utils/modeling/__init__.py b/python/knot_resolver/utils/modeling/__init__.py new file mode 100644 index 000000000..2ee389fb6 --- /dev/null +++ b/python/knot_resolver/utils/modeling/__init__.py @@ -0,0 +1,9 @@ +from .model_node import ModelNode +from .parsing import parse_json, parse_yaml, try_to_parse + +__all__ = [ + "ModelNode", + "parse_json", + "parse_yaml", + "try_to_parse", +] diff --git a/python/knot_resolver/utils/modeling/errors.py b/python/knot_resolver/utils/modeling/errors.py new file mode 100644 index 000000000..fcc659997 --- /dev/null +++ b/python/knot_resolver/utils/modeling/errors.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from knot_resolver.errors import BaseKresError + + +class DataModelingError(BaseKresError): + """Base exception class for all data modeling errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + super().__init__() + self._msg = f"[{error_path}] {msg}" if error_path else msg + self._error_path = error_path + + def __str__(self) -> str: + return self._msg + + +class DataDescriptionError(DataModelingError): + """Exception class for data description errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + msg = f"description error: {msg}" + super().__init__(msg, error_path) + + +class DataAnnotationError(DataModelingError): + """Exception class for data annotation errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + msg = f"annotation error: {msg}" + super().__init__(msg, error_path) + + +class DataParsingError(DataModelingError): + """Exception class for data parsing errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + msg = f"parsing error: {msg}" + super().__init__(msg, error_path) + + +class DataTypeError(DataModelingError): + """Exception class for data type errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + msg = f"type error: {msg}" + super().__init__(msg, error_path) + + +class DataValueError(DataModelingError): + """Exception class for data value errors.""" + + def __init__(self, msg: str, error_path: str = "") -> None: + msg = f"value error: {msg}" + super().__init__(msg, error_path) + + +class DataValidationError(DataModelingError): + """ + Exception class for data validation errors. + + This exception is used as parent for other data modeling errors. + """ + + def __init__(self, msg: str, error_path: str, child_errors: list[DataModelingError] | None = None) -> None: + super().__init__(msg, error_path) + + if child_errors is None: + child_errors = [] + self._child_errors = child_errors + + def recursive_msg(self, indentation: int = 0) -> str: + parts: list[str] = [] + + if indentation == 0: + indentation += 1 + parts.append("Data validation error detected:") + + indent = " " * indentation + parts.append(f"{indent}{self._msg}") + + if self._child_errors: + for error in self._child_errors: + if isinstance(error, DataValidationError): + parts.append(error.recursive_msg(indentation + 1)) + else: + parts.append(indent + f" {error}") + return "\n".join(parts) + + def __str__(self) -> str: + return self.recursive_msg() + + +class AggrDataValidationError(DataValidationError): + """ + Exception class for aggregation of data validation errors. + + This exception is used to aggregate other data modeling errors. + """ + + def __init__(self, error_path: str, child_errors: list[DataModelingError]) -> None: + super().__init__("error due to lower level error", error_path, child_errors) + + def recursive_msg(self, indentation: int = 0) -> str: + inc = 0 + parts: list[str] = [] + + if indentation == 0: + inc = 1 + parts.append("Data validation errors detected:") + + for error in self._child_errors: + if isinstance(error, DataValidationError): + parts.append(error.recursive_msg(indentation + inc)) + else: + parts.append(f" {error}") + return "\n".join(parts) diff --git a/python/knot_resolver/utils/modeling/model_node.py b/python/knot_resolver/utils/modeling/model_node.py new file mode 100644 index 000000000..66023d035 --- /dev/null +++ b/python/knot_resolver/utils/modeling/model_node.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + + +class ModelNode: + """""" + + def __init__(self, source: dict[Any, Any], tree_path: str = "/", base_path: Path = Path()): + self._source = source if source else {} + self._tree_path = tree_path + self._base_path = base_path + + def validate(self) -> None: + pass + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + raise NotImplementedError diff --git a/python/knot_resolver/utils/modeling/parsing.py b/python/knot_resolver/utils/modeling/parsing.py new file mode 100644 index 000000000..8425b736c --- /dev/null +++ b/python/knot_resolver/utils/modeling/parsing.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import json +from enum import Enum, auto +from typing import TYPE_CHECKING, Any + +import yaml +from yaml.constructor import ConstructorError + +from knot_resolver.utils.modeling.errors import DataParsingError + +if TYPE_CHECKING: + from yaml.nodes import MappingNode + + +def _json_raise_duplicates(pairs: list[tuple[Any, Any]]) -> dict[Any, Any]: + """ + JSON hook used in 'json.loads()' that detects duplicate keys in the parsed data. + + The code for this hook was highly inspired by: https://stackoverflow.com/q/14902299/12858520 + """ + mapping: dict[Any, Any] = {} + for key, value in pairs: + if key in mapping: + msg = f"duplicate key detected: {key}" + raise DataParsingError(msg) + mapping[key] = value + return mapping + + +class _YAMLRaiseDuplicatesLoader(yaml.SafeLoader): + """ + YAML loader used in 'yaml.loads()' that detects duplicate keys in the parsed data. + + The code for this loader was highly inspired by: https://gist.github.com/pypt/94d747fe5180851196eb + The loader extends yaml.SafeLoader, so it should be safe, even though the linter reports unsafe-yaml-load (S506). + More about safe loader: https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load + """ + + def construct_mapping(self, node: MappingNode, deep: bool = False) -> dict[Any, Any]: + mapping: dict[Any, Any] = {} + for key_node, value_node in node.value: + key = self.construct_object(key_node, deep=deep) + # we need to check, that the key object can be used in a hash table + try: + _ = hash(key) + except TypeError as exc: + raise ConstructorError( + "while constructing a mapping", + node.start_mark, + f"found unacceptable key ({exc})", + key_node.start_mark, + ) from exc + + # check for duplicate keys + if key in mapping: + msg = f"duplicate key detected: {key_node.start_mark}" + raise DataParsingError(msg) + value = self.construct_object(value_node, deep=deep) + mapping[key] = value + return mapping + + +class DataFormat(Enum): + YAML = auto() + JSON = auto() + + def loads(self, text: str) -> dict[Any, Any]: + """Load data from string in data format and return the data in dictionary.""" + if self is DataFormat.YAML: + return yaml.load(text, Loader=_YAMLRaiseDuplicatesLoader) # noqa: S506 + if self is DataFormat.JSON: + return json.loads(text, object_pairs_hook=_json_raise_duplicates) + msg = f"parsing data from '{self}' format is not implemented" + raise NotImplementedError(msg) + + def dumps(self, data: dict[Any, Any], indent: int | None = None) -> str: + """Dump dictionary data to string in required data format.""" + if self is DataFormat.YAML: + return yaml.safe_dump(data, indent=indent) + if self is DataFormat.JSON: + return json.dumps(data, indent=indent) + msg = f"exporting data to '{self}' format is not implemented" + raise NotImplementedError(msg) + + +def parse_yaml(data: str) -> dict[Any, Any]: + """Parse YAML string and return the data in dictionary.""" + return DataFormat.YAML.loads(data) + + +def parse_json(data: str) -> dict[Any, Any]: + """Parse JSON string and return the data in dictionary.""" + return DataFormat.JSON.loads(data) + + +def try_to_parse(data: str) -> dict[Any, Any]: + """Attempt to parse data string as a JSON or YAML and return it's dictionary.""" + try: + return parse_json(data) + except json.JSONDecodeError: + try: + return parse_yaml(data) + except yaml.YAMLError as e: + # YAML parsing error should be sufficient because the JSON can be parsed by the YAML parser. + # We should receive a helpful error message for JSON as well. + raise DataParsingError(e) from e diff --git a/python/knot_resolver/utils/modeling/types/__init__.py b/python/knot_resolver/utils/modeling/types/__init__.py new file mode 100644 index 000000000..b29ed673c --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/__init__.py @@ -0,0 +1,23 @@ +from .base_float_types import BaseFloat, BaseFloatRange +from .base_generic_types import ListOrItem +from .base_integer_types import BaseInteger, BaseIntegerRange +from .base_string_types import ( + BaseString, + BaseStringLength, + BaseStringPattern, + BaseUnit, +) +from .base_types import NoneType + +__all__ = [ + "BaseFloat", + "BaseFloatRange", + "BaseInteger", + "BaseIntegerRange", + "BaseString", + "BaseStringLength", + "BaseStringPattern", + "BaseUnit", + "ListOrItem", + "NoneType", +] diff --git a/python/knot_resolver/utils/modeling/types/base_float_types.py b/python/knot_resolver/utils/modeling/types/base_float_types.py new file mode 100644 index 000000000..dafa96dc4 --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/base_float_types.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from typing import Any + +from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError + +from .base_types import BaseType + + +class BaseFloat(BaseType): + """Base class to work with float value.""" + + def validate(self) -> None: + if not isinstance(self._value, (float, int)) or isinstance(self._value, bool): + msg = ( + f"Unexpected value for '{type(self)}'." + f" Expected float, got '{self._value}' with type '{type(self._value)}'" + ) + raise DataTypeError(msg, self._tree_path) + + def __int__(self) -> int: + return int(self._value) + + def __float__(self) -> float: + return float(self._value) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + return {"type": "number"} + + +class BaseFloatRange(BaseFloat): + _min: float + _max: float + + def validate(self) -> None: + super().validate() + if hasattr(self, "_min") and (self._value < self._min): + msg = f"value {self._value} is lower than the minimum {self._min}." + raise DataValueError(msg, self._tree_path) + if hasattr(self, "_max") and (self._value > self._max): + msg = f"value {self._value} is higher than the maximum {self._max}" + raise DataValueError(msg, self._tree_path) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + typ: dict[str, Any] = {"type": "number"} + if hasattr(cls, "_min"): + typ["minimum"] = cls._min + if hasattr(cls, "_max"): + typ["maximum"] = cls._max + return typ diff --git a/python/knot_resolver/utils/modeling/types/base_generic_types.py b/python/knot_resolver/utils/modeling/types/base_generic_types.py new file mode 100644 index 000000000..260af0f7a --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/base_generic_types.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import Any, Generic, List, TypeVar, Union + +from .base_types import BaseType + +T = TypeVar("T") + + +class BaseGenericTypeWrapper(Generic[T], BaseType): + """""" + + +class ListOrItem(BaseGenericTypeWrapper[Union[List[T], T]]): + """""" + + def _get_list(self) -> list[T]: + return self._value if isinstance(self._value, list) else [self._value] + + def validate(self) -> None: + self._get_list() + + def __getitem__(self, index: Any) -> T: + return self._get_list()[index] + + def to_std(self) -> list[T]: + return self._get_list() + + def __len__(self) -> int: + return len(self._get_list()) + + def serialize(self) -> list[T] | T: + return self._value diff --git a/python/knot_resolver/utils/modeling/types/base_integer_types.py b/python/knot_resolver/utils/modeling/types/base_integer_types.py new file mode 100644 index 000000000..ece7b08b8 --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/base_integer_types.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import Any + +from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError + +from .base_types import BaseType + + +class BaseInteger(BaseType): + """Base class to work with integer value.""" + + def validate(self) -> None: + if not isinstance(self._value, int) or isinstance(self._value, bool): + msg = ( + f"Unexpected value for '{type(self)}'" + f" Expected integer, got '{self._value}' with type '{type(self._value)}'" + ) + raise DataTypeError(msg, self._tree_path) + + def __int__(self) -> int: + return int(self._value) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + return {"type": "integer"} + + +class BaseIntegerRange(BaseInteger): + _min: int + _max: int + + def validate(self) -> None: + super().validate() + if hasattr(self, "_min") and (self._value < self._min): + msg = f"value {self._value} is lower than the minimum {self._min}." + raise DataValueError(msg, self._tree_path) + if hasattr(self, "_max") and (self._value > self._max): + msg = f"value {self._value} is higher than the maximum {self._max}" + raise DataValueError(msg, self._tree_path) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + typ: dict[str, Any] = {"type": "integer"} + if hasattr(cls, "_min"): + typ["minimum"] = cls._min + if hasattr(cls, "_max"): + typ["maximum"] = cls._max + return typ diff --git a/python/knot_resolver/utils/modeling/types/base_string_types.py b/python/knot_resolver/utils/modeling/types/base_string_types.py new file mode 100644 index 000000000..c1e37a238 --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/base_string_types.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import re +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError + +from .base_types import BaseType + +if TYPE_CHECKING: + from re import Pattern + + +class BaseString(BaseType): + """Base class to work with string value.""" + + def validate(self) -> None: + if not isinstance(self._value, (str, int)) or isinstance(self._value, bool): + msg = ( + f"Unexpected value for '{type(self)}'." + f" Expected string, got '{self._value}' with type '{type(self._value)}'" + ) + raise DataTypeError(msg, self._tree_path) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + return {"type": "string"} + + +class BaseStringLength(BaseString): + _min_bytes: int = 1 + _max_bytes: int + + def validate(self) -> None: + super().validate() + value_bytes = len(self._value.encode("utf-8")) + if hasattr(self, "_min_bytes") and (value_bytes < self._min_bytes): + msg = f"the string value {self._value} is shorter than the minimum {self._min_bytes} bytes." + raise DataValueError(msg, self._tree_path) + if hasattr(self, "_max_bytes") and (value_bytes > self._max_bytes): + msg = f"the string value {self._value} is longer than the maximum {self._max_bytes} bytes." + raise DataValueError(msg, self._tree_path) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + typ: dict[str, Any] = {"type": "string"} + if hasattr(cls, "_min_bytes"): + typ["minLength"] = cls._min_bytes + if hasattr(cls, "_max_bytes"): + typ["maxLength"] = cls._max_bytes + return typ + + +class BaseStringPattern(BaseString): + _re: Pattern[str] + + def validate(self) -> None: + super().validate() + if not type(self)._re.match(self._value): # noqa: SLF001 + msg = f"'{self._value}' does not match '{self._re.pattern}' pattern" + raise DataValueError(msg, self._tree_path) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + return {"type": "string", "pattern": rf"{cls._re.pattern}"} + + +class BaseUnit(BaseString): + _re: Pattern[str] + _units: dict[str, int] + + def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None: + super().__init__(value, tree_path, base_path) + type(self)._re = re.compile(rf"^(\d+)({r'|'.join(type(self)._units.keys())})$") # noqa: SLF001 + + def _get_base_value(self) -> float: + cls = self.__class__ + + super().validate() + if isinstance(self._value, int) and not isinstance(self._value, bool): + return self._value + + grouped = self._re.search(self._value) + if grouped: + val, unit = grouped.groups() + if unit is None: + msg = f"Missing units. Accepted units are {list(cls._units.keys())}" + raise DataValueError(msg, self._tree_path) + if unit not in cls._units: + msg = ( + f"Used unexpected unit '{unit}' for {type(self).__name__}." + f" Accepted units are {list(cls._units.keys())}" + ) + raise DataValueError(msg, self._tree_path) + return float(val) * cls._units[unit] + msg = ( + f"Unexpected value for '{type(self)}'." + " Expected string that matches pattern " + rf"'{type(self)._re.pattern}'." # noqa: SLF001 + f" Positive integer and one of the units {list(type(self)._units.keys())}, got '{self._value}'." # noqa: SLF001 + ) + raise DataValueError(msg, self._tree_path) + + def validate(self) -> None: + self._get_base_value() + + def __int__(self) -> int: + return int(self._get_base_value()) + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + return {"type": "string", "pattern": rf"{cls._re.pattern}"} diff --git a/python/knot_resolver/utils/modeling/types/base_types.py b/python/knot_resolver/utils/modeling/types/base_types.py new file mode 100644 index 000000000..a658d673d --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/base_types.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, TypeVar + +T = TypeVar("T") + +NoneType = type(None) + + +class BaseType: + """""" + + def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None: + self._value = value + self._tree_path = tree_path + self._base_path = base_path + + def __repr__(self) -> str: + cls = self.__class__ + return f'{cls.__name__}("{self._value}")' + + def __eq__(self, o: object) -> bool: + cls = self.__class__ + return isinstance(o, cls) and o._value == self._value + + def __hash__(self) -> int: + return hash(self._value) + + def __str__(self) -> str: + return str(self._value) + + def __int__(self) -> int: + raise NotImplementedError + + def validate() -> None: + raise NotImplementedError + + @classmethod + def json_schema(cls) -> dict[Any, Any]: + raise NotImplementedError diff --git a/python/knot_resolver/utils/modeling/types/inspect.py b/python/knot_resolver/utils/modeling/types/inspect.py new file mode 100644 index 000000000..47a06a280 --- /dev/null +++ b/python/knot_resolver/utils/modeling/types/inspect.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import inspect +from typing import Any, Dict, List, Literal, Tuple, Union + +from knot_resolver.utils.modeling.errors import DataAnnotationError +from knot_resolver.utils.modeling.types.base_generic_types import BaseGenericTypeWrapper + +NoneType = type(None) + + +def get_annotations(obj: Any) -> dict[Any, Any]: + if hasattr(inspect, "get_annotations"): + return inspect.get_annotations(obj) + # TODO: safe to remove in python3.10 + # This fallback only exists for older versions + return obj.__dict__.get("__annotations__", {}) + + +def get_generic_type_arguments(typ: Any) -> list[Any]: + return getattr(typ, "__args__", []) + + +def get_generic_type_argument(typ: Any) -> Any: + args = get_generic_type_arguments(typ) + if len(args) == 1: + return args[0] + msg = f"expected one generic type argument, got {len(args)}" + raise DataAnnotationError(msg) + + +def is_dict(typ: Any) -> bool: + return getattr(typ, "__origin__", None) in (Dict, dict) + + +def is_base_generic_type_wrapper(typ: Any) -> bool: + origin = getattr(typ, "__origin__", None) + return inspect.isclass(origin) and issubclass(origin, BaseGenericTypeWrapper) + + +def get_base_generic_type_wrapper_argument(typ: type[BaseGenericTypeWrapper[Any]]) -> Any: + if not hasattr(typ, "__origin__"): + msg = "" + raise DataAnnotationError(msg) + + origin = getattr(typ, "__origin__") + if not hasattr(origin, "__orig_bases__"): + msg = "" + raise DataAnnotationError(msg) + + orig_base: list[Any] = getattr(origin, "__orig_bases__", [])[0] + arg = get_generic_type_argument(typ) + return get_generic_type_argument(orig_base[arg]) + + +def is_list(typ: Any) -> bool: + return getattr(typ, "__origin__", None) in (List, list) + + +def is_literal(typ: Any) -> bool: + return getattr(typ, "__origin__", None) == Literal + + +def is_none_type(typ: Any) -> bool: + return typ is None or typ == NoneType + + +def is_optional(typ: Any) -> bool: + origin = getattr(typ, "__origin__", None) + args = get_generic_type_arguments(typ) + optional_len = 2 + return origin == Union and len(args) == optional_len and NoneType in args + + +def is_tuple(typ: Any) -> bool: + return getattr(typ, "__origin__", None) in (Tuple, tuple) + + +def is_union(typ: Any) -> bool: + return getattr(typ, "__origin__", None) == Union + + +def get_optional_inner_type(optional: Any) -> Any: + if is_optional(optional): + args = get_generic_type_arguments(optional) + for arg in args: + if not is_none_type(arg): + return arg + msg = "failed to get inner optional type" + raise DataAnnotationError(msg) + + +def getattr_type(obj: Any, attr_name: str) -> Any: + annot = get_annotations(type(obj)) + if hasattr(annot, attr_name): + return annot[attr_name] + msg = "attribute name is missing in data annotations" + raise DataAnnotationError(msg) + + +def is_attr_name_private(attr_name: str) -> bool: + return attr_name.startswith("_") diff --git a/tests/python/knot_resolver/utils/modeling/test_errors.py b/tests/python/knot_resolver/utils/modeling/test_errors.py new file mode 100644 index 000000000..0486ff744 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/test_errors.py @@ -0,0 +1,46 @@ +import pytest + +from knot_resolver.utils.modeling.errors import ( + AggrDataValidationError, + DataAnnotationError, + DataDescriptionError, + DataModelingError, + DataTypeError, + DataValidationError, + DataValueError, +) + +errors = [ + DataModelingError("this is data modeling error message", "/error"), + DataAnnotationError("this is annotation error message", "/annotation"), + DataDescriptionError("this is description error message", "/description"), + DataTypeError("this is type error message", "/type"), + DataValueError("this is value error message", "/value"), +] + + +def test_data_validation_error() -> None: + error_msg = """Data validation error detected: + [/validation] this is validation error message + [/error] this is data modeling error message + [/annotation] annotation error: this is annotation error message + [/description] description error: this is description error message + [/type] type error: this is type error message + [/value] value error: this is value error message""" + + with pytest.raises(DataValidationError) as error: + raise DataValidationError("this is validation error message", "/validation", errors) + assert str(error.value) == error_msg + + +def test_aggregate_data_validation_error() -> None: + error_msg = """Data validation errors detected: + [/error] this is data modeling error message + [/annotation] annotation error: this is annotation error message + [/description] description error: this is description error message + [/type] type error: this is type error message + [/value] value error: this is value error message""" + + with pytest.raises(AggrDataValidationError) as error: + raise AggrDataValidationError("/", errors) + assert str(error.value) == error_msg diff --git a/tests/python/knot_resolver/utils/modeling/test_parsing.py b/tests/python/knot_resolver/utils/modeling/test_parsing.py new file mode 100644 index 000000000..63fe122da --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/test_parsing.py @@ -0,0 +1,122 @@ +import pytest + +from knot_resolver.utils.modeling.errors import DataParsingError +from knot_resolver.utils.modeling.parsing import parse_json, parse_yaml, try_to_parse + +json_data = """ +{ + "none": null, + "boolean": false, + "number": 2026, + "string": "this is string", + "object": { + "number": 5000, + "string": "this is object string" + }, + "array": [ + "item1", + "item2", + "item3" + ] +} +""" + +json_data_duplicates = """ +{ + "duplicity-key": 1, + "duplicity-key": 2 +} +""" + +json_data_duplicates_inner = """ +{ + "object": { + "duplicity-key": 1, + "duplicity-key": 2 + } +} +""" + +yaml_data = """ +none: null +boolean: false +number: 2026 +string: this is string +object: + number: 5000 + string: this is object string +array: + - item1 + - item2 + - item3 +""" + +yaml_data_duplicates = """ +duplicity-key: 1 +duplicity-key: 2 +""" + +yaml_data_duplicates_inner = """ +object: + duplicity-key: 1 + duplicity-key: 2 +""" + +data_dict = { + "none": None, + "boolean": False, + "number": 2026, + "string": "this is string", + "object": { + "number": 5000, + "string": "this is object string", + }, + "array": [ + "item1", + "item2", + "item3", + ], +} + + +def test_parse_json() -> None: + data = parse_json(json_data) + assert data == data_dict + + +@pytest.mark.parametrize("data", [json_data, yaml_data]) +def test_parse_yaml(data: str) -> None: + data = parse_yaml(data) + assert data == data_dict + + +@pytest.mark.parametrize( + "data", + [ + json_data_duplicates, + json_data_duplicates_inner, + ], +) +def test_parse_json_duplicates(data: str) -> None: + with pytest.raises(DataParsingError): + parse_json(data) + + +@pytest.mark.parametrize( + "data", + [ + json_data_duplicates, + json_data_duplicates_inner, + yaml_data_duplicates, + yaml_data_duplicates_inner, + ], +) +def test_parse_yaml_duplicates(data: str) -> None: + with pytest.raises(DataParsingError): + parse_yaml(data) + + +@pytest.mark.parametrize("data", [json_data, yaml_data]) +def test_try_to_parse(data: str) -> None: + data = try_to_parse(data) + assert data == data_dict diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py new file mode 100644 index 000000000..478e3f21d --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py @@ -0,0 +1,79 @@ +import random +import sys +from typing import Any, Optional + +import pytest + +from knot_resolver.utils.modeling.errors import DataModelingError +from knot_resolver.utils.modeling.types.base_float_types import BaseFloat, BaseFloatRange + + +@pytest.mark.parametrize("value", [-65.535, -1, 0, 1, 65.535]) +def test_base_float(value: int): + obj = BaseFloat(value) + obj.validate() + assert float(obj) == value + assert int(obj) == int(value) + assert str(obj) == f"{value}" + + +@pytest.mark.parametrize("value", [True, False, "1"]) +def test_base_float_invalid(value: Any): + with pytest.raises(DataModelingError): + BaseFloat(value).validate() + + +@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)]) +def test_base_float_range(min: Optional[float], max: Optional[float]): + class TestFloatRange(BaseFloatRange): + if min: + _min = min + if max: + _max = max + + if min: + obj = TestFloatRange(min) + obj.validate() + assert float(obj) == min + assert int(obj) == int(min) + assert str(obj) == f"{min}" + if max: + obj = TestFloatRange(max) + obj.validate() + assert float(obj) == max + assert int(obj) == int(max) + assert str(obj) == f"{max}" + + rmin = int(min + 1) if min else -sys.maxsize - 1 + rmax = int(max - 1) if max else sys.maxsize + + n = 100 + values = [float(random.randint(rmin, rmax)) for _ in range(n)] + + for value in values: + obj = TestFloatRange(value) + obj.validate() + assert float(obj) == float(value) + assert str(obj) == f"{value}" + + +@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)]) +def test_base_float_range_invalid(min: Optional[float], max: Optional[float]): + class TestFloatRange(BaseFloatRange): + if min: + _min = min + if max: + _max = max + + n = 100 + invalid_nums = [] + + rmin = int(min + 1) if min else -sys.maxsize - 1 + rmax = int(max - 1) if max else sys.maxsize + + invalid_nums.extend([float(random.randint(rmax + 1, sys.maxsize)) for _ in range(n % 2)] if max else []) + invalid_nums.extend([float(random.randint(-sys.maxsize - 1, rmin - 1)) for _ in range(n % 2)] if max else []) + + for num in invalid_nums: + with pytest.raises(DataModelingError): + TestFloatRange(num).validate() diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py new file mode 100644 index 000000000..ed77f4e51 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py @@ -0,0 +1,26 @@ +from typing import Any, List, Union + +import pytest + +from knot_resolver.utils.modeling.types.base_generic_types import ListOrItem +from knot_resolver.utils.modeling.types.inspect import get_base_generic_type_wrapper_argument + + +@pytest.mark.parametrize("typ", [str, int, float, bool]) +def test_list_or_item_inner_type(typ: Any): + assert get_base_generic_type_wrapper_argument(ListOrItem[typ]) == Union[List[typ], typ] + + +@pytest.mark.parametrize( + "value", + [ + [], + 65_535, + [1, 65_535, 5335, 5000], + ], +) +def test_list_or_item(value: Any): + obj = ListOrItem(value) + assert str(obj) == str(value) + for i, item in enumerate(obj): + assert item == value[i] if isinstance(value, list) else value diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py new file mode 100644 index 000000000..9e88041fb --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py @@ -0,0 +1,75 @@ +import random +import sys +from typing import Any, Optional + +import pytest + +from knot_resolver.utils.modeling.errors import DataModelingError +from knot_resolver.utils.modeling.types.base_integer_types import BaseInteger, BaseIntegerRange + + +@pytest.mark.parametrize("value", [-65535, -1, 0, 1, 65535]) +def test_base_integer(value: int): + obj = BaseInteger(value) + obj.validate() + assert int(obj) == value + assert str(obj) == f"{value}" + + +@pytest.mark.parametrize("value", [True, False, "1", 1.1]) +def test_base_integer_invalid(value: Any): + with pytest.raises(DataModelingError): + BaseInteger(value).validate() + + +@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)]) +def test_base_integer_range(min: Optional[int], max: Optional[int]): + class TestIntegerRange(BaseIntegerRange): + if min: + _min = min + if max: + _max = max + + if min: + obj = TestIntegerRange(min) + obj.validate() + assert int(obj) == min + assert str(obj) == f"{min}" + if max: + obj = TestIntegerRange(max) + obj.validate() + assert int(obj) == max + assert str(obj) == f"{max}" + + rmin = min if min else -sys.maxsize - 1 + rmax = max if max else sys.maxsize + + n = 100 + values = [random.randint(rmin, rmax) for _ in range(n)] + + for value in values: + obj = TestIntegerRange(value) + obj.validate() + assert str(obj) == f"{value}" + + +@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)]) +def test_base_integer_range_invalid(min: Optional[int], max: Optional[int]): + class TestIntegerRange(BaseIntegerRange): + if min: + _min = min + if max: + _max = max + + n = 100 + invalid_nums = [] + + rmin = min if min else -sys.maxsize - 1 + rmax = max if max else sys.maxsize + + invalid_nums.extend([random.randint(rmax + 1, sys.maxsize) for _ in range(n % 2)] if max else []) + invalid_nums.extend([random.randint(-sys.maxsize - 1, rmin - 1) for _ in range(n % 2)] if max else []) + + for num in invalid_nums: + with pytest.raises(DataModelingError): + TestIntegerRange(num).validate() diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py new file mode 100644 index 000000000..e7da68f5d --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py @@ -0,0 +1,109 @@ +import random +import string +from typing import Any, Optional + +import pytest + +from knot_resolver.utils.modeling.errors import DataModelingError +from knot_resolver.utils.modeling.types.base_string_types import BaseString, BaseStringLength, BaseUnit + + +@pytest.mark.parametrize("value", [-65_535, -1, 0, 1, 65_535, "a", "abcdef"]) +def test_base_string(value: str): + obj = BaseString(value) + obj.validate() + assert str(obj) == str(value) + + +@pytest.mark.parametrize("value", [True, False]) +def test_base_string_invalid(value: Any): + with pytest.raises(DataModelingError): + BaseString(value).validate() + + +@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)]) +def test_base_string_length(min: Optional[int], max: Optional[int]): + class TestStringLength(BaseStringLength): + if min: + _min_bytes = min + if max: + _max_bytes = max + + if min: + rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=min)) + obj = TestStringLength(rand_str) + obj.validate() + assert str(obj) == f"{rand_str}" + if max: + rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=max)) + obj = TestStringLength(rand_str) + obj.validate() + assert str(obj) == f"{rand_str}" + + rmin = min if min else 1 + rmax = max if max else 200 + + n = 100 + values = [ + "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(rmin, rmax))) for _ in range(n) + ] + + for value in values: + obj = TestStringLength(value) + obj.validate() + assert str(obj) == f"{value}" + + +@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)]) +def test_base_string_length_invalid(min: Optional[int], max: Optional[int]): + class TestStringLength(BaseStringLength): + if min: + _min_bytes = min + if max: + _max_bytes = max + + n = 100 + invalid_strings = [] + + rmin = min if min else 1 + rmax = max if max else 200 + + invalid_strings.extend( + [ + "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(rmax, rmax + 20))) + for _ in range(n % 2) + ] + if max + else [] + ) + invalid_strings.extend( + [ + "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(1, rmin))) + for _ in range(n % 2) + ] + if max + else [] + ) + + for invalid_string in invalid_strings: + with pytest.raises(DataModelingError): + TestStringLength(invalid_string).validate() + + +@pytest.mark.parametrize("value", [1000, "1000a", "100b", "10c", "1d"]) +def test_base_unit(value: str): + class TestBaseUnit(BaseUnit): + _units = {"a": 1, "b": 10, "c": 100, "d": 1000} + + obj = TestBaseUnit(value) + obj.validate() + assert int(obj) == 1000 + + +@pytest.mark.parametrize("value", [True, False, "1000aa", "10ab", "1e"]) +def test_base_unit_invalid(value: Any): + class TestBaseUnit(BaseUnit): + _units = {"a": 1, "b": 10, "c": 100, "d": 1000} + + with pytest.raises(DataModelingError): + TestBaseUnit(value).validate() diff --git a/tests/python/knot_resolver/utils/modeling/types/test_inspect.py b/tests/python/knot_resolver/utils/modeling/types/test_inspect.py new file mode 100644 index 000000000..856305e60 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/types/test_inspect.py @@ -0,0 +1,82 @@ +import sys +from typing import Any, Dict, List, Literal, Optional, Tuple, Union + +import pytest + +from knot_resolver.utils.modeling import ModelNode +from knot_resolver.utils.modeling.types.base_types import BaseType +from knot_resolver.utils.modeling.types.inspect import ( + NoneType, + is_dict, + is_list, + is_literal, + is_none_type, + is_optional, + is_tuple, + is_union, +) + +types = [ + Any, + bool, + int, + float, + str, + BaseType, + ModelNode, +] + +literals = [ + Literal[Any], + Literal[True, False], + Literal[0], + Literal[1, 2, 3], + Literal["literal"], + Literal["literal1", "literal2", "literal3"], +] + + +@pytest.mark.parametrize("typ", types) +def test_is_dict(typ: Any) -> None: + assert is_dict(Dict[typ, Any]) + + +@pytest.mark.parametrize("typ", types) +def test_is_list(typ: Any) -> None: + assert is_list(List[typ]) + if sys.version_info >= (3, 9): + assert is_list(list[typ]) + + +@pytest.mark.parametrize("typ", literals) +def test_is_literal(typ: Any) -> None: + assert is_literal(typ) + + +@pytest.mark.parametrize("typ", [None, NoneType]) +def test_is_none_type(typ: Any) -> None: + assert is_none_type(typ) + + +@pytest.mark.parametrize("typ", types) +def test_is_optional(typ: Any) -> None: + assert is_optional(Optional[typ]) + if sys.version_info >= (3, 9): + assert is_optional(typ | None) + assert is_optional(None | typ) + + +@pytest.mark.parametrize("typ", types) +def test_is_tuple(typ: Any) -> None: + assert is_tuple(Tuple[typ]) + if sys.version_info >= (3, 9): + assert is_tuple(tuple[typ]) + + +@pytest.mark.parametrize("typ", types) +def test_is_union(typ: Any) -> None: + assert is_union(Optional[typ]) + assert is_union(Union[typ, None]) + if sys.version_info >= (3, 9): + assert is_union(typ | None) + assert is_union(None | typ)