--- /dev/null
+from .model.config_model import KresConfigModel
+
+__all__ = [
+ "KresConfigModel",
+]
--- /dev/null
+from knot_resolver.utils.modeling import ModelNode
+
+
+class KresConfigModel(ModelNode):
+ pass
--- /dev/null
+from .float_types import FloatNonNegative
+from .integer_types import (
+ Integer0_32,
+ Integer0_512,
+ Integer0_65535,
+ IntegerNonNegative,
+ IntegerPositive,
+ Percent,
+ PortNumber,
+)
+# from .string_types import
+
+__all__ = [
+ "FloatNonNegative",
+ "Integer0_32",
+ "Integer0_512",
+ "Integer0_65535",
+ "IntegerNonNegative",
+ "IntegerPositive",
+ "Percent",
+ "PortNumber",
+]
--- /dev/null
+from knot_resolver.utils.modeling.types import BaseFloatRange
+
+
+class FloatNonNegative(BaseFloatRange):
+ _min: float = 0.0
--- /dev/null
+# ruff: noqa: N801
+
+from knot_resolver.utils.modeling.types import BaseIntegerRange
+
+
+class Integer0_32(BaseIntegerRange):
+ _min: int = 0
+ _max: int = 32
+
+
+class Integer0_512(BaseIntegerRange):
+ _min: int = 0
+ _max: int = 512
+
+
+class Integer0_65535(BaseIntegerRange):
+ _min: int = 0
+ _max: int = 65_535
+
+
+class IntegerNonNegative(BaseIntegerRange):
+ _min: int = 0
+
+
+class IntegerPositive(BaseIntegerRange):
+ _min: int = 1
+
+
+class Percent(BaseIntegerRange):
+ _min: int = 0
+ _max: int = 100
+
+
+class PortNumber(BaseIntegerRange):
+ _min: int = 1
+ _max: int = 65_535
--- /dev/null
+from knot_resolver.utils.modeling.types import BaseString
--- /dev/null
+from pathlib import Path
+
+from jinja2 import Environment, FileSystemLoader, StrictUndefined, Template
+
+
+def _get_templates_path() -> Path:
+ templates_path = Path(__file__).resolve().parent
+ if not templates_path.exists():
+ raise FileNotFoundError(templates_path)
+ if not templates_path.is_dir():
+ raise NotADirectoryError(templates_path)
+ return templates_path
+
+
+_TEMPLATES_PATH: Path = _get_templates_path()
+
+
+def _load_template_from_str(template: str) -> Template:
+ loader = FileSystemLoader(_TEMPLATES_PATH)
+ env = Environment(trim_blocks=True, lstrip_blocks=True, loader=loader, undefined=StrictUndefined) # noqa: S701
+ return env.from_string(template)
+
+
+def _import_template(template: str) -> Template:
+ template_file = _TEMPLATES_PATH / template
+ with template_file.open() as file:
+ template = file.read()
+ return _load_template_from_str(template)
+
+
+WORKER_TEMPLATE: Template = _import_template("worker.lua.j2")
+
+POLICY_LOADER_TEMPLATE: Template = _import_template("policy-loader.lua.j2")
--- /dev/null
+{% if not cfg.lua.policy_script_only %}
+
+ffi = require('ffi')
+local C = ffi.C
+
+
+
+{% endif %}
+
+-- LUA section --------------------------------------
+-- Custom Lua code cannot be validated
+
+{% if cfg.lua.policy_script_file %}
+{% import cfg.lua.policy_script_file as policy_script_file %}
+{{ policy_script_file }}
+{% endif %}
+
+{% if cfg.lua.policy_script %}
+{{ cfg.lua.policy_script }}
+{% endif %}
+
+-- exit properly
+quit()
--- /dev/null
+{% if not cfg.lua.script_only %}
+
+ffi = require('ffi')
+local C = ffi.C
+
+
+
+{% endif %}
+
+-- LUA section --------------------------------------
+-- Custom Lua code cannot be validated
+
+{% if cfg.lua.script_file %}
+{% import cfg.lua.script_file as script_file %}
+{{ script_file }}
+{% endif %}
+
+{% if cfg.lua.script %}
+{{ cfg.lua.script }}
+{% endif %}
--- /dev/null
+from .model_node import ModelNode
+from .parsing import parse_json, parse_yaml, try_to_parse
+
+__all__ = [
+ "ModelNode",
+ "parse_json",
+ "parse_yaml",
+ "try_to_parse",
+]
--- /dev/null
+from __future__ import annotations
+
+from knot_resolver.errors import BaseKresError
+
+
+class DataModelingError(BaseKresError):
+ """Base exception class for all data modeling errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ super().__init__()
+ self._msg = f"[{error_path}] {msg}" if error_path else msg
+ self._error_path = error_path
+
+ def __str__(self) -> str:
+ return self._msg
+
+
+class DataDescriptionError(DataModelingError):
+ """Exception class for data description errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ msg = f"description error: {msg}"
+ super().__init__(msg, error_path)
+
+
+class DataAnnotationError(DataModelingError):
+ """Exception class for data annotation errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ msg = f"annotation error: {msg}"
+ super().__init__(msg, error_path)
+
+
+class DataParsingError(DataModelingError):
+ """Exception class for data parsing errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ msg = f"parsing error: {msg}"
+ super().__init__(msg, error_path)
+
+
+class DataTypeError(DataModelingError):
+ """Exception class for data type errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ msg = f"type error: {msg}"
+ super().__init__(msg, error_path)
+
+
+class DataValueError(DataModelingError):
+ """Exception class for data value errors."""
+
+ def __init__(self, msg: str, error_path: str = "") -> None:
+ msg = f"value error: {msg}"
+ super().__init__(msg, error_path)
+
+
+class DataValidationError(DataModelingError):
+ """
+ Exception class for data validation errors.
+
+ This exception is used as parent for other data modeling errors.
+ """
+
+ def __init__(self, msg: str, error_path: str, child_errors: list[DataModelingError] | None = None) -> None:
+ super().__init__(msg, error_path)
+
+ if child_errors is None:
+ child_errors = []
+ self._child_errors = child_errors
+
+ def recursive_msg(self, indentation: int = 0) -> str:
+ parts: list[str] = []
+
+ if indentation == 0:
+ indentation += 1
+ parts.append("Data validation error detected:")
+
+ indent = " " * indentation
+ parts.append(f"{indent}{self._msg}")
+
+ if self._child_errors:
+ for error in self._child_errors:
+ if isinstance(error, DataValidationError):
+ parts.append(error.recursive_msg(indentation + 1))
+ else:
+ parts.append(indent + f" {error}")
+ return "\n".join(parts)
+
+ def __str__(self) -> str:
+ return self.recursive_msg()
+
+
+class AggrDataValidationError(DataValidationError):
+ """
+ Exception class for aggregation of data validation errors.
+
+ This exception is used to aggregate other data modeling errors.
+ """
+
+ def __init__(self, error_path: str, child_errors: list[DataModelingError]) -> None:
+ super().__init__("error due to lower level error", error_path, child_errors)
+
+ def recursive_msg(self, indentation: int = 0) -> str:
+ inc = 0
+ parts: list[str] = []
+
+ if indentation == 0:
+ inc = 1
+ parts.append("Data validation errors detected:")
+
+ for error in self._child_errors:
+ if isinstance(error, DataValidationError):
+ parts.append(error.recursive_msg(indentation + inc))
+ else:
+ parts.append(f" {error}")
+ return "\n".join(parts)
--- /dev/null
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+
+class ModelNode:
+ """"""
+
+ def __init__(self, source: dict[Any, Any], tree_path: str = "/", base_path: Path = Path()):
+ self._source = source if source else {}
+ self._tree_path = tree_path
+ self._base_path = base_path
+
+ def validate(self) -> None:
+ pass
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ raise NotImplementedError
--- /dev/null
+from __future__ import annotations
+
+import json
+from enum import Enum, auto
+from typing import TYPE_CHECKING, Any
+
+import yaml
+from yaml.constructor import ConstructorError
+
+from knot_resolver.utils.modeling.errors import DataParsingError
+
+if TYPE_CHECKING:
+ from yaml.nodes import MappingNode
+
+
+def _json_raise_duplicates(pairs: list[tuple[Any, Any]]) -> dict[Any, Any]:
+ """
+ JSON hook used in 'json.loads()' that detects duplicate keys in the parsed data.
+
+ The code for this hook was highly inspired by: https://stackoverflow.com/q/14902299/12858520
+ """
+ mapping: dict[Any, Any] = {}
+ for key, value in pairs:
+ if key in mapping:
+ msg = f"duplicate key detected: {key}"
+ raise DataParsingError(msg)
+ mapping[key] = value
+ return mapping
+
+
+class _YAMLRaiseDuplicatesLoader(yaml.SafeLoader):
+ """
+ YAML loader used in 'yaml.loads()' that detects duplicate keys in the parsed data.
+
+ The code for this loader was highly inspired by: https://gist.github.com/pypt/94d747fe5180851196eb
+ The loader extends yaml.SafeLoader, so it should be safe, even though the linter reports unsafe-yaml-load (S506).
+ More about safe loader: https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
+ """
+
+ def construct_mapping(self, node: MappingNode, deep: bool = False) -> dict[Any, Any]:
+ mapping: dict[Any, Any] = {}
+ for key_node, value_node in node.value:
+ key = self.construct_object(key_node, deep=deep)
+ # we need to check, that the key object can be used in a hash table
+ try:
+ _ = hash(key)
+ except TypeError as exc:
+ raise ConstructorError(
+ "while constructing a mapping",
+ node.start_mark,
+ f"found unacceptable key ({exc})",
+ key_node.start_mark,
+ ) from exc
+
+ # check for duplicate keys
+ if key in mapping:
+ msg = f"duplicate key detected: {key_node.start_mark}"
+ raise DataParsingError(msg)
+ value = self.construct_object(value_node, deep=deep)
+ mapping[key] = value
+ return mapping
+
+
+class DataFormat(Enum):
+ YAML = auto()
+ JSON = auto()
+
+ def loads(self, text: str) -> dict[Any, Any]:
+ """Load data from string in data format and return the data in dictionary."""
+ if self is DataFormat.YAML:
+ return yaml.load(text, Loader=_YAMLRaiseDuplicatesLoader) # noqa: S506
+ if self is DataFormat.JSON:
+ return json.loads(text, object_pairs_hook=_json_raise_duplicates)
+ msg = f"parsing data from '{self}' format is not implemented"
+ raise NotImplementedError(msg)
+
+ def dumps(self, data: dict[Any, Any], indent: int | None = None) -> str:
+ """Dump dictionary data to string in required data format."""
+ if self is DataFormat.YAML:
+ return yaml.safe_dump(data, indent=indent)
+ if self is DataFormat.JSON:
+ return json.dumps(data, indent=indent)
+ msg = f"exporting data to '{self}' format is not implemented"
+ raise NotImplementedError(msg)
+
+
+def parse_yaml(data: str) -> dict[Any, Any]:
+ """Parse YAML string and return the data in dictionary."""
+ return DataFormat.YAML.loads(data)
+
+
+def parse_json(data: str) -> dict[Any, Any]:
+ """Parse JSON string and return the data in dictionary."""
+ return DataFormat.JSON.loads(data)
+
+
+def try_to_parse(data: str) -> dict[Any, Any]:
+ """Attempt to parse data string as a JSON or YAML and return it's dictionary."""
+ try:
+ return parse_json(data)
+ except json.JSONDecodeError:
+ try:
+ return parse_yaml(data)
+ except yaml.YAMLError as e:
+ # YAML parsing error should be sufficient because the JSON can be parsed by the YAML parser.
+ # We should receive a helpful error message for JSON as well.
+ raise DataParsingError(e) from e
--- /dev/null
+from .base_float_types import BaseFloat, BaseFloatRange
+from .base_generic_types import ListOrItem
+from .base_integer_types import BaseInteger, BaseIntegerRange
+from .base_string_types import (
+ BaseString,
+ BaseStringLength,
+ BaseStringPattern,
+ BaseUnit,
+)
+from .base_types import NoneType
+
+__all__ = [
+ "BaseFloat",
+ "BaseFloatRange",
+ "BaseInteger",
+ "BaseIntegerRange",
+ "BaseString",
+ "BaseStringLength",
+ "BaseStringPattern",
+ "BaseUnit",
+ "ListOrItem",
+ "NoneType",
+]
--- /dev/null
+from __future__ import annotations
+
+from typing import Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+
+class BaseFloat(BaseType):
+ """Base class to work with float value."""
+
+ def validate(self) -> None:
+ if not isinstance(self._value, (float, int)) or isinstance(self._value, bool):
+ msg = (
+ f"Unexpected value for '{type(self)}'."
+ f" Expected float, got '{self._value}' with type '{type(self._value)}'"
+ )
+ raise DataTypeError(msg, self._tree_path)
+
+ def __int__(self) -> int:
+ return int(self._value)
+
+ def __float__(self) -> float:
+ return float(self._value)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ return {"type": "number"}
+
+
+class BaseFloatRange(BaseFloat):
+ _min: float
+ _max: float
+
+ def validate(self) -> None:
+ super().validate()
+ if hasattr(self, "_min") and (self._value < self._min):
+ msg = f"value {self._value} is lower than the minimum {self._min}."
+ raise DataValueError(msg, self._tree_path)
+ if hasattr(self, "_max") and (self._value > self._max):
+ msg = f"value {self._value} is higher than the maximum {self._max}"
+ raise DataValueError(msg, self._tree_path)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ typ: dict[str, Any] = {"type": "number"}
+ if hasattr(cls, "_min"):
+ typ["minimum"] = cls._min
+ if hasattr(cls, "_max"):
+ typ["maximum"] = cls._max
+ return typ
--- /dev/null
+from __future__ import annotations
+
+from typing import Any, Generic, List, TypeVar, Union
+
+from .base_types import BaseType
+
+T = TypeVar("T")
+
+
+class BaseGenericTypeWrapper(Generic[T], BaseType):
+ """"""
+
+
+class ListOrItem(BaseGenericTypeWrapper[Union[List[T], T]]):
+ """"""
+
+ def _get_list(self) -> list[T]:
+ return self._value if isinstance(self._value, list) else [self._value]
+
+ def validate(self) -> None:
+ self._get_list()
+
+ def __getitem__(self, index: Any) -> T:
+ return self._get_list()[index]
+
+ def to_std(self) -> list[T]:
+ return self._get_list()
+
+ def __len__(self) -> int:
+ return len(self._get_list())
+
+ def serialize(self) -> list[T] | T:
+ return self._value
--- /dev/null
+from __future__ import annotations
+
+from typing import Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+
+class BaseInteger(BaseType):
+ """Base class to work with integer value."""
+
+ def validate(self) -> None:
+ if not isinstance(self._value, int) or isinstance(self._value, bool):
+ msg = (
+ f"Unexpected value for '{type(self)}'"
+ f" Expected integer, got '{self._value}' with type '{type(self._value)}'"
+ )
+ raise DataTypeError(msg, self._tree_path)
+
+ def __int__(self) -> int:
+ return int(self._value)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ return {"type": "integer"}
+
+
+class BaseIntegerRange(BaseInteger):
+ _min: int
+ _max: int
+
+ def validate(self) -> None:
+ super().validate()
+ if hasattr(self, "_min") and (self._value < self._min):
+ msg = f"value {self._value} is lower than the minimum {self._min}."
+ raise DataValueError(msg, self._tree_path)
+ if hasattr(self, "_max") and (self._value > self._max):
+ msg = f"value {self._value} is higher than the maximum {self._max}"
+ raise DataValueError(msg, self._tree_path)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ typ: dict[str, Any] = {"type": "integer"}
+ if hasattr(cls, "_min"):
+ typ["minimum"] = cls._min
+ if hasattr(cls, "_max"):
+ typ["maximum"] = cls._max
+ return typ
--- /dev/null
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+if TYPE_CHECKING:
+ from re import Pattern
+
+
+class BaseString(BaseType):
+ """Base class to work with string value."""
+
+ def validate(self) -> None:
+ if not isinstance(self._value, (str, int)) or isinstance(self._value, bool):
+ msg = (
+ f"Unexpected value for '{type(self)}'."
+ f" Expected string, got '{self._value}' with type '{type(self._value)}'"
+ )
+ raise DataTypeError(msg, self._tree_path)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ return {"type": "string"}
+
+
+class BaseStringLength(BaseString):
+ _min_bytes: int = 1
+ _max_bytes: int
+
+ def validate(self) -> None:
+ super().validate()
+ value_bytes = len(self._value.encode("utf-8"))
+ if hasattr(self, "_min_bytes") and (value_bytes < self._min_bytes):
+ msg = f"the string value {self._value} is shorter than the minimum {self._min_bytes} bytes."
+ raise DataValueError(msg, self._tree_path)
+ if hasattr(self, "_max_bytes") and (value_bytes > self._max_bytes):
+ msg = f"the string value {self._value} is longer than the maximum {self._max_bytes} bytes."
+ raise DataValueError(msg, self._tree_path)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ typ: dict[str, Any] = {"type": "string"}
+ if hasattr(cls, "_min_bytes"):
+ typ["minLength"] = cls._min_bytes
+ if hasattr(cls, "_max_bytes"):
+ typ["maxLength"] = cls._max_bytes
+ return typ
+
+
+class BaseStringPattern(BaseString):
+ _re: Pattern[str]
+
+ def validate(self) -> None:
+ super().validate()
+ if not type(self)._re.match(self._value): # noqa: SLF001
+ msg = f"'{self._value}' does not match '{self._re.pattern}' pattern"
+ raise DataValueError(msg, self._tree_path)
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ return {"type": "string", "pattern": rf"{cls._re.pattern}"}
+
+
+class BaseUnit(BaseString):
+ _re: Pattern[str]
+ _units: dict[str, int]
+
+ def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None:
+ super().__init__(value, tree_path, base_path)
+ type(self)._re = re.compile(rf"^(\d+)({r'|'.join(type(self)._units.keys())})$") # noqa: SLF001
+
+ def _get_base_value(self) -> float:
+ cls = self.__class__
+
+ super().validate()
+ if isinstance(self._value, int) and not isinstance(self._value, bool):
+ return self._value
+
+ grouped = self._re.search(self._value)
+ if grouped:
+ val, unit = grouped.groups()
+ if unit is None:
+ msg = f"Missing units. Accepted units are {list(cls._units.keys())}"
+ raise DataValueError(msg, self._tree_path)
+ if unit not in cls._units:
+ msg = (
+ f"Used unexpected unit '{unit}' for {type(self).__name__}."
+ f" Accepted units are {list(cls._units.keys())}"
+ )
+ raise DataValueError(msg, self._tree_path)
+ return float(val) * cls._units[unit]
+ msg = (
+ f"Unexpected value for '{type(self)}'."
+ " Expected string that matches pattern "
+ rf"'{type(self)._re.pattern}'." # noqa: SLF001
+ f" Positive integer and one of the units {list(type(self)._units.keys())}, got '{self._value}'." # noqa: SLF001
+ )
+ raise DataValueError(msg, self._tree_path)
+
+ def validate(self) -> None:
+ self._get_base_value()
+
+ def __int__(self) -> int:
+ return int(self._get_base_value())
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ return {"type": "string", "pattern": rf"{cls._re.pattern}"}
--- /dev/null
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, TypeVar
+
+T = TypeVar("T")
+
+NoneType = type(None)
+
+
+class BaseType:
+ """"""
+
+ def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None:
+ self._value = value
+ self._tree_path = tree_path
+ self._base_path = base_path
+
+ def __repr__(self) -> str:
+ cls = self.__class__
+ return f'{cls.__name__}("{self._value}")'
+
+ def __eq__(self, o: object) -> bool:
+ cls = self.__class__
+ return isinstance(o, cls) and o._value == self._value
+
+ def __hash__(self) -> int:
+ return hash(self._value)
+
+ def __str__(self) -> str:
+ return str(self._value)
+
+ def __int__(self) -> int:
+ raise NotImplementedError
+
+ def validate() -> None:
+ raise NotImplementedError
+
+ @classmethod
+ def json_schema(cls) -> dict[Any, Any]:
+ raise NotImplementedError
--- /dev/null
+from __future__ import annotations
+
+import inspect
+from typing import Any, Dict, List, Literal, Tuple, Union
+
+from knot_resolver.utils.modeling.errors import DataAnnotationError
+from knot_resolver.utils.modeling.types.base_generic_types import BaseGenericTypeWrapper
+
+NoneType = type(None)
+
+
+def get_annotations(obj: Any) -> dict[Any, Any]:
+ if hasattr(inspect, "get_annotations"):
+ return inspect.get_annotations(obj)
+ # TODO: safe to remove in python3.10
+ # This fallback only exists for older versions
+ return obj.__dict__.get("__annotations__", {})
+
+
+def get_generic_type_arguments(typ: Any) -> list[Any]:
+ return getattr(typ, "__args__", [])
+
+
+def get_generic_type_argument(typ: Any) -> Any:
+ args = get_generic_type_arguments(typ)
+ if len(args) == 1:
+ return args[0]
+ msg = f"expected one generic type argument, got {len(args)}"
+ raise DataAnnotationError(msg)
+
+
+def is_dict(typ: Any) -> bool:
+ return getattr(typ, "__origin__", None) in (Dict, dict)
+
+
+def is_base_generic_type_wrapper(typ: Any) -> bool:
+ origin = getattr(typ, "__origin__", None)
+ return inspect.isclass(origin) and issubclass(origin, BaseGenericTypeWrapper)
+
+
+def get_base_generic_type_wrapper_argument(typ: type[BaseGenericTypeWrapper[Any]]) -> Any:
+ if not hasattr(typ, "__origin__"):
+ msg = ""
+ raise DataAnnotationError(msg)
+
+ origin = getattr(typ, "__origin__")
+ if not hasattr(origin, "__orig_bases__"):
+ msg = ""
+ raise DataAnnotationError(msg)
+
+ orig_base: list[Any] = getattr(origin, "__orig_bases__", [])[0]
+ arg = get_generic_type_argument(typ)
+ return get_generic_type_argument(orig_base[arg])
+
+
+def is_list(typ: Any) -> bool:
+ return getattr(typ, "__origin__", None) in (List, list)
+
+
+def is_literal(typ: Any) -> bool:
+ return getattr(typ, "__origin__", None) == Literal
+
+
+def is_none_type(typ: Any) -> bool:
+ return typ is None or typ == NoneType
+
+
+def is_optional(typ: Any) -> bool:
+ origin = getattr(typ, "__origin__", None)
+ args = get_generic_type_arguments(typ)
+ optional_len = 2
+ return origin == Union and len(args) == optional_len and NoneType in args
+
+
+def is_tuple(typ: Any) -> bool:
+ return getattr(typ, "__origin__", None) in (Tuple, tuple)
+
+
+def is_union(typ: Any) -> bool:
+ return getattr(typ, "__origin__", None) == Union
+
+
+def get_optional_inner_type(optional: Any) -> Any:
+ if is_optional(optional):
+ args = get_generic_type_arguments(optional)
+ for arg in args:
+ if not is_none_type(arg):
+ return arg
+ msg = "failed to get inner optional type"
+ raise DataAnnotationError(msg)
+
+
+def getattr_type(obj: Any, attr_name: str) -> Any:
+ annot = get_annotations(type(obj))
+ if hasattr(annot, attr_name):
+ return annot[attr_name]
+ msg = "attribute name is missing in data annotations"
+ raise DataAnnotationError(msg)
+
+
+def is_attr_name_private(attr_name: str) -> bool:
+ return attr_name.startswith("_")
--- /dev/null
+import pytest
+
+from knot_resolver.utils.modeling.errors import (
+ AggrDataValidationError,
+ DataAnnotationError,
+ DataDescriptionError,
+ DataModelingError,
+ DataTypeError,
+ DataValidationError,
+ DataValueError,
+)
+
+errors = [
+ DataModelingError("this is data modeling error message", "/error"),
+ DataAnnotationError("this is annotation error message", "/annotation"),
+ DataDescriptionError("this is description error message", "/description"),
+ DataTypeError("this is type error message", "/type"),
+ DataValueError("this is value error message", "/value"),
+]
+
+
+def test_data_validation_error() -> None:
+ error_msg = """Data validation error detected:
+ [/validation] this is validation error message
+ [/error] this is data modeling error message
+ [/annotation] annotation error: this is annotation error message
+ [/description] description error: this is description error message
+ [/type] type error: this is type error message
+ [/value] value error: this is value error message"""
+
+ with pytest.raises(DataValidationError) as error:
+ raise DataValidationError("this is validation error message", "/validation", errors)
+ assert str(error.value) == error_msg
+
+
+def test_aggregate_data_validation_error() -> None:
+ error_msg = """Data validation errors detected:
+ [/error] this is data modeling error message
+ [/annotation] annotation error: this is annotation error message
+ [/description] description error: this is description error message
+ [/type] type error: this is type error message
+ [/value] value error: this is value error message"""
+
+ with pytest.raises(AggrDataValidationError) as error:
+ raise AggrDataValidationError("/", errors)
+ assert str(error.value) == error_msg
--- /dev/null
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataParsingError
+from knot_resolver.utils.modeling.parsing import parse_json, parse_yaml, try_to_parse
+
+json_data = """
+{
+ "none": null,
+ "boolean": false,
+ "number": 2026,
+ "string": "this is string",
+ "object": {
+ "number": 5000,
+ "string": "this is object string"
+ },
+ "array": [
+ "item1",
+ "item2",
+ "item3"
+ ]
+}
+"""
+
+json_data_duplicates = """
+{
+ "duplicity-key": 1,
+ "duplicity-key": 2
+}
+"""
+
+json_data_duplicates_inner = """
+{
+ "object": {
+ "duplicity-key": 1,
+ "duplicity-key": 2
+ }
+}
+"""
+
+yaml_data = """
+none: null
+boolean: false
+number: 2026
+string: this is string
+object:
+ number: 5000
+ string: this is object string
+array:
+ - item1
+ - item2
+ - item3
+"""
+
+yaml_data_duplicates = """
+duplicity-key: 1
+duplicity-key: 2
+"""
+
+yaml_data_duplicates_inner = """
+object:
+ duplicity-key: 1
+ duplicity-key: 2
+"""
+
+data_dict = {
+ "none": None,
+ "boolean": False,
+ "number": 2026,
+ "string": "this is string",
+ "object": {
+ "number": 5000,
+ "string": "this is object string",
+ },
+ "array": [
+ "item1",
+ "item2",
+ "item3",
+ ],
+}
+
+
+def test_parse_json() -> None:
+ data = parse_json(json_data)
+ assert data == data_dict
+
+
+@pytest.mark.parametrize("data", [json_data, yaml_data])
+def test_parse_yaml(data: str) -> None:
+ data = parse_yaml(data)
+ assert data == data_dict
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ json_data_duplicates,
+ json_data_duplicates_inner,
+ ],
+)
+def test_parse_json_duplicates(data: str) -> None:
+ with pytest.raises(DataParsingError):
+ parse_json(data)
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ json_data_duplicates,
+ json_data_duplicates_inner,
+ yaml_data_duplicates,
+ yaml_data_duplicates_inner,
+ ],
+)
+def test_parse_yaml_duplicates(data: str) -> None:
+ with pytest.raises(DataParsingError):
+ parse_yaml(data)
+
+
+@pytest.mark.parametrize("data", [json_data, yaml_data])
+def test_try_to_parse(data: str) -> None:
+ data = try_to_parse(data)
+ assert data == data_dict
--- /dev/null
+import random
+import sys
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_float_types import BaseFloat, BaseFloatRange
+
+
+@pytest.mark.parametrize("value", [-65.535, -1, 0, 1, 65.535])
+def test_base_float(value: int):
+ obj = BaseFloat(value)
+ obj.validate()
+ assert float(obj) == value
+ assert int(obj) == int(value)
+ assert str(obj) == f"{value}"
+
+
+@pytest.mark.parametrize("value", [True, False, "1"])
+def test_base_float_invalid(value: Any):
+ with pytest.raises(DataModelingError):
+ BaseFloat(value).validate()
+
+
+@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)])
+def test_base_float_range(min: Optional[float], max: Optional[float]):
+ class TestFloatRange(BaseFloatRange):
+ if min:
+ _min = min
+ if max:
+ _max = max
+
+ if min:
+ obj = TestFloatRange(min)
+ obj.validate()
+ assert float(obj) == min
+ assert int(obj) == int(min)
+ assert str(obj) == f"{min}"
+ if max:
+ obj = TestFloatRange(max)
+ obj.validate()
+ assert float(obj) == max
+ assert int(obj) == int(max)
+ assert str(obj) == f"{max}"
+
+ rmin = int(min + 1) if min else -sys.maxsize - 1
+ rmax = int(max - 1) if max else sys.maxsize
+
+ n = 100
+ values = [float(random.randint(rmin, rmax)) for _ in range(n)]
+
+ for value in values:
+ obj = TestFloatRange(value)
+ obj.validate()
+ assert float(obj) == float(value)
+ assert str(obj) == f"{value}"
+
+
+@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)])
+def test_base_float_range_invalid(min: Optional[float], max: Optional[float]):
+ class TestFloatRange(BaseFloatRange):
+ if min:
+ _min = min
+ if max:
+ _max = max
+
+ n = 100
+ invalid_nums = []
+
+ rmin = int(min + 1) if min else -sys.maxsize - 1
+ rmax = int(max - 1) if max else sys.maxsize
+
+ invalid_nums.extend([float(random.randint(rmax + 1, sys.maxsize)) for _ in range(n % 2)] if max else [])
+ invalid_nums.extend([float(random.randint(-sys.maxsize - 1, rmin - 1)) for _ in range(n % 2)] if max else [])
+
+ for num in invalid_nums:
+ with pytest.raises(DataModelingError):
+ TestFloatRange(num).validate()
--- /dev/null
+from typing import Any, List, Union
+
+import pytest
+
+from knot_resolver.utils.modeling.types.base_generic_types import ListOrItem
+from knot_resolver.utils.modeling.types.inspect import get_base_generic_type_wrapper_argument
+
+
+@pytest.mark.parametrize("typ", [str, int, float, bool])
+def test_list_or_item_inner_type(typ: Any):
+ assert get_base_generic_type_wrapper_argument(ListOrItem[typ]) == Union[List[typ], typ]
+
+
+@pytest.mark.parametrize(
+ "value",
+ [
+ [],
+ 65_535,
+ [1, 65_535, 5335, 5000],
+ ],
+)
+def test_list_or_item(value: Any):
+ obj = ListOrItem(value)
+ assert str(obj) == str(value)
+ for i, item in enumerate(obj):
+ assert item == value[i] if isinstance(value, list) else value
--- /dev/null
+import random
+import sys
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_integer_types import BaseInteger, BaseIntegerRange
+
+
+@pytest.mark.parametrize("value", [-65535, -1, 0, 1, 65535])
+def test_base_integer(value: int):
+ obj = BaseInteger(value)
+ obj.validate()
+ assert int(obj) == value
+ assert str(obj) == f"{value}"
+
+
+@pytest.mark.parametrize("value", [True, False, "1", 1.1])
+def test_base_integer_invalid(value: Any):
+ with pytest.raises(DataModelingError):
+ BaseInteger(value).validate()
+
+
+@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)])
+def test_base_integer_range(min: Optional[int], max: Optional[int]):
+ class TestIntegerRange(BaseIntegerRange):
+ if min:
+ _min = min
+ if max:
+ _max = max
+
+ if min:
+ obj = TestIntegerRange(min)
+ obj.validate()
+ assert int(obj) == min
+ assert str(obj) == f"{min}"
+ if max:
+ obj = TestIntegerRange(max)
+ obj.validate()
+ assert int(obj) == max
+ assert str(obj) == f"{max}"
+
+ rmin = min if min else -sys.maxsize - 1
+ rmax = max if max else sys.maxsize
+
+ n = 100
+ values = [random.randint(rmin, rmax) for _ in range(n)]
+
+ for value in values:
+ obj = TestIntegerRange(value)
+ obj.validate()
+ assert str(obj) == f"{value}"
+
+
+@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)])
+def test_base_integer_range_invalid(min: Optional[int], max: Optional[int]):
+ class TestIntegerRange(BaseIntegerRange):
+ if min:
+ _min = min
+ if max:
+ _max = max
+
+ n = 100
+ invalid_nums = []
+
+ rmin = min if min else -sys.maxsize - 1
+ rmax = max if max else sys.maxsize
+
+ invalid_nums.extend([random.randint(rmax + 1, sys.maxsize) for _ in range(n % 2)] if max else [])
+ invalid_nums.extend([random.randint(-sys.maxsize - 1, rmin - 1) for _ in range(n % 2)] if max else [])
+
+ for num in invalid_nums:
+ with pytest.raises(DataModelingError):
+ TestIntegerRange(num).validate()
--- /dev/null
+import random
+import string
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_string_types import BaseString, BaseStringLength, BaseUnit
+
+
+@pytest.mark.parametrize("value", [-65_535, -1, 0, 1, 65_535, "a", "abcdef"])
+def test_base_string(value: str):
+ obj = BaseString(value)
+ obj.validate()
+ assert str(obj) == str(value)
+
+
+@pytest.mark.parametrize("value", [True, False])
+def test_base_string_invalid(value: Any):
+ with pytest.raises(DataModelingError):
+ BaseString(value).validate()
+
+
+@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)])
+def test_base_string_length(min: Optional[int], max: Optional[int]):
+ class TestStringLength(BaseStringLength):
+ if min:
+ _min_bytes = min
+ if max:
+ _max_bytes = max
+
+ if min:
+ rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=min))
+ obj = TestStringLength(rand_str)
+ obj.validate()
+ assert str(obj) == f"{rand_str}"
+ if max:
+ rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=max))
+ obj = TestStringLength(rand_str)
+ obj.validate()
+ assert str(obj) == f"{rand_str}"
+
+ rmin = min if min else 1
+ rmax = max if max else 200
+
+ n = 100
+ values = [
+ "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(rmin, rmax))) for _ in range(n)
+ ]
+
+ for value in values:
+ obj = TestStringLength(value)
+ obj.validate()
+ assert str(obj) == f"{value}"
+
+
+@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)])
+def test_base_string_length_invalid(min: Optional[int], max: Optional[int]):
+ class TestStringLength(BaseStringLength):
+ if min:
+ _min_bytes = min
+ if max:
+ _max_bytes = max
+
+ n = 100
+ invalid_strings = []
+
+ rmin = min if min else 1
+ rmax = max if max else 200
+
+ invalid_strings.extend(
+ [
+ "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(rmax, rmax + 20)))
+ for _ in range(n % 2)
+ ]
+ if max
+ else []
+ )
+ invalid_strings.extend(
+ [
+ "".join(random.choices(string.ascii_uppercase + string.digits, k=random.randint(1, rmin)))
+ for _ in range(n % 2)
+ ]
+ if max
+ else []
+ )
+
+ for invalid_string in invalid_strings:
+ with pytest.raises(DataModelingError):
+ TestStringLength(invalid_string).validate()
+
+
+@pytest.mark.parametrize("value", [1000, "1000a", "100b", "10c", "1d"])
+def test_base_unit(value: str):
+ class TestBaseUnit(BaseUnit):
+ _units = {"a": 1, "b": 10, "c": 100, "d": 1000}
+
+ obj = TestBaseUnit(value)
+ obj.validate()
+ assert int(obj) == 1000
+
+
+@pytest.mark.parametrize("value", [True, False, "1000aa", "10ab", "1e"])
+def test_base_unit_invalid(value: Any):
+ class TestBaseUnit(BaseUnit):
+ _units = {"a": 1, "b": 10, "c": 100, "d": 1000}
+
+ with pytest.raises(DataModelingError):
+ TestBaseUnit(value).validate()
--- /dev/null
+import sys
+from typing import Any, Dict, List, Literal, Optional, Tuple, Union
+
+import pytest
+
+from knot_resolver.utils.modeling import ModelNode
+from knot_resolver.utils.modeling.types.base_types import BaseType
+from knot_resolver.utils.modeling.types.inspect import (
+ NoneType,
+ is_dict,
+ is_list,
+ is_literal,
+ is_none_type,
+ is_optional,
+ is_tuple,
+ is_union,
+)
+
+types = [
+ Any,
+ bool,
+ int,
+ float,
+ str,
+ BaseType,
+ ModelNode,
+]
+
+literals = [
+ Literal[Any],
+ Literal[True, False],
+ Literal[0],
+ Literal[1, 2, 3],
+ Literal["literal"],
+ Literal["literal1", "literal2", "literal3"],
+]
+
+
+@pytest.mark.parametrize("typ", types)
+def test_is_dict(typ: Any) -> None:
+ assert is_dict(Dict[typ, Any])
+
+
+@pytest.mark.parametrize("typ", types)
+def test_is_list(typ: Any) -> None:
+ assert is_list(List[typ])
+ if sys.version_info >= (3, 9):
+ assert is_list(list[typ])
+
+
+@pytest.mark.parametrize("typ", literals)
+def test_is_literal(typ: Any) -> None:
+ assert is_literal(typ)
+
+
+@pytest.mark.parametrize("typ", [None, NoneType])
+def test_is_none_type(typ: Any) -> None:
+ assert is_none_type(typ)
+
+
+@pytest.mark.parametrize("typ", types)
+def test_is_optional(typ: Any) -> None:
+ assert is_optional(Optional[typ])
+ if sys.version_info >= (3, 9):
+ assert is_optional(typ | None)
+ assert is_optional(None | typ)
+
+
+@pytest.mark.parametrize("typ", types)
+def test_is_tuple(typ: Any) -> None:
+ assert is_tuple(Tuple[typ])
+ if sys.version_info >= (3, 9):
+ assert is_tuple(tuple[typ])
+
+
+@pytest.mark.parametrize("typ", types)
+def test_is_union(typ: Any) -> None:
+ assert is_union(Optional[typ])
+ assert is_union(Union[typ, None])
+ if sys.version_info >= (3, 9):
+ assert is_union(typ | None)
+ assert is_union(None | typ)